mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-04 13:24:39 +00:00
Add cache for parallel HTTP requests (#4879)
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -569,9 +569,9 @@ dependencies = [
|
||||
"lru",
|
||||
"maplit",
|
||||
"merkle_proof",
|
||||
"oneshot_broadcast",
|
||||
"operation_pool",
|
||||
"parking_lot 0.12.1",
|
||||
"promise_cache",
|
||||
"proto_array",
|
||||
"rand 0.8.5",
|
||||
"rayon",
|
||||
@@ -6160,6 +6160,16 @@ dependencies = [
|
||||
"syn 2.0.48",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "promise_cache"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"derivative",
|
||||
"itertools",
|
||||
"oneshot_broadcast",
|
||||
"slog",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proto_array"
|
||||
version = "0.2.0"
|
||||
|
||||
@@ -37,6 +37,7 @@ members = [
|
||||
"common/malloc_utils",
|
||||
"common/oneshot_broadcast",
|
||||
"common/pretty_reqwest_error",
|
||||
"common/promise_cache",
|
||||
"common/sensitive_url",
|
||||
"common/slot_clock",
|
||||
"common/system_health",
|
||||
|
||||
@@ -43,9 +43,9 @@ lighthouse_metrics = { workspace = true }
|
||||
logging = { workspace = true }
|
||||
lru = { workspace = true }
|
||||
merkle_proof = { workspace = true }
|
||||
oneshot_broadcast = { path = "../../common/oneshot_broadcast/" }
|
||||
operation_pool = { workspace = true }
|
||||
parking_lot = { workspace = true }
|
||||
promise_cache = { path = "../../common/promise_cache" }
|
||||
proto_array = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
rayon = { workspace = true }
|
||||
|
||||
@@ -53,6 +53,7 @@ use crate::observed_blob_sidecars::ObservedBlobSidecars;
|
||||
use crate::observed_block_producers::ObservedBlockProducers;
|
||||
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
|
||||
use crate::observed_slashable::ObservedSlashable;
|
||||
use crate::parallel_state_cache::ParallelStateCache;
|
||||
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
|
||||
use crate::persisted_fork_choice::PersistedForkChoice;
|
||||
use crate::pre_finalization_cache::PreFinalizationBlockCache;
|
||||
@@ -460,6 +461,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
|
||||
/// A cache used to track pre-finalization block roots for quick rejection.
|
||||
pub pre_finalization_block_cache: PreFinalizationBlockCache,
|
||||
/// A cache used to de-duplicate HTTP state requests.
|
||||
///
|
||||
/// The cache is keyed by `state_root`.
|
||||
pub parallel_state_cache: Arc<RwLock<ParallelStateCache<T::EthSpec>>>,
|
||||
/// Sender given to tasks, so that if they encounter a state in which execution cannot
|
||||
/// continue they can request that everything shuts down.
|
||||
pub shutdown_sender: Sender<ShutdownReason>,
|
||||
@@ -3868,7 +3873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self.shuffling_cache
|
||||
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
|
||||
.ok_or(Error::AttestationCacheLockTimeout)?
|
||||
.insert_committee_cache(shuffling_id, committee_cache);
|
||||
.insert_value(shuffling_id, committee_cache);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -6041,7 +6046,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// access.
|
||||
drop(shuffling_cache);
|
||||
|
||||
let committee_cache = cache_item.wait()?;
|
||||
let committee_cache = cache_item.wait().map_err(Error::ShufflingCacheError)?;
|
||||
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
|
||||
} else {
|
||||
// Create an entry in the cache that "promises" this value will eventually be computed.
|
||||
@@ -6050,7 +6055,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
//
|
||||
// Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
|
||||
// promise from being created twice.
|
||||
let sender = shuffling_cache.create_promise(shuffling_id.clone())?;
|
||||
let sender = shuffling_cache
|
||||
.create_promise(shuffling_id.clone())
|
||||
.map_err(Error::ShufflingCacheError)?;
|
||||
|
||||
// Drop the shuffling cache to avoid holding the lock for any longer than
|
||||
// required.
|
||||
@@ -6144,7 +6151,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self.shuffling_cache
|
||||
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
|
||||
.ok_or(Error::AttestationCacheLockTimeout)?
|
||||
.insert_committee_cache(shuffling_id, &committee_cache);
|
||||
.insert_value(shuffling_id, &committee_cache);
|
||||
|
||||
metrics::stop_timer(committee_building_timer);
|
||||
|
||||
@@ -6446,6 +6453,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
self.data_availability_checker.data_availability_boundary()
|
||||
}
|
||||
|
||||
pub fn logger(&self) -> &Logger {
|
||||
&self.log
|
||||
}
|
||||
|
||||
/// Gets the `LightClientBootstrap` object for a requested block root.
|
||||
///
|
||||
/// Returns `None` when the state or block is not found in the database.
|
||||
|
||||
@@ -23,6 +23,7 @@ use futures::channel::mpsc::Sender;
|
||||
use kzg::{Kzg, TrustedSetup};
|
||||
use operation_pool::{OperationPool, PersistedOperationPool};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use promise_cache::PromiseCache;
|
||||
use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
|
||||
use slasher::Slasher;
|
||||
use slog::{crit, debug, error, info, o, Logger};
|
||||
@@ -851,6 +852,7 @@ where
|
||||
let genesis_time = head_snapshot.beacon_state.genesis_time();
|
||||
let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot));
|
||||
let shuffling_cache_size = self.chain_config.shuffling_cache_size;
|
||||
let parallel_state_cache_size = self.chain_config.parallel_state_cache_size;
|
||||
|
||||
// Calculate the weak subjectivity point in which to backfill blocks to.
|
||||
let genesis_backfill_slot = if self.chain_config.genesis_backfill {
|
||||
@@ -933,6 +935,11 @@ where
|
||||
beacon_proposer_cache,
|
||||
block_times_cache: <_>::default(),
|
||||
pre_finalization_block_cache: <_>::default(),
|
||||
parallel_state_cache: Arc::new(RwLock::new(PromiseCache::new(
|
||||
parallel_state_cache_size,
|
||||
Default::default(),
|
||||
log.clone(),
|
||||
))),
|
||||
validator_pubkey_cache,
|
||||
attester_cache: <_>::default(),
|
||||
early_attester_cache: <_>::default(),
|
||||
|
||||
@@ -820,9 +820,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
Ok(head_shuffling_ids) => {
|
||||
self.shuffling_cache
|
||||
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
|
||||
.map(|mut shuffling_cache| {
|
||||
shuffling_cache.update_head_shuffling_ids(head_shuffling_ids)
|
||||
})
|
||||
.map(|mut shuffling_cache| shuffling_cache.update_protector(head_shuffling_ids))
|
||||
.unwrap_or_else(|| {
|
||||
error!(
|
||||
self.log,
|
||||
|
||||
@@ -15,6 +15,9 @@ pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3;
|
||||
/// Fraction of a slot lookahead for fork choice in the state advance timer (500ms on mainnet).
|
||||
pub const FORK_CHOICE_LOOKAHEAD_FACTOR: u32 = 24;
|
||||
|
||||
/// Cache only a small number of states in the parallel cache by default.
|
||||
pub const DEFAULT_PARALLEL_STATE_CACHE_SIZE: usize = 2;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)]
|
||||
pub struct ChainConfig {
|
||||
/// Maximum number of slots to skip when importing an attestation.
|
||||
@@ -83,6 +86,8 @@ pub struct ChainConfig {
|
||||
pub progressive_balances_mode: ProgressiveBalancesMode,
|
||||
/// Number of epochs between each migration of data from the hot database to the freezer.
|
||||
pub epochs_per_migration: u64,
|
||||
/// Size of the promise cache for de-duplicating parallel state requests.
|
||||
pub parallel_state_cache_size: usize,
|
||||
}
|
||||
|
||||
impl Default for ChainConfig {
|
||||
@@ -114,6 +119,7 @@ impl Default for ChainConfig {
|
||||
always_prepare_payload: false,
|
||||
progressive_balances_mode: ProgressiveBalancesMode::Fast,
|
||||
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
|
||||
parallel_state_cache_size: DEFAULT_PARALLEL_STATE_CACHE_SIZE,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -211,8 +211,7 @@ pub enum BeaconChainError {
|
||||
},
|
||||
AttestationHeadNotInForkChoice(Hash256),
|
||||
MissingPersistedForkChoice,
|
||||
CommitteePromiseFailed(oneshot_broadcast::Error),
|
||||
MaxCommitteePromises(usize),
|
||||
ShufflingCacheError(promise_cache::PromiseCacheError),
|
||||
BlsToExecutionPriorToCapella,
|
||||
BlsToExecutionConflictsWithPool,
|
||||
InconsistentFork(InconsistentFork),
|
||||
|
||||
@@ -42,6 +42,7 @@ pub mod observed_block_producers;
|
||||
pub mod observed_operations;
|
||||
mod observed_slashable;
|
||||
pub mod otb_verification_service;
|
||||
mod parallel_state_cache;
|
||||
mod persisted_beacon_chain;
|
||||
mod persisted_fork_choice;
|
||||
mod pre_finalization_cache;
|
||||
|
||||
22
beacon_node/beacon_chain/src/parallel_state_cache.rs
Normal file
22
beacon_node/beacon_chain/src/parallel_state_cache.rs
Normal file
@@ -0,0 +1,22 @@
|
||||
use promise_cache::{PromiseCache, Protect};
|
||||
use types::{BeaconState, Hash256};
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ParallelStateProtector;
|
||||
|
||||
impl Protect<Hash256> for ParallelStateProtector {
|
||||
type SortKey = usize;
|
||||
|
||||
/// Evict in arbitrary (hashmap) order by using the same key for every value.
|
||||
fn sort_key(&self, _: &Hash256) -> Self::SortKey {
|
||||
0
|
||||
}
|
||||
|
||||
/// We don't care too much about preventing evictions of particular states here. All the states
|
||||
/// in this cache should be different from the head state.
|
||||
fn protect_from_eviction(&self, _: &Hash256) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub type ParallelStateCache<E> = PromiseCache<Hash256, BeaconState<E>, ParallelStateProtector>;
|
||||
@@ -1,240 +1,53 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use itertools::Itertools;
|
||||
use promise_cache::{PromiseCache, Protect};
|
||||
use slog::{debug, Logger};
|
||||
|
||||
use oneshot_broadcast::{oneshot, Receiver, Sender};
|
||||
use types::{
|
||||
beacon_state::CommitteeCache, AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256,
|
||||
RelativeEpoch,
|
||||
};
|
||||
|
||||
use crate::{metrics, BeaconChainError};
|
||||
|
||||
/// The size of the cache that stores committee caches for quicker verification.
|
||||
///
|
||||
/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash +
|
||||
/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this
|
||||
/// ignores a few extra bytes in the caches that should be insignificant compared to the indices).
|
||||
pub const DEFAULT_CACHE_SIZE: usize = 16;
|
||||
|
||||
/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this
|
||||
/// limits the number of concurrent states that can be loaded into memory for the committee cache.
|
||||
/// This prevents excessive memory usage at the cost of rejecting some attestations.
|
||||
///
|
||||
/// The cache size also determines the maximum number of concurrent committee cache "promises" that
|
||||
/// can be issued. In effect, this limits the number of concurrent states that can be loaded into
|
||||
/// memory for the committee cache. This prevents excessive memory usage at the cost of rejecting
|
||||
/// some attestations.
|
||||
///
|
||||
/// We set this value to 2 since states can be quite large and have a significant impact on memory
|
||||
/// usage. A healthy network cannot have more than a few committee caches and those caches should
|
||||
/// always be inserted during block import. Unstable networks with a high degree of forking might
|
||||
/// see some attestations dropped due to this concurrency limit, however I propose that this is
|
||||
/// better than low-resource nodes going OOM.
|
||||
const MAX_CONCURRENT_PROMISES: usize = 2;
|
||||
pub const DEFAULT_CACHE_SIZE: usize = 16;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum CacheItem {
|
||||
/// A committee.
|
||||
Committee(Arc<CommitteeCache>),
|
||||
/// A promise for a future committee.
|
||||
Promise(Receiver<Arc<CommitteeCache>>),
|
||||
impl Protect<AttestationShufflingId> for BlockShufflingIds {
|
||||
type SortKey = Epoch;
|
||||
|
||||
fn sort_key(&self, k: &AttestationShufflingId) -> Epoch {
|
||||
k.shuffling_epoch
|
||||
}
|
||||
|
||||
impl CacheItem {
|
||||
pub fn is_promise(&self) -> bool {
|
||||
matches!(self, CacheItem::Promise(_))
|
||||
fn protect_from_eviction(&self, shuffling_id: &AttestationShufflingId) -> bool {
|
||||
Some(shuffling_id) == self.id_for_epoch(shuffling_id.shuffling_epoch).as_ref()
|
||||
}
|
||||
|
||||
pub fn wait(self) -> Result<Arc<CommitteeCache>, BeaconChainError> {
|
||||
match self {
|
||||
CacheItem::Committee(cache) => Ok(cache),
|
||||
CacheItem::Promise(receiver) => receiver
|
||||
.recv()
|
||||
.map_err(BeaconChainError::CommitteePromiseFailed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Provides a cache for `CommitteeCache`.
|
||||
///
|
||||
/// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like
|
||||
/// a find/replace error.
|
||||
pub struct ShufflingCache {
|
||||
cache: HashMap<AttestationShufflingId, CacheItem>,
|
||||
cache_size: usize,
|
||||
head_shuffling_ids: BlockShufflingIds,
|
||||
logger: Logger,
|
||||
}
|
||||
|
||||
impl ShufflingCache {
|
||||
pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds, logger: Logger) -> Self {
|
||||
Self {
|
||||
cache: HashMap::new(),
|
||||
cache_size,
|
||||
head_shuffling_ids,
|
||||
logger,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&mut self, key: &AttestationShufflingId) -> Option<CacheItem> {
|
||||
match self.cache.get(key) {
|
||||
// The cache contained the committee cache, return it.
|
||||
item @ Some(CacheItem::Committee(_)) => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
|
||||
item.cloned()
|
||||
}
|
||||
// The cache contains a promise for the committee cache. Check to see if the promise has
|
||||
// already been resolved, without waiting for it.
|
||||
item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() {
|
||||
// The promise has already been resolved. Replace the entry in the cache with a
|
||||
// `Committee` entry and then return the committee.
|
||||
Ok(Some(committee)) => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
|
||||
let ready = CacheItem::Committee(committee);
|
||||
self.insert_cache_item(key.clone(), ready.clone());
|
||||
Some(ready)
|
||||
}
|
||||
// The promise has not yet been resolved. Return the promise so the caller can await
|
||||
// it.
|
||||
Ok(None) => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS);
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS);
|
||||
item.cloned()
|
||||
}
|
||||
// The sender has been dropped without sending a committee. There was most likely an
|
||||
// error computing the committee cache. Drop the key from the cache and return
|
||||
// `None` so the caller can recompute the committee.
|
||||
//
|
||||
// It's worth noting that this is the only place where we removed unresolved
|
||||
// promises from the cache. This means unresolved promises will only be removed if
|
||||
// we try to access them again. This is OK, since the promises don't consume much
|
||||
// memory. We expect that *all* promises should be resolved, unless there is a
|
||||
// programming or database error.
|
||||
Err(oneshot_broadcast::Error::SenderDropped) => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS);
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
|
||||
self.cache.remove(key);
|
||||
None
|
||||
}
|
||||
},
|
||||
// The cache does not have this committee and it's not already promised to be computed.
|
||||
None => {
|
||||
metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains(&self, key: &AttestationShufflingId) -> bool {
|
||||
self.cache.contains_key(key)
|
||||
}
|
||||
|
||||
pub fn insert_committee_cache<C: ToArcCommitteeCache>(
|
||||
&mut self,
|
||||
key: AttestationShufflingId,
|
||||
committee_cache: &C,
|
||||
) {
|
||||
if self
|
||||
.cache
|
||||
.get(&key)
|
||||
// Replace the committee if it's not present or if it's a promise. A bird in the hand is
|
||||
// worth two in the promise-bush!
|
||||
.map_or(true, CacheItem::is_promise)
|
||||
{
|
||||
self.insert_cache_item(
|
||||
key,
|
||||
CacheItem::Committee(committee_cache.to_arc_committee_cache()),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Prunes the cache first before inserting a new cache item.
|
||||
fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) {
|
||||
self.prune_cache();
|
||||
self.cache.insert(key, cache_item);
|
||||
}
|
||||
|
||||
/// Prunes the `cache` to keep the size below the `cache_size` limit, based on the following
|
||||
/// preferences:
|
||||
/// - Entries from more recent epochs are preferred over older ones.
|
||||
/// - Entries with shuffling ids matching the head's previous, current, and future epochs must
|
||||
/// not be pruned.
|
||||
fn prune_cache(&mut self) {
|
||||
let target_cache_size = self.cache_size.saturating_sub(1);
|
||||
if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) {
|
||||
let shuffling_ids_to_prune = self
|
||||
.cache
|
||||
.keys()
|
||||
.sorted_by_key(|key| key.shuffling_epoch)
|
||||
.filter(|shuffling_id| {
|
||||
Some(shuffling_id)
|
||||
!= self
|
||||
.head_shuffling_ids
|
||||
.id_for_epoch(shuffling_id.shuffling_epoch)
|
||||
.as_ref()
|
||||
.as_ref()
|
||||
})
|
||||
.take(prune_count)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for shuffling_id in shuffling_ids_to_prune.iter() {
|
||||
fn notify_eviction(&self, shuffling_id: &AttestationShufflingId, logger: &Logger) {
|
||||
debug!(
|
||||
self.logger,
|
||||
logger,
|
||||
"Removing old shuffling from cache";
|
||||
"shuffling_epoch" => shuffling_id.shuffling_epoch,
|
||||
"shuffling_decision_block" => ?shuffling_id.shuffling_decision_block
|
||||
);
|
||||
self.cache.remove(shuffling_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_promise(
|
||||
&mut self,
|
||||
key: AttestationShufflingId,
|
||||
) -> Result<Sender<Arc<CommitteeCache>>, BeaconChainError> {
|
||||
let num_active_promises = self
|
||||
.cache
|
||||
.iter()
|
||||
.filter(|(_, item)| item.is_promise())
|
||||
.count();
|
||||
if num_active_promises >= MAX_CONCURRENT_PROMISES {
|
||||
return Err(BeaconChainError::MaxCommitteePromises(num_active_promises));
|
||||
}
|
||||
|
||||
let (sender, receiver) = oneshot();
|
||||
self.insert_cache_item(key, CacheItem::Promise(receiver));
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
/// Inform the cache that the shuffling decision roots for the head has changed.
|
||||
///
|
||||
/// The shufflings for the head's previous, current, and future epochs will never be ejected from
|
||||
/// the cache during `Self::insert_cache_item`.
|
||||
pub fn update_head_shuffling_ids(&mut self, head_shuffling_ids: BlockShufflingIds) {
|
||||
self.head_shuffling_ids = head_shuffling_ids;
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache.
|
||||
pub trait ToArcCommitteeCache {
|
||||
fn to_arc_committee_cache(&self) -> Arc<CommitteeCache>;
|
||||
}
|
||||
|
||||
impl ToArcCommitteeCache for CommitteeCache {
|
||||
fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> {
|
||||
Arc::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl ToArcCommitteeCache for Arc<CommitteeCache> {
|
||||
fn to_arc_committee_cache(&self) -> Arc<CommitteeCache> {
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
pub type ShufflingCache = PromiseCache<AttestationShufflingId, CommitteeCache, BlockShufflingIds>;
|
||||
|
||||
/// Contains the shuffling IDs for a beacon block.
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BlockShufflingIds {
|
||||
pub current: AttestationShufflingId,
|
||||
pub next: AttestationShufflingId,
|
||||
@@ -294,13 +107,13 @@ impl BlockShufflingIds {
|
||||
#[cfg(not(debug_assertions))]
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::test_utils::EphemeralHarnessType;
|
||||
use promise_cache::{CacheItem, PromiseCacheError};
|
||||
use std::sync::Arc;
|
||||
use task_executor::test_utils::null_logger;
|
||||
use types::*;
|
||||
|
||||
use crate::test_utils::EphemeralHarnessType;
|
||||
|
||||
use super::*;
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
type TestBeaconChainType = EphemeralHarnessType<E>;
|
||||
type BeaconChainHarness = crate::test_utils::BeaconChainHarness<TestBeaconChainType>;
|
||||
@@ -372,10 +185,10 @@ mod test {
|
||||
// Ensure the promise has been resolved.
|
||||
let item = cache.get(&id_a).unwrap();
|
||||
assert!(
|
||||
matches!(item, CacheItem::Committee(committee) if committee == committee_a),
|
||||
matches!(item, CacheItem::Complete(committee) if committee == committee_a),
|
||||
"the promise should be resolved"
|
||||
);
|
||||
assert_eq!(cache.cache.len(), 1, "the cache should have one entry");
|
||||
assert_eq!(cache.len(), 1, "the cache should have one entry");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -399,7 +212,7 @@ mod test {
|
||||
|
||||
// Ensure the key now indicates an empty slot.
|
||||
assert!(cache.get(&id_a).is_none(), "the slot should be empty");
|
||||
assert!(cache.cache.is_empty(), "the cache should be empty");
|
||||
assert!(cache.is_empty(), "the cache should be empty");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -433,7 +246,7 @@ mod test {
|
||||
// Ensure promise A has been resolved.
|
||||
let item = cache.get(&id_a).unwrap();
|
||||
assert!(
|
||||
matches!(item, CacheItem::Committee(committee) if committee == committee_a),
|
||||
matches!(item, CacheItem::Complete(committee) if committee == committee_a),
|
||||
"promise A should be resolved"
|
||||
);
|
||||
|
||||
@@ -442,41 +255,40 @@ mod test {
|
||||
// Ensure promise B has been resolved.
|
||||
let item = cache.get(&id_b).unwrap();
|
||||
assert!(
|
||||
matches!(item, CacheItem::Committee(committee) if committee == committee_b),
|
||||
matches!(item, CacheItem::Complete(committee) if committee == committee_b),
|
||||
"promise B should be resolved"
|
||||
);
|
||||
|
||||
// Check both entries again.
|
||||
assert!(
|
||||
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a),
|
||||
matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee) if committee == committee_a),
|
||||
"promise A should remain resolved"
|
||||
);
|
||||
assert!(
|
||||
matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b),
|
||||
matches!(cache.get(&id_b).unwrap(), CacheItem::Complete(committee) if committee == committee_b),
|
||||
"promise B should remain resolved"
|
||||
);
|
||||
assert_eq!(cache.cache.len(), 2, "the cache should have two entries");
|
||||
assert_eq!(cache.len(), 2, "the cache should have two entries");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn too_many_promises() {
|
||||
let mut cache = new_shuffling_cache();
|
||||
|
||||
for i in 0..MAX_CONCURRENT_PROMISES {
|
||||
for i in 0..cache.max_concurrent_promises() {
|
||||
cache.create_promise(shuffling_id(i as u64)).unwrap();
|
||||
}
|
||||
|
||||
// Ensure that the next promise returns an error. It is important for the application to
|
||||
// dump his ass when he can't keep his promises, you're a queen and you deserve better.
|
||||
assert!(matches!(
|
||||
cache.create_promise(shuffling_id(MAX_CONCURRENT_PROMISES as u64)),
|
||||
Err(BeaconChainError::MaxCommitteePromises(
|
||||
MAX_CONCURRENT_PROMISES
|
||||
))
|
||||
cache.create_promise(shuffling_id(cache.max_concurrent_promises() as u64)),
|
||||
Err(PromiseCacheError::MaxConcurrentPromises(n))
|
||||
if n == cache.max_concurrent_promises()
|
||||
));
|
||||
assert_eq!(
|
||||
cache.cache.len(),
|
||||
MAX_CONCURRENT_PROMISES,
|
||||
cache.len(),
|
||||
cache.max_concurrent_promises(),
|
||||
"the cache should have two entries"
|
||||
);
|
||||
}
|
||||
@@ -486,9 +298,9 @@ mod test {
|
||||
let mut cache = new_shuffling_cache();
|
||||
let id_a = shuffling_id(1);
|
||||
let committee_cache_a = Arc::new(CommitteeCache::default());
|
||||
cache.insert_committee_cache(id_a.clone(), &committee_cache_a);
|
||||
cache.insert_value(id_a.clone(), &committee_cache_a);
|
||||
assert!(
|
||||
matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a),
|
||||
matches!(cache.get(&id_a).unwrap(), CacheItem::Complete(committee_cache) if committee_cache == committee_cache_a),
|
||||
"should insert committee cache"
|
||||
);
|
||||
}
|
||||
@@ -501,7 +313,7 @@ mod test {
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() {
|
||||
cache.insert_committee_cache(shuffling_id.clone(), committee_cache);
|
||||
cache.insert_value(shuffling_id.clone(), committee_cache);
|
||||
}
|
||||
|
||||
for i in 1..(TEST_CACHE_SIZE + 1) {
|
||||
@@ -515,11 +327,7 @@ mod test {
|
||||
!cache.contains(&shuffling_id_and_committee_caches.get(0).unwrap().0),
|
||||
"should not contain oldest epoch shuffling id"
|
||||
);
|
||||
assert_eq!(
|
||||
cache.cache.len(),
|
||||
cache.cache_size,
|
||||
"should limit cache size"
|
||||
);
|
||||
assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size");
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -534,7 +342,7 @@ mod test {
|
||||
shuffling_epoch: (current_epoch + 1).into(),
|
||||
shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64),
|
||||
};
|
||||
cache.insert_committee_cache(shuffling_id, &committee_cache);
|
||||
cache.insert_value(shuffling_id, &committee_cache);
|
||||
}
|
||||
|
||||
// Now, update the head shuffling ids
|
||||
@@ -544,12 +352,12 @@ mod test {
|
||||
previous: Some(shuffling_id(current_epoch - 1)),
|
||||
block_root: Hash256::from_low_u64_le(42),
|
||||
};
|
||||
cache.update_head_shuffling_ids(head_shuffling_ids.clone());
|
||||
cache.update_protector(head_shuffling_ids.clone());
|
||||
|
||||
// Insert head state shuffling ids. Should not be overridden by other shuffling ids.
|
||||
cache.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache);
|
||||
cache.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache);
|
||||
cache.insert_committee_cache(
|
||||
cache.insert_value(head_shuffling_ids.current.clone(), &committee_cache);
|
||||
cache.insert_value(head_shuffling_ids.next.clone(), &committee_cache);
|
||||
cache.insert_value(
|
||||
head_shuffling_ids.previous.clone().unwrap(),
|
||||
&committee_cache,
|
||||
);
|
||||
@@ -560,7 +368,7 @@ mod test {
|
||||
shuffling_epoch: Epoch::from(i),
|
||||
shuffling_decision_block: Hash256::from_low_u64_be(i as u64),
|
||||
};
|
||||
cache.insert_committee_cache(shuffling_id, &committee_cache);
|
||||
cache.insert_value(shuffling_id, &committee_cache);
|
||||
}
|
||||
|
||||
assert!(
|
||||
@@ -575,10 +383,6 @@ mod test {
|
||||
cache.contains(&head_shuffling_ids.previous.unwrap()),
|
||||
"should retain head shuffling id for previous epoch."
|
||||
);
|
||||
assert_eq!(
|
||||
cache.cache.len(),
|
||||
cache.cache_size,
|
||||
"should limit cache size"
|
||||
);
|
||||
assert_eq!(cache.len(), TEST_CACHE_SIZE, "should limit cache size");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,7 +408,7 @@ fn advance_head<T: BeaconChainTypes>(
|
||||
.shuffling_cache
|
||||
.try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT)
|
||||
.ok_or(BeaconChainError::AttestationCacheLockTimeout)?
|
||||
.insert_committee_cache(shuffling_id.clone(), committee_cache);
|
||||
.insert_value(shuffling_id.clone(), committee_cache);
|
||||
|
||||
debug!(
|
||||
log,
|
||||
|
||||
@@ -926,7 +926,7 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
.shuffling_cache
|
||||
.try_write_for(std::time::Duration::from_secs(1))
|
||||
{
|
||||
cache_write.insert_committee_cache(
|
||||
cache_write.insert_value(
|
||||
shuffling_id,
|
||||
&possibly_built_cache,
|
||||
);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::ExecutionOptimistic;
|
||||
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
|
||||
use eth2::types::StateId as CoreStateId;
|
||||
use slog::{debug, warn};
|
||||
use std::fmt;
|
||||
use std::str::FromStr;
|
||||
use types::{BeaconState, Checkpoint, EthSpec, Fork, Hash256, Slot};
|
||||
@@ -187,6 +188,49 @@ impl StateId {
|
||||
_ => (self.root(chain)?, None),
|
||||
};
|
||||
|
||||
let mut opt_state_cache = Some(chain.parallel_state_cache.write());
|
||||
|
||||
// Try the cache.
|
||||
if let Some(cache_item) = opt_state_cache
|
||||
.as_mut()
|
||||
.and_then(|cache| cache.get(&state_root))
|
||||
{
|
||||
drop(opt_state_cache.take());
|
||||
match cache_item.wait() {
|
||||
Ok(state) => {
|
||||
debug!(
|
||||
chain.logger(),
|
||||
"HTTP state cache hit";
|
||||
"state_root" => ?state_root,
|
||||
"slot" => state.slot(),
|
||||
);
|
||||
return Ok(((*state).clone(), execution_optimistic, finalized));
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
chain.logger(),
|
||||
"State promise failed";
|
||||
"state_root" => ?state_root,
|
||||
"outcome" => "re-computing",
|
||||
"error" => ?e,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-lock only in case of failed promise.
|
||||
debug!(
|
||||
chain.logger(),
|
||||
"HTTP state cache miss";
|
||||
"state_root" => ?state_root
|
||||
);
|
||||
let mut state_cache = opt_state_cache.unwrap_or_else(|| chain.parallel_state_cache.write());
|
||||
|
||||
let sender = state_cache.create_promise(state_root).map_err(|e| {
|
||||
warp_utils::reject::custom_server_error(format!("too many concurrent requests: {e:?}"))
|
||||
})?;
|
||||
drop(state_cache);
|
||||
|
||||
let state = chain
|
||||
.get_state(&state_root, slot_opt)
|
||||
.map_err(warp_utils::reject::beacon_chain_error)
|
||||
@@ -199,6 +243,11 @@ impl StateId {
|
||||
})
|
||||
})?;
|
||||
|
||||
// Fulfil promise (and re-lock again).
|
||||
let mut state_cache = chain.parallel_state_cache.write();
|
||||
state_cache.resolve_promise(sender, state_root, &state);
|
||||
drop(state_cache);
|
||||
|
||||
Ok((state, execution_optimistic, finalized))
|
||||
}
|
||||
|
||||
|
||||
@@ -2315,7 +2315,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
|
||||
debug!(self.log, "Attestation for finalized state"; "peer_id" => % peer_id);
|
||||
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore);
|
||||
}
|
||||
e @ AttnError::BeaconChainError(BeaconChainError::MaxCommitteePromises(_)) => {
|
||||
AttnError::BeaconChainError(BeaconChainError::ShufflingCacheError(e)) => {
|
||||
debug!(
|
||||
self.log,
|
||||
"Dropping attestation";
|
||||
|
||||
@@ -821,6 +821,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
.takes_value(true)
|
||||
.default_value("0")
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("parallel-state-cache-size")
|
||||
.long("parallel-state-cache-size")
|
||||
.value_name("N")
|
||||
.help("Set the size of the cache used to de-duplicate requests for the same \
|
||||
state. This cache is additional to other state caches within Lighthouse \
|
||||
and should be kept small unless a large number of parallel requests for \
|
||||
different states are anticipated.")
|
||||
.takes_value(true)
|
||||
.default_value("2")
|
||||
)
|
||||
|
||||
/*
|
||||
* Misc.
|
||||
|
||||
@@ -391,6 +391,11 @@ pub fn get_config<E: EthSpec>(
|
||||
if let Some(state_cache_size) = clap_utils::parse_optional(cli_args, "state-cache-size")? {
|
||||
client_config.store.state_cache_size = state_cache_size;
|
||||
}
|
||||
if let Some(parallel_state_cache_size) =
|
||||
clap_utils::parse_optional(cli_args, "parallel-state-cache-size")?
|
||||
{
|
||||
client_config.chain.parallel_state_cache_size = parallel_state_cache_size;
|
||||
}
|
||||
if let Some(diff_buffer_cache_size) =
|
||||
clap_utils::parse_optional(cli_args, "diff-buffer-cache-size")?
|
||||
{
|
||||
|
||||
@@ -117,6 +117,8 @@ FLAGS:
|
||||
--eth1` pre-merge
|
||||
--subscribe-all-subnets Subscribe to all subnets regardless of validator count. This will also
|
||||
advertise the beacon node as being long-lived subscribed to all subnets.
|
||||
--unsafe-and-dangerous-mode Don't use this flag unless you know what you're doing. Go back and
|
||||
download a stable Lighthouse release
|
||||
--validator-monitor-auto Enables the automatic detection and monitoring of validators connected to
|
||||
the HTTP API and using the subnet subscription endpoint. This generally
|
||||
has the effect of providing additional logging and metrics for locally
|
||||
@@ -195,6 +197,9 @@ OPTIONS:
|
||||
--checkpoint-sync-url-timeout <SECONDS>
|
||||
Set the timeout for checkpoint sync calls to remote beacon node HTTP endpoint. [default: 180]
|
||||
|
||||
--compression-level <LEVEL>
|
||||
Compression level (-99 to 22) for zstd compression applied to states on disk [default: 1]. You may change
|
||||
the compression level freely without re-syncing.
|
||||
-d, --datadir <DIR>
|
||||
Used to specify a custom root data directory for lighthouse keys and databases. Defaults to
|
||||
$HOME/.lighthouse/{network} where network is the value of the `network` flag Note: Users should specify
|
||||
@@ -202,6 +207,9 @@ OPTIONS:
|
||||
--debug-level <LEVEL>
|
||||
Specifies the verbosity level used when emitting logs to the terminal. [default: info] [possible values:
|
||||
info, debug, trace, warn, error, crit]
|
||||
--diff-buffer-cache-size <SIZE>
|
||||
The maximum number of diff buffers to hold in memory. This cache is used when fetching historic states
|
||||
[default: 16]
|
||||
--discovery-port <PORT>
|
||||
The UDP port that discovery will listen on. Defaults to `port`
|
||||
|
||||
@@ -238,6 +246,10 @@ OPTIONS:
|
||||
--epochs-per-migration <N>
|
||||
The number of epochs to wait between running the migration of data from the hot DB to the cold DB. Less
|
||||
frequent runs can be useful for minimizing disk writes [default: 1]
|
||||
--epochs-per-state-diff <EPOCHS>
|
||||
Number of epochs between state diffs stored in the database. Lower values result in more writes and more
|
||||
data stored, while higher values result in more block replaying and longer load times in case of cache miss.
|
||||
[default: 16]
|
||||
--eth1-blocks-per-log-query <BLOCKS>
|
||||
Specifies the number of blocks that a deposit log query should span. This will reduce the size of responses
|
||||
from the Eth1 endpoint. [default: 1000]
|
||||
@@ -280,6 +292,13 @@ OPTIONS:
|
||||
--graffiti <GRAFFITI>
|
||||
Specify your custom graffiti to be included in blocks. Defaults to the current version and commit, truncated
|
||||
to fit in 32 bytes.
|
||||
--hierarchy-exponents <EXPONENTS>
|
||||
Specifies the frequency for storing full state snapshots and hierarchical diffs in the freezer DB. Accepts a
|
||||
comma-separated list of ascending exponents. Each exponent defines an interval for storing diffs to the
|
||||
layer above. The last exponent defines the interval for full snapshots. For example, a config of '4,8,12'
|
||||
would store a full snapshot every 4096 (2^12) slots, first-level diffs every 256 (2^8) slots, and second-
|
||||
level diffs every 16 (2^4) slots. Cannot be changed after initialization. [default:
|
||||
5,9,11,13,16,18,21]
|
||||
--historic-state-cache-size <SIZE>
|
||||
Specifies how many states from the freezer database should cache in memory [default: 1]
|
||||
|
||||
@@ -376,6 +395,10 @@ OPTIONS:
|
||||
--network-dir <DIR>
|
||||
Data directory for network keys. Defaults to network/ inside the beacon node dir.
|
||||
|
||||
--parallel-state-cache-size <N>
|
||||
Set the size of the cache used to de-duplicate requests for the same state. This cache is additional to
|
||||
other state caches within Lighthouse and should be kept small unless a large number of parallel requests for
|
||||
different states are anticipated. [default: 2]
|
||||
--port <PORT>
|
||||
The TCP/UDP ports to listen on. There are two UDP ports. The discovery UDP port will be set to this value
|
||||
and the Quic UDP port will be set to this value + 1. The discovery port can be modified by the --discovery-
|
||||
@@ -462,6 +485,9 @@ OPTIONS:
|
||||
--slots-per-restore-point <SLOT_COUNT>
|
||||
Specifies how often a freezer DB restore point should be stored. Cannot be changed after initialization.
|
||||
[default: 8192 (mainnet) or 64 (minimal)]
|
||||
--state-cache-size <SIZE>
|
||||
Specifies how many states the database should cache in memory [default: 128]
|
||||
|
||||
--suggested-fee-recipient <SUGGESTED-FEE-RECIPIENT>
|
||||
Emergency fallback fee recipient for use in case the validator client does not have one configured. You
|
||||
should set this flag on the validator client instead of (or in addition to) setting it here.
|
||||
|
||||
10
common/promise_cache/Cargo.toml
Normal file
10
common/promise_cache/Cargo.toml
Normal file
@@ -0,0 +1,10 @@
|
||||
[package]
|
||||
name = "promise_cache"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
|
||||
[dependencies]
|
||||
derivative = { workspace = true }
|
||||
oneshot_broadcast = { path = "../oneshot_broadcast" }
|
||||
itertools = { workspace = true }
|
||||
slog = { workspace = true }
|
||||
227
common/promise_cache/src/lib.rs
Normal file
227
common/promise_cache/src/lib.rs
Normal file
@@ -0,0 +1,227 @@
|
||||
use derivative::Derivative;
|
||||
use itertools::Itertools;
|
||||
use oneshot_broadcast::{oneshot, Receiver, Sender};
|
||||
use slog::Logger;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::Hash;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PromiseCache<K, V, P>
|
||||
where
|
||||
K: Hash + Eq + Clone,
|
||||
P: Protect<K>,
|
||||
{
|
||||
cache: HashMap<K, CacheItem<V>>,
|
||||
capacity: usize,
|
||||
protector: P,
|
||||
max_concurrent_promises: usize,
|
||||
logger: Logger,
|
||||
}
|
||||
|
||||
/// A value implementing `Protect` is capable of preventing keys of type `K` from being evicted.
|
||||
///
|
||||
/// It also dictates an ordering on keys which is used to prioritise evictions.
|
||||
pub trait Protect<K> {
|
||||
type SortKey: Ord;
|
||||
|
||||
fn sort_key(&self, k: &K) -> Self::SortKey;
|
||||
|
||||
fn protect_from_eviction(&self, k: &K) -> bool;
|
||||
|
||||
fn notify_eviction(&self, _k: &K, _log: &Logger) {}
|
||||
}
|
||||
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Clone(bound = ""))]
|
||||
pub enum CacheItem<T> {
|
||||
Complete(Arc<T>),
|
||||
Promise(Receiver<Arc<T>>),
|
||||
}
|
||||
|
||||
impl<T: std::fmt::Debug> std::fmt::Debug for CacheItem<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
|
||||
match self {
|
||||
CacheItem::Complete(value) => value.fmt(f),
|
||||
CacheItem::Promise(_) => "Promise(..)".fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum PromiseCacheError {
|
||||
Failed(oneshot_broadcast::Error),
|
||||
MaxConcurrentPromises(usize),
|
||||
}
|
||||
|
||||
pub trait ToArc<T> {
|
||||
fn to_arc(&self) -> Arc<T>;
|
||||
}
|
||||
|
||||
impl<T> CacheItem<T> {
|
||||
pub fn is_promise(&self) -> bool {
|
||||
matches!(self, CacheItem::Promise(_))
|
||||
}
|
||||
|
||||
pub fn wait(self) -> Result<Arc<T>, PromiseCacheError> {
|
||||
match self {
|
||||
CacheItem::Complete(value) => Ok(value),
|
||||
CacheItem::Promise(receiver) => receiver.recv().map_err(PromiseCacheError::Failed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ToArc<T> for Arc<T> {
|
||||
fn to_arc(&self) -> Arc<T> {
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> ToArc<T> for T
|
||||
where
|
||||
T: Clone,
|
||||
{
|
||||
fn to_arc(&self) -> Arc<T> {
|
||||
Arc::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V, P> PromiseCache<K, V, P>
|
||||
where
|
||||
K: Hash + Eq + Clone,
|
||||
P: Protect<K>,
|
||||
{
|
||||
pub fn new(capacity: usize, protector: P, logger: Logger) -> Self {
|
||||
// Making the concurrent promises directly configurable is considered overkill for now,
|
||||
// so we just derive a vaguely sensible value from the cache size.
|
||||
let max_concurrent_promises = std::cmp::max(2, capacity / 8);
|
||||
Self {
|
||||
cache: HashMap::new(),
|
||||
capacity,
|
||||
protector,
|
||||
max_concurrent_promises,
|
||||
logger,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&mut self, key: &K) -> Option<CacheItem<V>> {
|
||||
match self.cache.get(key) {
|
||||
// The cache contained the value, return it.
|
||||
item @ Some(CacheItem::Complete(_)) => item.cloned(),
|
||||
// The cache contains a promise for the value. Check to see if the promise has already
|
||||
// been resolved, without waiting for it.
|
||||
item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() {
|
||||
// The promise has already been resolved. Replace the entry in the cache with a
|
||||
// `Complete` entry and then return the value.
|
||||
Ok(Some(value)) => {
|
||||
let ready = CacheItem::Complete(value);
|
||||
self.insert_cache_item(key.clone(), ready.clone());
|
||||
Some(ready)
|
||||
}
|
||||
// The promise has not yet been resolved. Return the promise so the caller can await
|
||||
// it.
|
||||
Ok(None) => item.cloned(),
|
||||
// The sender has been dropped without sending a value. There was most likely an
|
||||
// error computing the value. Drop the key from the cache and return
|
||||
// `None` so the caller can recompute the value.
|
||||
//
|
||||
// It's worth noting that this is the only place where we removed unresolved
|
||||
// promises from the cache. This means unresolved promises will only be removed if
|
||||
// we try to access them again. This is OK, since the promises don't consume much
|
||||
// memory. We expect that *all* promises should be resolved, unless there is a
|
||||
// programming or database error.
|
||||
Err(oneshot_broadcast::Error::SenderDropped) => {
|
||||
self.cache.remove(key);
|
||||
None
|
||||
}
|
||||
},
|
||||
// The cache does not have this value and it's not already promised to be computed.
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains(&self, key: &K) -> bool {
|
||||
self.cache.contains_key(key)
|
||||
}
|
||||
|
||||
pub fn insert_value<C: ToArc<V>>(&mut self, key: K, value: &C) {
|
||||
if self
|
||||
.cache
|
||||
.get(&key)
|
||||
// Replace the value if it's not present or if it's a promise. A bird in the hand is
|
||||
// worth two in the promise-bush!
|
||||
.map_or(true, CacheItem::is_promise)
|
||||
{
|
||||
self.insert_cache_item(key, CacheItem::Complete(value.to_arc()));
|
||||
}
|
||||
}
|
||||
|
||||
/// Take care of resolving a promise by ensuring the value is made available:
|
||||
///
|
||||
/// 1. To all waiting thread that are holding a `Receiver`.
|
||||
/// 2. In the cache itself for future callers.
|
||||
pub fn resolve_promise<C: ToArc<V>>(&mut self, sender: Sender<Arc<V>>, key: K, value: &C) {
|
||||
// Use the sender to notify all actively waiting receivers.
|
||||
let arc_value = value.to_arc();
|
||||
sender.send(arc_value.clone());
|
||||
|
||||
// Re-insert the value into the cache. The promise may have been evicted in the meantime,
|
||||
// but we probably want to keep this value (which resolved recently) over other older cache
|
||||
// entries.
|
||||
self.insert_value(key, &arc_value);
|
||||
}
|
||||
|
||||
/// Prunes the cache first before inserting a new item.
|
||||
fn insert_cache_item(&mut self, key: K, cache_item: CacheItem<V>) {
|
||||
self.prune_cache();
|
||||
self.cache.insert(key, cache_item);
|
||||
}
|
||||
|
||||
pub fn create_promise(&mut self, key: K) -> Result<Sender<Arc<V>>, PromiseCacheError> {
|
||||
let num_active_promises = self.cache.values().filter(|item| item.is_promise()).count();
|
||||
if num_active_promises >= self.max_concurrent_promises {
|
||||
return Err(PromiseCacheError::MaxConcurrentPromises(
|
||||
num_active_promises,
|
||||
));
|
||||
}
|
||||
|
||||
let (sender, receiver) = oneshot();
|
||||
self.insert_cache_item(key, CacheItem::Promise(receiver));
|
||||
Ok(sender)
|
||||
}
|
||||
|
||||
fn prune_cache(&mut self) {
|
||||
let target_cache_size = self.capacity.saturating_sub(1);
|
||||
if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) {
|
||||
let keys_to_prune = self
|
||||
.cache
|
||||
.keys()
|
||||
.filter(|k| !self.protector.protect_from_eviction(*k))
|
||||
.sorted_by_key(|k| self.protector.sort_key(k))
|
||||
.take(prune_count)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for key in &keys_to_prune {
|
||||
self.protector.notify_eviction(key, &self.logger);
|
||||
self.cache.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_protector(&mut self, protector: P) {
|
||||
self.protector = protector;
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.cache.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.cache.is_empty()
|
||||
}
|
||||
|
||||
pub fn max_concurrent_promises(&self) -> usize {
|
||||
self.max_concurrent_promises
|
||||
}
|
||||
}
|
||||
@@ -1717,6 +1717,25 @@ fn historic_state_cache_size_default() {
|
||||
});
|
||||
}
|
||||
#[test]
|
||||
fn parallel_state_cache_size_flag() {
|
||||
CommandLineTest::new()
|
||||
.flag("parallel-state-cache-size", Some("4"))
|
||||
.run_with_zero_port()
|
||||
.with_config(|config| assert_eq!(config.chain.parallel_state_cache_size, 4_usize));
|
||||
}
|
||||
#[test]
|
||||
fn parallel_state_cache_size_default() {
|
||||
use beacon_node::beacon_chain::chain_config::DEFAULT_PARALLEL_STATE_CACHE_SIZE;
|
||||
CommandLineTest::new()
|
||||
.run_with_zero_port()
|
||||
.with_config(|config| {
|
||||
assert_eq!(
|
||||
config.chain.parallel_state_cache_size,
|
||||
DEFAULT_PARALLEL_STATE_CACHE_SIZE
|
||||
);
|
||||
});
|
||||
}
|
||||
#[test]
|
||||
fn auto_compact_db_flag() {
|
||||
CommandLineTest::new()
|
||||
.flag("auto-compact-db", Some("false"))
|
||||
|
||||
Reference in New Issue
Block a user