mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-05 13:54:36 +00:00
Use hashlink over lru for LruCache (#8911)
Use the `LruCache` implementation provided by `hashlink` instead of the current `lru` one. This is mostly a 1-to-1 swap with only slight API incompatibilities. I have decided to leave some config files which previously used `NonZeroUsize` but they may not be required anymore and could potentially switch to `usize`. Co-Authored-By: Mac L <mjladson@pm.me>
This commit is contained in:
23
Cargo.lock
generated
23
Cargo.lock
generated
@@ -364,7 +364,7 @@ dependencies = [
|
||||
"either",
|
||||
"futures",
|
||||
"futures-utils-wasm",
|
||||
"lru 0.13.0",
|
||||
"lru",
|
||||
"parking_lot",
|
||||
"pin-project",
|
||||
"reqwest",
|
||||
@@ -1229,13 +1229,13 @@ dependencies = [
|
||||
"fork_choice",
|
||||
"futures",
|
||||
"genesis",
|
||||
"hashlink",
|
||||
"hex",
|
||||
"int_to_bytes",
|
||||
"itertools 0.14.0",
|
||||
"kzg",
|
||||
"lighthouse_version",
|
||||
"logging",
|
||||
"lru 0.12.5",
|
||||
"maplit",
|
||||
"merkle_proof",
|
||||
"metrics",
|
||||
@@ -3394,13 +3394,13 @@ dependencies = [
|
||||
"fork_choice",
|
||||
"hash-db",
|
||||
"hash256-std-hasher",
|
||||
"hashlink",
|
||||
"hex",
|
||||
"jsonwebtoken",
|
||||
"keccak-hash",
|
||||
"kzg",
|
||||
"lighthouse_version",
|
||||
"logging",
|
||||
"lru 0.12.5",
|
||||
"metrics",
|
||||
"parking_lot",
|
||||
"pretty_reqwest_error",
|
||||
@@ -4240,12 +4240,12 @@ dependencies = [
|
||||
"fixed_bytes",
|
||||
"futures",
|
||||
"genesis",
|
||||
"hashlink",
|
||||
"health_metrics",
|
||||
"hex",
|
||||
"lighthouse_network",
|
||||
"lighthouse_version",
|
||||
"logging",
|
||||
"lru 0.12.5",
|
||||
"metrics",
|
||||
"network",
|
||||
"network_utils",
|
||||
@@ -5549,6 +5549,7 @@ dependencies = [
|
||||
"fixed_bytes",
|
||||
"fnv",
|
||||
"futures",
|
||||
"hashlink",
|
||||
"hex",
|
||||
"if-addrs 0.14.0",
|
||||
"itertools 0.14.0",
|
||||
@@ -5556,7 +5557,6 @@ dependencies = [
|
||||
"libp2p-mplex",
|
||||
"lighthouse_version",
|
||||
"logging",
|
||||
"lru 0.12.5",
|
||||
"lru_cache",
|
||||
"metrics",
|
||||
"network_utils",
|
||||
@@ -5703,15 +5703,6 @@ dependencies = [
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
|
||||
dependencies = [
|
||||
"hashbrown 0.15.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.13.0"
|
||||
@@ -8415,10 +8406,10 @@ dependencies = [
|
||||
"filesystem",
|
||||
"fixed_bytes",
|
||||
"flate2",
|
||||
"hashlink",
|
||||
"libmdbx",
|
||||
"lmdb-rkv",
|
||||
"lmdb-rkv-sys",
|
||||
"lru 0.12.5",
|
||||
"maplit",
|
||||
"metrics",
|
||||
"parking_lot",
|
||||
@@ -8652,10 +8643,10 @@ dependencies = [
|
||||
"ethereum_ssz",
|
||||
"ethereum_ssz_derive",
|
||||
"fixed_bytes",
|
||||
"hashlink",
|
||||
"itertools 0.14.0",
|
||||
"leveldb",
|
||||
"logging",
|
||||
"lru 0.12.5",
|
||||
"metrics",
|
||||
"milhouse",
|
||||
"parking_lot",
|
||||
|
||||
@@ -152,7 +152,7 @@ fs2 = "0.4"
|
||||
futures = "0.3"
|
||||
genesis = { path = "beacon_node/genesis" }
|
||||
graffiti_file = { path = "validator_client/graffiti_file" }
|
||||
hashlink = "0.9.0"
|
||||
hashlink = "0.11"
|
||||
health_metrics = { path = "common/health_metrics" }
|
||||
hex = "0.4"
|
||||
http_api = { path = "beacon_node/http_api" }
|
||||
@@ -170,7 +170,6 @@ lockfile = { path = "common/lockfile" }
|
||||
log = "0.4"
|
||||
logging = { path = "common/logging" }
|
||||
logroller = "0.1.8"
|
||||
lru = "0.12"
|
||||
lru_cache = { path = "common/lru_cache" }
|
||||
malloc_utils = { path = "common/malloc_utils" }
|
||||
maplit = "1"
|
||||
|
||||
@@ -35,13 +35,13 @@ fixed_bytes = { workspace = true }
|
||||
fork_choice = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
genesis = { workspace = true }
|
||||
hashlink = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
int_to_bytes = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
kzg = { workspace = true }
|
||||
lighthouse_version = { workspace = true }
|
||||
logging = { workspace = true }
|
||||
lru = { workspace = true }
|
||||
merkle_proof = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
milhouse = { workspace = true }
|
||||
|
||||
@@ -10,21 +10,19 @@
|
||||
|
||||
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use fork_choice::ExecutionStatus;
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use once_cell::sync::OnceCell;
|
||||
use parking_lot::Mutex;
|
||||
use safe_arith::SafeArith;
|
||||
use smallvec::SmallVec;
|
||||
use state_processing::state_advance::partial_state_advance;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, instrument};
|
||||
use typenum::Unsigned;
|
||||
use types::new_non_zero_usize;
|
||||
use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot};
|
||||
|
||||
/// The number of sets of proposer indices that should be cached.
|
||||
const CACHE_SIZE: NonZeroUsize = new_non_zero_usize(16);
|
||||
const CACHE_SIZE: usize = 16;
|
||||
|
||||
/// This value is fairly unimportant, it's used to avoid heap allocations. The result of it being
|
||||
/// incorrect is non-substantial from a consensus perspective (and probably also from a
|
||||
@@ -138,7 +136,8 @@ impl BeaconProposerCache {
|
||||
) -> Arc<OnceCell<EpochBlockProposers>> {
|
||||
let key = (epoch, shuffling_decision_block);
|
||||
self.cache
|
||||
.get_or_insert(key, || Arc::new(OnceCell::new()))
|
||||
.entry(key)
|
||||
.or_insert_with(|| Arc::new(OnceCell::new()))
|
||||
.clone()
|
||||
}
|
||||
|
||||
@@ -155,10 +154,10 @@ impl BeaconProposerCache {
|
||||
fork: Fork,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
let key = (epoch, shuffling_decision_block);
|
||||
if !self.cache.contains(&key) {
|
||||
if !self.cache.contains_key(&key) {
|
||||
let epoch_proposers = EpochBlockProposers::new(epoch, fork, proposers);
|
||||
self.cache
|
||||
.put(key, Arc::new(OnceCell::with_value(epoch_proposers)));
|
||||
.insert(key, Arc::new(OnceCell::with_value(epoch_proposers)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -11,7 +11,6 @@ use slot_clock::SlotClock;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt;
|
||||
use std::fmt::Debug;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use task_executor::TaskExecutor;
|
||||
@@ -20,7 +19,7 @@ use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn};
|
||||
use types::{
|
||||
BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar,
|
||||
DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError,
|
||||
PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, new_non_zero_usize,
|
||||
PartialDataColumnSidecarRef, SignedBeaconBlock, Slot,
|
||||
};
|
||||
|
||||
mod error;
|
||||
@@ -49,7 +48,7 @@ pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCh
|
||||
///
|
||||
/// `PendingComponents` are now never removed from the cache manually are only removed via LRU
|
||||
/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time.
|
||||
const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
|
||||
const OVERFLOW_LRU_CAPACITY: usize = 32;
|
||||
|
||||
/// Cache to hold fully valid data that can't be imported to fork-choice yet. After Dencun hard-fork
|
||||
/// blocks have a sidecar of data that is received separately from the network. We call the concept
|
||||
@@ -124,13 +123,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
enable_partial_columns: bool,
|
||||
) -> Result<Self, AvailabilityCheckError> {
|
||||
let inner = DataAvailabilityCheckerInner::new(
|
||||
OVERFLOW_LRU_CAPACITY_NON_ZERO,
|
||||
OVERFLOW_LRU_CAPACITY,
|
||||
custody_context.clone(),
|
||||
spec.clone(),
|
||||
)?;
|
||||
let partial_assembler = if enable_partial_columns {
|
||||
Some(Arc::new(PartialDataColumnAssembler::new(
|
||||
OVERFLOW_LRU_CAPACITY_NON_ZERO,
|
||||
OVERFLOW_LRU_CAPACITY,
|
||||
)))
|
||||
} else {
|
||||
None
|
||||
|
||||
@@ -7,11 +7,10 @@ use crate::block_verification_types::{
|
||||
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
|
||||
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
|
||||
use crate::{BeaconChainTypes, BlockProcessStatus};
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use ssz_types::RuntimeFixedVector;
|
||||
use std::cmp::Ordering;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tracing::{Span, debug, debug_span};
|
||||
use types::data::BlobIdentifier;
|
||||
@@ -365,7 +364,7 @@ pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
|
||||
|
||||
impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
pub fn new(
|
||||
capacity: NonZeroUsize,
|
||||
capacity: usize,
|
||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||
spec: Arc<ChainSpec>,
|
||||
) -> Result<Self, AvailabilityCheckError> {
|
||||
@@ -565,7 +564,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
let mut write_lock = self.critical.write();
|
||||
|
||||
{
|
||||
let pending_components = write_lock.get_or_insert_mut(block_root, || {
|
||||
let pending_components = write_lock.entry(block_root).or_insert_with(|| {
|
||||
PendingComponents::empty(block_root, self.spec.max_blobs_per_block(epoch) as usize)
|
||||
});
|
||||
update_fn(pending_components)?
|
||||
@@ -672,7 +671,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
if let Some(BlockProcessStatus::NotValidated(_, _)) = self.get_cached_block(block_root) {
|
||||
// If the block is execution invalid, this status is permanent and idempotent to this
|
||||
// block_root. We drop its components (e.g. columns) because they will never be useful.
|
||||
self.critical.write().pop(block_root);
|
||||
self.critical.write().remove(block_root);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -733,7 +732,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
}
|
||||
// Now remove keys
|
||||
for key in keys_to_remove {
|
||||
write_lock.pop(&key);
|
||||
write_lock.remove(&key);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -765,7 +764,6 @@ mod test {
|
||||
use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend};
|
||||
use tempfile::{TempDir, tempdir};
|
||||
use tracing::info;
|
||||
use types::new_non_zero_usize;
|
||||
use types::{DataColumnSubnetId, MinimalEthSpec};
|
||||
|
||||
const LOW_VALIDATOR_COUNT: usize = 32;
|
||||
@@ -930,19 +928,14 @@ mod test {
|
||||
let chain_db_path = tempdir().expect("should get temp dir");
|
||||
let harness = get_fulu_chain(&chain_db_path).await;
|
||||
let spec = harness.spec.clone();
|
||||
let capacity_non_zero = new_non_zero_usize(capacity);
|
||||
let custody_context = Arc::new(CustodyContext::new(
|
||||
NodeCustodyType::Fullnode,
|
||||
generate_data_column_indices_rand_order::<E>(),
|
||||
&spec,
|
||||
));
|
||||
let cache = Arc::new(
|
||||
DataAvailabilityCheckerInner::<T>::new(
|
||||
capacity_non_zero,
|
||||
custody_context,
|
||||
spec.clone(),
|
||||
)
|
||||
.expect("should create cache"),
|
||||
DataAvailabilityCheckerInner::<T>::new(capacity, custody_context, spec.clone())
|
||||
.expect("should create cache"),
|
||||
);
|
||||
(harness, cache, chain_db_path)
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ use eth2::types::BlobsBundle;
|
||||
use execution_layer::json_structures::{BlobAndProof, BlobAndProofV1, BlobAndProofV2};
|
||||
use execution_layer::test_utils::generate_blobs;
|
||||
use maplit::hashset;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
use types::{
|
||||
@@ -339,7 +338,7 @@ fn mock_beacon_adapter(fork_name: ForkName, get_blobs_v3: bool) -> MockFetchBlob
|
||||
let test_runtime = TestRuntime::default();
|
||||
let spec = Arc::new(fork_name.make_genesis_spec(E::default_spec()));
|
||||
let kzg = get_kzg(&spec);
|
||||
let partial_assembler = PartialDataColumnAssembler::new(NonZeroUsize::new(32).unwrap());
|
||||
let partial_assembler = PartialDataColumnAssembler::new(32);
|
||||
|
||||
let mut mock_adapter = MockFetchBlobsBeaconAdapter::default();
|
||||
mock_adapter.expect_spec().return_const(spec.clone());
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
use crate::errors::BeaconChainError;
|
||||
use crate::{BeaconChainTypes, BeaconStore, metrics};
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use safe_arith::SafeArith;
|
||||
use ssz::Decode;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use store::DBColumn;
|
||||
use store::KeyValueStore;
|
||||
use tracing::debug;
|
||||
use tree_hash::TreeHash;
|
||||
use types::new_non_zero_usize;
|
||||
use types::{
|
||||
BeaconBlockRef, BeaconState, ChainSpec, Checkpoint, EthSpec, ForkName, Hash256,
|
||||
LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate,
|
||||
@@ -19,7 +18,7 @@ use types::{
|
||||
/// A prev block cache miss requires to re-generate the state of the post-parent block. Items in the
|
||||
/// prev block cache are very small 32 * (6 + 1) = 224 bytes. 32 is an arbitrary number that
|
||||
/// represents unlikely re-orgs, while keeping the cache very small.
|
||||
const PREV_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32);
|
||||
const PREV_BLOCK_CACHE_SIZE: usize = 32;
|
||||
|
||||
/// This cache computes light client messages ahead of time, required to satisfy p2p and API
|
||||
/// requests. These messages include proofs on historical states, so on-demand computation is
|
||||
@@ -39,7 +38,7 @@ pub struct LightClientServerCache<T: BeaconChainTypes> {
|
||||
/// Caches the current sync committee,
|
||||
latest_written_current_sync_committee: RwLock<Option<Arc<SyncCommittee<T::EthSpec>>>>,
|
||||
/// Caches state proofs by block root
|
||||
prev_block_cache: Mutex<lru::LruCache<Hash256, LightClientCachedData<T::EthSpec>>>,
|
||||
prev_block_cache: Mutex<LruCache<Hash256, LightClientCachedData<T::EthSpec>>>,
|
||||
/// Tracks the latest broadcasted finality update
|
||||
latest_broadcasted_finality_update: RwLock<Option<LightClientFinalityUpdate<T::EthSpec>>>,
|
||||
/// Tracks the latest broadcasted optimistic update
|
||||
@@ -55,7 +54,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
|
||||
latest_written_current_sync_committee: None.into(),
|
||||
latest_broadcasted_finality_update: None.into(),
|
||||
latest_broadcasted_optimistic_update: None.into(),
|
||||
prev_block_cache: lru::LruCache::new(PREV_BLOCK_CACHE_SIZE).into(),
|
||||
prev_block_cache: LruCache::new(PREV_BLOCK_CACHE_SIZE).into(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,7 +73,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
|
||||
if fork_name.altair_enabled() {
|
||||
// Persist in memory cache for a descendent block
|
||||
let cached_data = LightClientCachedData::from_state(block_post_state)?;
|
||||
self.prev_block_cache.lock().put(block_root, cached_data);
|
||||
self.prev_block_cache.lock().insert(block_root, cached_data);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -335,7 +334,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
|
||||
// Insert value and return owned
|
||||
self.prev_block_cache
|
||||
.lock()
|
||||
.put(*block_root, new_value.clone());
|
||||
.insert(*block_root, new_value.clone());
|
||||
Ok(new_value)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
use crate::data_column_verification::{
|
||||
KzgVerifiedCustodyDataColumn, KzgVerifiedCustodyPartialDataColumn,
|
||||
};
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tracing::error;
|
||||
use types::core::{Epoch, EthSpec, Hash256};
|
||||
@@ -44,7 +43,7 @@ pub struct PartialMergeResult<E: EthSpec> {
|
||||
}
|
||||
|
||||
impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
pub fn new(capacity: NonZeroUsize) -> Self {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
assemblies: RwLock::new(LruCache::new(capacity)),
|
||||
}
|
||||
@@ -55,7 +54,7 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
pub fn init(&self, block_root: Hash256, header: Arc<PartialDataColumnHeader<E>>) -> bool {
|
||||
let mut assemblies = self.assemblies.write();
|
||||
|
||||
if assemblies.contains(&block_root) {
|
||||
if assemblies.contains_key(&block_root) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -65,7 +64,7 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
columns: HashMap::new(),
|
||||
};
|
||||
|
||||
assemblies.put(block_root, assembly);
|
||||
assemblies.insert(block_root, assembly);
|
||||
|
||||
true
|
||||
}
|
||||
@@ -79,11 +78,13 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
header: Arc<PartialDataColumnHeader<E>>,
|
||||
) -> Option<PartialMergeResult<E>> {
|
||||
let mut assemblies = self.assemblies.write();
|
||||
let assembly = assemblies.get_or_insert_mut(block_root, || PartialAssembly {
|
||||
header: header.clone(),
|
||||
has_local_blobs: false,
|
||||
columns: HashMap::new(),
|
||||
});
|
||||
let assembly = assemblies
|
||||
.entry(block_root)
|
||||
.or_insert_with(|| PartialAssembly {
|
||||
header: header.clone(),
|
||||
has_local_blobs: false,
|
||||
columns: HashMap::new(),
|
||||
});
|
||||
|
||||
let mut full_columns = Vec::new();
|
||||
let mut updated_partials = Vec::new();
|
||||
@@ -165,15 +166,17 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
};
|
||||
|
||||
let mut assemblies = self.assemblies.write();
|
||||
let assembly = assemblies.get_or_insert_mut(block_root, || PartialAssembly {
|
||||
header: Arc::new(PartialDataColumnHeader {
|
||||
kzg_commitments: fulu.kzg_commitments.clone(),
|
||||
signed_block_header: fulu.signed_block_header.clone(),
|
||||
kzg_commitments_inclusion_proof: fulu.kzg_commitments_inclusion_proof.clone(),
|
||||
}),
|
||||
has_local_blobs: false,
|
||||
columns: Default::default(),
|
||||
});
|
||||
let assembly = assemblies
|
||||
.entry(block_root)
|
||||
.or_insert_with(|| PartialAssembly {
|
||||
header: Arc::new(PartialDataColumnHeader {
|
||||
kzg_commitments: fulu.kzg_commitments.clone(),
|
||||
signed_block_header: fulu.signed_block_header.clone(),
|
||||
kzg_commitments_inclusion_proof: fulu.kzg_commitments_inclusion_proof.clone(),
|
||||
}),
|
||||
has_local_blobs: false,
|
||||
columns: Default::default(),
|
||||
});
|
||||
let prev = assembly
|
||||
.columns
|
||||
.insert(column.index(), AssemblyColumn::Complete(column.clone()));
|
||||
@@ -215,11 +218,13 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
header: &Arc<PartialDataColumnHeader<E>>,
|
||||
) -> Vec<AssemblyColumn<E>> {
|
||||
let mut assemblies = self.assemblies.write();
|
||||
let assembly = assemblies.get_or_insert_mut(block_root, || PartialAssembly {
|
||||
header: header.clone(),
|
||||
has_local_blobs: true,
|
||||
columns: Default::default(),
|
||||
});
|
||||
let assembly = assemblies
|
||||
.entry(block_root)
|
||||
.or_insert_with(|| PartialAssembly {
|
||||
header: header.clone(),
|
||||
has_local_blobs: true,
|
||||
columns: Default::default(),
|
||||
});
|
||||
|
||||
assembly.has_local_blobs = true;
|
||||
|
||||
@@ -253,7 +258,7 @@ impl<E: EthSpec> PartialDataColumnAssembler<E> {
|
||||
}
|
||||
|
||||
for root in to_remove {
|
||||
assemblies.pop(&root);
|
||||
assemblies.remove(&root);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -362,7 +367,7 @@ mod tests {
|
||||
}
|
||||
|
||||
fn make_assembler() -> PartialDataColumnAssembler<E> {
|
||||
PartialDataColumnAssembler::new(NonZeroUsize::new(16).unwrap())
|
||||
PartialDataColumnAssembler::new(16)
|
||||
}
|
||||
|
||||
// -- init and get_header tests --
|
||||
|
||||
@@ -15,13 +15,12 @@ use crate::payload_envelope_verification::{
|
||||
AvailabilityPendingExecutedEnvelope, AvailableExecutedEnvelope,
|
||||
};
|
||||
use crate::{BeaconChainTypes, CustodyContext, metrics};
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use kzg::Kzg;
|
||||
use lru::LruCache;
|
||||
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::fmt::Debug;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tracing::{Span, debug, error, instrument};
|
||||
use types::{
|
||||
@@ -41,7 +40,6 @@ use crate::metrics::{
|
||||
use crate::observed_data_sidecars::ObservationStrategy;
|
||||
use pending_components::{PendingComponents, ReconstructColumnsDecision};
|
||||
use types::SignedExecutionPayloadBid;
|
||||
use types::new_non_zero_usize;
|
||||
|
||||
/// The LRU Cache stores `PendingComponents`, which store the block root, the execution payload bid, and its associated column data.
|
||||
/// The execution payload bid stores the kzg commitments which we use to verify against incoming column data.
|
||||
@@ -49,7 +47,7 @@ use types::new_non_zero_usize;
|
||||
///
|
||||
/// `PendingComponents` are now never removed from the cache manually and are only removed via LRU
|
||||
/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time.
|
||||
const AVAILABILITY_CACHE_CAPACITY: NonZeroUsize = new_non_zero_usize(32);
|
||||
const AVAILABILITY_CACHE_CAPACITY: usize = 32;
|
||||
|
||||
/// This type is returned after adding a bid / column to the `DataAvailabilityChecker`.
|
||||
///
|
||||
@@ -206,7 +204,9 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
/// This will silently drop the bid if a bid for this block root already exists in the cache.
|
||||
pub fn insert_bid(&self, block_root: Hash256, bid: Arc<SignedExecutionPayloadBid<T::EthSpec>>) {
|
||||
let mut write_lock = self.availability_cache.write();
|
||||
write_lock.get_or_insert_mut(block_root, || PendingComponents::new(block_root, bid));
|
||||
write_lock
|
||||
.entry(block_root)
|
||||
.or_insert_with(|| PendingComponents::new(block_root, bid));
|
||||
}
|
||||
|
||||
/// Perform KZG verification on RPC custody columns and insert them into the cache.
|
||||
@@ -423,7 +423,8 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
|
||||
{
|
||||
let pending_components = write_lock
|
||||
.get_or_insert_mut(block_root, || PendingComponents::new(block_root, bid));
|
||||
.entry(block_root)
|
||||
.or_insert_with(|| PendingComponents::new(block_root, bid));
|
||||
update_fn(pending_components)
|
||||
}
|
||||
|
||||
@@ -496,7 +497,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
|
||||
}
|
||||
}
|
||||
for key in keys_to_remove {
|
||||
write_lock.pop(&key);
|
||||
write_lock.remove(&key);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,15 +1,13 @@
|
||||
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use itertools::process_results;
|
||||
use lru::LruCache;
|
||||
use parking_lot::Mutex;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::time::Duration;
|
||||
use tracing::debug;
|
||||
use types::Hash256;
|
||||
use types::new_non_zero_usize;
|
||||
|
||||
const BLOCK_ROOT_CACHE_LIMIT: NonZeroUsize = new_non_zero_usize(512);
|
||||
const LOOKUP_LIMIT: NonZeroUsize = new_non_zero_usize(8);
|
||||
const BLOCK_ROOT_CACHE_LIMIT: usize = 512;
|
||||
const LOOKUP_LIMIT: usize = 8;
|
||||
const METRICS_TIMEOUT: Duration = Duration::from_millis(100);
|
||||
|
||||
/// Cache for rejecting attestations to blocks from before finalization.
|
||||
@@ -49,13 +47,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let mut cache = self.pre_finalization_block_cache.cache.lock();
|
||||
|
||||
// Check the cache to see if we already know this pre-finalization block root.
|
||||
if cache.block_roots.contains(&block_root) {
|
||||
if cache.block_roots.contains_key(&block_root) {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// Avoid repeating the disk lookup for blocks that are already subject to a network lookup.
|
||||
// Sync will take care of de-duplicating the single block lookups.
|
||||
if cache.in_progress_lookups.contains(&block_root) {
|
||||
if cache.in_progress_lookups.contains_key(&block_root) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
@@ -68,19 +66,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.map_err(BeaconChainError::BeaconStateError)
|
||||
})?;
|
||||
if is_recent_finalized_block {
|
||||
cache.block_roots.put(block_root, ());
|
||||
cache.block_roots.insert(block_root, ());
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// 2. Check on disk.
|
||||
if self.store.get_blinded_block(&block_root)?.is_some() {
|
||||
cache.block_roots.put(block_root, ());
|
||||
cache.block_roots.insert(block_root, ());
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// 3. Check the network with a single block lookup.
|
||||
cache.in_progress_lookups.put(block_root, ());
|
||||
if cache.in_progress_lookups.len() == LOOKUP_LIMIT.get() {
|
||||
cache.in_progress_lookups.insert(block_root, ());
|
||||
if cache.in_progress_lookups.len() == LOOKUP_LIMIT {
|
||||
// NOTE: we expect this to occur sometimes if a lot of blocks that we look up fail to be
|
||||
// imported for reasons other than being pre-finalization. The cache will eventually
|
||||
// self-repair in this case by replacing old entries with new ones until all the failed
|
||||
@@ -95,8 +93,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
pub fn pre_finalization_block_rejected(&self, block_root: Hash256) {
|
||||
// Future requests can know that this block is invalid without having to look it up again.
|
||||
let mut cache = self.pre_finalization_block_cache.cache.lock();
|
||||
cache.in_progress_lookups.pop(&block_root);
|
||||
cache.block_roots.put(block_root, ());
|
||||
cache.in_progress_lookups.remove(&block_root);
|
||||
cache.block_roots.insert(block_root, ());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,11 +102,11 @@ impl PreFinalizationBlockCache {
|
||||
pub fn block_processed(&self, block_root: Hash256) {
|
||||
// Future requests will find this block in fork choice, so no need to cache it in the
|
||||
// ongoing lookup cache any longer.
|
||||
self.cache.lock().in_progress_lookups.pop(&block_root);
|
||||
self.cache.lock().in_progress_lookups.remove(&block_root);
|
||||
}
|
||||
|
||||
pub fn contains(&self, block_root: Hash256) -> bool {
|
||||
self.cache.lock().block_roots.contains(&block_root)
|
||||
self.cache.lock().block_roots.contains_key(&block_root)
|
||||
}
|
||||
|
||||
pub fn metrics(&self) -> Option<(usize, usize)> {
|
||||
|
||||
@@ -36,7 +36,6 @@ use rand::SeedableRng;
|
||||
use rand::rngs::{OsRng, StdRng};
|
||||
use slasher::Slasher;
|
||||
use slasher_service::SlasherService;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -641,8 +640,7 @@ where
|
||||
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
|
||||
sse_logging_components: runtime_context.sse_logging_components.clone(),
|
||||
historical_committee_cache: Arc::new(http_api::HistoricalCommitteeCache::new(
|
||||
NonZeroUsize::new(self.http_api_config.historical_committee_cache_size)
|
||||
.unwrap_or(NonZeroUsize::MIN),
|
||||
self.http_api_config.historical_committee_cache_size,
|
||||
)),
|
||||
});
|
||||
|
||||
|
||||
@@ -20,13 +20,13 @@ fixed_bytes = { workspace = true }
|
||||
fork_choice = { workspace = true }
|
||||
hash-db = "0.15.2"
|
||||
hash256-std-hasher = "0.15.2"
|
||||
hashlink = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
jsonwebtoken = "9"
|
||||
keccak-hash = "0.10.0"
|
||||
kzg = { workspace = true }
|
||||
lighthouse_version = { workspace = true }
|
||||
logging = { workspace = true }
|
||||
lru = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
pretty_reqwest_error = { workspace = true }
|
||||
|
||||
@@ -5,9 +5,8 @@ use crate::engine_api::{
|
||||
PayloadId,
|
||||
};
|
||||
use crate::{ClientVersionV1, HttpJsonRpc};
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use std::future::Future;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use task_executor::TaskExecutor;
|
||||
@@ -15,12 +14,11 @@ use tokio::sync::{Mutex, RwLock, watch};
|
||||
use tokio_stream::wrappers::WatchStream;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use types::ExecutionBlockHash;
|
||||
use types::new_non_zero_usize;
|
||||
|
||||
/// The number of payload IDs that will be stored for each `Engine`.
|
||||
///
|
||||
/// Since the size of each value is small (~800 bytes) a large number is used for safety.
|
||||
const PAYLOAD_ID_LRU_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(512);
|
||||
const PAYLOAD_ID_LRU_CACHE_SIZE: usize = 512;
|
||||
const CACHED_RESPONSE_AGE_LIMIT: Duration = Duration::from_secs(900); // 15 minutes
|
||||
|
||||
/// Stores the remembered state of a engine.
|
||||
@@ -175,7 +173,7 @@ impl Engine {
|
||||
if let Some(key) = payload_attributes
|
||||
.map(|pa| PayloadIdCacheKey::new(&forkchoice_state.head_block_hash, &pa))
|
||||
{
|
||||
self.payload_id_cache.lock().await.put(key, payload_id);
|
||||
self.payload_id_cache.lock().await.insert(key, payload_id);
|
||||
} else {
|
||||
debug!(?payload_id, "Engine returned unexpected payload_id");
|
||||
}
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
use eth2::types::FullPayloadContents;
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use parking_lot::Mutex;
|
||||
use std::num::NonZeroUsize;
|
||||
use tree_hash::TreeHash;
|
||||
use types::new_non_zero_usize;
|
||||
use types::{EthSpec, Hash256};
|
||||
|
||||
pub const DEFAULT_PAYLOAD_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(10);
|
||||
pub const DEFAULT_PAYLOAD_CACHE_SIZE: usize = 10;
|
||||
|
||||
/// A cache mapping execution payloads by tree hash roots.
|
||||
pub struct PayloadCache<E: EthSpec> {
|
||||
@@ -27,11 +25,11 @@ impl<E: EthSpec> Default for PayloadCache<E> {
|
||||
impl<E: EthSpec> PayloadCache<E> {
|
||||
pub fn put(&self, payload: FullPayloadContents<E>) -> Option<FullPayloadContents<E>> {
|
||||
let root = payload.payload_ref().tree_hash_root();
|
||||
self.payloads.lock().put(PayloadCacheId(root), payload)
|
||||
self.payloads.lock().insert(PayloadCacheId(root), payload)
|
||||
}
|
||||
|
||||
pub fn pop(&self, root: &Hash256) -> Option<FullPayloadContents<E>> {
|
||||
self.payloads.lock().pop(&PayloadCacheId(*root))
|
||||
self.payloads.lock().remove(&PayloadCacheId(*root))
|
||||
}
|
||||
|
||||
pub fn get(&self, hash: &Hash256) -> Option<FullPayloadContents<E>> {
|
||||
|
||||
@@ -20,12 +20,12 @@ ethereum_ssz = { workspace = true }
|
||||
execution_layer = { workspace = true }
|
||||
fixed_bytes = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
hashlink = { workspace = true }
|
||||
health_metrics = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
lighthouse_network = { workspace = true }
|
||||
lighthouse_version = { workspace = true }
|
||||
logging = { workspace = true }
|
||||
lru = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
network = { workspace = true }
|
||||
network_utils = { workspace = true }
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use parking_lot::Mutex;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use types::{AttestationShufflingId, CommitteeCache, Epoch};
|
||||
|
||||
@@ -25,7 +24,7 @@ pub struct HistoricalCommitteeCache {
|
||||
}
|
||||
|
||||
impl HistoricalCommitteeCache {
|
||||
pub fn new(size: NonZeroUsize) -> Self {
|
||||
pub fn new(size: usize) -> Self {
|
||||
Self {
|
||||
committees: Mutex::new(LruCache::new(size)),
|
||||
}
|
||||
@@ -38,6 +37,6 @@ impl HistoricalCommitteeCache {
|
||||
}
|
||||
|
||||
pub fn insert(&self, id: HistoricalShufflingId, cache: Arc<CommitteeCache>) {
|
||||
self.committees.lock().put(id, cache);
|
||||
self.committees.lock().insert(id, cache);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,10 +22,10 @@ use lighthouse_network::{
|
||||
};
|
||||
use network::{NetworkReceivers, NetworkSenders};
|
||||
use sensitive_url::SensitiveUrl;
|
||||
use std::future::Future;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::{future::Future, num::NonZeroUsize};
|
||||
use store::MemoryStore;
|
||||
use task_executor::test_utils::TestRuntime;
|
||||
use types::{ChainSpec, EthSpec};
|
||||
@@ -294,7 +294,7 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
|
||||
beacon_processor_send: Some(beacon_processor_send),
|
||||
sse_logging_components: None,
|
||||
historical_committee_cache: Arc::new(HistoricalCommitteeCache::new(
|
||||
NonZeroUsize::new(http_config.historical_committee_cache_size).unwrap(),
|
||||
http_config.historical_committee_cache_size,
|
||||
)),
|
||||
});
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ ethereum_ssz_derive = { workspace = true }
|
||||
fixed_bytes = { workspace = true }
|
||||
fnv = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
hashlink = { workspace = true }
|
||||
hex = { workspace = true }
|
||||
if-addrs = "0.14"
|
||||
itertools = { workspace = true }
|
||||
@@ -28,7 +29,6 @@ libp2p = { workspace = true }
|
||||
libp2p-mplex = { git = "https://github.com/libp2p/rust-libp2p.git" }
|
||||
lighthouse_version = { workspace = true }
|
||||
logging = { workspace = true }
|
||||
lru = { workspace = true }
|
||||
lru_cache = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
network_utils = { workspace = true }
|
||||
|
||||
@@ -18,6 +18,7 @@ use alloy_rlp::bytes::Bytes;
|
||||
use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY};
|
||||
use futures::prelude::*;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use libp2p::core::transport::PortUse;
|
||||
use libp2p::multiaddr::Protocol;
|
||||
use libp2p::swarm::THandlerInEvent;
|
||||
@@ -31,10 +32,8 @@ pub use libp2p::{
|
||||
},
|
||||
};
|
||||
use logging::crit;
|
||||
use lru::LruCache;
|
||||
use network_utils::discovery_metrics;
|
||||
use ssz::Encode;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque},
|
||||
net::{IpAddr, SocketAddr},
|
||||
@@ -51,7 +50,6 @@ use types::{ChainSpec, EnrForkId, EthSpec};
|
||||
mod subnet_predicate;
|
||||
use crate::discovery::enr::{NEXT_FORK_DIGEST_ENR_KEY, PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY};
|
||||
pub use subnet_predicate::subnet_predicate;
|
||||
use types::new_non_zero_usize;
|
||||
|
||||
/// Local ENR storage filename.
|
||||
pub const ENR_FILENAME: &str = "enr.dat";
|
||||
@@ -74,7 +72,7 @@ pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;
|
||||
/// The threshold for updating `min_ttl` on a connected peer.
|
||||
const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
|
||||
/// The capacity of the Discovery ENR cache.
|
||||
const ENR_CACHE_CAPACITY: NonZeroUsize = new_non_zero_usize(50);
|
||||
const ENR_CACHE_CAPACITY: usize = 50;
|
||||
|
||||
/// A query has completed. This result contains a mapping of discovered peer IDs to the `min_ttl`
|
||||
/// of the peer if it is specified.
|
||||
@@ -358,7 +356,7 @@ impl<E: EthSpec> Discovery<E> {
|
||||
|
||||
/// Removes a cached ENR from the list.
|
||||
pub fn remove_cached_enr(&mut self, peer_id: &PeerId) -> Option<Enr> {
|
||||
self.cached_enrs.pop(peer_id)
|
||||
self.cached_enrs.remove(peer_id)
|
||||
}
|
||||
|
||||
/// This adds a new `FindPeers` query to the queue if one doesn't already exist.
|
||||
@@ -394,7 +392,7 @@ impl<E: EthSpec> Discovery<E> {
|
||||
/// Add an ENR to the routing table of the discovery mechanism.
|
||||
pub fn add_enr(&mut self, enr: Enr) {
|
||||
// add the enr to seen caches
|
||||
self.cached_enrs.put(enr.peer_id(), enr.clone());
|
||||
self.cached_enrs.insert(enr.peer_id(), enr.clone());
|
||||
|
||||
if let Err(e) = self.discv5.add_enr(enr) {
|
||||
debug!(
|
||||
@@ -665,7 +663,7 @@ impl<E: EthSpec> Discovery<E> {
|
||||
}
|
||||
// Remove the peer from the cached list, to prevent redialing disconnected
|
||||
// peers.
|
||||
self.cached_enrs.pop(peer_id);
|
||||
self.cached_enrs.remove(peer_id);
|
||||
}
|
||||
|
||||
/* Internal Functions */
|
||||
@@ -875,7 +873,7 @@ impl<E: EthSpec> Discovery<E> {
|
||||
.into_iter()
|
||||
.map(|enr| {
|
||||
// cache the found ENR's
|
||||
self.cached_enrs.put(enr.peer_id(), enr.clone());
|
||||
self.cached_enrs.insert(enr.peer_id(), enr.clone());
|
||||
(enr, None)
|
||||
})
|
||||
.collect();
|
||||
@@ -910,7 +908,7 @@ impl<E: EthSpec> Discovery<E> {
|
||||
|
||||
// cache the found ENR's
|
||||
for enr in r.iter().cloned() {
|
||||
self.cached_enrs.put(enr.peer_id(), enr);
|
||||
self.cached_enrs.insert(enr.peer_id(), enr);
|
||||
}
|
||||
|
||||
// Map each subnet query's min_ttl to the set of ENR's returned for that subnet.
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use crate::types::HeaderSentSet;
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use parking_lot::Mutex;
|
||||
use std::collections::HashSet;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use types::core::Hash256;
|
||||
|
||||
const MAX_BLOCKS: NonZeroUsize = NonZeroUsize::new(4).unwrap();
|
||||
const MAX_BLOCKS: usize = 4;
|
||||
|
||||
pub struct PartialColumnHeaderTracker {
|
||||
blocks: LruCache<Hash256, HeaderSentSet>,
|
||||
@@ -22,7 +21,8 @@ impl PartialColumnHeaderTracker {
|
||||
pub fn get_for_block(&mut self, hash: Hash256) -> HeaderSentSet {
|
||||
Arc::clone(
|
||||
self.blocks
|
||||
.get_or_insert(hash, || Arc::new(Mutex::new(HashSet::new()))),
|
||||
.entry(hash)
|
||||
.or_insert_with(|| Arc::new(Mutex::new(HashSet::new()))),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,10 +16,10 @@ directory = { workspace = true }
|
||||
ethereum_ssz = { workspace = true }
|
||||
ethereum_ssz_derive = { workspace = true }
|
||||
fixed_bytes = { workspace = true }
|
||||
hashlink = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
leveldb = { version = "0.8.6", optional = true, default-features = false }
|
||||
logging = { workspace = true }
|
||||
lru = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
milhouse = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::hdiff::{Error, HDiffBuffer};
|
||||
use crate::metrics;
|
||||
use lru::LruCache;
|
||||
use std::num::NonZeroUsize;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use types::{BeaconState, ChainSpec, EthSpec, Slot};
|
||||
|
||||
/// Holds a combination of finalized states in two formats:
|
||||
@@ -25,7 +24,7 @@ pub struct Metrics {
|
||||
}
|
||||
|
||||
impl<E: EthSpec> HistoricStateCache<E> {
|
||||
pub fn new(hdiff_buffer_cache_size: NonZeroUsize, state_cache_size: NonZeroUsize) -> Self {
|
||||
pub fn new(hdiff_buffer_cache_size: usize, state_cache_size: usize) -> Self {
|
||||
Self {
|
||||
hdiff_buffers: LruCache::new(hdiff_buffer_cache_size),
|
||||
states: LruCache::new(state_cache_size),
|
||||
@@ -47,7 +46,7 @@ impl<E: EthSpec> HistoricStateCache<E> {
|
||||
);
|
||||
let cloned = buffer.clone();
|
||||
drop(_timer);
|
||||
self.hdiff_buffers.put(slot, cloned);
|
||||
self.hdiff_buffers.insert(slot, cloned);
|
||||
Some(buffer)
|
||||
} else {
|
||||
None
|
||||
@@ -63,7 +62,7 @@ impl<E: EthSpec> HistoricStateCache<E> {
|
||||
Ok(Some(state.clone()))
|
||||
} else if let Some(buffer) = self.hdiff_buffers.get(&slot) {
|
||||
let state = buffer.as_state(spec)?;
|
||||
self.states.put(slot, state.clone());
|
||||
self.states.insert(slot, state.clone());
|
||||
Ok(Some(state))
|
||||
} else {
|
||||
Ok(None)
|
||||
@@ -71,11 +70,11 @@ impl<E: EthSpec> HistoricStateCache<E> {
|
||||
}
|
||||
|
||||
pub fn put_state(&mut self, slot: Slot, state: BeaconState<E>) {
|
||||
self.states.put(slot, state);
|
||||
self.states.insert(slot, state);
|
||||
}
|
||||
|
||||
pub fn put_hdiff_buffer(&mut self, slot: Slot, buffer: HDiffBuffer) {
|
||||
self.hdiff_buffers.put(slot, buffer);
|
||||
self.hdiff_buffers.insert(slot, buffer);
|
||||
}
|
||||
|
||||
pub fn put_both(&mut self, slot: Slot, state: BeaconState<E>, buffer: HDiffBuffer) {
|
||||
|
||||
@@ -19,8 +19,8 @@ use crate::{
|
||||
parse_data_column_key,
|
||||
};
|
||||
use fixed_bytes::FixedBytesExtended;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use itertools::{Itertools, process_results};
|
||||
use lru::LruCache;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use safe_arith::SafeArith;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -34,7 +34,6 @@ use std::cmp::{Ordering, min};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::io::{Read, Write};
|
||||
use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -97,7 +96,7 @@ struct BlockCache<E: EthSpec> {
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlockCache<E> {
|
||||
pub fn new(size: NonZeroUsize) -> Self {
|
||||
pub fn new(size: usize) -> Self {
|
||||
Self {
|
||||
block_cache: LruCache::new(size),
|
||||
blob_cache: LruCache::new(size),
|
||||
@@ -106,14 +105,15 @@ impl<E: EthSpec> BlockCache<E> {
|
||||
}
|
||||
}
|
||||
pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
|
||||
self.block_cache.put(block_root, block);
|
||||
self.block_cache.insert(block_root, block);
|
||||
}
|
||||
pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList<E>) {
|
||||
self.blob_cache.put(block_root, blobs);
|
||||
self.blob_cache.insert(block_root, blobs);
|
||||
}
|
||||
pub fn put_data_column(&mut self, block_root: Hash256, data_column: Arc<DataColumnSidecar<E>>) {
|
||||
self.data_column_cache
|
||||
.get_or_insert_mut(block_root, Default::default)
|
||||
.entry(block_root)
|
||||
.or_insert_with(Default::default)
|
||||
.insert(*data_column.index(), data_column);
|
||||
}
|
||||
pub fn put_data_column_custody_info(
|
||||
@@ -143,13 +143,13 @@ impl<E: EthSpec> BlockCache<E> {
|
||||
self.data_column_custody_info_cache.clone()
|
||||
}
|
||||
pub fn delete_block(&mut self, block_root: &Hash256) {
|
||||
let _ = self.block_cache.pop(block_root);
|
||||
let _ = self.block_cache.remove(block_root);
|
||||
}
|
||||
pub fn delete_blobs(&mut self, block_root: &Hash256) {
|
||||
let _ = self.blob_cache.pop(block_root);
|
||||
let _ = self.blob_cache.remove(block_root);
|
||||
}
|
||||
pub fn delete_data_columns(&mut self, block_root: &Hash256) {
|
||||
let _ = self.data_column_cache.pop(block_root);
|
||||
let _ = self.data_column_cache.remove(block_root);
|
||||
}
|
||||
pub fn delete(&mut self, block_root: &Hash256) {
|
||||
self.delete_block(block_root);
|
||||
@@ -236,17 +236,16 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore, MemoryStore> {
|
||||
cold_db: MemoryStore::open(),
|
||||
blobs_db: MemoryStore::open(),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: NonZeroUsize::new(config.block_cache_size)
|
||||
.map(BlockCache::new)
|
||||
.map(Mutex::new),
|
||||
block_cache: (config.block_cache_size > 0)
|
||||
.then(|| Mutex::new(BlockCache::new(config.block_cache_size))),
|
||||
state_cache: Mutex::new(StateCache::new(
|
||||
config.state_cache_size,
|
||||
config.state_cache_headroom,
|
||||
config.hot_hdiff_buffer_cache_size,
|
||||
)),
|
||||
historic_state_cache: Mutex::new(HistoricStateCache::new(
|
||||
config.cold_hdiff_buffer_cache_size,
|
||||
config.historic_state_cache_size,
|
||||
config.cold_hdiff_buffer_cache_size.get(),
|
||||
config.historic_state_cache_size.get(),
|
||||
)),
|
||||
config,
|
||||
hierarchy,
|
||||
@@ -290,17 +289,16 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend, BeaconNodeBackend> {
|
||||
blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?,
|
||||
cold_db: BeaconNodeBackend::open(&config, cold_path)?,
|
||||
hot_db,
|
||||
block_cache: NonZeroUsize::new(config.block_cache_size)
|
||||
.map(BlockCache::new)
|
||||
.map(Mutex::new),
|
||||
block_cache: (config.block_cache_size > 0)
|
||||
.then(|| Mutex::new(BlockCache::new(config.block_cache_size))),
|
||||
state_cache: Mutex::new(StateCache::new(
|
||||
config.state_cache_size,
|
||||
config.state_cache_headroom,
|
||||
config.hot_hdiff_buffer_cache_size,
|
||||
)),
|
||||
historic_state_cache: Mutex::new(HistoricStateCache::new(
|
||||
config.cold_hdiff_buffer_cache_size,
|
||||
config.historic_state_cache_size,
|
||||
config.cold_hdiff_buffer_cache_size.get(),
|
||||
config.historic_state_cache_size.get(),
|
||||
)),
|
||||
config,
|
||||
hierarchy,
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::{
|
||||
Error,
|
||||
metrics::{self, HOT_METRIC},
|
||||
};
|
||||
use lru::LruCache;
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::num::NonZeroUsize;
|
||||
use tracing::instrument;
|
||||
@@ -86,9 +86,9 @@ impl<E: EthSpec> StateCache<E> {
|
||||
) -> Self {
|
||||
StateCache {
|
||||
finalized_state: None,
|
||||
states: LruCache::new(state_capacity),
|
||||
states: LruCache::new(state_capacity.get()),
|
||||
block_map: BlockMap::default(),
|
||||
hdiff_buffers: HotHDiffBufferCache::new(hdiff_capacity),
|
||||
hdiff_buffers: HotHDiffBufferCache::new(hdiff_capacity.get()),
|
||||
max_epoch: Epoch::new(0),
|
||||
head_block_root: Hash256::ZERO,
|
||||
headroom,
|
||||
@@ -100,7 +100,7 @@ impl<E: EthSpec> StateCache<E> {
|
||||
}
|
||||
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.states.cap().get()
|
||||
self.states.capacity()
|
||||
}
|
||||
|
||||
pub fn num_hdiff_buffers(&self) -> usize {
|
||||
@@ -154,7 +154,7 @@ impl<E: EthSpec> StateCache<E> {
|
||||
// preferences older slots.
|
||||
// NOTE: This isn't perfect as it prunes by slot: there could be multiple buffers
|
||||
// at some slots in the case of long forks without finality.
|
||||
let new_hdiff_cache = HotHDiffBufferCache::new(self.hdiff_buffers.cap());
|
||||
let new_hdiff_cache = HotHDiffBufferCache::new(self.hdiff_buffers.capacity());
|
||||
let old_hdiff_cache = std::mem::replace(&mut self.hdiff_buffers, new_hdiff_cache);
|
||||
for (state_root, (slot, buffer)) in old_hdiff_cache.hdiff_buffers {
|
||||
if pre_finalized_slots_to_retain.contains(&slot) {
|
||||
@@ -164,7 +164,7 @@ impl<E: EthSpec> StateCache<E> {
|
||||
|
||||
// Delete states.
|
||||
for state_root in state_roots_to_prune {
|
||||
if let Some((_, state)) = self.states.pop(&state_root) {
|
||||
if let Some((_, state)) = self.states.remove(&state_root) {
|
||||
// Add the hdiff buffer for this state to the hdiff cache if it is now part of
|
||||
// the pre-finalized grid. The `put` method will take care of keeping the most
|
||||
// useful buffers.
|
||||
@@ -260,7 +260,7 @@ impl<E: EthSpec> StateCache<E> {
|
||||
|
||||
// Insert the full state into the cache.
|
||||
if let Some((deleted_state_root, _)) =
|
||||
self.states.put(state_root, (state_root, state.clone()))
|
||||
self.states.insert(state_root, (state_root, state.clone()))
|
||||
{
|
||||
deleted_states.push(deleted_state_root);
|
||||
}
|
||||
@@ -334,14 +334,14 @@ impl<E: EthSpec> StateCache<E> {
|
||||
}
|
||||
|
||||
pub fn delete_state(&mut self, state_root: &Hash256) {
|
||||
self.states.pop(state_root);
|
||||
self.states.remove(state_root);
|
||||
self.block_map.delete(state_root);
|
||||
}
|
||||
|
||||
pub fn delete_block_states(&mut self, block_root: &Hash256) {
|
||||
if let Some(slot_map) = self.block_map.delete_block_states(block_root) {
|
||||
for state_root in slot_map.slots.values() {
|
||||
self.states.pop(state_root);
|
||||
self.states.remove(state_root);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -366,9 +366,10 @@ impl<E: EthSpec> StateCache<E> {
|
||||
let mut old_boundary_state_roots = vec![];
|
||||
let mut good_boundary_state_roots = vec![];
|
||||
|
||||
// Skip the `cull_exempt` most-recently used, then reverse the iterator to start at
|
||||
// least-recently used states.
|
||||
for (&state_root, (_, state)) in self.states.iter().skip(cull_exempt).rev() {
|
||||
// Start at the least-recently used states, excluding the `cull_exempt` most-recently
|
||||
// used (which are the final entries of the iterator).
|
||||
let num_cull_candidates = self.states.len().saturating_sub(cull_exempt);
|
||||
for (&state_root, (_, state)) in self.states.iter().take(num_cull_candidates) {
|
||||
let is_advanced = state.slot() > state.latest_block_header().slot;
|
||||
let is_boundary = state.slot() % E::slots_per_epoch() == 0;
|
||||
let could_finalize =
|
||||
@@ -450,7 +451,7 @@ impl BlockMap {
|
||||
}
|
||||
|
||||
impl HotHDiffBufferCache {
|
||||
pub fn new(capacity: NonZeroUsize) -> Self {
|
||||
pub fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
hdiff_buffers: LruCache::new(capacity),
|
||||
}
|
||||
@@ -467,8 +468,8 @@ impl HotHDiffBufferCache {
|
||||
/// If the value was inserted then `true` is returned.
|
||||
pub fn put(&mut self, state_root: Hash256, slot: Slot, buffer: HDiffBuffer) -> bool {
|
||||
// If the cache is not full, simply insert the value.
|
||||
if self.hdiff_buffers.len() != self.hdiff_buffers.cap().get() {
|
||||
self.hdiff_buffers.put(state_root, (slot, buffer));
|
||||
if self.hdiff_buffers.len() != self.hdiff_buffers.capacity() {
|
||||
self.hdiff_buffers.insert(state_root, (slot, buffer));
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -484,23 +485,23 @@ impl HotHDiffBufferCache {
|
||||
return false;
|
||||
};
|
||||
|
||||
if self.hdiff_buffers.cap().get() > 1 || slot < min_slot {
|
||||
if self.hdiff_buffers.capacity() > 1 || slot < min_slot {
|
||||
// Remove LRU value. Cache is now at size `cap - 1`.
|
||||
let Some((removed_state_root, (removed_slot, removed_buffer))) =
|
||||
self.hdiff_buffers.pop_lru()
|
||||
self.hdiff_buffers.remove_lru()
|
||||
else {
|
||||
// Unreachable: cache is full so should have at least one entry to pop.
|
||||
return false;
|
||||
};
|
||||
|
||||
// Insert new value. Cache size is now at size `cap`.
|
||||
self.hdiff_buffers.put(state_root, (slot, buffer));
|
||||
self.hdiff_buffers.insert(state_root, (slot, buffer));
|
||||
|
||||
// If the removed value had the min slot and we didn't intend to replace it (cap=1)
|
||||
// then we reinsert it.
|
||||
if removed_slot == min_slot && slot >= min_slot {
|
||||
self.hdiff_buffers
|
||||
.put(removed_state_root, (removed_slot, removed_buffer));
|
||||
.insert(removed_state_root, (removed_slot, removed_buffer));
|
||||
}
|
||||
true
|
||||
} else {
|
||||
@@ -509,8 +510,8 @@ impl HotHDiffBufferCache {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cap(&self) -> NonZeroUsize {
|
||||
self.hdiff_buffers.cap()
|
||||
pub fn capacity(&self) -> usize {
|
||||
self.hdiff_buffers.capacity()
|
||||
}
|
||||
|
||||
#[allow(clippy::len_without_is_empty)]
|
||||
|
||||
@@ -21,6 +21,7 @@ deny = [
|
||||
{ crate = "scrypt", deny-multiple-versions = true, reason = "takes a long time to compile" },
|
||||
{ crate = "syn", deny-multiple-versions = true, reason = "takes a long time to compile" },
|
||||
{ crate = "uuid", deny-multiple-versions = true, reason = "dependency hygiene" },
|
||||
{ crate = "lru", deny-multiple-versions = true, reason = "use hashlink instead" },
|
||||
]
|
||||
|
||||
[sources]
|
||||
|
||||
@@ -22,9 +22,9 @@ ethereum_ssz_derive = { workspace = true }
|
||||
filesystem = { workspace = true }
|
||||
fixed_bytes = { workspace = true }
|
||||
flate2 = { version = "1.0.14", features = ["zlib"], default-features = false }
|
||||
hashlink = { workspace = true }
|
||||
lmdb-rkv = { git = "https://github.com/sigp/lmdb-rs", rev = "f33845c6469b94265319aac0ed5085597862c27e", optional = true }
|
||||
lmdb-rkv-sys = { git = "https://github.com/sigp/lmdb-rs", rev = "f33845c6469b94265319aac0ed5085597862c27e", optional = true }
|
||||
lru = { workspace = true }
|
||||
|
||||
# MDBX is pinned at the last version with Windows and macOS support.
|
||||
mdbx = { package = "libmdbx", git = "https://github.com/sigp/libmdbx-rs", rev = "e6ff4b9377c1619bcf0bfdf52bee5a980a432a1a", optional = true }
|
||||
|
||||
@@ -9,8 +9,8 @@ use crate::{
|
||||
};
|
||||
use bls::AggregateSignature;
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use hashlink::lru_cache::LruCache;
|
||||
use interface::{Environment, OpenDatabases, RwTransaction};
|
||||
use lru::LruCache;
|
||||
use parking_lot::Mutex;
|
||||
use serde::de::DeserializeOwned;
|
||||
use ssz::{Decode, Encode};
|
||||
@@ -305,7 +305,8 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
}
|
||||
}
|
||||
|
||||
let attestation_root_cache = Mutex::new(LruCache::new(config.attestation_root_cache_size));
|
||||
let attestation_root_cache =
|
||||
Mutex::new(LruCache::new(config.attestation_root_cache_size.get()));
|
||||
|
||||
let mut db = Self {
|
||||
env,
|
||||
@@ -559,7 +560,7 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
let indexed_attestation = self.get_indexed_attestation(txn, indexed_id)?;
|
||||
let attestation_data_root = indexed_attestation.data().tree_hash_root();
|
||||
|
||||
cache.put(indexed_id, attestation_data_root);
|
||||
cache.insert(indexed_id, attestation_data_root);
|
||||
|
||||
Ok((attestation_data_root, Some(indexed_attestation)))
|
||||
}
|
||||
@@ -570,13 +571,13 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
attestation_data_root: Hash256,
|
||||
) {
|
||||
let mut cache = self.attestation_root_cache.lock();
|
||||
cache.put(indexed_attestation_id, attestation_data_root);
|
||||
cache.insert(indexed_attestation_id, attestation_data_root);
|
||||
}
|
||||
|
||||
fn delete_attestation_data_roots(&self, ids: impl IntoIterator<Item = IndexedAttestationId>) {
|
||||
let mut cache = self.attestation_root_cache.lock();
|
||||
for indexed_id in ids {
|
||||
cache.pop(&indexed_id);
|
||||
cache.remove(&indexed_id);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user