From 4903fff43052b048f9e28ae149f65c3faafaed69 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 25 May 2026 15:06:27 +1000 Subject: [PATCH] Fix non-canonical payload attestation processing (#9305) Breakout from: - https://github.com/sigp/lighthouse/pull/9295 We currently do not handle the verification of payload attestations on non-canonical side chains, we always attempt to use the head. The included regression test demonstrates this, and there is _also_ a fork choice compliance test in #9295 that triggers it. This PR is a bit opinionated, but I'll explain my judgements: - We need a way to get the PTC for an arbitrary slot from an arbitrary state. This involves potential state advances, database lookups, etc. There is some fiddly logic required to check that states are in range/etc. - We _already have_ a cache with the exact same lifecycle as the PTCs, namely the attester shuffling cache. Therefore, we can de-duplicate a lot of the complexity by storing the PTCs for a given epoch (and decision block) in this cache. The other opinionated change is in the tests. The previous tests were set up kind of nicely to avoid instantiating a `BeaconChainHarness`. However they were not using mocking, which made testing the non-canonical chain case kind of infeasible. To remedy this, I've changed them to just use a beacon chain harness and create two chains using its relatively easy to use methods for doing this. The running time of the tests goes from something like 2.6s for 8 tests to 3.3s for 9 tests, which is only an increase of 0.04s/test. Negligible. Another plus to using the `BeaconChainHarness` is that it avoids a bunch of the cruft to create synthetic non-mocked beacon chain bits. At the same time, I've made some attempt to improve modularity (and fit with the `GossipVerificationContext`) by pulling out the guts of `with_committee_cache` into a new function (`with_cached_shuffling`) that clearly shows its dependency surface. Co-Authored-By: Michael Sproul Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- .../src/attestation_verification.rs | 6 +- beacon_node/beacon_chain/src/beacon_chain.rs | 186 ++------ beacon_node/beacon_chain/src/errors.rs | 7 + .../gossip_verified_payload_attestation.rs | 73 +-- .../payload_attestation_verification/mod.rs | 14 +- .../payload_attestation_verification/tests.rs | 316 +++++++++---- .../beacon_chain/src/shuffling_cache.rs | 426 +++++++++++++++--- .../beacon_chain/src/state_advance_timer.rs | 39 +- beacon_node/beacon_chain/tests/store_tests.rs | 6 +- .../gossip_methods.rs | 3 +- .../per_block_processing/signature_sets.rs | 27 +- 11 files changed, 687 insertions(+), 416 deletions(-) diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index f35de59e1f..635ca3a2ae 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -1023,7 +1023,8 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { let (committee_opt, committees_per_slot) = chain.with_committee_cache( attestation.data.target.root, attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let committee_opt = committee_cache .get_beacon_committee(attestation.data.slot, attestation.committee_index) .map(|beacon_committee| beacon_committee.committee.to_vec()); @@ -1574,7 +1575,8 @@ where return Err(Error::UnknownTargetRoot(target.root)); } - chain.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| { + chain.with_committee_cache(target.root, attestation_epoch, |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let committees_per_slot = committee_cache.committees_per_slot(); Ok(committee_cache diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d78e279936..b3d258a2fb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -77,7 +77,7 @@ use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache; -use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; +use crate::shuffling_cache::{CachedPTCs, CachedShuffling, ShufflingCache, with_cached_shuffling}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -472,7 +472,7 @@ pub struct BeaconChain { /// HTTP server is enabled. pub event_handler: Option>, /// Caches the attester shuffling for a given epoch and shuffling key root. - pub shuffling_cache: RwLock, + pub shuffling_cache: RwLock>, /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. pub beacon_proposer_cache: Arc>, /// Caches a map of `validator_index -> validator_pubkey`. @@ -1696,7 +1696,8 @@ impl BeaconChain { let (duties, dependent_root) = self.with_committee_cache( head_block_root, epoch, - |committee_cache, dependent_root| { + |cached_shuffling, dependent_root| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let duties = validator_indices .iter() .map(|validator_index| { @@ -4914,15 +4915,20 @@ impl BeaconChain { ) -> Result<(), BlockError> { for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; + let shuffling_epoch = relative_epoch.into_epoch(state.current_epoch()); - let shuffling_is_cached = self.shuffling_cache.read().contains(&shuffling_id); + if self.shuffling_cache.read().contains(&shuffling_id) { + continue; + } - if !shuffling_is_cached { - state.build_committee_cache(relative_epoch, &self.spec)?; - let committee_cache = state.committee_cache(relative_epoch)?; - self.shuffling_cache - .write() - .insert_committee_cache(shuffling_id, committee_cache); + state.build_committee_cache(relative_epoch, &self.spec)?; + let committee_cache = state.committee_cache(relative_epoch)?.clone(); + + if let Some(ptcs) = CachedPTCs::try_from_state(state, shuffling_epoch, &self.spec)? { + self.shuffling_cache.write().insert_committee_cache( + shuffling_id, + CachedShuffling::new(committee_cache, ptcs), + ); } } Ok(()) @@ -7013,11 +7019,11 @@ impl BeaconChain { ) } - /// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head + /// Runs the `map_fn` with the cached shuffling for `shuffling_epoch` from the chain with head /// `head_block_root`. The `map_fn` will be supplied two values: /// - /// - `&CommitteeCache`: the committee cache that serves the given parameters. - /// - `Hash256`: the "shuffling decision root" which uniquely identifies the `CommitteeCache`. + /// - `&CachedShuffling`: the committee cache and optional PTCs that serve the given parameters. + /// - `Hash256`: the "shuffling decision root" which uniquely identifies the cached shuffling. /// /// It's not necessary that `head_block_root` matches our current view of the chain, it can be /// any block that is: @@ -7034,12 +7040,12 @@ impl BeaconChain { /// /// ## Notes /// - /// This function exists in this odd "map" pattern because efficiently obtaining a committee + /// This function exists in this odd "map" pattern because efficiently obtaining a shuffling /// can be complex. It might involve reading straight from the `beacon_chain.shuffling_cache` /// or it might involve reading it from a state from the DB. Due to the complexities of /// `RwLock`s on the shuffling cache, a simple `Cow` isn't suitable here. /// - /// If the committee for `(head_block_root, shuffling_epoch)` isn't found in the + /// If the shuffling for `(head_block_root, shuffling_epoch)` isn't found in the /// `shuffling_cache`, we will read a state from disk and then update the `shuffling_cache`. pub fn with_committee_cache( &self, @@ -7048,149 +7054,17 @@ impl BeaconChain { map_fn: F, ) -> Result where - F: Fn(&CommitteeCache, Hash256) -> Result, + F: Fn(&CachedShuffling, Hash256) -> Result, { - let head_block = self - .canonical_head - .fork_choice_read_lock() - .get_block(&head_block_root) - .ok_or(Error::MissingBeaconBlock(head_block_root))?; - - let shuffling_id = BlockShufflingIds { - current: head_block.current_epoch_shuffling_id.clone(), - next: head_block.next_epoch_shuffling_id.clone(), - previous: None, - block_root: head_block.root, - } - .id_for_epoch(shuffling_epoch) - .ok_or_else(|| Error::InvalidShufflingId { + with_cached_shuffling( + &self.canonical_head, + &self.shuffling_cache, + &self.store, + &self.spec, + head_block_root, shuffling_epoch, - head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()), - })?; - - // Obtain the shuffling cache, timing how long we wait. - let mut shuffling_cache = { - let _ = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); - self.shuffling_cache.write() - }; - - if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { - // The shuffling cache is no longer required, drop the write-lock to allow concurrent - // access. - drop(shuffling_cache); - - let committee_cache = cache_item.wait()?; - map_fn(&committee_cache, shuffling_id.shuffling_decision_block) - } else { - // Create an entry in the cache that "promises" this value will eventually be computed. - // This avoids the case where multiple threads attempt to produce the same value at the - // same time. - // - // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same - // promise from being created twice. - let sender = shuffling_cache.create_promise(shuffling_id.clone())?; - - // Drop the shuffling cache to avoid holding the lock for any longer than - // required. - drop(shuffling_cache); - - debug!( - shuffling_id = ?shuffling_epoch, - head_block_root = head_block_root.to_string(), - "Committee cache miss" - ); - - // If the block's state will be so far ahead of `shuffling_epoch` that even its - // previous epoch committee cache will be too new, then error. Callers of this function - // shouldn't be requesting such old shufflings for this `head_block_root`. - let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); - if head_block_epoch > shuffling_epoch + 1 { - return Err(Error::InvalidStateForShuffling { - state_epoch: head_block_epoch, - shuffling_epoch, - }); - } - - let state_read_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); - - // If the head of the chain can serve this request, use it. - // - // This code is a little awkward because we need to ensure that the head we read and - // the head we copy is identical. Taking one lock to read the head values and another - // to copy the head is liable to race-conditions. - let head_state_opt = self.with_head(|head| { - if head.beacon_block_root == head_block_root { - Ok(Some((head.beacon_state.clone(), head.beacon_state_root()))) - } else { - Ok::<_, Error>(None) - } - })?; - - // Compute the `target_slot` to advance the block's state to. - // - // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to - // only advance into the first slot of the epoch prior to `shuffling_epoch`. - // - // If the `head_block` is already ahead of that slot, then we should load the state - // at that slot, as we've determined above that the `shuffling_epoch` cache will - // not be too far in the past. - let target_slot = std::cmp::max( - shuffling_epoch - .saturating_sub(1_u64) - .start_slot(T::EthSpec::slots_per_epoch()), - head_block.slot, - ); - - // If the head state is useful for this request, use it. Otherwise, read a state from - // disk that is advanced as close as possible to `target_slot`. - let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { - (state, state_root) - } else { - // We assume that the `Pending` state has the same shufflings as a `Full` state - // for the same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root - let (state_root, state) = self - .store - .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)? - .ok_or(Error::MissingBeaconState(head_block.state_root))?; - (state, state_root) - }; - - metrics::stop_timer(state_read_timer); - let state_skip_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); - - // If the state is still in an earlier epoch, advance it to the `target_slot` so - // that its next epoch committee cache matches the `shuffling_epoch`. - if state.current_epoch() + 1 < shuffling_epoch { - // Advance the state into the required slot, using the "partial" method since the - // state roots are not relevant for the shuffling. - partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?; - } - metrics::stop_timer(state_skip_timer); - - let committee_building_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); - - let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) - .map_err(Error::IncorrectStateForAttestation)?; - - state.build_committee_cache(relative_epoch, &self.spec)?; - - let committee_cache = state.committee_cache(relative_epoch)?.clone(); - let shuffling_decision_block = shuffling_id.shuffling_decision_block; - - self.shuffling_cache - .write() - .insert_committee_cache(shuffling_id, &committee_cache); - - metrics::stop_timer(committee_building_timer); - - sender.send(committee_cache.clone()); - - map_fn(&committee_cache, shuffling_decision_block) - } + map_fn, + ) } /// Dumps the entire canonical chain, from the head to genesis to a vector for analysis. diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9802f091e0..5efe9a3c23 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -251,6 +251,13 @@ pub enum BeaconChainError { request_epoch: Epoch, cache_epoch: Epoch, }, + AttesterCachePtcOutOfBounds { + slot: Slot, + epoch: Epoch, + }, + AttesterCacheNoPtcPreGloas { + slot: Slot, + }, SkipProposerPreparation, FailedColumnCustodyInfoUpdate, } diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs index c36c73b344..3e9f9e4b60 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -2,26 +2,29 @@ use super::Error; use crate::beacon_chain::BeaconStore; use crate::canonical_head::CanonicalHead; use crate::observed_attesters::ObservedPayloadAttesters; +use crate::shuffling_cache::{ShufflingCache, with_cached_shuffling}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; use bls::AggregateSignature; use educe::Educe; use eth2::types::{EventKind, ForkVersionedResponse}; use parking_lot::RwLock; -use safe_arith::SafeArith; use slot_clock::SlotClock; -use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set; -use state_processing::state_advance::partial_state_advance; +use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set_from_pubkeys; use std::borrow::Cow; -use types::{ChainSpec, EthSpec, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot}; +use types::{ + ChainSpec, EthSpec, Hash256, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot, +}; pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { pub slot_clock: &'a T::SlotClock, pub spec: &'a ChainSpec, pub observed_payload_attesters: &'a RwLock>, pub canonical_head: &'a CanonicalHead, + pub shuffling_cache: &'a RwLock>, pub validator_pubkey_cache: &'a RwLock>, pub store: &'a BeaconStore, + pub genesis_validators_root: Hash256, } /// A `PayloadAttestationMessage` that has been verified for propagation on the gossip network. @@ -76,56 +79,18 @@ impl VerifiedPayloadAttestationMessage { return Err(Error::UnknownHeadBlock { beacon_block_root }); } - // Get head state for PTC computation. If the cached head state is too stale - // (e.g. during liveness failures with many skipped slots), fall back to loading - // a more recent state from the store and advancing it if necessary. - let head = ctx.canonical_head.cached_head(); - let head_state = &head.snapshot.beacon_state; - let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let state_epoch = head_state.current_epoch(); - - // get_ptc can serve epochs in [state_epoch - 1, state_epoch + min_seed_lookahead]. - // If the message epoch is beyond that range, the head state is stale. - let advanced_state = if message_epoch - > state_epoch - .safe_add(ctx.spec.min_seed_lookahead) - .map_err(BeaconChainError::from)? - { - let head_block_root = head.head_block_root(); - let target_slot = message_epoch.start_slot(T::EthSpec::slots_per_epoch()); - - let (state_root, mut state) = ctx - .store - .get_advanced_hot_state( - head_block_root, - target_slot, - head.snapshot.beacon_state_root(), - ) - .map_err(BeaconChainError::from)? - .ok_or(BeaconChainError::MissingBeaconState( - head.snapshot.beacon_state_root(), - ))?; - - if state - .current_epoch() - .safe_add(ctx.spec.min_seed_lookahead) - .map_err(BeaconChainError::from)? - < message_epoch - { - partial_state_advance(&mut state, Some(state_root), target_slot, ctx.spec) - .map_err(BeaconChainError::from)?; - } - - Some(state) - } else { - None - }; - - let state = advanced_state.as_ref().unwrap_or(head_state); + let ptc = with_cached_shuffling( + ctx.canonical_head, + ctx.shuffling_cache, + ctx.store, + ctx.spec, + beacon_block_root, + message_epoch, + |cached_shuffling, _| cached_shuffling.ptc_for_slot(slot), + )?; // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. - let ptc = state.get_ptc(slot, ctx.spec)?; if !ptc.0.contains(&(validator_index as usize)) { return Err(Error::NotInPTC { validator_index, @@ -145,11 +110,11 @@ impl VerifiedPayloadAttestationMessage { { // [REJECT] The signature is valid with respect to the `validator_index`. let pubkey_cache = ctx.validator_pubkey_cache.read(); - let signature_set = indexed_payload_attestation_signature_set( - state, + let signature_set = indexed_payload_attestation_signature_set_from_pubkeys( |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), &indexed_payload_attestation.signature, &indexed_payload_attestation, + ctx.genesis_validators_root, ctx.spec, ) .map_err(|_| Error::UnknownValidatorIndex(validator_index))?; @@ -204,8 +169,10 @@ impl BeaconChain { spec: &self.spec, observed_payload_attesters: &self.observed_payload_attesters, canonical_head: &self.canonical_head, + shuffling_cache: &self.shuffling_cache, validator_pubkey_cache: &self.validator_pubkey_cache, store: &self.store, + genesis_validators_root: self.genesis_validators_root, } } diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs index 477527c0aa..89ae1bbbdd 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs @@ -9,7 +9,7 @@ use crate::BeaconChainError; use strum::AsRefStr; -use types::{BeaconStateError, Hash256, Slot}; +use types::{Hash256, Slot}; pub mod gossip_verified_payload_attestation; @@ -86,12 +86,6 @@ pub enum Error { /// We were unable to process this message due to an internal error. It's unclear if the /// message is valid. BeaconChainError(Box), - /// An error reading beacon state. - /// - /// ## Peer scoring - /// - /// We were unable to process this message due to an internal error. - BeaconStateError(BeaconStateError), } impl From for Error { @@ -100,11 +94,5 @@ impl From for Error { } } -impl From for Error { - fn from(e: BeaconStateError) -> Self { - Error::BeaconStateError(e) - } -} - #[cfg(test)] mod tests; diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs index c45df51ac8..d4b82c41fc 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs @@ -1,25 +1,15 @@ use std::sync::Arc; use std::time::Duration; -use bls::{Keypair, Signature}; -use fork_choice::ForkChoice; -use genesis::{generate_deterministic_keypairs, interop_genesis_state}; -use parking_lot::RwLock; -use proto_array::PayloadStatus; +use bls::Signature; use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::AllCaches; -use state_processing::genesis::genesis_block; -use store::{HotColdDB, StoreConfig}; use types::{ - ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, - PayloadAttestationMessage, SignedBeaconBlock, SignedRoot, Slot, + Domain, Epoch, EthSpec, ForkName, Hash256, MinimalEthSpec, PayloadAttestationData, + PayloadAttestationMessage, SignedRoot, Slot, }; use crate::{ - beacon_fork_choice_store::BeaconForkChoiceStore, - beacon_snapshot::BeaconSnapshot, - canonical_head::CanonicalHead, - observed_attesters::ObservedPayloadAttesters, payload_attestation_verification::{ Error as PayloadAttestationError, gossip_verified_payload_attestation::{ @@ -27,7 +17,6 @@ use crate::{ }, }, test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, test_spec}, - validator_pubkey_cache::ValidatorPubkeyCache, }; type E = MinimalEthSpec; @@ -36,96 +25,48 @@ type T = EphemeralHarnessType; const NUM_VALIDATORS: usize = 64; struct TestContext { - canonical_head: CanonicalHead, - observed_payload_attesters: RwLock>, - validator_pubkey_cache: RwLock>, - slot_clock: TestingSlotClock, - keypairs: Vec, - spec: ChainSpec, + harness: BeaconChainHarness, genesis_block_root: Hash256, - store: Arc>, } impl TestContext { fn new() -> Self { - let spec = test_spec::(); - let store = Arc::new( - HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())) - .expect("should open ephemeral store"), - ); - - let keypairs = generate_deterministic_keypairs(NUM_VALIDATORS); - - let mut state = - interop_genesis_state::(&keypairs, 0, Hash256::repeat_byte(0x42), None, &spec) - .expect("should build genesis state"); - - *state.finalized_checkpoint_mut() = Checkpoint { - epoch: Epoch::new(1), - root: Hash256::ZERO, - }; - - let mut block = genesis_block(&state, &spec).expect("should build genesis block"); - *block.state_root_mut() = state - .update_tree_hash_cache() - .expect("should hash genesis state"); - let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); - let block_root = signed_block.canonical_root(); - - let snapshot = BeaconSnapshot::new( - Arc::new(signed_block.clone()), - None, - block_root, - state.clone(), - ); - - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), snapshot.clone()) - .expect("should create fork choice store"); - let fork_choice = - ForkChoice::from_anchor(fc_store, block_root, &signed_block, &state, None, &spec) - .expect("should create fork choice"); - - let canonical_head = - CanonicalHead::new(fork_choice, Arc::new(snapshot), PayloadStatus::Pending); - + let spec = Arc::new(test_spec::()); let slot_clock = TestingSlotClock::new( Slot::new(0), Duration::from_secs(0), spec.get_slot_duration(), ); - // Advance past genesis so `now_with_past_tolerance` doesn't underflow. - slot_clock.set_current_time(spec.get_slot_duration()); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .deterministic_keypairs(NUM_VALIDATORS) + .fresh_ephemeral_store() + .testing_slot_clock(slot_clock) + .build(); - let validator_pubkey_cache = - ValidatorPubkeyCache::new(&state, store.clone()).expect("should create pubkey cache"); + // Advance past genesis so `now_with_past_tolerance` doesn't underflow. + harness + .chain + .slot_clock + .set_current_time(harness.spec.get_slot_duration()); + let genesis_block_root = harness.chain.genesis_block_root; Self { - canonical_head, - observed_payload_attesters: RwLock::new(ObservedPayloadAttesters::default()), - validator_pubkey_cache: RwLock::new(validator_pubkey_cache), - slot_clock, - keypairs, - spec, - genesis_block_root: block_root, - store, + harness, + genesis_block_root, } } fn gossip_ctx(&self) -> GossipVerificationContext<'_, T> { - GossipVerificationContext { - slot_clock: &self.slot_clock, - spec: &self.spec, - observed_payload_attesters: &self.observed_payload_attesters, - canonical_head: &self.canonical_head, - validator_pubkey_cache: &self.validator_pubkey_cache, - store: &self.store, - } + self.harness.chain.payload_attestation_gossip_context() } fn ptc_members(&self, slot: Slot) -> Vec { - let head = self.canonical_head.cached_head(); + let head = self.harness.chain.canonical_head.cached_head(); let state = &head.snapshot.beacon_state; - let ptc = state.get_ptc(slot, &self.spec).expect("should get PTC"); + let ptc = state + .get_ptc(slot, &self.harness.spec) + .expect("should get PTC"); ptc.0.to_vec() } @@ -134,16 +75,18 @@ impl TestContext { data: PayloadAttestationData, validator_index: u64, ) -> PayloadAttestationMessage { - let head = self.canonical_head.cached_head(); + let head = self.harness.chain.canonical_head.cached_head(); let state = &head.snapshot.beacon_state; - let domain = self.spec.get_domain( + let domain = self.harness.spec.get_domain( data.slot.epoch(E::slots_per_epoch()), Domain::PTCAttester, &state.fork(), state.genesis_validators_root(), ); let message = data.signing_root(domain); - let signature = self.keypairs[validator_index as usize].sk.sign(message); + let signature = self.harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); PayloadAttestationMessage { validator_index, data, @@ -192,7 +135,7 @@ fn past_slot() { return; } let ctx = TestContext::new(); - ctx.slot_clock.set_slot(5); + ctx.harness.chain.slot_clock.set_slot(5); let gossip = ctx.gossip_ctx(); let msg = make_payload_attestation(Slot::new(0), 0, ctx.genesis_block_root); @@ -328,20 +271,95 @@ fn duplicate_after_valid() { )); } -/// Exercises the `partial_state_advance` fallback in gossip verification when -/// the head state is too stale to compute PTC membership (e.g., during a -/// network liveness failure with many missed slots). #[tokio::test] -async fn stale_head_with_partial_advance() { +async fn ptc_cache_is_primed_at_gloas_fork_boundary() { + // Only run this test once, when FORK_NAME=gloas exactly. + let mut spec = test_spec::(); + if spec.fork_name_at_epoch(Epoch::new(0)) != ForkName::Gloas { + return; + } + + let gloas_fork_epoch = Epoch::new(2); + spec.gloas_fork_epoch = Some(gloas_fork_epoch); + assert_eq!( + spec.fork_name_at_epoch(gloas_fork_epoch - 1), + ForkName::Fulu + ); + assert_eq!(spec.fork_name_at_epoch(gloas_fork_epoch), ForkName::Gloas); + + let slots_per_epoch = E::slots_per_epoch(); + let fork_boundary_slot = gloas_fork_epoch.start_slot(slots_per_epoch); + let test_slots = (fork_boundary_slot.as_u64() + ..fork_boundary_slot.as_u64() + slots_per_epoch * 2) + .map(Slot::new); + + let harness = BeaconChainHarness::builder(E::default()) + .spec(Arc::new(spec)) + .deterministic_keypairs(NUM_VALIDATORS) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.extend_to_slot(fork_boundary_slot).await; + + for slot in test_slots { + harness.chain.slot_clock.set_slot(slot.as_u64()); + assert!( + harness + .chain + .shuffling_cache + .read() + .check_gloas_ptcs_invariant(&harness.spec), + "shuffling cache should satisfy the Gloas PTC invariant" + ); + + let head = harness.chain.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let ptc = state.get_ptc(slot, &harness.spec).expect("should get PTC"); + let validator_index = *ptc.0.first().expect("PTC should have a member") as u64; + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot, + payload_present: true, + blob_data_available: true, + }; + let domain = harness.spec.get_domain( + data.slot.epoch(slots_per_epoch), + Domain::PTCAttester, + &state.fork(), + state.genesis_validators_root(), + ); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(data.signing_root(domain)); + let msg = PayloadAttestationMessage { + validator_index, + data, + signature, + }; + + let result = harness + .chain + .verify_payload_attestation_message_for_gossip(msg); + assert!( + result.is_ok(), + "expected PTC payload attestation to verify at slot {}, got: {:?}", + slot, + result.unwrap_err() + ); + } +} + +/// Exercises payload attestation gossip verification when the message epoch is ahead of the +/// canonical head due to many missed slots. +#[tokio::test] +async fn stale_head_payload_attestation() { if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { return; } let slots_per_epoch = E::slots_per_epoch(); - // Head at epoch 1, message at epoch 5 — 4 epochs of missed slots. - // This exceeds min_seed_lookahead (1), triggering the fallback path: - // get_advanced_hot_state loads the stored state, then partial_state_advance - // advances it through epoch boundaries to populate ptc_window. + // Head at epoch 1, message at epoch 5: 4 epochs of missed slots. let head_slot = Slot::new(slots_per_epoch); let missed_epochs = 4; let target_slot = Slot::new(slots_per_epoch * (1 + missed_epochs)); @@ -360,7 +378,7 @@ async fn stale_head_with_partial_advance() { let head_epoch = head.snapshot.beacon_state.current_epoch(); assert!( target_epoch > head_epoch + harness.spec.min_seed_lookahead, - "precondition: message epoch must exceed head + min_seed_lookahead to trigger fallback" + "precondition: message epoch must exceed head + min_seed_lookahead" ); // GIVEN a slot clock advanced to epoch 5 without producing blocks @@ -385,7 +403,9 @@ async fn stale_head_with_partial_advance() { .expect("should get PTC from reference state"); let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; - // WHEN a properly-signed payload attestation from a PTC member is verified. + // WHEN a properly-signed payload attestation from a PTC member is verified. The signature + // domain should come from the spec fork schedule and genesis validators root, not a loaded + // state in the verifier. let domain = harness.spec.get_domain( target_epoch, Domain::PTCAttester, @@ -420,3 +440,105 @@ async fn stale_head_with_partial_advance() { result.unwrap_err() ); } + +/// Exercises payload attestation gossip verification for a non-canonical block whose PTC differs +/// from the canonical chain's PTC for the same slot. +#[tokio::test] +async fn side_chain_payload_attestation_uses_side_chain_ptc() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let slots_per_epoch = E::slots_per_epoch(); + let fork_slot = Slot::new(slots_per_epoch); + let target_slot = Slot::new(slots_per_epoch * 4); + let target_epoch = target_slot.epoch(slots_per_epoch); + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(NUM_VALIDATORS) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // Build a common prefix through epoch 1. + harness.extend_to_slot(fork_slot).await; + let fork_state = harness.chain.head_snapshot().beacon_state.clone(); + + // Build two branches for several epochs. The side chain skips its first slot, giving it + // different RANDAO mixes and therefore a different PTC by the target slot. The canonical chain + // is processed second and receives sub-finality attestations, so it remains the head without + // finalizing past the side-chain fork point. + let side_slots: Vec<_> = ((fork_slot + 2).as_u64()..=target_slot.as_u64()) + .map(Slot::new) + .collect(); + let canonical_slots: Vec<_> = ((fork_slot + 1).as_u64()..=target_slot.as_u64()) + .map(Slot::new) + .collect(); + let canonical_validators = (0..NUM_VALIDATORS / 2).collect::>(); + + let results = harness + .add_blocks_on_multiple_chains(vec![ + (fork_state.clone(), side_slots, vec![]), + (fork_state, canonical_slots, canonical_validators), + ]) + .await; + + let side_head_root: Hash256 = results[0].2.into(); + let side_head_state = &results[0].3; + let canonical_head_root: Hash256 = results[1].2.into(); + let canonical_head_state = &results[1].3; + + assert_ne!(side_head_root, canonical_head_root); + assert_eq!( + harness.chain.head_snapshot().beacon_block_root, + canonical_head_root + ); + + let side_ptc = side_head_state + .get_ptc(target_slot, &harness.spec) + .expect("should get side-chain PTC"); + let canonical_ptc = canonical_head_state + .get_ptc(target_slot, &harness.spec) + .expect("should get canonical PTC"); + assert_ne!( + side_ptc, canonical_ptc, + "precondition: side-chain PTC should differ from canonical PTC" + ); + + let validator_index = side_ptc + .0 + .iter() + .copied() + .find(|validator_index| !canonical_ptc.0.contains(validator_index)) + .expect("should find a validator in the side-chain PTC only") + as u64; + + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &side_head_state.fork(), + side_head_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: side_head_root, + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data, + signature, + }; + + let verified = harness + .chain + .verify_payload_attestation_message_for_gossip(msg) + .expect("side-chain payload attestation should verify"); + assert_eq!(verified.ptc(), &side_ptc); +} diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 0377b553e3..daaede6ed1 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -3,23 +3,28 @@ use std::sync::Arc; use itertools::Itertools; use oneshot_broadcast::{Receiver, Sender, oneshot}; +use parking_lot::RwLock; +use state_processing::state_advance::partial_state_advance; use tracing::debug; use types::{ - AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch, - state::CommitteeCache, + AttestationShufflingId, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PTC, + RelativeEpoch, Slot, state::CommitteeCache, }; -use crate::{BeaconChainError, metrics}; +use crate::{ + BeaconChainError, BeaconChainTypes, BeaconStore, canonical_head::CanonicalHead, metrics, +}; -/// The size of the cache that stores committee caches for quicker verification. +/// The size of the cache that stores shufflings for quicker verification. /// -/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + -/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this -/// ignores a few extra bytes in the caches that should be insignificant compared to the indices). +/// Each entry should be around `8 * 2M + 128KB ~= 16 MB` in size with 2M validators +/// and 32 512-validator PTCs. Therefore, this cache should be approx +/// `16 * 16 MB ~= 256 MB`. (Note: this ignores a few extra bytes in the +/// caches that should be insignificant compared to the indices). pub const DEFAULT_CACHE_SIZE: usize = 16; -/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this -/// limits the number of concurrent states that can be loaded into memory for the committee cache. +/// The maximum number of concurrent shuffling "promises" that can be issued. In effect, this +/// limits the number of concurrent states that can be loaded into memory for the shuffling. /// This prevents excessive memory usage at the cost of rejecting some attestations. /// /// We set this value to 2 since states can be quite large and have a significant impact on memory @@ -30,19 +35,82 @@ pub const DEFAULT_CACHE_SIZE: usize = 16; const MAX_CONCURRENT_PROMISES: usize = 2; #[derive(Clone)] -pub enum CacheItem { - /// A committee. - Committee(Arc), - /// A promise for a future committee. - Promise(Receiver>), +pub struct CachedShuffling { + pub committee_cache: Arc, + pub ptcs: CachedPTCs, } -impl CacheItem { +#[derive(Clone)] +pub enum CachedPTCs { + PreGloas, + PostGloas(Vec>, Epoch), +} + +impl CachedPTCs { + /// Returns `None` at the Gloas fork boundary (pre-Gloas state, Gloas shuffling epoch); the + /// on-demand miss path in `with_cached_shuffling` handles those. + pub fn try_from_state( + state: &BeaconState, + epoch: Epoch, + spec: &ChainSpec, + ) -> Result, BeaconChainError> { + if shuffling_requires_ptcs(epoch, spec) { + if !state.fork_name_unchecked().gloas_enabled() { + return Ok(None); + } + let ptcs = epoch + .slot_iter(E::slots_per_epoch()) + .map(|slot| state.get_ptc(slot, spec)) + .collect::, _>>()?; + Ok(Some(Self::PostGloas(ptcs, epoch))) + } else { + Ok(Some(Self::PreGloas)) + } + } +} + +impl CachedShuffling { + pub fn new(committee_cache: Arc, ptcs: CachedPTCs) -> Self { + Self { + committee_cache, + ptcs, + } + } + + pub fn ptc_for_slot(&self, slot: Slot) -> Result, BeaconChainError> { + match &self.ptcs { + CachedPTCs::PreGloas => Err(BeaconChainError::AttesterCacheNoPtcPreGloas { slot }), + &CachedPTCs::PostGloas(ref ptcs, epoch) => { + if slot.epoch(E::slots_per_epoch()) != epoch { + Err(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch }) + } else { + ptcs.get(slot.as_usize() % E::slots_per_epoch() as usize) + .cloned() + .ok_or(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch }) + } + } + } + } +} + +fn shuffling_requires_ptcs(shuffling_epoch: Epoch, spec: &ChainSpec) -> bool { + spec.fork_name_at_epoch(shuffling_epoch).gloas_enabled() +} + +#[derive(Clone)] +pub enum CacheItem { + /// A cached shuffling. + Committee(CachedShuffling), + /// A promise for a future cached shuffling. + Promise(Receiver>), +} + +impl CacheItem { pub fn is_promise(&self) -> bool { matches!(self, CacheItem::Promise(_)) } - pub fn wait(self) -> Result, BeaconChainError> { + pub fn wait(self) -> Result, BeaconChainError> { match self { CacheItem::Committee(cache) => Ok(cache), CacheItem::Promise(receiver) => receiver @@ -52,17 +120,17 @@ impl CacheItem { } } -/// Provides a cache for `CommitteeCache`. +/// Provides a cache for `CommitteeCache` and the associated optional PTCs. /// /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. -pub struct ShufflingCache { - cache: HashMap, +pub struct ShufflingCache { + cache: HashMap>, cache_size: usize, head_shuffling_ids: BlockShufflingIds, } -impl ShufflingCache { +impl ShufflingCache { pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds) -> Self { Self { cache: HashMap::new(), @@ -71,22 +139,22 @@ impl ShufflingCache { } } - pub fn get(&mut self, key: &AttestationShufflingId) -> Option { + pub fn get(&mut self, key: &AttestationShufflingId) -> Option> { match self.cache.get(key) { - // The cache contained the committee cache, return it. + // The cache contained the shuffling, return it. item @ Some(CacheItem::Committee(_)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() } - // The cache contains a promise for the committee cache. Check to see if the promise has + // The cache contains a promise for the shuffling. Check to see if the promise has // already been resolved, without waiting for it. item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { // The promise has already been resolved. Replace the entry in the cache with a - // `Committee` entry and then return the committee. - Ok(Some(committee)) => { + // `Committee` entry and then return the cached shuffling. + Ok(Some(cached_shuffling)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - let ready = CacheItem::Committee(committee); + let ready = CacheItem::Committee(cached_shuffling); self.insert_cache_item(key.clone(), ready.clone()); Some(ready) } @@ -97,8 +165,8 @@ impl ShufflingCache { metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() } - // The sender has been dropped without sending a committee. There was most likely an - // error computing the committee cache. Drop the key from the cache and return + // The sender has been dropped without sending a shuffling. There was most likely an + // error computing the shuffling. Drop the key from the cache and return // `None` so the caller can recompute the committee. // // It's worth noting that this is the only place where we removed unresolved @@ -113,7 +181,7 @@ impl ShufflingCache { None } }, - // The cache does not have this committee and it's not already promised to be computed. + // The cache does not have this shuffling and it's not already promised to be computed. None => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); None @@ -125,27 +193,41 @@ impl ShufflingCache { self.cache.contains_key(key) } - pub fn insert_committee_cache( + /// Check that all entries for Gloas epochs have PTCs. + #[cfg(test)] + pub fn check_gloas_ptcs_invariant(&self, spec: &ChainSpec) -> bool { + self.cache.iter().all(|(key, item)| { + if shuffling_requires_ptcs(key.shuffling_epoch, spec) { + match item { + CacheItem::Committee(cached_shuffling) => { + matches!(cached_shuffling.ptcs, CachedPTCs::PostGloas(..)) + } + CacheItem::Promise(_) => true, + } + } else { + true + } + }) + } + + pub fn insert_committee_cache( &mut self, key: AttestationShufflingId, - committee_cache: &C, + cached_shuffling: CachedShuffling, ) { - if self - .cache - .get(&key) - // Replace the committee if it's not present or if it's a promise. A bird in the hand is - // worth two in the promise-bush! - .is_none_or(CacheItem::is_promise) - { - self.insert_cache_item( - key, - CacheItem::Committee(committee_cache.to_arc_committee_cache()), - ); + match self.cache.get(&key) { + Some(CacheItem::Committee(_)) => { + // Calculation is deterministic, so no need to replace the existing entry. + } + // A bird in the hand is worth two in the promise-bush! + Some(CacheItem::Promise(_)) | None => { + self.insert_cache_item(key, CacheItem::Committee(cached_shuffling)); + } } } /// Prunes the cache first before inserting a new cache item. - fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { + fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { self.prune_cache(); self.cache.insert(key, cache_item); } @@ -188,7 +270,7 @@ impl ShufflingCache { pub fn create_promise( &mut self, key: AttestationShufflingId, - ) -> Result>, BeaconChainError> { + ) -> Result>, BeaconChainError> { let num_active_promises = self .cache .iter() @@ -212,20 +294,170 @@ impl ShufflingCache { } } -/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. -pub trait ToArcCommitteeCache { - fn to_arc_committee_cache(&self) -> Arc; -} +pub fn with_cached_shuffling( + canonical_head: &CanonicalHead, + shuffling_cache_lock: &RwLock>, + store: &BeaconStore, + spec: &ChainSpec, + head_block_root: Hash256, + shuffling_epoch: Epoch, + map_fn: F, +) -> Result +where + T: BeaconChainTypes, + F: Fn(&CachedShuffling, Hash256) -> Result, + Error: From, +{ + let head_block = canonical_head + .fork_choice_read_lock() + .get_block(&head_block_root) + .ok_or(BeaconChainError::MissingBeaconBlock(head_block_root))?; -impl ToArcCommitteeCache for CommitteeCache { - fn to_arc_committee_cache(&self) -> Arc { - Arc::new(self.clone()) + let shuffling_id = BlockShufflingIds { + current: head_block.current_epoch_shuffling_id.clone(), + next: head_block.next_epoch_shuffling_id.clone(), + previous: None, + block_root: head_block.root, } -} + .id_for_epoch(shuffling_epoch) + .ok_or_else(|| BeaconChainError::InvalidShufflingId { + shuffling_epoch, + head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()), + })?; -impl ToArcCommitteeCache for Arc { - fn to_arc_committee_cache(&self) -> Arc { - self.clone() + let mut shuffling_cache = { + let _ = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); + shuffling_cache_lock.write() + }; + + if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { + drop(shuffling_cache); + + let cached_shuffling = cache_item.wait()?; + map_fn(&cached_shuffling, shuffling_id.shuffling_decision_block) + } else { + // Create an entry in the cache that "promises" this value will eventually be computed. + // This avoids the case where multiple threads attempt to produce the same value at the + // same time. + // + // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same + // promise from being created twice. + let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + + // Drop the shuffling cache to avoid holding the lock for any longer than required. + drop(shuffling_cache); + + debug!( + shuffling_id = ?shuffling_epoch, + head_block_root = head_block_root.to_string(), + "Committee cache miss" + ); + + // If the block's state will be so far ahead of `shuffling_epoch` that even its previous + // epoch committee cache will be too new, then error. Callers of this function shouldn't be + // requesting such old shufflings for this `head_block_root`. + let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); + if head_block_epoch > shuffling_epoch + 1 { + return Err(BeaconChainError::InvalidStateForShuffling { + state_epoch: head_block_epoch, + shuffling_epoch, + } + .into()); + } + + let state_read_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); + + let cached_head = canonical_head.cached_head(); + let head_state_opt = if cached_head.head_block_root() == head_block_root { + Some(( + cached_head.snapshot.beacon_state.clone(), + cached_head.head_state_root(), + )) + } else { + None + }; + + // Compute the `target_slot` to advance the block's state to. + // + // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to only + // advance into the first slot of the epoch prior to `shuffling_epoch`. + // + // If the `head_block` is already ahead of that slot, then we should load the state at that + // slot, as we've determined above that the `shuffling_epoch` cache will not be too far in + // the past. + let mut target_slot = std::cmp::max( + shuffling_epoch + .saturating_sub(1_u64) + .start_slot(T::EthSpec::slots_per_epoch()), + head_block.slot, + ); + if spec.gloas_fork_epoch == Some(shuffling_epoch) { + target_slot = std::cmp::max( + target_slot, + shuffling_epoch.start_slot(T::EthSpec::slots_per_epoch()), + ); + } + + // If the head state is useful for this request, use it. Otherwise, read a state from disk + // that is advanced as close as possible to `target_slot`. + let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { + (state, state_root) + } else { + let (state_root, state) = store + .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root) + .map_err(BeaconChainError::DBError)? + .ok_or(BeaconChainError::MissingBeaconState(head_block.state_root))?; + (state, state_root) + }; + + metrics::stop_timer(state_read_timer); + let state_skip_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); + + // If the state is still in an earlier epoch, advance it to the `target_slot` so that its + // next epoch committee cache matches the `shuffling_epoch`. + let advance_to_gloas_fork = spec.gloas_fork_epoch == Some(shuffling_epoch) + && state.current_epoch() < shuffling_epoch; + if state.current_epoch() + 1 < shuffling_epoch || advance_to_gloas_fork { + // Advance the state into the required slot, using the "partial" method since the state + // roots are not relevant for the shuffling. + partial_state_advance(&mut state, Some(state_root), target_slot, spec) + .map_err(BeaconChainError::from)?; + } + metrics::stop_timer(state_skip_timer); + + let committee_building_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); + + let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) + .map_err(BeaconChainError::IncorrectStateForAttestation)?; + + state + .build_committee_cache(relative_epoch, spec) + .map_err(BeaconChainError::from)?; + + let committee_cache = state + .committee_cache(relative_epoch) + .map_err(BeaconChainError::from)? + .clone(); + // The state has been advanced through the upgrade if needed, so `try_from_state` + // cannot return None here. + let ptcs = CachedPTCs::try_from_state(&state, shuffling_epoch, spec)?.ok_or( + BeaconChainError::BeaconStateError(BeaconStateError::IncorrectStateVariant), + )?; + let shuffling_decision_block = shuffling_id.shuffling_decision_block; + let cached_shuffling = CachedShuffling::new(committee_cache, ptcs); + + shuffling_cache_lock + .write() + .insert_committee_cache(shuffling_id, cached_shuffling.clone()); + + metrics::stop_timer(committee_building_timer); + + sender.send(cached_shuffling.clone()); + + map_fn(&cached_shuffling, shuffling_decision_block) } } @@ -304,7 +536,7 @@ mod test { const TEST_CACHE_SIZE: usize = 5; // Creates a new shuffling cache for testing - fn new_shuffling_cache() -> ShufflingCache { + fn new_shuffling_cache() -> ShufflingCache { create_test_tracing_subscriber(); let current_epoch = 8; @@ -318,6 +550,10 @@ mod test { ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids) } + fn cached_shuffling(committee_cache: Arc) -> CachedShuffling { + CachedShuffling::new(committee_cache, CachedPTCs::PreGloas) + } + /// Returns two different committee caches for testing. fn committee_caches() -> (Arc, Arc) { let harness = BeaconChainHarness::builder(MinimalEthSpec) @@ -366,12 +602,12 @@ mod test { ); // Resolve the promise. - sender.send(committee_a.clone()); + sender.send(cached_shuffling(committee_a.clone())); // Ensure the promise has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "the promise should be resolved" ); assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); @@ -428,30 +664,30 @@ mod test { ); // Resolve promise A. - sender_a.send(committee_a.clone()); + sender_a.send(cached_shuffling(committee_a.clone())); // Ensure promise A has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "promise A should be resolved" ); // Resolve promise B. - sender_b.send(committee_b.clone()); + sender_b.send(cached_shuffling(committee_b.clone())); // Ensure promise B has been resolved. let item = cache.get(&id_b).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_b), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b), "promise B should be resolved" ); // Check both entries again. assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "promise A should remain resolved" ); assert!( - matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b), "promise B should remain resolved" ); assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); @@ -485,9 +721,9 @@ mod test { let mut cache = new_shuffling_cache(); let id_a = shuffling_id(1); let committee_cache_a = Arc::new(CommitteeCache::default()); - cache.insert_committee_cache(id_a.clone(), &committee_cache_a); + cache.insert_committee_cache(id_a.clone(), cached_shuffling(committee_cache_a.clone())); assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_cache_a), "should insert committee cache" ); } @@ -500,7 +736,10 @@ mod test { .collect::>(); for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() { - cache.insert_committee_cache(shuffling_id.clone(), committee_cache); + cache.insert_committee_cache( + shuffling_id.clone(), + cached_shuffling(committee_cache.clone()), + ); } for i in 1..(TEST_CACHE_SIZE + 1) { @@ -533,7 +772,7 @@ mod test { shuffling_epoch: (current_epoch + 1).into(), shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone())); } // Now, update the head shuffling ids @@ -546,11 +785,17 @@ mod test { cache.update_head_shuffling_ids(head_shuffling_ids.clone()); // Insert head state shuffling ids. Should not be overridden by other shuffling ids. - cache.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache); - cache.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache); + cache.insert_committee_cache( + head_shuffling_ids.current.clone(), + cached_shuffling(committee_cache.clone()), + ); + cache.insert_committee_cache( + head_shuffling_ids.next.clone(), + cached_shuffling(committee_cache.clone()), + ); cache.insert_committee_cache( head_shuffling_ids.previous.clone().unwrap(), - &committee_cache, + cached_shuffling(committee_cache.clone()), ); // Insert a few entries for older epochs. @@ -559,7 +804,7 @@ mod test { shuffling_epoch: Epoch::from(i), shuffling_decision_block: Hash256::from_low_u64_be(i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone())); } assert!( @@ -580,4 +825,41 @@ mod test { "should limit cache size" ); } + + /// Pre-Gloas state across the Gloas fork: epoch G-1 returns `Some(PreGloas)`, epoch G and + /// G+1 return `None` (the boundary skip). + #[test] + fn try_from_state_skips_at_gloas_boundary() { + create_test_tracing_subscriber(); + + let mut spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let gloas_fork_epoch = Epoch::new(2); + spec.gloas_fork_epoch = Some(gloas_fork_epoch); + + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .spec(Arc::new(spec.clone())) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .build(); + let state = harness.get_current_state(); + assert!(!state.fork_name_unchecked().gloas_enabled()); + + for (epoch, expect_pre_gloas) in [ + (gloas_fork_epoch - 1, true), + (gloas_fork_epoch, false), + (gloas_fork_epoch + 1, false), + ] { + let result = CachedPTCs::::try_from_state(&state, epoch, &spec) + .expect("must not error at the boundary"); + if expect_pre_gloas { + assert!( + matches!(result, Some(CachedPTCs::PreGloas)), + "epoch {}: expected Some(PreGloas)", + epoch + ); + } else { + assert!(result.is_none(), "epoch {}: expected None", epoch); + } + } + } } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index cb916cb514..6408f861f8 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -15,7 +15,9 @@ //! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles. use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::{ - BeaconChain, BeaconChainError, BeaconChainTypes, chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, + BeaconChain, BeaconChainError, BeaconChainTypes, + chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, + shuffling_cache::{CachedPTCs, CachedShuffling}, }; use slot_clock::SlotClock; use state_processing::per_slot_processing; @@ -394,19 +396,30 @@ fn advance_head(beacon_chain: &Arc>) -> Resu .map_err(BeaconChainError::from)?; let committee_cache = state .committee_cache(RelativeEpoch::Next) - .map_err(BeaconChainError::from)?; - beacon_chain - .shuffling_cache - .write() - .insert_committee_cache(shuffling_id.clone(), committee_cache); + .map_err(BeaconChainError::from)? + .clone(); + let shuffling_epoch = RelativeEpoch::Next.into_epoch(state.current_epoch()); - debug!( - ?head_block_root, - next_epoch_shuffling_root = ?shuffling_id.shuffling_decision_block, - state_epoch = %state.current_epoch(), - current_epoch = %current_slot.epoch(T::EthSpec::slots_per_epoch()), - "Primed proposer and attester caches" - ); + if let Some(ptcs) = CachedPTCs::try_from_state(&state, shuffling_epoch, &beacon_chain.spec)? + { + beacon_chain.shuffling_cache.write().insert_committee_cache( + shuffling_id.clone(), + CachedShuffling::new(committee_cache, ptcs), + ); + + debug!( + ?head_block_root, + next_epoch_shuffling_root = ?shuffling_id.shuffling_decision_block, + state_epoch = %state.current_epoch(), + current_epoch = %current_slot.epoch(T::EthSpec::slots_per_epoch()), + "Primed proposer and attester caches" + ); + } else { + debug!( + %shuffling_epoch, + "Skipping priming of attester cache for Gloas boundary epoch" + ); + } } let final_slot = state.slot(); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 7e50f4e5ac..0ac77dcfaa 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1209,7 +1209,8 @@ fn check_shuffling_compatible( .with_committee_cache( block_root, head_state.current_epoch(), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let state_cache = head_state.committee_cache(RelativeEpoch::Current).unwrap(); // We used to check for false negatives here, but had to remove that check // because `shuffling_is_compatible` does not guarantee their absence. @@ -1247,7 +1248,8 @@ fn check_shuffling_compatible( .with_committee_cache( block_root, head_state.previous_epoch(), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let state_cache = head_state.committee_cache(RelativeEpoch::Previous).unwrap(); if previous_epoch_shuffling_is_compatible { assert_eq!(committee_cache, state_cache.as_ref()); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 3e8845f017..14cda1b483 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4258,8 +4258,7 @@ impl NetworkBeaconProcessor { "payload_attn_invalid_sig", ); } - PayloadAttestationError::BeaconChainError(_) - | PayloadAttestationError::BeaconStateError(_) => { + PayloadAttestationError::BeaconChainError(_) => { debug!( %peer_id, %message_slot, diff --git a/consensus/state_processing/src/per_block_processing/signature_sets.rs b/consensus/state_processing/src/per_block_processing/signature_sets.rs index ef7109dd94..f56cb17554 100644 --- a/consensus/state_processing/src/per_block_processing/signature_sets.rs +++ b/consensus/state_processing/src/per_block_processing/signature_sets.rs @@ -363,6 +363,26 @@ pub fn indexed_payload_attestation_signature_set<'a, 'b, E, F>( indexed_payload_attestation: &'b IndexedPayloadAttestation, spec: &'a ChainSpec, ) -> Result> +where + E: EthSpec, + F: Fn(usize) -> Option>, +{ + indexed_payload_attestation_signature_set_from_pubkeys( + get_pubkey, + signature, + indexed_payload_attestation, + state.genesis_validators_root(), + spec, + ) +} + +pub fn indexed_payload_attestation_signature_set_from_pubkeys<'a, 'b, E, F>( + get_pubkey: F, + signature: &'a AggregateSignature, + indexed_payload_attestation: &'b IndexedPayloadAttestation, + genesis_validators_root: Hash256, + spec: &'a ChainSpec, +) -> Result> where E: EthSpec, F: Fn(usize) -> Option>, @@ -379,12 +399,7 @@ where .slot .epoch(E::slots_per_epoch()); let fork = spec.fork_at_epoch(epoch); - let domain = spec.get_domain( - epoch, - Domain::PTCAttester, - &fork, - state.genesis_validators_root(), - ); + let domain = spec.get_domain(epoch, Domain::PTCAttester, &fork, genesis_validators_root); let message = indexed_payload_attestation.data.signing_root(domain);