From a4c4cccf04a2e647ace2a2f9ecdcea91915075ef Mon Sep 17 00:00:00 2001 From: ethDreamer <37123614+ethDreamer@users.noreply.github.com> Date: Wed, 24 Jun 2026 21:53:43 -0500 Subject: [PATCH] Refactor Custody Context Availability Checks (#9515) Co-Authored-By: Mark Mackey --- beacon_node/beacon_chain/src/beacon_chain.rs | 121 +---- .../beacon_chain/src/block_verification.rs | 3 +- .../src/block_verification_types.rs | 13 +- beacon_node/beacon_chain/src/builder.rs | 23 +- .../beacon_chain/src/canonical_head.rs | 3 +- .../beacon_chain/src/custody_context.rs | 500 +++++++++++------- .../src/data_availability_checker.rs | 160 ++---- .../overflow_lru_cache.rs | 43 +- .../src/historical_data_columns.rs | 3 +- beacon_node/beacon_chain/src/invariants.rs | 6 +- .../src/payload_envelope_verification/mod.rs | 88 ++- .../src/pending_payload_cache/mod.rs | 64 +-- .../pending_components.rs | 54 +- beacon_node/beacon_chain/src/test_utils.rs | 98 ++-- .../beacon_chain/tests/block_verification.rs | 147 ++--- beacon_node/beacon_chain/tests/events.rs | 10 +- beacon_node/beacon_chain/tests/store_tests.rs | 29 +- beacon_node/client/src/notifier.rs | 8 +- .../src/beacon/execution_payload_envelopes.rs | 2 +- beacon_node/http_api/src/block_id.rs | 7 +- beacon_node/http_api/src/custody.rs | 12 +- beacon_node/http_api/src/lib.rs | 5 +- beacon_node/http_api/src/publish_blocks.rs | 2 +- beacon_node/http_api/src/validator/mod.rs | 5 +- .../tests/broadcast_validation_tests.rs | 1 + .../http_api/tests/interactive_tests.rs | 9 +- .../gossip_methods.rs | 8 +- .../src/network_beacon_processor/mod.rs | 4 +- .../network_beacon_processor/rpc_methods.rs | 42 +- .../src/network_beacon_processor/tests.rs | 8 +- beacon_node/network/src/service.rs | 5 +- .../network/src/sync/backfill_sync/mod.rs | 5 +- .../sync/block_lookups/single_block_lookup.rs | 11 +- .../src/sync/block_sidecar_coupling.rs | 134 ++--- .../src/sync/custody_backfill_sync/mod.rs | 53 +- .../network/src/sync/network_context.rs | 23 +- .../network/src/sync/range_sync/chain.rs | 3 + beacon_node/network/src/sync/tests/lookups.rs | 53 +- .../execution/signed_execution_payload_bid.rs | 4 + 39 files changed, 939 insertions(+), 830 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d175c54be7..708a07021d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -16,7 +16,7 @@ use crate::block_verification_types::{ }; pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; -use crate::custody_context::CustodyContextSsz; +use crate::custody_context::{CustodyContext, CustodyContextSsz}; use crate::data_availability_checker::{ Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, AvailableBlockData, DataColumnReconstructionResult as DataColumnReconstructionResultV1, @@ -500,6 +500,8 @@ pub struct BeaconChain { pub validator_monitor: RwLock>, /// The slot at which blocks are downloaded back to. pub genesis_backfill_slot: Slot, + /// Contains all the information the node requires to calculate which columns to custody + pub custody_context: Arc>, /// Provides KZG verification and temporary storage for pre-Gloas blocks and blobs. pub data_availability_checker: Arc>, /// Provides KZG verification and temporary storage for post-Gloas payload envelopes. @@ -682,11 +684,7 @@ impl BeaconChain { return Ok(()); } - let custody_context: CustodyContextSsz = self - .data_availability_checker - .custody_context() - .as_ref() - .into(); + let custody_context: CustodyContextSsz = self.custody_context.as_ref().into(); // Pattern match to avoid accidentally missing fields and to ignore deprecated fields. let CustodyContextSsz { @@ -3318,8 +3316,9 @@ impl BeaconChain { ); // Check if we have custody of this column - let sampling_columns = - self.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())); let verified_partial = if sampling_columns.contains(&partial.index) { KzgVerifiedCustodyPartialDataColumn::from_asserted_custody(verified_partial) } else { @@ -3981,7 +3980,7 @@ impl BeaconChain { )?; let availability = self .data_availability_checker - .put_rpc_blobs(block_root, blobs) + .put_rpc_blobs(block_root, blobs, &self.slot_clock) .map_err(BlockError::from)?; self.process_availability(slot, availability, || Ok(())) @@ -7170,25 +7169,6 @@ impl BeaconChain { }) } - /// The data availability boundary for custodying columns. It will just be the - /// regular data availability boundary unless we are near the Fulu fork epoch. - pub fn column_data_availability_boundary(&self) -> Option { - match self.data_availability_boundary() { - Some(da_boundary_epoch) => { - if let Some(fulu_fork_epoch) = self.spec.fulu_fork_epoch { - if da_boundary_epoch < fulu_fork_epoch { - Some(fulu_fork_epoch) - } else { - Some(da_boundary_epoch) - } - } else { - None // Fulu hasn't been enabled - } - } - None => None, // Deneb hasn't been enabled - } - } - /// Safely update data column custody info by ensuring that: /// - cgc values at the updated epoch and the earliest custodied column epoch are equal /// - we are only decrementing the earliest custodied data column epoch by one epoch @@ -7206,14 +7186,12 @@ impl BeaconChain { } let cgc_at_effective_epoch = self - .data_availability_checker - .custody_context() - .custody_group_count_at_epoch(effective_epoch, &self.spec); + .custody_context + .custody_group_count_at_epoch(effective_epoch); let cgc_at_earliest_data_colum_epoch = self - .data_availability_checker - .custody_context() - .custody_group_count_at_epoch(earliest_data_column_epoch, &self.spec); + .custody_context + .custody_group_count_at_epoch(earliest_data_column_epoch); let can_update_data_column_custody_info = cgc_at_effective_epoch == cgc_at_earliest_data_colum_epoch @@ -7240,16 +7218,16 @@ impl BeaconChain { /// Compare columns custodied for `epoch` versus columns custodied for the head of the chain /// and return any column indices that are missing. pub fn get_missing_columns_for_epoch(&self, epoch: Epoch) -> HashSet { - let custody_context = self.data_availability_checker.custody_context(); - - let columns_required = custody_context - .custody_columns_for_epoch(None, &self.spec) + let columns_required = self + .custody_context + .custody_columns_for_epoch(None) .iter() .cloned() .collect::>(); - let current_columns_at_epoch = custody_context - .custody_columns_for_epoch(Some(epoch), &self.spec) + let current_columns_at_epoch = self + .custody_context + .custody_columns_for_epoch(Some(epoch)) .iter() .cloned() .collect::>(); @@ -7260,24 +7238,6 @@ impl BeaconChain { .collect::>() } - /// The da boundary for custodying columns. It will just be the DA boundary unless we are near the Fulu fork epoch. - pub fn get_column_da_boundary(&self) -> Option { - match self.data_availability_boundary() { - Some(da_boundary_epoch) => { - if let Some(fulu_fork_epoch) = self.spec.fulu_fork_epoch { - if da_boundary_epoch < fulu_fork_epoch { - Some(fulu_fork_epoch) - } else { - Some(da_boundary_epoch) - } - } else { - None - } - } - None => None, // If no DA boundary set, dont try to custody backfill - } - } - /// This method serves to get a sense of the current chain health. It is used in block proposal /// to determine whether we should outsource payload production duties. /// @@ -7480,30 +7440,6 @@ impl BeaconChain { gossip_attested || block_attested || aggregated || produced_block } - /// The epoch at which we require a data availability check in block processing. - /// `None` if the `Deneb` fork is disabled. - pub fn data_availability_boundary(&self) -> Option { - self.data_availability_checker.data_availability_boundary() - } - - /// Returns true if epoch is within the data availability boundary - pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool { - self.data_availability_checker - .da_check_required_for_epoch(epoch) - } - - /// Returns true if we should fetch blobs for this block - pub fn should_fetch_blobs(&self, block_epoch: Epoch) -> bool { - self.da_check_required_for_epoch(block_epoch) - && !self.spec.is_peer_das_enabled_for_epoch(block_epoch) - } - - /// Returns true if we should fetch custody columns for this block - pub fn should_fetch_custody_columns(&self, block_epoch: Epoch) -> bool { - self.da_check_required_for_epoch(block_epoch) - && self.spec.is_peer_das_enabled_for_epoch(block_epoch) - } - /// Gets the `LightClientBootstrap` object for a requested block root. /// /// Returns `None` when the state or block is not found in the database. @@ -7542,7 +7478,7 @@ impl BeaconChain { Some(StoreOp::PutBlobs(block_root, blobs)) } AvailableBlockData::DataColumns(mut data_columns) => { - let columns_to_custody = self.custody_columns_for_epoch(Some( + let columns_to_custody = self.custody_context.custody_columns_for_epoch(Some( block_slot.epoch(T::EthSpec::slots_per_epoch()), )); // Supernodes need to persist all sampled custody columns @@ -7588,25 +7524,6 @@ impl BeaconChain { roots.reverse(); roots } - - /// Returns a list of column indices that should be sampled for a given epoch. - /// Used for data availability sampling in PeerDAS. - pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> &[ColumnIndex] { - self.data_availability_checker - .custody_context() - .sampling_columns_for_epoch(epoch, &self.spec) - } - - /// Returns a list of column indices that the node is expected to custody for a given epoch. - /// i.e. the node must have validated and persisted the column samples and should be able to - /// serve them to peers. - /// - /// If epoch is `None`, this function computes the custody columns at head. - pub fn custody_columns_for_epoch(&self, epoch_opt: Option) -> &[ColumnIndex] { - self.data_availability_checker - .custody_context() - .custody_columns_for_epoch(epoch_opt, &self.spec) - } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 6b1ac3b033..0de9a5cdb1 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1246,8 +1246,7 @@ impl SignatureVerifiedBlock { AvailableBlock::new( block, AvailableBlockData::NoData, - &chain.data_availability_checker, - chain.spec.clone(), + &chain.custody_context, ) .map_err(BlockError::AvailabilityCheck)?, ) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 18e95f58f3..51feb12a69 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -1,18 +1,18 @@ -use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; +use crate::data_availability_checker::AvailabilityCheckError; pub use crate::data_availability_checker::{ AvailableBlock, AvailableBlockData, MaybeAvailableBlock, }; use crate::payload_envelope_verification::AvailableEnvelope; use crate::payload_envelope_verification::gossip_verified_envelope::verify_envelope_consistency; -use crate::{BeaconChainTypes, PayloadVerificationOutcome}; +use crate::{BeaconChainTypes, CustodyContext, PayloadVerificationOutcome}; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, - SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BlindedPayload, Epoch, EthSpec, Hash256, SignedBeaconBlock, + SignedBeaconBlockHeader, Slot, }; /// A wrapper around a `SignedBeaconBlock`. This varaint is constructed @@ -118,8 +118,7 @@ impl RangeSyncBlock { pub fn new( block: Arc>, block_data: AvailableBlockData, - da_checker: &DataAvailabilityChecker, - spec: Arc, + custody_context: &CustodyContext, ) -> Result where T: BeaconChainTypes, @@ -127,7 +126,7 @@ impl RangeSyncBlock { if block.fork_name_unchecked().gloas_enabled() { return Err(AvailabilityCheckError::InvalidVariant); } - let available_block = AvailableBlock::new(block, block_data, da_checker, spec)?; + let available_block = AvailableBlock::new(block, block_data, custody_context)?; Ok(Self::Base(available_block)) } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 91a9dcc7c8..3b137f6faa 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -951,14 +951,18 @@ where self.node_custody_type, head_epoch, ordered_custody_column_indices, - &self.spec, + slot_clock.clone(), + complete_blob_backfill, + self.spec.clone(), ) } else { ( CustodyContext::new( self.node_custody_type, ordered_custody_column_indices, - &self.spec, + slot_clock.clone(), + complete_blob_backfill, + self.spec.clone(), ), None, ) @@ -1036,10 +1040,9 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, + custody_context: custody_context.clone(), data_availability_checker: Arc::new( DataAvailabilityChecker::new( - complete_blob_backfill, - slot_clock.clone(), self.kzg.clone(), custody_context.clone(), self.spec.clone(), @@ -1049,12 +1052,8 @@ where .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), pending_payload_cache: Arc::new( - PendingPayloadCache::new( - self.kzg.clone(), - custody_context.clone(), - self.spec.clone(), - ) - .map_err(|e| format!("Error initializing PendingPayloadCache: {:?}", e))?, + PendingPayloadCache::new(self.kzg.clone(), custody_context, self.spec.clone()) + .map_err(|e| format!("Error initializing PendingPayloadCache: {:?}", e))?, ), kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), @@ -1125,7 +1124,9 @@ where } // Prune blobs older than the blob data availability boundary in the background. - if let Some(data_availability_boundary) = beacon_chain.data_availability_boundary() { + if let Some(data_availability_boundary) = + beacon_chain.custody_context.data_availability_boundary() + { beacon_chain .store_migrator .process_prune_blobs(data_availability_boundary); diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 1eab7ccf7a..5d7a7f1527 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -1065,7 +1065,8 @@ impl BeaconChain { )?; // Prune blobs in the background. - if let Some(data_availability_boundary) = self.data_availability_boundary() { + if let Some(data_availability_boundary) = self.custody_context.data_availability_boundary() + { self.store_migrator .process_prune_blobs(data_availability_boundary); } diff --git a/beacon_node/beacon_chain/src/custody_context.rs b/beacon_node/beacon_chain/src/custody_context.rs index 72f62db1b4..4a1dfdbe71 100644 --- a/beacon_node/beacon_chain/src/custody_context.rs +++ b/beacon_node/beacon_chain/src/custody_context.rs @@ -1,13 +1,18 @@ +use crate::BeaconChainTypes; +use educe::Educe; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; +use slot_clock::SlotClock; use ssz_derive::{Decode, Encode}; -use std::marker::PhantomData; use std::{ collections::{BTreeMap, HashMap}, + sync::Arc, sync::atomic::{AtomicU64, Ordering}, }; use tracing::{debug, warn}; -use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot}; +use types::{ + ChainSpec, ColumnIndex, Epoch, EthSpec, SignedBeaconBlock, SignedExecutionPayloadBid, Slot, +}; /// A delay before making the CGC change effective to the data availability checker. pub const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; @@ -236,8 +241,9 @@ impl NodeCustodyType { /// Contains all the information the node requires to calculate the /// number of columns to be custodied when checking for DA. -#[derive(Debug)] -pub struct CustodyContext { +#[derive(Educe)] +#[educe(Debug(bound(T: BeaconChainTypes)))] +pub struct CustodyContext { /// The Number of custody groups required based on the number of validators /// that is attached to this node. /// @@ -250,10 +256,15 @@ pub struct CustodyContext { /// Stores an immutable, ordered list of all data column indices as determined by the node's NodeID /// on startup. This used to determine the node's custody columns. ordered_custody_column_indices: Vec, - _phantom_data: PhantomData, + #[educe(Debug(ignore))] + slot_clock: T::SlotClock, + /// backfill blobs and data columns beyond the data availability window. + complete_blob_backfill: bool, + #[educe(Debug(ignore))] + spec: Arc, } -impl CustodyContext { +impl CustodyContext { /// Create a new custody default custody context object when no persisted object /// exists. /// @@ -261,9 +272,11 @@ impl CustodyContext { pub fn new( node_custody_type: NodeCustodyType, ordered_custody_column_indices: Vec, - spec: &ChainSpec, + slot_clock: T::SlotClock, + complete_blob_backfill: bool, + spec: Arc, ) -> Self { - let cgc_override = node_custody_type.get_custody_count_override(spec); + let cgc_override = node_custody_type.get_custody_count_override(&spec); // If there's no override, we initialise `validator_custody_count` to 0. This has been the // existing behaviour and we maintain this for now to avoid a semantic schema change until // a later release. @@ -271,7 +284,9 @@ impl CustodyContext { validator_custody_count: AtomicU64::new(cgc_override.unwrap_or(0)), validator_registrations: RwLock::new(ValidatorRegistrations::new(cgc_override)), ordered_custody_column_indices, - _phantom_data: PhantomData, + slot_clock, + complete_blob_backfill, + spec, } } @@ -294,7 +309,9 @@ impl CustodyContext { node_custody_type: NodeCustodyType, head_epoch: Epoch, ordered_custody_column_indices: Vec, - spec: &ChainSpec, + slot_clock: T::SlotClock, + complete_blob_backfill: bool, + spec: Arc, ) -> (Self, Option) { let CustodyContextSsz { mut validator_custody_at_head, @@ -304,7 +321,7 @@ impl CustodyContext { let mut custody_count_changed = None; - if let Some(cgc_from_cli) = node_custody_type.get_custody_count_override(spec) { + if let Some(cgc_from_cli) = node_custody_type.get_custody_count_override(&spec) { debug!( ?node_custody_type, persisted_custody_count = validator_custody_at_head, @@ -360,7 +377,9 @@ impl CustodyContext { .collect(), }), ordered_custody_column_indices, - _phantom_data: PhantomData, + slot_clock, + complete_blob_backfill, + spec, }; (custody_context, custody_count_changed) @@ -376,13 +395,15 @@ impl CustodyContext { &self, validators_and_balance: ValidatorsAndBalances, current_slot: Slot, - spec: &ChainSpec, ) -> Option { let Some((effective_epoch, new_validator_custody)) = self .validator_registrations .write() - .register_validators::(validators_and_balance, current_slot, spec) - else { + .register_validators::( + validators_and_balance, + current_slot, + &self.spec, + ) else { return None; }; @@ -397,7 +418,7 @@ impl CustodyContext { self.validator_custody_count .store(new_validator_custody, Ordering::Relaxed); - let updated_cgc = self.custody_group_count_at_head(spec); + let updated_cgc = self.custody_group_count_at_head(); // Send the message to network only if there are more columns subnets to subscribe to if updated_cgc > current_cgc { debug!( @@ -407,7 +428,7 @@ impl CustodyContext { return Some(CustodyCountChanged { new_custody_group_count: updated_cgc, old_custody_group_count: current_cgc, - sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec), + sampling_count: self.num_of_custody_groups_to_sample(effective_epoch), effective_epoch, }); } @@ -419,14 +440,14 @@ impl CustodyContext { /// This function is used to determine the custody group count at head ONLY. /// Do NOT use this directly for data availability check, use `self.sampling_size` instead as /// CGC can change over epochs. - pub fn custody_group_count_at_head(&self, spec: &ChainSpec) -> u64 { + pub fn custody_group_count_at_head(&self) -> u64 { let validator_custody_count_at_head = self.validator_custody_count.load(Ordering::Relaxed); // If there are no validators, return the minimum custody_requirement if validator_custody_count_at_head > 0 { validator_custody_count_at_head } else { - spec.custody_requirement + self.spec.custody_requirement } } @@ -436,33 +457,35 @@ impl CustodyContext { /// minimum sampling size which may exceed the custody group count (CGC). /// /// See also: [`Self::num_of_custody_groups_to_sample`]. - pub fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { + pub fn custody_group_count_at_epoch(&self, epoch: Epoch) -> u64 { self.validator_registrations .read() .custody_requirement_at_epoch(epoch) - .unwrap_or(spec.custody_requirement) + .unwrap_or(self.spec.custody_requirement) } /// Returns the count of custody groups this node must _sample_ for a block at `epoch` to import. - pub fn num_of_custody_groups_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { - let custody_group_count = self.custody_group_count_at_epoch(epoch, spec); - spec.sampling_size_custody_groups(custody_group_count) + pub fn num_of_custody_groups_to_sample(&self, epoch: Epoch) -> u64 { + let custody_group_count = self.custody_group_count_at_epoch(epoch); + self.spec + .sampling_size_custody_groups(custody_group_count) .expect("should compute node sampling size from valid chain spec") } /// Returns the count of columns this node must _sample_ for a block at `epoch` to import. - pub fn num_of_data_columns_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> usize { - let custody_group_count = self.custody_group_count_at_epoch(epoch, spec); - spec.sampling_size_columns::(custody_group_count) + pub fn num_of_data_columns_to_sample(&self, epoch: Epoch) -> usize { + let custody_group_count = self.custody_group_count_at_epoch(epoch); + self.spec + .sampling_size_columns::(custody_group_count) .expect("should compute node sampling size from valid chain spec") } /// Returns whether the node should attempt reconstruction at a given epoch. - pub fn should_attempt_reconstruction(&self, epoch: Epoch, spec: &ChainSpec) -> bool { - let min_columns_for_reconstruction = E::number_of_columns() / 2; + pub fn should_attempt_reconstruction(&self, epoch: Epoch) -> bool { + let min_columns_for_reconstruction = T::EthSpec::number_of_columns() / 2; // performing reconstruction is not necessary if sampling column count is exactly 50%, // because the node doesn't need the remaining columns. - self.num_of_data_columns_to_sample(epoch, spec) > min_columns_for_reconstruction + self.num_of_data_columns_to_sample(epoch) > min_columns_for_reconstruction } /// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch. @@ -473,8 +496,8 @@ impl CustodyContext { /// /// # Returns /// A slice of ordered column indices that should be sampled for this epoch based on the node's custody configuration - pub fn sampling_columns_for_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> &[ColumnIndex] { - let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch, spec); + pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> &[ColumnIndex] { + let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch); &self.ordered_custody_column_indices[..num_of_columns_to_sample] } @@ -491,19 +514,15 @@ impl CustodyContext { /// /// # Returns /// A slice of ordered custody column indices for this epoch based on the node's custody configuration - pub fn custody_columns_for_epoch( - &self, - epoch_opt: Option, - spec: &ChainSpec, - ) -> &[ColumnIndex] { + pub fn custody_columns_for_epoch(&self, epoch_opt: Option) -> &[ColumnIndex] { let custody_group_count = if let Some(epoch) = epoch_opt { - self.custody_group_count_at_epoch(epoch, spec) as usize + self.custody_group_count_at_epoch(epoch) as usize } else { - self.custody_group_count_at_head(spec) as usize + self.custody_group_count_at_head() as usize }; // This is an unnecessary conversion for spec compliance, basically just multiplying by 1. - let columns_per_custody_group = spec.data_columns_per_group::() as usize; + let columns_per_custody_group = self.spec.data_columns_per_group::() as usize; let custody_column_count = columns_per_custody_group * custody_group_count; &self.ordered_custody_column_indices[..custody_column_count] @@ -528,6 +547,61 @@ impl CustodyContext { .write() .reset_validator_custody_requirements(effective_epoch); } + + /// The epoch at which we require a data availability check in block processing. + /// `None` if the `Deneb` fork is disabled. + pub fn data_availability_boundary(&self) -> Option { + let fork_epoch = self.spec.deneb_fork_epoch?; + + if self.complete_blob_backfill { + Some(fork_epoch) + } else { + let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch()); + self.spec + .min_epoch_data_availability_boundary(current_epoch) + } + } + + /// Returns true if the given epoch lies within the da boundary and false otherwise. + pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool { + self.data_availability_boundary() + .is_some_and(|da_epoch| block_epoch >= da_epoch) + } + + /// If the epoch is from prior to the data availability boundary, no blobs are required. + pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool { + self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch) + } + + /// If the epoch is from prior to the data availability boundary, no data columns are required. + pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool { + self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) + } + + /// See `Self::blobs_required_for_epoch` + pub fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { + block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) + } + + /// See `Self::data_columns_required_for_epoch` + pub fn data_columns_required_for_block(&self, block: &SignedBeaconBlock) -> bool { + block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch()) + } + + pub fn data_columns_required_for_bid( + &self, + bid: &SignedExecutionPayloadBid, + ) -> bool { + bid.num_blobs_expected() > 0 && self.data_columns_required_for_epoch(bid.epoch()) + } + + /// The data availability boundary for custodying columns. It will just be the + /// regular data availability boundary unless we are near the Fulu fork epoch. + pub fn column_data_availability_boundary(&self) -> Option { + let da_boundary = self.data_availability_boundary()?; + let fulu_epoch = self.spec.fulu_fork_epoch?; + Some(da_boundary.max(fulu_epoch)) + } } /// Indicates that the custody group count (CGC) has increased. @@ -553,8 +627,8 @@ pub struct CustodyContextSsz { pub epoch_validator_custody_requirements: Vec<(Epoch, u64)>, } -impl From<&CustodyContext> for CustodyContextSsz { - fn from(context: &CustodyContext) -> Self { +impl From<&CustodyContext> for CustodyContextSsz { + fn from(context: &CustodyContext) -> Self { CustodyContextSsz { validator_custody_at_head: context.validator_custody_count.load(Ordering::Relaxed), // This field is deprecated and has no effect @@ -573,16 +647,27 @@ impl From<&CustodyContext> for CustodyContextSsz { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::generate_data_column_indices_rand_order; + use crate::test_utils::{EphemeralHarnessType, generate_data_column_indices_rand_order}; + use slot_clock::{SlotClock, TestingSlotClock}; + use std::time::Duration; use types::MainnetEthSpec; type E = MainnetEthSpec; + type T = EphemeralHarnessType; + + fn testing_slot_clock(spec: &ChainSpec) -> TestingSlotClock { + TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + spec.get_slot_duration(), + ) + } fn setup_custody_context( - spec: &ChainSpec, + spec: Arc, head_epoch: Epoch, epoch_and_cgc_tuples: Vec<(Epoch, u64)>, - ) -> CustodyContext { + ) -> CustodyContext { let cgc_at_head = epoch_and_cgc_tuples.last().unwrap().1; let ssz_context = CustodyContextSsz { validator_custody_at_head: cgc_at_head, @@ -590,11 +675,14 @@ mod tests { epoch_validator_custody_requirements: epoch_and_cgc_tuples, }; - let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( + let complete_blob_backfill = false; + let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( ssz_context, NodeCustodyType::Fullnode, head_epoch, generate_data_column_indices_rand_order::(), + testing_slot_clock(&spec), + complete_blob_backfill, spec, ); @@ -602,7 +690,7 @@ mod tests { } fn complete_backfill_for_epochs( - custody_context: &CustodyContext, + custody_context: &CustodyContext, start_epoch: Epoch, end_epoch: Epoch, expected_cgc: u64, @@ -623,26 +711,29 @@ mod tests { target_node_custody_type: NodeCustodyType, expected_new_cgc: u64, head_epoch: Epoch, - spec: &ChainSpec, + spec: Arc, ) { let ssz_context = CustodyContextSsz { validator_custody_at_head: persisted_cgc, persisted_is_supernode: false, epoch_validator_custody_requirements: vec![(Epoch::new(0), persisted_cgc)], }; + let complete_blob_backfill = false; let (custody_context, custody_count_changed) = - CustodyContext::::new_from_persisted_custody_context( + CustodyContext::::new_from_persisted_custody_context( ssz_context, target_node_custody_type, head_epoch, generate_data_column_indices_rand_order::(), - spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); // Verify CGC increased assert_eq!( - custody_context.custody_group_count_at_head(spec), + custody_context.custody_group_count_at_head(), expected_new_cgc, "cgc should increase from {} to {}", persisted_cgc, @@ -675,13 +766,13 @@ mod tests { // Verify custody_group_count_at_epoch returns correct values assert_eq!( - custody_context.custody_group_count_at_epoch(head_epoch, spec), + custody_context.custody_group_count_at_epoch(head_epoch), persisted_cgc, "current epoch should still use old cgc ({})", persisted_cgc ); assert_eq!( - custody_context.custody_group_count_at_epoch(head_epoch + 1, spec), + custody_context.custody_group_count_at_epoch(head_epoch + 1), expected_new_cgc, "next epoch should use new cgc ({})", expected_new_cgc @@ -694,26 +785,29 @@ mod tests { persisted_cgc: u64, target_node_custody_type: NodeCustodyType, head_epoch: Epoch, - spec: &ChainSpec, + spec: Arc, ) { let ssz_context = CustodyContextSsz { validator_custody_at_head: persisted_cgc, persisted_is_supernode: false, epoch_validator_custody_requirements: vec![(Epoch::new(0), persisted_cgc)], }; + let complete_blob_backfill = false; let (custody_context, custody_count_changed) = - CustodyContext::::new_from_persisted_custody_context( + CustodyContext::::new_from_persisted_custody_context( ssz_context, target_node_custody_type, head_epoch, generate_data_column_indices_rand_order::(), - spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); // Verify CGC stays at persisted value (no reduction) assert_eq!( - custody_context.custody_group_count_at_head(spec), + custody_context.custody_group_count_at_head(), persisted_cgc, "cgc should remain at {} (reduction not supported)", persisted_cgc @@ -728,66 +822,78 @@ mod tests { #[test] fn no_validators_supernode_default() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Supernode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); assert_eq!( - custody_context.custody_group_count_at_head(&spec), + custody_context.custody_group_count_at_head(), spec.number_of_custody_groups ); assert_eq!( - custody_context.num_of_custody_groups_to_sample(Epoch::new(0), &spec), + custody_context.num_of_custody_groups_to_sample(Epoch::new(0)), spec.number_of_custody_groups ); } #[test] fn no_validators_semi_supernode_default() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::SemiSupernode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); assert_eq!( - custody_context.custody_group_count_at_head(&spec), + custody_context.custody_group_count_at_head(), spec.number_of_custody_groups / 2 ); assert_eq!( - custody_context.num_of_custody_groups_to_sample(Epoch::new(0), &spec), + custody_context.num_of_custody_groups_to_sample(Epoch::new(0)), spec.number_of_custody_groups / 2 ); } #[test] fn no_validators_fullnode_default() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); assert_eq!( - custody_context.custody_group_count_at_head(&spec), + custody_context.custody_group_count_at_head(), spec.custody_requirement, "head custody count should be minimum spec custody requirement" ); assert_eq!( - custody_context.num_of_custody_groups_to_sample(Epoch::new(0), &spec), + custody_context.num_of_custody_groups_to_sample(Epoch::new(0)), spec.samples_per_slot ); } #[test] fn register_single_validator_should_update_cgc() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let bal_per_additional_group = spec.balance_per_additional_custody_group; let min_val_custody_requirement = spec.validator_custody_requirement; @@ -802,20 +908,22 @@ mod tests { (vec![(0, 10 * bal_per_additional_group)], Some(10)), ]; - register_validators_and_assert_cgc::( + register_validators_and_assert_cgc::( &custody_context, validators_and_expected_cgc_change, - &spec, ); } #[test] fn register_multiple_validators_should_update_cgc() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let bal_per_additional_group = spec.balance_per_additional_custody_group; let min_val_custody_requirement = spec.validator_custody_requirement; @@ -843,20 +951,19 @@ mod tests { ), ]; - register_validators_and_assert_cgc::( - &custody_context, - validators_and_expected_cgc, - &spec, - ); + register_validators_and_assert_cgc::(&custody_context, validators_and_expected_cgc); } #[test] fn register_validators_should_not_update_cgc_for_supernode() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Supernode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let bal_per_additional_group = spec.balance_per_additional_custody_group; @@ -880,30 +987,28 @@ mod tests { ), ]; - register_validators_and_assert_cgc::( - &custody_context, - validators_and_expected_cgc, - &spec, - ); + register_validators_and_assert_cgc::(&custody_context, validators_and_expected_cgc); let current_epoch = Epoch::new(2); assert_eq!( - custody_context.num_of_custody_groups_to_sample(current_epoch, &spec), + custody_context.num_of_custody_groups_to_sample(current_epoch), spec.number_of_custody_groups ); } #[test] fn cgc_change_should_be_effective_to_sampling_after_delay() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let current_slot = Slot::new(10); let current_epoch = current_slot.epoch(E::slots_per_epoch()); - let default_sampling_size = - custody_context.num_of_custody_groups_to_sample(current_epoch, &spec); + let default_sampling_size = custody_context.num_of_custody_groups_to_sample(current_epoch); let validator_custody_units = 10; let _cgc_changed = custody_context.register_validators( @@ -912,28 +1017,30 @@ mod tests { validator_custody_units * spec.balance_per_additional_custody_group, )], current_slot, - &spec, ); // CGC update is not applied for `current_epoch`. assert_eq!( - custody_context.num_of_custody_groups_to_sample(current_epoch, &spec), + custody_context.num_of_custody_groups_to_sample(current_epoch), default_sampling_size ); // CGC update is applied for the next epoch. assert_eq!( - custody_context.num_of_custody_groups_to_sample(current_epoch + 1, &spec), + custody_context.num_of_custody_groups_to_sample(current_epoch + 1), validator_custody_units ); } #[test] fn validator_dropped_after_no_registrations_within_expiry_should_not_reduce_cgc() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let current_slot = Slot::new(10); let val_custody_units_1 = 10; @@ -952,7 +1059,6 @@ mod tests { ), ], current_slot, - &spec, ); // WHEN val_1 re-registered, but val_2 did not re-register after `VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1` slots @@ -962,24 +1068,26 @@ mod tests { val_custody_units_1 * spec.balance_per_additional_custody_group, )], current_slot + VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1, - &spec, ); // THEN the reduction from dropping val_2 balance should NOT result in a CGC reduction assert!(cgc_changed_opt.is_none(), "CGC should remain unchanged"); assert_eq!( - custody_context.custody_group_count_at_head(&spec), + custody_context.custody_group_count_at_head(), val_custody_units_1 + val_custody_units_2 ) } #[test] fn validator_dropped_after_no_registrations_within_expiry() { - let spec = E::default_spec(); - let custody_context = CustodyContext::::new( + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let current_slot = Slot::new(10); let val_custody_units_1 = 10; @@ -999,7 +1107,6 @@ mod tests { ), ], current_slot, - &spec, ); // WHEN val_1 and val_3 registered, but val_3 did not re-register after `VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1` slots @@ -1015,7 +1122,6 @@ mod tests { ), ], current_slot + VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1, - &spec, ); // THEN CGC should increase, BUT val_2 balance should NOT be included in CGC @@ -1028,10 +1134,9 @@ mod tests { } /// Update the validator every epoch and assert cgc against expected values. - fn register_validators_and_assert_cgc( - custody_context: &CustodyContext, + fn register_validators_and_assert_cgc( + custody_context: &CustodyContext, validators_and_expected_cgc_changed: Vec<(ValidatorsAndBalances, Option)>, - spec: &ChainSpec, ) { for (idx, (validators_and_balance, expected_cgc_change)) in validators_and_expected_cgc_changed.into_iter().enumerate() @@ -1040,8 +1145,7 @@ mod tests { let updated_custody_count_opt = custody_context .register_validators( validators_and_balance, - epoch.start_slot(E::slots_per_epoch()), - spec, + epoch.start_slot(T::EthSpec::slots_per_epoch()), ) .map(|c| c.new_custody_group_count); @@ -1051,44 +1155,53 @@ mod tests { #[test] fn custody_columns_for_epoch_no_validators_fullnode() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, ordered_custody_column_indices, - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); assert_eq!( - custody_context.custody_columns_for_epoch(None, &spec).len(), + custody_context.custody_columns_for_epoch(None).len(), spec.custody_requirement as usize ); } #[test] fn custody_columns_for_epoch_no_validators_supernode() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( + let custody_context = CustodyContext::::new( NodeCustodyType::Supernode, ordered_custody_column_indices, - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); assert_eq!( - custody_context.custody_columns_for_epoch(None, &spec).len(), + custody_context.custody_columns_for_epoch(None).len(), spec.number_of_custody_groups as usize ); } #[test] fn custody_columns_for_epoch_with_validators_should_match_cgc() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, ordered_custody_column_indices, - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let val_custody_units = 10; @@ -1098,30 +1211,32 @@ mod tests { val_custody_units * spec.balance_per_additional_custody_group, )], Slot::new(10), - &spec, ); assert_eq!( - custody_context.custody_columns_for_epoch(None, &spec).len(), + custody_context.custody_columns_for_epoch(None).len(), val_custody_units as usize ); } #[test] fn custody_columns_for_epoch_specific_epoch_uses_epoch_cgc() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( + let custody_context = CustodyContext::::new( NodeCustodyType::Fullnode, ordered_custody_column_indices, - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); let test_epoch = Epoch::new(5); - let expected_cgc = custody_context.custody_group_count_at_epoch(test_epoch, &spec); + let expected_cgc = custody_context.custody_group_count_at_epoch(test_epoch); assert_eq!( custody_context - .custody_columns_for_epoch(Some(test_epoch), &spec) + .custody_columns_for_epoch(Some(test_epoch)) .len(), expected_cgc as usize ); @@ -1129,23 +1244,26 @@ mod tests { #[test] fn restore_from_persisted_fullnode_no_validators() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; let ssz_context = CustodyContextSsz { validator_custody_at_head: 0, // no validators persisted_is_supernode: false, epoch_validator_custody_requirements: vec![], }; - let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( + let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( ssz_context, NodeCustodyType::Fullnode, Epoch::new(0), generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); assert_eq!( - custody_context.custody_group_count_at_head(&spec), + custody_context.custody_group_count_at_head(), spec.custody_requirement, "restored custody group count should match fullnode default" ); @@ -1155,7 +1273,7 @@ mod tests { /// CGC should increase and trigger backfill via CustodyCountChanged. #[test] fn restore_fullnode_then_switch_to_supernode_increases_cgc() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let head_epoch = Epoch::new(10); let supernode_cgc = spec.number_of_custody_groups; @@ -1164,7 +1282,7 @@ mod tests { NodeCustodyType::Supernode, supernode_cgc, head_epoch, - &spec, + spec, ); } @@ -1172,17 +1290,20 @@ mod tests { /// Semi-supernode can exceed 64 when validator effective balance increases CGC. #[test] fn restore_semi_supernode_with_validators_can_exceed_64() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); + let complete_blob_backfill = false; let semi_supernode_cgc = spec.number_of_custody_groups / 2; // 64 - let custody_context = CustodyContext::::new( + let custody_context = CustodyContext::::new( NodeCustodyType::SemiSupernode, generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); // Verify initial CGC is 64 (semi-supernode) assert_eq!( - custody_context.custody_group_count_at_head(&spec), + custody_context.custody_group_count_at_head(), semi_supernode_cgc, "initial cgc should be 64" ); @@ -1196,7 +1317,6 @@ mod tests { validator_custody_units * spec.balance_per_additional_custody_group, )], current_slot, - &spec, ); // Verify CGC increased from 64 to 70 @@ -1216,7 +1336,7 @@ mod tests { // Verify the custody context reflects the new CGC assert_eq!( - custody_context.custody_group_count_at_head(&spec), + custody_context.custody_group_count_at_head(), validator_custody_units, "custody_group_count_at_head should be 70" ); @@ -1226,14 +1346,14 @@ mod tests { /// CGC reduction is not supported - persisted value is retained. #[test] fn restore_supernode_then_switch_to_fullnode_uses_persisted() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let supernode_cgc = spec.number_of_custody_groups; assert_custody_type_switch_unchanged_cgc( supernode_cgc, NodeCustodyType::Fullnode, Epoch::new(0), - &spec, + spec, ); } @@ -1241,7 +1361,7 @@ mod tests { /// CGC reduction is not supported - persisted value is retained. #[test] fn restore_supernode_then_switch_to_semi_supernode_keeps_supernode_cgc() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let supernode_cgc = spec.number_of_custody_groups; let head_epoch = Epoch::new(10); @@ -1249,7 +1369,7 @@ mod tests { supernode_cgc, NodeCustodyType::SemiSupernode, head_epoch, - &spec, + spec, ); } @@ -1257,7 +1377,7 @@ mod tests { /// CGC should increase and trigger backfill via CustodyCountChanged. #[test] fn restore_fullnode_with_validators_then_switch_to_semi_supernode() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let persisted_cgc = 32u64; let semi_supernode_cgc = spec.number_of_custody_groups / 2; let head_epoch = Epoch::new(10); @@ -1267,7 +1387,7 @@ mod tests { NodeCustodyType::SemiSupernode, semi_supernode_cgc, head_epoch, - &spec, + spec, ); } @@ -1275,7 +1395,7 @@ mod tests { /// CGC should increase and trigger backfill via CustodyCountChanged. #[test] fn restore_semi_supernode_then_switch_to_supernode() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let semi_supernode_cgc = spec.number_of_custody_groups / 2; let supernode_cgc = spec.number_of_custody_groups; let head_epoch = Epoch::new(10); @@ -1285,7 +1405,7 @@ mod tests { NodeCustodyType::Supernode, supernode_cgc, head_epoch, - &spec, + spec, ); } @@ -1293,7 +1413,7 @@ mod tests { /// CGC should increase and trigger backfill via CustodyCountChanged. #[test] fn restore_with_cli_flag_increases_cgc_from_nonzero() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let persisted_cgc = 32u64; let supernode_cgc = spec.number_of_custody_groups; let head_epoch = Epoch::new(10); @@ -1303,13 +1423,13 @@ mod tests { NodeCustodyType::Supernode, supernode_cgc, head_epoch, - &spec, + spec, ); } #[test] fn restore_with_validator_custody_history_across_epochs() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let initial_cgc = 8u64; let increased_cgc = 16u64; let final_cgc = 32u64; @@ -1324,45 +1444,45 @@ mod tests { ], }; - let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( + let complete_blob_backfill = false; + let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( ssz_context, NodeCustodyType::Fullnode, Epoch::new(20), generate_data_column_indices_rand_order::(), - &spec, + testing_slot_clock(&spec), + complete_blob_backfill, + spec.clone(), ); // Verify head uses latest value - assert_eq!( - custody_context.custody_group_count_at_head(&spec), - final_cgc - ); + assert_eq!(custody_context.custody_group_count_at_head(), final_cgc); // Verify historical epoch lookups work correctly assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(5), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(5)), initial_cgc, "epoch 5 should use initial cgc" ); assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(15), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(15)), increased_cgc, "epoch 15 should use increased cgc" ); assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(25), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(25)), final_cgc, "epoch 25 should use final cgc" ); // Verify sampling size calculation uses correct historical values assert_eq!( - custody_context.num_of_custody_groups_to_sample(Epoch::new(5), &spec), + custody_context.num_of_custody_groups_to_sample(Epoch::new(5)), spec.samples_per_slot, "sampling at epoch 5 should use spec minimum since cgc is at minimum" ); assert_eq!( - custody_context.num_of_custody_groups_to_sample(Epoch::new(25), &spec), + custody_context.num_of_custody_groups_to_sample(Epoch::new(25)), final_cgc, "sampling at epoch 25 should match final cgc" ); @@ -1370,16 +1490,16 @@ mod tests { #[test] fn backfill_single_cgc_increase_updates_past_epochs() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let final_cgc = 32u64; let default_cgc = spec.custody_requirement; // Setup: Node restart after validators were registered, causing CGC increase to 32 at epoch 20 let head_epoch = Epoch::new(20); let epoch_and_cgc_tuples = vec![(head_epoch, final_cgc)]; - let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples); + let custody_context = setup_custody_context(spec.clone(), head_epoch, epoch_and_cgc_tuples); assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(15), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(15)), default_cgc, ); @@ -1388,26 +1508,26 @@ mod tests { // After backfilling to epoch 15, it should use latest CGC (32) assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(15), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(15)), final_cgc, ); assert_eq!( custody_context - .custody_columns_for_epoch(Some(Epoch::new(15)), &spec) + .custody_columns_for_epoch(Some(Epoch::new(15))) .len(), final_cgc as usize, ); // Prior epoch should still return the original CGC assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(14), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(14)), default_cgc, ); } #[test] fn backfill_with_multiple_cgc_increases_prunes_map_correctly() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let initial_cgc = 8u64; let mid_cgc = 16u64; let final_cgc = 32u64; @@ -1419,7 +1539,7 @@ mod tests { (Epoch::new(10), mid_cgc), (head_epoch, final_cgc), ]; - let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples); + let custody_context = setup_custody_context(spec.clone(), head_epoch, epoch_and_cgc_tuples); // Backfill to epoch 15 (between the two CGC increases) complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(15), final_cgc); @@ -1427,7 +1547,7 @@ mod tests { // Verify epochs 15 - 20 return latest CGC (32) for epoch in 15..=20 { assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(epoch)), final_cgc, ); } @@ -1435,7 +1555,7 @@ mod tests { // Verify epochs 10-14 still return mid_cgc (16) for epoch in 10..14 { assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(epoch)), mid_cgc, ); } @@ -1443,7 +1563,7 @@ mod tests { #[test] fn attempt_backfill_with_invalid_cgc() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let initial_cgc = 8u64; let mid_cgc = 16u64; let final_cgc = 32u64; @@ -1455,7 +1575,7 @@ mod tests { (Epoch::new(10), mid_cgc), (head_epoch, final_cgc), ]; - let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples); + let custody_context = setup_custody_context(spec.clone(), head_epoch, epoch_and_cgc_tuples); // Backfill to epoch 15 (between the two CGC increases) complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(15), final_cgc); @@ -1463,7 +1583,7 @@ mod tests { // Verify epochs 15 - 20 return latest CGC (32) for epoch in 15..=20 { assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(epoch)), final_cgc, ); } @@ -1479,7 +1599,7 @@ mod tests { // Verify epochs 15 - 20 still return latest CGC (32) for epoch in 15..=20 { assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(epoch)), final_cgc, ); } @@ -1487,7 +1607,7 @@ mod tests { // Verify epochs 10-14 still return mid_cgc (16) for epoch in 10..14 { assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(epoch)), mid_cgc, ); } @@ -1495,7 +1615,7 @@ mod tests { #[test] fn reset_validator_custody_requirements() { - let spec = E::default_spec(); + let spec = Arc::new(E::default_spec()); let minimum_cgc = 4u64; let initial_cgc = 8u64; let mid_cgc = 16u64; @@ -1508,7 +1628,7 @@ mod tests { (Epoch::new(10), mid_cgc), (head_epoch, final_cgc), ]; - let custody_context = setup_custody_context(&spec, head_epoch, epoch_and_cgc_tuples); + let custody_context = setup_custody_context(spec.clone(), head_epoch, epoch_and_cgc_tuples); // Backfill from epoch 20 to 9 complete_backfill_for_epochs(&custody_context, Epoch::new(20), Epoch::new(9), final_cgc); @@ -1519,14 +1639,14 @@ mod tests { // Verify epochs 0 - 19 return the minimum cgc requirement because of the validator custody requirement reset for epoch in 0..=19 { assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(epoch)), minimum_cgc, ); } // Verify epoch 20 returns a CGC of 32 assert_eq!( - custody_context.custody_group_count_at_epoch(head_epoch, &spec), + custody_context.custody_group_count_at_epoch(head_epoch), final_cgc ); @@ -1536,7 +1656,7 @@ mod tests { // Verify epochs 0 - 20 return the final cgc requirements for epoch in 0..=20 { assert_eq!( - custody_context.custody_group_count_at_epoch(Epoch::new(epoch), &spec), + custody_context.custody_group_count_at_epoch(Epoch::new(epoch)), final_cgc, ); } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index e559dc7689..d129072ff9 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -18,7 +18,7 @@ use tracing::{debug, error, instrument}; use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn}; use types::{ BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError, + DataColumnSidecarList, EthSpec, Hash256, PartialDataColumnSidecarError, PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, }; @@ -75,12 +75,9 @@ const OVERFLOW_LRU_CAPACITY: usize = 32; /// proposer. Having a capacity > 1 is an optimization to prevent sync lookup from having re-fetch /// data during moments of unstable network conditions. pub struct DataAvailabilityChecker { - complete_blob_backfill: bool, availability_cache: Arc>, partial_assembler: Option>>, - slot_clock: T::SlotClock, kzg: Arc, - custody_context: Arc>, spec: Arc, } @@ -115,10 +112,8 @@ impl Debug for Availability { impl DataAvailabilityChecker { pub fn new( - complete_blob_backfill: bool, - slot_clock: T::SlotClock, kzg: Arc, - custody_context: Arc>, + custody_context: Arc>, spec: Arc, enable_partial_columns: bool, disable_get_blobs: bool, @@ -137,18 +132,15 @@ impl DataAvailabilityChecker { None }; Ok(Self { - complete_blob_backfill, partial_assembler, availability_cache: Arc::new(inner), - slot_clock, kzg, - custody_context, spec, }) } - pub fn custody_context(&self) -> &Arc> { - &self.custody_context + fn custody_context(&self) -> &Arc> { + self.availability_cache.custody_context() } pub fn partial_assembler(&self) -> Option<&Arc>> { @@ -310,9 +302,9 @@ impl DataAvailabilityChecker { &self, block_root: Hash256, blobs: FixedBlobSidecarList, + slot_clock: &T::SlotClock, ) -> Result, AvailabilityCheckError> { - let seen_timestamp = self - .slot_clock + let seen_timestamp = slot_clock .now_duration() .ok_or(AvailabilityCheckError::SlotClockError)?; @@ -350,9 +342,7 @@ impl DataAvailabilityChecker { // not be yet effective for data availability check, as CGC changes are only effecive from // a new epoch. let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_columns = self - .custody_context - .sampling_columns_for_epoch(epoch, &self.spec); + let sampling_columns = self.custody_context().sampling_columns_for_epoch(epoch); let verified_custody_columns = kzg_verified_columns .into_iter() .filter(|col| sampling_columns.contains(&col.index())) @@ -390,9 +380,7 @@ impl DataAvailabilityChecker { data_columns: I, ) -> Result, AvailabilityCheckError> { let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_columns = self - .custody_context - .sampling_columns_for_epoch(epoch, &self.spec); + let sampling_columns = self.custody_context().sampling_columns_for_epoch(epoch); let custody_columns = data_columns .into_iter() .filter(|col| sampling_columns.contains(&col.index())) @@ -507,59 +495,6 @@ impl DataAvailabilityChecker { Ok(()) } - /// Determines the blob requirements for a block. If the block is pre-deneb, no blobs are required. - /// If the epoch is from prior to the data availability boundary, no blobs are required. - pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool { - self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch) - } - - /// Determines the data column requirements for an epoch. - /// - If the epoch is pre-peerdas, no data columns are required. - /// - If the epoch is from prior to the data availability boundary, no data columns are required. - pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool { - self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) - } - - /// See `Self::blobs_required_for_epoch` - fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { - block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) - } - - /// See `Self::data_columns_required_for_epoch` - fn data_columns_required_for_block(&self, block: &SignedBeaconBlock) -> bool { - block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch()) - } - - /// The epoch at which we require a data availability check in block processing. - /// `None` if the `Deneb` fork is disabled. - pub fn data_availability_boundary(&self) -> Option { - let fork_epoch = self.spec.deneb_fork_epoch?; - - if self.complete_blob_backfill { - Some(fork_epoch) - } else { - let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch()); - self.spec - .min_epoch_data_availability_boundary(current_epoch) - } - } - - /// Returns true if the given epoch lies within the da boundary and false otherwise. - pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool { - self.data_availability_boundary() - .is_some_and(|da_epoch| block_epoch >= da_epoch) - } - - /// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch. - pub fn is_deneb(&self) -> bool { - self.slot_clock.now().is_some_and(|slot| { - self.spec.deneb_fork_epoch.is_some_and(|deneb_epoch| { - let now_epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - now_epoch >= deneb_epoch - }) - }) - } - /// Collects metrics from the data availability checker. pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { DataAvailabilityCheckerMetrics { @@ -629,7 +564,7 @@ impl DataAvailabilityChecker { let columns_to_sample = self .custody_context() - .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec); + .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())); // We only need to import and publish columns that we need to sample // and columns that we haven't already received @@ -886,15 +821,14 @@ impl AvailableBlock { pub fn new( block: Arc>, block_data: AvailableBlockData, - da_checker: &DataAvailabilityChecker, - spec: Arc, + custody_context: &CustodyContext, ) -> Result where T: BeaconChainTypes, { // Ensure block availability - let blobs_required = da_checker.blobs_required_for_block(&block); - let columns_required = da_checker.data_columns_required_for_block(&block); + let blobs_required = custody_context.blobs_required_for_block(&block); + let columns_required = custody_context.data_columns_required_for_block(&block); match &block_data { AvailableBlockData::NoData => { @@ -935,9 +869,8 @@ impl AvailableBlock { return Err(AvailabilityCheckError::InvalidAvailableBlockData); } - let mut column_indices = da_checker - .custody_context - .sampling_columns_for_epoch(block.epoch(), &spec) + let mut column_indices = custody_context + .sampling_columns_for_epoch(block.epoch()) .iter() .collect::>(); @@ -1081,7 +1014,8 @@ mod test { use std::time::Duration; use types::data::DataColumn; use types::{ - ChainSpec, ColumnIndex, DataColumnSidecarFulu, EthSpec, ForkName, MainnetEthSpec, Slot, + ChainSpec, ColumnIndex, DataColumnSidecarFulu, Epoch, EthSpec, ForkName, MainnetEthSpec, + Slot, }; type E = MainnetEthSpec; @@ -1096,7 +1030,7 @@ mod test { let mut u = types::test_utils::test_unstructured(); let da_checker = new_da_checker(spec.clone()); - let custody_context = &da_checker.custody_context; + let custody_context = da_checker.custody_context(); // GIVEN a single 32 ETH validator is attached slot 0 let epoch = Epoch::new(0); @@ -1104,10 +1038,9 @@ mod test { custody_context.register_validators( vec![(validator_0, 32_000_000_000)], epoch.start_slot(E::slots_per_epoch()), - &spec, ); assert_eq!( - custody_context.num_of_data_columns_to_sample(epoch, &spec), + custody_context.num_of_data_columns_to_sample(epoch), spec.validator_custody_requirement as usize, "sampling size should be the minimal custody requirement == 8" ); @@ -1115,11 +1048,8 @@ mod test { // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch let validator_1 = 1; let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); - custody_context.register_validators( - vec![(validator_1, 32_000_000_000 * 9)], - cgc_change_slot, - &spec, - ); + custody_context + .register_validators(vec![(validator_1, 32_000_000_000 * 9)], cgc_change_slot); // AND custody columns (8) and any new extra columns (2) are received via RPC responses. // NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown. let (_, data_columns) = generate_rand_block_and_data_columns::( @@ -1134,7 +1064,7 @@ mod test { // The CGC change becomes effective after CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS, // which is typically epoch 2+ for MinimalEthSpec. let future_epoch = Epoch::new(10); // Far enough in the future to have the CGC change effective - let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch, &spec); + let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch); assert_eq!( requested_columns.len(), 10, @@ -1152,7 +1082,7 @@ mod test { .expect("should put rpc custody columns"); // THEN the sampling size for the end slot of the same epoch remains unchanged - let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); + let sampling_columns = custody_context.sampling_columns_for_epoch(epoch); assert_eq!( sampling_columns.len(), spec.validator_custody_requirement as usize // 8 @@ -1183,7 +1113,7 @@ mod test { let mut u = types::test_utils::test_unstructured(); let da_checker = new_da_checker(spec.clone()); - let custody_context = &da_checker.custody_context; + let custody_context = da_checker.custody_context(); // GIVEN a single 32 ETH validator is attached slot 0 let epoch = Epoch::new(0); @@ -1191,10 +1121,9 @@ mod test { custody_context.register_validators( vec![(validator_0, 32_000_000_000)], epoch.start_slot(E::slots_per_epoch()), - &spec, ); assert_eq!( - custody_context.num_of_data_columns_to_sample(epoch, &spec), + custody_context.num_of_data_columns_to_sample(epoch), spec.validator_custody_requirement as usize, "sampling size should be the minimal custody requirement == 8" ); @@ -1202,11 +1131,8 @@ mod test { // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch let validator_1 = 1; let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); - custody_context.register_validators( - vec![(validator_1, 32_000_000_000 * 9)], - cgc_change_slot, - &spec, - ); + custody_context + .register_validators(vec![(validator_1, 32_000_000_000 * 9)], cgc_change_slot); // AND custody columns (8) and any new extra columns (2) are received via gossip. // NOTE: CGC updates results in new topics subscriptions immediately, and extra columns may start to // arrive via gossip. @@ -1222,7 +1148,7 @@ mod test { // The CGC change becomes effective after CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS, // which is typically epoch 2+ for MinimalEthSpec. let future_epoch = Epoch::new(10); // Far enough in the future to have the CGC change effective - let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch, &spec); + let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch); assert_eq!( requested_columns.len(), 10, @@ -1238,7 +1164,7 @@ mod test { .expect("should put gossip custody columns"); // THEN the sampling size for the end slot of the same epoch remains unchanged - let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); + let sampling_columns = custody_context.sampling_columns_for_epoch(epoch); assert_eq!( sampling_columns.len(), spec.validator_custody_requirement as usize // 8 @@ -1305,8 +1231,7 @@ mod test { }; let block_data = AvailableBlockData::new_with_data_columns(custody_columns); - let da_checker = Arc::new(new_da_checker(spec.clone())); - RangeSyncBlock::new(Arc::new(block), block_data, &da_checker, spec.clone()) + RangeSyncBlock::new(Arc::new(block), block_data, da_checker.custody_context()) .expect("should create RPC block with custody columns") }) .collect::>(); @@ -1331,16 +1256,15 @@ mod test { let mut u = types::test_utils::test_unstructured(); let da_checker = new_da_checker(spec.clone()); - let custody_context = &da_checker.custody_context; + let custody_context = da_checker.custody_context(); // Set custody requirement to 65 columns (enough to trigger reconstruction) let epoch = Epoch::new(1); custody_context.register_validators( vec![(0, 2_048_000_000_000), (1, 32_000_000_000)], // 64 + 1 Slot::new(0), - &spec, ); - let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch, &spec); + let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch); assert_eq!( sampling_requirement, 65, "sampling requirement should be 65" @@ -1362,7 +1286,7 @@ mod test { // Add 64 columns to the da checker (enough to be able to reconstruct) // Order by all_column_indices_ordered, then take first 64 - let custody_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); + let custody_columns = custody_context.sampling_columns_for_epoch(epoch); let custody_columns = custody_columns .iter() .filter_map(|&col_idx| data_columns.iter().find(|d| *d.index() == col_idx).cloned()) @@ -1400,7 +1324,7 @@ mod test { ); // Only the columns required for custody (65) should be imported into the cache - let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); + let sampling_columns = custody_context.sampling_columns_for_epoch(epoch); let actual_cached: HashSet = da_checker .cached_data_column_indexes(&block_root) .expect("should have cached data columns") @@ -1421,21 +1345,15 @@ mod test { ); let kzg = get_kzg(&spec); let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); + let complete_blob_backfill = false; let custody_context = Arc::new(CustodyContext::new( NodeCustodyType::Fullnode, ordered_custody_column_indices, - &spec, - )); - let complete_blob_backfill = false; - DataAvailabilityChecker::new( - complete_blob_backfill, slot_clock, - kzg, - custody_context, - spec, - true, - false, - ) - .expect("should initialise data availability checker") + complete_blob_backfill, + spec.clone(), + )); + DataAvailabilityChecker::new(kzg, custody_context, spec, true, false) + .expect("should initialise data availability checker") } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 47740cdf5e..dc59d614df 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -349,7 +349,7 @@ impl PendingComponents { pub struct DataAvailabilityCheckerInner { /// Contains all the data we keep in memory, protected by an RwLock critical: RwLock>>, - custody_context: Arc>, + custody_context: Arc>, spec: Arc, } @@ -365,7 +365,7 @@ pub(crate) enum ReconstructColumnsDecision { impl DataAvailabilityCheckerInner { pub fn new( capacity: usize, - custody_context: Arc>, + custody_context: Arc>, spec: Arc, ) -> Result { Ok(Self { @@ -434,6 +434,10 @@ impl DataAvailabilityCheckerInner { f(self.critical.read().peek(block_root)) } + pub fn custody_context(&self) -> &Arc> { + &self.custody_context + } + /// Puts the KZG verified blobs into the availability cache as pending components. pub fn put_kzg_verified_blobs>>( &self, @@ -500,9 +504,7 @@ impl DataAvailabilityCheckerInner { pending_components.merge_data_columns(kzg_verified_data_columns) })?; - let num_expected_columns = self - .custody_context - .num_of_data_columns_to_sample(epoch, &self.spec); + let num_expected_columns = self.custody_context.num_of_data_columns_to_sample(epoch); pending_components.span.in_scope(|| { debug!( @@ -606,9 +608,7 @@ impl DataAvailabilityCheckerInner { }; let total_column_count = T::EthSpec::number_of_columns(); - let sampling_column_count = self - .custody_context - .num_of_data_columns_to_sample(epoch, &self.spec); + let sampling_column_count = self.custody_context.num_of_data_columns_to_sample(epoch); let received_column_count = pending_components.verified_data_columns.len(); if pending_components.reconstruction_started { @@ -709,9 +709,7 @@ impl DataAvailabilityCheckerInner { fn get_num_expected_columns(&self, epoch: Epoch) -> Option { if self.spec.is_peer_das_enabled_for_epoch(epoch) { - let num_of_column_samples = self - .custody_context - .num_of_data_columns_to_sample(epoch, &self.spec); + let num_of_column_samples = self.custody_context.num_of_data_columns_to_sample(epoch); Some(num_of_column_samples) } else { None @@ -760,6 +758,7 @@ mod test { }; use fork_choice::PayloadVerificationStatus; use logging::create_test_tracing_subscriber; + use slot_clock::TestingSlotClock; use state_processing::ConsensusContext; use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend}; use tempfile::{TempDir, tempdir}; @@ -922,19 +921,25 @@ mod test { HotStore = BeaconNodeBackend, ColdStore = BeaconNodeBackend, EthSpec = E, + SlotClock = TestingSlotClock, >, { create_test_tracing_subscriber(); let chain_db_path = tempdir().expect("should get temp dir"); let harness = get_fulu_chain(&chain_db_path).await; let spec = harness.spec.clone(); + let complete_blob_backfill = false; + let slot_clock = harness.chain.slot_clock.clone(); + let custody_context = Arc::new(CustodyContext::new( NodeCustodyType::Fullnode, generate_data_column_indices_rand_order::(), - &spec, + slot_clock, + complete_blob_backfill, + spec.clone(), )); let cache = Arc::new( - DataAvailabilityCheckerInner::::new(capacity, custody_context, spec.clone()) + DataAvailabilityCheckerInner::::new(capacity, custody_context, spec) .expect("should create cache"), ); (harness, cache, chain_db_path) @@ -952,9 +957,7 @@ mod test { let epoch = pending_block.block.epoch(); let num_blobs_expected = pending_block.num_blobs_expected(); - let columns_expected = cache - .custody_context - .num_of_data_columns_to_sample(epoch, &harness.spec); + let columns_expected = cache.custody_context.num_of_data_columns_to_sample(epoch); // All columns are returned from availability_pending_block (E::number_of_columns()) // but we only need custody columns @@ -994,9 +997,7 @@ mod test { } // Get sampling column indices for this epoch - let sampling_column_indices = cache - .custody_context - .sampling_columns_for_epoch(epoch, &harness.spec); + let sampling_column_indices = cache.custody_context.sampling_columns_for_epoch(epoch); // Filter to only sampling columns let sampling_columns: Vec<_> = columns @@ -1032,9 +1033,7 @@ mod test { let root = pending_block.import_data.block_root; // Get sampling column indices for this epoch - let sampling_column_indices = cache - .custody_context - .sampling_columns_for_epoch(epoch, &harness.spec); + let sampling_column_indices = cache.custody_context.sampling_columns_for_epoch(epoch); // Filter to only sampling columns let sampling_columns: Vec<_> = columns diff --git a/beacon_node/beacon_chain/src/historical_data_columns.rs b/beacon_node/beacon_chain/src/historical_data_columns.rs index d6977d9985..1089c801ff 100644 --- a/beacon_node/beacon_chain/src/historical_data_columns.rs +++ b/beacon_node/beacon_chain/src/historical_data_columns.rs @@ -131,8 +131,7 @@ impl BeaconChain { ); } - self.data_availability_checker - .custody_context() + self.custody_context .update_and_backfill_custody_count_at_epoch(epoch, expected_cgc); self.safely_backfill_data_column_custody_info(epoch) diff --git a/beacon_node/beacon_chain/src/invariants.rs b/beacon_node/beacon_chain/src/invariants.rs index b365f37a0a..369b968e78 100644 --- a/beacon_node/beacon_chain/src/invariants.rs +++ b/beacon_node/beacon_chain/src/invariants.rs @@ -29,14 +29,12 @@ impl BeaconChain { .collect() }; - let custody_context = self.data_availability_checker.custody_context(); + let custody_context = self.custody_context.clone(); let ctx = InvariantContext { fork_choice_blocks, state_cache_roots: self.store.state_cache.lock().state_roots(), - custody_columns: custody_context - .custody_columns_for_epoch(None, &self.spec) - .to_vec(), + custody_columns: custody_context.custody_columns_for_epoch(None).to_vec(), pubkey_cache_pubkeys: { let cache = self.validator_pubkey_cache.read(); (0..cache.len()) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index a0d34949c6..1dc8418420 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -17,20 +17,21 @@ //! ExecutedEnvelope //! //! ``` - +use crate::data_availability_checker::AvailabilityCheckError; +use crate::{ + BeaconChainError, BeaconChainTypes, BeaconStore, BlockError, CustodyContext, + ExecutionPayloadError, PayloadVerificationError, PayloadVerificationOutcome, +}; use state_processing::envelope_processing::EnvelopeProcessingError; +use std::collections::HashSet; use std::sync::Arc; use store::Error as DBError; use strum::AsRefStr; -use tracing::instrument; +use tracing::{instrument, warn}; use types::{ BeaconState, BeaconStateError, DataColumnSidecarList, EthSpec, ExecutionBlockHash, - ExecutionPayloadEnvelope, Hash256, SignedExecutionPayloadEnvelope, Slot, -}; - -use crate::{ - BeaconChainError, BeaconChainTypes, BeaconStore, BlockError, ExecutionPayloadError, - PayloadVerificationError, PayloadVerificationOutcome, + ExecutionPayloadEnvelope, Hash256, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, + Slot, }; pub mod execution_pending_envelope; @@ -47,11 +48,76 @@ pub struct AvailableEnvelope { } impl AvailableEnvelope { - pub fn new( + /// Constructs an `AvailableEnvelope` from an envelope and custody column data. + /// + /// This function validates that: + /// - All required custody columns are present + /// + /// If more columns are provided than necessary, a warning is logged and the extra + /// columns are filtered out of the list. + /// + /// Returns `AvailabilityCheckError` if: + /// - `MissingCustodyColumns`: Required custody columns are missing or incomplete + pub fn new( envelope: Arc>, columns: DataColumnSidecarList, - ) -> Self { - Self { envelope, columns } + bid: &SignedExecutionPayloadBid, + custody_context: &CustodyContext, + ) -> Result + where + T: BeaconChainTypes, + { + if custody_context.data_columns_required_for_bid(bid) { + let columns_expected = custody_context.num_of_data_columns_to_sample(bid.epoch()); + + // Get required custody column indices + let required_indices = custody_context + .sampling_columns_for_epoch(bid.epoch()) + .iter() + .copied() + .collect::>(); + + // Filter to only the columns we need (deduplicates if there are duplicates) + let mut filtered_columns = Vec::new(); + let mut seen_indices = HashSet::new(); + let num_provided_columns = columns.len(); + for column in columns { + if required_indices.contains(column.index()) && seen_indices.insert(*column.index()) + { + filtered_columns.push(column); + } + } + + // Check if we have all required columns + if filtered_columns.len() != columns_expected { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + + if num_provided_columns != filtered_columns.len() { + warn!( + message = "More columns provided than expected", + envelope = %envelope.message.payload.block_hash, + num_provided_columns = %num_provided_columns, + columns_expected = %columns_expected, + ); + } + + Ok(Self { + envelope, + columns: filtered_columns, + }) + } else if columns.is_empty() { + Ok(Self { envelope, columns }) + } else { + warn!( + message = "Custody columns provided for envelope that does not require them", + envelope = %envelope.message.payload.block_hash, + ); + Ok(Self { + envelope, + columns: vec![], + }) + } } pub fn envelope(&self) -> &Arc> { diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs index c5c97418c7..33271d6d0e 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -92,14 +92,14 @@ pub struct PendingPayloadCache { /// Contains all the data we keep in memory, protected by an RwLock availability_cache: RwLock>>, kzg: Arc, - custody_context: Arc>, + custody_context: Arc>, spec: Arc, } impl PendingPayloadCache { pub fn new( kzg: Arc, - custody_context: Arc>, + custody_context: Arc>, spec: Arc, ) -> Result { Ok(Self { @@ -110,7 +110,7 @@ impl PendingPayloadCache { }) } - pub fn custody_context(&self) -> &Arc> { + pub fn custody_context(&self) -> &Arc> { &self.custody_context } @@ -174,7 +174,6 @@ impl PendingPayloadCache { &self, executed_envelope: AvailabilityPendingExecutedEnvelope, ) -> Result, AvailabilityCheckError> { - let epoch = executed_envelope.envelope.epoch(); let beacon_block_root = executed_envelope.envelope.beacon_block_root(); let bid = self .get_bid(&beacon_block_root) @@ -185,19 +184,15 @@ impl PendingPayloadCache { pending_components.insert_executed_payload_envelope(executed_envelope); })?; - let num_expected_columns = self - .custody_context - .num_of_data_columns_to_sample(epoch, &self.spec); - pending_components.span.in_scope(|| { debug!( component = "executed envelope", - status = pending_components.status_str(num_expected_columns), + status = pending_components.status_str(&self.custody_context), "Component added to data availability checker" ); }); - self.check_availability(beacon_block_root, pending_components, num_expected_columns) + self.check_availability(beacon_block_root, pending_components) } /// Inserts a bid into the pending payload cache. @@ -228,9 +223,7 @@ impl PendingPayloadCache { .map_err(AvailabilityCheckError::InvalidColumn)?; let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_columns = self - .custody_context - .sampling_columns_for_epoch(epoch, &self.spec); + let sampling_columns = self.custody_context.sampling_columns_for_epoch(epoch); let verified_custody_columns = kzg_verified_columns .into_iter() .filter(|col| sampling_columns.contains(&col.index())) @@ -252,9 +245,7 @@ impl PendingPayloadCache { .get_bid(&block_root) .ok_or(AvailabilityCheckError::MissingBid(block_root))?; let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_columns = self - .custody_context - .sampling_columns_for_epoch(epoch, &self.spec); + let sampling_columns = self.custody_context.sampling_columns_for_epoch(epoch); let custody_columns = data_columns .into_iter() .filter(|col| sampling_columns.contains(&col.index())) @@ -280,21 +271,15 @@ impl PendingPayloadCache { pending_components.merge_data_columns(kzg_verified_data_columns) })?; - let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); - - let num_expected_columns = self - .custody_context - .num_of_data_columns_to_sample(epoch, &self.spec); - pending_components.span.in_scope(|| { debug!( component = "data_columns", - status = pending_components.status_str(num_expected_columns), + status = pending_components.status_str(&self.custody_context), "Component added to data availability checker" ); }); - self.check_availability(block_root, pending_components, num_expected_columns) + self.check_availability(block_root, pending_components) } #[instrument(skip_all, level = "debug")] @@ -340,7 +325,7 @@ impl PendingPayloadCache { let slot = bid.message.slot; let columns_to_sample = self .custody_context() - .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec); + .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())); let data_columns_to_import_and_publish = all_data_columns .into_iter() @@ -388,9 +373,10 @@ impl PendingPayloadCache { &self, block_root: Hash256, pending_components: MappedRwLockReadGuard<'_, PendingComponents>, - num_expected_columns: usize, ) -> Result, AvailabilityCheckError> { - if let Some(available_envelope) = pending_components.make_available(num_expected_columns)? { + if let Some(available_envelope) = + pending_components.make_available(&self.custody_context)? + { // Explicitly drop read lock before acquiring write lock drop(pending_components); if let Some(components) = self.availability_cache.write().get_mut(&block_root) { @@ -458,9 +444,7 @@ impl PendingPayloadCache { let epoch = pending_components.bid.epoch(); let total_column_count = T::EthSpec::number_of_columns(); - let sampling_column_count = self - .custody_context - .num_of_data_columns_to_sample(epoch, &self.spec); + let sampling_column_count = self.custody_context.num_of_data_columns_to_sample(epoch); if pending_components.reconstruction_started { return ReconstructColumnsDecision::No("already started"); @@ -516,10 +500,12 @@ mod data_availability_checker_tests { }; use fork_choice::PayloadVerificationStatus; use logging::create_test_tracing_subscriber; + use slot_clock::{SlotClock, TestingSlotClock}; + use std::time::Duration; use types::test_utils::test_unstructured; use types::{ ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, - MinimalEthSpec, SignedExecutionPayloadEnvelope, + MinimalEthSpec, SignedExecutionPayloadEnvelope, Slot, }; type E = MinimalEthSpec; @@ -541,10 +527,18 @@ mod data_availability_checker_tests { create_test_tracing_subscriber(); let spec = Arc::new(ForkName::Gloas.make_genesis_spec(E::default_spec())); let kzg = get_kzg(&spec); - let custody_context = Arc::new(CustodyContext::::new( + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + spec.get_slot_duration(), + ); + let complete_blob_backfill = false; + let custody_context = Arc::new(CustodyContext::::new( node_custody, generate_data_column_indices_rand_order::(), - &spec, + slot_clock, + complete_blob_backfill, + spec.clone(), )); let cache = Arc::new( PendingPayloadCache::::new(kzg, custody_context, spec.clone()) @@ -567,9 +561,7 @@ mod data_availability_checker_tests { cache.insert_bid(block_root, bid.clone()); let epoch = bid.message.slot.epoch(E::slots_per_epoch()); - let sampling = cache - .custody_context() - .sampling_columns_for_epoch(epoch, &cache.spec); + let sampling = cache.custody_context().sampling_columns_for_epoch(epoch); let custody = columns .into_iter() .filter(|c| sampling.contains(c.index())) diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs index e7b9009577..dac0180be8 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs @@ -1,3 +1,5 @@ +use crate::beacon_chain::BeaconChainTypes; +use crate::custody_context::CustodyContext; use crate::data_availability_checker::AvailabilityCheckError; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope; @@ -27,8 +29,15 @@ pub struct PendingComponents { } impl PendingComponents { - pub fn num_blobs_expected(&self) -> usize { - self.bid.message.blob_kzg_commitments.len() + pub fn num_columns_required(&self, custody_context: &CustodyContext) -> usize + where + T: BeaconChainTypes, + { + if custody_context.data_columns_required_for_bid(&self.bid) { + custody_context.num_of_data_columns_to_sample(self.bid.epoch()) + } else { + 0 + } } /// Returns columns that have all cells present. @@ -59,7 +68,7 @@ impl PendingComponents { &mut self, kzg_verified_data_columns: &[KzgVerifiedCustodyDataColumn], ) { - let num_blobs_expected = self.num_blobs_expected(); + let num_blobs_expected = self.bid.num_blobs_expected(); for data_column in kzg_verified_data_columns { let data_column = data_column.as_data_column(); // The Vec-backed `PendingColumn` keys cells by index, so we have to allocate up to @@ -95,10 +104,13 @@ impl PendingComponents { } /// Returns `Some` if the envelope and all required data columns have been received. - pub fn make_available( + pub fn make_available( &self, - num_expected_columns: usize, - ) -> Result>, AvailabilityCheckError> { + custody_context: &CustodyContext, + ) -> Result>, AvailabilityCheckError> + where + T: BeaconChainTypes, + { // Check if the payload has been received and executed let Some(envelope) = &self.envelope else { return Ok(None); @@ -110,17 +122,24 @@ impl PendingComponents { payload_verification_outcome, } = envelope; - let columns = if self.num_blobs_expected() == 0 { - self.span.in_scope(|| { - debug!("Bid has no blobs, data is available"); - }); + let num_columns_required = self.num_columns_required(custody_context); + let columns = if num_columns_required == 0 { + if self.bid.num_blobs_expected() == 0 { + self.span.in_scope(|| { + debug!("Bid has no blobs, data is available"); + }); + } else { + self.span.in_scope(|| { + debug!("No data columns required for this epoch"); + }); + } vec![] } else { let columns = self.get_cached_data_columns(); - match columns.len().cmp(&num_expected_columns) { + match columns.len().cmp(&num_columns_required) { Ordering::Greater => { return Err(AvailabilityCheckError::Unexpected(format!( - "too many columns: got {} expected {num_expected_columns}", + "too many columns: got {} expected {num_columns_required}", columns.len() ))); } @@ -137,7 +156,8 @@ impl PendingComponents { } }; - let available_envelope = AvailableEnvelope::new(envelope.clone(), columns); + let available_envelope = + AvailableEnvelope::new(envelope.clone(), columns, &self.bid, custody_context)?; Ok(Some(AvailableExecutedEnvelope { envelope: available_envelope, @@ -160,12 +180,16 @@ impl PendingComponents { } } - pub fn status_str(&self, num_expected_columns: usize) -> String { + pub fn status_str(&self, custody_context: &CustodyContext) -> String + where + T: BeaconChainTypes, + { + let num_columns_required = self.num_columns_required(custody_context); format!( "envelope {}, data_columns {}/{}", self.envelope.is_some(), self.num_completed_columns(), - num_expected_columns + num_columns_required ) } } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index deaae6cba5..ce51a82cec 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -226,29 +226,30 @@ pub fn test_da_checker( spec: Arc, node_custody_type: NodeCustodyType, ) -> DataAvailabilityChecker> { + let kzg = get_kzg(&spec); + let custody_context = test_custody_context(node_custody_type, spec.clone()); + DataAvailabilityChecker::new(kzg, custody_context, spec, true, false) + .expect("should initialise data availability checker") +} + +pub fn test_custody_context( + node_custody_type: NodeCustodyType, + spec: Arc, +) -> Arc>> { + let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); + let complete_blob_backfill = false; let slot_clock = TestingSlotClock::new( Slot::new(0), Duration::from_secs(0), spec.get_slot_duration(), ); - let kzg = get_kzg(&spec); - let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = Arc::new(CustodyContext::new( + Arc::new(CustodyContext::new( node_custody_type, ordered_custody_column_indices, - &spec, - )); - let complete_blob_backfill = false; - DataAvailabilityChecker::new( - complete_blob_backfill, slot_clock, - kzg, - custody_context, + complete_blob_backfill, spec, - true, - false, - ) - .expect("should initialise data availability checker") + )) } pub struct Builder { @@ -3163,26 +3164,28 @@ where block: Arc>, ) -> RangeSyncBlock { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - let is_gloas = block.fork_name_unchecked().gloas_enabled(); // For Gloas, kzg commitments live in the bid (`signed_execution_payload_bid`), so the // body's `blob_kzg_commitments()` accessor returns Err. `num_expected_blobs` already // handles both shapes. let has_blobs = block.num_expected_blobs() > 0; if !has_blobs { - return if is_gloas { + return if let Ok(bid) = block.message().body().signed_execution_payload_bid() { let envelope = self .chain .get_payload_envelope(&block_root) .unwrap() .map(Arc::new) - .map(|envelope| AvailableEnvelope::new(envelope, vec![])); + .map(|envelope| { + AvailableEnvelope::new(envelope, vec![], bid, &self.chain.custody_context) + }) + .transpose() + .unwrap(); RangeSyncBlock::new_gloas(block, envelope).unwrap() } else { RangeSyncBlock::new( block, AvailableBlockData::NoData, - &self.chain.data_availability_checker, - self.chain.spec.clone(), + &self.chain.custody_context, ) .unwrap() }; @@ -3197,23 +3200,26 @@ where .unwrap() .unwrap(); let custody_columns = columns.into_iter().collect::>(); - if is_gloas { + if let Ok(bid) = block.message().body().signed_execution_payload_bid() { let envelope = self .chain .get_payload_envelope(&block_root) .unwrap() .map(Arc::new) - .map(|envelope| AvailableEnvelope::new(envelope, custody_columns)); + .map(|envelope| { + AvailableEnvelope::new( + envelope, + custody_columns, + bid, + &self.chain.custody_context, + ) + }) + .transpose() + .unwrap(); RangeSyncBlock::new_gloas(block, envelope).unwrap() } else { let block_data = AvailableBlockData::new_with_data_columns(custody_columns); - RangeSyncBlock::new( - block, - block_data, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - ) - .unwrap() + RangeSyncBlock::new(block, block_data, &self.chain.custody_context).unwrap() } } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); @@ -3223,13 +3229,7 @@ where AvailableBlockData::NoData }; - RangeSyncBlock::new( - block, - block_data, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - ) - .unwrap() + RangeSyncBlock::new(block, block_data, &self.chain.custody_context).unwrap() } } @@ -3239,7 +3239,7 @@ where block: Arc>>, blob_items: Option<(KzgProofs, BlobsList)>, ) -> Result, BlockError> { - if block.fork_name_unchecked().gloas_enabled() { + if let Ok(bid) = block.message().body().signed_execution_payload_bid() { let columns = blob_items .map(|_| generate_data_column_sidecars_from_block(&block, &self.spec)) .unwrap_or_default(); @@ -3248,13 +3248,17 @@ where .get_payload_envelope(&block.canonical_root()) .map_err(|e| BlockError::BeaconChainError(Box::new(e)))? .map(Arc::new) - .map(|envelope| AvailableEnvelope::new(envelope, columns)); + .map(|envelope| { + AvailableEnvelope::new(envelope, columns, bid, &self.chain.custody_context) + }) + .transpose() + .unwrap(); return RangeSyncBlock::new_gloas(block, envelope).map_err(BlockError::InternalError); } Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let epoch = block.slot().epoch(E::slots_per_epoch()); - let sampling_columns = self.chain.sampling_columns_for_epoch(epoch); + let sampling_columns = self.chain.custody_context.sampling_columns_for_epoch(epoch); if blob_items.is_some_and(|(kzg_proofs, _)| !kzg_proofs.is_empty()) { // Note: this method ignores the actual custody columns and just take the first @@ -3265,18 +3269,12 @@ where .filter(|d| sampling_columns.contains(d.index())) .collect::>(); let block_data = AvailableBlockData::new_with_data_columns(columns); - RangeSyncBlock::new( - block, - block_data, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - )? + RangeSyncBlock::new(block, block_data, &self.chain.custody_context)? } else { RangeSyncBlock::new( block, AvailableBlockData::NoData, - &self.chain.data_availability_checker, - self.chain.spec.clone(), + &self.chain.custody_context, )? } } else { @@ -3292,12 +3290,7 @@ where AvailableBlockData::NoData }; - RangeSyncBlock::new( - block, - block_data, - &self.chain.data_availability_checker, - self.chain.spec.clone(), - )? + RangeSyncBlock::new(block, block_data, &self.chain.custody_context)? }) } @@ -4007,6 +4000,7 @@ where let custody_columns = custody_columns_opt.unwrap_or_else(|| { let epoch = block.slot().epoch(E::slots_per_epoch()); self.chain + .custody_context .sampling_columns_for_epoch(epoch) .iter() .copied() diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 94d4b3b9da..3269400c42 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,6 +1,7 @@ #![cfg(not(debug_assertions))] // TODO(gloas) we probably need similar test for payload envelope verification use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, LookupBlock, RangeSyncBlock}; +use beacon_chain::custody_context::CustodyContext; use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::payload_envelope_verification::AvailableEnvelope; @@ -10,7 +11,7 @@ use beacon_chain::{ custody_context::NodeCustodyType, test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, - MakeAttestationOptions, test_spec, + MakeAttestationOptions, generate_data_column_sidecars_from_block, test_spec, }, }; use beacon_chain::{ @@ -178,7 +179,7 @@ fn build_range_sync_block( where T: BeaconChainTypes, { - if block.fork_name_unchecked().gloas_enabled() { + if let Ok(bid) = block.message().body().signed_execution_payload_bid() { let columns = match data_sidecars { Some(DataSidecars::DataColumns(columns)) => columns .iter() @@ -186,20 +187,17 @@ where .collect::>(), Some(DataSidecars::Blobs(_)) | None => vec![], }; - let envelope = execution_envelope.map(|envelope| AvailableEnvelope::new(envelope, columns)); + let envelope = execution_envelope + .map(|envelope| AvailableEnvelope::new(envelope, columns, bid, &chain.custody_context)) + .transpose() + .unwrap(); return RangeSyncBlock::new_gloas(block, envelope).unwrap(); } match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { let block_data = AvailableBlockData::new_with_blobs(blobs.clone()); - RangeSyncBlock::new( - block, - block_data, - &chain.data_availability_checker, - chain.spec.clone(), - ) - .unwrap() + RangeSyncBlock::new(block, block_data, &chain.custody_context).unwrap() } Some(DataSidecars::DataColumns(columns)) => { let block_data = AvailableBlockData::new_with_data_columns( @@ -208,21 +206,11 @@ where .map(|c| c.as_data_column().clone()) .collect::>(), ); - RangeSyncBlock::new( - block, - block_data, - &chain.data_availability_checker, - chain.spec.clone(), - ) - .unwrap() + RangeSyncBlock::new(block, block_data, &chain.custody_context).unwrap() + } + None => { + RangeSyncBlock::new(block, AvailableBlockData::NoData, &chain.custody_context).unwrap() } - None => RangeSyncBlock::new( - block, - AvailableBlockData::NoData, - &chain.data_availability_checker, - chain.spec.clone(), - ) - .unwrap(), } } @@ -522,8 +510,7 @@ async fn chain_segment_non_linear_parent_roots() { RangeSyncBlock::new( mutated_block, blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.spec.clone(), + &harness.chain.custody_context, ) .unwrap() }; @@ -567,8 +554,7 @@ async fn chain_segment_non_linear_slots() { RangeSyncBlock::new( mutated_block, blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.spec.clone(), + &harness.chain.custody_context, ) .unwrap() }; @@ -602,8 +588,7 @@ async fn chain_segment_non_linear_slots() { RangeSyncBlock::new( mutated_block, blocks[3].block_data().clone(), - &harness.chain.data_availability_checker, - harness.chain.spec.clone(), + &harness.chain.custody_context, ) .unwrap() }; @@ -1809,8 +1794,7 @@ async fn add_base_block_to_altair_chain() { let base_range_sync_block = RangeSyncBlock::new( Arc::new(base_block.clone()), AvailableBlockData::NoData, - &harness.chain.data_availability_checker, - harness.spec.clone(), + &harness.chain.custody_context, ) .unwrap(); assert!(matches!( @@ -1840,8 +1824,7 @@ async fn add_base_block_to_altair_chain() { RangeSyncBlock::new( Arc::new(base_block), AvailableBlockData::NoData, - &harness.chain.data_availability_checker, - harness.spec.clone() + &harness.chain.custody_context, ) .unwrap() ], @@ -1985,8 +1968,7 @@ async fn add_altair_block_to_base_chain() { RangeSyncBlock::new( Arc::new(altair_block), AvailableBlockData::NoData, - &harness.chain.data_availability_checker, - harness.spec.clone() + &harness.chain.custody_context, ) .unwrap() ], @@ -2205,8 +2187,7 @@ async fn import_duplicate_block_unrealized_justification() { RangeSyncBlock::new( block.clone(), AvailableBlockData::NoData, - &harness.chain.data_availability_checker, - harness.spec.clone(), + &harness.chain.custody_context, ) .unwrap() }; @@ -2284,8 +2265,12 @@ async fn import_execution_pending_block( } } -async fn make_gloas_range_sync_block_inputs() --> Option<(Arc>, SignedExecutionPayloadEnvelope)> { +async fn make_gloas_range_sync_block_inputs() -> Option<( + Arc>, + SignedExecutionPayloadEnvelope, + Arc>>, + DataColumnSidecarList, +)> { let spec = test_spec::(); if !spec.fork_name_at_slot::(Slot::new(1)).gloas_enabled() { return None; @@ -2304,16 +2289,37 @@ async fn make_gloas_range_sync_block_inputs() let state = harness.get_current_state(); let slot = harness.get_current_slot(); let ((block, _), envelope, _) = harness.make_block_with_envelope(state, slot).await; - Some((block, envelope.expect("gloas block should have envelope"))) + let custody_context = harness.chain.custody_context.clone(); + let columns = generate_data_column_sidecars_from_block(&block, &harness.chain.spec) + .into_iter() + .filter(|column| { + custody_context + .sampling_columns_for_epoch(block.epoch()) + .contains(column.index()) + }) + .collect(); + Some(( + block, + envelope.expect("gloas block should have envelope"), + custody_context, + columns, + )) } #[tokio::test] async fn range_sync_block_new_gloas_accepts_matching_envelope() { - let Some((block, envelope)) = make_gloas_range_sync_block_inputs().await else { + let Some((block, envelope, custody_context, columns)) = + make_gloas_range_sync_block_inputs().await + else { return; }; - - let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let bid = block + .message() + .body() + .signed_execution_payload_bid() + .unwrap(); + let available_envelope = + AvailableEnvelope::new(Arc::new(envelope), columns, bid, &custody_context).unwrap(); let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); assert!( @@ -2325,7 +2331,7 @@ async fn range_sync_block_new_gloas_accepts_matching_envelope() { #[tokio::test] async fn range_sync_block_new_gloas_allows_missing_envelope() { - let Some((block, _)) = make_gloas_range_sync_block_inputs().await else { + let Some((block, _, _, _)) = make_gloas_range_sync_block_inputs().await else { return; }; @@ -2340,12 +2346,20 @@ async fn range_sync_block_new_gloas_allows_missing_envelope() { #[tokio::test] async fn range_sync_block_new_gloas_rejects_slot_mismatch() { - let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + let Some((block, mut envelope, custody_context, columns)) = + make_gloas_range_sync_block_inputs().await + else { return; }; envelope.message.payload.slot_number += 1; - let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let bid = block + .message() + .body() + .signed_execution_payload_bid() + .unwrap(); + let available_envelope = + AvailableEnvelope::new(Arc::new(envelope), columns, bid, &custody_context).unwrap(); let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); assert!( @@ -2357,12 +2371,20 @@ async fn range_sync_block_new_gloas_rejects_slot_mismatch() { #[tokio::test] async fn range_sync_block_new_gloas_rejects_builder_index_mismatch() { - let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + let Some((block, mut envelope, custody_context, columns)) = + make_gloas_range_sync_block_inputs().await + else { return; }; envelope.message.builder_index += 1; - let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let bid = block + .message() + .body() + .signed_execution_payload_bid() + .unwrap(); + let available_envelope = + AvailableEnvelope::new(Arc::new(envelope), columns, bid, &custody_context).unwrap(); let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); assert!( @@ -2374,12 +2396,20 @@ async fn range_sync_block_new_gloas_rejects_builder_index_mismatch() { #[tokio::test] async fn range_sync_block_new_gloas_rejects_block_hash_mismatch() { - let Some((block, mut envelope)) = make_gloas_range_sync_block_inputs().await else { + let Some((block, mut envelope, custody_context, columns)) = + make_gloas_range_sync_block_inputs().await + else { return; }; envelope.message.payload.block_hash = ExecutionBlockHash::repeat_byte(0x22); - let available_envelope = AvailableEnvelope::new(Arc::new(envelope), vec![]); + let bid = block + .message() + .body() + .signed_execution_payload_bid() + .unwrap(); + let available_envelope = + AvailableEnvelope::new(Arc::new(envelope), columns, bid, &custody_context).unwrap(); let result = RangeSyncBlock::new_gloas(block, Some(available_envelope)); assert!( @@ -2442,12 +2472,8 @@ async fn range_sync_block_construction_fails_with_wrong_blob_count() { let block_data = AvailableBlockData::new_with_blobs(wrong_blobs); // Try to create RpcBlock with wrong blob count - let result = RangeSyncBlock::new( - Arc::new(block), - block_data, - &harness.chain.data_availability_checker, - harness.chain.spec.clone(), - ); + let result = + RangeSyncBlock::new(Arc::new(block), block_data, &harness.chain.custody_context); // Should fail with MissingBlobs assert!( @@ -2523,8 +2549,7 @@ async fn range_sync_block_rejects_missing_custody_columns() { let result = RangeSyncBlock::new( Arc::new(block), block_data, - &harness.chain.data_availability_checker, - harness.chain.spec.clone(), + &harness.chain.custody_context, ); // Should fail with MissingCustodyColumns @@ -2599,6 +2624,7 @@ async fn rpc_block_allows_construction_past_da_boundary() { // Now verify the block is past the DA boundary let da_boundary = harness .chain + .custody_context .data_availability_boundary() .expect("DA boundary should be set"); assert!( @@ -2613,8 +2639,7 @@ async fn rpc_block_allows_construction_past_da_boundary() { let result = RangeSyncBlock::new( Arc::new(block), AvailableBlockData::NoData, - &harness.chain.data_availability_checker, - harness.chain.spec.clone(), + &harness.chain.custody_context, ); assert!( diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index 9f0b3675f3..537e8f40ee 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -43,7 +43,10 @@ async fn data_column_sidecar_event_on_process_gossip_data_column() { let mut random_sidecar = DataColumnSidecarGloas::arbitrary(&mut u).unwrap(); let epoch = slot.epoch(E::slots_per_epoch()); random_sidecar.slot = slot; - random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch)[0]; + random_sidecar.index = harness + .chain + .custody_context + .sampling_columns_for_epoch(epoch)[0]; // For gloas, the bid must be known, e.g. in the pending payload cache let mut bid = SignedExecutionPayloadBid::::empty(); @@ -58,7 +61,10 @@ async fn data_column_sidecar_event_on_process_gossip_data_column() { let mut random_sidecar = DataColumnSidecarFulu::arbitrary(&mut u).unwrap(); let epoch = slot.epoch(E::slots_per_epoch()); random_sidecar.signed_block_header.message.slot = slot; - random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch)[0]; + random_sidecar.index = harness + .chain + .custody_context + .sampling_columns_for_epoch(epoch)[0]; DataColumnSidecar::Fulu(random_sidecar) } }; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 4d392ef524..f15beb8197 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3322,13 +3322,8 @@ async fn weak_subjectivity_sync_test( let (_, block, data) = clone_block(&available_blocks[0]).deconstruct(); let mut corrupt_block = (*block).clone(); *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::new( - Arc::new(corrupt_block), - data, - &beacon_chain.data_availability_checker, - Arc::new(spec), - ) - .expect("available block") + AvailableBlock::new(Arc::new(corrupt_block), data, &beacon_chain.custody_context) + .expect("available block") }; // Importing the invalid batch should error. @@ -4878,7 +4873,10 @@ async fn test_column_da_boundary() { // The column da boundary should be the fulu fork epoch assert_eq!( - harness.chain.column_data_availability_boundary(), + harness + .chain + .custody_context + .column_data_availability_boundary(), Some(fulu_fork_epoch) ); } @@ -5293,6 +5291,7 @@ async fn test_custody_column_filtering_regular_node() { // Get custody columns for this epoch - regular nodes only store a subset let expected_custody_columns: HashSet<_> = harness .chain + .custody_context .custody_columns_for_epoch(Some(current_slot.epoch(E::slots_per_epoch()))) .iter() .copied() @@ -5374,8 +5373,6 @@ async fn test_missing_columns_after_cgc_change() { return; } - let custody_context = harness.chain.data_availability_checker.custody_context(); - harness.advance_slot(); harness .extend_chain( @@ -5397,7 +5394,10 @@ async fn test_missing_columns_after_cgc_change() { let epoch_after_increase = Epoch::new(num_epochs_before_increase + 2); let cgc_change_slot = epoch_before_increase.end_slot(E::slots_per_epoch()); - custody_context.register_validators(vec![(1, 32_000_000_000 * 9)], cgc_change_slot, &spec); + harness + .chain + .custody_context + .register_validators(vec![(1, 32_000_000_000 * 9)], cgc_change_slot); harness.advance_slot(); harness @@ -5444,8 +5444,6 @@ async fn test_safely_backfill_data_column_custody_info() { return; } - let custody_context = harness.chain.data_availability_checker.custody_context(); - harness.advance_slot(); harness .extend_chain( @@ -5461,7 +5459,10 @@ async fn test_safely_backfill_data_column_custody_info() { let cgc_change_slot = epoch_before_increase.end_slot(E::slots_per_epoch()); - custody_context.register_validators(vec![(1, 32_000_000_000 * 16)], cgc_change_slot, &spec); + harness + .chain + .custody_context + .register_validators(vec![(1, 32_000_000_000 * 16)], cgc_change_slot); let epoch_after_increase = (cgc_change_slot + effective_delay_slots).epoch(E::slots_per_epoch()); diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index bdb4228765..7b7c7beb9b 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -171,7 +171,9 @@ pub fn spawn_notifier( Ok(data_column_custody_info) => { if let Some(earliest_data_column_slot) = data_column_custody_info .and_then(|info| info.earliest_data_column_slot) - && let Some(da_boundary) = beacon_chain.get_column_da_boundary() + && let Some(da_boundary) = beacon_chain + .custody_context + .column_data_availability_boundary() { sync_distance = earliest_data_column_slot.saturating_sub( da_boundary.start_slot(T::EthSpec::slots_per_epoch()), @@ -295,7 +297,9 @@ pub fn spawn_notifier( let speed = speedo.slots_per_second(); let display_speed = speed.is_some_and(|speed| speed != 0.0); let est_time_in_secs = if let (Some(da_boundary_epoch), Some(original_slot)) = ( - beacon_chain.get_column_da_boundary(), + beacon_chain + .custody_context + .column_data_availability_boundary(), original_earliest_data_column_slot, ) { let target = original_slot.saturating_sub( diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs b/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs index d058f66001..456d371c71 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelopes.rs @@ -200,7 +200,7 @@ pub async fn publish_execution_payload_envelope( } let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_column_indices = chain.sampling_columns_for_epoch(epoch); + let sampling_column_indices = chain.custody_context.sampling_columns_for_epoch(epoch); let sampling_columns = gossip_verified_columns .into_iter() .filter(|col| sampling_column_indices.contains(&col.index())) diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 50d5c8d165..fe34863b77 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -602,7 +602,9 @@ mod tests { "precondition: test block must not be imported into fork choice yet" ); - let sampling_columns = chain.sampling_columns_for_epoch(block.epoch()); + let sampling_columns = chain + .custody_context + .sampling_columns_for_epoch(block.epoch()); let data_columns = generate_data_column_sidecars_from_block(&block, &chain.spec) .into_iter() .filter(|column| sampling_columns.contains(column.index())) @@ -615,8 +617,7 @@ mod tests { let available_block = AvailableBlock::new( block.clone(), AvailableBlockData::new_with_data_columns(data_columns), - &chain.data_availability_checker, - chain.spec.clone(), + &chain.custody_context, ) .unwrap(); diff --git a/beacon_node/http_api/src/custody.rs b/beacon_node/http_api/src/custody.rs index a43b55ceca..21004beb72 100644 --- a/beacon_node/http_api/src/custody.rs +++ b/beacon_node/http_api/src/custody.rs @@ -17,6 +17,7 @@ pub fn info( .map_err(|e| custom_server_error(format!("error reading DataColumnCustodyInfo: {e:?}")))?; let column_data_availability_boundary = chain + .custody_context .column_data_availability_boundary() .ok_or_else(|| custom_server_error("unreachable: Fulu should be enabled".to_string()))?; @@ -38,12 +39,13 @@ pub fn info( // Compute the custody columns and the CGC *at the earliest custodied slot*. The node might // have some columns prior to this, but this value is the most up-to-date view of the data the // node is custodying. - let custody_context = chain.data_availability_checker.custody_context(); - let custody_columns = custody_context - .custody_columns_for_epoch(Some(earliest_custodied_data_column_epoch), &chain.spec) + let custody_columns = chain + .custody_context + .custody_columns_for_epoch(Some(earliest_custodied_data_column_epoch)) .to_vec(); - let custody_group_count = custody_context - .custody_group_count_at_epoch(earliest_custodied_data_column_epoch, &chain.spec); + let custody_group_count = chain + .custody_context + .custody_group_count_at_epoch(earliest_custodied_data_column_epoch); Ok(CustodyInfo { earliest_custodied_data_column_slot, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 7c0959acb9..8e31e88ff8 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3179,10 +3179,11 @@ pub fn serve( .head_slot() .epoch(T::EthSpec::slots_per_epoch()) + 1; - let custody_context = chain.data_availability_checker.custody_context(); // Reset validator custody requirements to `effective_epoch` with the latest // cgc requiremnets. - custody_context.reset_validator_custody_requirements(effective_epoch); + chain + .custody_context + .reset_validator_custody_requirements(effective_epoch); // Update `DataColumnCustodyInfo` to reflect the custody change. chain.update_data_column_custody_info(Some( effective_epoch.start_slot(T::EthSpec::slots_per_epoch()), diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e3e9839b2d..41bb515a2b 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -217,7 +217,7 @@ pub async fn publish_block>( warp_utils::reject::custom_server_error("unable to publish data column sidecars".into()) })?; let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - let sampling_columns_indices = chain.sampling_columns_for_epoch(epoch); + let sampling_columns_indices = chain.custody_context.sampling_columns_for_epoch(epoch); let sampling_columns = gossip_verified_columns .into_iter() .filter(|data_column| sampling_columns_indices.contains(&data_column.index())) diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index ee699b3adc..7f3d1cd721 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -855,9 +855,8 @@ pub fn post_validator_prepare_beacon_proposer( let current_slot = chain.slot().map_err(warp_utils::reject::unhandled_error)?; if let Some(cgc_change) = chain - .data_availability_checker - .custody_context() - .register_validators(validators_and_balances, current_slot, &chain.spec) + .custody_context + .register_validators(validators_and_balances, current_slot) { chain.update_data_column_custody_info(Some( cgc_change diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index 98629a1c5e..6d80344e94 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -2036,6 +2036,7 @@ fn get_custody_columns(tester: &InteractiveTester, slot: Slot) -> HashSet NetworkBeaconProcessor { async fn check_reconstruction_trigger(self: &Arc, slot: Slot, block_root: &Hash256) { if self .chain - .data_availability_checker - .custody_context() - .should_attempt_reconstruction( - slot.epoch(T::EthSpec::slots_per_epoch()), - &self.chain.spec, - ) + .custody_context + .should_attempt_reconstruction(slot.epoch(T::EthSpec::slots_per_epoch())) { // Instead of triggering reconstruction immediately, schedule it to be run. If // another column arrives, it either completes availability or pushes diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ea5fa3e90b..8066d8c689 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -910,7 +910,7 @@ impl NetworkBeaconProcessor { return; } let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch()); - let custody_columns = self.chain.sampling_columns_for_epoch(epoch); + let custody_columns = self.chain.custody_context.sampling_columns_for_epoch(epoch); let self_cloned = self.clone(); let publish_fn = move |columns: Vec>| { if publish_blobs { @@ -985,7 +985,7 @@ impl NetworkBeaconProcessor { return; }; let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch()); - let custody_columns = self.chain.sampling_columns_for_epoch(epoch); + let custody_columns = self.chain.custody_context.sampling_columns_for_epoch(epoch); let columns = assembler.get_columns_and_mark_as_local_fetched(block_root, &header); let mut present_indices: HashSet = HashSet::with_capacity(columns.len()); diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 37a6f3779a..fc451ebc05 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -791,7 +791,7 @@ impl NetworkBeaconProcessor { ) -> Result<(), (RpcErrorResponse, &'static str)> { let mut send_data_column_count = 0; // Only attempt lookups for columns the node has advertised and is responsible for maintaining custody of. - let available_columns = self.chain.custody_columns_for_epoch(None); + let available_columns = self.chain.custody_context.custody_columns_for_epoch(None); for data_column_ids_by_root in request.data_column_ids.as_slice() { let indices_to_retrieve = data_column_ids_by_root @@ -1595,13 +1595,14 @@ impl NetworkBeaconProcessor { req.count }; - let data_availability_boundary_slot = match self.chain.data_availability_boundary() { - Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), - None => { - debug!("Deneb fork is disabled"); - return Err((RpcErrorResponse::InvalidRequest, "Deneb fork is disabled")); - } - }; + let data_availability_boundary_slot = + match self.chain.custody_context.data_availability_boundary() { + Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), + None => { + debug!("Deneb fork is disabled"); + return Err((RpcErrorResponse::InvalidRequest, "Deneb fork is disabled")); + } + }; let oldest_blob_slot = self .chain @@ -1745,14 +1746,17 @@ impl NetworkBeaconProcessor { let request_start_slot = Slot::from(req.start_slot); - let column_data_availability_boundary_slot = - match self.chain.column_data_availability_boundary() { - Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), - None => { - debug!("Fulu fork is disabled"); - return Err((RpcErrorResponse::InvalidRequest, "Fulu fork is disabled")); - } - }; + let column_data_availability_boundary_slot = match self + .chain + .custody_context + .column_data_availability_boundary() + { + Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()), + None => { + debug!("Fulu fork is disabled"); + return Err((RpcErrorResponse::InvalidRequest, "Fulu fork is disabled")); + } + }; let earliest_custodied_data_column_slot = match self.chain.earliest_custodied_data_column_epoch() { @@ -1798,6 +1802,7 @@ impl NetworkBeaconProcessor { let request_start_epoch = request_start_slot.epoch(T::EthSpec::slots_per_epoch()); let available_columns = self .chain + .custody_context .custody_columns_for_epoch(Some(request_start_epoch)); let indices_to_retrieve = req @@ -1916,9 +1921,8 @@ impl NetworkBeaconProcessor { let non_custody_indices = { let custody_columns = self .chain - .data_availability_checker - .custody_context() - .custody_columns_for_epoch(epoch_opt, &self.chain.spec); + .custody_context + .custody_columns_for_epoch(epoch_opt); requested_indices .iter() .filter(|subnet_id| !custody_columns.contains(subnet_id)) diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 89434c878e..95f7d7649f 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -340,7 +340,7 @@ impl TestRig { if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let kzg = get_kzg(&chain.spec); let epoch = block.slot().epoch(E::slots_per_epoch()); - let sampling_indices = chain.sampling_columns_for_epoch(epoch); + let sampling_indices = chain.custody_context.sampling_columns_for_epoch(epoch); let custody_columns: DataColumnSidecarList = blobs_to_data_column_sidecars( &blobs.iter().collect_vec(), kzg_proofs.clone().into_iter().collect_vec(), @@ -1836,6 +1836,7 @@ async fn test_data_columns_by_range_request_only_returns_requested_columns() { let all_custody_columns = rig .chain + .custody_context .sampling_columns_for_epoch(rig.chain.epoch().unwrap()); let available_columns: Vec = all_custody_columns.to_vec(); @@ -1897,7 +1898,10 @@ async fn test_data_columns_by_range_no_duplicates_with_skip_slots() { let skip_slots: HashSet = [5, 6].into_iter().collect(); let mut rig = TestRig::new_with_skip_slots(128, &skip_slots).await; - let all_custody_columns = rig.chain.custody_columns_for_epoch(Some(Epoch::new(0))); + let all_custody_columns = rig + .chain + .custody_context + .custody_columns_for_epoch(Some(Epoch::new(0))); let requested_column = vec![all_custody_columns[0]]; // Request a range that spans the skip slots (slots 0 through 9). diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ba4aada352..cef3d5859e 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -294,10 +294,7 @@ impl NetworkService { let (mut libp2p, network_globals) = Network::new( executor.clone(), service_context, - beacon_chain - .data_availability_checker - .custody_context() - .custody_group_count_at_head(&beacon_chain.spec), + beacon_chain.custody_context.custody_group_count_at_head(), local_keypair, ) .await?; diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index c8bb17243e..edf358976a 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -352,11 +352,14 @@ impl BackFillSync { debug!(?batch_id, msg, "Blob peer failure"); } CouplingError::EnvelopePeerFailure(msg) => { - debug!(?batch_id, msg, "Envelope peer failure"); + debug!(?batch_id, ?msg, "Envelope peer failure"); } CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } + CouplingError::AvailabilityCheckError(err) => { + error!(?batch_id, ?err, "Availability check error"); + } } } // A batch could be retried without the peer failing the request (disconnecting/ diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index dbf3604cf0..09ffa50f9b 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -394,12 +394,11 @@ impl SingleBlockLookup { match &mut self.data_request { DataRequest::WaitingForBlock => { if let Some(block) = self.block_request.state.peek_downloaded_data() { - let block_epoch = block - .slot() - .epoch(::EthSpec::slots_per_epoch()); - self.data_request = if block.num_expected_blobs() == 0 { - DataRequest::NoData - } else if cx.chain.should_fetch_custody_columns(block_epoch) { + self.data_request = if cx + .chain + .custody_context + .data_columns_required_for_block(block) + { DataRequest::Request { slot: block.slot(), peers: self.get_data_peers(block.payload_bid_block_hash().ok()), diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 93c0699ded..001fabb704 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,8 +1,9 @@ +use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::payload_envelope_verification::AvailableEnvelope; use beacon_chain::{ BeaconChainTypes, block_verification_types::{AvailableBlockData, RangeSyncBlock}, - data_availability_checker::DataAvailabilityChecker, + custody_context::CustodyContext, data_column_verification::CustodyDataColumn, get_block_root, }; @@ -81,6 +82,13 @@ pub enum CouplingError { }, BlobPeerFailure(String), EnvelopePeerFailure(String), + AvailabilityCheckError(AvailabilityCheckError), +} + +impl From for CouplingError { + fn from(e: AvailabilityCheckError) -> Self { + CouplingError::AvailabilityCheckError(e) + } } impl RangeBlockComponentsRequest { @@ -221,7 +229,7 @@ impl RangeBlockComponentsRequest { /// Returns `Some(Err(_))` if there are issues coupling blocks with their data. pub fn responses( &mut self, - da_checker: Arc>, + custody_context: &CustodyContext, spec: Arc, ) -> Option>, CouplingError>> where @@ -241,7 +249,7 @@ impl RangeBlockComponentsRequest { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( blocks.to_vec(), vec![], - da_checker, + custody_context, spec, )), RangeBlockDataRequest::Blobs(request) => { @@ -251,7 +259,7 @@ impl RangeBlockComponentsRequest { Some(Self::responses_with_blobs( blocks.to_vec(), blobs.to_vec(), - da_checker, + custody_context, spec, )) } @@ -294,8 +302,7 @@ impl RangeBlockComponentsRequest { column_to_peer_id, expected_custody_columns, *attempt, - da_checker, - spec, + custody_context, payload_envelopes, ); @@ -321,7 +328,7 @@ impl RangeBlockComponentsRequest { fn responses_with_blobs( blocks: Vec>>, blobs: Vec>>, - da_checker: Arc>, + custody_context: &CustodyContext, spec: Arc, ) -> Result>, CouplingError> where @@ -370,7 +377,7 @@ impl RangeBlockComponentsRequest { })?; let block_data = AvailableBlockData::new_with_blobs(blobs); responses.push( - RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) + RangeSyncBlock::new(block, block_data, custody_context) .map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?, ) } @@ -394,8 +401,7 @@ impl RangeBlockComponentsRequest { column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], attempt: usize, - da_checker: Arc>, - spec: Arc, + custody_context: &CustodyContext, payload_envelopes: Option>>>, ) -> Result>, CouplingError> where @@ -499,17 +505,24 @@ impl RangeBlockComponentsRequest { envelopes_by_block_root.as_mut() { let envelope = envelopes_by_block_root.remove(&block_root); - let available_envelope = - envelope.map(|env| AvailableEnvelope::new(env, custody_columns)); + let bid = block + .message() + .body() + .signed_execution_payload_bid() + // this really should never fail + .map_err(|_| AvailabilityCheckError::MissingBid(block_root))?; + let available_envelope = envelope + .map(|env| AvailableEnvelope::new(env, custody_columns, bid, custody_context)) + .transpose()?; RangeSyncBlock::new_gloas(block, available_envelope) .map_err(CouplingError::EnvelopePeerFailure)? } else if custody_columns.is_empty() { - RangeSyncBlock::new(block, AvailableBlockData::NoData, &da_checker, spec.clone()) + RangeSyncBlock::new(block, AvailableBlockData::NoData, custody_context) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { let block_data = AvailableBlockData::new_with_data_columns(custody_columns); - RangeSyncBlock::new(block, block_data, &da_checker, spec.clone()) + RangeSyncBlock::new(block, block_data, custody_context) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? }; range_sync_blocks.push(range_sync_block); @@ -562,11 +575,10 @@ mod tests { use super::RangeBlockComponentsRequest; use beacon_chain::block_verification_types::RangeSyncBlock; - use beacon_chain::custody_context::NodeCustodyType; - use beacon_chain::data_availability_checker::DataAvailabilityChecker; + use beacon_chain::custody_context::{CustodyContext, NodeCustodyType}; use beacon_chain::test_utils::{ EphemeralHarnessType, NumBlobs, generate_rand_block_and_blobs, - generate_rand_block_and_data_columns, test_da_checker, test_spec, + generate_rand_block_and_data_columns, test_custody_context, test_spec, }; use bls::Signature; use lighthouse_network::{ @@ -737,8 +749,8 @@ mod tests { fn is_finished(info: &mut RangeBlockComponentsRequest) -> bool { let spec = Arc::new(test_spec::()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - info.responses(da_checker, spec).is_some() + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); + info.responses(&custody_context, spec).is_some() } fn gloas_spec() -> ChainSpec { @@ -826,7 +838,7 @@ mod tests { #[allow(clippy::type_complexity)] struct GloasSetup { info: RangeBlockComponentsRequest, - da_checker: Arc>>, + custody_context: Arc>>, spec: Arc, blocks: Vec<( Arc>, @@ -841,10 +853,9 @@ mod tests { /// ready for the per-test payload-envelope step. fn setup_gloas_coupling(count: usize) -> GloasSetup { let spec = Arc::new(gloas_spec()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_custody_columns = da_checker - .custody_context() - .sampling_columns_for_epoch(Epoch::new(0), &spec) + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); + let expected_custody_columns = custody_context + .sampling_columns_for_epoch(Epoch::new(0)) .to_vec(); let blocks = make_gloas_blocks_and_columns(count, &spec); @@ -886,7 +897,7 @@ mod tests { GloasSetup { info, - da_checker, + custody_context, spec, blocks, payloads_req_id, @@ -900,7 +911,7 @@ mod tests { // Gloas each block still couples to its (empty-column) payload envelope, so the envelope // request is driven too. let spec = Arc::new(custody_test_spec()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); let blocks = make_blocks_and_columns(4, NumBlobs::None, &spec); let components_id = components_id(); @@ -921,7 +932,7 @@ mod tests { .unwrap(); add_envelopes_if_gloas(&mut info, payloads_req_id, &blocks); - let responses = info.responses(da_checker, spec).unwrap().unwrap(); + let responses = info.responses(&custody_context, spec).unwrap().unwrap(); assert_custody_columns_coupled(&responses, blocks.len(), 0); } @@ -960,19 +971,18 @@ mod tests { // FORK_NAME. spec.fulu_fork_epoch = None; let spec = Arc::new(spec); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); // Blobs are no longer required for availability, so the response succeeds without them. - let result = info.responses(da_checker, spec).unwrap(); + let result = info.responses(&custody_context, spec).unwrap(); assert!(result.is_ok()) } #[test] fn rpc_block_with_custody_columns() { let spec = Arc::new(custody_test_spec()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expects_custody_columns = da_checker - .custody_context() - .sampling_columns_for_epoch(Epoch::new(0), &spec) + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); + let expects_custody_columns = custody_context + .sampling_columns_for_epoch(Epoch::new(0)) .to_vec(); let blocks = make_blocks_and_columns(4, NumBlobs::Number(1), &spec); @@ -1038,17 +1048,16 @@ mod tests { add_envelopes_if_gloas(&mut info, payloads_req_id, &blocks); // All completed construct response - let responses = info.responses(da_checker, spec).unwrap().unwrap(); + let responses = info.responses(&custody_context, spec).unwrap().unwrap(); assert_custody_columns_coupled(&responses, blocks.len(), expects_custody_columns.len()); } #[test] fn rpc_block_with_custody_columns_batched() { let spec = Arc::new(custody_test_spec()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker - .custody_context() - .sampling_columns_for_epoch(Epoch::new(0), &spec) + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); + let expected_sampling_columns = custody_context + .sampling_columns_for_epoch(Epoch::new(0)) .to_vec(); // Split sampling columns into two batches let mid = expected_sampling_columns.len() / 2; @@ -1126,7 +1135,7 @@ mod tests { add_envelopes_if_gloas(&mut info, payloads_req_id, &blocks); // All completed construct response - let responses = info.responses(da_checker, spec).unwrap().unwrap(); + let responses = info.responses(&custody_context, spec).unwrap().unwrap(); assert_custody_columns_coupled(&responses, blocks.len(), expected_sampling_columns.len()); } @@ -1134,20 +1143,20 @@ mod tests { fn gloas_payload_envelopes_must_complete_before_responses() { let GloasSetup { mut info, - da_checker, + custody_context, spec, .. } = setup_gloas_coupling(2); // No payload envelopes added yet, so the request must not be complete. - assert!(info.responses(da_checker, spec).is_none()); + assert!(info.responses(&custody_context, spec).is_none()); } #[test] fn gloas_payload_envelopes_are_coupled_by_block_root() { let GloasSetup { mut info, - da_checker, + custody_context, spec, blocks, payloads_req_id, @@ -1165,7 +1174,7 @@ mod tests { ) .unwrap(); - let responses = info.responses(da_checker, spec).unwrap().unwrap(); + let responses = info.responses(&custody_context, spec).unwrap().unwrap(); assert_eq!(responses.len(), blocks.len()); for response in responses { match response { @@ -1188,7 +1197,7 @@ mod tests { fn gloas_payload_envelopes_allow_missing_envelopes() { let GloasSetup { mut info, - da_checker, + custody_context, spec, blocks, payloads_req_id, @@ -1199,7 +1208,7 @@ mod tests { info.add_payload_envelopes(payloads_req_id, vec![blocks[0].2.clone()]) .unwrap(); - let responses = info.responses(da_checker, spec).unwrap().unwrap(); + let responses = info.responses(&custody_context, spec).unwrap().unwrap(); let count_with = |with_envelope: bool| { responses .iter() @@ -1216,7 +1225,7 @@ mod tests { fn gloas_payload_envelope_mismatch_fails_coupling() { let GloasSetup { mut info, - da_checker, + custody_context, spec, blocks, payloads_req_id, @@ -1228,7 +1237,7 @@ mod tests { info.add_payload_envelopes(payloads_req_id, vec![Arc::new(bad_envelope)]) .unwrap(); - let result = info.responses(da_checker, spec).unwrap(); + let result = info.responses(&custody_context, spec).unwrap(); assert!( matches!( result, @@ -1243,10 +1252,9 @@ mod tests { fn missing_custody_columns_from_faulty_peers() { // GIVEN: A request expecting sampling columns from multiple peers let spec = Arc::new(custody_test_spec()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker - .custody_context() - .sampling_columns_for_epoch(Epoch::new(0), &spec) + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); + let expected_sampling_columns = custody_context + .sampling_columns_for_epoch(Epoch::new(0)) .to_vec(); let blocks = make_blocks_and_columns(2, NumBlobs::Number(1), &spec); @@ -1309,7 +1317,7 @@ mod tests { } // WHEN: Attempting to construct RPC blocks - let result = info.responses(da_checker, spec).unwrap(); + let result = info.responses(&custody_context, spec).unwrap(); // THEN: Should fail with PeerFailure identifying the faulty peers assert!(result.is_err()); @@ -1337,10 +1345,9 @@ mod tests { fn retry_logic_after_peer_failures() { // GIVEN: A request expecting sampling columns where some peers initially fail let spec = Arc::new(custody_test_spec()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker - .custody_context() - .sampling_columns_for_epoch(Epoch::new(0), &spec) + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); + let expected_sampling_columns = custody_context + .sampling_columns_for_epoch(Epoch::new(0)) .to_vec(); let blocks = make_blocks_and_columns(2, NumBlobs::Number(1), &spec); @@ -1402,7 +1409,7 @@ mod tests { let result: Result< Vec>, crate::sync::block_sidecar_coupling::CouplingError, - > = info.responses(da_checker.clone(), spec.clone()).unwrap(); + > = info.responses(&custody_context, spec.clone()).unwrap(); assert!(result.is_err()); // AND: We retry with a new peer for the failed columns @@ -1433,7 +1440,7 @@ mod tests { .unwrap(); // WHEN: Attempting to get responses again - let result = info.responses(da_checker, spec).unwrap(); + let result = info.responses(&custody_context, spec).unwrap(); // THEN: Should succeed with complete RangeSync blocks assert!(result.is_ok()); @@ -1445,10 +1452,9 @@ mod tests { fn max_retries_exceeded_behavior() { // GIVEN: A request where peers consistently fail to provide required columns let spec = Arc::new(custody_test_spec()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker - .custody_context() - .sampling_columns_for_epoch(Epoch::new(0), &spec) + let custody_context = test_custody_context(NodeCustodyType::Fullnode, spec.clone()); + let expected_sampling_columns = custody_context + .sampling_columns_for_epoch(Epoch::new(0)) .to_vec(); let blocks = make_blocks_and_columns(1, NumBlobs::Number(1), &spec); @@ -1509,7 +1515,7 @@ mod tests { // WHEN: Multiple retry attempts are made (up to max retries) for _ in 0..MAX_COLUMN_RETRIES { - let result = info.responses(da_checker.clone(), spec.clone()).unwrap(); + let result = info.responses(&custody_context, spec.clone()).unwrap(); assert!(result.is_err()); if let Err(super::CouplingError::DataColumnPeerFailure { @@ -1522,7 +1528,7 @@ mod tests { } // AND: One final attempt after exceeding max retries - let result = info.responses(da_checker, spec).unwrap(); + let result = info.responses(&custody_context, spec).unwrap(); // THEN: Should fail with exceeded_retries = true assert!(result.is_err()); diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs index c85610613c..2874bbebf1 100644 --- a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs @@ -162,7 +162,11 @@ impl CustodyBackFillSync { /// - The earliest data column epoch's custodied columns != previous epoch's custodied columns /// - The earliest data column epoch is a finalied epoch pub fn should_start_custody_backfill_sync(&mut self) -> bool { - let Some(da_boundary_epoch) = self.beacon_chain.get_column_da_boundary() else { + let Some(da_boundary_epoch) = self + .beacon_chain + .custody_context + .column_data_availability_boundary() + else { return false; }; @@ -220,9 +224,8 @@ impl CustodyBackFillSync { fn restart_if_required(&mut self) -> bool { let cgc_at_head = self .beacon_chain - .data_availability_checker - .custody_context() - .custody_group_count_at_head(&self.beacon_chain.spec); + .custody_context + .custody_group_count_at_head(); if cgc_at_head != self.cgc { self.restart_sync(); @@ -290,7 +293,11 @@ impl CustodyBackFillSync { } } - let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + let Some(column_da_boundary) = self + .beacon_chain + .custody_context + .column_data_availability_boundary() + else { return Ok(SyncStart::NotSyncing); }; @@ -309,9 +316,8 @@ impl CustodyBackFillSync { fn set_cgc(&mut self) { self.cgc = self .beacon_chain - .data_availability_checker - .custody_context() - .custody_group_count_at_head(&self.beacon_chain.spec); + .custody_context + .custody_group_count_at_head(); } fn set_start_epoch(&mut self) { @@ -379,9 +385,10 @@ impl CustodyBackFillSync { /// Creates the next required batch from the chain. If there are no more batches required, /// `None` is returned. fn include_next_batch(&mut self) -> Option { - let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { - return None; - }; + let column_da_boundary = self + .beacon_chain + .custody_context + .column_data_availability_boundary()?; // Skip all batches (Epochs) that don't have missing columns. for epoch in Epoch::range_inclusive_rev(self.to_be_downloaded, column_da_boundary) { @@ -715,7 +722,11 @@ impl CustodyBackFillSync { self.advance_custody_backfill_sync(batch_id); - let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + let Some(column_da_boundary) = self + .beacon_chain + .custody_context + .column_data_availability_boundary() + else { return Err(CustodyBackfillError::InvalidSyncState( "Can't calculate column data availability boundary".to_string(), )); @@ -889,7 +900,11 @@ impl CustodyBackFillSync { /// /// The `validating_epoch` must align with batch boundaries. fn advance_custody_backfill_sync(&mut self, validating_epoch: Epoch) { - let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + let Some(column_da_boundary) = self + .beacon_chain + .custody_context + .column_data_availability_boundary() + else { return; }; // make sure this epoch produces an advancement, unless its at the column DA boundary @@ -986,7 +1001,11 @@ impl CustodyBackFillSync { return false; }; - let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + let Some(column_da_boundary) = self + .beacon_chain + .custody_context + .column_data_availability_boundary() + else { return false; }; @@ -997,7 +1016,11 @@ impl CustodyBackFillSync { /// Checks if custody backfill would complete by syncing to `start_epoch`. fn would_complete(&self, start_epoch: Epoch) -> bool { - let Some(column_da_boundary) = self.beacon_chain.get_column_da_boundary() else { + let Some(column_da_boundary) = self + .beacon_chain + .custody_context + .column_data_availability_boundary() + else { return false; }; start_epoch <= column_da_boundary diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 8b4e3c5694..100be9f4d7 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -589,6 +589,7 @@ impl SyncNetworkContext { let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let column_indexes = self .chain + .custody_context .sampling_columns_for_epoch(epoch) .iter() .cloned() @@ -694,7 +695,10 @@ impl SyncNetworkContext { data_column_requests.map(|data_column_requests| { ( data_column_requests, - self.chain.sampling_columns_for_epoch(epoch).to_vec(), + self.chain + .custody_context + .sampling_columns_for_epoch(epoch) + .to_vec(), ) }), payloads_req_id, @@ -815,10 +819,9 @@ impl SyncNetworkContext { } let range_req = entry.get_mut(); - if let Some(blocks_result) = range_req.responses( - self.chain.data_availability_checker.clone(), - self.chain.spec.clone(), - ) { + if let Some(blocks_result) = + range_req.responses(&self.chain.custody_context, self.chain.spec.clone()) + { if let Err(CouplingError::DataColumnPeerFailure { error, faulty_peers: _, @@ -1108,6 +1111,7 @@ impl SyncNetworkContext { // Include only the blob indexes not yet imported (received through gossip) let mut custody_indexes_to_fetch = self .chain + .custody_context .sampling_columns_for_epoch(block_slot.epoch(T::EthSpec::slots_per_epoch())) .iter() .copied() @@ -1424,15 +1428,11 @@ impl SyncNetworkContext { ByRangeRequestType::BlocksAndEnvelopesAndColumns } else if self .chain - .data_availability_checker + .custody_context .data_columns_required_for_epoch(epoch) { ByRangeRequestType::BlocksAndColumns - } else if self - .chain - .data_availability_checker - .blobs_required_for_epoch(epoch) - { + } else if self.chain.custody_context.blobs_required_for_epoch(epoch) { ByRangeRequestType::BlocksAndBlobs } else { ByRangeRequestType::Blocks @@ -1734,6 +1734,7 @@ impl SyncNetworkContext { let columns_by_range_peers_to_request = { let column_indexes = self .chain + .custody_context .sampling_columns_for_epoch(batch_id.epoch) .iter() .cloned() diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 6292388339..e82bfa8a1a 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -958,6 +958,9 @@ impl SyncingChain { CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } + CouplingError::AvailabilityCheckError(err) => { + error!(?batch_id, ?err, "Availability check error"); + } } } // A batch could be retried without the peer failing the request (disconnecting/ diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 835b7546b3..f9afb910d9 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1282,30 +1282,34 @@ impl TestRig { ) { let block_root = block.canonical_root(); let block_slot = block.slot(); - let range_sync_block = if block.fork_name_unchecked().gloas_enabled() { - // Gloas carries data columns in the payload envelope, not in `block_data`. - let envelope = self - .network_blocks_by_root - .get(&block_root) - .and_then(envelope_of) - .map(|envelope| AvailableEnvelope::new(envelope, columns.unwrap_or_default())); - RangeSyncBlock::new_gloas(block, envelope).unwrap() - } else { - let block_data = if let Some(columns) = columns { - AvailableBlockData::new_with_data_columns(columns) - } else if let Some(blobs) = blobs { - AvailableBlockData::new_with_blobs(blobs) + let range_sync_block = + if let Ok(bid) = block.message().body().signed_execution_payload_bid() { + // Gloas carries data columns in the payload envelope, not in `block_data`. + let envelope = self + .network_blocks_by_root + .get(&block_root) + .and_then(envelope_of) + .map(|envelope| { + AvailableEnvelope::new( + envelope, + columns.unwrap_or_default(), + bid, + &self.harness.chain.custody_context, + ) + }) + .transpose() + .unwrap(); + RangeSyncBlock::new_gloas(block, envelope).unwrap() } else { - AvailableBlockData::NoData + let block_data = if let Some(columns) = columns { + AvailableBlockData::new_with_data_columns(columns) + } else if let Some(blobs) = blobs { + AvailableBlockData::new_with_blobs(blobs) + } else { + AvailableBlockData::NoData + }; + RangeSyncBlock::new(block, block_data, &self.harness.chain.custody_context).unwrap() }; - RangeSyncBlock::new( - block, - block_data, - &self.harness.chain.data_availability_checker, - self.harness.chain.spec.clone(), - ) - .unwrap() - }; self.network_blocks_by_slot .insert(block_slot, range_sync_block.clone()); self.network_blocks_by_root @@ -1622,9 +1626,8 @@ impl TestRig { fn custody_columns(&self) -> &[ColumnIndex] { self.harness .chain - .data_availability_checker - .custody_context() - .custody_columns_for_epoch(None, &self.harness.spec) + .custody_context + .custody_columns_for_epoch(None) } // Test setup diff --git a/consensus/types/src/execution/signed_execution_payload_bid.rs b/consensus/types/src/execution/signed_execution_payload_bid.rs index 2ad6dcea1a..7dd6a88a1a 100644 --- a/consensus/types/src/execution/signed_execution_payload_bid.rs +++ b/consensus/types/src/execution/signed_execution_payload_bid.rs @@ -37,6 +37,10 @@ impl SignedExecutionPayloadBid { signature: Signature::empty(), } } + + pub fn num_blobs_expected(&self) -> usize { + self.message.blob_kzg_commitments.len() + } } #[cfg(test)]