diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 030137246a..2fa04304f5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5476,6 +5476,7 @@ impl BeaconChain { let shuffling_id = BlockShufflingIds { current: head_block.current_epoch_shuffling_id.clone(), next: head_block.next_epoch_shuffling_id.clone(), + previous: None, block_root: head_block.root, } .id_for_epoch(shuffling_epoch) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ca377635d6..b0f0015b9a 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -6,7 +6,7 @@ use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_bound use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; -use crate::shuffling_cache::ShufflingCache; +use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE}; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_monitor::ValidatorMonitor; @@ -691,6 +691,8 @@ where )?; } + let head_shuffling_ids = BlockShufflingIds::try_from_head(head_block_root, &head_state)?; + let mut head_snapshot = BeaconSnapshot { beacon_block_root: head_block_root, beacon_block: Arc::new(head_block), @@ -847,7 +849,11 @@ where DEFAULT_SNAPSHOT_CACHE_SIZE, head_for_snapshot_cache, )), - shuffling_cache: TimeoutRwLock::new(ShufflingCache::new(shuffling_cache_size)), + shuffling_cache: TimeoutRwLock::new(ShufflingCache::new( + shuffling_cache_size, + head_shuffling_ids, + log.clone(), + )), eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())), beacon_proposer_cache: <_>::default(), block_times_cache: <_>::default(), diff --git 
a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 0e1c8a5305..2b1f714362 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -31,7 +31,9 @@ //! the head block root. This is unacceptable for fast-responding functions like the networking //! stack. +use crate::beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT; use crate::persisted_fork_choice::PersistedForkChoice; +use crate::shuffling_cache::BlockShufflingIds; use crate::{ beacon_chain::{ BeaconForkChoice, BeaconStore, OverrideForkchoiceUpdate, @@ -846,6 +848,35 @@ impl BeaconChain { ); }); + match BlockShufflingIds::try_from_head( + new_snapshot.beacon_block_root, + &new_snapshot.beacon_state, + ) { + Ok(head_shuffling_ids) => { + self.shuffling_cache + .try_write_for(ATTESTATION_CACHE_LOCK_TIMEOUT) + .map(|mut shuffling_cache| { + shuffling_cache.update_head_shuffling_ids(head_shuffling_ids) + }) + .unwrap_or_else(|| { + error!( + self.log, + "Failed to obtain cache write lock"; + "lock" => "shuffling_cache", + "task" => "update head shuffling decision root" + ); + }); + } + Err(e) => { + error!( + self.log, + "Failed to get head shuffling ids"; + "error" => ?e, + "head_block_root" => ?new_snapshot.beacon_block_root + ); + } + } + observe_head_block_delays( &mut self.block_times_cache.write(), &new_head_proto_block, diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 91a1e24d82..086e1c0949 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -1,10 +1,18 @@ -use crate::{metrics, BeaconChainError}; -use lru::LruCache; -use oneshot_broadcast::{oneshot, Receiver, Sender}; +use std::collections::HashMap; use std::sync::Arc; -use types::{beacon_state::CommitteeCache, AttestationShufflingId, Epoch, Hash256}; -/// The size of the LRU cache that stores committee caches for quicker 
verification. +use itertools::Itertools; +use slog::{debug, Logger}; + +use oneshot_broadcast::{oneshot, Receiver, Sender}; +use types::{ + beacon_state::CommitteeCache, AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, + RelativeEpoch, +}; + +use crate::{metrics, BeaconChainError}; + +/// The size of the cache that stores committee caches for quicker verification. /// /// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + /// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this @@ -45,18 +53,24 @@ impl CacheItem { } } -/// Provides an LRU cache for `CommitteeCache`. +/// Provides a cache for `CommitteeCache`. /// /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. pub struct ShufflingCache { - cache: LruCache<AttestationShufflingId, CacheItem>, + cache: HashMap<AttestationShufflingId, CacheItem>, + cache_size: usize, + head_shuffling_ids: BlockShufflingIds, + logger: Logger, } impl ShufflingCache { - pub fn new(cache_size: usize) -> Self { + pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds, logger: Logger) -> Self { Self { - cache: LruCache::new(cache_size), + cache: HashMap::new(), + cache_size, + head_shuffling_ids, + logger, } } @@ -76,7 +90,7 @@ impl ShufflingCache { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); let ready = CacheItem::Committee(committee); - self.cache.put(key.clone(), ready.clone()); + self.insert_cache_item(key.clone(), ready.clone()); Some(ready) } // The promise has not yet been resolved. Return the promise so the caller can await @@ -93,13 +107,12 @@ impl ShufflingCache { // It's worth noting that this is the only place where we removed unresolved // promises from the cache. This means unresolved promises will only be removed if // we try to access them again.
This is OK, since the promises don't consume much - // memory and the nature of the LRU cache means that future, relevant entries will - // still be added to the cache. We expect that *all* promises should be resolved, - // unless there is a programming or database error. + // memory. We expect that *all* promises should be resolved, unless there is a + // programming or database error. Err(oneshot_broadcast::Error::SenderDropped) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_FAILS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); - self.cache.pop(key); + self.cache.remove(key); None } }, @@ -112,13 +125,13 @@ impl ShufflingCache { } pub fn contains(&self, key: &AttestationShufflingId) -> bool { - self.cache.contains(key) + self.cache.contains_key(key) } - pub fn insert_committee_cache<T: ToArcCommitteeCache>( + pub fn insert_committee_cache<C: ToArcCommitteeCache>( &mut self, key: AttestationShufflingId, - committee_cache: &T, + committee_cache: &C, ) { if self .cache @@ -127,13 +140,55 @@ impl ShufflingCache { // worth two in the promise-bush! .map_or(true, CacheItem::is_promise) { - self.cache.put( + self.insert_cache_item( key, CacheItem::Committee(committee_cache.to_arc_committee_cache()), ); } } + /// Prunes the cache before inserting a new cache item. + fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { + self.prune_cache(); + self.cache.insert(key, cache_item); + } + + /// Prunes the `cache` to keep the size below the `cache_size` limit, based on the following + /// preferences: + /// - Entries from more recent epochs are preferred over older ones. + /// - Entries with shuffling ids matching the head's previous, current, and future epochs must + /// not be pruned.
+ fn prune_cache(&mut self) { + let target_cache_size = self.cache_size.saturating_sub(1); + if let Some(prune_count) = self.cache.len().checked_sub(target_cache_size) { + let shuffling_ids_to_prune = self + .cache + .keys() + .sorted_by_key(|key| key.shuffling_epoch) + .filter(|shuffling_id| { + Some(shuffling_id) + != self + .head_shuffling_ids + .id_for_epoch(shuffling_id.shuffling_epoch) + .as_ref() + .as_ref() + }) + .take(prune_count) + .cloned() + .collect::<Vec<_>>(); + + for shuffling_id in shuffling_ids_to_prune.iter() { + debug!( + self.logger, + "Removing old shuffling from cache"; + "shuffling_epoch" => shuffling_id.shuffling_epoch, + "shuffling_decision_block" => ?shuffling_id.shuffling_decision_block + ); + self.cache.remove(shuffling_id); + } + } + } + pub fn create_promise( &mut self, key: AttestationShufflingId, @@ -148,9 +203,17 @@ impl ShufflingCache { } let (sender, receiver) = oneshot(); - self.cache.put(key, CacheItem::Promise(receiver)); + self.insert_cache_item(key, CacheItem::Promise(receiver)); Ok(sender) } + + /// Inform the cache that the shuffling decision roots for the head have changed. + /// + /// The shufflings for the head's previous, current, and future epochs will never be ejected from + /// the cache during `Self::insert_cache_item`. + pub fn update_head_shuffling_ids(&mut self, head_shuffling_ids: BlockShufflingIds) { + self.head_shuffling_ids = head_shuffling_ids; + } } /// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. @@ -170,26 +233,29 @@ impl ToArcCommitteeCache for Arc<CommitteeCache> { } } -impl Default for ShufflingCache { - fn default() -> Self { - Self::new(DEFAULT_CACHE_SIZE) - } -} - /// Contains the shuffling IDs for a beacon block. +#[derive(Clone)] pub struct BlockShufflingIds { pub current: AttestationShufflingId, pub next: AttestationShufflingId, + pub previous: Option<AttestationShufflingId>, pub block_root: Hash256, } impl BlockShufflingIds { /// Returns the shuffling ID for the given epoch.
/// - /// Returns `None` if `epoch` is prior to `self.current.shuffling_epoch`. + /// Returns `None` if `epoch` is prior to `self.previous?.shuffling_epoch` or + /// `self.current.shuffling_epoch` (if `previous` is `None`). pub fn id_for_epoch(&self, epoch: Epoch) -> Option<AttestationShufflingId> { if epoch == self.current.shuffling_epoch { Some(self.current.clone()) + } else if self + .previous + .as_ref() + .map_or(false, |id| id.shuffling_epoch == epoch) + { + self.previous.clone() } else if epoch == self.next.shuffling_epoch { Some(self.next.clone()) } else if epoch > self.next.shuffling_epoch { @@ -201,18 +267,57 @@ impl BlockShufflingIds { None } } + + pub fn try_from_head<T: EthSpec>( + head_block_root: Hash256, + head_state: &BeaconState<T>, + ) -> Result<Self, String> { + let get_shuffling_id = |relative_epoch| { + AttestationShufflingId::new(head_block_root, head_state, relative_epoch).map_err(|e| { + format!( + "Unable to get attester shuffling decision slot for the epoch {:?}: {:?}", + relative_epoch, e + ) + }) + }; + + Ok(Self { + current: get_shuffling_id(RelativeEpoch::Current)?, + next: get_shuffling_id(RelativeEpoch::Next)?, + previous: Some(get_shuffling_id(RelativeEpoch::Previous)?), + block_root: head_block_root, + }) + } } // Disable tests in debug since the beacon chain harness is slow unless in release.
#[cfg(not(debug_assertions))] #[cfg(test)] mod test { - use super::*; - use crate::test_utils::EphemeralHarnessType; + use task_executor::test_utils::null_logger; use types::*; - type BeaconChainHarness = - crate::test_utils::BeaconChainHarness<EphemeralHarnessType<MinimalEthSpec>>; + use crate::test_utils::EphemeralHarnessType; + + use super::*; + + type E = MinimalEthSpec; + type TestBeaconChainType = EphemeralHarnessType<E>; + type BeaconChainHarness = crate::test_utils::BeaconChainHarness<TestBeaconChainType>; + const TEST_CACHE_SIZE: usize = 5; + + // Creates a new shuffling cache for testing + fn new_shuffling_cache() -> ShufflingCache { + let current_epoch = 8; + let head_shuffling_ids = BlockShufflingIds { + current: shuffling_id(current_epoch), + next: shuffling_id(current_epoch + 1), + previous: Some(shuffling_id(current_epoch - 1)), + block_root: Hash256::from_low_u64_le(0), + }; + let logger = null_logger().unwrap(); + ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids, logger) + } /// Returns two different committee caches for testing. fn committee_caches() -> (Arc<CommitteeCache>, Arc<CommitteeCache>) { @@ -249,7 +354,7 @@ mod test { fn resolved_promise() { let (committee_a, _) = committee_caches(); let id_a = shuffling_id(1); - let mut cache = ShufflingCache::default(); + let mut cache = new_shuffling_cache(); // Create a promise. let sender = cache.create_promise(id_a.clone()).unwrap(); @@ -276,7 +381,7 @@ mod test { #[test] fn unresolved_promise() { let id_a = shuffling_id(1); - let mut cache = ShufflingCache::default(); + let mut cache = new_shuffling_cache(); // Create a promise. let sender = cache.create_promise(id_a.clone()).unwrap(); @@ -301,7 +406,7 @@ mod test { fn two_promises() { let (committee_a, committee_b) = committee_caches(); let (id_a, id_b) = (shuffling_id(1), shuffling_id(2)); - let mut cache = ShufflingCache::default(); + let mut cache = new_shuffling_cache(); // Create promise A.
let sender_a = cache.create_promise(id_a.clone()).unwrap(); @@ -355,7 +460,7 @@ mod test { #[test] fn too_many_promises() { - let mut cache = ShufflingCache::default(); + let mut cache = new_shuffling_cache(); for i in 0..MAX_CONCURRENT_PROMISES { cache.create_promise(shuffling_id(i as u64)).unwrap(); @@ -375,4 +480,105 @@ mod test { "the cache should have two entries" ); } + + #[test] + fn should_insert_committee_cache() { + let mut cache = new_shuffling_cache(); + let id_a = shuffling_id(1); + let committee_cache_a = Arc::new(CommitteeCache::default()); + cache.insert_committee_cache(id_a.clone(), &committee_cache_a); + assert!( + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a), + "should insert committee cache" + ); + } + + #[test] + fn should_prune_committee_cache_with_lowest_epoch() { + let mut cache = new_shuffling_cache(); + let shuffling_id_and_committee_caches = (0..(TEST_CACHE_SIZE + 1)) + .map(|i| (shuffling_id(i as u64), Arc::new(CommitteeCache::default()))) + .collect::<Vec<_>>(); + + for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() { + cache.insert_committee_cache(shuffling_id.clone(), committee_cache); + } + + for i in 1..(TEST_CACHE_SIZE + 1) { + assert!( + cache.contains(&shuffling_id_and_committee_caches.get(i).unwrap().0), + "should contain recent epoch shuffling ids" + ); + } + + assert!( + !cache.contains(&shuffling_id_and_committee_caches.get(0).unwrap().0), + "should not contain oldest epoch shuffling id" + ); + assert_eq!( + cache.cache.len(), + cache.cache_size, + "should limit cache size" + ); + } + + #[test] + fn should_retain_head_state_shufflings() { + let mut cache = new_shuffling_cache(); + let current_epoch = 10; + let committee_cache = Arc::new(CommitteeCache::default()); + + // Insert a few entries for the next epoch with different decision roots.
+ for i in 0..TEST_CACHE_SIZE { + let shuffling_id = AttestationShufflingId { + shuffling_epoch: (current_epoch + 1).into(), + shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64), + }; + cache.insert_committee_cache(shuffling_id, &committee_cache); + } + + // Now, update the head shuffling ids + let head_shuffling_ids = BlockShufflingIds { + current: shuffling_id(current_epoch), + next: shuffling_id(current_epoch + 1), + previous: Some(shuffling_id(current_epoch - 1)), + block_root: Hash256::from_low_u64_le(42), + }; + cache.update_head_shuffling_ids(head_shuffling_ids.clone()); + + // Insert head state shuffling ids. Should not be overridden by other shuffling ids. + cache.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache); + cache.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache); + cache.insert_committee_cache( + head_shuffling_ids.previous.clone().unwrap(), + &committee_cache, + ); + + // Insert a few entries for older epochs. + for i in 0..TEST_CACHE_SIZE { + let shuffling_id = AttestationShufflingId { + shuffling_epoch: Epoch::from(i), + shuffling_decision_block: Hash256::from_low_u64_be(i as u64), + }; + cache.insert_committee_cache(shuffling_id, &committee_cache); + } + + assert!( + cache.contains(&head_shuffling_ids.current), + "should retain head shuffling id for the current epoch." + ); + assert!( + cache.contains(&head_shuffling_ids.next), + "should retain head shuffling id for the next epoch." + ); + assert!( + cache.contains(&head_shuffling_ids.previous.unwrap()), + "should retain head shuffling id for previous epoch." + ); + assert_eq!( + cache.cache.len(), + cache.cache_size, + "should limit cache size" + ); + } }