diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f618cf6321..1a3edca8f1 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -71,7 +71,9 @@ use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache; -use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; +use crate::shuffling_cache::{ + BlockShufflingIds, CachedShuffling, ShufflingCache, get_ptc_for_shuffling_epoch, +}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -466,7 +468,7 @@ pub struct BeaconChain { /// HTTP server is enabled. pub event_handler: Option>, /// Caches the attester shuffling for a given epoch and shuffling key root. - pub shuffling_cache: RwLock, + pub shuffling_cache: RwLock>, /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. pub beacon_proposer_cache: Arc>, /// Caches a map of `validator_index -> validator_pubkey`. @@ -4794,9 +4796,12 @@ impl BeaconChain { if !shuffling_is_cached { state.build_committee_cache(relative_epoch, &self.spec)?; let committee_cache = state.committee_cache(relative_epoch)?; + let shuffling_epoch = relative_epoch.into_epoch(state.current_epoch()); + let ptc = get_ptc_for_shuffling_epoch(state, shuffling_epoch, &self.spec)?; + let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptc); self.shuffling_cache .write() - .insert_committee_cache(shuffling_id, committee_cache); + .insert_committee_cache_with_ptc(shuffling_id, cached_shuffling); } } Ok(()) @@ -6930,8 +6935,11 @@ impl BeaconChain { // access. drop(shuffling_cache); - let committee_cache = cache_item.wait()?; - map_fn(&committee_cache, shuffling_id.shuffling_decision_block) + let cached_shuffling = cache_item.wait()?; + map_fn( + &cached_shuffling.committee_cache, + shuffling_id.shuffling_decision_block, + ) } else { // Create an entry in the cache that "promises" this value will eventually be computed. // This avoids the case where multiple threads attempt to produce the same value at the @@ -7029,17 +7037,19 @@ impl BeaconChain { state.build_committee_cache(relative_epoch, &self.spec)?; let committee_cache = state.committee_cache(relative_epoch)?.clone(); + let ptc = get_ptc_for_shuffling_epoch(&state, shuffling_epoch, &self.spec)?; let shuffling_decision_block = shuffling_id.shuffling_decision_block; + let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptc); self.shuffling_cache .write() - .insert_committee_cache(shuffling_id, &committee_cache); + .insert_committee_cache_with_ptc(shuffling_id, cached_shuffling.clone()); metrics::stop_timer(committee_building_timer); - sender.send(committee_cache.clone()); + sender.send(cached_shuffling.clone()); - map_fn(&committee_cache, shuffling_decision_block) + map_fn(&cached_shuffling.committee_cache, shuffling_decision_block) } } diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 3d0fd80cf6..acaff24e5c 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -5,21 +5,22 @@ use itertools::Itertools; use oneshot_broadcast::{Receiver, Sender, oneshot}; use tracing::debug; use types::{ - AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch, - state::CommitteeCache, + AttestationShufflingId, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PTC, + RelativeEpoch, state::CommitteeCache, }; use crate::{BeaconChainError, metrics}; -/// The size of the cache that stores committee caches for quicker verification. +/// The size of the cache that stores shufflings for quicker verification. /// -/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + -/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this -/// ignores a few extra bytes in the caches that should be insignificant compared to the indices). +/// Each entry should be around `8 + 800,000 + 4,096 = 804,104` bytes in size with 100k validators +/// and a 512-validator PTC. Therefore, this cache should be approx `16 * 804,104 = 12.9 MB`. +/// (Note: this ignores a few extra bytes in the caches that should be insignificant compared to the +/// indices). pub const DEFAULT_CACHE_SIZE: usize = 16; -/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this -/// limits the number of concurrent states that can be loaded into memory for the committee cache. +/// The maximum number of concurrent shuffling "promises" that can be issued. In effect, this +/// limits the number of concurrent states that can be loaded into memory for the shuffling. /// This prevents excessive memory usage at the cost of rejecting some attestations. /// /// We set this value to 2 since states can be quite large and have a significant impact on memory @@ -30,19 +31,34 @@ pub const DEFAULT_CACHE_SIZE: usize = 16; const MAX_CONCURRENT_PROMISES: usize = 2; #[derive(Clone)] -pub enum CacheItem { - /// A committee. - Committee(Arc), - /// A promise for a future committee. - Promise(Receiver>), +pub struct CachedShuffling { + pub committee_cache: Arc, + pub ptc: Option>, } -impl CacheItem { +impl CachedShuffling { + pub fn new(committee_cache: Arc, ptc: Option>) -> Self { + Self { + committee_cache, + ptc, + } + } +} + +#[derive(Clone)] +pub enum CacheItem { + /// A cached shuffling. + Committee(CachedShuffling), + /// A promise for a future cached shuffling. + Promise(Receiver>), +} + +impl CacheItem { pub fn is_promise(&self) -> bool { matches!(self, CacheItem::Promise(_)) } - pub fn wait(self) -> Result, BeaconChainError> { + pub fn wait(self) -> Result, BeaconChainError> { match self { CacheItem::Committee(cache) => Ok(cache), CacheItem::Promise(receiver) => receiver @@ -52,17 +68,17 @@ impl CacheItem { } } -/// Provides a cache for `CommitteeCache`. +/// Provides a cache for `CommitteeCache` and the associated optional PTC. /// /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. -pub struct ShufflingCache { - cache: HashMap, +pub struct ShufflingCache { + cache: HashMap>, cache_size: usize, head_shuffling_ids: BlockShufflingIds, } -impl ShufflingCache { +impl ShufflingCache { pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds) -> Self { Self { cache: HashMap::new(), @@ -71,22 +87,22 @@ impl ShufflingCache { } } - pub fn get(&mut self, key: &AttestationShufflingId) -> Option { + pub fn get(&mut self, key: &AttestationShufflingId) -> Option> { match self.cache.get(key) { - // The cache contained the committee cache, return it. + // The cache contained the shuffling, return it. item @ Some(CacheItem::Committee(_)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() } - // The cache contains a promise for the committee cache. Check to see if the promise has + // The cache contains a promise for the shuffling. Check to see if the promise has // already been resolved, without waiting for it. item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { // The promise has already been resolved. Replace the entry in the cache with a - // `Committee` entry and then return the committee. - Ok(Some(committee)) => { + // `Committee` entry and then return the cached shuffling. + Ok(Some(cached_shuffling)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - let ready = CacheItem::Committee(committee); + let ready = CacheItem::Committee(cached_shuffling); self.insert_cache_item(key.clone(), ready.clone()); Some(ready) } @@ -97,8 +113,8 @@ impl ShufflingCache { metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() } - // The sender has been dropped without sending a committee. There was most likely an - // error computing the committee cache. Drop the key from the cache and return + // The sender has been dropped without sending a shuffling. There was most likely an + // error computing the shuffling. Drop the key from the cache and return // `None` so the caller can recompute the committee. // // It's worth noting that this is the only place where we removed unresolved @@ -113,7 +129,7 @@ impl ShufflingCache { None } }, - // The cache does not have this committee and it's not already promised to be computed. + // The cache does not have this shuffling and it's not already promised to be computed. None => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); None @@ -129,23 +145,30 @@ impl ShufflingCache { &mut self, key: AttestationShufflingId, committee_cache: &C, + ) { + self.insert_committee_cache_with_ptc( + key, + CachedShuffling::new(committee_cache.to_arc_committee_cache(), None), + ); + } + + pub fn insert_committee_cache_with_ptc( + &mut self, + key: AttestationShufflingId, + cached_shuffling: CachedShuffling, ) { if self .cache .get(&key) - // Replace the committee if it's not present or if it's a promise. A bird in the hand is - // worth two in the promise-bush! + // Replace the cached shuffling if it's not present or if it's a promise. .is_none_or(CacheItem::is_promise) { - self.insert_cache_item( - key, - CacheItem::Committee(committee_cache.to_arc_committee_cache()), - ); + self.insert_cache_item(key, CacheItem::Committee(cached_shuffling)); } } /// Prunes the cache first before inserting a new cache item. - fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { + fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { self.prune_cache(); self.cache.insert(key, cache_item); } @@ -188,7 +211,7 @@ impl ShufflingCache { pub fn create_promise( &mut self, key: AttestationShufflingId, - ) -> Result>, BeaconChainError> { + ) -> Result>, BeaconChainError> { let num_active_promises = self .cache .iter() @@ -212,6 +235,20 @@ impl ShufflingCache { } } +/// Return the PTC associated with the first slot in `shuffling_epoch`, when the state supports PTCs. +pub fn get_ptc_for_shuffling_epoch( + state: &BeaconState, + shuffling_epoch: Epoch, + spec: &ChainSpec, +) -> Result>, BeaconStateError> { + if state.fork_name_unchecked().gloas_enabled() { + let slot = shuffling_epoch.start_slot(E::slots_per_epoch()); + state.get_ptc(slot, spec).map(Some) + } else { + Ok(None) + } +} + /// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. pub trait ToArcCommitteeCache { fn to_arc_committee_cache(&self) -> Arc; @@ -304,7 +341,7 @@ mod test { const TEST_CACHE_SIZE: usize = 5; // Creates a new shuffling cache for testing - fn new_shuffling_cache() -> ShufflingCache { + fn new_shuffling_cache() -> ShufflingCache { create_test_tracing_subscriber(); let current_epoch = 8; @@ -318,6 +355,10 @@ mod test { ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids) } + fn cached_shuffling(committee_cache: Arc) -> CachedShuffling { + CachedShuffling::new(committee_cache, None) + } + /// Returns two different committee caches for testing. fn committee_caches() -> (Arc, Arc) { let harness = BeaconChainHarness::builder(MinimalEthSpec) @@ -366,12 +407,12 @@ mod test { ); // Resolve the promise. - sender.send(committee_a.clone()); + sender.send(cached_shuffling(committee_a.clone())); // Ensure the promise has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "the promise should be resolved" ); assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); @@ -428,30 +469,30 @@ mod test { ); // Resolve promise A. - sender_a.send(committee_a.clone()); + sender_a.send(cached_shuffling(committee_a.clone())); // Ensure promise A has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "promise A should be resolved" ); // Resolve promise B. - sender_b.send(committee_b.clone()); + sender_b.send(cached_shuffling(committee_b.clone())); // Ensure promise B has been resolved. let item = cache.get(&id_b).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_b), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b), "promise B should be resolved" ); // Check both entries again. assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "promise A should remain resolved" ); assert!( - matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b), "promise B should remain resolved" ); assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); @@ -487,7 +528,7 @@ mod test { let committee_cache_a = Arc::new(CommitteeCache::default()); cache.insert_committee_cache(id_a.clone(), &committee_cache_a); assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_cache_a), "should insert committee cache" ); } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index cb916cb514..e4aa09a7e7 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -15,7 +15,9 @@ //! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles. use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::{ - BeaconChain, BeaconChainError, BeaconChainTypes, chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, + BeaconChain, BeaconChainError, BeaconChainTypes, + chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, + shuffling_cache::{CachedShuffling, get_ptc_for_shuffling_epoch}, }; use slot_clock::SlotClock; use state_processing::per_slot_processing; @@ -395,10 +397,17 @@ fn advance_head(beacon_chain: &Arc>) -> Resu let committee_cache = state .committee_cache(RelativeEpoch::Next) .map_err(BeaconChainError::from)?; + let ptc = get_ptc_for_shuffling_epoch( + &state, + RelativeEpoch::Next.into_epoch(state.current_epoch()), + &beacon_chain.spec, + ) + .map_err(BeaconChainError::from)?; + let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptc); beacon_chain .shuffling_cache .write() - .insert_committee_cache(shuffling_id.clone(), committee_cache); + .insert_committee_cache_with_ptc(shuffling_id.clone(), cached_shuffling); debug!( ?head_block_root, diff --git a/beacon_node/http_api/src/beacon/states.rs b/beacon_node/http_api/src/beacon/states.rs index 84ef3c1f26..cdc60f0418 100644 --- a/beacon_node/http_api/src/beacon/states.rs +++ b/beacon_node/http_api/src/beacon/states.rs @@ -382,6 +382,7 @@ pub fn get_beacon_state_committees( .try_write_for(std::time::Duration::from_secs(1)) .and_then(|mut cache_write| cache_write.get(shuffling_id)) .and_then(|cache_item| cache_item.wait().ok()) + .map(|cached_shuffling| cached_shuffling.committee_cache) } else { None };