diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 27f99dc490..209ac59e19 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -77,9 +77,7 @@ use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache; -use crate::shuffling_cache::{ - CachedShuffling, ShufflingCache, get_ptcs_for_shuffling_epoch, with_cached_shuffling, -}; +use crate::shuffling_cache::{CachedPTCs, CachedShuffling, ShufflingCache, with_cached_shuffling}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -4927,11 +4925,11 @@ impl BeaconChain { if !shuffling_is_cached { state.build_committee_cache(relative_epoch, &self.spec)?; let committee_cache = state.committee_cache(relative_epoch)?; - let ptcs = get_ptcs_for_shuffling_epoch(state, shuffling_epoch, &self.spec)?; + let ptcs = CachedPTCs::from_state(state, shuffling_epoch, &self.spec)?; let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptcs); self.shuffling_cache .write() - .insert_committee_cache_with_ptcs(shuffling_id, cached_shuffling, &self.spec)?; + .insert_committee_cache(shuffling_id, cached_shuffling)?; } } Ok(()) diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 19e0c693c2..99ee82acb3 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -254,6 +254,13 @@ pub enum BeaconChainError { request_epoch: Epoch, cache_epoch: Epoch, }, + AttesterCachePtcOutOfBounds { + slot: Slot, + epoch: Epoch, + }, + AttesterCacheNoPtcPreGloas { + slot: Slot, + }, SkipProposerPreparation, FailedColumnCustodyInfoUpdate, } diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs index f0f410554e..ec7d7121bd 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -87,9 +87,8 @@ impl VerifiedPayloadAttestationMessage { ctx.spec, beacon_block_root, message_epoch, - |cached_shuffling, _| Ok::<_, Error>(cached_shuffling.ptc_for_slot(slot).cloned()), - )? - .ok_or(Error::MissingPTC { slot })?; + |cached_shuffling, _| cached_shuffling.ptc_for_slot(slot), + )?; // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. if !ptc.0.contains(&(validator_index as usize)) { diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs index 3c0efce6ed..89ae1bbbdd 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs @@ -66,12 +66,6 @@ pub enum Error { /// /// The peer has sent an invalid message. NotInPTC { validator_index: u64, slot: Slot }, - /// The shuffling cache entry did not contain a PTC for this slot. - /// - /// ## Peer scoring - /// - /// We were unable to process this message due to an internal error. - MissingPTC { slot: Slot }, /// The validator index is unknown. /// /// ## Peer scoring diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 4ef53de233..5d7e666748 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -7,8 +7,8 @@ use parking_lot::RwLock; use state_processing::state_advance::partial_state_advance; use tracing::debug; use types::{ - AttestationShufflingId, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PTC, - RelativeEpoch, Slot, state::CommitteeCache, + AttestationShufflingId, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, PTC, RelativeEpoch, + Slot, state::CommitteeCache, }; use crate::{ @@ -17,9 +17,9 @@ use crate::{ /// The size of the cache that stores shufflings for quicker verification. /// -/// Each entry should be around `8 + 800,000 + 4,096 = 804,104` bytes in size with 100k validators +/// Each entry should be around `8 * 2M + 128KB ~= 16 MB` in size with 2M validators /// and 32 512-validator PTCs. Therefore, this cache should be approx -/// `16 * (8 + 800,000 + 131,072) = 14.9 MB`. (Note: this ignores a few extra bytes in the +/// `16 * 16 MB ~= 256 MB`. (Note: this ignores a few extra bytes in the /// caches that should be insignificant compared to the indices). pub const DEFAULT_CACHE_SIZE: usize = 16; @@ -37,32 +37,53 @@ const MAX_CONCURRENT_PROMISES: usize = 2; #[derive(Clone)] pub struct CachedShuffling { pub committee_cache: Arc, - pub ptcs: Option>>, + pub ptcs: CachedPTCs, +} + +#[derive(Clone)] +pub enum CachedPTCs { + PreGloas, + PostGloas(Vec>, Epoch), +} + +impl CachedPTCs { + pub fn from_state( + state: &BeaconState, + epoch: Epoch, + spec: &ChainSpec, + ) -> Result { + if shuffling_requires_ptcs(epoch, spec) { + let ptcs = epoch + .slot_iter(E::slots_per_epoch()) + .map(|slot| state.get_ptc(slot, spec)) + .collect::, _>>()?; + Ok(Self::PostGloas(ptcs, epoch)) + } else { + Ok(Self::PreGloas) + } + } } impl CachedShuffling { - pub fn new(committee_cache: Arc, ptcs: Option>>) -> Self { + pub fn new(committee_cache: Arc, ptcs: CachedPTCs) -> Self { Self { committee_cache, ptcs, } } - pub fn ptc_for_slot(&self, slot: Slot) -> Option<&PTC> { - self.ptcs - .as_ref()? - .get(slot.as_usize() % E::slots_per_epoch() as usize) - } - - fn ensure_ptcs_for_gloas_shuffling( - &self, - shuffling_epoch: Epoch, - spec: &ChainSpec, - ) -> Result<(), BeaconChainError> { - if shuffling_requires_ptcs(shuffling_epoch, spec) && self.ptcs.is_none() { - Err(BeaconChainError::MissingPtcForGloasShuffling { shuffling_epoch }) - } else { - Ok(()) + pub fn ptc_for_slot(&self, slot: Slot) -> Result, BeaconChainError> { + match &self.ptcs { + CachedPTCs::PreGloas => Err(BeaconChainError::AttesterCacheNoPtcPreGloas { slot }), + &CachedPTCs::PostGloas(ref ptcs, epoch) => { + if slot.epoch(E::slots_per_epoch()) != epoch { + Err(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch }) + } else { + ptcs.get(slot.as_usize() % E::slots_per_epoch() as usize) + .cloned() + .ok_or(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch }) + } + } } } } @@ -173,7 +194,9 @@ impl ShufflingCache { self.cache.iter().all(|(key, item)| { if shuffling_requires_ptcs(key.shuffling_epoch, spec) { match item { - CacheItem::Committee(cached_shuffling) => cached_shuffling.ptcs.is_some(), + CacheItem::Committee(cached_shuffling) => { + matches!(cached_shuffling.ptcs, CachedPTCs::PostGloas(..)) + } CacheItem::Promise(_) => true, } } else { @@ -182,30 +205,14 @@ impl ShufflingCache { }) } - pub fn insert_committee_cache( - &mut self, - key: AttestationShufflingId, - committee_cache: &C, - spec: &ChainSpec, - ) -> Result<(), BeaconChainError> { - self.insert_committee_cache_with_ptcs( - key, - CachedShuffling::new(committee_cache.to_arc_committee_cache(), None), - spec, - ) - } - - pub fn insert_committee_cache_with_ptcs( + pub fn insert_committee_cache( &mut self, key: AttestationShufflingId, cached_shuffling: CachedShuffling, - spec: &ChainSpec, ) -> Result<(), BeaconChainError> { - cached_shuffling.ensure_ptcs_for_gloas_shuffling(key.shuffling_epoch, spec)?; - match self.cache.get(&key) { - Some(CacheItem::Committee(existing)) => { - existing.ensure_ptcs_for_gloas_shuffling(key.shuffling_epoch, spec)?; + Some(CacheItem::Committee(_)) => { + // Calculation is deterministic, so no need to replace the existing entry. } // Replace the committee if it's not present or if it's a promise. A bird in the hand is // worth two in the promise-bush! @@ -324,7 +331,6 @@ where drop(shuffling_cache); let cached_shuffling = cache_item.wait()?; - cached_shuffling.ensure_ptcs_for_gloas_shuffling(shuffling_epoch, spec)?; map_fn(&cached_shuffling, shuffling_id.shuffling_decision_block) } else { // Create an entry in the cache that "promises" this value will eventually be computed. @@ -395,8 +401,6 @@ where let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { (state, state_root) } else { - // We assume that the `Pending` state has the same shufflings as a `Full` state for the - // same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root let (state_root, state) = store .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root) .map_err(BeaconChainError::DBError)? @@ -434,14 +438,13 @@ where .committee_cache(relative_epoch) .map_err(BeaconChainError::from)? .clone(); - let ptcs = get_ptcs_for_shuffling_epoch(&state, shuffling_epoch, spec) - .map_err(BeaconChainError::from)?; + let ptcs = CachedPTCs::from_state(&state, shuffling_epoch, spec)?; let shuffling_decision_block = shuffling_id.shuffling_decision_block; let cached_shuffling = CachedShuffling::new(committee_cache, ptcs); shuffling_cache_lock .write() - .insert_committee_cache_with_ptcs(shuffling_id, cached_shuffling.clone(), spec)?; + .insert_committee_cache(shuffling_id, cached_shuffling.clone())?; metrics::stop_timer(committee_building_timer); @@ -450,39 +453,6 @@ where map_fn(&cached_shuffling, shuffling_decision_block) } } -/// Return the PTCs associated with each slot in `shuffling_epoch`, when the state supports PTCs. -pub fn get_ptcs_for_shuffling_epoch( - state: &BeaconState, - shuffling_epoch: Epoch, - spec: &ChainSpec, -) -> Result>>, BeaconStateError> { - if shuffling_requires_ptcs(shuffling_epoch, spec) { - shuffling_epoch - .slot_iter(E::slots_per_epoch()) - .map(|slot| state.get_ptc(slot, spec)) - .collect::, _>>() - .map(Some) - } else { - Ok(None) - } -} - -/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. -pub trait ToArcCommitteeCache { - fn to_arc_committee_cache(&self) -> Arc; -} - -impl ToArcCommitteeCache for CommitteeCache { - fn to_arc_committee_cache(&self) -> Arc { - Arc::new(self.clone()) - } -} - -impl ToArcCommitteeCache for Arc { - fn to_arc_committee_cache(&self) -> Arc { - self.clone() - } -} /// Contains the shuffling IDs for a beacon block. #[derive(Clone)] @@ -573,14 +543,8 @@ mod test { ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids) } - fn test_spec() -> ChainSpec { - // Use a Fulu spec specifically because behaviour changes at Gloas. - // The Gloas tests explicitly enable Gloas. - ForkName::Fulu.make_genesis_spec(E::default_spec()) - } - fn cached_shuffling(committee_cache: Arc) -> CachedShuffling { - CachedShuffling::new(committee_cache, None) + CachedShuffling::new(committee_cache, CachedPTCs::PreGloas) } /// Returns two different committee caches for testing. @@ -748,11 +712,10 @@ mod test { #[test] fn should_insert_committee_cache() { let mut cache = new_shuffling_cache(); - let spec = test_spec(); let id_a = shuffling_id(1); let committee_cache_a = Arc::new(CommitteeCache::default()); cache - .insert_committee_cache(id_a.clone(), &committee_cache_a, &spec) + .insert_committee_cache(id_a.clone(), cached_shuffling(committee_cache_a.clone())) .unwrap(); assert!( matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_cache_a), @@ -760,34 +723,19 @@ mod test { ); } - #[test] - fn should_reject_gloas_committee_cache_without_ptc() { - let mut cache = new_shuffling_cache(); - let spec = ForkName::Gloas.make_genesis_spec(E::default_spec()); - let id = shuffling_id(1); - let committee_cache = Arc::new(CommitteeCache::default()); - - let result = cache.insert_committee_cache(id.clone(), &committee_cache, &spec); - - assert!(matches!( - result, - Err(BeaconChainError::MissingPtcForGloasShuffling { shuffling_epoch }) - if shuffling_epoch == id.shuffling_epoch - )); - assert!(!cache.contains(&id), "should not insert invalid cache"); - } - #[test] fn should_prune_committee_cache_with_lowest_epoch() { let mut cache = new_shuffling_cache(); - let spec = test_spec(); let shuffling_id_and_committee_caches = (0..(TEST_CACHE_SIZE + 1)) .map(|i| (shuffling_id(i as u64), Arc::new(CommitteeCache::default()))) .collect::>(); for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() { cache - .insert_committee_cache(shuffling_id.clone(), committee_cache, &spec) + .insert_committee_cache( + shuffling_id.clone(), + cached_shuffling(committee_cache.clone()), + ) .unwrap(); } @@ -812,7 +760,6 @@ mod test { #[test] fn should_retain_head_state_shufflings() { let mut cache = new_shuffling_cache(); - let spec = test_spec(); let current_epoch = 10; let committee_cache = Arc::new(CommitteeCache::default()); @@ -823,7 +770,7 @@ mod test { shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64), }; cache - .insert_committee_cache(shuffling_id, &committee_cache, &spec) + .insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone())) .unwrap(); } @@ -838,16 +785,21 @@ mod test { // Insert head state shuffling ids. Should not be overridden by other shuffling ids. cache - .insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache, &spec) + .insert_committee_cache( + head_shuffling_ids.current.clone(), + cached_shuffling(committee_cache.clone()), + ) .unwrap(); cache - .insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache, &spec) + .insert_committee_cache( + head_shuffling_ids.next.clone(), + cached_shuffling(committee_cache.clone()), + ) .unwrap(); cache .insert_committee_cache( head_shuffling_ids.previous.clone().unwrap(), - &committee_cache, - &spec, + cached_shuffling(committee_cache.clone()), ) .unwrap(); @@ -858,7 +810,7 @@ mod test { shuffling_decision_block: Hash256::from_low_u64_be(i as u64), }; cache - .insert_committee_cache(shuffling_id, &committee_cache, &spec) + .insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone())) .unwrap(); } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index f387563e13..4969b8df5f 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -13,11 +13,11 @@ //! 1. We are required to store an additional `BeaconState` for the head block. This consumes //! memory. //! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles. +use crate::shuffling_cache::CachedPTCs; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::{ - BeaconChain, BeaconChainError, BeaconChainTypes, - chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, - shuffling_cache::{CachedShuffling, get_ptcs_for_shuffling_epoch}, + BeaconChain, BeaconChainError, BeaconChainTypes, chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, + shuffling_cache::CachedShuffling, }; use slot_clock::SlotClock; use state_processing::per_slot_processing; @@ -410,17 +410,12 @@ fn advance_head(beacon_chain: &Arc>) -> Resu "Skipping priming of attester cache for Gloas boundary epoch" ); } else { - let ptcs = get_ptcs_for_shuffling_epoch(&state, shuffling_epoch, &beacon_chain.spec) - .map_err(BeaconChainError::from)?; + let ptcs = CachedPTCs::from_state(&state, shuffling_epoch, &beacon_chain.spec)?; let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptcs); beacon_chain .shuffling_cache .write() - .insert_committee_cache_with_ptcs( - shuffling_id.clone(), - cached_shuffling, - &beacon_chain.spec, - )?; + .insert_committee_cache(shuffling_id.clone(), cached_shuffling)?; debug!( ?head_block_root, diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 9dfb8304bc..f532ef716e 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -36,6 +36,7 @@ use rand::SeedableRng; use rand::rngs::{OsRng, StdRng}; use slasher::Slasher; use slasher_service::SlasherService; +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -639,6 +640,10 @@ where network_globals: self.network_globals.clone(), beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()), sse_logging_components: runtime_context.sse_logging_components.clone(), + historical_committee_cache: Arc::new(http_api::HistoricalCommitteeCache::new( + NonZeroUsize::new(self.http_api_config.historical_committee_cache_size) + .unwrap_or(NonZeroUsize::MIN), + )), }); let exit = runtime_context.executor.exit(); diff --git a/beacon_node/http_api/src/beacon/states.rs b/beacon_node/http_api/src/beacon/states.rs index d68c777428..1b765aa227 100644 --- a/beacon_node/http_api/src/beacon/states.rs +++ b/beacon_node/http_api/src/beacon/states.rs @@ -1,4 +1,5 @@ use crate::StateId; +use crate::caches::{HistoricalCommitteeCache, HistoricalShufflingId}; use crate::task_spawner::{Priority, TaskSpawner}; use crate::utils::ResponseFilter; use crate::validator::pubkey_to_validator_index; @@ -13,7 +14,10 @@ use eth2::types::{ }; use ssz::Encode; use std::sync::Arc; -use types::{AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch}; +use types::{ + AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch, + RelativeEpochError, +}; use warp::filters::BoxedFilter; use warp::http::Response; use warp::hyper::Body; @@ -26,6 +30,8 @@ type BeaconStatesPath = BoxedFilter<( Arc>, )>; +type BeaconStatesCommitteesFilter = BoxedFilter<(Arc,)>; + // GET beacon/states/{state_id}/pending_consolidations pub fn get_beacon_state_pending_consolidations( beacon_states_path: BeaconStatesPath, @@ -337,17 +343,20 @@ pub fn get_beacon_state_sync_committees( // GET beacon/states/{state_id}/committees?slot,index,epoch pub fn get_beacon_state_committees( beacon_states_path: BeaconStatesPath, + beacon_states_committees_filter: BeaconStatesCommitteesFilter, ) -> ResponseFilter { beacon_states_path .clone() .and(warp::path("committees")) .and(warp::query::()) + .and(beacon_states_committees_filter) .and(warp::path::end()) .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, - query: eth2::types::CommitteesQuery| { + query: eth2::types::CommitteesQuery, + historical_committee_cache: Arc| { task_spawner.blocking_json_task(Priority::P1, move || { let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( @@ -364,108 +373,75 @@ pub fn get_beacon_state_committees( let shuffling_id = if let Ok(Some(shuffling_decision_block)) = chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev) { - Some(AttestationShufflingId { - shuffling_epoch: epoch, - shuffling_decision_block, - }) + Some(HistoricalShufflingId::ShufflingId( + AttestationShufflingId { + shuffling_epoch: epoch, + shuffling_decision_block, + }, + )) + } else if epoch < chain.head().finalized_checkpoint().epoch { + // Use the case for finalized epochs + Some(HistoricalShufflingId::FinalizedEpoch(epoch)) } else { None }; // Attempt to read from the chain cache if there exists a // shuffling_id - let maybe_cached_shuffling = if let Some(shuffling_id) = - shuffling_id.as_ref() - { - chain - .shuffling_cache - .try_write_for(std::time::Duration::from_secs(1)) - .and_then(|mut cache_write| cache_write.get(shuffling_id)) - .and_then(|cache_item| cache_item.wait().ok()) - .map(|cached_shuffling| cached_shuffling.committee_cache) - } else { - None - }; + let maybe_cached_shuffling = + if let Some(shuffling_id) = shuffling_id.as_ref() { + historical_committee_cache.get(shuffling_id) + } else { + None + }; let committee_cache = if let Some(shuffling) = maybe_cached_shuffling { shuffling } else { - let possibly_built_cache = - match RelativeEpoch::from_epoch(current_epoch, epoch) { - Ok(relative_epoch) - if state.committee_cache_is_initialized( - relative_epoch, - ) => - { - state.committee_cache(relative_epoch).cloned() - } - _ => CommitteeCache::initialized( + let committee_cache = match RelativeEpoch::from_epoch( + current_epoch, + epoch, + ) { + Ok(relative_epoch) + if state.committee_cache_is_initialized( + relative_epoch, + ) => + { + state.committee_cache(relative_epoch).cloned() + } + Ok(_) | Err(RelativeEpochError::EpochTooLow { .. }) => { + CommitteeCache::initialized( state, epoch, &chain.spec, - ), + ) } - .map_err( - |e| match e { - BeaconStateError::EpochOutOfBounds => { - let max_sprp = - T::EthSpec::slots_per_historical_root() - as u64; - let first_subsequent_restore_point_slot = - ((epoch.start_slot( - T::EthSpec::slots_per_epoch(), - ) / max_sprp) - + 1) - * max_sprp; - if epoch < current_epoch { - warp_utils::reject::custom_bad_request( - format!( - "epoch out of bounds, \ - try state at slot {}", - first_subsequent_restore_point_slot, - ), - ) - } else { - warp_utils::reject::custom_bad_request( - "epoch out of bounds, \ - too far in future" - .into(), - ) - } - } - _ => warp_utils::reject::unhandled_error( - BeaconChainError::from(e), - ), - }, - )?; - - // Attempt to write to the beacon cache (only if the cache - // size is not the default value). - if chain.config.shuffling_cache_size - != beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE - && let Some(shuffling_id) = shuffling_id - && let Some(mut cache_write) = chain - .shuffling_cache - .try_write_for(std::time::Duration::from_secs(1)) - { - let decision_block_root = - shuffling_id.shuffling_decision_block; - if let Err(error) = cache_write.insert_committee_cache( - shuffling_id.clone(), - &possibly_built_cache, - &chain.spec, - ) { - tracing::warn!( - %epoch, - ?decision_block_root, - ?error, - "Priming committee cache failed" - ); + Err(RelativeEpochError::EpochTooHigh { .. }) => { + Err(BeaconStateError::EpochOutOfBounds) + } + Err(RelativeEpochError::ArithError(e)) => { + Err(BeaconStateError::ArithError(e)) } } + .map_err(|e| match e { + BeaconStateError::EpochOutOfBounds => { + warp_utils::reject::custom_bad_request(format!( + "epoch {} out of bounds for state at {}", + epoch, current_epoch + )) + } + _ => warp_utils::reject::unhandled_error( + BeaconChainError::from(e), + ), + })?; - possibly_built_cache + if let Some(shuffling_id) = shuffling_id { + historical_committee_cache + .insert(shuffling_id, committee_cache.clone()); + } + + committee_cache }; // Use either the supplied slot or all slots in the epoch. diff --git a/beacon_node/http_api/src/caches.rs b/beacon_node/http_api/src/caches.rs new file mode 100644 index 0000000000..d92571594a --- /dev/null +++ b/beacon_node/http_api/src/caches.rs @@ -0,0 +1,43 @@ +use lru::LruCache; +use parking_lot::Mutex; +use std::num::NonZeroUsize; +use std::sync::Arc; +use types::{AttestationShufflingId, CommitteeCache, Epoch}; + +/// See `shuffling_cache::DEFAULT_CACHE_SIZE` for rationale +pub const DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE: usize = 16; + +/// Indexes the `HistoricalCommitteeCache`. We can compute committees for very old epochs, and we +/// can't retrieve the decision root cheaply from a state. For those cases we allow the cache to +/// key those committees by finalized epoch. +#[derive(Eq, Hash, PartialEq)] +pub enum HistoricalShufflingId { + FinalizedEpoch(Epoch), + ShufflingId(AttestationShufflingId), +} + +/// Dedicated cache for attestation committees, used exclusively by the HTTP API. +/// +/// This may contain committees for finalized and unfinalized epochs. The name is slightly +/// missleading :) +pub struct HistoricalCommitteeCache { + committees: Mutex>>, +} + +impl HistoricalCommitteeCache { + pub fn new(size: NonZeroUsize) -> Self { + Self { + committees: Mutex::new(LruCache::new(size)), + } + } +} + +impl HistoricalCommitteeCache { + pub fn get(&self, id: &HistoricalShufflingId) -> Option> { + self.committees.lock().get(id).cloned() + } + + pub fn insert(&self, id: HistoricalShufflingId, cache: Arc) { + self.committees.lock().put(id, cache); + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index f31817c5ba..74bf1ccd76 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -12,6 +12,7 @@ mod beacon; mod block_id; mod build_block_contents; mod builder_states; +mod caches; mod custody; mod database; mod light_client; @@ -40,6 +41,8 @@ use crate::beacon::execution_payload_envelope::{ post_beacon_execution_payload_envelope_ssz, }; use crate::beacon::pool::*; +use crate::caches::DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE; +pub use crate::caches::HistoricalCommitteeCache; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::utils::{AnyVersionFilter, EthV1Filter}; use crate::validator::post_validator_liveness_epoch; @@ -132,6 +135,7 @@ pub struct Context { pub network_globals: Option>>, pub beacon_processor_send: Option>, pub sse_logging_components: Option, + pub historical_committee_cache: Arc, } /// Configuration for the HTTP server. @@ -148,6 +152,7 @@ pub struct Config { #[serde(with = "eth2::types::serde_status_code")] pub duplicate_block_status_code: StatusCode, pub target_peers: usize, + pub historical_committee_cache_size: usize, } impl Default for Config { @@ -163,6 +168,7 @@ impl Default for Config { enable_beacon_processor: true, duplicate_block_status_code: StatusCode::ACCEPTED, target_peers: 100, + historical_committee_cache_size: DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE, } } } @@ -416,6 +422,11 @@ pub fn serve( }) .boxed(); + let historical_committee_cache = ctx.historical_committee_cache.clone(); + let beacon_states_committees_filter = warp::any() + .map(move || historical_committee_cache.clone()) + .boxed(); + // Create a `warp` filter that provides access to the network sender channel. let network_tx = ctx .network_senders @@ -628,8 +639,10 @@ pub fn serve( states::get_beacon_state_validators_id(beacon_states_path.clone()); // GET beacon/states/{state_id}/committees?slot,index,epoch - let get_beacon_state_committees = - states::get_beacon_state_committees(beacon_states_path.clone()); + let get_beacon_state_committees = states::get_beacon_state_committees( + beacon_states_path.clone(), + beacon_states_committees_filter, + ); // GET beacon/states/{state_id}/sync_committees?epoch let get_beacon_state_sync_committees = diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 27e2a27d35..f27a04d17a 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::{Config, Context}; +use crate::{Config, Context, caches::HistoricalCommitteeCache}; use beacon_chain::{ BeaconChain, BeaconChainTypes, custody_context::NodeCustodyType, @@ -22,10 +22,10 @@ use lighthouse_network::{ }; use network::{NetworkReceivers, NetworkSenders}; use sensitive_url::SensitiveUrl; -use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use std::{future::Future, num::NonZeroUsize}; use store::MemoryStore; use task_executor::test_utils::TestRuntime; use types::{ChainSpec, EthSpec}; @@ -293,6 +293,9 @@ pub async fn create_api_server_with_config( network_globals: Some(network_globals), beacon_processor_send: Some(beacon_processor_send), sse_logging_components: None, + historical_committee_cache: Arc::new(HistoricalCommitteeCache::new( + NonZeroUsize::new(http_config.historical_committee_cache_size).unwrap(), + )), }); let (listening_socket, server) = diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 71ee6b7ec2..e4291bd8d9 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4254,8 +4254,7 @@ impl NetworkBeaconProcessor { "payload_attn_invalid_sig", ); } - PayloadAttestationError::MissingPTC { .. } - | PayloadAttestationError::BeaconChainError(_) => { + PayloadAttestationError::BeaconChainError(_) => { debug!( %peer_id, %message_slot, diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 8ba2c0f321..f10f9e3b45 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -215,6 +215,9 @@ pub fn get_config( if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { client_config.chain.shuffling_cache_size = cache_size; + // Mantain backwards compatibility with users customizing `shuffling_cache_size` to tweak + // the behaviour of the HTTP API route `beacon/states/committees` + client_config.http_api.historical_committee_cache_size = cache_size; } if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {