diff --git a/Cargo.lock b/Cargo.lock index d42bcd8fc1..129be32fcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2857,6 +2857,7 @@ dependencies = [ "kzg", "logging", "milhouse", + "proto_array", "rayon", "serde", "serde_json", diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index f35de59e1f..635ca3a2ae 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -1023,7 +1023,8 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { let (committee_opt, committees_per_slot) = chain.with_committee_cache( attestation.data.target.root, attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let committee_opt = committee_cache .get_beacon_committee(attestation.data.slot, attestation.committee_index) .map(|beacon_committee| beacon_committee.committee.to_vec()); @@ -1574,7 +1575,8 @@ where return Err(Error::UnknownTargetRoot(target.root)); } - chain.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| { + chain.with_committee_cache(target.root, attestation_epoch, |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let committees_per_slot = committee_cache.committees_per_slot(); Ok(committee_cache diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 8c0363608a..e6a6e97f7e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -77,7 +77,7 @@ use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache; -use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; +use crate::shuffling_cache::{CachedPTCs, CachedShuffling, ShufflingCache, with_cached_shuffling}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -113,7 +113,7 @@ use operation_pool::{ CompactAttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella, }; use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; -use proto_array::{DoNotReOrg, ProposerHeadError}; +use proto_array::{DoNotReOrg, ProposerHeadError, ReOrgThreshold}; use rand::RngCore; use safe_arith::SafeArith; use slasher::Slasher; @@ -472,7 +472,7 @@ pub struct BeaconChain { /// HTTP server is enabled. pub event_handler: Option>, /// Caches the attester shuffling for a given epoch and shuffling key root. - pub shuffling_cache: RwLock, + pub shuffling_cache: RwLock>, /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. pub beacon_proposer_cache: Arc>, /// Caches a map of `validator_index -> validator_pubkey`. @@ -1696,7 +1696,8 @@ impl BeaconChain { let (duties, dependent_root) = self.with_committee_cache( head_block_root, epoch, - |committee_cache, dependent_root| { + |cached_shuffling, dependent_root| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let duties = validator_indices .iter() .map(|validator_index| { @@ -4914,15 +4915,20 @@ impl BeaconChain { ) -> Result<(), BlockError> { for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] { let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?; + let shuffling_epoch = relative_epoch.into_epoch(state.current_epoch()); - let shuffling_is_cached = self.shuffling_cache.read().contains(&shuffling_id); + if self.shuffling_cache.read().contains(&shuffling_id) { + continue; + } - if !shuffling_is_cached { - state.build_committee_cache(relative_epoch, &self.spec)?; - let committee_cache = state.committee_cache(relative_epoch)?; - self.shuffling_cache - .write() - .insert_committee_cache(shuffling_id, committee_cache); + state.build_committee_cache(relative_epoch, &self.spec)?; + let committee_cache = state.committee_cache(relative_epoch)?.clone(); + + if let Some(ptcs) = CachedPTCs::try_from_state(state, shuffling_epoch, &self.spec)? { + self.shuffling_cache.write().insert_committee_cache( + shuffling_id, + CachedShuffling::new(committee_cache, ptcs), + ); } } Ok(()) @@ -5239,15 +5245,14 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::FORK_CHOICE_OVERRIDE_FCU_TIMES); // Never override if proposer re-orgs are disabled. - let re_org_head_threshold = self - .config - .re_org_head_threshold - .ok_or(Box::new(DoNotReOrg::ReOrgsDisabled.into()))?; + if self.config.disable_proposer_reorg { + return Err(Box::new(DoNotReOrg::ReOrgsDisabled.into())); + }; - let re_org_parent_threshold = self - .config - .re_org_parent_threshold - .ok_or(Box::new(DoNotReOrg::ReOrgsDisabled.into()))?; + let re_org_head_threshold = ReOrgThreshold(self.spec.reorg_head_weight_threshold); + let re_org_parent_threshold = ReOrgThreshold(self.spec.reorg_parent_weight_threshold); + let re_org_max_epochs_since_finalization = + Epoch::new(self.spec.reorg_max_epochs_since_finalization); let head_block_root = canonical_forkchoice_params.head_root; @@ -5260,7 +5265,7 @@ impl BeaconChain { re_org_head_threshold, re_org_parent_threshold, &self.config.re_org_disallowed_offsets, - self.config.re_org_max_epochs_since_finalization, + re_org_max_epochs_since_finalization, ) .map_err(|e| e.map_inner_error(Error::ProposerHeadForkChoiceError))?; @@ -5281,7 +5286,12 @@ impl BeaconChain { .and_then(|slot_start| { let now = self.slot_clock.now_duration()?; let slot_delay = now.saturating_sub(slot_start); - Some(slot_delay <= self.config.re_org_cutoff(self.spec.get_slot_duration())) + let re_org_cutoff_duration = self + .spec + .compute_slot_component_duration(self.spec.proposer_reorg_cutoff_bps) + .ok()?; + + Some(slot_delay <= re_org_cutoff_duration) }) .unwrap_or(false) } else { @@ -7009,11 +7019,11 @@ impl BeaconChain { ) } - /// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head + /// Runs the `map_fn` with the cached shuffling for `shuffling_epoch` from the chain with head /// `head_block_root`. The `map_fn` will be supplied two values: /// - /// - `&CommitteeCache`: the committee cache that serves the given parameters. - /// - `Hash256`: the "shuffling decision root" which uniquely identifies the `CommitteeCache`. + /// - `&CachedShuffling`: the committee cache and optional PTCs that serve the given parameters. + /// - `Hash256`: the "shuffling decision root" which uniquely identifies the cached shuffling. /// /// It's not necessary that `head_block_root` matches our current view of the chain, it can be /// any block that is: @@ -7030,12 +7040,12 @@ impl BeaconChain { /// /// ## Notes /// - /// This function exists in this odd "map" pattern because efficiently obtaining a committee + /// This function exists in this odd "map" pattern because efficiently obtaining a shuffling /// can be complex. It might involve reading straight from the `beacon_chain.shuffling_cache` /// or it might involve reading it from a state from the DB. Due to the complexities of /// `RwLock`s on the shuffling cache, a simple `Cow` isn't suitable here. /// - /// If the committee for `(head_block_root, shuffling_epoch)` isn't found in the + /// If the shuffling for `(head_block_root, shuffling_epoch)` isn't found in the /// `shuffling_cache`, we will read a state from disk and then update the `shuffling_cache`. pub fn with_committee_cache( &self, @@ -7044,149 +7054,17 @@ impl BeaconChain { map_fn: F, ) -> Result where - F: Fn(&CommitteeCache, Hash256) -> Result, + F: Fn(&CachedShuffling, Hash256) -> Result, { - let head_block = self - .canonical_head - .fork_choice_read_lock() - .get_block(&head_block_root) - .ok_or(Error::MissingBeaconBlock(head_block_root))?; - - let shuffling_id = BlockShufflingIds { - current: head_block.current_epoch_shuffling_id.clone(), - next: head_block.next_epoch_shuffling_id.clone(), - previous: None, - block_root: head_block.root, - } - .id_for_epoch(shuffling_epoch) - .ok_or_else(|| Error::InvalidShufflingId { + with_cached_shuffling( + &self.canonical_head, + &self.shuffling_cache, + &self.store, + &self.spec, + head_block_root, shuffling_epoch, - head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()), - })?; - - // Obtain the shuffling cache, timing how long we wait. - let mut shuffling_cache = { - let _ = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); - self.shuffling_cache.write() - }; - - if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { - // The shuffling cache is no longer required, drop the write-lock to allow concurrent - // access. - drop(shuffling_cache); - - let committee_cache = cache_item.wait()?; - map_fn(&committee_cache, shuffling_id.shuffling_decision_block) - } else { - // Create an entry in the cache that "promises" this value will eventually be computed. - // This avoids the case where multiple threads attempt to produce the same value at the - // same time. - // - // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same - // promise from being created twice. - let sender = shuffling_cache.create_promise(shuffling_id.clone())?; - - // Drop the shuffling cache to avoid holding the lock for any longer than - // required. - drop(shuffling_cache); - - debug!( - shuffling_id = ?shuffling_epoch, - head_block_root = head_block_root.to_string(), - "Committee cache miss" - ); - - // If the block's state will be so far ahead of `shuffling_epoch` that even its - // previous epoch committee cache will be too new, then error. Callers of this function - // shouldn't be requesting such old shufflings for this `head_block_root`. - let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); - if head_block_epoch > shuffling_epoch + 1 { - return Err(Error::InvalidStateForShuffling { - state_epoch: head_block_epoch, - shuffling_epoch, - }); - } - - let state_read_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); - - // If the head of the chain can serve this request, use it. - // - // This code is a little awkward because we need to ensure that the head we read and - // the head we copy is identical. Taking one lock to read the head values and another - // to copy the head is liable to race-conditions. - let head_state_opt = self.with_head(|head| { - if head.beacon_block_root == head_block_root { - Ok(Some((head.beacon_state.clone(), head.beacon_state_root()))) - } else { - Ok::<_, Error>(None) - } - })?; - - // Compute the `target_slot` to advance the block's state to. - // - // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to - // only advance into the first slot of the epoch prior to `shuffling_epoch`. - // - // If the `head_block` is already ahead of that slot, then we should load the state - // at that slot, as we've determined above that the `shuffling_epoch` cache will - // not be too far in the past. - let target_slot = std::cmp::max( - shuffling_epoch - .saturating_sub(1_u64) - .start_slot(T::EthSpec::slots_per_epoch()), - head_block.slot, - ); - - // If the head state is useful for this request, use it. Otherwise, read a state from - // disk that is advanced as close as possible to `target_slot`. - let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { - (state, state_root) - } else { - // We assume that the `Pending` state has the same shufflings as a `Full` state - // for the same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root - let (state_root, state) = self - .store - .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)? - .ok_or(Error::MissingBeaconState(head_block.state_root))?; - (state, state_root) - }; - - metrics::stop_timer(state_read_timer); - let state_skip_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); - - // If the state is still in an earlier epoch, advance it to the `target_slot` so - // that its next epoch committee cache matches the `shuffling_epoch`. - if state.current_epoch() + 1 < shuffling_epoch { - // Advance the state into the required slot, using the "partial" method since the - // state roots are not relevant for the shuffling. - partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?; - } - metrics::stop_timer(state_skip_timer); - - let committee_building_timer = - metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); - - let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) - .map_err(Error::IncorrectStateForAttestation)?; - - state.build_committee_cache(relative_epoch, &self.spec)?; - - let committee_cache = state.committee_cache(relative_epoch)?.clone(); - let shuffling_decision_block = shuffling_id.shuffling_decision_block; - - self.shuffling_cache - .write() - .insert_committee_cache(shuffling_id, &committee_cache); - - metrics::stop_timer(committee_building_timer); - - sender.send(committee_cache.clone()); - - map_fn(&committee_cache, shuffling_decision_block) - } + map_fn, + ) } /// Dumps the entire canonical chain, from the head to genesis to a vector for analysis. diff --git a/beacon_node/beacon_chain/src/block_production/mod.rs b/beacon_node/beacon_chain/src/block_production/mod.rs index fd5e381023..a94bc697b9 100644 --- a/beacon_node/beacon_chain/src/block_production/mod.rs +++ b/beacon_node/beacon_chain/src/block_production/mod.rs @@ -1,10 +1,10 @@ use std::{sync::Arc, time::Duration}; use fork_choice::PayloadStatus; -use proto_array::ProposerHeadError; +use proto_array::{ProposerHeadError, ReOrgThreshold}; use slot_clock::SlotClock; use tracing::{debug, error, info, instrument, warn}; -use types::{BeaconState, Hash256, SignedExecutionPayloadEnvelope, Slot}; +use types::{BeaconState, Epoch, Hash256, SignedExecutionPayloadEnvelope, Slot}; use crate::{ BeaconChain, BeaconChainTypes, BlockProductionError, StateSkipConfig, @@ -174,8 +174,10 @@ impl BeaconChain { head_slot: Slot, canonical_head: Hash256, ) -> Option<(BeaconState, Hash256)> { - let re_org_head_threshold = self.config.re_org_head_threshold?; - let re_org_parent_threshold = self.config.re_org_parent_threshold?; + let re_org_head_threshold = ReOrgThreshold(self.spec.reorg_head_weight_threshold); + let re_org_parent_threshold = ReOrgThreshold(self.spec.reorg_parent_weight_threshold); + let re_org_max_epochs_since_finalization = + Epoch::new(self.spec.reorg_max_epochs_since_finalization); if self.spec.proposer_score_boost.is_none() { warn!( @@ -198,8 +200,12 @@ impl BeaconChain { // 1. It seems we have time to propagate and still receive the proposer boost. // 2. The current head block was seen late. // 3. The `get_proposer_head` conditions from fork choice pass. - let proposing_on_time = - slot_delay < self.config.re_org_cutoff(self.spec.get_slot_duration()); + let re_org_cutoff_duration = self + .spec + .compute_slot_component_duration(self.spec.proposer_reorg_cutoff_bps) + .ok()?; + + let proposing_on_time = slot_delay < re_org_cutoff_duration; if !proposing_on_time { debug!(reason = "not proposing on time", "Not attempting re-org"); return None; @@ -223,7 +229,7 @@ impl BeaconChain { re_org_head_threshold, re_org_parent_threshold, &self.config.re_org_disallowed_offsets, - self.config.re_org_max_epochs_since_finalization, + re_org_max_epochs_since_finalization, ) .map_err(|e| match e { ProposerHeadError::DoNotReOrg(reason) => { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index f392268106..12a4b15211 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -30,7 +30,7 @@ use kzg::Kzg; use logging::crit; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; -use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; +use proto_array::DisallowedReOrgOffsets; use rand::RngCore; use rayon::prelude::*; use slasher::Slasher; @@ -47,8 +47,8 @@ use tracing::{debug, error, info, warn}; use tree_hash::TreeHash; use types::data::CustodyIndex; use types::{ - BeaconState, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecarList, Epoch, EthSpec, - Hash256, SignedBeaconBlock, Slot, + BeaconState, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecarList, EthSpec, Hash256, + SignedBeaconBlock, Slot, }; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing @@ -176,21 +176,6 @@ where self } - /// Sets the proposer re-org threshold. - pub fn proposer_re_org_head_threshold(mut self, threshold: Option) -> Self { - self.chain_config.re_org_head_threshold = threshold; - self - } - - /// Sets the proposer re-org max epochs since finalization. - pub fn proposer_re_org_max_epochs_since_finalization( - mut self, - epochs_since_finalization: Epoch, - ) -> Self { - self.chain_config.re_org_max_epochs_since_finalization = epochs_since_finalization; - self - } - /// Sets the proposer re-org disallowed offsets list. pub fn proposer_re_org_disallowed_offsets( mut self, diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index b2c017a469..dde09bf105 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -1,15 +1,10 @@ use crate::custody_context::NodeCustodyType; -pub use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; +pub use proto_array::DisallowedReOrgOffsets; use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::{collections::HashSet, sync::LazyLock, time::Duration}; -use types::{Checkpoint, Epoch, Hash256}; +use types::{Checkpoint, Hash256}; -pub const DEFAULT_RE_ORG_HEAD_THRESHOLD: ReOrgThreshold = ReOrgThreshold(20); -pub const DEFAULT_RE_ORG_PARENT_THRESHOLD: ReOrgThreshold = ReOrgThreshold(160); -pub const DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION: Epoch = Epoch::new(2); -/// Default to 1/12th of the slot, which is 1 second on mainnet. -pub const DEFAULT_RE_ORG_CUTOFF_DENOMINATOR: u32 = 12; pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250; /// Default fraction of a slot lookahead for payload preparation (12/3 = 4 seconds on mainnet). @@ -41,14 +36,6 @@ pub struct ChainConfig { pub archive: bool, /// The max size of a message that can be sent over the network. pub max_network_size: usize, - /// Maximum percentage of the head committee weight at which to attempt re-orging the canonical head. - pub re_org_head_threshold: Option, - /// Minimum percentage of the parent committee weight at which to attempt re-orging the canonical head. - pub re_org_parent_threshold: Option, - /// Maximum number of epochs since finalization for attempting a proposer re-org. - pub re_org_max_epochs_since_finalization: Epoch, - /// Maximum delay after the start of the slot at which to propose a reorging block. - pub re_org_cutoff_millis: Option, /// Additional epoch offsets at which re-orging block proposals are not permitted. /// /// By default this list is empty, but it can be useful for reacting to network conditions, e.g. @@ -125,6 +112,8 @@ pub struct ChainConfig { pub enable_partial_columns: bool, /// The node's custody type, determining how many data columns to custody and sample. pub node_custody_type: NodeCustodyType, + /// Disable proposer re-org + pub disable_proposer_reorg: bool, } impl Default for ChainConfig { @@ -134,10 +123,6 @@ impl Default for ChainConfig { weak_subjectivity_checkpoint: None, archive: false, max_network_size: 10 * 1_048_576, // 10M - re_org_head_threshold: Some(DEFAULT_RE_ORG_HEAD_THRESHOLD), - re_org_parent_threshold: Some(DEFAULT_RE_ORG_PARENT_THRESHOLD), - re_org_max_epochs_since_finalization: DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, - re_org_cutoff_millis: None, re_org_disallowed_offsets: DisallowedReOrgOffsets::default(), fork_choice_before_proposal_timeout_ms: DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT, // Builder fallback configs that are set in `clap` will override these. @@ -168,15 +153,7 @@ impl Default for ChainConfig { disable_get_blobs: false, enable_partial_columns: false, node_custody_type: NodeCustodyType::Fullnode, + disable_proposer_reorg: false, } } } - -impl ChainConfig { - /// The latest delay from the start of the slot at which to attempt a 1-slot re-org. - pub fn re_org_cutoff(&self, slot_duration: Duration) -> Duration { - self.re_org_cutoff_millis - .map(Duration::from_millis) - .unwrap_or_else(|| slot_duration / DEFAULT_RE_ORG_CUTOFF_DENOMINATOR) - } -} diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9802f091e0..5efe9a3c23 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -251,6 +251,13 @@ pub enum BeaconChainError { request_epoch: Epoch, cache_epoch: Epoch, }, + AttesterCachePtcOutOfBounds { + slot: Slot, + epoch: Epoch, + }, + AttesterCacheNoPtcPreGloas { + slot: Slot, + }, SkipProposerPreparation, FailedColumnCustodyInfoUpdate, } diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs index c36c73b344..3e9f9e4b60 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/gossip_verified_payload_attestation.rs @@ -2,26 +2,29 @@ use super::Error; use crate::beacon_chain::BeaconStore; use crate::canonical_head::CanonicalHead; use crate::observed_attesters::ObservedPayloadAttesters; +use crate::shuffling_cache::{ShufflingCache, with_cached_shuffling}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; use bls::AggregateSignature; use educe::Educe; use eth2::types::{EventKind, ForkVersionedResponse}; use parking_lot::RwLock; -use safe_arith::SafeArith; use slot_clock::SlotClock; -use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set; -use state_processing::state_advance::partial_state_advance; +use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set_from_pubkeys; use std::borrow::Cow; -use types::{ChainSpec, EthSpec, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot}; +use types::{ + ChainSpec, EthSpec, Hash256, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot, +}; pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { pub slot_clock: &'a T::SlotClock, pub spec: &'a ChainSpec, pub observed_payload_attesters: &'a RwLock>, pub canonical_head: &'a CanonicalHead, + pub shuffling_cache: &'a RwLock>, pub validator_pubkey_cache: &'a RwLock>, pub store: &'a BeaconStore, + pub genesis_validators_root: Hash256, } /// A `PayloadAttestationMessage` that has been verified for propagation on the gossip network. @@ -76,56 +79,18 @@ impl VerifiedPayloadAttestationMessage { return Err(Error::UnknownHeadBlock { beacon_block_root }); } - // Get head state for PTC computation. If the cached head state is too stale - // (e.g. during liveness failures with many skipped slots), fall back to loading - // a more recent state from the store and advancing it if necessary. - let head = ctx.canonical_head.cached_head(); - let head_state = &head.snapshot.beacon_state; - let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let state_epoch = head_state.current_epoch(); - - // get_ptc can serve epochs in [state_epoch - 1, state_epoch + min_seed_lookahead]. - // If the message epoch is beyond that range, the head state is stale. - let advanced_state = if message_epoch - > state_epoch - .safe_add(ctx.spec.min_seed_lookahead) - .map_err(BeaconChainError::from)? - { - let head_block_root = head.head_block_root(); - let target_slot = message_epoch.start_slot(T::EthSpec::slots_per_epoch()); - - let (state_root, mut state) = ctx - .store - .get_advanced_hot_state( - head_block_root, - target_slot, - head.snapshot.beacon_state_root(), - ) - .map_err(BeaconChainError::from)? - .ok_or(BeaconChainError::MissingBeaconState( - head.snapshot.beacon_state_root(), - ))?; - - if state - .current_epoch() - .safe_add(ctx.spec.min_seed_lookahead) - .map_err(BeaconChainError::from)? - < message_epoch - { - partial_state_advance(&mut state, Some(state_root), target_slot, ctx.spec) - .map_err(BeaconChainError::from)?; - } - - Some(state) - } else { - None - }; - - let state = advanced_state.as_ref().unwrap_or(head_state); + let ptc = with_cached_shuffling( + ctx.canonical_head, + ctx.shuffling_cache, + ctx.store, + ctx.spec, + beacon_block_root, + message_epoch, + |cached_shuffling, _| cached_shuffling.ptc_for_slot(slot), + )?; // [REJECT] `validator_index` is within `get_ptc(state, data.slot)`. - let ptc = state.get_ptc(slot, ctx.spec)?; if !ptc.0.contains(&(validator_index as usize)) { return Err(Error::NotInPTC { validator_index, @@ -145,11 +110,11 @@ impl VerifiedPayloadAttestationMessage { { // [REJECT] The signature is valid with respect to the `validator_index`. let pubkey_cache = ctx.validator_pubkey_cache.read(); - let signature_set = indexed_payload_attestation_signature_set( - state, + let signature_set = indexed_payload_attestation_signature_set_from_pubkeys( |validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed), &indexed_payload_attestation.signature, &indexed_payload_attestation, + ctx.genesis_validators_root, ctx.spec, ) .map_err(|_| Error::UnknownValidatorIndex(validator_index))?; @@ -204,8 +169,10 @@ impl BeaconChain { spec: &self.spec, observed_payload_attesters: &self.observed_payload_attesters, canonical_head: &self.canonical_head, + shuffling_cache: &self.shuffling_cache, validator_pubkey_cache: &self.validator_pubkey_cache, store: &self.store, + genesis_validators_root: self.genesis_validators_root, } } diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs index 477527c0aa..89ae1bbbdd 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/mod.rs @@ -9,7 +9,7 @@ use crate::BeaconChainError; use strum::AsRefStr; -use types::{BeaconStateError, Hash256, Slot}; +use types::{Hash256, Slot}; pub mod gossip_verified_payload_attestation; @@ -86,12 +86,6 @@ pub enum Error { /// We were unable to process this message due to an internal error. It's unclear if the /// message is valid. BeaconChainError(Box), - /// An error reading beacon state. - /// - /// ## Peer scoring - /// - /// We were unable to process this message due to an internal error. - BeaconStateError(BeaconStateError), } impl From for Error { @@ -100,11 +94,5 @@ impl From for Error { } } -impl From for Error { - fn from(e: BeaconStateError) -> Self { - Error::BeaconStateError(e) - } -} - #[cfg(test)] mod tests; diff --git a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs index c45df51ac8..d4b82c41fc 100644 --- a/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs +++ b/beacon_node/beacon_chain/src/payload_attestation_verification/tests.rs @@ -1,25 +1,15 @@ use std::sync::Arc; use std::time::Duration; -use bls::{Keypair, Signature}; -use fork_choice::ForkChoice; -use genesis::{generate_deterministic_keypairs, interop_genesis_state}; -use parking_lot::RwLock; -use proto_array::PayloadStatus; +use bls::Signature; use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::AllCaches; -use state_processing::genesis::genesis_block; -use store::{HotColdDB, StoreConfig}; use types::{ - ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, - PayloadAttestationMessage, SignedBeaconBlock, SignedRoot, Slot, + Domain, Epoch, EthSpec, ForkName, Hash256, MinimalEthSpec, PayloadAttestationData, + PayloadAttestationMessage, SignedRoot, Slot, }; use crate::{ - beacon_fork_choice_store::BeaconForkChoiceStore, - beacon_snapshot::BeaconSnapshot, - canonical_head::CanonicalHead, - observed_attesters::ObservedPayloadAttesters, payload_attestation_verification::{ Error as PayloadAttestationError, gossip_verified_payload_attestation::{ @@ -27,7 +17,6 @@ use crate::{ }, }, test_utils::{BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, test_spec}, - validator_pubkey_cache::ValidatorPubkeyCache, }; type E = MinimalEthSpec; @@ -36,96 +25,48 @@ type T = EphemeralHarnessType; const NUM_VALIDATORS: usize = 64; struct TestContext { - canonical_head: CanonicalHead, - observed_payload_attesters: RwLock>, - validator_pubkey_cache: RwLock>, - slot_clock: TestingSlotClock, - keypairs: Vec, - spec: ChainSpec, + harness: BeaconChainHarness, genesis_block_root: Hash256, - store: Arc>, } impl TestContext { fn new() -> Self { - let spec = test_spec::(); - let store = Arc::new( - HotColdDB::open_ephemeral(StoreConfig::default(), Arc::new(spec.clone())) - .expect("should open ephemeral store"), - ); - - let keypairs = generate_deterministic_keypairs(NUM_VALIDATORS); - - let mut state = - interop_genesis_state::(&keypairs, 0, Hash256::repeat_byte(0x42), None, &spec) - .expect("should build genesis state"); - - *state.finalized_checkpoint_mut() = Checkpoint { - epoch: Epoch::new(1), - root: Hash256::ZERO, - }; - - let mut block = genesis_block(&state, &spec).expect("should build genesis block"); - *block.state_root_mut() = state - .update_tree_hash_cache() - .expect("should hash genesis state"); - let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); - let block_root = signed_block.canonical_root(); - - let snapshot = BeaconSnapshot::new( - Arc::new(signed_block.clone()), - None, - block_root, - state.clone(), - ); - - let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), snapshot.clone()) - .expect("should create fork choice store"); - let fork_choice = - ForkChoice::from_anchor(fc_store, block_root, &signed_block, &state, None, &spec) - .expect("should create fork choice"); - - let canonical_head = - CanonicalHead::new(fork_choice, Arc::new(snapshot), PayloadStatus::Pending); - + let spec = Arc::new(test_spec::()); let slot_clock = TestingSlotClock::new( Slot::new(0), Duration::from_secs(0), spec.get_slot_duration(), ); - // Advance past genesis so `now_with_past_tolerance` doesn't underflow. - slot_clock.set_current_time(spec.get_slot_duration()); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .deterministic_keypairs(NUM_VALIDATORS) + .fresh_ephemeral_store() + .testing_slot_clock(slot_clock) + .build(); - let validator_pubkey_cache = - ValidatorPubkeyCache::new(&state, store.clone()).expect("should create pubkey cache"); + // Advance past genesis so `now_with_past_tolerance` doesn't underflow. + harness + .chain + .slot_clock + .set_current_time(harness.spec.get_slot_duration()); + let genesis_block_root = harness.chain.genesis_block_root; Self { - canonical_head, - observed_payload_attesters: RwLock::new(ObservedPayloadAttesters::default()), - validator_pubkey_cache: RwLock::new(validator_pubkey_cache), - slot_clock, - keypairs, - spec, - genesis_block_root: block_root, - store, + harness, + genesis_block_root, } } fn gossip_ctx(&self) -> GossipVerificationContext<'_, T> { - GossipVerificationContext { - slot_clock: &self.slot_clock, - spec: &self.spec, - observed_payload_attesters: &self.observed_payload_attesters, - canonical_head: &self.canonical_head, - validator_pubkey_cache: &self.validator_pubkey_cache, - store: &self.store, - } + self.harness.chain.payload_attestation_gossip_context() } fn ptc_members(&self, slot: Slot) -> Vec { - let head = self.canonical_head.cached_head(); + let head = self.harness.chain.canonical_head.cached_head(); let state = &head.snapshot.beacon_state; - let ptc = state.get_ptc(slot, &self.spec).expect("should get PTC"); + let ptc = state + .get_ptc(slot, &self.harness.spec) + .expect("should get PTC"); ptc.0.to_vec() } @@ -134,16 +75,18 @@ impl TestContext { data: PayloadAttestationData, validator_index: u64, ) -> PayloadAttestationMessage { - let head = self.canonical_head.cached_head(); + let head = self.harness.chain.canonical_head.cached_head(); let state = &head.snapshot.beacon_state; - let domain = self.spec.get_domain( + let domain = self.harness.spec.get_domain( data.slot.epoch(E::slots_per_epoch()), Domain::PTCAttester, &state.fork(), state.genesis_validators_root(), ); let message = data.signing_root(domain); - let signature = self.keypairs[validator_index as usize].sk.sign(message); + let signature = self.harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); PayloadAttestationMessage { validator_index, data, @@ -192,7 +135,7 @@ fn past_slot() { return; } let ctx = TestContext::new(); - ctx.slot_clock.set_slot(5); + ctx.harness.chain.slot_clock.set_slot(5); let gossip = ctx.gossip_ctx(); let msg = make_payload_attestation(Slot::new(0), 0, ctx.genesis_block_root); @@ -328,20 +271,95 @@ fn duplicate_after_valid() { )); } -/// Exercises the `partial_state_advance` fallback in gossip verification when -/// the head state is too stale to compute PTC membership (e.g., during a -/// network liveness failure with many missed slots). #[tokio::test] -async fn stale_head_with_partial_advance() { +async fn ptc_cache_is_primed_at_gloas_fork_boundary() { + // Only run this test once, when FORK_NAME=gloas exactly. + let mut spec = test_spec::(); + if spec.fork_name_at_epoch(Epoch::new(0)) != ForkName::Gloas { + return; + } + + let gloas_fork_epoch = Epoch::new(2); + spec.gloas_fork_epoch = Some(gloas_fork_epoch); + assert_eq!( + spec.fork_name_at_epoch(gloas_fork_epoch - 1), + ForkName::Fulu + ); + assert_eq!(spec.fork_name_at_epoch(gloas_fork_epoch), ForkName::Gloas); + + let slots_per_epoch = E::slots_per_epoch(); + let fork_boundary_slot = gloas_fork_epoch.start_slot(slots_per_epoch); + let test_slots = (fork_boundary_slot.as_u64() + ..fork_boundary_slot.as_u64() + slots_per_epoch * 2) + .map(Slot::new); + + let harness = BeaconChainHarness::builder(E::default()) + .spec(Arc::new(spec)) + .deterministic_keypairs(NUM_VALIDATORS) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.extend_to_slot(fork_boundary_slot).await; + + for slot in test_slots { + harness.chain.slot_clock.set_slot(slot.as_u64()); + assert!( + harness + .chain + .shuffling_cache + .read() + .check_gloas_ptcs_invariant(&harness.spec), + "shuffling cache should satisfy the Gloas PTC invariant" + ); + + let head = harness.chain.canonical_head.cached_head(); + let state = &head.snapshot.beacon_state; + let ptc = state.get_ptc(slot, &harness.spec).expect("should get PTC"); + let validator_index = *ptc.0.first().expect("PTC should have a member") as u64; + let data = PayloadAttestationData { + beacon_block_root: head.head_block_root(), + slot, + payload_present: true, + blob_data_available: true, + }; + let domain = harness.spec.get_domain( + data.slot.epoch(slots_per_epoch), + Domain::PTCAttester, + &state.fork(), + state.genesis_validators_root(), + ); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(data.signing_root(domain)); + let msg = PayloadAttestationMessage { + validator_index, + data, + signature, + }; + + let result = harness + .chain + .verify_payload_attestation_message_for_gossip(msg); + assert!( + result.is_ok(), + "expected PTC payload attestation to verify at slot {}, got: {:?}", + slot, + result.unwrap_err() + ); + } +} + +/// Exercises payload attestation gossip verification when the message epoch is ahead of the +/// canonical head due to many missed slots. +#[tokio::test] +async fn stale_head_payload_attestation() { if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { return; } let slots_per_epoch = E::slots_per_epoch(); - // Head at epoch 1, message at epoch 5 — 4 epochs of missed slots. - // This exceeds min_seed_lookahead (1), triggering the fallback path: - // get_advanced_hot_state loads the stored state, then partial_state_advance - // advances it through epoch boundaries to populate ptc_window. + // Head at epoch 1, message at epoch 5: 4 epochs of missed slots. let head_slot = Slot::new(slots_per_epoch); let missed_epochs = 4; let target_slot = Slot::new(slots_per_epoch * (1 + missed_epochs)); @@ -360,7 +378,7 @@ async fn stale_head_with_partial_advance() { let head_epoch = head.snapshot.beacon_state.current_epoch(); assert!( target_epoch > head_epoch + harness.spec.min_seed_lookahead, - "precondition: message epoch must exceed head + min_seed_lookahead to trigger fallback" + "precondition: message epoch must exceed head + min_seed_lookahead" ); // GIVEN a slot clock advanced to epoch 5 without producing blocks @@ -385,7 +403,9 @@ async fn stale_head_with_partial_advance() { .expect("should get PTC from reference state"); let validator_index = *ptc.0.first().expect("PTC should have at least one member") as u64; - // WHEN a properly-signed payload attestation from a PTC member is verified. + // WHEN a properly-signed payload attestation from a PTC member is verified. The signature + // domain should come from the spec fork schedule and genesis validators root, not a loaded + // state in the verifier. let domain = harness.spec.get_domain( target_epoch, Domain::PTCAttester, @@ -420,3 +440,105 @@ async fn stale_head_with_partial_advance() { result.unwrap_err() ); } + +/// Exercises payload attestation gossip verification for a non-canonical block whose PTC differs +/// from the canonical chain's PTC for the same slot. +#[tokio::test] +async fn side_chain_payload_attestation_uses_side_chain_ptc() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let slots_per_epoch = E::slots_per_epoch(); + let fork_slot = Slot::new(slots_per_epoch); + let target_slot = Slot::new(slots_per_epoch * 4); + let target_epoch = target_slot.epoch(slots_per_epoch); + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(NUM_VALIDATORS) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // Build a common prefix through epoch 1. + harness.extend_to_slot(fork_slot).await; + let fork_state = harness.chain.head_snapshot().beacon_state.clone(); + + // Build two branches for several epochs. The side chain skips its first slot, giving it + // different RANDAO mixes and therefore a different PTC by the target slot. The canonical chain + // is processed second and receives sub-finality attestations, so it remains the head without + // finalizing past the side-chain fork point. + let side_slots: Vec<_> = ((fork_slot + 2).as_u64()..=target_slot.as_u64()) + .map(Slot::new) + .collect(); + let canonical_slots: Vec<_> = ((fork_slot + 1).as_u64()..=target_slot.as_u64()) + .map(Slot::new) + .collect(); + let canonical_validators = (0..NUM_VALIDATORS / 2).collect::>(); + + let results = harness + .add_blocks_on_multiple_chains(vec![ + (fork_state.clone(), side_slots, vec![]), + (fork_state, canonical_slots, canonical_validators), + ]) + .await; + + let side_head_root: Hash256 = results[0].2.into(); + let side_head_state = &results[0].3; + let canonical_head_root: Hash256 = results[1].2.into(); + let canonical_head_state = &results[1].3; + + assert_ne!(side_head_root, canonical_head_root); + assert_eq!( + harness.chain.head_snapshot().beacon_block_root, + canonical_head_root + ); + + let side_ptc = side_head_state + .get_ptc(target_slot, &harness.spec) + .expect("should get side-chain PTC"); + let canonical_ptc = canonical_head_state + .get_ptc(target_slot, &harness.spec) + .expect("should get canonical PTC"); + assert_ne!( + side_ptc, canonical_ptc, + "precondition: side-chain PTC should differ from canonical PTC" + ); + + let validator_index = side_ptc + .0 + .iter() + .copied() + .find(|validator_index| !canonical_ptc.0.contains(validator_index)) + .expect("should find a validator in the side-chain PTC only") + as u64; + + let domain = harness.spec.get_domain( + target_epoch, + Domain::PTCAttester, + &side_head_state.fork(), + side_head_state.genesis_validators_root(), + ); + let data = PayloadAttestationData { + beacon_block_root: side_head_root, + slot: target_slot, + payload_present: true, + blob_data_available: true, + }; + let message = data.signing_root(domain); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(message); + let msg = PayloadAttestationMessage { + validator_index, + data, + signature, + }; + + let verified = harness + .chain + .verify_payload_attestation_message_for_gossip(msg) + .expect("side-chain payload attestation should verify"); + assert_eq!(verified.ptc(), &side_ptc); +} diff --git a/beacon_node/beacon_chain/src/shuffling_cache.rs b/beacon_node/beacon_chain/src/shuffling_cache.rs index 0377b553e3..daaede6ed1 100644 --- a/beacon_node/beacon_chain/src/shuffling_cache.rs +++ b/beacon_node/beacon_chain/src/shuffling_cache.rs @@ -3,23 +3,28 @@ use std::sync::Arc; use itertools::Itertools; use oneshot_broadcast::{Receiver, Sender, oneshot}; +use parking_lot::RwLock; +use state_processing::state_advance::partial_state_advance; use tracing::debug; use types::{ - AttestationShufflingId, BeaconState, Epoch, EthSpec, Hash256, RelativeEpoch, - state::CommitteeCache, + AttestationShufflingId, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, PTC, + RelativeEpoch, Slot, state::CommitteeCache, }; -use crate::{BeaconChainError, metrics}; +use crate::{ + BeaconChainError, BeaconChainTypes, BeaconStore, canonical_head::CanonicalHead, metrics, +}; -/// The size of the cache that stores committee caches for quicker verification. +/// The size of the cache that stores shufflings for quicker verification. /// -/// Each entry should be `8 + 800,000 = 800,008` bytes in size with 100k validators. (8-byte hash + -/// 100k indices). Therefore, this cache should be approx `16 * 800,008 = 12.8 MB`. (Note: this -/// ignores a few extra bytes in the caches that should be insignificant compared to the indices). +/// Each entry should be around `8 * 2M + 128KB ~= 16 MB` in size with 2M validators +/// and 32 512-validator PTCs. Therefore, this cache should be approx +/// `16 * 16 MB ~= 256 MB`. (Note: this ignores a few extra bytes in the +/// caches that should be insignificant compared to the indices). pub const DEFAULT_CACHE_SIZE: usize = 16; -/// The maximum number of concurrent committee cache "promises" that can be issued. In effect, this -/// limits the number of concurrent states that can be loaded into memory for the committee cache. +/// The maximum number of concurrent shuffling "promises" that can be issued. In effect, this +/// limits the number of concurrent states that can be loaded into memory for the shuffling. /// This prevents excessive memory usage at the cost of rejecting some attestations. /// /// We set this value to 2 since states can be quite large and have a significant impact on memory @@ -30,19 +35,82 @@ pub const DEFAULT_CACHE_SIZE: usize = 16; const MAX_CONCURRENT_PROMISES: usize = 2; #[derive(Clone)] -pub enum CacheItem { - /// A committee. - Committee(Arc), - /// A promise for a future committee. - Promise(Receiver>), +pub struct CachedShuffling { + pub committee_cache: Arc, + pub ptcs: CachedPTCs, } -impl CacheItem { +#[derive(Clone)] +pub enum CachedPTCs { + PreGloas, + PostGloas(Vec>, Epoch), +} + +impl CachedPTCs { + /// Returns `None` at the Gloas fork boundary (pre-Gloas state, Gloas shuffling epoch); the + /// on-demand miss path in `with_cached_shuffling` handles those. + pub fn try_from_state( + state: &BeaconState, + epoch: Epoch, + spec: &ChainSpec, + ) -> Result, BeaconChainError> { + if shuffling_requires_ptcs(epoch, spec) { + if !state.fork_name_unchecked().gloas_enabled() { + return Ok(None); + } + let ptcs = epoch + .slot_iter(E::slots_per_epoch()) + .map(|slot| state.get_ptc(slot, spec)) + .collect::, _>>()?; + Ok(Some(Self::PostGloas(ptcs, epoch))) + } else { + Ok(Some(Self::PreGloas)) + } + } +} + +impl CachedShuffling { + pub fn new(committee_cache: Arc, ptcs: CachedPTCs) -> Self { + Self { + committee_cache, + ptcs, + } + } + + pub fn ptc_for_slot(&self, slot: Slot) -> Result, BeaconChainError> { + match &self.ptcs { + CachedPTCs::PreGloas => Err(BeaconChainError::AttesterCacheNoPtcPreGloas { slot }), + &CachedPTCs::PostGloas(ref ptcs, epoch) => { + if slot.epoch(E::slots_per_epoch()) != epoch { + Err(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch }) + } else { + ptcs.get(slot.as_usize() % E::slots_per_epoch() as usize) + .cloned() + .ok_or(BeaconChainError::AttesterCachePtcOutOfBounds { slot, epoch }) + } + } + } + } +} + +fn shuffling_requires_ptcs(shuffling_epoch: Epoch, spec: &ChainSpec) -> bool { + spec.fork_name_at_epoch(shuffling_epoch).gloas_enabled() +} + +#[derive(Clone)] +pub enum CacheItem { + /// A cached shuffling. + Committee(CachedShuffling), + /// A promise for a future cached shuffling. + Promise(Receiver>), +} + +impl CacheItem { pub fn is_promise(&self) -> bool { matches!(self, CacheItem::Promise(_)) } - pub fn wait(self) -> Result, BeaconChainError> { + pub fn wait(self) -> Result, BeaconChainError> { match self { CacheItem::Committee(cache) => Ok(cache), CacheItem::Promise(receiver) => receiver @@ -52,17 +120,17 @@ impl CacheItem { } } -/// Provides a cache for `CommitteeCache`. +/// Provides a cache for `CommitteeCache` and the associated optional PTCs. /// /// It has been named `ShufflingCache` because `CommitteeCacheCache` is a bit weird and looks like /// a find/replace error. -pub struct ShufflingCache { - cache: HashMap, +pub struct ShufflingCache { + cache: HashMap>, cache_size: usize, head_shuffling_ids: BlockShufflingIds, } -impl ShufflingCache { +impl ShufflingCache { pub fn new(cache_size: usize, head_shuffling_ids: BlockShufflingIds) -> Self { Self { cache: HashMap::new(), @@ -71,22 +139,22 @@ impl ShufflingCache { } } - pub fn get(&mut self, key: &AttestationShufflingId) -> Option { + pub fn get(&mut self, key: &AttestationShufflingId) -> Option> { match self.cache.get(key) { - // The cache contained the committee cache, return it. + // The cache contained the shuffling, return it. item @ Some(CacheItem::Committee(_)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() } - // The cache contains a promise for the committee cache. Check to see if the promise has + // The cache contains a promise for the shuffling. Check to see if the promise has // already been resolved, without waiting for it. item @ Some(CacheItem::Promise(receiver)) => match receiver.try_recv() { // The promise has already been resolved. Replace the entry in the cache with a - // `Committee` entry and then return the committee. - Ok(Some(committee)) => { + // `Committee` entry and then return the cached shuffling. + Ok(Some(cached_shuffling)) => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_PROMISE_HITS); metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); - let ready = CacheItem::Committee(committee); + let ready = CacheItem::Committee(cached_shuffling); self.insert_cache_item(key.clone(), ready.clone()); Some(ready) } @@ -97,8 +165,8 @@ impl ShufflingCache { metrics::inc_counter(&metrics::SHUFFLING_CACHE_HITS); item.cloned() } - // The sender has been dropped without sending a committee. There was most likely an - // error computing the committee cache. Drop the key from the cache and return + // The sender has been dropped without sending a shuffling. There was most likely an + // error computing the shuffling. Drop the key from the cache and return // `None` so the caller can recompute the committee. // // It's worth noting that this is the only place where we removed unresolved @@ -113,7 +181,7 @@ impl ShufflingCache { None } }, - // The cache does not have this committee and it's not already promised to be computed. + // The cache does not have this shuffling and it's not already promised to be computed. None => { metrics::inc_counter(&metrics::SHUFFLING_CACHE_MISSES); None @@ -125,27 +193,41 @@ impl ShufflingCache { self.cache.contains_key(key) } - pub fn insert_committee_cache( + /// Check that all entries for Gloas epochs have PTCs. + #[cfg(test)] + pub fn check_gloas_ptcs_invariant(&self, spec: &ChainSpec) -> bool { + self.cache.iter().all(|(key, item)| { + if shuffling_requires_ptcs(key.shuffling_epoch, spec) { + match item { + CacheItem::Committee(cached_shuffling) => { + matches!(cached_shuffling.ptcs, CachedPTCs::PostGloas(..)) + } + CacheItem::Promise(_) => true, + } + } else { + true + } + }) + } + + pub fn insert_committee_cache( &mut self, key: AttestationShufflingId, - committee_cache: &C, + cached_shuffling: CachedShuffling, ) { - if self - .cache - .get(&key) - // Replace the committee if it's not present or if it's a promise. A bird in the hand is - // worth two in the promise-bush! - .is_none_or(CacheItem::is_promise) - { - self.insert_cache_item( - key, - CacheItem::Committee(committee_cache.to_arc_committee_cache()), - ); + match self.cache.get(&key) { + Some(CacheItem::Committee(_)) => { + // Calculation is deterministic, so no need to replace the existing entry. + } + // A bird in the hand is worth two in the promise-bush! + Some(CacheItem::Promise(_)) | None => { + self.insert_cache_item(key, CacheItem::Committee(cached_shuffling)); + } } } /// Prunes the cache first before inserting a new cache item. - fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { + fn insert_cache_item(&mut self, key: AttestationShufflingId, cache_item: CacheItem) { self.prune_cache(); self.cache.insert(key, cache_item); } @@ -188,7 +270,7 @@ impl ShufflingCache { pub fn create_promise( &mut self, key: AttestationShufflingId, - ) -> Result>, BeaconChainError> { + ) -> Result>, BeaconChainError> { let num_active_promises = self .cache .iter() @@ -212,20 +294,170 @@ impl ShufflingCache { } } -/// A helper trait to allow lazy-cloning of the committee cache when inserting into the cache. -pub trait ToArcCommitteeCache { - fn to_arc_committee_cache(&self) -> Arc; -} +pub fn with_cached_shuffling( + canonical_head: &CanonicalHead, + shuffling_cache_lock: &RwLock>, + store: &BeaconStore, + spec: &ChainSpec, + head_block_root: Hash256, + shuffling_epoch: Epoch, + map_fn: F, +) -> Result +where + T: BeaconChainTypes, + F: Fn(&CachedShuffling, Hash256) -> Result, + Error: From, +{ + let head_block = canonical_head + .fork_choice_read_lock() + .get_block(&head_block_root) + .ok_or(BeaconChainError::MissingBeaconBlock(head_block_root))?; -impl ToArcCommitteeCache for CommitteeCache { - fn to_arc_committee_cache(&self) -> Arc { - Arc::new(self.clone()) + let shuffling_id = BlockShufflingIds { + current: head_block.current_epoch_shuffling_id.clone(), + next: head_block.next_epoch_shuffling_id.clone(), + previous: None, + block_root: head_block.root, } -} + .id_for_epoch(shuffling_epoch) + .ok_or_else(|| BeaconChainError::InvalidShufflingId { + shuffling_epoch, + head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()), + })?; -impl ToArcCommitteeCache for Arc { - fn to_arc_committee_cache(&self) -> Arc { - self.clone() + let mut shuffling_cache = { + let _ = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES); + shuffling_cache_lock.write() + }; + + if let Some(cache_item) = shuffling_cache.get(&shuffling_id) { + drop(shuffling_cache); + + let cached_shuffling = cache_item.wait()?; + map_fn(&cached_shuffling, shuffling_id.shuffling_decision_block) + } else { + // Create an entry in the cache that "promises" this value will eventually be computed. + // This avoids the case where multiple threads attempt to produce the same value at the + // same time. + // + // Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same + // promise from being created twice. + let sender = shuffling_cache.create_promise(shuffling_id.clone())?; + + // Drop the shuffling cache to avoid holding the lock for any longer than required. + drop(shuffling_cache); + + debug!( + shuffling_id = ?shuffling_epoch, + head_block_root = head_block_root.to_string(), + "Committee cache miss" + ); + + // If the block's state will be so far ahead of `shuffling_epoch` that even its previous + // epoch committee cache will be too new, then error. Callers of this function shouldn't be + // requesting such old shufflings for this `head_block_root`. + let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch()); + if head_block_epoch > shuffling_epoch + 1 { + return Err(BeaconChainError::InvalidStateForShuffling { + state_epoch: head_block_epoch, + shuffling_epoch, + } + .into()); + } + + let state_read_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES); + + let cached_head = canonical_head.cached_head(); + let head_state_opt = if cached_head.head_block_root() == head_block_root { + Some(( + cached_head.snapshot.beacon_state.clone(), + cached_head.head_state_root(), + )) + } else { + None + }; + + // Compute the `target_slot` to advance the block's state to. + // + // Since there's a one-epoch look-ahead on the attester shuffling, it suffices to only + // advance into the first slot of the epoch prior to `shuffling_epoch`. + // + // If the `head_block` is already ahead of that slot, then we should load the state at that + // slot, as we've determined above that the `shuffling_epoch` cache will not be too far in + // the past. + let mut target_slot = std::cmp::max( + shuffling_epoch + .saturating_sub(1_u64) + .start_slot(T::EthSpec::slots_per_epoch()), + head_block.slot, + ); + if spec.gloas_fork_epoch == Some(shuffling_epoch) { + target_slot = std::cmp::max( + target_slot, + shuffling_epoch.start_slot(T::EthSpec::slots_per_epoch()), + ); + } + + // If the head state is useful for this request, use it. Otherwise, read a state from disk + // that is advanced as close as possible to `target_slot`. + let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt { + (state, state_root) + } else { + let (state_root, state) = store + .get_advanced_hot_state(head_block_root, target_slot, head_block.state_root) + .map_err(BeaconChainError::DBError)? + .ok_or(BeaconChainError::MissingBeaconState(head_block.state_root))?; + (state, state_root) + }; + + metrics::stop_timer(state_read_timer); + let state_skip_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES); + + // If the state is still in an earlier epoch, advance it to the `target_slot` so that its + // next epoch committee cache matches the `shuffling_epoch`. + let advance_to_gloas_fork = spec.gloas_fork_epoch == Some(shuffling_epoch) + && state.current_epoch() < shuffling_epoch; + if state.current_epoch() + 1 < shuffling_epoch || advance_to_gloas_fork { + // Advance the state into the required slot, using the "partial" method since the state + // roots are not relevant for the shuffling. + partial_state_advance(&mut state, Some(state_root), target_slot, spec) + .map_err(BeaconChainError::from)?; + } + metrics::stop_timer(state_skip_timer); + + let committee_building_timer = + metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES); + + let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch) + .map_err(BeaconChainError::IncorrectStateForAttestation)?; + + state + .build_committee_cache(relative_epoch, spec) + .map_err(BeaconChainError::from)?; + + let committee_cache = state + .committee_cache(relative_epoch) + .map_err(BeaconChainError::from)? + .clone(); + // The state has been advanced through the upgrade if needed, so `try_from_state` + // cannot return None here. + let ptcs = CachedPTCs::try_from_state(&state, shuffling_epoch, spec)?.ok_or( + BeaconChainError::BeaconStateError(BeaconStateError::IncorrectStateVariant), + )?; + let shuffling_decision_block = shuffling_id.shuffling_decision_block; + let cached_shuffling = CachedShuffling::new(committee_cache, ptcs); + + shuffling_cache_lock + .write() + .insert_committee_cache(shuffling_id, cached_shuffling.clone()); + + metrics::stop_timer(committee_building_timer); + + sender.send(cached_shuffling.clone()); + + map_fn(&cached_shuffling, shuffling_decision_block) } } @@ -304,7 +536,7 @@ mod test { const TEST_CACHE_SIZE: usize = 5; // Creates a new shuffling cache for testing - fn new_shuffling_cache() -> ShufflingCache { + fn new_shuffling_cache() -> ShufflingCache { create_test_tracing_subscriber(); let current_epoch = 8; @@ -318,6 +550,10 @@ mod test { ShufflingCache::new(TEST_CACHE_SIZE, head_shuffling_ids) } + fn cached_shuffling(committee_cache: Arc) -> CachedShuffling { + CachedShuffling::new(committee_cache, CachedPTCs::PreGloas) + } + /// Returns two different committee caches for testing. fn committee_caches() -> (Arc, Arc) { let harness = BeaconChainHarness::builder(MinimalEthSpec) @@ -366,12 +602,12 @@ mod test { ); // Resolve the promise. - sender.send(committee_a.clone()); + sender.send(cached_shuffling(committee_a.clone())); // Ensure the promise has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "the promise should be resolved" ); assert_eq!(cache.cache.len(), 1, "the cache should have one entry"); @@ -428,30 +664,30 @@ mod test { ); // Resolve promise A. - sender_a.send(committee_a.clone()); + sender_a.send(cached_shuffling(committee_a.clone())); // Ensure promise A has been resolved. let item = cache.get(&id_a).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_a), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "promise A should be resolved" ); // Resolve promise B. - sender_b.send(committee_b.clone()); + sender_b.send(cached_shuffling(committee_b.clone())); // Ensure promise B has been resolved. let item = cache.get(&id_b).unwrap(); assert!( - matches!(item, CacheItem::Committee(committee) if committee == committee_b), + matches!(item, CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b), "promise B should be resolved" ); // Check both entries again. assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee) if committee == committee_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_a), "promise A should remain resolved" ); assert!( - matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(committee) if committee == committee_b), + matches!(cache.get(&id_b).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_b), "promise B should remain resolved" ); assert_eq!(cache.cache.len(), 2, "the cache should have two entries"); @@ -485,9 +721,9 @@ mod test { let mut cache = new_shuffling_cache(); let id_a = shuffling_id(1); let committee_cache_a = Arc::new(CommitteeCache::default()); - cache.insert_committee_cache(id_a.clone(), &committee_cache_a); + cache.insert_committee_cache(id_a.clone(), cached_shuffling(committee_cache_a.clone())); assert!( - matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(committee_cache) if committee_cache == committee_cache_a), + matches!(cache.get(&id_a).unwrap(), CacheItem::Committee(cached_shuffling) if cached_shuffling.committee_cache == committee_cache_a), "should insert committee cache" ); } @@ -500,7 +736,10 @@ mod test { .collect::>(); for (shuffling_id, committee_cache) in shuffling_id_and_committee_caches.iter() { - cache.insert_committee_cache(shuffling_id.clone(), committee_cache); + cache.insert_committee_cache( + shuffling_id.clone(), + cached_shuffling(committee_cache.clone()), + ); } for i in 1..(TEST_CACHE_SIZE + 1) { @@ -533,7 +772,7 @@ mod test { shuffling_epoch: (current_epoch + 1).into(), shuffling_decision_block: Hash256::from_low_u64_be(current_epoch + i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone())); } // Now, update the head shuffling ids @@ -546,11 +785,17 @@ mod test { cache.update_head_shuffling_ids(head_shuffling_ids.clone()); // Insert head state shuffling ids. Should not be overridden by other shuffling ids. - cache.insert_committee_cache(head_shuffling_ids.current.clone(), &committee_cache); - cache.insert_committee_cache(head_shuffling_ids.next.clone(), &committee_cache); + cache.insert_committee_cache( + head_shuffling_ids.current.clone(), + cached_shuffling(committee_cache.clone()), + ); + cache.insert_committee_cache( + head_shuffling_ids.next.clone(), + cached_shuffling(committee_cache.clone()), + ); cache.insert_committee_cache( head_shuffling_ids.previous.clone().unwrap(), - &committee_cache, + cached_shuffling(committee_cache.clone()), ); // Insert a few entries for older epochs. @@ -559,7 +804,7 @@ mod test { shuffling_epoch: Epoch::from(i), shuffling_decision_block: Hash256::from_low_u64_be(i as u64), }; - cache.insert_committee_cache(shuffling_id, &committee_cache); + cache.insert_committee_cache(shuffling_id, cached_shuffling(committee_cache.clone())); } assert!( @@ -580,4 +825,41 @@ mod test { "should limit cache size" ); } + + /// Pre-Gloas state across the Gloas fork: epoch G-1 returns `Some(PreGloas)`, epoch G and + /// G+1 return `None` (the boundary skip). + #[test] + fn try_from_state_skips_at_gloas_boundary() { + create_test_tracing_subscriber(); + + let mut spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); + let gloas_fork_epoch = Epoch::new(2); + spec.gloas_fork_epoch = Some(gloas_fork_epoch); + + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .spec(Arc::new(spec.clone())) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .build(); + let state = harness.get_current_state(); + assert!(!state.fork_name_unchecked().gloas_enabled()); + + for (epoch, expect_pre_gloas) in [ + (gloas_fork_epoch - 1, true), + (gloas_fork_epoch, false), + (gloas_fork_epoch + 1, false), + ] { + let result = CachedPTCs::::try_from_state(&state, epoch, &spec) + .expect("must not error at the boundary"); + if expect_pre_gloas { + assert!( + matches!(result, Some(CachedPTCs::PreGloas)), + "epoch {}: expected Some(PreGloas)", + epoch + ); + } else { + assert!(result.is_none(), "epoch {}: expected None", epoch); + } + } + } } diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index cb916cb514..6408f861f8 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -15,7 +15,9 @@ //! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles. use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::{ - BeaconChain, BeaconChainError, BeaconChainTypes, chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, + BeaconChain, BeaconChainError, BeaconChainTypes, + chain_config::FORK_CHOICE_LOOKAHEAD_FACTOR, + shuffling_cache::{CachedPTCs, CachedShuffling}, }; use slot_clock::SlotClock; use state_processing::per_slot_processing; @@ -394,19 +396,30 @@ fn advance_head(beacon_chain: &Arc>) -> Resu .map_err(BeaconChainError::from)?; let committee_cache = state .committee_cache(RelativeEpoch::Next) - .map_err(BeaconChainError::from)?; - beacon_chain - .shuffling_cache - .write() - .insert_committee_cache(shuffling_id.clone(), committee_cache); + .map_err(BeaconChainError::from)? + .clone(); + let shuffling_epoch = RelativeEpoch::Next.into_epoch(state.current_epoch()); - debug!( - ?head_block_root, - next_epoch_shuffling_root = ?shuffling_id.shuffling_decision_block, - state_epoch = %state.current_epoch(), - current_epoch = %current_slot.epoch(T::EthSpec::slots_per_epoch()), - "Primed proposer and attester caches" - ); + if let Some(ptcs) = CachedPTCs::try_from_state(&state, shuffling_epoch, &beacon_chain.spec)? + { + beacon_chain.shuffling_cache.write().insert_committee_cache( + shuffling_id.clone(), + CachedShuffling::new(committee_cache, ptcs), + ); + + debug!( + ?head_block_root, + next_epoch_shuffling_root = ?shuffling_id.shuffling_decision_block, + state_epoch = %state.current_epoch(), + current_epoch = %current_slot.epoch(T::EthSpec::slots_per_epoch()), + "Primed proposer and attester caches" + ); + } else { + debug!( + %shuffling_epoch, + "Skipping priming of attester cache for Gloas boundary epoch" + ); + } } let final_slot = state.slot(); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 7e50f4e5ac..0ac77dcfaa 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1209,7 +1209,8 @@ fn check_shuffling_compatible( .with_committee_cache( block_root, head_state.current_epoch(), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let state_cache = head_state.committee_cache(RelativeEpoch::Current).unwrap(); // We used to check for false negatives here, but had to remove that check // because `shuffling_is_compatible` does not guarantee their absence. @@ -1247,7 +1248,8 @@ fn check_shuffling_compatible( .with_committee_cache( block_root, head_state.previous_epoch(), - |committee_cache, _| { + |cached_shuffling, _| { + let committee_cache = cached_shuffling.committee_cache.as_ref(); let state_cache = head_state.committee_cache(RelativeEpoch::Previous).unwrap(); if previous_epoch_shuffling_is_compatible { assert_eq!(committee_cache, state_cache.as_ref()); diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index e6b1ed0879..ca980b96a4 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -129,6 +129,15 @@ impl BlockId { .is_finalized_block(root, block_slot) .map_err(warp_utils::reject::unhandled_error)?; Ok((*root, execution_optimistic, finalized)) + } else if chain.early_attester_cache.get_block(*root).is_some() { + // Fall back to the early attester cache for blocks that are in fork choice + // but haven't been written to disk yet. + let execution_optimistic = chain + .canonical_head + .fork_choice_read_lock() + .is_optimistic_or_invalid_block(root) + .unwrap_or(false); + Ok((*root, execution_optimistic, false)) } else { Err(warp_utils::reject::custom_not_found(format!( "beacon block with root {}", @@ -143,9 +152,18 @@ impl BlockId { root: &Hash256, chain: &BeaconChain, ) -> Result>, warp::Rejection> { - chain + if let Some(block) = chain .get_blinded_block(root) - .map_err(warp_utils::reject::unhandled_error) + .map_err(warp_utils::reject::unhandled_error)? + { + return Ok(Some(block)); + } + // Fall back to the early attester cache for blocks that are in fork choice + // but haven't been written to disk yet. + Ok(chain + .early_attester_cache + .get_block(*root) + .map(|b| b.clone_as_blinded())) } /// Return the `SignedBeaconBlock` identified by `self`. @@ -253,20 +271,20 @@ impl BlockId { } _ => { let (root, execution_optimistic, finalized) = self.root(chain)?; - chain + let block_opt = chain .get_block(&root) .await - .map_err(warp_utils::reject::unhandled_error) - .and_then(|block_opt| { - block_opt - .map(|block| (Arc::new(block), execution_optimistic, finalized)) - .ok_or_else(|| { - warp_utils::reject::custom_not_found(format!( - "beacon block with root {}", - root - )) - }) - }) + .map_err(warp_utils::reject::unhandled_error)?; + let block = block_opt + .map(Arc::new) + .or_else(|| chain.early_attester_cache.get_block(root)) + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "beacon block with root {}", + root + )) + })?; + Ok((block, execution_optimistic, finalized)) } } } @@ -290,16 +308,20 @@ impl BlockId { } let data_column_sidecars = if let Some(indices) = query.indices { - indices - .iter() - .filter_map(|index| chain.get_data_column(&root, index, fork_name).transpose()) - .collect::, _>>() + chain + .get_data_columns_checking_all_caches(root, &indices) .map_err(warp_utils::reject::unhandled_error)? } else { chain - .get_data_columns(&root, fork_name) + .early_attester_cache + .get_data_columns(root) + .map(Ok) + .unwrap_or_else(|| { + chain + .get_data_columns(&root, fork_name) + .map(|opt| opt.unwrap_or_default()) + }) .map_err(warp_utils::reject::unhandled_error)? - .unwrap_or_default() }; let fork_name = block @@ -507,3 +529,177 @@ impl fmt::Display for BlockId { write!(f, "{}", self.0) } } + +#[cfg(test)] +mod tests { + use super::*; + use beacon_chain::{ + PayloadVerificationStatus, + block_verification_types::{AvailableBlockData, RangeSyncBlock}, + test_utils::{ + BeaconChainHarness, EphemeralHarnessType, fork_name_from_env, + generate_data_column_sidecars_from_block, + }, + }; + use std::time::Duration; + use types::MinimalEthSpec; + + type TestHarness = BeaconChainHarness>; + + fn harness() -> TestHarness { + BeaconChainHarness::builder(MinimalEthSpec) + .default_spec() + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build() + } + + #[tokio::test] + async fn root_uses_early_attester_cache_for_unpersisted_block() { + let Some(fork_name) = fork_name_from_env().filter(|fork_name| fork_name.fulu_enabled()) + else { + return; + }; + let harness = harness(); + let chain = &harness.chain; + + harness.execution_block_generator().set_min_blob_count(1); + harness.advance_slot(); + + let (block_contents, post_state) = harness + .make_block(harness.get_current_state(), harness.get_current_slot()) + .await; + let (block, _) = block_contents; + let block_root = block.canonical_root(); + let block_fork_name = chain.spec.fork_name_at_epoch(block.epoch()); + + assert_eq!( + block_fork_name, fork_name, + "precondition: test block must be produced at {fork_name:?}" + ); + assert!( + block.num_expected_blobs() > 0, + "precondition: {fork_name:?} test block must have blobs that can be converted to data columns" + ); + + assert!( + !chain.store.block_exists(&block_root).unwrap(), + "precondition: test block must not be persisted" + ); + assert!( + chain.get_blinded_block(&block_root).unwrap().is_none(), + "precondition: test block must not be retrievable from the store" + ); + assert!( + chain + .get_data_columns(&block_root, block_fork_name) + .unwrap() + .is_none(), + "precondition: test data columns must not be retrievable from the store" + ); + assert!( + !chain.block_is_known_to_fork_choice(&block_root), + "precondition: test block must not be imported into fork choice yet" + ); + + let sampling_columns = chain.sampling_columns_for_epoch(block.epoch()); + let data_columns = generate_data_column_sidecars_from_block(&block, &chain.spec) + .into_iter() + .filter(|column| sampling_columns.contains(column.index())) + .collect::>(); + assert!( + !data_columns.is_empty(), + "precondition: {fork_name:?} test block must produce data columns" + ); + + let available_block = RangeSyncBlock::new( + block.clone(), + AvailableBlockData::new_with_data_columns(data_columns), + &chain.data_availability_checker, + chain.spec.clone(), + ) + .unwrap() + .into_available_block(); + + let current_slot = harness.get_current_slot(); + let cached_head = chain.canonical_head.cached_head(); + let canonical_head_proposer_index = chain + .canonical_head_proposer_index(current_slot, &cached_head) + .unwrap(); + + chain + .canonical_head + .fork_choice_write_lock() + .on_block( + current_slot, + block.message(), + block_root, + Duration::ZERO, + &post_state, + PayloadVerificationStatus::Verified, + canonical_head_proposer_index, + &chain.spec, + ) + .unwrap(); + + assert!( + chain.block_is_known_to_fork_choice(&block_root), + "precondition: test block must be imported into fork choice" + ); + assert!( + !chain.store.block_exists(&block_root).unwrap(), + "precondition: fork choice insertion must not persist the block" + ); + + let proto_block = chain + .canonical_head + .fork_choice_read_lock() + .get_block(&block_root) + .unwrap(); + + chain + .early_attester_cache + .add_head_block(block_root, &available_block, proto_block, &post_state) + .unwrap(); + + let cached_data_columns = chain + .early_attester_cache + .get_data_columns(block_root) + .expect("precondition: data columns must be cached"); + assert!( + !cached_data_columns.is_empty(), + "precondition: cached data columns must be non-empty" + ); + + assert_eq!( + BlockId(CoreBlockId::Root(block_root)).root(chain).unwrap(), + (block_root, false, false) + ); + + let (blinded_block, execution_optimistic, finalized) = + BlockId(CoreBlockId::Root(block_root)) + .blinded_block(chain) + .unwrap(); + assert_eq!(blinded_block.canonical_root(), block_root); + assert_eq!(blinded_block.slot(), block.slot()); + assert!(!execution_optimistic); + assert!(!finalized); + + let (data_columns, data_columns_fork_name, execution_optimistic, finalized) = + BlockId(CoreBlockId::Root(block_root)) + .get_data_columns(DataColumnIndicesQuery { indices: None }, chain) + .unwrap(); + assert_eq!(data_columns, cached_data_columns); + assert_eq!(data_columns_fork_name, fork_name); + assert!(!execution_optimistic); + assert!(!finalized); + + chain.early_attester_cache.clear(); + + assert!( + BlockId(CoreBlockId::Root(block_root)).root(chain).is_err(), + "root lookup should fail once the unpersisted block leaves the early attester cache" + ); + } +} diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index b47f8e946a..7b5fb02714 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -2,7 +2,7 @@ use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::{ ChainConfig, - chain_config::{DisallowedReOrgOffsets, ReOrgThreshold}, + chain_config::DisallowedReOrgOffsets, test_utils::{ AttestationStrategy, BlockStrategy, LightClientStrategy, SyncCommitteeStrategy, test_spec, }, @@ -23,7 +23,7 @@ use std::sync::Arc; use std::time::Duration; use types::{ Address, Epoch, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, Hash256, MainnetEthSpec, - MinimalEthSpec, ProposerPreparationData, Slot, Uint256, + MinimalEthSpec, ProposerPreparationData, Slot, }; type E = MainnetEthSpec; @@ -181,8 +181,6 @@ pub struct ReOrgTest { parent_distance: u64, /// Number of slots between head block and block proposal slot. head_distance: u64, - re_org_threshold: u64, - max_epochs_since_finalization: u64, percent_parent_votes: usize, percent_empty_votes: usize, percent_head_votes: usize, @@ -201,8 +199,6 @@ impl Default for ReOrgTest { head_slot: Slot::new(E::slots_per_epoch() - 2), parent_distance: 1, head_distance: 1, - re_org_threshold: 20, - max_epochs_since_finalization: 2, percent_parent_votes: 100, percent_empty_votes: 100, percent_head_votes: 0, @@ -388,8 +384,6 @@ pub async fn proposer_boost_re_org_test( head_slot, parent_distance, head_distance, - re_org_threshold, - max_epochs_since_finalization, percent_parent_votes, percent_empty_votes, percent_head_votes, @@ -403,8 +397,7 @@ pub async fn proposer_boost_re_org_test( // TODO(EIP-7732): extend test for Gloas — `get_validator_blocks_v3` is missing the // `Eth-Execution-Payload-Blinded` header for Gloas block production responses. - let mut spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); - spec.terminal_total_difficulty = Uint256::from(1); + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); // Ensure there are enough validators to have `attesters_per_slot`. let attesters_per_slot = 10; @@ -427,14 +420,9 @@ pub async fn proposer_boost_re_org_test( validator_count, None, Some(Box::new(move |builder| { - builder - .proposer_re_org_head_threshold(Some(ReOrgThreshold(re_org_threshold))) - .proposer_re_org_max_epochs_since_finalization(Epoch::new( - max_epochs_since_finalization, - )) - .proposer_re_org_disallowed_offsets( - DisallowedReOrgOffsets::new::(disallowed_offsets).unwrap(), - ) + builder.proposer_re_org_disallowed_offsets( + DisallowedReOrgOffsets::new::(disallowed_offsets).unwrap(), + ) })), Default::default(), false, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 3e8845f017..14cda1b483 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4258,8 +4258,7 @@ impl NetworkBeaconProcessor { "payload_attn_invalid_sig", ); } - PayloadAttestationError::BeaconChainError(_) - | PayloadAttestationError::BeaconStateError(_) => { + PayloadAttestationError::BeaconChainError(_) => { debug!( %peer_id, %message_slot, diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 620962b40b..2b96800e37 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -305,7 +305,12 @@ impl ActiveCustodyRequest { // must have its columns in custody. In that case, set `true = enforce max_requests` // and downscore if data_columns_by_root does not return the expected custody // columns. For the rest of peers, don't downscore if columns are missing. - lookup_peers.contains(&peer_id), + // + // Post-Gloas, blocks and payload envelopes are decoupled. A peer may + // have the block but not yet imported the envelope and data columns. + // Don't enforce max_responses in this case. + lookup_peers.contains(&peer_id) + && !cx.fork_context.current_fork_name().gloas_enabled(), ) .map_err(Error::SendFailed)?; diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 51cda0fac3..647b5858cb 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -674,11 +674,22 @@ pub fn cli_app() -> Command { Arg::new("enable-partial-columns") .long("enable-partial-columns") .help("Enable partial messages for data columns. This can reduce the amount of \ - data sent over the network.") + data sent over the network. Enabled by default on Hoodi and Sepolia; use \ + --disable-partial-columns to opt out.") .action(ArgAction::SetTrue) .help_heading(FLAG_HEADER) .display_order(0) ) + .arg( + Arg::new("disable-partial-columns") + .long("disable-partial-columns") + .help("Disable partial messages for data columns. Use this on Hoodi or Sepolia \ + to opt out of the default-enabled behavior.") + .action(ArgAction::SetTrue) + .conflicts_with("enable-partial-columns") + .help_heading(FLAG_HEADER) + .display_order(0) + ) /* * Monitoring metrics */ @@ -1320,8 +1331,7 @@ pub fn cli_app() -> Command { .long("proposer-reorg-threshold") .action(ArgAction::Set) .value_name("PERCENT") - .help("Percentage of head vote weight below which to attempt a proposer reorg. \ - Default: 20%") + .help("DEPRECATED. This flag has no effect.") .conflicts_with("disable-proposer-reorgs") .display_order(0) ) @@ -1329,8 +1339,7 @@ pub fn cli_app() -> Command { Arg::new("proposer-reorg-parent-threshold") .long("proposer-reorg-parent-threshold") .value_name("PERCENT") - .help("Percentage of parent vote weight above which to attempt a proposer reorg. \ - Default: 160%") + .help("DEPRECATED. This flag has no effect.") .conflicts_with("disable-proposer-reorgs") .action(ArgAction::Set) .display_order(0) @@ -1340,8 +1349,7 @@ pub fn cli_app() -> Command { .long("proposer-reorg-epochs-since-finalization") .action(ArgAction::Set) .value_name("EPOCHS") - .help("Maximum number of epochs since finalization at which proposer reorgs are \ - allowed. Default: 2") + .help("DEPRECATED. This flag has no effect.") .conflicts_with("disable-proposer-reorgs") .display_order(0) ) @@ -1350,10 +1358,7 @@ pub fn cli_app() -> Command { .long("proposer-reorg-cutoff") .value_name("MILLISECONDS") .action(ArgAction::Set) - .help("Maximum delay after the start of the slot at which to propose a reorging \ - block. Lower values can prevent failed reorgs by ensuring the block has \ - ample time to propagate and be processed by the network. The default is \ - 1/12th of a slot (1 second on mainnet)") + .help("DEPRECATED. This flag has no effect.") .conflicts_with("disable-proposer-reorgs") .display_order(0) ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f10f9e3b45..045b432dc9 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1,8 +1,6 @@ use account_utils::{STDIN_INPUTS_FLAG, read_input_from_user}; use beacon_chain::chain_config::{ - DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR, DEFAULT_RE_ORG_HEAD_THRESHOLD, - DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_RE_ORG_PARENT_THRESHOLD, - DisallowedReOrgOffsets, INVALID_HOLESKY_BLOCK_ROOT, ReOrgThreshold, + DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR, DisallowedReOrgOffsets, INVALID_HOLESKY_BLOCK_ROOT, }; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::graffiti_calculator::GraffitiOrigin; @@ -110,7 +108,16 @@ pub fn get_config( set_network_config(&mut client_config.network, cli_args, &data_dir_ref)?; - if parse_flag(cli_args, "enable-partial-columns") { + let default_partial_columns_enabled = spec + .config_name + .as_ref() + .is_some_and(|name| matches!(name.as_str(), "hoodi" | "sepolia")); + let user_disable_partial_columns = parse_flag(cli_args, "disable-partial-columns"); + let user_enable_partial_columns = parse_flag(cli_args, "enable-partial-columns"); + let enable_partial_columns = !user_disable_partial_columns + && (user_enable_partial_columns || default_partial_columns_enabled); + + if enable_partial_columns { // Partial messages assume that each subnet maps to exactly one column. // Check this here to avoid weird issues on networks where this is not the case. if spec.data_column_sidecar_subnet_count == E::number_of_columns() as u64 { @@ -744,41 +751,39 @@ pub fn get_config( .individual_tracking_threshold = count; } - if cli_args.get_flag("disable-proposer-reorgs") { - client_config.chain.re_org_head_threshold = None; - client_config.chain.re_org_parent_threshold = None; - } else { - client_config.chain.re_org_head_threshold = Some( - clap_utils::parse_optional(cli_args, "proposer-reorg-threshold")? - .map(ReOrgThreshold) - .unwrap_or(DEFAULT_RE_ORG_HEAD_THRESHOLD), - ); - client_config.chain.re_org_max_epochs_since_finalization = - clap_utils::parse_optional(cli_args, "proposer-reorg-epochs-since-finalization")? - .unwrap_or(DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION); - client_config.chain.re_org_cutoff_millis = - clap_utils::parse_optional(cli_args, "proposer-reorg-cutoff")?; + client_config.chain.disable_proposer_reorg = cli_args.get_flag("disable-proposer-reorgs"); - client_config.chain.re_org_parent_threshold = Some( - clap_utils::parse_optional(cli_args, "proposer-reorg-parent-threshold")? - .map(ReOrgThreshold) - .unwrap_or(DEFAULT_RE_ORG_PARENT_THRESHOLD), - ); + if clap_utils::parse_optional::(cli_args, "proposer-reorg-threshold")?.is_some() { + warn!("The proposer-reorg-threshold flag is deprecated"); + } - if let Some(disallowed_offsets_str) = - clap_utils::parse_optional::(cli_args, "proposer-reorg-disallowed-offsets")? - { - let disallowed_offsets = disallowed_offsets_str - .split(',') - .map(|s| { - s.parse() - .map_err(|e| format!("invalid disallowed-offsets: {e:?}")) - }) - .collect::, _>>()?; - client_config.chain.re_org_disallowed_offsets = - DisallowedReOrgOffsets::new::(disallowed_offsets) - .map_err(|e| format!("invalid disallowed-offsets: {e:?}"))?; - } + if clap_utils::parse_optional::(cli_args, "proposer-reorg-epochs-since-finalization")? + .is_some() + { + warn!("The proposer-reorg-epochs-since-finalization flag is deprecated"); + } + + if clap_utils::parse_optional::(cli_args, "proposer-reorg-cutoff")?.is_some() { + warn!("The proposer-reorg-cutoff flag is deprecated"); + } + + if clap_utils::parse_optional::(cli_args, "proposer-reorg-parent-threshold")?.is_some() { + warn!("The proposer-reorg-parent-threshold flag is deprecated"); + } + + if let Some(disallowed_offsets_str) = + clap_utils::parse_optional::(cli_args, "proposer-reorg-disallowed-offsets")? + { + let disallowed_offsets = disallowed_offsets_str + .split(',') + .map(|s| { + s.parse() + .map_err(|e| format!("invalid disallowed-offsets: {e:?}")) + }) + .collect::, _>>()?; + client_config.chain.re_org_disallowed_offsets = + DisallowedReOrgOffsets::new::(disallowed_offsets) + .map_err(|e| format!("invalid disallowed-offsets: {e:?}"))?; } client_config.chain.prepare_payload_lookahead = diff --git a/book/src/advanced_re-orgs.md b/book/src/advanced_re-orgs.md index 3a31778786..71751f354f 100644 --- a/book/src/advanced_re-orgs.md +++ b/book/src/advanced_re-orgs.md @@ -14,14 +14,6 @@ attestations and transactions that can be included. There are three flags which control the re-orging behaviour: * `--disable-proposer-reorgs`: turn re-orging off (it's on by default). -* `--proposer-reorg-threshold N`: attempt to orphan blocks with less than N% of the committee vote. If this parameter isn't set then N defaults to 20% when the feature is enabled. -* `--proposer-reorg-epochs-since-finalization N`: only attempt to re-org late blocks when the number of epochs since finalization is less than or equal to N. The default is 2 epochs, - meaning re-orgs will only be attempted when the chain is finalizing optimally. -* `--proposer-reorg-cutoff T`: only attempt to re-org late blocks when the proposal is being made - before T milliseconds into the slot. Delays between the validator client and the beacon node can - cause some blocks to be requested later than the start of the slot, which makes them more likely - to fail. The default cutoff is 1000ms on mainnet, which gives blocks 3000ms to be signed and - propagated before the attestation deadline at 4000ms. * `--proposer-reorg-disallowed-offsets N1,N2,N3...`: Prohibit Lighthouse from attempting to reorg at specific offsets in each epoch. A disallowed offset `N` prevents reorging blocks from being proposed at any `slot` such that `slot % SLOTS_PER_EPOCH == N`. The value to this flag is a diff --git a/book/src/help_bn.md b/book/src/help_bn.md index b580bcae52..30163f1f0c 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -306,10 +306,7 @@ Options: values are useful for ensuring the EL is given ample notice. Default: 1/3 of a slot. --proposer-reorg-cutoff - Maximum delay after the start of the slot at which to propose a - reorging block. Lower values can prevent failed reorgs by ensuring the - block has ample time to propagate and be processed by the network. The - default is 1/12th of a slot (1 second on mainnet) + DEPRECATED. This flag has no effect. --proposer-reorg-disallowed-offsets Comma-separated list of integer offsets which can be used to avoid proposing reorging blocks at certain slots. An offset of N means that @@ -318,14 +315,11 @@ Options: avoided. Any offsets supplied with this flag will impose additional restrictions. --proposer-reorg-epochs-since-finalization - Maximum number of epochs since finalization at which proposer reorgs - are allowed. Default: 2 + DEPRECATED. This flag has no effect. --proposer-reorg-parent-threshold - Percentage of parent vote weight above which to attempt a proposer - reorg. Default: 160% + DEPRECATED. This flag has no effect. --proposer-reorg-threshold - Percentage of head vote weight below which to attempt a proposer - reorg. Default: 20% + DEPRECATED. This flag has no effect. --prune-blobs Prune blobs from Lighthouse's database when they are older than the data data availability boundary relative to the current epoch. @@ -482,6 +476,9 @@ Flags: --disable-packet-filter Disables the discovery packet filter. Useful for testing in smaller networks + --disable-partial-columns + Disable partial messages for data columns. Use this on Hoodi or + Sepolia to opt out of the default-enabled behavior. --disable-proposer-reorgs Do not attempt to reorg late blocks from other validators when proposing. @@ -499,7 +496,8 @@ Flags: --listen-address and the UDP port will be --discovery-port. --enable-partial-columns Enable partial messages for data columns. This can reduce the amount - of data sent over the network. + of data sent over the network. Enabled by default on Hoodi and + Sepolia; use --disable-partial-columns to opt out. --enable-private-discovery Lighthouse by default does not discover private IP addresses. Set this flag to enable connection attempts to local addresses. diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index a33fccaa82..8c2289e8c3 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -701,11 +701,9 @@ impl ProtoArray { justified_balances: &JustifiedBalances, spec: &ChainSpec, ) -> bool { - let reorg_threshold = calculate_committee_fraction::( - justified_balances, - spec.reorg_head_weight_threshold.unwrap_or(20), - ) - .unwrap_or(0); + let reorg_threshold = + calculate_committee_fraction::(justified_balances, spec.reorg_head_weight_threshold) + .unwrap_or(0); let head_weight = head_node .attestation_score(PayloadStatus::Pending) diff --git a/consensus/state_processing/src/per_block_processing/signature_sets.rs b/consensus/state_processing/src/per_block_processing/signature_sets.rs index ef7109dd94..f56cb17554 100644 --- a/consensus/state_processing/src/per_block_processing/signature_sets.rs +++ b/consensus/state_processing/src/per_block_processing/signature_sets.rs @@ -363,6 +363,26 @@ pub fn indexed_payload_attestation_signature_set<'a, 'b, E, F>( indexed_payload_attestation: &'b IndexedPayloadAttestation, spec: &'a ChainSpec, ) -> Result> +where + E: EthSpec, + F: Fn(usize) -> Option>, +{ + indexed_payload_attestation_signature_set_from_pubkeys( + get_pubkey, + signature, + indexed_payload_attestation, + state.genesis_validators_root(), + spec, + ) +} + +pub fn indexed_payload_attestation_signature_set_from_pubkeys<'a, 'b, E, F>( + get_pubkey: F, + signature: &'a AggregateSignature, + indexed_payload_attestation: &'b IndexedPayloadAttestation, + genesis_validators_root: Hash256, + spec: &'a ChainSpec, +) -> Result> where E: EthSpec, F: Fn(usize) -> Option>, @@ -379,12 +399,7 @@ where .slot .epoch(E::slots_per_epoch()); let fork = spec.fork_at_epoch(epoch); - let domain = spec.get_domain( - epoch, - Domain::PTCAttester, - &fork, - state.genesis_validators_root(), - ); + let domain = spec.get_domain(epoch, Domain::PTCAttester, &fork, genesis_validators_root); let message = indexed_payload_attestation.data.signing_root(domain); diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index 9ee827c7b9..8d991163d2 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -44,6 +44,7 @@ merkle_proof = { workspace = true } metastruct = "0.1.0" milhouse = { workspace = true } parking_lot = { workspace = true } +paste = { workspace = true } rand = { workspace = true } rand_xorshift = { workspace = true } rayon = { workspace = true } @@ -67,7 +68,6 @@ yaml_serde = { workspace = true } [dev-dependencies] beacon_chain = { workspace = true } criterion = { workspace = true } -paste = { workspace = true } state_processing = { workspace = true } tokio = { workspace = true } types = { path = ".", features = ["arbitrary"] } diff --git a/consensus/types/src/block/signed_beacon_block.rs b/consensus/types/src/block/signed_beacon_block.rs index 11ac17dece..1a87a519d0 100644 --- a/consensus/types/src/block/signed_beacon_block.rs +++ b/consensus/types/src/block/signed_beacon_block.rs @@ -433,285 +433,85 @@ impl From>> } // Post-Bellatrix blocks can be "unblinded" by adding the full payload. -// NOTE: It might be nice to come up with a `superstruct` pattern to abstract over this before -// the first fork after Bellatrix. -impl SignedBeaconBlockBellatrix> { - pub fn into_full_block( - self, - execution_payload: ExecutionPayloadBellatrix, - ) -> SignedBeaconBlockBellatrix> { - let SignedBeaconBlockBellatrix { - message: - BeaconBlockBellatrix { - slot, - proposer_index, - parent_root, - state_root, - body: - BeaconBlockBodyBellatrix { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: BlindedPayloadBellatrix { .. }, +macro_rules! impl_into_full_block { + ($fork:ident, [ $($extra_field:ident),* $(,)? ]) => { + paste::paste! { + impl []> { + pub fn into_full_block( + self, + execution_payload: [], + ) -> []> { + let [] { + message: + [] { + slot, + proposer_index, + parent_root, + state_root, + body: + [] { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + execution_payload: [] { .. }, + $($extra_field,)* + }, + }, + signature, + } = self; + [] { + message: [] { + slot, + proposer_index, + parent_root, + state_root, + body: [] { + randao_reveal, + eth1_data, + graffiti, + proposer_slashings, + attester_slashings, + attestations, + deposits, + voluntary_exits, + sync_aggregate, + execution_payload: [] { execution_payload }, + $($extra_field,)* + }, }, - }, - signature, - } = self; - SignedBeaconBlockBellatrix { - message: BeaconBlockBellatrix { - slot, - proposer_index, - parent_root, - state_root, - body: BeaconBlockBodyBellatrix { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: FullPayloadBellatrix { execution_payload }, - }, - }, - signature, + signature, + } + } + } } - } + }; } -impl SignedBeaconBlockCapella> { - pub fn into_full_block( - self, - execution_payload: ExecutionPayloadCapella, - ) -> SignedBeaconBlockCapella> { - let SignedBeaconBlockCapella { - message: - BeaconBlockCapella { - slot, - proposer_index, - parent_root, - state_root, - body: - BeaconBlockBodyCapella { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: BlindedPayloadCapella { .. }, - bls_to_execution_changes, - }, - }, - signature, - } = self; - SignedBeaconBlockCapella { - message: BeaconBlockCapella { - slot, - proposer_index, - parent_root, - state_root, - body: BeaconBlockBodyCapella { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: FullPayloadCapella { execution_payload }, - bls_to_execution_changes, - }, - }, - signature, - } - } -} - -impl SignedBeaconBlockDeneb> { - pub fn into_full_block( - self, - execution_payload: ExecutionPayloadDeneb, - ) -> SignedBeaconBlockDeneb> { - let SignedBeaconBlockDeneb { - message: - BeaconBlockDeneb { - slot, - proposer_index, - parent_root, - state_root, - body: - BeaconBlockBodyDeneb { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: BlindedPayloadDeneb { .. }, - bls_to_execution_changes, - blob_kzg_commitments, - }, - }, - signature, - } = self; - SignedBeaconBlockDeneb { - message: BeaconBlockDeneb { - slot, - proposer_index, - parent_root, - state_root, - body: BeaconBlockBodyDeneb { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: FullPayloadDeneb { execution_payload }, - bls_to_execution_changes, - blob_kzg_commitments, - }, - }, - signature, - } - } -} - -impl SignedBeaconBlockElectra> { - pub fn into_full_block( - self, - execution_payload: ExecutionPayloadElectra, - ) -> SignedBeaconBlockElectra> { - let SignedBeaconBlockElectra { - message: - BeaconBlockElectra { - slot, - proposer_index, - parent_root, - state_root, - body: - BeaconBlockBodyElectra { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: BlindedPayloadElectra { .. }, - bls_to_execution_changes, - blob_kzg_commitments, - execution_requests, - }, - }, - signature, - } = self; - SignedBeaconBlockElectra { - message: BeaconBlockElectra { - slot, - proposer_index, - parent_root, - state_root, - body: BeaconBlockBodyElectra { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: FullPayloadElectra { execution_payload }, - bls_to_execution_changes, - blob_kzg_commitments, - execution_requests, - }, - }, - signature, - } - } -} - -impl SignedBeaconBlockFulu> { - pub fn into_full_block( - self, - execution_payload: ExecutionPayloadFulu, - ) -> SignedBeaconBlockFulu> { - let SignedBeaconBlockFulu { - message: - BeaconBlockFulu { - slot, - proposer_index, - parent_root, - state_root, - body: - BeaconBlockBodyFulu { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: BlindedPayloadFulu { .. }, - bls_to_execution_changes, - blob_kzg_commitments, - execution_requests, - }, - }, - signature, - } = self; - SignedBeaconBlockFulu { - message: BeaconBlockFulu { - slot, - proposer_index, - parent_root, - state_root, - body: BeaconBlockBodyFulu { - randao_reveal, - eth1_data, - graffiti, - proposer_slashings, - attester_slashings, - attestations, - deposits, - voluntary_exits, - sync_aggregate, - execution_payload: FullPayloadFulu { execution_payload }, - bls_to_execution_changes, - blob_kzg_commitments, - execution_requests, - }, - }, - signature, - } - } -} +impl_into_full_block!(Bellatrix, []); +impl_into_full_block!(Capella, [bls_to_execution_changes]); +impl_into_full_block!(Deneb, [bls_to_execution_changes, blob_kzg_commitments]); +impl_into_full_block!( + Electra, + [ + bls_to_execution_changes, + blob_kzg_commitments, + execution_requests + ] +); +impl_into_full_block!( + Fulu, + [ + bls_to_execution_changes, + blob_kzg_commitments, + execution_requests + ] +); // We can convert gloas blocks without payloads into blocks "with" payloads. // TODO(EIP-7732) Look into whether we can remove this in the future since no blinded blocks post-gloas diff --git a/consensus/types/src/core/chain_spec.rs b/consensus/types/src/core/chain_spec.rs index c42bb4b5b9..25dcb4ba06 100644 --- a/consensus/types/src/core/chain_spec.rs +++ b/consensus/types/src/core/chain_spec.rs @@ -152,9 +152,9 @@ pub struct ChainSpec { * Fork choice */ pub proposer_score_boost: Option, - pub reorg_head_weight_threshold: Option, - pub reorg_parent_weight_threshold: Option, - pub reorg_max_epochs_since_finalization: Option, + pub reorg_head_weight_threshold: u64, + pub reorg_parent_weight_threshold: u64, + pub reorg_max_epochs_since_finalization: u64, /* * Eth1 @@ -925,7 +925,7 @@ impl ChainSpec { } /// Calculate the duration into a slot for a given slot component - fn compute_slot_component_duration( + pub fn compute_slot_component_duration( &self, component_basis_points: u64, ) -> Result { @@ -1163,9 +1163,9 @@ impl ChainSpec { * Fork choice */ proposer_score_boost: Some(40), - reorg_head_weight_threshold: Some(20), - reorg_parent_weight_threshold: Some(160), - reorg_max_epochs_since_finalization: Some(2), + reorg_head_weight_threshold: 20, + reorg_parent_weight_threshold: 160, + reorg_max_epochs_since_finalization: 2, /* * Eth1 @@ -1588,9 +1588,9 @@ impl ChainSpec { * Fork choice */ proposer_score_boost: Some(40), - reorg_head_weight_threshold: Some(20), - reorg_parent_weight_threshold: Some(160), - reorg_max_epochs_since_finalization: Some(2), + reorg_head_weight_threshold: 20, + reorg_parent_weight_threshold: 160, + reorg_max_epochs_since_finalization: 2, /* * Eth1 @@ -2028,12 +2028,15 @@ pub struct Config { #[serde(skip_serializing_if = "Option::is_none")] proposer_score_boost: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - reorg_head_weight_threshold: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - reorg_parent_weight_threshold: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - reorg_max_epochs_since_finalization: Option>, + #[serde(default = "default_reorg_head_weight_threshold")] + #[serde(with = "serde_utils::quoted_u64")] + reorg_head_weight_threshold: u64, + #[serde(default = "default_reorg_parent_weight_threshold")] + #[serde(with = "serde_utils::quoted_u64")] + reorg_parent_weight_threshold: u64, + #[serde(default = "default_reorg_max_epochs_since_finalization")] + #[serde(with = "serde_utils::quoted_u64")] + reorg_max_epochs_since_finalization: u64, #[serde(with = "serde_utils::quoted_u64")] deposit_chain_id: u64, @@ -2433,6 +2436,18 @@ const fn default_max_per_epoch_activation_churn_limit_gloas() -> u64 { 256_000_000_000 } +const fn default_reorg_head_weight_threshold() -> u64 { + 20 +} + +const fn default_reorg_parent_weight_threshold() -> u64 { + 160 +} + +const fn default_reorg_max_epochs_since_finalization() -> u64 { + 2 +} + fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize { let max_request_blocks = max_request_blocks as usize; RuntimeVariableList::::new( @@ -2626,15 +2641,9 @@ impl Config { max_per_epoch_activation_churn_limit: spec.max_per_epoch_activation_churn_limit, proposer_score_boost: spec.proposer_score_boost.map(|value| MaybeQuoted { value }), - reorg_head_weight_threshold: spec - .reorg_head_weight_threshold - .map(|value| MaybeQuoted { value }), - reorg_parent_weight_threshold: spec - .reorg_parent_weight_threshold - .map(|value| MaybeQuoted { value }), - reorg_max_epochs_since_finalization: spec - .reorg_max_epochs_since_finalization - .map(|value| MaybeQuoted { value }), + reorg_head_weight_threshold: spec.reorg_head_weight_threshold, + reorg_parent_weight_threshold: spec.reorg_parent_weight_threshold, + reorg_max_epochs_since_finalization: spec.reorg_max_epochs_since_finalization, deposit_chain_id: spec.deposit_chain_id, deposit_network_id: spec.deposit_network_id, @@ -2846,10 +2855,9 @@ impl Config { max_per_epoch_activation_churn_limit, churn_limit_quotient, proposer_score_boost: proposer_score_boost.map(|q| q.value), - reorg_head_weight_threshold: reorg_head_weight_threshold.map(|q| q.value), - reorg_parent_weight_threshold: reorg_parent_weight_threshold.map(|q| q.value), - reorg_max_epochs_since_finalization: reorg_max_epochs_since_finalization - .map(|q| q.value), + reorg_head_weight_threshold, + reorg_parent_weight_threshold, + reorg_max_epochs_since_finalization, deposit_chain_id, deposit_network_id, deposit_contract_address, diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 3595cf04e7..09fd6d4afe 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -37,6 +37,8 @@ beacon-node-redb = ["store/redb"] console-subscriber = ["console-subscriber/default"] # Force the use of the system memory allocator rather than jemalloc. sysmalloc = ["malloc_utils/sysmalloc"] +# Enable jemalloc heap profiling support. +jemalloc-profiling = ["malloc_utils/jemalloc-profiling"] [dependencies] account_manager = { "path" = "../account_manager" } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 0c5d9a5933..38d4275a02 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1,8 +1,6 @@ use crate::exec::{CommandLineTestExec, CompletedTest}; use beacon_node::beacon_chain::chain_config::{ - DEFAULT_RE_ORG_CUTOFF_DENOMINATOR, DEFAULT_RE_ORG_HEAD_THRESHOLD, - DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_SYNC_TOLERANCE_EPOCHS, - DisallowedReOrgOffsets, + DEFAULT_SYNC_TOLERANCE_EPOCHS, DisallowedReOrgOffsets, }; use beacon_node::beacon_chain::custody_context::NodeCustodyType; use beacon_node::{ @@ -2344,19 +2342,12 @@ fn ensure_panic_on_failed_launch() { fn enable_proposer_re_orgs_default() { CommandLineTest::new() .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.chain.re_org_head_threshold, - Some(DEFAULT_RE_ORG_HEAD_THRESHOLD) - ); - assert_eq!( - config.chain.re_org_max_epochs_since_finalization, - DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, - ); - assert_eq!( - config.chain.re_org_cutoff(Duration::from_secs(12)), - Duration::from_secs(12) / DEFAULT_RE_ORG_CUTOFF_DENOMINATOR - ); + .with_config_and_spec::(|config, spec| { + assert!(!config.chain.disable_proposer_reorg); + assert_eq!(spec.reorg_head_weight_threshold, 20); + assert_eq!(spec.reorg_parent_weight_threshold, 160); + assert_eq!(spec.reorg_max_epochs_since_finalization, 2); + assert_eq!(spec.proposer_reorg_cutoff_bps, 1667); }); } @@ -2365,52 +2356,8 @@ fn disable_proposer_re_orgs() { CommandLineTest::new() .flag("disable-proposer-reorgs", None) .run_with_zero_port() - .with_config(|config| { - assert_eq!(config.chain.re_org_head_threshold, None); - assert_eq!(config.chain.re_org_parent_threshold, None) - }); -} - -#[test] -fn proposer_re_org_parent_threshold() { - CommandLineTest::new() - .flag("proposer-reorg-parent-threshold", Some("90")) - .run_with_zero_port() - .with_config(|config| assert_eq!(config.chain.re_org_parent_threshold.unwrap().0, 90)); -} - -#[test] -fn proposer_re_org_head_threshold() { - CommandLineTest::new() - .flag("proposer-reorg-threshold", Some("90")) - .run_with_zero_port() - .with_config(|config| assert_eq!(config.chain.re_org_head_threshold.unwrap().0, 90)); -} - -#[test] -fn proposer_re_org_max_epochs_since_finalization() { - CommandLineTest::new() - .flag("proposer-reorg-epochs-since-finalization", Some("8")) - .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.chain.re_org_max_epochs_since_finalization.as_u64(), - 8 - ) - }); -} - -#[test] -fn proposer_re_org_cutoff() { - CommandLineTest::new() - .flag("proposer-reorg-cutoff", Some("500")) - .run_with_zero_port() - .with_config(|config| { - assert_eq!( - config.chain.re_org_cutoff(Duration::from_secs(12)), - Duration::from_millis(500) - ) - }); + // When --disable-proposer-reorg is used, the field in ChainConfig should become true + .with_config(|config| assert!(config.chain.disable_proposer_reorg)); } #[test] @@ -2874,7 +2821,7 @@ fn partial_columns() { assert!(config.network.enable_partial_columns); assert!(config.chain.enable_partial_columns); }); - // And disabled by default: + // And disabled by default on mainnet: CommandLineTest::new() .run_with_zero_port() .with_config(|config| { @@ -2882,3 +2829,60 @@ fn partial_columns() { assert!(!config.chain.enable_partial_columns); }) } + +#[test] +fn partial_columns_default_hoodi() { + CommandLineTest::new() + .flag("network", Some("hoodi")) + .run_with_zero_port() + .with_config(|config| { + assert!(config.network.enable_partial_columns); + assert!(config.chain.enable_partial_columns); + }); +} + +#[test] +fn partial_columns_default_sepolia() { + CommandLineTest::new() + .flag("network", Some("sepolia")) + .run_with_zero_port() + .with_config(|config| { + assert!(config.network.enable_partial_columns); + assert!(config.chain.enable_partial_columns); + }); +} + +#[test] +fn partial_columns_disable_overrides_hoodi_default() { + CommandLineTest::new() + .flag("network", Some("hoodi")) + .flag("disable-partial-columns", None) + .run_with_zero_port() + .with_config(|config| { + assert!(!config.network.enable_partial_columns); + assert!(!config.chain.enable_partial_columns); + }); +} + +#[test] +fn partial_columns_disable_on_mainnet_no_op() { + CommandLineTest::new() + .flag("disable-partial-columns", None) + .run_with_zero_port() + .with_config(|config| { + assert!(!config.network.enable_partial_columns); + assert!(!config.chain.enable_partial_columns); + }); +} + +#[test] +fn partial_columns_enable_disable_conflict() { + let mut cmd = base_cmd(); + cmd.arg("--enable-partial-columns") + .arg("--disable-partial-columns"); + let output = cmd.output().expect("should run command"); + assert!( + !output.status.success(), + "expected clap to reject --enable-partial-columns and --disable-partial-columns together", + ); +} diff --git a/lighthouse/tests/exec.rs b/lighthouse/tests/exec.rs index a25558bc2f..696cf2f40a 100644 --- a/lighthouse/tests/exec.rs +++ b/lighthouse/tests/exec.rs @@ -144,7 +144,6 @@ impl CompletedTest { func(&self.config, &self.dir); } - #[allow(dead_code)] pub fn with_config_and_spec(self, func: F) { let spec = ChainSpec::from_config::(&self.chain_config).unwrap(); func(&self.config, spec); diff --git a/testing/ef_tests/Cargo.toml b/testing/ef_tests/Cargo.toml index 9d09c3dfe6..ac51e827ad 100644 --- a/testing/ef_tests/Cargo.toml +++ b/testing/ef_tests/Cargo.toml @@ -28,6 +28,7 @@ hex = { workspace = true } kzg = { workspace = true } logging = { workspace = true } milhouse = { workspace = true } +proto_array = { workspace = true } rayon = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 69fce09505..2954ee7eb4 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -4,10 +4,7 @@ use ::fork_choice::{AttestationFromBlock, PayloadVerificationStatus, ProposerHea use beacon_chain::beacon_proposer_cache::compute_proposer_duties_from_head; use beacon_chain::blob_verification::GossipBlobError; use beacon_chain::block_verification_types::LookupBlock; -use beacon_chain::chain_config::{ - DEFAULT_RE_ORG_HEAD_THRESHOLD, DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, - DEFAULT_RE_ORG_PARENT_THRESHOLD, DisallowedReOrgOffsets, -}; +use beacon_chain::chain_config::DisallowedReOrgOffsets; use beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::slot_clock::SlotClock; use beacon_chain::{ @@ -23,6 +20,7 @@ use bls::AggregateSignature; use execution_layer::{ PayloadStatusV1, PayloadStatusV1Status, json_structures::JsonPayloadStatusV1Status, }; +use proto_array::ReOrgThreshold; use serde::Deserialize; use ssz_derive::Decode; use ssz_types::VariableList; @@ -36,9 +34,9 @@ use std::time::Duration; use types::{ Attestation, AttestationRef, AttesterSlashing, AttesterSlashingRef, BeaconBlock, BeaconState, BlobSidecar, BlobsList, BlockImportSource, Checkpoint, DataColumnSidecar, - DataColumnSidecarList, DataColumnSubnetId, ExecutionBlockHash, Hash256, IndexedAttestation, - IndexedPayloadAttestation, KzgProof, PayloadAttestationMessage, ProposerPreparationData, - SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, Uint256, + DataColumnSidecarList, DataColumnSubnetId, Epoch, ExecutionBlockHash, Hash256, + IndexedAttestation, IndexedPayloadAttestation, KzgProof, PayloadAttestationMessage, + ProposerPreparationData, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, Uint256, }; // When set to true, cache any states fetched from the db. @@ -1027,10 +1025,10 @@ impl Tester { let proposer_head_result = fc.get_proposer_head( slot, canonical_head, - DEFAULT_RE_ORG_HEAD_THRESHOLD, - DEFAULT_RE_ORG_PARENT_THRESHOLD, + ReOrgThreshold(self.spec.reorg_head_weight_threshold), + ReOrgThreshold(self.spec.reorg_parent_weight_threshold), &DisallowedReOrgOffsets::default(), - DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, + Epoch::new(self.spec.reorg_max_epochs_since_finalization), ); let proposer_head = match proposer_head_result { Ok(head) => head.parent_node.root(),