diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index f35de59e1f..635ca3a2ae 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -1023,7 +1023,8 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { let (committee_opt, committees_per_slot) = chain.with_committee_cache( attestation.data.target.root, attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let committee_opt = committee_cache .get_beacon_committee(attestation.data.slot, attestation.committee_index) .map(|beacon_committee| beacon_committee.committee.to_vec()); @@ -1574,7 +1575,8 @@ where return Err(Error::UnknownTargetRoot(target.root)); } - chain.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| { + chain.with_committee_cache(target.root, attestation_epoch, |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let committees_per_slot = committee_cache.committees_per_slot(); Ok(committee_cache diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1a3edca8f1..e7d2390e35 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -72,7 +72,7 @@ use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache; use crate::shuffling_cache::{ - BlockShufflingIds, CachedShuffling, ShufflingCache, get_ptc_for_shuffling_epoch, + CachedShuffling, ShufflingCache, get_ptc_for_shuffling_epoch, with_cached_shuffling, }; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, @@ -1711,7 +1711,8 @@ impl BeaconChain { let (duties, dependent_root) = self.with_committee_cache( head_block_root, epoch, - |committee_cache, dependent_root| { + |cached_shuffling, dependent_root| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let duties = validator_indices .iter() .map(|validator_index| { @@ -6868,11 +6869,11 @@ impl BeaconChain { ) } - /// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head + /// Runs the `map_fn` with the cached shuffling for `shuffling_epoch` from the chain with head /// `head_block_root`. The `map_fn` will be supplied two values: /// - /// - `&CommitteeCache`: the committee cache that serves the given parameters. - /// - `Hash256`: the "shuffling decision root" which uniquely identifies the `CommitteeCache`. + /// - `&CachedShuffling`: the committee cache and optional PTC that serves the given parameters. + /// - `Hash256`: the "shuffling decision root" which uniquely identifies the cached shuffling. /// /// It's not necessary that `head_block_root` matches our current view of the chain, it can be /// any block that is: @@ -6889,12 +6890,12 @@ impl BeaconChain { /// /// ## Notes /// - /// This function exists in this odd "map" pattern because efficiently obtaining a committee + /// This function exists in this odd "map" pattern because efficiently obtaining a shuffling /// can be complex. It might involve reading straight from the `beacon_chain.shuffling_cache` /// or it might involve reading it from a state from the DB. Due to the complexities of /// `RwLock`s on the shuffling cache, a simple `Cow` isn't suitable here. /// - /// If the committee for `(head_block_root, shuffling_epoch)` isn't found in the + /// If the shuffling for `(head_block_root, shuffling_epoch)` isn't found in the /// `shuffling_cache`, we will read a state from disk and then update the `shuffling_cache`. pub fn with_committee_cache( &self, @@ -6903,154 +6904,17 @@ impl BeaconChain { map_fn: F, ) -> Result where - F: Fn(&CommitteeCache, Hash256) -> Result, + F: Fn(&CachedShuffling, Hash256) -> Result, { - let head_block = self - .canonical_head - .fork_choice_read_lock() - .get_block(&head_block_root) - .ok_or(Error::MissingBeaconBlock(head_block_root))?; - - let shuffling_id = BlockShufflingIds { - current: head_block.current_epoch_shuffling_id.clone(), - next: head_block.next_epoch_shuffling_id.clone(), - previous: None, - block_root: head_block.root, - } - .id_for_epoch(shuffling_epoch) - .ok_or_else(|| Error::InvalidShufflingId { + with_cached_shuffling( + &self.canonical_head, + &self.shuffling_cache, + &self.store, + &self.spec, + head_block_root, shuffling_epoch, - head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()), - })?; - - // Obtain the shuffling cache, timing how long we wait. - let mut shuffling_cache = { - let _ = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); - self.shuffling_cache.write() - }; - - if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { - // The shuffling cache is no longer required, drop the write-lock to allow concurrent - // access. - drop(shuffling_cache); - - let cached_shuffling = cache_item.wait()?; - map_fn( - &cached_shuffling.committee_cache, - shuffling_id.shuffling_decision_block, - ) - } else { - // Create an entry in the cache that "promises" this value will eventually be computed. - // This avoids the case where multiple threads attempt to produce the same value at the - // same time. - // - // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same - // promise from being created twice. - let sender = shuffling_cache.create_promise(shuffling_id.clone())?; - - // Drop the shuffling cache to avoid holding the lock for any longer than - // required. - drop(shuffling_cache); - - debug!( - shuffling_id = ?shuffling_epoch, - head_block_root = head_block_root.to_string(), - "Committee cache miss" - ); - - // If the block's state will be so far ahead of `shuffling_epoch` that even its - // previous epoch committee cache will be too new, then error. Callers of this function - // shouldn't be requesting such old shufflings for this `head_block_root`. - let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); - if head_block_epoch > shuffling_epoch + 1 { - return Err(Error::InvalidStateForShuffling { - state_epoch: head_block_epoch, - shuffling_epoch, - }); - } - - let state_read_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); - - // If the head of the chain can serve this request, use it. - // - // This code is a little awkward because we need to ensure that the head we read and - // the head we copy is identical. Taking one lock to read the head values and another - // to copy the head is liable to race-conditions. - let head_state_opt = self.with_head(|head| { - if head.beacon_block_root == head_block_root { - Ok(Some((head.beacon_state.clone(), head.beacon_state_root()))) - } else { - Ok::<_, Error>(None) - } - })?; - - // Compute the `target_slot` to advance the block's state to. - // - // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to - // only advance into the first slot of the epoch prior to `shuffling_epoch`. - // - // If the `head_block` is already ahead of that slot, then we should load the state - // at that slot, as we've determined above that the `shuffling_epoch` cache will - // not be too far in the past. - let target_slot = std::cmp::max( - shuffling_epoch - .saturating_sub(1_u64) - .start_slot(T::EthSpec::slots_per_epoch()), - head_block.slot, - ); - - // If the head state is useful for this request, use it. Otherwise, read a state from - // disk that is advanced as close as possible to `target_slot`. - let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { - (state, state_root) - } else { - // We assume that the `Pending` state has the same shufflings as a `Full` state - // for the same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root - let (state_root, state) = self - .store - .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)? - .ok_or(Error::MissingBeaconState(head_block.state_root))?; - (state, state_root) - }; - - metrics::stop_timer(state_read_timer); - let state_skip_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); - - // If the state is still in an earlier epoch, advance it to the `target_slot` so - // that its next epoch committee cache matches the `shuffling_epoch`. - if state.current_epoch() + 1 < shuffling_epoch { - // Advance the state into the required slot, using the "partial" method since the - // state roots are not relevant for the shuffling. - partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?; - } - metrics::stop_timer(state_skip_timer); - - let committee_building_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); - - let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) - .map_err(Error::IncorrectStateForAttestation)?; - - state.build_committee_cache(relative_epoch, &self.spec)?; - - let committee_cache = state.committee_cache(relative_epoch)?.clone(); - let ptc = get_ptc_for_shuffling_epoch(&state, shuffling_epoch, &self.spec)?; - let shuffling_decision_block = shuffling_id.shuffling_decision_block; - let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptc); - - self.shuffling_cache - .write() - .insert_committee_cache_with_ptc(shuffling_id, cached_shuffling.clone()); - - metrics::stop_timer(committee_building_timer); - - sender.send(cached_shuffling.clone()); - - map_fn(&cached_shuffling.committee_cache, shuffling_decision_block) - } + map_fn, + ) } /// Dumps the entire canonical chain, from the head to genesis to a vector for analysis. diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs index c36c73b344..6ff5b55ae5 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -2,6 +2,7 @@ use super::Error; use crate::beacon_chain::BeaconStore; use crate::canonical_head::CanonicalHead; use crate::observed_attesters::ObservedPayloadAttesters; +use crate::shuffling_cache::{ShufflingCache, with_cached_shuffling}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; use bls::AggregateSignature; @@ -20,6 +21,7 @@ pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { pub spec: &'a ChainSpec, pub observed_payload_attesters: &'a RwLock>, pub canonical_head: &'a CanonicalHead, + pub shuffling_cache: &'a RwLock>, pub validator_pubkey_cache: &'a RwLock>, pub store: &'a BeaconStore, } @@ -76,13 +78,32 @@ impl VerifiedPayloadAttestationMessage { return Err(Error::UnknownHeadBlock { beacon_block_root }); } - // Get head state for PTC computation. If the cached head state is too stale + let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let ptc = with_cached_shuffling( + ctx.canonical_head, + ctx.shuffling_cache, + ctx.store, + ctx.spec, + beacon_block_root, + message_epoch, + |cached_shuffling, _| Ok::<_, Error>(cached_shuffling.ptc.clone()), + )? + .ok_or(Error::MissingPTC { slot })?; + + // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. + if !ptc.0.contains(&(validator_index as usize)) { + return Err(Error::NotInPTC { + validator_index, + slot, + }); + } + + // Get a state for signature verification. If the cached head state is too stale // (e.g. during liveness failures with many skipped slots), fall back to loading // a more recent state from the store and advancing it if necessary. let head = ctx.canonical_head.cached_head(); let head_state = &head.snapshot.beacon_state; - let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); let state_epoch = head_state.current_epoch(); // get_ptc can serve epochs in [state_epoch - 1, state_epoch + min_seed_lookahead]. @@ -124,15 +145,6 @@ impl VerifiedPayloadAttestationMessage { let state = advanced_state.as_ref().unwrap_or(head_state); - // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. - let ptc = state.get_ptc(slot, ctx.spec)?; - if !ptc.0.contains(&(validator_index as usize)) { - return Err(Error::NotInPTC { - validator_index, - slot, - }); - } - // Build the indexed form for signature verification and downstream fork choice. let indexed_payload_attestation = IndexedPayloadAttestation { attesting_indices: vec![validator_index] @@ -204,6 +216,7 @@ impl BeaconChain { spec: &self.spec, observed_payload_attesters: &self.observed_payload_attesters, canonical_head: &self.canonical_head, + shuffling_cache: &self.shuffling_cache, validator_pubkey_cache: &self.validator_pubkey_cache, store: &self.store, } diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs index 477527c0aa..f32e264a90 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs @@ -66,6 +66,12 @@ pub enum Error { /// /// The peer has sent an invalid message. NotInPTC { validator_index: u64, slot: Slot }, + /// The shuffling cache entry did not contain a PTC for this slot. + /// + /// ## Peer scoring + /// + /// We were unable to process this message due to an internal error. + MissingPTC { slot: Slot }, /// The validator index is unknown. /// /// ## Peer scoring diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs index 7faad98e55..4c44eb88f5 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs @@ -1,33 +1,18 @@ -use std::sync::Arc; -use std::time::Duration; - -use bls::{Keypair, Signature}; -use fork_choice::ForkChoice; -use genesis::{generate_deterministic_keypairs, interop_genesis_state}; -use parking_lot::RwLock; -use proto_array::PayloadStatus; -use slot_clock::{SlotClock, TestingSlotClock}; +use bls::Signature; use state_processing::AllCaches; -use state_processing::genesis::genesis_block; -use store::{HotColdDB, StoreConfig}; use types::{ - ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, - PayloadAttestationMessage, SignedBeaconBlock, SignedRoot, Slot, + Domain, EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, + SignedRoot, Slot, }; use crate::{ - beacon_fork_choice_store::BeaconForkChoiceStore, - beacon_snapshot::BeaconSnapshot, - canonical_head::CanonicalHead, - observed_attesters::ObservedPayloadAttesters, payload_attestation_verification::{ Error as PayloadAttestationError, gossip_verified_payload_attestation::{ GossipVerificationContext, VerifiedPayloadAttestationMessage, }, }, - test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, test_spec}, - validator_pubkey_cache::ValidatorPubkeyCache, + test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env}, }; type E = MinimalEthSpec; @@ -36,96 +21,41 @@ type T = EphemeralHarnessType; const NUM_VALIDATORS: usize = 64; struct TestContext { - canonical_head: CanonicalHead, - observed_payload_attesters: RwLock>, - validator_pubkey_cache: RwLock>, - slot_clock: TestingSlotClock, - keypairs: Vec, - spec: ChainSpec, + harness: BeaconChainHarness, genesis_block_root: Hash256, - store: Arc, store::MemoryStore>>, } impl TestContext { fn new() -> Self { - let spec = test_spec::(); - let store = Arc::new( - HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())) - .expect("should open ephemeral store"), - ); + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(NUM_VALIDATORS) + .fresh_ephemeral_store() + .build(); - let keypairs = generate_deterministic_keypairs(NUM_VALIDATORS); - - let mut state = - interop_genesis_state::(&keypairs, 0, Hash256::repeat_byte(0x42), None, &spec) - .expect("should build genesis state"); - - *state.finalized_checkpoint_mut() = Checkpoint { - epoch: Epoch::new(1), - root: Hash256::ZERO, - }; - - let mut block = genesis_block(&state, &spec).expect("should build genesis block"); - *block.state_root_mut() = state - .update_tree_hash_cache() - .expect("should hash genesis state"); - let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); - let block_root = signed_block.canonical_root(); - - let snapshot = BeaconSnapshot::new( - Arc::new(signed_block.clone()), - None, - block_root, - state.clone(), - ); - - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), snapshot.clone()) - .expect("should create fork choice store"); - let fork_choice = - ForkChoice::from_anchor(fc_store, block_root, &signed_block, &state, None, &spec) - .expect("should create fork choice"); - - let canonical_head = - CanonicalHead::new(fork_choice, Arc::new(snapshot), PayloadStatus::Pending); - - let slot_clock = TestingSlotClock::new( - Slot::new(0), - Duration::from_secs(0), - spec.get_slot_duration(), - ); // Advance past genesis so `now_with_past_tolerance` doesn't underflow. - slot_clock.set_current_time(spec.get_slot_duration()); - - let validator_pubkey_cache = - ValidatorPubkeyCache::new(&state, store.clone()).expect("should create pubkey cache"); + harness + .chain + .slot_clock + .set_current_time(harness.spec.get_slot_duration()); + let genesis_block_root = harness.chain.genesis_block_root; Self { - canonical_head, - observed_payload_attesters: RwLock::new(ObservedPayloadAttesters::default()), - validator_pubkey_cache: RwLock::new(validator_pubkey_cache), - slot_clock, - keypairs, - spec, - genesis_block_root: block_root, - store, + harness, + genesis_block_root, } } fn gossip_ctx(&self) -> GossipVerificationContext<'_, T> { - GossipVerificationContext { - slot_clock: &self.slot_clock, - spec: &self.spec, - observed_payload_attesters: &self.observed_payload_attesters, - canonical_head: &self.canonical_head, - validator_pubkey_cache: &self.validator_pubkey_cache, - store: &self.store, - } + self.harness.chain.payload_attestation_gossip_context() } fn ptc_members(&self, slot: Slot) -> Vec { - let head = self.canonical_head.cached_head(); + let head = self.harness.chain.canonical_head.cached_head(); let state = &head.snapshot.beacon_state; - let ptc = state.get_ptc(slot, &self.spec).expect("should get PTC"); + let ptc = state + .get_ptc(slot, &self.harness.spec) + .expect("should get PTC"); ptc.0.to_vec() } @@ -134,16 +64,18 @@ impl TestContext { data: PayloadAttestationData, validator_index: u64, ) -> PayloadAttestationMessage { - let head = self.canonical_head.cached_head(); + let head = self.harness.chain.canonical_head.cached_head(); let state = &head.snapshot.beacon_state; - let domain = self.spec.get_domain( + let domain = self.harness.spec.get_domain( data.slot.epoch(E::slots_per_epoch()), Domain::PTCAttester, &state.fork(), state.genesis_validators_root(), ); let message = data.signing_root(domain); - let signature = self.keypairs[validator_index as usize].sk.sign(message); + let signature = self.harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); PayloadAttestationMessage { validator_index, data, @@ -192,7 +124,7 @@ fn past_slot() { return; } let ctx = TestContext::new(); - ctx.slot_clock.set_slot(5); + ctx.harness.chain.slot_clock.set_slot(5); let gossip = ctx.gossip_ctx(); let msg = make_payload_attestation(Slot::new(0), 0, ctx.genesis_block_root); diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index acaff24e5c..ec96b3d8dd 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -3,13 +3,17 @@ use std::sync::Arc; use itertools::Itertools; use oneshot_broadcast::{Receiver, Sender, oneshot}; +use parking_lot::RwLock; +use state_processing::state_advance::partial_state_advance; use tracing::debug; use types::{ AttestationShufflingId, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PTC, RelativeEpoch, state::CommitteeCache, }; -use crate::{BeaconChainError, metrics}; +use crate::{ + BeaconChainError, BeaconChainTypes, BeaconStore, canonical_head::CanonicalHead, metrics, +}; /// The size of the cache that stores shufflings for quicker verification. /// @@ -235,6 +239,164 @@ impl ShufflingCache { } } +pub fn with_cached_shuffling( + canonical_head: &CanonicalHead, + shuffling_cache_lock: &RwLock>, + store: &BeaconStore, + spec: &ChainSpec, + head_block_root: Hash256, + shuffling_epoch: Epoch, + map_fn: F, +) -> Result +where + T: BeaconChainTypes, + F: Fn(&CachedShuffling, Hash256) -> Result, + Error: From, +{ + let head_block = canonical_head + .fork_choice_read_lock() + .get_block(&head_block_root) + .ok_or(BeaconChainError::MissingBeaconBlock(head_block_root))?; + + let shuffling_id = BlockShufflingIds { + current: head_block.current_epoch_shuffling_id.clone(), + next: head_block.next_epoch_shuffling_id.clone(), + previous: None, + block_root: head_block.root, + } + .id_for_epoch(shuffling_epoch) + .ok_or_else(|| BeaconChainError::InvalidShufflingId { + shuffling_epoch, + head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()), + })?; + + let mut shuffling_cache = { + let _ = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); + shuffling_cache_lock.write() + }; + + if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { + drop(shuffling_cache); + + let cached_shuffling = cache_item.wait()?; + map_fn(&cached_shuffling, shuffling_id.shuffling_decision_block) + } else { + // Create an entry in the cache that "promises" this value will eventually be computed. + // This avoids the case where multiple threads attempt to produce the same value at the + // same time. + // + // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same + // promise from being created twice. + let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + + // Drop the shuffling cache to avoid holding the lock for any longer than required. + drop(shuffling_cache); + + debug!( + shuffling_id = ?shuffling_epoch, + head_block_root = head_block_root.to_string(), + "Committee cache miss" + ); + + // If the block's state will be so far ahead of `shuffling_epoch` that even its previous + // epoch committee cache will be too new, then error. Callers of this function shouldn't be + // requesting such old shufflings for this `head_block_root`. + let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); + if head_block_epoch > shuffling_epoch + 1 { + return Err(BeaconChainError::InvalidStateForShuffling { + state_epoch: head_block_epoch, + shuffling_epoch, + } + .into()); + } + + let state_read_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); + + let cached_head = canonical_head.cached_head(); + let head_state_opt = if cached_head.head_block_root() == head_block_root { + Some(( + cached_head.snapshot.beacon_state.clone(), + cached_head.head_state_root(), + )) + } else { + None + }; + + // Compute the `target_slot` to advance the block's state to. + // + // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to only + // advance into the first slot of the epoch prior to `shuffling_epoch`. + // + // If the `head_block` is already ahead of that slot, then we should load the state at that + // slot, as we've determined above that the `shuffling_epoch` cache will not be too far in + // the past. + let target_slot = std::cmp::max( + shuffling_epoch + .saturating_sub(1_u64) + .start_slot(T::EthSpec::slots_per_epoch()), + head_block.slot, + ); + + // If the head state is useful for this request, use it. Otherwise, read a state from disk + // that is advanced as close as possible to `target_slot`. + let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { + (state, state_root) + } else { + // We assume that the `Pending` state has the same shufflings as a `Full` state for the + // same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root + let (state_root, state) = store + .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root) + .map_err(BeaconChainError::DBError)? + .ok_or(BeaconChainError::MissingBeaconState(head_block.state_root))?; + (state, state_root) + }; + + metrics::stop_timer(state_read_timer); + let state_skip_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); + + // If the state is still in an earlier epoch, advance it to the `target_slot` so that its + // next epoch committee cache matches the `shuffling_epoch`. + if state.current_epoch() + 1 < shuffling_epoch { + // Advance the state into the required slot, using the "partial" method since the state + // roots are not relevant for the shuffling. + partial_state_advance(&mut state, Some(state_root), target_slot, spec) + .map_err(BeaconChainError::from)?; + } + metrics::stop_timer(state_skip_timer); + + let committee_building_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); + + let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) + .map_err(BeaconChainError::IncorrectStateForAttestation)?; + + state + .build_committee_cache(relative_epoch, spec) + .map_err(BeaconChainError::from)?; + + let committee_cache = state + .committee_cache(relative_epoch) + .map_err(BeaconChainError::from)? + .clone(); + let ptc = get_ptc_for_shuffling_epoch(&state, shuffling_epoch, spec) + .map_err(BeaconChainError::from)?; + let shuffling_decision_block = shuffling_id.shuffling_decision_block; + let cached_shuffling = CachedShuffling::new(committee_cache, ptc); + + shuffling_cache_lock + .write() + .insert_committee_cache_with_ptc(shuffling_id, cached_shuffling.clone()); + + metrics::stop_timer(committee_building_timer); + + sender.send(cached_shuffling.clone()); + + map_fn(&cached_shuffling, shuffling_decision_block) + } +} + /// Return the PTC associated with the first slot in `shuffling_epoch`, when the state supports PTCs. pub fn get_ptc_for_shuffling_epoch( state: &BeaconState, diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 1576092c81..e11b4a1472 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1177,7 +1177,8 @@ fn check_shuffling_compatible( .with_committee_cache( block_root, head_state.current_epoch(), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let state_cache = head_state.committee_cache(RelativeEpoch::Current).unwrap(); // We used to check for false negatives here, but had to remove that check // because `shuffling_is_compatible` does not guarantee their absence. @@ -1215,7 +1216,8 @@ fn check_shuffling_compatible( .with_committee_cache( block_root, head_state.previous_epoch(), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let state_cache = head_state.committee_cache(RelativeEpoch::Previous).unwrap(); if previous_epoch_shuffling_is_compatible { assert_eq!(committee_cache, state_cache.as_ref()); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 0135d7f5dd..a564dc1090 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4241,7 +4241,8 @@ impl NetworkBeaconProcessor { "payload_attn_invalid_sig", ); } - PayloadAttestationError::BeaconChainError(_) + PayloadAttestationError::MissingPTC { .. } + | PayloadAttestationError::BeaconChainError(_) | PayloadAttestationError::BeaconStateError(_) => { debug!( %peer_id,