diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index df8c49f8de..1b0aea5a2f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -645,26 +645,36 @@ pub fn signature_verify_chain_segment( &chain.spec, )?; - // unzip chain segment and verify kzg in bulk - let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); - let maybe_available_blocks = chain - .data_availability_checker - .verify_kzg_for_rpc_blocks(blocks)?; - // zip it back up - let mut signature_verified_blocks = roots - .into_iter() - .zip(maybe_available_blocks) - .map(|(block_root, maybe_available_block)| { - let consensus_context = ConsensusContext::new(maybe_available_block.slot()) - .set_current_block_root(block_root); - SignatureVerifiedBlock { - block: maybe_available_block, - block_root, - parent: None, - consensus_context, + let mut available_blocks = Vec::with_capacity(chain_segment.len()); + let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); + + for (block_root, block) in chain_segment { + let consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(block_root); + + match block { + RpcBlock::FullyAvailable(available_block) => { + available_blocks.push(available_block.clone()); + signature_verified_blocks.push(SignatureVerifiedBlock { + block: MaybeAvailableBlock::Available(available_block), + block_root, + parent: None, + consensus_context, + }); } - }) - .collect::>(); + RpcBlock::BlockOnly { .. } => { + // RangeSync and BackfillSync already ensure that the chain segment is fully available + // so this shouldn't be possible in practice. + return Err(BlockError::InternalError( + "Chain segment is not fully available".to_string(), + )); + } + } + } + + chain + .data_availability_checker + .batch_verify_kzg_for_available_blocks(&available_blocks)?; // verify signatures let pubkey_cache = get_validator_pubkey_cache(chain)?; @@ -1297,16 +1307,28 @@ impl IntoExecutionPendingBlock for RpcBlock // Perform an early check to prevent wasting time on irrelevant blocks. let block_root = check_block_relevancy(self.as_block(), block_root, chain) .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; - let maybe_available = chain - .data_availability_checker - .verify_kzg_for_rpc_block(self.clone()) - .map_err(|e| { - BlockSlashInfo::SignatureNotChecked( - self.signed_block_header(), - BlockError::AvailabilityCheck(e), - ) - })?; - SignatureVerifiedBlock::check_slashable(maybe_available, block_root, chain)? + + let maybe_available_block = match &self { + RpcBlock::FullyAvailable(available_block) => { + chain + .data_availability_checker + .verify_kzg_for_available_block(available_block) + .map_err(|e| { + BlockSlashInfo::SignatureNotChecked( + self.signed_block_header(), + BlockError::AvailabilityCheck(e), + ) + })?; + MaybeAvailableBlock::Available(available_block.clone()) + } + // No need to perform KZG verification unless we have a fully available block + RpcBlock::BlockOnly { block, block_root } => MaybeAvailableBlock::AvailabilityPending { + block_root: *block_root, + block: block.clone(), + }, + }; + + SignatureVerifiedBlock::check_slashable(maybe_available_block, block_root, chain)? .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) } diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index f7831d5c77..84e600cd40 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -1,204 +1,151 @@ -use crate::data_availability_checker::AvailabilityCheckError; -pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock}; -use crate::data_column_verification::{CustodyDataColumn, CustodyDataColumnList}; -use crate::{PayloadVerificationOutcome, get_block_root}; +use crate::data_availability_checker::{AvailabilityCheckError, DataAvailabilityChecker}; +pub use crate::data_availability_checker::{ + AvailableBlock, AvailableBlockData, MaybeAvailableBlock, +}; +use crate::{BeaconChainTypes, PayloadVerificationOutcome}; use educe::Educe; -use ssz_types::VariableList; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256, + BeaconBlockRef, BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: /// -/// 1. `BlockAndBlobs`: A fully available post deneb block with all the blobs available. This variant -/// is only constructed after making consistency checks between blocks and blobs. -/// Hence, it is fully self contained w.r.t verification. i.e. this block has all the required -/// data to get verified and imported into fork choice. +/// 1. `FullyAvailable`: A fully available block. This can either be a pre-deneb block, a +/// post-Deneb block with blobs, a post-Fulu block with the columns the node is required to custody, +/// or a post-Deneb block that doesn't require blobs/columns. Hence, it is fully self contained w.r.t +/// verification. i.e. this block has all the required data to get verified and imported into fork choice. /// -/// 2. `Block`: This can be a fully available pre-deneb block **or** a post-deneb block that may or may -/// not require blobs to be considered fully available. -/// -/// Note: We make a distinction over blocks received over gossip because -/// in a post-deneb world, the blobs corresponding to a given block that are received -/// over rpc do not contain the proposer signature for dos resistance. +/// 2. `BlockOnly`: This is a post-deneb block that requires blobs to be considered fully available. #[derive(Clone, Educe)] #[educe(Hash(bound(E: EthSpec)))] -pub struct RpcBlock { - block_root: Hash256, - block: RpcBlockInner, +pub enum RpcBlock { + FullyAvailable(AvailableBlock), + BlockOnly { + block: Arc>, + block_root: Hash256, + }, } impl Debug for RpcBlock { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RpcBlock({:?})", self.block_root) + write!(f, "RpcBlock({:?})", self.block_root()) } } impl RpcBlock { pub fn block_root(&self) -> Hash256 { - self.block_root + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_root(), + RpcBlock::BlockOnly { block_root, .. } => *block_root, + } } pub fn as_block(&self) -> &SignedBeaconBlock { - match &self.block { - RpcBlockInner::Block(block) => block, - RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block(), + RpcBlock::BlockOnly { block, .. } => block, } } pub fn block_cloned(&self) -> Arc> { - match &self.block { - RpcBlockInner::Block(block) => block.clone(), - RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_cloned(), + RpcBlock::BlockOnly { block, .. } => block.clone(), } } - pub fn blobs(&self) -> Option<&BlobSidecarList> { - match &self.block { - RpcBlockInner::Block(_) => None, - RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs), - RpcBlockInner::BlockAndCustodyColumns(_, _) => None, + pub fn block_data(&self) -> Option<&AvailableBlockData> { + match self { + RpcBlock::FullyAvailable(available_block) => Some(available_block.data()), + RpcBlock::BlockOnly { .. } => None, } } - - pub fn custody_columns(&self) -> Option<&CustodyDataColumnList> { - match &self.block { - RpcBlockInner::Block(_) => None, - RpcBlockInner::BlockAndBlobs(_, _) => None, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => Some(data_columns), - } - } -} - -/// Note: This variant is intentionally private because we want to safely construct the -/// internal variants after applying consistency checks to ensure that the block and blobs -/// are consistent with respect to each other. -#[derive(Debug, Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -enum RpcBlockInner { - /// Single block lookup response. This should potentially hit the data availability cache. - Block(Arc>), - /// This variant is used with parent lookups and by-range responses. It should have all blobs - /// ordered, all block roots matching, and the correct number of blobs for this block. - BlockAndBlobs(Arc>, BlobSidecarList), - /// This variant is used with parent lookups and by-range responses. It should have all - /// requested data columns, all block roots matching for this block. - BlockAndCustodyColumns(Arc>, CustodyDataColumnList), } impl RpcBlock { - /// Constructs a `Block` variant. - pub fn new_without_blobs( - block_root: Option, + /// Constructs an `RpcBlock` from a block and optional availability data. + /// + /// This function creates an RpcBlock which can be in one of two states: + /// - `FullyAvailable`: When `block_data` is provided, the block contains all required + /// data for verification. + /// - `BlockOnly`: When `block_data` is `None`, the block may still need additional + /// data to be considered fully available (used during block lookups or when blobs + /// will arrive separately). + /// + /// # Validation + /// + /// When `block_data` is provided, this function validates that: + /// - Block data is not provided when not required. + /// - Required blobs are present and match the expected count. + /// - Required custody columns are included based on the nodes custody requirements. + /// + /// # Errors + /// + /// Returns `AvailabilityCheckError` if: + /// - `InvalidAvailableBlockData`: Block data is provided but not required. + /// - `MissingBlobs`: Block requires blobs but they are missing or incomplete. + /// - `MissingCustodyColumns`: Block requires custody columns but they are incomplete. + pub fn new( block: Arc>, - ) -> Self { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - - Self { - block_root, - block: RpcBlockInner::Block(block), + block_data: Option>, + da_checker: &DataAvailabilityChecker, + spec: Arc, + ) -> Result + where + T: BeaconChainTypes, + { + match block_data { + Some(block_data) => Ok(RpcBlock::FullyAvailable(AvailableBlock::new( + block, block_data, da_checker, spec, + )?)), + None => Ok(RpcBlock::BlockOnly { + block_root: block.canonical_root(), + block, + }), } } - /// Constructs a new `BlockAndBlobs` variant after making consistency - /// checks between the provided blocks and blobs. This struct makes no - /// guarantees about whether blobs should be present, only that they are - /// consistent with the block. An empty list passed in for `blobs` is - /// viewed the same as `None` passed in. - pub fn new( - block_root: Option, - block: Arc>, - blobs: Option>, - ) -> Result { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - // Treat empty blob lists as if they are missing. - let blobs = blobs.filter(|b| !b.is_empty()); - - if let (Some(blobs), Ok(block_commitments)) = ( - blobs.as_ref(), - block.message().body().blob_kzg_commitments(), - ) { - if blobs.len() != block_commitments.len() { - return Err(AvailabilityCheckError::MissingBlobs); - } - for (blob, &block_commitment) in blobs.iter().zip(block_commitments.iter()) { - let blob_commitment = blob.kzg_commitment; - if blob_commitment != block_commitment { - return Err(AvailabilityCheckError::KzgCommitmentMismatch { - block_commitment, - blob_commitment, - }); - } - } - } - let inner = match blobs { - Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs), - None => RpcBlockInner::Block(block), - }; - Ok(Self { - block_root, - block: inner, - }) - } - - pub fn new_with_custody_columns( - block_root: Option, - block: Arc>, - custody_columns: Vec>, - ) -> Result { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - - if block.num_expected_blobs() > 0 && custody_columns.is_empty() { - // The number of required custody columns is out of scope here. - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - // Treat empty data column lists as if they are missing. - let inner = if !custody_columns.is_empty() { - RpcBlockInner::BlockAndCustodyColumns(block, VariableList::new(custody_columns)?) - } else { - RpcBlockInner::Block(block) - }; - Ok(Self { - block_root, - block: inner, - }) - } - #[allow(clippy::type_complexity)] pub fn deconstruct( self, ) -> ( Hash256, Arc>, - Option>, - Option>, + Option>, ) { - let block_root = self.block_root(); - match self.block { - RpcBlockInner::Block(block) => (block_root, block, None, None), - RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None), - RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => { - (block_root, block, None, Some(data_columns)) + match self { + RpcBlock::FullyAvailable(available_block) => { + let (block_root, block, block_data) = available_block.deconstruct(); + (block_root, block, Some(block_data)) } + RpcBlock::BlockOnly { block, block_root } => (block_root, block, None), } } + pub fn n_blobs(&self) -> usize { - match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns(_, _) => 0, - RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(), + if let Some(block_data) = self.block_data() { + match block_data { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + } + } else { + 0 } } + pub fn n_data_columns(&self) -> usize { - match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(), + if let Some(block_data) = self.block_data() { + match block_data { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + } + } else { + 0 } } } @@ -500,17 +447,21 @@ impl AsBlock for RpcBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - match &self.block { - RpcBlockInner::Block(block) => block, - RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + match self { + Self::BlockOnly { + block, + block_root: _, + } => block, + Self::FullyAvailable(available_block) => available_block.block(), } } fn block_cloned(&self) -> Arc> { - match &self.block { - RpcBlockInner::Block(block) => block.clone(), - RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_cloned(), + RpcBlock::BlockOnly { + block, + block_root: _, + } => block.clone(), } } fn canonical_root(&self) -> Hash256 { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 05ef220b84..db37a79372 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -1,17 +1,17 @@ use crate::blob_verification::{ GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList, verify_kzg_for_blob_list, }; -use crate::block_verification_types::{ - AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, -}; +use crate::block_verification_types::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, }; use crate::{ BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics, }; +use educe::Educe; use kzg::Kzg; use slot_clock::SlotClock; +use std::collections::HashSet; use std::fmt; use std::fmt::Debug; use std::num::NonZeroUsize; @@ -31,8 +31,8 @@ mod state_lru_cache; use crate::data_availability_checker::error::Error; use crate::data_column_verification::{ - CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, - KzgVerifiedDataColumn, verify_kzg_for_data_column_list, + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, + verify_kzg_for_data_column_list, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -366,151 +366,51 @@ impl DataAvailabilityChecker { .remove_pre_execution_block(block_root); } - /// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may - /// include the fully available block. - /// - /// WARNING: This function assumes all required blobs are already present, it does NOT - /// check if there are any missing blobs. - pub fn verify_kzg_for_rpc_block( + /// Verifies kzg commitments for an `AvailableBlock`. + pub fn verify_kzg_for_available_block( &self, - block: RpcBlock, - ) -> Result, AvailabilityCheckError> { - let (block_root, block, blobs, data_columns) = block.deconstruct(); - if self.blobs_required_for_block(&block) { - return if let Some(blob_list) = blobs { - verify_kzg_for_blob_list(blob_list.iter(), &self.kzg) - .map_err(AvailabilityCheckError::InvalidBlobs)?; - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::Blobs(blob_list), - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) - } else { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - }; + available_block: &AvailableBlock, + ) -> Result<(), AvailabilityCheckError> { + match available_block.data() { + AvailableBlockData::NoData => Ok(()), + AvailableBlockData::Blobs(blobs) => verify_kzg_for_blob_list(blobs.iter(), &self.kzg) + .map_err(AvailabilityCheckError::InvalidBlobs), + AvailableBlockData::DataColumns(columns) => { + verify_kzg_for_data_column_list(columns.iter(), &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn) + } } - if self.data_columns_required_for_block(&block) { - return if let Some(data_column_list) = data_columns.as_ref() { - verify_kzg_for_data_column_list( - data_column_list - .iter() - .map(|custody_column| custody_column.as_data_column()), - &self.kzg, - ) - .map_err(AvailabilityCheckError::InvalidColumn)?; - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::DataColumns( - data_column_list - .into_iter() - .map(|d| d.clone_arc()) - .collect(), - ), - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) - } else { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - }; - } - - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::NoData, - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) } - /// Checks if a vector of blocks are available. Returns a vector of `MaybeAvailableBlock` - /// This is more efficient than calling `verify_kzg_for_rpc_block` in a loop as it does - /// all kzg verification at once - /// - /// WARNING: This function assumes all required blobs are already present, it does NOT - /// check if there are any missing blobs. + /// Performs batch kzg verification for a vector of `AvailableBlocks`. This is more efficient than + /// calling `verify_kzg_for_available_block` in a loop. #[instrument(skip_all)] - pub fn verify_kzg_for_rpc_blocks( + pub fn batch_verify_kzg_for_available_blocks( &self, - blocks: Vec>, - ) -> Result>, AvailabilityCheckError> { - let mut results = Vec::with_capacity(blocks.len()); - let all_blobs = blocks - .iter() - .filter(|block| self.blobs_required_for_block(block.as_block())) - // this clone is cheap as it's cloning an Arc - .filter_map(|block| block.blobs().cloned()) - .flatten() - .collect::>(); + available_blocks: &[AvailableBlock], + ) -> Result<(), AvailabilityCheckError> { + let mut all_blobs = Vec::new(); + let mut all_data_columns = Vec::new(); + + for available_block in available_blocks { + match available_block.data().to_owned() { + AvailableBlockData::NoData => {} + AvailableBlockData::Blobs(blobs) => all_blobs.extend(blobs), + AvailableBlockData::DataColumns(columns) => all_data_columns.extend(columns), + } + } - // verify kzg for all blobs at once if !all_blobs.is_empty() { verify_kzg_for_blob_list(all_blobs.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidBlobs)?; } - let all_data_columns = blocks - .iter() - .filter(|block| self.data_columns_required_for_block(block.as_block())) - // this clone is cheap as it's cloning an Arc - .filter_map(|block| block.custody_columns().cloned()) - .flatten() - .map(CustodyDataColumn::into_inner) - .collect::>(); - - // verify kzg for all data columns at once if !all_data_columns.is_empty() { - // Attributes fault to the specific peer that sent an invalid column verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; } - for block in blocks { - let (block_root, block, blobs, data_columns) = block.deconstruct(); - - let maybe_available_block = if self.blobs_required_for_block(&block) { - if let Some(blobs) = blobs { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::Blobs(blobs), - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - } else { - MaybeAvailableBlock::AvailabilityPending { block_root, block } - } - } else if self.data_columns_required_for_block(&block) { - if let Some(data_columns) = data_columns { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::DataColumns( - data_columns.into_iter().map(|d| d.into_inner()).collect(), - ), - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - } else { - MaybeAvailableBlock::AvailabilityPending { block_root, block } - } - } else { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::NoData, - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - }; - - results.push(maybe_available_block); - } - - Ok(results) + Ok(()) } /// Determines the blob requirements for a block. If the block is pre-deneb, no blobs are required. @@ -749,7 +649,8 @@ async fn availability_cache_maintenance_service( } } -#[derive(Debug)] +#[derive(Debug, Clone)] +// TODO(#8633) move this to `block_verification_types.rs` pub enum AvailableBlockData { /// Block is pre-Deneb or has zero blobs NoData, @@ -759,31 +660,161 @@ pub enum AvailableBlockData { DataColumns(DataColumnSidecarList), } +impl AvailableBlockData { + pub fn new_with_blobs(blobs: BlobSidecarList) -> Self { + if blobs.is_empty() { + Self::NoData + } else { + Self::Blobs(blobs) + } + } + + pub fn new_with_data_columns(columns: DataColumnSidecarList) -> Self { + if columns.is_empty() { + Self::NoData + } else { + Self::DataColumns(columns) + } + } + + pub fn blobs(&self) -> Option> { + match self { + AvailableBlockData::NoData => None, + AvailableBlockData::Blobs(blobs) => Some(blobs.clone()), + AvailableBlockData::DataColumns(_) => None, + } + } + + pub fn blobs_len(&self) -> usize { + if let Some(blobs) = self.blobs() { + blobs.len() + } else { + 0 + } + } + + pub fn data_columns(&self) -> Option> { + match self { + AvailableBlockData::NoData => None, + AvailableBlockData::Blobs(_) => None, + AvailableBlockData::DataColumns(data_columns) => Some(data_columns.clone()), + } + } + + pub fn data_columns_len(&self) -> usize { + if let Some(data_columns) = self.data_columns() { + data_columns.len() + } else { + 0 + } + } +} + /// A fully available block that is ready to be imported into fork choice. -#[derive(Debug)] +#[derive(Debug, Clone, Educe)] +#[educe(Hash(bound(E: EthSpec)))] pub struct AvailableBlock { block_root: Hash256, block: Arc>, + #[educe(Hash(ignore))] blob_data: AvailableBlockData, + #[educe(Hash(ignore))] /// Timestamp at which this block first became available (UNIX timestamp, time since 1970). blobs_available_timestamp: Option, + #[educe(Hash(ignore))] pub spec: Arc, } impl AvailableBlock { - pub fn __new_for_testing( - block_root: Hash256, - block: Arc>, - data: AvailableBlockData, + /// Constructs an `AvailableBlock` from a block and blob data. + /// + /// This function validates that: + /// - Block data is not provided when not required (pre-Deneb or past DA boundary) + /// - Required blobs are present and match the expected count + /// - Required custody columns are complete based on the node's custody requirements + /// - KZG commitments in blobs match those in the block + /// + /// Returns `AvailabilityCheckError` if: + /// - `InvalidAvailableBlockData`: Block data is provided but not required + /// - `MissingBlobs`: Block requires blobs but they are missing or incomplete + /// - `MissingCustodyColumns`: Block requires custody columns but they are incomplete + /// - `KzgCommitmentMismatch`: Blob KZG commitment doesn't match block commitment + pub fn new( + block: Arc>, + block_data: AvailableBlockData, + da_checker: &DataAvailabilityChecker, spec: Arc, - ) -> Self { - Self { - block_root, - block, - blob_data: data, - blobs_available_timestamp: None, - spec, + ) -> Result + where + T: BeaconChainTypes, + { + // Ensure block availability + let blobs_required = da_checker.blobs_required_for_block(&block); + let columns_required = da_checker.data_columns_required_for_block(&block); + + match &block_data { + AvailableBlockData::NoData => { + if columns_required { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } else if blobs_required { + return Err(AvailabilityCheckError::MissingBlobs); + } + } + AvailableBlockData::Blobs(blobs) => { + if !blobs_required { + return Err(AvailabilityCheckError::InvalidAvailableBlockData); + } + + let Ok(block_kzg_commitments) = block.message().body().blob_kzg_commitments() + else { + return Err(AvailabilityCheckError::Unexpected( + "Expected blobs but could not fetch KZG commitments from the block" + .to_owned(), + )); + }; + + if blobs.len() != block_kzg_commitments.len() { + return Err(AvailabilityCheckError::MissingBlobs); + } + + for (blob, &block_kzg_commitment) in blobs.iter().zip(block_kzg_commitments.iter()) + { + if blob.kzg_commitment != block_kzg_commitment { + return Err(AvailabilityCheckError::KzgCommitmentMismatch { + blob_commitment: blob.kzg_commitment, + block_commitment: block_kzg_commitment, + }); + } + } + } + AvailableBlockData::DataColumns(data_columns) => { + if !columns_required { + return Err(AvailabilityCheckError::InvalidAvailableBlockData); + } + + let mut column_indices = da_checker + .custody_context + .sampling_columns_for_epoch(block.epoch(), &spec) + .iter() + .collect::>(); + + for data_column in data_columns { + column_indices.remove(data_column.index()); + } + + if !column_indices.is_empty() { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + } } + + Ok(Self { + block_root: block.canonical_root(), + block, + blob_data: block_data, + blobs_available_timestamp: None, + spec: spec.clone(), + }) } pub fn block(&self) -> &SignedBeaconBlock { @@ -801,6 +832,10 @@ impl AvailableBlock { &self.blob_data } + pub fn block_root(&self) -> Hash256 { + self.block_root + } + pub fn has_blobs(&self) -> bool { match self.blob_data { AvailableBlockData::NoData => false, @@ -864,7 +899,9 @@ impl MaybeAvailableBlock { mod test { use super::*; use crate::CustodyContext; + use crate::block_verification_types::RpcBlock; use crate::custody_context::NodeCustodyType; + use crate::data_column_verification::CustodyDataColumn; use crate::test_utils::{ EphemeralHarnessType, NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns, get_kzg, @@ -926,8 +963,16 @@ mod test { &spec, ); let block_root = Hash256::random(); - let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); - let requested_columns = &custody_columns[..10]; + // Get 10 columns using the "latest" CGC (head) that block lookup would use. + // The CGC change becomes effective after CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS, + // which is typically epoch 2+ for MinimalEthSpec. + let future_epoch = Epoch::new(10); // Far enough in the future to have the CGC change effective + let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch, &spec); + assert_eq!( + requested_columns.len(), + 10, + "future epoch should have 10 sampling columns" + ); da_checker .put_rpc_custody_columns( block_root, @@ -1005,8 +1050,16 @@ mod test { &spec, ); let block_root = Hash256::random(); - let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); - let requested_columns = &custody_columns[..10]; + // Get 10 columns using the "latest" CGC that gossip subscriptions would use. + // The CGC change becomes effective after CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS, + // which is typically epoch 2+ for MinimalEthSpec. + let future_epoch = Epoch::new(10); // Far enough in the future to have the CGC change effective + let requested_columns = custody_context.sampling_columns_for_epoch(future_epoch, &spec); + assert_eq!( + requested_columns.len(), + 10, + "future epoch should have 10 sampling columns" + ); let gossip_columns = data_columns .into_iter() .filter(|d| requested_columns.contains(d.index())) @@ -1059,9 +1112,6 @@ mod test { let custody_columns = if index == 0 { // 128 valid data columns in the first block data_columns - .into_iter() - .map(CustodyDataColumn::from_asserted_custody) - .collect::>() } else { // invalid data columns in the second block data_columns @@ -1079,17 +1129,30 @@ mod test { .clone(), }); CustodyDataColumn::from_asserted_custody(Arc::new(invalid_sidecar)) + .as_data_column() + .clone() }) .collect::>() }; - RpcBlock::new_with_custody_columns(None, Arc::new(block), custody_columns) + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + let da_checker = Arc::new(new_da_checker(spec.clone())); + RpcBlock::new(Arc::new(block), Some(block_data), &da_checker, spec.clone()) .expect("should create RPC block with custody columns") }) .collect::>(); + let available_blocks = blocks_with_columns + .iter() + .filter_map(|block| match block { + RpcBlock::FullyAvailable(available_block) => Some(available_block.clone()), + RpcBlock::BlockOnly { .. } => None, + }) + .collect::>(); + // WHEN verifying all blocks together (totalling 256 data columns) - let verification_result = da_checker.verify_kzg_for_rpc_blocks(blocks_with_columns); + let verification_result = + da_checker.batch_verify_kzg_for_available_blocks(&available_blocks); // THEN batch block verification should fail due to 128 invalid columns in the second block verification_result.expect_err("should have failed to verify blocks"); @@ -1132,7 +1195,7 @@ mod test { // Add 64 columns to the da checker (enough to be able to reconstruct) // Order by all_column_indices_ordered, then take first 64 - let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); + let custody_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); let custody_columns = custody_columns .iter() .filter_map(|&col_idx| data_columns.iter().find(|d| *d.index() == col_idx).cloned()) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index c9efb7a414..af3cb72c03 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -22,6 +22,7 @@ pub enum Error { BlockReplayError(state_processing::BlockReplayError), RebuildingStateCaches(BeaconStateError), SlotClockError, + InvalidAvailableBlockData, } #[derive(PartialEq, Eq)] @@ -44,7 +45,8 @@ impl Error { | Error::ParentStateMissing(_) | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) - | Error::SlotClockError => ErrorCategory::Internal, + | Error::SlotClockError + | Error::InvalidAvailableBlockData => ErrorCategory::Internal, Error::InvalidBlobs { .. } | Error::InvalidColumn { .. } | Error::ReconstructColumnsError { .. } diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 91b0f12cbb..45ae9d7b84 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -157,12 +157,10 @@ impl BeaconChain { } match &block_data { - AvailableBlockData::NoData => {} - AvailableBlockData::Blobs(..) => { - new_oldest_blob_slot = Some(block.slot()); - } + AvailableBlockData::NoData => (), + AvailableBlockData::Blobs(_) => new_oldest_blob_slot = Some(block.slot()), AvailableBlockData::DataColumns(_) => { - new_oldest_data_column_slot = Some(block.slot()); + new_oldest_data_column_slot = Some(block.slot()) } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index f92030a671..e77739e2d5 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -78,7 +78,7 @@ pub use block_verification::{ BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, IntoGossipVerifiedBlock, InvalidSignature, PayloadVerificationOutcome, PayloadVerificationStatus, build_blob_data_column_sidecars, - get_block_root, + get_block_root, signature_verify_chain_segment, }; pub use block_verification_types::AvailabilityPendingExecutedBlock; pub use block_verification_types::ExecutedBlock; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 709758ae2d..a170d6a3d4 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,12 +1,12 @@ use crate::blob_verification::GossipVerifiedBlob; -use crate::block_verification_types::{AsBlock, RpcBlock}; +use crate::block_verification_types::{AsBlock, AvailableBlockData, RpcBlock}; use crate::custody_context::NodeCustodyType; -use crate::data_column_verification::CustodyDataColumn; +use crate::data_availability_checker::DataAvailabilityChecker; use crate::graffiti_calculator::GraffitiSettings; use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sidecars_gloas}; use crate::observed_operations::ObservationOutcome; pub use crate::persisted_beacon_chain::PersistedBeaconChain; -use crate::{BeaconBlockResponseWrapper, get_block_root}; +use crate::{BeaconBlockResponseWrapper, CustodyContext, get_block_root}; use crate::{ BeaconChain, BeaconChainTypes, BlockError, ChainConfig, ServerSentEventHandler, StateSkipConfig, @@ -212,6 +212,34 @@ pub fn test_spec() -> ChainSpec { spec.target_aggregators_per_committee = DEFAULT_TARGET_AGGREGATORS; spec } +pub fn test_da_checker( + spec: Arc, + node_custody_type: NodeCustodyType, +) -> DataAvailabilityChecker> { + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_secs(spec.seconds_per_slot), + ); + let kzg = get_kzg(&spec); + let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap()); + let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); + let custody_context = Arc::new(CustodyContext::new( + node_custody_type, + ordered_custody_column_indices, + &spec, + )); + let complete_blob_backfill = false; + DataAvailabilityChecker::new( + complete_blob_backfill, + slot_clock, + kzg, + store, + custody_context, + spec, + ) + .expect("should initialise data availability checker") +} pub struct Builder { eth_spec_instance: T::EthSpec, @@ -2380,8 +2408,16 @@ where ) -> Result { self.set_current_slot(slot); let (block, blob_items) = block_contents; + // Determine if block is available: it's available if it doesn't require blobs, + // or if it requires blobs and we have them + let has_blob_commitments = block + .message() + .body() + .blob_kzg_commitments() + .is_ok_and(|c| !c.is_empty()); + let is_available = !has_blob_commitments || blob_items.is_some(); - let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; + let rpc_block = self.build_rpc_block_from_blobs(block, blob_items, is_available)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( @@ -2405,7 +2441,15 @@ where let (block, blob_items) = block_contents; let block_root = block.canonical_root(); - let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; + // Determine if block is available: it's available if it doesn't require blobs, + // or if it requires blobs and we have them + let has_blob_commitments = block + .message() + .body() + .blob_kzg_commitments() + .is_ok_and(|c| !c.is_empty()); + let is_available = !has_blob_commitments || blob_items.is_some(); + let rpc_block = self.build_rpc_block_from_blobs(block, blob_items, is_available)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( @@ -2436,7 +2480,13 @@ where .blob_kzg_commitments() .is_ok_and(|c| !c.is_empty()); if !has_blobs { - return RpcBlock::new_without_blobs(Some(block_root), block); + return RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .unwrap(); } // Blobs are stored as data columns from Fulu (PeerDAS) @@ -2447,23 +2497,39 @@ where .get_data_columns(&block_root, fork_name) .unwrap() .unwrap(); - let custody_columns = columns - .into_iter() - .map(CustodyDataColumn::from_asserted_custody) - .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns).unwrap() + let custody_columns = columns.into_iter().collect::>(); + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + RpcBlock::new( + block, + Some(block_data), + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .unwrap() } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); - RpcBlock::new(Some(block_root), block, blobs).unwrap() + let block_data = if let Some(blobs) = blobs { + AvailableBlockData::new_with_blobs(blobs) + } else { + AvailableBlockData::NoData + }; + + RpcBlock::new( + block, + Some(block_data), + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .unwrap() } } /// Builds an `RpcBlock` from a `SignedBeaconBlock` and `BlobsList`. pub fn build_rpc_block_from_blobs( &self, - block_root: Hash256, block: Arc>>, blob_items: Option<(KzgProofs, BlobsList)>, + is_available: bool, ) -> Result, BlockError> { Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let epoch = block.slot().epoch(E::slots_per_epoch()); @@ -2476,11 +2542,37 @@ where let columns = generate_data_column_sidecars_from_block(&block, &self.spec) .into_iter() .filter(|d| sampling_columns.contains(d.index())) - .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, columns)? + if is_available { + let block_data = AvailableBlockData::new_with_data_columns(columns); + RpcBlock::new( + block, + Some(block_data), + &self.chain.data_availability_checker, + self.chain.spec.clone(), + )? + } else { + RpcBlock::new( + block, + None, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + )? + } + } else if is_available { + RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + &self.chain.data_availability_checker, + self.chain.spec.clone(), + )? } else { - RpcBlock::new_without_blobs(Some(block_root), block) + RpcBlock::new( + block, + None, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + )? } } else { let blobs = blob_items @@ -2489,7 +2581,27 @@ where }) .transpose() .unwrap(); - RpcBlock::new(Some(block_root), block, blobs)? + if is_available { + let block_data = if let Some(blobs) = blobs { + AvailableBlockData::new_with_blobs(blobs) + } else { + AvailableBlockData::NoData + }; + + RpcBlock::new( + block, + Some(block_data), + &self.chain.data_availability_checker, + self.chain.spec.clone(), + )? + } else { + RpcBlock::new( + block, + None, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + )? + } }) } diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index a57c20211a..a1922f32a4 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -1,6 +1,8 @@ #![cfg(not(debug_assertions))] use beacon_chain::attestation_simulator::produce_unaggregated_attestation; +use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; use beacon_chain::validator_monitor::UNAGGREGATED_ATTESTATION_LAG_SLOTS; use beacon_chain::{StateSkipConfig, WhenSlotSkipped, metrics}; @@ -114,6 +116,8 @@ async fn produces_attestations() { .keypairs(KEYPAIRS[..].to_vec()) .fresh_ephemeral_store() .mock_execution_layer() + // SemiSupernode ensures enough columns are stored for sampling + custody validation for RpcBlock + .node_custody_type(NodeCustodyType::SemiSupernode) .build(); let chain = &harness.chain; @@ -221,14 +225,16 @@ async fn produces_attestations() { let rpc_block = harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(block.clone())); - let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available( - available_block, - ) = chain - .data_availability_checker - .verify_kzg_for_rpc_block(rpc_block) - .unwrap() - else { - panic!("block should be available") + + let available_block = match rpc_block { + RpcBlock::FullyAvailable(available_block) => { + chain + .data_availability_checker + .verify_kzg_for_available_block(&available_block) + .unwrap(); + available_block + } + RpcBlock::BlockOnly { .. } => panic!("block should be available"), }; let early_attestation = { @@ -288,14 +294,17 @@ async fn early_attester_cache_old_request() { let rpc_block = harness .build_rpc_block_from_store_blobs(Some(head.beacon_block_root), head.beacon_block.clone()); - let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available(available_block) = - harness - .chain - .data_availability_checker - .verify_kzg_for_rpc_block(rpc_block) - .unwrap() - else { - panic!("block should be available") + + let available_block = match rpc_block { + RpcBlock::FullyAvailable(available_block) => { + harness + .chain + .data_availability_checker + .verify_kzg_for_available_block(&available_block) + .unwrap(); + available_block + } + RpcBlock::BlockOnly { .. } => panic!("block should be available"), }; harness diff --git a/beacon_node/beacon_chain/tests/blob_verification.rs b/beacon_node/beacon_chain/tests/blob_verification.rs index 019736ca01..e39c53729f 100644 --- a/beacon_node/beacon_chain/tests/blob_verification.rs +++ b/beacon_node/beacon_chain/tests/blob_verification.rs @@ -77,7 +77,7 @@ async fn rpc_blobs_with_invalid_header_signature() { // Process the block without blobs so that it doesn't become available. harness.advance_slot(); let rpc_block = harness - .build_rpc_block_from_blobs(block_root, signed_block.clone(), None) + .build_rpc_block_from_blobs(signed_block.clone(), None, false) .unwrap(); let availability = harness .chain @@ -85,11 +85,12 @@ async fn rpc_blobs_with_invalid_header_signature() { block_root, rpc_block, NotifyExecutionLayer::Yes, - BlockImportSource::RangeSync, + BlockImportSource::Lookup, || Ok(()), ) .await .unwrap(); + assert_eq!( availability, AvailabilityProcessingStatus::MissingComponents(slot, block_root) @@ -114,6 +115,8 @@ async fn rpc_blobs_with_invalid_header_signature() { .process_rpc_blobs(slot, block_root, blob_sidecars) .await .unwrap_err(); + + println!("{:?}", err); assert!(matches!( err, BlockError::InvalidSignature(InvalidSignature::ProposerSignature) diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 771edf8f70..440c0be3e4 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,9 +1,11 @@ #![cfg(not(debug_assertions))] use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; +use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, ExecutionPendingBlock, + WhenSlotSkipped, custody_context::NodeCustodyType, test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, test_spec, @@ -11,7 +13,7 @@ use beacon_chain::{ }; use beacon_chain::{ BeaconSnapshot, BlockError, ChainConfig, ChainSegmentResult, IntoExecutionPendingBlock, - InvalidSignature, NotifyExecutionLayer, + InvalidSignature, NotifyExecutionLayer, signature_verify_chain_segment, }; use bls::{AggregateSignature, Keypair, Signature}; use fixed_bytes::FixedBytesExtended; @@ -39,6 +41,7 @@ const BLOCK_INDICES: &[usize] = &[0, 1, 32, 64, 68 + 1, 129, CHAIN_SEGMENT_LENGT static KEYPAIRS: LazyLock> = LazyLock::new(|| types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT)); +// TODO(#8633): Delete this unnecessary enum and refactor this file to use `AvailableBlockData` instead. enum DataSidecars { Blobs(BlobSidecarList), DataColumns(Vec>), @@ -130,32 +133,65 @@ fn get_harness( harness } -fn chain_segment_blocks( +fn chain_segment_blocks( chain_segment: &[BeaconSnapshot], chain_segment_sidecars: &[Option>], -) -> Vec> { + chain: Arc>, +) -> Vec> +where + T: BeaconChainTypes, +{ chain_segment .iter() .zip(chain_segment_sidecars.iter()) .map(|(snapshot, data_sidecars)| { let block = snapshot.beacon_block.clone(); - build_rpc_block(block, data_sidecars) + build_rpc_block(block, data_sidecars, chain.clone()) }) .collect() } -fn build_rpc_block( +fn build_rpc_block( block: Arc>, data_sidecars: &Option>, -) -> RpcBlock { + chain: Arc>, +) -> RpcBlock +where + T: BeaconChainTypes, +{ match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { - RpcBlock::new(None, block, Some(blobs.clone())).unwrap() + let block_data = AvailableBlockData::new_with_blobs(blobs.clone()); + RpcBlock::new( + block, + Some(block_data), + &chain.data_availability_checker, + chain.spec.clone(), + ) + .unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone()).unwrap() + let block_data = AvailableBlockData::new_with_data_columns( + columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ); + RpcBlock::new( + block, + Some(block_data), + &chain.data_availability_checker, + chain.spec.clone(), + ) + .unwrap() } - None => RpcBlock::new_without_blobs(None, block), + None => RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + &chain.data_availability_checker, + chain.spec.clone(), + ) + .unwrap(), } } @@ -266,9 +302,10 @@ fn update_data_column_signed_header( async fn chain_segment_full_segment() { let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); harness .chain @@ -302,9 +339,11 @@ async fn chain_segment_full_segment() { #[tokio::test] async fn chain_segment_varying_chunk_size() { let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); for chunk_size in &[1, 2, 31, 32, 33] { let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); @@ -346,9 +385,10 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a block removed. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); blocks.remove(2); assert!( @@ -366,16 +406,21 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a modified parent root. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.parent_root_mut() = Hash256::zero(); - blocks[3] = RpcBlock::new_without_blobs( - None, + + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + blocks[3].block_data().cloned(), + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -403,15 +448,19 @@ async fn chain_segment_non_linear_slots() { * Test where a child is lower than the parent. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = Slot::new(0); - blocks[3] = RpcBlock::new_without_blobs( - None, + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + blocks[3].block_data().cloned(), + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -429,15 +478,19 @@ async fn chain_segment_non_linear_slots() { * Test where a child is equal to the parent. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = blocks[2].slot(); - blocks[3] = RpcBlock::new_without_blobs( - None, + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + blocks[3].block_data().cloned(), + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -463,7 +516,9 @@ async fn assert_invalid_signature( let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); // Ensure the block will be rejected if imported in a chain segment. @@ -488,7 +543,9 @@ async fn assert_invalid_signature( .iter() .take(block_index) .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been // imported prior to this test. @@ -505,6 +562,7 @@ async fn assert_invalid_signature( build_rpc_block( snapshots[block_index].beacon_block.clone(), &chain_segment_blobs[block_index], + harness.chain.clone(), ), NotifyExecutionLayer::Yes, BlockImportSource::Lookup, @@ -562,7 +620,9 @@ async fn invalid_signature_gossip_block() { .iter() .take(block_index) .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); harness .chain @@ -571,7 +631,13 @@ async fn invalid_signature_gossip_block() { .into_block_error() .expect("should import all blocks prior to the one being tested"); let signed_block = SignedBeaconBlock::from_block(block, junk_signature()); - let rpc_block = RpcBlock::new_without_blobs(None, Arc::new(signed_block)); + let rpc_block = RpcBlock::new( + Arc::new(signed_block), + None, + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); let process_res = harness .chain .process_block( @@ -613,7 +679,9 @@ async fn invalid_signature_block_proposal() { let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect::>(); // Ensure the block will be rejected if imported in a chain segment. let process_res = harness @@ -930,7 +998,9 @@ async fn invalid_signature_deposit() { let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); assert!( !matches!( @@ -1572,7 +1642,13 @@ async fn add_base_block_to_altair_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone())); + let base_rpc_block = RpcBlock::new( + Arc::new(base_block.clone()), + None, + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); assert!(matches!( harness .chain @@ -1596,7 +1672,15 @@ async fn add_base_block_to_altair_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(base_block))], + vec![ + RpcBlock::new( + Arc::new(base_block), + None, + &harness.chain.data_availability_checker, + harness.spec.clone() + ) + .unwrap() + ], NotifyExecutionLayer::Yes, ) .await, @@ -1709,7 +1793,13 @@ async fn add_altair_block_to_base_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone())); + let altair_rpc_block = RpcBlock::new( + Arc::new(altair_block.clone()), + None, + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); assert!(matches!( harness .chain @@ -1733,7 +1823,15 @@ async fn add_altair_block_to_base_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block))], + vec![ + RpcBlock::new( + Arc::new(altair_block), + None, + &harness.chain.data_availability_checker, + harness.spec.clone() + ) + .unwrap() + ], NotifyExecutionLayer::Yes ) .await, @@ -1796,7 +1894,13 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. let notify_execution_layer = NotifyExecutionLayer::Yes; - let rpc_block = RpcBlock::new_without_blobs(Some(block_root), block.clone()); + let rpc_block = RpcBlock::new( + block.clone(), + Some(AvailableBlockData::NoData), + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); let verified_block1 = rpc_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) @@ -1870,3 +1974,277 @@ async fn import_execution_pending_block( } } } + +// Test that `signature_verify_chain_segment` errors with a chain segment of mixed `FullyAvailable` +// and `BlockOnly` RpcBlocks. This situation should never happen in production. +#[tokio::test] +async fn signature_verify_mixed_rpc_block_variants() { + let (snapshots, data_sidecars) = get_chain_segment().await; + let snapshots: Vec<_> = snapshots.into_iter().take(10).collect(); + let data_sidecars: Vec<_> = data_sidecars.into_iter().take(10).collect(); + + let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); + + let mut chain_segment = Vec::new(); + + for (i, (snapshot, blobs)) in snapshots.iter().zip(data_sidecars.iter()).enumerate() { + let block = snapshot.beacon_block.clone(); + let block_root = snapshot.beacon_block_root; + + // Alternate between FullyAvailable and BlockOnly + let rpc_block = if i % 2 == 0 { + // FullyAvailable - with blobs/columns if needed + build_rpc_block(block, blobs, harness.chain.clone()) + } else { + // BlockOnly - no data + RpcBlock::new( + block, + None, + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ) + .unwrap() + }; + + chain_segment.push((block_root, rpc_block)); + } + + // This should error because `signature_verify_chain_segment` expects a list + // of `RpcBlock::FullyAvailable`. + assert!(signature_verify_chain_segment(chain_segment.clone(), &harness.chain).is_err()); +} + +// Test that RpcBlock::new() rejects blocks when blob count doesn't match expected. +#[tokio::test] +async fn rpc_block_construction_fails_with_wrong_blob_count() { + let spec = test_spec::(); + + if !spec.fork_name_at_slot::(Slot::new(0)).deneb_enabled() + || spec.fork_name_at_slot::(Slot::new(0)).fulu_enabled() + { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Fullnode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + harness + .extend_chain( + E::slots_per_epoch() as usize * 2, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Get a block with blobs + for slot in 1..=5 { + let root = harness + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let block = harness.chain.get_block(&root).await.unwrap().unwrap(); + + if let Ok(commitments) = block.message().body().blob_kzg_commitments() + && !commitments.is_empty() + { + let blobs = harness.chain.get_blobs(&root).unwrap().blobs().unwrap(); + + // Create AvailableBlockData with wrong number of blobs (remove one) + let mut wrong_blobs_vec: Vec<_> = blobs.iter().cloned().collect(); + wrong_blobs_vec.pop(); + + let max_blobs = harness.spec.max_blobs_per_block(block.epoch()) as usize; + let wrong_blobs = ssz_types::RuntimeVariableList::new(wrong_blobs_vec, max_blobs) + .expect("should create BlobSidecarList"); + let block_data = AvailableBlockData::new_with_blobs(wrong_blobs); + + // Try to create RpcBlock with wrong blob count + let result = RpcBlock::new( + Arc::new(block), + Some(block_data), + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ); + + // Should fail with MissingBlobs + assert!( + matches!(result, Err(AvailabilityCheckError::MissingBlobs)), + "RpcBlock construction should fail with wrong blob count, got: {:?}", + result + ); + return; + } + } + + panic!("No block with blobs found"); +} + +// Test that RpcBlock::new() rejects blocks when custody columns are incomplete. +#[tokio::test] +async fn rpc_block_rejects_missing_custody_columns() { + let spec = test_spec::(); + + if !spec.fork_name_at_slot::(Slot::new(0)).fulu_enabled() { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Fullnode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + // Extend chain to create some blocks with data columns + harness + .extend_chain( + 5, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Get a block with data columns + for slot in 1..=5 { + let root = harness + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let block = harness.chain.get_block(&root).await.unwrap().unwrap(); + + if let Ok(commitments) = block.message().body().blob_kzg_commitments() + && !commitments.is_empty() + { + let fork_name = harness.chain.spec.fork_name_at_slot::(block.slot()); + let columns = harness + .chain + .get_data_columns(&root, fork_name) + .unwrap() + .unwrap(); + + if columns.len() > 1 { + // Create AvailableBlockData with incomplete columns (remove one) + let mut incomplete_columns: Vec<_> = columns.to_vec(); + incomplete_columns.pop(); + + let block_data = AvailableBlockData::new_with_data_columns(incomplete_columns); + + // Try to create RpcBlock with incomplete custody columns + let result = RpcBlock::new( + Arc::new(block), + Some(block_data), + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ); + + // Should fail with MissingCustodyColumns + assert!( + matches!(result, Err(AvailabilityCheckError::MissingCustodyColumns)), + "RpcBlock construction should fail with missing custody columns, got: {:?}", + result + ); + return; + } + } + } + + panic!("No block with data columns found"); +} + +// Test that RpcBlock::new() allows construction past the data availability boundary. +// When a block is past the DA boundary, we should be able to construct an RpcBlock +// with NoData even if the block has blob commitments, since columns are not expected. +#[tokio::test] +async fn rpc_block_allows_construction_past_da_boundary() { + let spec = test_spec::(); + + if !spec.fork_name_at_slot::(Slot::new(0)).fulu_enabled() { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Fullnode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + // Extend chain to create some blocks with blob commitments + harness + .extend_chain( + 5, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Find a block with blob commitments + for slot in 1..=5 { + let root = harness + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let block = harness.chain.get_block(&root).await.unwrap().unwrap(); + + if let Ok(commitments) = block.message().body().blob_kzg_commitments() + && !commitments.is_empty() + { + let block_epoch = block.epoch(); + + // Advance the slot clock far into the future, past the DA boundary + // For a block to be past the DA boundary: + // current_epoch - min_epochs_for_data_column_sidecars_requests > block_epoch + let min_epochs_for_data = harness.spec.min_epochs_for_data_column_sidecars_requests; + let future_epoch = block_epoch + min_epochs_for_data + 10; + let future_slot = future_epoch.start_slot(E::slots_per_epoch()); + harness.chain.slot_clock.set_slot(future_slot.as_u64()); + + // Now verify the block is past the DA boundary + let da_boundary = harness + .chain + .data_availability_checker + .data_availability_boundary() + .expect("DA boundary should be set"); + assert!( + block_epoch < da_boundary, + "Block should be past the DA boundary. Block epoch: {}, DA boundary: {}", + block_epoch, + da_boundary + ); + + // Try to create RpcBlock with NoData for a block past DA boundary + // This should succeed since columns are not expected for blocks past DA boundary + let result = RpcBlock::new( + Arc::new(block), + Some(AvailableBlockData::NoData), + &harness.chain.data_availability_checker, + harness.chain.spec.clone(), + ); + + assert!( + result.is_ok(), + "RpcBlock construction should succeed for blocks past DA boundary, got: {:?}", + result + ); + return; + } + } + + panic!("No block with blob commitments found"); +} diff --git a/beacon_node/beacon_chain/tests/column_verification.rs b/beacon_node/beacon_chain/tests/column_verification.rs index be9b3b2fa1..ffbc460465 100644 --- a/beacon_node/beacon_chain/tests/column_verification.rs +++ b/beacon_node/beacon_chain/tests/column_verification.rs @@ -81,7 +81,7 @@ async fn rpc_columns_with_invalid_header_signature() { // Process the block without blobs so that it doesn't become available. harness.advance_slot(); let rpc_block = harness - .build_rpc_block_from_blobs(block_root, signed_block.clone(), None) + .build_rpc_block_from_blobs(signed_block.clone(), None, false) .unwrap(); let availability = harness .chain diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 5bd43835e3..1204412d65 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -685,7 +685,13 @@ async fn invalidates_all_descendants() { assert_eq!(fork_parent_state.slot(), fork_parent_slot); let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = RpcBlock::new( + fork_block.clone(), + None, + &rig.harness.chain.data_availability_checker, + rig.harness.chain.spec.clone(), + ) + .unwrap(); let fork_block_root = rig .harness .chain @@ -787,7 +793,13 @@ async fn switches_heads() { let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; let fork_parent_root = fork_block.parent_root(); - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = RpcBlock::new( + fork_block.clone(), + None, + &rig.harness.chain.data_availability_checker, + rig.harness.chain.spec.clone(), + ) + .unwrap(); let fork_block_root = rig .harness .chain @@ -1059,7 +1071,13 @@ async fn invalid_parent() { )); // Ensure the block built atop an invalid payload is invalid for import. - let rpc_block = RpcBlock::new_without_blobs(None, block.clone()); + let rpc_block = RpcBlock::new( + block.clone(), + None, + &rig.harness.chain.data_availability_checker, + rig.harness.chain.spec.clone(), + ) + .unwrap(); assert!(matches!( rig.harness.chain.process_block(rpc_block.block_root(), rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -1384,7 +1402,13 @@ async fn recover_from_invalid_head_by_importing_blocks() { } = InvalidHeadSetup::new().await; // Import the fork block, it should become the head. - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = RpcBlock::new( + fork_block.clone(), + None, + &rig.harness.chain.data_availability_checker, + rig.harness.chain.spec.clone(), + ) + .unwrap(); rig.harness .chain .process_block( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 598b79acc2..14e9deb62a 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -21,7 +21,6 @@ use beacon_chain::{ compute_proposer_duties_from_head, ensure_state_can_determine_proposers_for_epoch, }, custody_context::NodeCustodyType, - data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, migrate::MigratorConfig, }; @@ -3176,16 +3175,19 @@ async fn weak_subjectivity_sync_test( .expect("should get block") .expect("should get block"); - if let MaybeAvailableBlock::Available(block) = harness - .chain - .data_availability_checker - .verify_kzg_for_rpc_block( + let rpc_block = + harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)); + + match rpc_block { + RpcBlock::FullyAvailable(available_block) => { harness - .build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)), - ) - .expect("should verify kzg") - { - available_blocks.push(block); + .chain + .data_availability_checker + .verify_kzg_for_available_block(&available_block) + .expect("should verify kzg"); + available_blocks.push(available_block); + } + RpcBlock::BlockOnly { .. } => panic!("Should be an available block"), } } @@ -3194,15 +3196,16 @@ async fn weak_subjectivity_sync_test( let mut batch_with_invalid_first_block = available_blocks.iter().map(clone_block).collect::>(); batch_with_invalid_first_block[0] = { - let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct(); + let (_, block, data) = clone_block(&available_blocks[0]).deconstruct(); let mut corrupt_block = (*block).clone(); *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing( - block_root, + AvailableBlock::new( Arc::new(corrupt_block), data, + &beacon_chain.data_availability_checker, Arc::new(spec), ) + .expect("available block") }; // Importing the invalid batch should error. @@ -3746,7 +3749,13 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert_eq!(split.block_root, valid_fork_block.parent_root()); assert_ne!(split.state_root, unadvanced_split_state_root); - let invalid_fork_rpc_block = RpcBlock::new_without_blobs(None, invalid_fork_block.clone()); + let invalid_fork_rpc_block = RpcBlock::new( + invalid_fork_block.clone(), + None, + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); // Applying the invalid block should fail. let err = harness .chain @@ -3762,7 +3771,13 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert!(matches!(err, BlockError::WouldRevertFinalizedSlot { .. })); // Applying the valid block should succeed, but it should not become head. - let valid_fork_rpc_block = RpcBlock::new_without_blobs(None, valid_fork_block.clone()); + let valid_fork_rpc_block = RpcBlock::new( + valid_fork_block.clone(), + None, + &harness.chain.data_availability_checker, + harness.spec.clone(), + ) + .unwrap(); harness .chain .process_block( diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 17d9c5f697..1884429a6a 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -3,6 +3,7 @@ use beacon_chain::{ BeaconChain, ChainConfig, NotifyExecutionLayer, StateSkipConfig, WhenSlotSkipped, attestation_verification::Error as AttnError, + custody_context::NodeCustodyType, test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, OP_POOL_DB_KEY, @@ -54,6 +55,28 @@ fn get_harness_with_config( harness } +/// Creates a harness with SemiSupernode custody type to ensure enough columns are stored +/// for sampling validation in Fulu. +fn get_harness_semi_supernode( + validator_count: usize, +) -> BeaconChainHarness> { + let harness = BeaconChainHarness::builder(MinimalEthSpec) + .default_spec() + .chain_config(ChainConfig { + reconstruct_historic_states: true, + ..Default::default() + }) + .keypairs(KEYPAIRS[0..validator_count].to_vec()) + .fresh_ephemeral_store() + .mock_execution_layer() + .node_custody_type(NodeCustodyType::SemiSupernode) + .build(); + + harness.advance_slot(); + + harness +} + #[test] fn massive_skips() { let harness = get_harness(8); @@ -679,8 +702,9 @@ async fn unaggregated_attestations_added_to_fork_choice_all_updated() { async fn run_skip_slot_test(skip_slots: u64) { let num_validators = 8; - let harness_a = get_harness(num_validators); - let harness_b = get_harness(num_validators); + // SemiSupernode ensures enough columns are stored for sampling + custody RpcBlock validation + let harness_a = get_harness_semi_supernode(num_validators); + let harness_b = get_harness_semi_supernode(num_validators); for _ in 0..skip_slots { harness_a.advance_slot(); diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 346c768ac5..1887dee640 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -314,9 +314,19 @@ pub async fn publish_block>( slot = %block.slot(), "Block previously seen" ); + let Ok(rpc_block) = RpcBlock::new( + block.clone(), + None, + &chain.data_availability_checker, + chain.spec.clone(), + ) else { + return Err(warp_utils::reject::custom_bad_request( + "Unable to construct rpc block".to_string(), + )); + }; let import_result = Box::pin(chain.process_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone()), + rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::HttpApi, publish_fn, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8f21fa8b9e..bf1485a339 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -8,7 +8,6 @@ use crate::sync::{ }; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; -use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, @@ -720,18 +719,27 @@ impl NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); - let available_blocks = match self + let mut available_blocks = vec![]; + + for downloaded_block in downloaded_blocks { + match downloaded_block { + RpcBlock::FullyAvailable(available_block) => available_blocks.push(available_block), + RpcBlock::BlockOnly { .. } => return ( + 0, + Err(ChainSegmentFailed { + peer_action: None, + message: "Invalid downloaded_blocks segment. All downloaded blocks must be fully available".to_string() + }) + ), + } + } + + match self .chain .data_availability_checker - .verify_kzg_for_rpc_blocks(downloaded_blocks) + .batch_verify_kzg_for_available_blocks(&available_blocks) { - Ok(blocks) => blocks - .into_iter() - .filter_map(|maybe_available| match maybe_available { - MaybeAvailableBlock::Available(block) => Some(block), - MaybeAvailableBlock::AvailabilityPending { .. } => None, - }) - .collect::>(), + Ok(()) => {} Err(e) => match e { AvailabilityCheckError::StoreError(_) => { return ( diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 0b6989be74..49b1c0c262 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -401,7 +401,13 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), + RpcBlock::new( + self.next_block.clone(), + None, + &self._harness.chain.data_availability_checker, + self._harness.spec.clone(), + ) + .unwrap(), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 0 }, ) @@ -413,7 +419,13 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), + RpcBlock::new( + self.next_block.clone(), + None, + &self._harness.chain.data_availability_checker, + self._harness.spec.clone(), + ) + .unwrap(), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 1 }, ) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 27e334fa10..a287771854 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,5 +1,9 @@ use beacon_chain::{ - block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, + BeaconChainTypes, + block_verification_types::{AvailableBlockData, RpcBlock}, + data_availability_checker::DataAvailabilityChecker, + data_column_verification::CustodyDataColumn, + get_block_root, }; use lighthouse_network::{ PeerId, @@ -192,19 +196,26 @@ impl RangeBlockComponentsRequest { /// Returns `None` if not all expected requests have completed. /// Returns `Some(Ok(_))` with valid RPC blocks if all data is present and valid. /// Returns `Some(Err(_))` if there are issues coupling blocks with their data. - pub fn responses( + pub fn responses( &mut self, - spec: &ChainSpec, - ) -> Option>, CouplingError>> { + da_checker: Arc>, + spec: Arc, + ) -> Option>, CouplingError>> + where + T: BeaconChainTypes, + { let Some(blocks) = self.blocks_request.to_finished() else { return None; }; // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { - RangeBlockDataRequest::NoData => { - Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec)) - } + RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( + blocks.to_vec(), + vec![], + da_checker, + spec, + )), RangeBlockDataRequest::Blobs(request) => { let Some(blobs) = request.to_finished() else { return None; @@ -212,6 +223,7 @@ impl RangeBlockComponentsRequest { Some(Self::responses_with_blobs( blocks.to_vec(), blobs.to_vec(), + da_checker, spec, )) } @@ -248,6 +260,8 @@ impl RangeBlockComponentsRequest { column_to_peer_id, expected_custody_columns, *attempt, + da_checker, + spec, ); if let Err(CouplingError::DataColumnPeerFailure { @@ -269,11 +283,15 @@ impl RangeBlockComponentsRequest { } } - fn responses_with_blobs( + fn responses_with_blobs( blocks: Vec>>, blobs: Vec>>, - spec: &ChainSpec, - ) -> Result>, CouplingError> { + da_checker: Arc>, + spec: Arc, + ) -> Result>, CouplingError> + where + T: BeaconChainTypes, + { // There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. let mut responses = Vec::with_capacity(blocks.len()); @@ -315,8 +333,9 @@ impl RangeBlockComponentsRequest { .map_err(|_| { CouplingError::BlobPeerFailure("Blobs returned exceeds max length".to_string()) })?; + let block_data = AvailableBlockData::new_with_blobs(blobs); responses.push( - RpcBlock::new(None, block, Some(blobs)) + RpcBlock::new(block, Some(block_data), &da_checker, spec.clone()) .map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?, ) } @@ -333,13 +352,18 @@ impl RangeBlockComponentsRequest { Ok(responses) } - fn responses_with_custody_columns( + fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], attempt: usize, - ) -> Result>, CouplingError> { + da_checker: Arc>, + spec: Arc, + ) -> Result>, CouplingError> + where + T: BeaconChainTypes, + { // Group data columns by block_root and index let mut data_columns_by_block = HashMap::>>>::new(); @@ -415,11 +439,14 @@ impl RangeBlockComponentsRequest { ); } - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns) + let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); + + RpcBlock::new(block, Some(block_data), &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { // Block has no data, expects zero columns - RpcBlock::new_without_blobs(Some(block_root), block) + RpcBlock::new(block, Some(AvailableBlockData::NoData), &da_checker, spec.clone()) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? }); } @@ -459,10 +486,13 @@ impl ByRangeRequest { #[cfg(test)] mod tests { - use super::RangeBlockComponentsRequest; use crate::sync::network_context::MAX_COLUMN_RETRIES; + + use super::RangeBlockComponentsRequest; + use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::test_utils::{ - NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, + NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, + test_da_checker, test_spec, }; use lighthouse_network::{ PeerId, @@ -472,7 +502,7 @@ mod tests { }, }; use rand::SeedableRng; - use std::sync::Arc; + use std::{collections::HashMap, sync::Arc}; use tracing::Span; use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng}; @@ -512,8 +542,9 @@ mod tests { } fn is_finished(info: &mut RangeBlockComponentsRequest) -> bool { - let spec = test_spec::(); - info.responses(&spec).is_some() + let spec = Arc::new(test_spec::()); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + info.responses(da_checker, spec).is_some() } #[test] @@ -534,8 +565,11 @@ mod tests { // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); + let spec = Arc::new(test_spec::()); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + // Assert response is finished and RpcBlocks can be constructed - info.responses(&test_spec::()).unwrap().unwrap(); + info.responses(da_checker, spec).unwrap().unwrap(); } #[test] @@ -565,16 +599,26 @@ mod tests { // Expect no blobs returned info.add_blobs(blobs_req_id, vec![]).unwrap(); - // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. - // This makes sure we don't expect blobs here when they have expired. Checking this logic should - // be hendled elsewhere. - info.responses(&test_spec::()).unwrap().unwrap(); + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + // Assert response is finished and RpcBlocks cannot be constructed, because blobs weren't returned. + let result = info.responses(da_checker, spec).unwrap(); + assert!(result.is_err()) } #[test] fn rpc_block_with_custody_columns() { - let spec = test_spec::(); - let expects_custody_columns = vec![1, 2, 3, 4]; + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let expects_custody_columns = da_checker + .custody_context() + .sampling_columns_for_epoch(Epoch::new(0), &spec) + .to_vec(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { @@ -638,18 +682,26 @@ mod tests { } // All completed construct response - info.responses(&spec).unwrap().unwrap(); + info.responses(da_checker, spec).unwrap().unwrap(); } #[test] fn rpc_block_with_custody_columns_batched() { - let spec = test_spec::(); - let batched_column_requests = [vec![1_u64, 2], vec![3, 4]]; - let expects_custody_columns = batched_column_requests - .iter() - .flatten() - .cloned() - .collect::>(); + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let expected_sampling_columns = da_checker + .custody_context() + .sampling_columns_for_epoch(Epoch::new(0), &spec) + .to_vec(); + // Split sampling columns into two batches + let mid = expected_sampling_columns.len() / 2; + let batched_column_requests = [ + expected_sampling_columns[..mid].to_vec(), + expected_sampling_columns[mid..].to_vec(), + ]; let custody_column_request_ids = (0..batched_column_requests.len() as u32).collect::>(); let num_of_data_column_requests = custody_column_request_ids.len(); @@ -673,7 +725,7 @@ mod tests { let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expects_custody_columns.clone())), + Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); @@ -723,14 +775,18 @@ mod tests { } // All completed construct response - info.responses(&spec).unwrap().unwrap(); + info.responses(da_checker, spec).unwrap().unwrap(); } #[test] fn missing_custody_columns_from_faulty_peers() { - // GIVEN: A request expecting custody columns from multiple peers - let spec = test_spec::(); - let expected_custody_columns = vec![1, 2, 3, 4]; + // GIVEN: A request expecting sampling columns from multiple peers + let spec = Arc::new(test_spec::()); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let expected_sampling_columns = da_checker + .custody_context() + .sampling_columns_for_epoch(Epoch::new(0), &spec) + .to_vec(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..2) .map(|_| { @@ -745,7 +801,7 @@ mod tests { let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = expected_custody_columns + let columns_req_id = expected_sampling_columns .iter() .enumerate() .map(|(i, column)| { @@ -761,7 +817,7 @@ mod tests { let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expected_custody_columns.clone())), + Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); @@ -772,8 +828,8 @@ mod tests { ) .unwrap(); - // AND: Only some custody columns are received (columns 1 and 2) - for (i, &column_index) in expected_custody_columns.iter().take(2).enumerate() { + // AND: Only the first 2 sampling columns are received successfully + for (i, &column_index) in expected_sampling_columns.iter().take(2).enumerate() { let (req, _columns) = columns_req_id.get(i).unwrap(); info.add_custody_columns( *req, @@ -786,13 +842,13 @@ mod tests { } // AND: Remaining column requests are completed with empty data (simulating faulty peers) - for i in 2..4 { + for i in 2..expected_sampling_columns.len() { let (req, _columns) = columns_req_id.get(i).unwrap(); info.add_custody_columns(*req, vec![]).unwrap(); } // WHEN: Attempting to construct RPC blocks - let result = info.responses(&spec).unwrap(); + let result = info.responses(da_checker, spec).unwrap(); // THEN: Should fail with PeerFailure identifying the faulty peers assert!(result.is_err()); @@ -803,9 +859,13 @@ mod tests { }) = result { assert!(error.contains("Peers did not return column")); - assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing - assert_eq!(faulty_peers[0].0, 3); // column index 3 - assert_eq!(faulty_peers[1].0, 4); // column index 4 + // All columns after the first 2 should be reported as faulty + let expected_faulty_count = expected_sampling_columns.len() - 2; + assert_eq!(faulty_peers.len(), expected_faulty_count); + // Verify the faulty column indices match + for (i, (column_index, _peer)) in faulty_peers.iter().enumerate() { + assert_eq!(*column_index, expected_sampling_columns[i + 2]); + } assert!(!exceeded_retries); // First attempt, should be false } else { panic!("Expected PeerFailure error"); @@ -814,9 +874,16 @@ mod tests { #[test] fn retry_logic_after_peer_failures() { - // GIVEN: A request expecting custody columns where some peers initially fail - let spec = test_spec::(); - let expected_custody_columns = vec![1, 2]; + // GIVEN: A request expecting sampling columns where some peers initially fail + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let expected_sampling_columns = da_checker + .custody_context() + .sampling_columns_for_epoch(Epoch::new(0), &spec) + .to_vec(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..2) .map(|_| { @@ -831,7 +898,7 @@ mod tests { let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = expected_custody_columns + let columns_req_id = expected_sampling_columns .iter() .enumerate() .map(|(i, column)| { @@ -847,7 +914,7 @@ mod tests { let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expected_custody_columns.clone())), + Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); @@ -858,46 +925,61 @@ mod tests { ) .unwrap(); - // AND: Only partial custody columns are received (column 1 but not 2) - let (req1, _) = columns_req_id.first().unwrap(); + // AND: Only partial sampling columns are received (first column but not others) + let (req0, _) = columns_req_id.first().unwrap(); info.add_custody_columns( - *req1, + *req0, blocks .iter() - .flat_map(|b| b.1.iter().filter(|d| *d.index() == 1).cloned()) + .flat_map(|b| { + b.1.iter() + .filter(|d| *d.index() == expected_sampling_columns[0]) + .cloned() + }) .collect(), ) .unwrap(); - // AND: The missing column request is completed with empty data (peer failure) - let (req2, _) = columns_req_id.get(1).unwrap(); - info.add_custody_columns(*req2, vec![]).unwrap(); + // AND: The remaining column requests are completed with empty data (peer failure) + for i in 1..expected_sampling_columns.len() { + let (req, _) = columns_req_id.get(i).unwrap(); + info.add_custody_columns(*req, vec![]).unwrap(); + } - // WHEN: First attempt to get responses fails - let result = info.responses(&spec).unwrap(); + let result: Result< + Vec>, + crate::sync::block_sidecar_coupling::CouplingError, + > = info.responses(da_checker.clone(), spec.clone()).unwrap(); assert!(result.is_err()); - // AND: We retry with a new peer for the failed column + // AND: We retry with a new peer for the failed columns let new_columns_req_id = columns_id( 10 as Id, DataColumnsByRangeRequester::ComponentsByRange(components_id), ); - let failed_column_requests = vec![(new_columns_req_id, vec![2])]; - info.reinsert_failed_column_requests(failed_column_requests) - .unwrap(); + for column in &expected_sampling_columns[1..] { + let failed_column_requests = vec![(new_columns_req_id, vec![*column])]; + info.reinsert_failed_column_requests(failed_column_requests) + .unwrap(); + } // AND: The new peer provides the missing column data + let failed_column_indices: Vec<_> = expected_sampling_columns[1..].to_vec(); info.add_custody_columns( new_columns_req_id, blocks .iter() - .flat_map(|b| b.1.iter().filter(|d| *d.index() == 2).cloned()) + .flat_map(|b| { + b.1.iter() + .filter(|d| failed_column_indices.contains(d.index())) + .cloned() + }) .collect(), ) .unwrap(); // WHEN: Attempting to get responses again - let result = info.responses(&spec).unwrap(); + let result = info.responses(da_checker, spec).unwrap(); // THEN: Should succeed with complete RPC blocks assert!(result.is_ok()); @@ -908,8 +990,15 @@ mod tests { #[test] fn max_retries_exceeded_behavior() { // GIVEN: A request where peers consistently fail to provide required columns - let spec = test_spec::(); - let expected_custody_columns = vec![1, 2]; + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); + let expected_sampling_columns = da_checker + .custody_context() + .sampling_columns_for_epoch(Epoch::new(0), &spec) + .to_vec(); let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..1) .map(|_| { @@ -924,7 +1013,7 @@ mod tests { let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = expected_custody_columns + let columns_req_id = expected_sampling_columns .iter() .enumerate() .map(|(i, column)| { @@ -940,7 +1029,7 @@ mod tests { let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expected_custody_columns.clone())), + Some((columns_req_id.clone(), expected_sampling_columns.clone())), Span::none(), ); @@ -951,24 +1040,30 @@ mod tests { ) .unwrap(); - // AND: Only partial custody columns are provided (column 1 but not 2) - let (req1, _) = columns_req_id.first().unwrap(); + // AND: Only the first sampling column is provided successfully + let (req0, _) = columns_req_id.first().unwrap(); info.add_custody_columns( - *req1, + *req0, blocks .iter() - .flat_map(|b| b.1.iter().filter(|d| *d.index() == 1).cloned()) + .flat_map(|b| { + b.1.iter() + .filter(|d| *d.index() == expected_sampling_columns[0]) + .cloned() + }) .collect(), ) .unwrap(); - // AND: Column 2 request completes with empty data (persistent peer failure) - let (req2, _) = columns_req_id.get(1).unwrap(); - info.add_custody_columns(*req2, vec![]).unwrap(); + // AND: All other column requests complete with empty data (persistent peer failure) + for i in 1..expected_sampling_columns.len() { + let (req, _) = columns_req_id.get(i).unwrap(); + info.add_custody_columns(*req, vec![]).unwrap(); + } // WHEN: Multiple retry attempts are made (up to max retries) for _ in 0..MAX_COLUMN_RETRIES { - let result = info.responses(&spec).unwrap(); + let result = info.responses(da_checker.clone(), spec.clone()).unwrap(); assert!(result.is_err()); if let Err(super::CouplingError::DataColumnPeerFailure { @@ -981,7 +1076,7 @@ mod tests { } // AND: One final attempt after exceeding max retries - let result = info.responses(&spec).unwrap(); + let result = info.responses(da_checker, spec).unwrap(); // THEN: Should fail with exceeded_retries = true assert!(result.is_err()); @@ -991,8 +1086,16 @@ mod tests { exceeded_retries, }) = result { - assert_eq!(faulty_peers.len(), 1); // column 2 missing - assert_eq!(faulty_peers[0].0, 2); // column index 2 + // All columns except the first one should be faulty + let expected_faulty_count = expected_sampling_columns.len() - 1; + assert_eq!(faulty_peers.len(), expected_faulty_count); + + let mut faulty_peers = faulty_peers.into_iter().collect::>(); + // Only the columns that failed (indices 1..N) should be in faulty_peers + for column in &expected_sampling_columns[1..] { + faulty_peers.remove(column); + } + assert!(faulty_peers.is_empty()); assert!(exceeded_retries); // Should be true after max retries } else { panic!("Expected PeerFailure error with exceeded_retries=true"); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 069d51764f..7f4da9c0da 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -776,7 +776,10 @@ impl SyncNetworkContext { } let range_req = entry.get_mut(); - if let Some(blocks_result) = range_req.responses(&self.chain.spec) { + if let Some(blocks_result) = range_req.responses( + self.chain.data_availability_checker.clone(), + self.chain.spec.clone(), + ) { if let Err(CouplingError::DataColumnPeerFailure { error, faulty_peers: _, @@ -1605,7 +1608,13 @@ impl SyncNetworkContext { .beacon_processor_if_enabled() .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; - let block = RpcBlock::new_without_blobs(Some(block_root), block); + let block = RpcBlock::new( + block, + None, + &self.chain.data_availability_checker, + self.chain.spec.clone(), + ) + .map_err(|_| SendErrorProcessor::SendError)?; debug!(block = ?block_root, id, "Sending block for processing"); // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 715928906e..b6e96737d6 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -19,6 +19,7 @@ use beacon_chain::{ PayloadVerificationOutcome, PayloadVerificationStatus, blob_verification::GossipVerifiedBlob, block_verification_types::{AsBlock, BlockImportData}, + custody_context::NodeCustodyType, data_availability_checker::Availability, test_utils::{ BeaconChainHarness, EphemeralHarnessType, NumBlobs, generate_rand_block_and_blobs, @@ -54,6 +55,10 @@ type DCByRootId = (SyncRequestId, Vec); impl TestRig { pub fn test_setup() -> Self { + Self::test_setup_with_custody_type(NodeCustodyType::Fullnode) + } + + pub fn test_setup_with_custody_type(node_custody_type: NodeCustodyType) -> Self { // Use `fork_from_env` logic to set correct fork epochs let spec = test_spec::(); @@ -68,6 +73,7 @@ impl TestRig { Duration::from_secs(0), Duration::from_secs(12), )) + .node_custody_type(node_custody_type) .build(); let chain = harness.chain.clone(); @@ -101,8 +107,6 @@ impl TestRig { .network_globals .set_sync_state(SyncState::Synced); - let spec = chain.spec.clone(); - // deterministic seed let rng_08 = ::from_seed([0u8; 32]); let rng = ChaCha20Rng::from_seed([0u8; 32]); @@ -128,7 +132,6 @@ impl TestRig { ), harness, fork_name, - spec, } } @@ -1929,7 +1932,6 @@ mod deneb_only { block_verification_types::{AsBlock, RpcBlock}, data_availability_checker::AvailabilityCheckError, }; - use ssz_types::RuntimeVariableList; use std::collections::VecDeque; struct DenebTester { @@ -2283,15 +2285,13 @@ mod deneb_only { fn parent_block_unknown_parent(mut self) -> Self { self.rig.log("parent_block_unknown_parent"); let block = self.unknown_parent_block.take().unwrap(); - let max_len = self.rig.spec.max_blobs_per_block(block.epoch()) as usize; // Now this block is the one we expect requests from self.block = block.clone(); let block = RpcBlock::new( - Some(block.canonical_root()), block, - self.unknown_parent_blobs - .take() - .map(|vec| RuntimeVariableList::new(vec, max_len).unwrap()), + None, + &self.rig.harness.chain.data_availability_checker, + self.rig.harness.chain.spec.clone(), ) .unwrap(); self.rig.parent_block_processed( diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs index 23c14ff63e..dcc7e3e49d 100644 --- a/beacon_node/network/src/sync/tests/mod.rs +++ b/beacon_node/network/src/sync/tests/mod.rs @@ -16,7 +16,7 @@ use tokio::sync::mpsc; use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use types::{ChainSpec, ForkName, MinimalEthSpec as E}; +use types::{ForkName, MinimalEthSpec as E}; mod lookups; mod range; @@ -68,7 +68,6 @@ struct TestRig { rng_08: rand_chacha_03::ChaCha20Rng, rng: ChaCha20Rng, fork_name: ForkName, - spec: Arc, } // Environment variable to read if `fork_from_env` feature is enabled. diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 9cda9fec95..6f129bc8f0 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -5,6 +5,9 @@ use crate::sync::SyncMessage; use crate::sync::manager::SLOT_IMPORT_TOLERANCE; use crate::sync::network_context::RangeRequestId; use crate::sync::range_sync::RangeSyncType; +use beacon_chain::BeaconChain; +use beacon_chain::block_verification_types::AvailableBlockData; +use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types::RpcBlock}; @@ -427,7 +430,7 @@ impl TestRig { .chain .process_block( block_root, - build_rpc_block(block.into(), &data_sidecars), + build_rpc_block(block.into(), &data_sidecars, self.harness.chain.clone()), NotifyExecutionLayer::Yes, BlockImportSource::RangeSync, || Ok(()), @@ -443,16 +446,42 @@ impl TestRig { fn build_rpc_block( block: Arc>, data_sidecars: &Option>, + chain: Arc>, ) -> RpcBlock { match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { - RpcBlock::new(None, block, Some(blobs.clone())).unwrap() + let block_data = AvailableBlockData::new_with_blobs(blobs.clone()); + RpcBlock::new( + block, + Some(block_data), + &chain.data_availability_checker, + chain.spec.clone(), + ) + .unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone()).unwrap() + let block_data = AvailableBlockData::new_with_data_columns( + columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ); + RpcBlock::new( + block, + Some(block_data), + &chain.data_availability_checker, + chain.spec.clone(), + ) + .unwrap() } // Block has no data, expects zero columns - None => RpcBlock::new_without_blobs(None, block), + None => RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + &chain.data_availability_checker, + chain.spec.clone(), + ) + .unwrap(), } } @@ -485,10 +514,11 @@ fn head_chain_removed_while_finalized_syncing() { async fn state_update_while_purging() { // NOTE: this is a regression test. // Added in PR https://github.com/sigp/lighthouse/pull/2827 - let mut rig = TestRig::test_setup(); + let mut rig = TestRig::test_setup_with_custody_type(NodeCustodyType::SemiSupernode); // Create blocks on a separate harness - let mut rig_2 = TestRig::test_setup(); + // SemiSupernode ensures enough columns are stored for sampling + custody RPC block validation + let mut rig_2 = TestRig::test_setup_with_custody_type(NodeCustodyType::SemiSupernode); // Need to create blocks that can be inserted into the fork-choice and fit the "known // conditions" below. let head_peer_block = rig_2.create_canonical_block().await; diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index d94aa79c5d..a3c2fab468 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -532,6 +532,7 @@ impl Tester { valid: bool, ) -> Result<(), Error> { let block_root = block.canonical_root(); + let mut data_column_success = true; if let Some(columns) = columns.clone() { @@ -560,13 +561,21 @@ impl Tester { let block = Arc::new(block); let result: Result, _> = self - .block_on_dangerous(self.harness.chain.process_block( - block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone()), - NotifyExecutionLayer::Yes, - BlockImportSource::Lookup, - || Ok(()), - ))? + .block_on_dangerous( + self.harness.chain.process_block( + block_root, + RpcBlock::new( + block.clone(), + None, + &self.harness.chain.data_availability_checker, + self.harness.chain.spec.clone(), + ) + .map_err(|e| Error::InternalError(format!("{:?}", e)))?, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + || Ok(()), + ), + )? .map(|avail: AvailabilityProcessingStatus| avail.try_into()); let success = data_column_success && result.as_ref().is_ok_and(|inner| inner.is_ok()); if success != valid { @@ -650,13 +659,21 @@ impl Tester { let block = Arc::new(block); let result: Result, _> = self - .block_on_dangerous(self.harness.chain.process_block( - block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone()), - NotifyExecutionLayer::Yes, - BlockImportSource::Lookup, - || Ok(()), - ))? + .block_on_dangerous( + self.harness.chain.process_block( + block_root, + RpcBlock::new( + block.clone(), + None, + &self.harness.chain.data_availability_checker, + self.harness.chain.spec.clone(), + ) + .map_err(|e| Error::InternalError(format!("{:?}", e)))?, + NotifyExecutionLayer::Yes, + BlockImportSource::Lookup, + || Ok(()), + ), + )? .map(|avail: AvailabilityProcessingStatus| avail.try_into()); let success = blob_success && result.as_ref().is_ok_and(|inner| inner.is_ok()); if success != valid {