diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0ae9c77001..9692441aba 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3031,6 +3031,7 @@ impl BeaconChain { pub async fn verify_block_for_gossip( self: &Arc, block: Arc>, + custody_columns_count: usize, ) -> Result, BlockError> { let chain = self.clone(); self.task_executor @@ -3040,7 +3041,7 @@ impl BeaconChain { let slot = block.slot(); let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); - match GossipVerifiedBlock::new(block, &chain) { + match GossipVerifiedBlock::new(block, &chain, custody_columns_count) { Ok(verified) => { let commitments_formatted = verified.block.commitments_formatted(); debug!( @@ -7161,10 +7162,6 @@ impl BeaconChain { block_root: Hash256, block_data: AvailableBlockData, ) -> Result>, String> { - // TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non - // custody columns: https://github.com/sigp/lighthouse/issues/6465 - let _custody_columns_count = self.data_availability_checker.get_sampling_column_count(); - match block_data { AvailableBlockData::NoData => Ok(None), AvailableBlockData::Blobs(blobs) => { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 599004d8bf..70d653524b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -683,6 +683,7 @@ pub struct GossipVerifiedBlock { pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, + custody_columns_count: usize, } /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit @@ -718,6 +719,7 @@ pub trait IntoGossipVerifiedBlock: Sized { fn into_gossip_verified_block( self, chain: &BeaconChain, + custody_columns_count: usize, ) -> Result, BlockError>; fn inner_block(&self) -> Arc>; } @@ -726,6 +728,7 @@ impl IntoGossipVerifiedBlock for GossipVerifiedBlock fn into_gossip_verified_block( self, _chain: &BeaconChain, + _custody_columns_count: usize, ) -> Result, BlockError> { Ok(self) } @@ -738,8 +741,9 @@ impl IntoGossipVerifiedBlock for Arc, + custody_columns_count: usize, ) -> Result, BlockError> { - GossipVerifiedBlock::new(self, chain) + GossipVerifiedBlock::new(self, chain, custody_columns_count) } fn inner_block(&self) -> Arc> { @@ -808,6 +812,7 @@ impl GossipVerifiedBlock { pub fn new( block: Arc>, chain: &BeaconChain, + custody_columns_count: usize, ) -> Result { // If the block is valid for gossip we don't supply it to the slasher here because // we assume it will be transformed into a fully verified block. We *do* need to supply @@ -817,12 +822,14 @@ impl GossipVerifiedBlock { // The `SignedBeaconBlock` and `SignedBeaconBlockHeader` have the same canonical root, // but it's way quicker to calculate root of the header since the hash of the tree rooted // at `BeaconBlockBody` is already computed in the header. - Self::new_without_slasher_checks(block, &header, chain).map_err(|e| { - process_block_slash_info::<_, BlockError>( - chain, - BlockSlashInfo::from_early_error_block(header, e), - ) - }) + Self::new_without_slasher_checks(block, &header, chain, custody_columns_count).map_err( + |e| { + process_block_slash_info::<_, BlockError>( + chain, + BlockSlashInfo::from_early_error_block(header, e), + ) + }, + ) } /// As for new, but doesn't pass the block to the slasher. @@ -830,6 +837,7 @@ impl GossipVerifiedBlock { block: Arc>, block_header: &SignedBeaconBlockHeader, chain: &BeaconChain, + custody_columns_count: usize, ) -> Result { // Ensure the block is the correct structure for the fork at `block.slot()`. block @@ -1036,6 +1044,7 @@ impl GossipVerifiedBlock { block_root, parent, consensus_context, + custody_columns_count, }) } @@ -1183,6 +1192,7 @@ impl SignatureVerifiedBlock { block: MaybeAvailableBlock::AvailabilityPending { block_root: from.block_root, block, + custody_columns_count: from.custody_columns_count, }, block_root: from.block_root, parent: Some(parent), diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 07ffae7712..d3a6e93862 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -31,6 +31,7 @@ use types::{ pub struct RpcBlock { block_root: Hash256, block: RpcBlockInner, + custody_columns_count: usize, } impl Debug for RpcBlock { @@ -44,6 +45,10 @@ impl RpcBlock { self.block_root } + pub fn custody_columns_count(&self) -> usize { + self.custody_columns_count + } + pub fn as_block(&self) -> &SignedBeaconBlock { match &self.block { RpcBlockInner::Block(block) => block, @@ -104,6 +109,8 @@ impl RpcBlock { Self { block_root, block: RpcBlockInner::Block(block), + // Block has zero columns + custody_columns_count: 0, } } @@ -145,6 +152,8 @@ impl RpcBlock { Ok(Self { block_root, block: inner, + // Block is before PeerDAS + custody_columns_count: 0, }) } @@ -152,6 +161,7 @@ impl RpcBlock { block_root: Option, block: Arc>, custody_columns: Vec>, + custody_columns_count: usize, spec: &ChainSpec, ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); @@ -172,6 +182,7 @@ impl RpcBlock { Ok(Self { block_root, block: inner, + custody_columns_count, }) } @@ -239,10 +250,12 @@ impl ExecutedBlock { MaybeAvailableBlock::AvailabilityPending { block_root: _, block: pending_block, + custody_columns_count, } => Self::AvailabilityPending(AvailabilityPendingExecutedBlock::new( pending_block, import_data, payload_verification_outcome, + custody_columns_count, )), } } @@ -308,6 +321,7 @@ pub struct AvailabilityPendingExecutedBlock { pub block: Arc>, pub import_data: BlockImportData, pub payload_verification_outcome: PayloadVerificationOutcome, + pub custody_columns_count: usize, } impl AvailabilityPendingExecutedBlock { @@ -315,11 +329,13 @@ impl AvailabilityPendingExecutedBlock { block: Arc>, import_data: BlockImportData, payload_verification_outcome: PayloadVerificationOutcome, + custody_columns_count: usize, ) -> Self { Self { block, import_data, payload_verification_outcome, + custody_columns_count, } } @@ -439,19 +455,13 @@ impl AsBlock for MaybeAvailableBlock { fn as_block(&self) -> &SignedBeaconBlock { match &self { MaybeAvailableBlock::Available(block) => block.as_block(), - MaybeAvailableBlock::AvailabilityPending { - block_root: _, - block, - } => block, + MaybeAvailableBlock::AvailabilityPending { block, .. } => block, } } fn block_cloned(&self) -> Arc> { match &self { MaybeAvailableBlock::Available(block) => block.block_cloned(), - MaybeAvailableBlock::AvailabilityPending { - block_root: _, - block, - } => block.clone(), + MaybeAvailableBlock::AvailabilityPending { block, .. } => block.clone(), } } fn canonical_root(&self) -> Hash256 { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 010190bfbc..de66a8179d 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -975,14 +975,8 @@ where validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, data_availability_checker: Arc::new( - DataAvailabilityChecker::new( - slot_clock, - self.kzg.clone(), - store, - self.import_all_data_columns, - self.spec, - ) - .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, + DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, self.spec) + .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), kzg: self.kzg.clone(), }; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 07d663369a..2b7ae9e4d1 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -111,21 +111,9 @@ impl DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Arc, store: BeaconStore, - import_all_data_columns: bool, spec: Arc, ) -> Result { - let custody_group_count = spec.custody_group_count(import_all_data_columns); - // This should only panic if the chain spec contains invalid values. - let sampling_size = spec - .sampling_size(custody_group_count) - .expect("should compute node sampling size from valid chain spec"); - - let inner = DataAvailabilityCheckerInner::new( - OVERFLOW_LRU_CAPACITY, - store, - sampling_size as usize, - spec.clone(), - )?; + let inner = DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; Ok(Self { availability_cache: Arc::new(inner), slot_clock, @@ -134,14 +122,6 @@ impl DataAvailabilityChecker { }) } - pub fn get_sampling_column_count(&self) -> usize { - self.availability_cache.sampling_column_count() - } - - pub(crate) fn is_supernode(&self) -> bool { - self.get_sampling_column_count() == self.spec.number_of_columns as usize - } - /// Checks if the block root is currenlty in the availability cache awaiting import because /// of missing components. pub fn get_execution_valid_block( @@ -326,6 +306,7 @@ impl DataAvailabilityChecker { &self, block: RpcBlock, ) -> Result, AvailabilityCheckError> { + let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); if self.blobs_required_for_block(&block) { return if let Some(blob_list) = blobs { @@ -339,7 +320,11 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) + Ok(MaybeAvailableBlock::AvailabilityPending { + block_root, + block, + custody_columns_count, + }) }; } if self.data_columns_required_for_block(&block) { @@ -364,7 +349,11 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) + Ok(MaybeAvailableBlock::AvailabilityPending { + block_root, + block, + custody_columns_count, + }) }; } @@ -421,6 +410,7 @@ impl DataAvailabilityChecker { } for block in blocks { + let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); let maybe_available_block = if self.blobs_required_for_block(&block) { @@ -433,7 +423,11 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { block_root, block } + MaybeAvailableBlock::AvailabilityPending { + block_root, + block, + custody_columns_count, + } } } else if self.data_columns_required_for_block(&block) { if let Some(data_columns) = data_columns { @@ -447,7 +441,11 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { block_root, block } + MaybeAvailableBlock::AvailabilityPending { + block_root, + block, + custody_columns_count, + } } } else { MaybeAvailableBlock::Available(AvailableBlock { @@ -804,6 +802,7 @@ pub enum MaybeAvailableBlock { AvailabilityPending { block_root: Hash256, block: Arc>, + custody_columns_count: usize, }, } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index 4e75ed4945..d091d6fefb 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -10,7 +10,7 @@ pub enum Error { blob_commitment: KzgCommitment, block_commitment: KzgCommitment, }, - Unexpected(&'static str), + Unexpected(String), SszTypes(ssz_types::Error), MissingBlobs, MissingCustodyColumns, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index d4cbf5ab76..f38a3b8b9c 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -165,7 +165,6 @@ impl PendingComponents { /// reconstructed from disk. Ensure you are not holding any write locks while calling this. pub fn make_available( &mut self, - custody_column_count: usize, spec: &Arc, recover: R, ) -> Result>, AvailabilityCheckError> @@ -184,10 +183,14 @@ impl PendingComponents { let blob_data = if num_expected_blobs == 0 { Some(AvailableBlockData::NoData) } else if spec.is_peer_das_enabled_for_epoch(block.epoch()) { - match self.verified_data_columns.len().cmp(&custody_column_count) { + let num_received_columns = self.verified_data_columns.len(); + let num_expected_columns = block.custody_columns_count(); + match num_received_columns.cmp(&num_expected_columns) { Ordering::Greater => { // Should never happen - return Err(AvailabilityCheckError::Unexpected("too many columns")); + return Err(AvailabilityCheckError::Unexpected(format!( + "too many columns got {num_received_columns} expected {num_expected_columns}" + ))); } Ordering::Equal => { // Block is post-peerdas, and we got enough columns @@ -214,7 +217,9 @@ impl PendingComponents { match num_received_blobs.cmp(&num_expected_blobs) { Ordering::Greater => { // Should never happen - return Err(AvailabilityCheckError::Unexpected("too many blobs")); + return Err(AvailabilityCheckError::Unexpected(format!( + "too many blobs got {num_received_blobs} expected {num_expected_blobs}" + ))); } Ordering::Equal => { let max_blobs = spec.max_blobs_per_block(block.epoch()) as usize; @@ -224,8 +229,12 @@ impl PendingComponents { .flatten() .map(|blob| blob.clone().to_blob()) .collect::>(); - let blobs = RuntimeVariableList::new(blobs_vec, max_blobs) - .map_err(|_| AvailabilityCheckError::Unexpected("over max_blobs"))?; + let blobs_len = blobs_vec.len(); + let blobs = RuntimeVariableList::new(blobs_vec, max_blobs).map_err(|_| { + AvailabilityCheckError::Unexpected(format!( + "over max_blobs len {blobs_len} max {max_blobs}" + )) + })?; Some(AvailableBlockData::Blobs(blobs)) } Ordering::Less => { @@ -259,6 +268,7 @@ impl PendingComponents { block, import_data, payload_verification_outcome, + custody_columns_count: _, } = recover(block.clone())?; let available_block = AvailableBlock { @@ -313,14 +323,14 @@ impl PendingComponents { }) } - pub fn status_str( - &self, - block_epoch: Epoch, - sampling_column_count: usize, - spec: &ChainSpec, - ) -> String { + pub fn status_str(&self, block_epoch: Epoch, spec: &ChainSpec) -> String { let block_count = if self.executed_block.is_some() { 1 } else { 0 }; if spec.is_peer_das_enabled_for_epoch(block_epoch) { + let custody_columns_count = if let Some(block) = self.get_cached_block() { + &block.custody_columns_count().to_string() + } else { + "?" + }; let data_column_recv_count = if self.data_column_recv.is_some() { 1 } else { @@ -330,7 +340,7 @@ impl PendingComponents { "block {} data_columns {}/{} data_columns_recv {}", block_count, self.verified_data_columns.len(), - sampling_column_count, + custody_columns_count, data_column_recv_count, ) } else { @@ -357,8 +367,6 @@ pub struct DataAvailabilityCheckerInner { /// This cache holds a limited number of states in memory and reconstructs them /// from disk when necessary. This is necessary until we merge tree-states state_cache: StateLRUCache, - /// The number of data columns the node is sampling via subnet sampling. - sampling_column_count: usize, spec: Arc, } @@ -375,21 +383,15 @@ impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, - sampling_column_count: usize, spec: Arc, ) -> Result { Ok(Self { critical: RwLock::new(LruCache::new(capacity)), state_cache: StateLRUCache::new(beacon_store, spec.clone()), - sampling_column_count, spec, }) } - pub fn sampling_column_count(&self) -> usize { - self.sampling_column_count - } - /// Returns true if the block root is known, without altering the LRU ordering pub fn get_execution_valid_block( &self, @@ -460,7 +462,7 @@ impl DataAvailabilityCheckerInner { .map(|verified_blob| verified_blob.as_blob().epoch()) else { // Verified blobs list should be non-empty. - return Err(AvailabilityCheckError::Unexpected("empty blobs")); + return Err(AvailabilityCheckError::Unexpected("empty blobs".to_owned())); }; let mut fixed_blobs = @@ -488,15 +490,13 @@ impl DataAvailabilityCheckerInner { debug!( component = "blobs", ?block_root, - status = pending_components.status_str(epoch, self.sampling_column_count, &self.spec), + status = pending_components.status_str(epoch, &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = - pending_components.make_available(self.sampling_column_count, &self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? - { + if let Some(available_block) = pending_components.make_available(&self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? { // We keep the pending components in the availability cache during block import (#5845). // `data_column_recv` is returned as part of the available block and is no longer needed here. write_lock.put(block_root, pending_components); @@ -522,7 +522,9 @@ impl DataAvailabilityCheckerInner { .map(|verified_blob| verified_blob.as_data_column().epoch()) else { // Verified data_columns list should be non-empty. - return Err(AvailabilityCheckError::Unexpected("empty columns")); + return Err(AvailabilityCheckError::Unexpected( + "empty columns".to_owned(), + )); }; let mut write_lock = self.critical.write(); @@ -541,15 +543,13 @@ impl DataAvailabilityCheckerInner { debug!( component = "data_columns", ?block_root, - status = pending_components.status_str(epoch, self.sampling_column_count, &self.spec), + status = pending_components.status_str(epoch, &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = - pending_components.make_available(self.sampling_column_count, &self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? - { + if let Some(available_block) = pending_components.make_available(&self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? { // We keep the pending components in the availability cache during block import (#5845). // `data_column_recv` is returned as part of the available block and is no longer needed here. write_lock.put(block_root, pending_components); @@ -591,16 +591,13 @@ impl DataAvailabilityCheckerInner { debug!( component = "data_columns_recv", ?block_root, - status = - pending_components.status_str(block_epoch, self.sampling_column_count, &self.spec), + status = pending_components.status_str(block_epoch, &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = - pending_components.make_available(self.sampling_column_count, &self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? - { + if let Some(available_block) = pending_components.make_available(&self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? { // We keep the pending components in the availability cache during block import (#5845). // `data_column_recv` is returned as part of the available block and is no longer needed here. write_lock.put(block_root, pending_components); @@ -632,16 +629,12 @@ impl DataAvailabilityCheckerInner { }; // If we're sampling all columns, it means we must be custodying all columns. - let custody_column_count = self.sampling_column_count(); let total_column_count = self.spec.number_of_columns as usize; let received_column_count = pending_components.verified_data_columns.len(); if pending_components.reconstruction_started { return ReconstructColumnsDecision::No("already started"); } - if custody_column_count != total_column_count { - return ReconstructColumnsDecision::No("not required for full node"); - } if received_column_count >= total_column_count { return ReconstructColumnsDecision::No("all columns received"); } @@ -692,16 +685,14 @@ impl DataAvailabilityCheckerInner { debug!( component = "block", ?block_root, - status = pending_components.status_str(epoch, self.sampling_column_count, &self.spec), + status = pending_components.status_str(epoch, &self.spec), "Component added to data availability checker" ); // Check if we have all components and entire set is consistent. - if let Some(available_block) = - pending_components.make_available(self.sampling_column_count, &self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? - { + if let Some(available_block) = pending_components.make_available(&self.spec, |block| { + self.state_cache.recover_pending_executed_block(block) + })? { // We keep the pending components in the availability cache during block import (#5845). // `data_column_recv` is returned as part of the available block and is no longer needed here. write_lock.put(block_root, pending_components); @@ -942,6 +933,7 @@ mod test { block, import_data, payload_verification_outcome, + custody_columns_count: DEFAULT_TEST_CUSTODY_COLUMN_COUNT, }; (availability_pending_block, gossip_verified_blobs) @@ -969,13 +961,8 @@ mod test { let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); let cache = Arc::new( - DataAvailabilityCheckerInner::::new( - capacity_non_zero, - test_store, - DEFAULT_TEST_CUSTODY_COLUMN_COUNT, - spec.clone(), - ) - .expect("should create cache"), + DataAvailabilityCheckerInner::::new(capacity_non_zero, test_store, spec.clone()) + .expect("should create cache"), ); (harness, cache, chain_db_path) } @@ -1325,6 +1312,8 @@ mod pending_components_tests { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }, + // Default custody columns count, doesn't matter here + custody_columns_count: 8, }; (block.into(), blobs, invalid_blobs) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 5b9b7c7023..09d0563a4a 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -29,6 +29,7 @@ pub struct DietAvailabilityPendingExecutedBlock { confirmed_state_roots: Vec, consensus_context: OnDiskConsensusContext, payload_verification_outcome: PayloadVerificationOutcome, + custody_columns_count: usize, } /// just implementing the same methods as `AvailabilityPendingExecutedBlock` @@ -58,6 +59,10 @@ impl DietAvailabilityPendingExecutedBlock { .unwrap_or_default() } + pub fn custody_columns_count(&self) -> usize { + self.custody_columns_count + } + /// Returns the epoch corresponding to `self.slot()`. pub fn epoch(&self) -> Epoch { self.block.slot().epoch(E::slots_per_epoch()) @@ -108,6 +113,7 @@ impl StateLRUCache { executed_block.import_data.consensus_context, ), payload_verification_outcome: executed_block.payload_verification_outcome, + custody_columns_count: executed_block.custody_columns_count, } } @@ -138,6 +144,7 @@ impl StateLRUCache { .into_consensus_context(), }, payload_verification_outcome: diet_executed_block.payload_verification_outcome, + custody_columns_count: diet_executed_block.custody_columns_count, }) } @@ -225,6 +232,7 @@ impl From> value.import_data.consensus_context, ), payload_verification_outcome: value.payload_verification_outcome, + custody_columns_count: value.custody_columns_count, } } } diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs index f1da1ffc2f..ceb563ffc2 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs.rs @@ -256,13 +256,6 @@ fn spawn_compute_and_publish_data_columns_task( return; }; - // At the moment non supernodes are not required to publish any columns. - // TODO(das): we could experiment with having full nodes publish their custodied - // columns here. - if !chain_cloned.data_availability_checker.is_supernode() { - return; - } - publish_fn(BlobsOrDataColumns::DataColumns(all_data_columns)); }, "compute_and_publish_data_columns", diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 7169c86174..ee51964910 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -93,27 +93,12 @@ impl BeaconChain { return Ok(0); } - // Blobs are stored per block, and data columns are each stored individually - let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() { - // TODO(das): `available_block includes all sampled columns, but we only need to store - // custody columns. To be clarified in spec PR. - self.data_availability_checker.get_sampling_column_count() - } else { - 1 - }; - - let blob_batch_size = blocks_to_import - .iter() - .filter(|available_block| available_block.has_blobs()) - .count() - .saturating_mul(n_blob_ops_per_block); - let mut expected_block_root = anchor_info.oldest_block_parent; let mut prev_block_slot = anchor_info.oldest_block_slot; let mut new_oldest_blob_slot = blob_info.oldest_blob_slot; let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; - let mut blob_batch = Vec::::with_capacity(blob_batch_size); + let mut blob_batch = Vec::::new(); let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len()); let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 9f9ad2c6dd..beff95eb77 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -615,6 +615,12 @@ where let chain = builder.build().expect("should build"); + let sampling_column_count = if self.import_all_data_columns { + chain.spec.number_of_custody_groups as usize + } else { + chain.spec.custody_requirement as usize + }; + BeaconChainHarness { spec: chain.spec.clone(), chain: Arc::new(chain), @@ -625,6 +631,7 @@ where mock_execution_layer: self.mock_execution_layer, mock_builder: None, rng: make_rng(), + sampling_column_count, } } } @@ -681,6 +688,7 @@ pub struct BeaconChainHarness { pub mock_execution_layer: Option>, pub mock_builder: Option>>, + pub sampling_column_count: usize, pub rng: Mutex, } @@ -782,6 +790,10 @@ where (0..self.validator_keypairs.len()).collect() } + pub fn get_sampling_column_count(&self) -> usize { + self.sampling_column_count + } + pub fn slots_per_epoch(&self) -> u64 { E::slots_per_epoch() } @@ -2342,8 +2354,14 @@ where .into_iter() .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, &self.spec) - .unwrap() + RpcBlock::new_with_custody_columns( + Some(block_root), + block, + custody_columns, + self.get_sampling_column_count(), + &self.spec, + ) + .unwrap() } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); RpcBlock::new(Some(block_root), block, blobs).unwrap() @@ -2358,10 +2376,7 @@ where blob_items: Option<(KzgProofs, BlobsList)>, ) -> Result, BlockError> { Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { - let sampling_column_count = self - .chain - .data_availability_checker - .get_sampling_column_count(); + let sampling_column_count = self.get_sampling_column_count(); if blob_items.is_some_and(|(_, blobs)| !blobs.is_empty()) { // Note: this method ignores the actual custody columns and just take the first @@ -2372,7 +2387,13 @@ where .take(sampling_column_count) .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec)? + RpcBlock::new_with_custody_columns( + Some(block_root), + block, + columns, + sampling_column_count, + &self.spec, + )? } else { RpcBlock::new_without_blobs(Some(block_root), block) } @@ -3071,10 +3092,7 @@ where let is_peerdas_enabled = self.chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); if is_peerdas_enabled { let custody_columns = custody_columns_opt.unwrap_or_else(|| { - let sampling_column_count = self - .chain - .data_availability_checker - .get_sampling_column_count() as u64; + let sampling_column_count = self.get_sampling_column_count() as u64; (0..sampling_column_count).collect() }); diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 5e39bf32c2..3dc46be16e 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -30,6 +30,8 @@ type E = MainnetEthSpec; const VALIDATOR_COUNT: usize = 24; const CHAIN_SEGMENT_LENGTH: usize = 64 * 5; const BLOCK_INDICES: &[usize] = &[0, 1, 32, 64, 68 + 1, 129, CHAIN_SEGMENT_LENGTH - 1]; +// Default custody group count for tests +const CGC: usize = 8; /// A cached set of keys. static KEYPAIRS: LazyLock> = @@ -142,7 +144,8 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() + RpcBlock::new_with_custody_columns(None, block, columns.clone(), columns.len(), spec) + .unwrap() } None => RpcBlock::new_without_blobs(None, block), } @@ -991,6 +994,7 @@ async fn block_gossip_verification() { let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let block_index = CHAIN_SEGMENT_LENGTH - 2; + let cgc = harness.chain.spec.custody_requirement as usize; harness .chain @@ -1004,7 +1008,7 @@ async fn block_gossip_verification() { { let gossip_verified = harness .chain - .verify_block_for_gossip(snapshot.beacon_block.clone()) + .verify_block_for_gossip(snapshot.beacon_block.clone(), get_cgc(&blobs_opt)) .await .expect("should obtain gossip verified block"); @@ -1046,7 +1050,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_block_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), BlockError::FutureSlot { present_slot, block_slot, @@ -1080,7 +1084,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_finalized_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), BlockError::WouldRevertFinalizedSlot { block_slot, finalized_slot, @@ -1110,10 +1114,10 @@ async fn block_gossip_verification() { unwrap_err( harness .chain - .verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block( - block, - junk_signature() - ))) + .verify_block_for_gossip( + Arc::new(SignedBeaconBlock::from_block(block, junk_signature())), + cgc + ) .await ), BlockError::InvalidSignature(InvalidSignature::ProposerSignature) @@ -1138,7 +1142,7 @@ async fn block_gossip_verification() { *block.parent_root_mut() = parent_root; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), BlockError::ParentUnknown {parent_root: p} if p == parent_root ), @@ -1164,7 +1168,7 @@ async fn block_gossip_verification() { *block.parent_root_mut() = parent_root; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), BlockError::NotFinalizedDescendant { block_parent_root } if block_parent_root == parent_root ), @@ -1201,7 +1205,7 @@ async fn block_gossip_verification() { ); assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), BlockError::IncorrectBlockProposer { block, local_shuffling, @@ -1213,7 +1217,7 @@ async fn block_gossip_verification() { // Check to ensure that we registered this is a valid block from this proposer. assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), BlockError::DuplicateImportStatusUnknown(_), ), "should register any valid signature against the proposer, even if the block failed later verification" @@ -1221,7 +1225,11 @@ async fn block_gossip_verification() { let block = chain_segment[block_index].beacon_block.clone(); assert!( - harness.chain.verify_block_for_gossip(block).await.is_ok(), + harness + .chain + .verify_block_for_gossip(block, cgc) + .await + .is_ok(), "the valid block should be processed" ); @@ -1239,7 +1247,7 @@ async fn block_gossip_verification() { matches!( harness .chain - .verify_block_for_gossip(block.clone()) + .verify_block_for_gossip(block.clone(), cgc) .await .expect_err("should error when processing known block"), BlockError::DuplicateImportStatusUnknown(_) @@ -1315,8 +1323,17 @@ async fn verify_block_for_gossip_slashing_detection() { let state = harness.get_current_state(); let ((block1, blobs1), _) = harness.make_block(state.clone(), Slot::new(1)).await; let ((block2, _blobs2), _) = harness.make_block(state, Slot::new(1)).await; + let cgc = if block1.fork_name_unchecked().fulu_enabled() { + harness.get_sampling_column_count() + } else { + 0 + }; - let verified_block = harness.chain.verify_block_for_gossip(block1).await.unwrap(); + let verified_block = harness + .chain + .verify_block_for_gossip(block1, cgc) + .await + .unwrap(); if let Some((kzg_proofs, blobs)) = blobs1 { harness @@ -1339,7 +1356,7 @@ async fn verify_block_for_gossip_slashing_detection() { ) .await .unwrap(); - unwrap_err(harness.chain.verify_block_for_gossip(block2).await); + unwrap_err(harness.chain.verify_block_for_gossip(block2, CGC).await); // Slasher should have been handed the two conflicting blocks and crafted a slashing. slasher.process_queued(Epoch::new(0)).unwrap(); @@ -1363,7 +1380,11 @@ async fn verify_block_for_gossip_doppelganger_detection() { .attestations() .map(|att| att.clone_as_attestation()) .collect::>(); - let verified_block = harness.chain.verify_block_for_gossip(block).await.unwrap(); + let verified_block = harness + .chain + .verify_block_for_gossip(block, CGC) + .await + .unwrap(); harness .chain .process_block( @@ -1510,7 +1531,7 @@ async fn add_base_block_to_altair_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(base_block.clone())) + .verify_block_for_gossip(Arc::new(base_block.clone()), CGC) .await .expect_err("should error when processing base block"), BlockError::InconsistentFork(InconsistentFork { @@ -1646,7 +1667,7 @@ async fn add_altair_block_to_base_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(altair_block.clone())) + .verify_block_for_gossip(Arc::new(altair_block.clone()), CGC) .await .expect_err("should error when processing altair block"), BlockError::InconsistentFork(InconsistentFork { @@ -1810,3 +1831,14 @@ async fn import_execution_pending_block( } } } + +fn get_cgc(blobs_opt: &Option>) -> usize { + if let Some(data_sidecars) = blobs_opt.as_ref() { + match data_sidecars { + DataSidecars::Blobs(_) => 0, + DataSidecars::DataColumns(d) => d.len(), + } + } else { + 0 + } +} diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index f81fe482ef..88180f3c94 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -21,6 +21,7 @@ use task_executor::ShutdownReason; use types::*; const VALIDATOR_COUNT: usize = 32; +const CGC: usize = 8; type E = MainnetEthSpec; @@ -1050,7 +1051,7 @@ async fn invalid_parent() { // Ensure the block built atop an invalid payload is invalid for gossip. assert!(matches!( - rig.harness.chain.clone().verify_block_for_gossip(block.clone()).await, + rig.harness.chain.clone().verify_block_for_gossip(block.clone(), CGC).await, Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root }) if invalid_root == parent_root )); diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 24af16680e..a5cd94536d 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -137,7 +137,8 @@ pub async fn publish_block>( spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; // Gossip verify the block and blobs/data columns separately. - let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); + let gossip_verified_block_result = unverified_block + .into_gossip_verified_block(&chain, network_globals.custody_columns_count() as usize); let block_root = block_root.unwrap_or_else(|| { gossip_verified_block_result.as_ref().map_or_else( |_| block.canonical_root(), diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index b888439238..cd590580be 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -39,6 +39,9 @@ type E = MainnetEthSpec; * */ +// Default custody group count for tests +const CGC: usize = 8; + /// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=gossip`. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn gossip_invalid() { @@ -364,9 +367,9 @@ pub async fn consensus_partial_pass_only_consensus() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); assert!(gossip_block_a.is_err()); /* submit `block_b` which should induce equivocation */ @@ -654,10 +657,10 @@ pub async fn equivocation_consensus_late_equivocation() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1291,9 +1294,9 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { ProvenancedBlock::Builder(b, _, _) => b, }; - let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain); + let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain, CGC); assert!(gossip_block_b.is_ok()); - let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain); + let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain, CGC); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1395,7 +1398,7 @@ pub async fn block_seen_on_gossip_without_blobs() { // Simulate the block being seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain) + .into_gossip_verified_block(&tester.harness.chain, CGC) .unwrap(); // It should not yet be added to fork choice because blobs have not been seen. @@ -1464,7 +1467,7 @@ pub async fn block_seen_on_gossip_with_some_blobs() { // Simulate the block being seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain) + .into_gossip_verified_block(&tester.harness.chain, CGC) .unwrap(); // Simulate some of the blobs being seen on gossip. diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 4269a8973c..f41f60008e 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -33,6 +33,8 @@ pub struct NetworkGlobals { /// The computed sampling subnets and columns is stored to avoid re-computing. pub sampling_subnets: HashSet, pub sampling_columns: HashSet, + /// Constant custody group count (CGC) set at startup + custody_group_count: u64, /// Network-related configuration. Immutable after initialization. pub config: Arc, /// Ethereum chain configuration. Immutable after initialization. @@ -48,47 +50,43 @@ impl NetworkGlobals { config: Arc, spec: Arc, ) -> Self { - let (sampling_subnets, sampling_columns) = if spec.is_peer_das_scheduled() { - let node_id = enr.node_id().raw(); + let node_id = enr.node_id().raw(); - let custody_group_count = match local_metadata.custody_group_count() { - Ok(&cgc) if cgc <= spec.number_of_custody_groups => cgc, - _ => { + let custody_group_count = match local_metadata.custody_group_count() { + Ok(&cgc) if cgc <= spec.number_of_custody_groups => cgc, + _ => { + if spec.is_peer_das_scheduled() { error!( info = "falling back to default custody requirement", "custody_group_count from metadata is either invalid or not set. This is a bug!" ); - spec.custody_requirement } - }; - - // The below `expect` calls will panic on start up if the chain spec config values used - // are invalid - let sampling_size = spec - .sampling_size(custody_group_count) - .expect("should compute node sampling size from valid chain spec"); - let custody_groups = get_custody_groups(node_id, sampling_size, &spec) - .expect("should compute node custody groups"); - - let mut sampling_subnets = HashSet::new(); - for custody_index in &custody_groups { - let subnets = compute_subnets_from_custody_group(*custody_index, &spec) - .expect("should compute custody subnets for node"); - sampling_subnets.extend(subnets); + spec.custody_requirement } - - let mut sampling_columns = HashSet::new(); - for custody_index in &custody_groups { - let columns = compute_columns_for_custody_group(*custody_index, &spec) - .expect("should compute custody columns for node"); - sampling_columns.extend(columns); - } - - (sampling_subnets, sampling_columns) - } else { - (HashSet::new(), HashSet::new()) }; + // The below `expect` calls will panic on start up if the chain spec config values used + // are invalid + let sampling_size = spec + .sampling_size(custody_group_count) + .expect("should compute node sampling size from valid chain spec"); + let custody_groups = get_custody_groups(node_id, sampling_size, &spec) + .expect("should compute node custody groups"); + + let mut sampling_subnets = HashSet::new(); + for custody_index in &custody_groups { + let subnets = compute_subnets_from_custody_group(*custody_index, &spec) + .expect("should compute custody subnets for node"); + sampling_subnets.extend(subnets); + } + + let mut sampling_columns = HashSet::new(); + for custody_index in &custody_groups { + let columns = compute_columns_for_custody_group(*custody_index, &spec) + .expect("should compute custody columns for node"); + sampling_columns.extend(columns); + } + NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), @@ -100,6 +98,7 @@ impl NetworkGlobals { backfill_state: RwLock::new(BackFillState::Paused), sampling_subnets, sampling_columns, + custody_group_count, config, spec, } @@ -121,6 +120,19 @@ impl NetworkGlobals { self.listen_multiaddrs.read().clone() } + /// Returns true if this node is configured as a PeerDAS supernode + pub fn is_supernode(&self) -> bool { + self.custody_group_count == self.spec.number_of_custody_groups + } + + /// Returns the count of custody columns this node must sample for block import + pub fn custody_columns_count(&self) -> u64 { + // This only panics if the chain spec contains invalid values + self.spec + .sampling_size(self.custody_group_count) + .expect("should compute node sampling size from valid chain spec") + } + /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { self.peers.read().connected_peer_ids().count() diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 34dca4f100..f104bbf1bc 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1257,7 +1257,10 @@ impl NetworkBeaconProcessor { let verification_result = self .chain .clone() - .verify_block_for_gossip(block.clone()) + .verify_block_for_gossip( + block.clone(), + self.network_globals.custody_columns_count() as usize, + ) .await; if verification_result.is_ok() { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index b99e71bcea..1329936932 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -927,9 +927,14 @@ impl NetworkBeaconProcessor { block_root: Hash256, publish_blobs: bool, ) { + let is_supernode = self.network_globals.is_supernode(); + let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { - if publish_blobs { + // At the moment non supernodes are not required to publish any columns. + // TODO(das): we could experiment with having full nodes publish their custodied + // columns here. + if publish_blobs && is_supernode { match blobs_or_data_column { BlobsOrDataColumns::Blobs(blobs) => { self_cloned.publish_blobs_gradually(blobs, block_root); @@ -1004,6 +1009,11 @@ impl NetworkBeaconProcessor { self: &Arc, block_root: Hash256, ) -> Option { + // Only supernodes attempt reconstruction + if !self.network_globals.is_supernode() { + return None; + } + let result = self.chain.reconstruct_data_columns(block_root).await; match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 45cb1aeace..ef9285c8dc 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -257,8 +257,14 @@ impl RangeBlockComponentsRequest { )); } - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) - .map_err(|e| format!("{e:?}"))? + RpcBlock::new_with_custody_columns( + Some(block_root), + block, + custody_columns, + expects_custody_columns.len(), + spec, + ) + .map_err(|e| format!("{e:?}"))? } else { RpcBlock::new_without_blobs(Some(block_root), block) }); diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index f79dd6de96..fe72979930 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1203,8 +1203,12 @@ impl TestRig { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }; - let executed_block = - AvailabilityPendingExecutedBlock::new(block, import_data, payload_verification_outcome); + let executed_block = AvailabilityPendingExecutedBlock::new( + block, + import_data, + payload_verification_outcome, + self.network_globals.custody_columns_count() as usize, + ); match self .harness .chain diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index ddd4626cce..ca4344c0b2 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -449,7 +449,15 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() + RpcBlock::new_with_custody_columns( + None, + block, + columns.clone(), + // TODO(das): Assumes CGC = max value. Change if we want to do more complex tests + columns.len(), + spec, + ) + .unwrap() } None => RpcBlock::new_without_blobs(None, block), }