diff --git a/Makefile b/Makefile index 6f9bb6043c..b9677c8c7a 100644 --- a/Makefile +++ b/Makefile @@ -279,7 +279,7 @@ lint: # Lints the code using Clippy and automatically fix some simple compiler warnings. lint-fix: - EXTRA_CLIPPY_OPTS="--fix --allow-staged --allow-dirty" $(MAKE) lint + EXTRA_CLIPPY_OPTS="--fix --allow-staged --allow-dirty" $(MAKE) lint-full # Also run the lints on the optimized-only tests lint-full: diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index decc38e22c..a9d179cb02 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2206,7 +2206,7 @@ impl BeaconChain { pub fn verify_data_column_sidecar_for_gossip( self: &Arc, data_column_sidecar: Arc>, - subnet_id: u64, + subnet_id: DataColumnSubnetId, ) -> Result, GossipDataColumnError> { metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); @@ -3594,7 +3594,7 @@ impl BeaconChain { let availability = self .data_availability_checker - .put_gossip_verified_data_columns(block_root, data_columns)?; + .put_gossip_verified_data_columns(block_root, slot, data_columns)?; self.process_availability(slot, availability, publish_fn) .await @@ -3685,9 +3685,11 @@ impl BeaconChain { // This slot value is purely informative for the consumers of // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. - let availability = self - .data_availability_checker - .put_rpc_custody_columns(block_root, custody_columns)?; + let availability = self.data_availability_checker.put_rpc_custody_columns( + block_root, + slot, + custody_columns, + )?; self.process_availability(slot, availability, || Ok(())) .await @@ -7110,6 +7112,14 @@ impl BeaconChain { roots.reverse(); roots } + + /// Returns a list of column indices that should be sampled for a given epoch. + /// Used for data availability sampling in PeerDAS. + pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> &[ColumnIndex] { + self.data_availability_checker + .custody_context() + .sampling_columns_for_epoch(epoch, &self.spec) + } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 4f05e8a3b6..dcc24568c1 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -20,7 +20,7 @@ use tracing::{debug, error, info_span, Instrument}; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, - RuntimeVariableList, SignedBeaconBlock, + RuntimeVariableList, SignedBeaconBlock, Slot, }; mod error; @@ -76,7 +76,7 @@ pub struct DataAvailabilityChecker { availability_cache: Arc>, slot_clock: T::SlotClock, kzg: Arc, - custody_context: Arc, + custody_context: Arc>, spec: Arc, } @@ -114,7 +114,7 @@ impl DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Arc, store: BeaconStore, - custody_context: Arc, + custody_context: Arc>, spec: Arc, ) -> Result { let inner = DataAvailabilityCheckerInner::new( @@ -132,8 +132,8 @@ impl DataAvailabilityChecker { }) } - pub fn custody_context(&self) -> Arc { - self.custody_context.clone() + pub fn custody_context(&self) -> &Arc> { + &self.custody_context } /// Checks if the block root is currenlty in the availability cache awaiting import because @@ -233,6 +233,7 @@ impl DataAvailabilityChecker { pub fn put_rpc_custody_columns( &self, block_root: Hash256, + slot: Slot, custody_columns: DataColumnSidecarList, ) -> Result, AvailabilityCheckError> { // Attributes fault to the specific peer that sent an invalid column @@ -240,8 +241,17 @@ impl DataAvailabilityChecker { KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; + // Filter out columns that aren't required for custody for this slot + // This is required because `data_columns_by_root` requests the **latest** CGC that _may_ + // not be yet effective for data availability check, as CGC changes are only effecive from + // a new epoch. + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); let verified_custody_columns = kzg_verified_columns .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) .collect::>(); @@ -286,10 +296,16 @@ impl DataAvailabilityChecker { >( &self, block_root: Hash256, + slot: Slot, data_columns: I, ) -> Result, AvailabilityCheckError> { + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); let custody_columns = data_columns .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) .collect::>(); @@ -811,3 +827,207 @@ impl MaybeAvailableBlock { } } } + +#[cfg(test)] +mod test { + use super::*; + use crate::test_utils::{ + generate_rand_block_and_data_columns, get_kzg, EphemeralHarnessType, NumBlobs, + }; + use crate::CustodyContext; + use rand::prelude::StdRng; + use rand::seq::SliceRandom; + use rand::SeedableRng; + use slot_clock::{SlotClock, TestingSlotClock}; + use std::collections::HashSet; + use std::sync::Arc; + use std::time::Duration; + use store::HotColdDB; + use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot}; + + type E = MainnetEthSpec; + type T = EphemeralHarnessType; + + /// Test to verify any extra RPC columns received that are not part of the "effective" CGC for + /// the slot are excluded from import. + #[test] + fn should_exclude_rpc_columns_not_required_for_sampling() { + // SETUP + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + + let da_checker = new_da_checker(spec.clone()); + let custody_context = &da_checker.custody_context; + let all_column_indices_ordered = + init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec); + + // GIVEN a single 32 ETH validator is attached slot 0 + let epoch = Epoch::new(0); + let validator_0 = 0; + custody_context.register_validators( + vec![(validator_0, 32_000_000_000)], + epoch.start_slot(E::slots_per_epoch()), + &spec, + ); + assert_eq!( + custody_context.num_of_data_columns_to_sample(epoch, &spec), + spec.validator_custody_requirement as usize, + "sampling size should be the minimal custody requirement == 8" + ); + + // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch + let validator_1 = 1; + let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); + custody_context.register_validators( + vec![(validator_1, 32_000_000_000 * 9)], + cgc_change_slot, + &spec, + ); + // AND custody columns (8) and any new extra columns (2) are received via RPC responses. + // NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown. + let (_, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + let block_root = Hash256::random(); + let requested_columns = &all_column_indices_ordered[..10]; + da_checker + .put_rpc_custody_columns( + block_root, + cgc_change_slot, + data_columns + .into_iter() + .filter(|d| requested_columns.contains(&d.index)) + .collect(), + ) + .expect("should put rpc custody columns"); + + // THEN the sampling size for the end slot of the same epoch remains unchanged + let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); + assert_eq!( + sampling_columns.len(), + spec.validator_custody_requirement as usize // 8 + ); + // AND any extra columns received via RPC responses are excluded from import. + let actual_cached: HashSet = da_checker + .cached_data_column_indexes(&block_root) + .expect("should have cached data columns") + .into_iter() + .collect(); + let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); + assert_eq!( + actual_cached, expected_sampling_columns, + "should cache only the effective sampling columns" + ); + assert!( + actual_cached.len() < requested_columns.len(), + "extra columns should be excluded" + ) + } + + /// Test to verify any extra gossip columns received that are not part of the "effective" CGC for + /// the slot are excluded from import. + #[test] + fn should_exclude_gossip_columns_not_required_for_sampling() { + // SETUP + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + + let da_checker = new_da_checker(spec.clone()); + let custody_context = &da_checker.custody_context; + let all_column_indices_ordered = + init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec); + + // GIVEN a single 32 ETH validator is attached slot 0 + let epoch = Epoch::new(0); + let validator_0 = 0; + custody_context.register_validators( + vec![(validator_0, 32_000_000_000)], + epoch.start_slot(E::slots_per_epoch()), + &spec, + ); + assert_eq!( + custody_context.num_of_data_columns_to_sample(epoch, &spec), + spec.validator_custody_requirement as usize, + "sampling size should be the minimal custody requirement == 8" + ); + + // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch + let validator_1 = 1; + let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); + custody_context.register_validators( + vec![(validator_1, 32_000_000_000 * 9)], + cgc_change_slot, + &spec, + ); + // AND custody columns (8) and any new extra columns (2) are received via gossip. + // NOTE: CGC updates results in new topics subscriptions immediately, and extra columns may start to + // arrive via gossip. + let (_, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + let block_root = Hash256::random(); + let requested_columns = &all_column_indices_ordered[..10]; + let gossip_columns = data_columns + .into_iter() + .filter(|d| requested_columns.contains(&d.index)) + .map(GossipVerifiedDataColumn::::__new_for_testing) + .collect::>(); + da_checker + .put_gossip_verified_data_columns(block_root, cgc_change_slot, gossip_columns) + .expect("should put gossip custody columns"); + + // THEN the sampling size for the end slot of the same epoch remains unchanged + let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); + assert_eq!( + sampling_columns.len(), + spec.validator_custody_requirement as usize // 8 + ); + // AND any extra columns received via gossip responses are excluded from import. + let actual_cached: HashSet = da_checker + .cached_data_column_indexes(&block_root) + .expect("should have cached data columns") + .into_iter() + .collect(); + let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); + assert_eq!( + actual_cached, expected_sampling_columns, + "should cache only the effective sampling columns" + ); + assert!( + actual_cached.len() < requested_columns.len(), + "extra columns should be excluded" + ) + } + + fn init_custody_context_with_ordered_columns( + custody_context: &Arc>, + mut rng: &mut StdRng, + spec: &ChainSpec, + ) -> Vec { + let mut all_data_columns = (0..spec.number_of_custody_groups).collect::>(); + all_data_columns.shuffle(&mut rng); + custody_context + .init_ordered_data_columns_from_custody_groups(all_data_columns.clone(), spec) + .expect("should initialise ordered custody columns"); + all_data_columns + } + + fn new_da_checker(spec: Arc) -> DataAvailabilityChecker { + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_secs(spec.seconds_per_slot), + ); + let kzg = get_kzg(&spec); + let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap()); + let custody_context = Arc::new(CustodyContext::new(false)); + DataAvailabilityChecker::new(slot_clock, kzg, store, custody_context, spec) + .expect("should initialise data availability checker") + } +} diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 3c1fd1e7bc..7e9e643675 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -159,7 +159,7 @@ impl PendingComponents { pub fn make_available( &mut self, spec: &Arc, - num_expected_columns: u64, + num_expected_columns: usize, recover: R, ) -> Result>, AvailabilityCheckError> where @@ -173,7 +173,6 @@ impl PendingComponents { }; let num_expected_blobs = block.num_blobs_expected(); - let num_expected_columns = num_expected_columns as usize; let blob_data = if num_expected_blobs == 0 { Some(AvailableBlockData::NoData) } else if spec.is_peer_das_enabled_for_epoch(block.epoch()) { @@ -311,7 +310,7 @@ impl PendingComponents { pub fn status_str( &self, block_epoch: Epoch, - num_expected_columns: Option, + num_expected_columns: Option, spec: &ChainSpec, ) -> String { let block_count = if self.executed_block.is_some() { 1 } else { 0 }; @@ -348,7 +347,7 @@ pub struct DataAvailabilityCheckerInner { /// This cache holds a limited number of states in memory and reconstructs them /// from disk when necessary. This is necessary until we merge tree-states state_cache: StateLRUCache, - custody_context: Arc, + custody_context: Arc>, spec: Arc, } @@ -365,7 +364,7 @@ impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, - custody_context: Arc, + custody_context: Arc>, spec: Arc, ) -> Result { Ok(Self { @@ -482,7 +481,7 @@ impl DataAvailabilityCheckerInner { if let Some(available_block) = pending_components.make_available( &self.spec, self.custody_context - .num_of_data_columns_to_sample(Some(epoch), &self.spec), + .num_of_data_columns_to_sample(epoch, &self.spec), |block| self.state_cache.recover_pending_executed_block(block), )? { // We keep the pending components in the availability cache during block import (#5845). @@ -508,10 +507,11 @@ impl DataAvailabilityCheckerInner { .peek() .map(|verified_blob| verified_blob.as_data_column().epoch()) else { - // Verified data_columns list should be non-empty. - return Err(AvailabilityCheckError::Unexpected( - "empty columns".to_owned(), - )); + // No columns are processed. This can occur if all received columns were filtered out + // before this point, e.g. due to a CGC change that caused extra columns to be downloaded + // // before the new CGC took effect. + // Return `Ok` without marking the block as available. + return Ok(Availability::MissingComponents(block_root)); }; let mut write_lock = self.critical.write(); @@ -529,7 +529,7 @@ impl DataAvailabilityCheckerInner { let num_expected_columns = self .custody_context - .num_of_data_columns_to_sample(Some(epoch), &self.spec); + .num_of_data_columns_to_sample(epoch, &self.spec); debug!( component = "data_columns", ?block_root, @@ -627,7 +627,7 @@ impl DataAvailabilityCheckerInner { let num_expected_columns = self .custody_context - .num_of_data_columns_to_sample(Some(epoch), &self.spec); + .num_of_data_columns_to_sample(epoch, &self.spec); debug!( component = "block", ?block_root, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index e079b5ab78..821e538b84 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -198,7 +198,7 @@ impl Clone for GossipVerifiedDataCo impl GossipVerifiedDataColumn { pub fn new( column_sidecar: Arc>, - subnet_id: u64, + subnet_id: DataColumnSubnetId, chain: &BeaconChain, ) -> Result { let header = column_sidecar.signed_block_header.clone(); @@ -472,7 +472,7 @@ where pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, - subnet: u64, + subnet: DataColumnSubnetId, chain: &BeaconChain, ) -> Result, GossipDataColumnError> { let column_slot = data_column.slot(); @@ -735,15 +735,14 @@ fn verify_proposer_and_signature( fn verify_index_matches_subnet( data_column: &DataColumnSidecar, - subnet: u64, + subnet: DataColumnSubnetId, spec: &ChainSpec, ) -> Result<(), GossipDataColumnError> { - let expected_subnet: u64 = - DataColumnSubnetId::from_column_index(data_column.index, spec).into(); + let expected_subnet = DataColumnSubnetId::from_column_index(data_column.index, spec); if expected_subnet != subnet { return Err(GossipDataColumnError::InvalidSubnetId { - received: subnet, - expected: expected_subnet, + received: subnet.into(), + expected: expected_subnet.into(), }); } Ok(()) @@ -821,7 +820,7 @@ mod test { }; use crate::observed_data_sidecars::Observe; use crate::test_utils::BeaconChainHarness; - use types::{DataColumnSidecar, EthSpec, ForkName, MainnetEthSpec}; + use types::{DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkName, MainnetEthSpec}; type E = MainnetEthSpec; @@ -860,7 +859,7 @@ mod test { let result = validate_data_column_sidecar_for_gossip::<_, Observe>( column_sidecar.into(), - index, + DataColumnSubnetId::from_column_index(index, &harness.spec), &harness.chain, ); assert!(matches!( diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index efc7854fa5..5dd1993092 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -31,7 +31,6 @@ use metrics::{inc_counter, TryExt}; use mockall_double::double; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; -use std::collections::HashSet; use std::sync::Arc; use tracing::{debug, warn}; use types::blob_sidecar::BlobSidecarError; @@ -73,7 +72,7 @@ pub async fn fetch_and_process_engine_blobs( chain: Arc>, block_root: Hash256, block: Arc>>, - custody_columns: HashSet, + custody_columns: &[ColumnIndex], publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { fetch_and_process_engine_blobs_inner( @@ -92,7 +91,7 @@ async fn fetch_and_process_engine_blobs_inner( chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, block: Arc>>, - custody_columns: HashSet, + custody_columns: &[ColumnIndex], publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let versioned_hashes = if let Some(kzg_commitments) = block @@ -238,7 +237,7 @@ async fn fetch_and_process_blobs_v2( block_root: Hash256, block: Arc>, versioned_hashes: Vec, - custody_columns_indices: HashSet, + custody_columns_indices: &[ColumnIndex], publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); @@ -337,11 +336,12 @@ async fn compute_custody_columns_to_import( block: Arc>>, blobs: Vec>, proofs: Vec>, - custody_columns_indices: HashSet, + custody_columns_indices: &[ColumnIndex], ) -> Result>, FetchEngineBlobError> { let kzg = chain_adapter.kzg().clone(); let spec = chain_adapter.spec().clone(); let chain_adapter_cloned = chain_adapter.clone(); + let custody_columns_indices = custody_columns_indices.to_vec(); chain_adapter .executor() .spawn_blocking_handle( diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index f1ffabdd8f..a59db19fca 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -21,6 +21,7 @@ type T = EphemeralHarnessType; mod get_blobs_v2 { use super::*; + use types::ColumnIndex; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_fetch_blobs_v2_no_blobs_in_block() { @@ -36,12 +37,12 @@ mod get_blobs_v2 { mock_adapter.expect_get_blobs_v2().times(0); mock_adapter.expect_process_engine_blobs().times(0); - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, Arc::new(block), - custody_columns.clone(), + &custody_columns, publish_fn, ) .await @@ -61,12 +62,12 @@ mod get_blobs_v2 { mock_get_blobs_v2_response(&mut mock_adapter, None); // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns.clone(), + &custody_columns, publish_fn, ) .await @@ -89,12 +90,12 @@ mod get_blobs_v2 { mock_adapter.expect_process_engine_blobs().times(0); // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns.clone(), + &custody_columns, publish_fn, ) .await @@ -122,12 +123,12 @@ mod get_blobs_v2 { mock_adapter.expect_process_engine_blobs().times(0); // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns.clone(), + &custody_columns, publish_fn, ) .await @@ -161,12 +162,12 @@ mod get_blobs_v2 { mock_adapter.expect_process_engine_blobs().times(0); // **WHEN**: Trigger `fetch_blobs` on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns.clone(), + &custody_columns, publish_fn, ) .await @@ -203,12 +204,12 @@ mod get_blobs_v2 { ); // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns.clone(), + &custody_columns, publish_fn, ) .await @@ -252,6 +253,7 @@ mod get_blobs_v1 { use super::*; use crate::block_verification_types::AsBlock; use std::collections::HashSet; + use types::ColumnIndex; const ELECTRA_FORK: ForkName = ForkName::Electra; @@ -268,12 +270,12 @@ mod get_blobs_v1 { mock_adapter.expect_get_blobs_v1().times(0); // WHEN: Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, Arc::new(block_no_blobs), - custody_columns, + &custody_columns, publish_fn, ) .await @@ -295,12 +297,12 @@ mod get_blobs_v1 { mock_get_blobs_v1_response(&mut mock_adapter, vec![None; expected_blob_count]); // WHEN: Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns, + &custody_columns, publish_fn, ) .await @@ -341,12 +343,12 @@ mod get_blobs_v1 { ); // WHEN: Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns, + &custody_columns, publish_fn, ) .await @@ -381,12 +383,12 @@ mod get_blobs_v1 { mock_fork_choice_contains_block(&mut mock_adapter, vec![block.canonical_root()]); // WHEN: Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns, + &custody_columns, publish_fn, ) .await @@ -429,12 +431,12 @@ mod get_blobs_v1 { .returning(move |_, _| Some(all_blob_indices.clone())); // **WHEN**: Trigger `fetch_blobs` on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns, + &custody_columns, publish_fn, ) .await @@ -473,12 +475,12 @@ mod get_blobs_v1 { ); // Trigger fetch blobs on the block - let custody_columns = hashset![0, 1, 2]; + let custody_columns: [ColumnIndex; 3] = [0, 1, 2]; let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, block, - custody_columns, + &custody_columns, publish_fn, ) .await diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 2c4981078d..62e20cba77 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -606,6 +606,15 @@ where let chain = builder.build().expect("should build"); + chain + .data_availability_checker + .custody_context() + .init_ordered_data_columns_from_custody_groups( + (0..spec.number_of_custody_groups).collect(), + &spec, + ) + .expect("should initialise custody context"); + BeaconChainHarness { spec: chain.spec.clone(), chain: Arc::new(chain), @@ -773,13 +782,6 @@ where (0..self.validator_keypairs.len()).collect() } - pub fn get_sampling_column_count(&self) -> usize { - self.chain - .data_availability_checker - .custody_context() - .num_of_data_columns_to_sample(None, &self.chain.spec) as usize - } - pub fn slots_per_epoch(&self) -> u64 { E::slots_per_epoch() } @@ -2385,7 +2387,8 @@ where blob_items: Option<(KzgProofs, BlobsList)>, ) -> Result, BlockError> { Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { - let sampling_column_count = self.get_sampling_column_count(); + let epoch = block.slot().epoch(E::slots_per_epoch()); + let sampling_columns = self.chain.sampling_columns_for_epoch(epoch); if blob_items.is_some_and(|(_, blobs)| !blobs.is_empty()) { // Note: this method ignores the actual custody columns and just take the first @@ -2393,7 +2396,7 @@ where // currently have any knowledge of the columns being custodied. let columns = generate_data_column_sidecars_from_block(&block, &self.spec) .into_iter() - .take(sampling_column_count) + .filter(|d| sampling_columns.contains(&d.index)) .map(CustodyDataColumn::from_asserted_custody) .collect::>(); RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec)? @@ -3123,17 +3126,22 @@ where let is_peerdas_enabled = self.chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); if is_peerdas_enabled { let custody_columns = custody_columns_opt.unwrap_or_else(|| { - let sampling_column_count = self.get_sampling_column_count() as u64; - (0..sampling_column_count).collect() + let epoch = block.slot().epoch(E::slots_per_epoch()); + self.chain + .sampling_columns_for_epoch(epoch) + .iter() + .copied() + .collect() }); let verified_columns = generate_data_column_sidecars_from_block(block, &self.spec) .into_iter() .filter(|c| custody_columns.contains(&c.index)) .map(|sidecar| { - let column_index = sidecar.index; + let subnet_id = + DataColumnSubnetId::from_column_index(sidecar.index, &self.spec); self.chain - .verify_data_column_sidecar_for_gossip(sidecar, column_index) + .verify_data_column_sidecar_for_gossip(sidecar, subnet_id) }) .collect::, _>>() .unwrap(); diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs index 4224125a2a..4e189b5bad 100644 --- a/beacon_node/beacon_chain/src/validator_custody.rs +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -1,12 +1,13 @@ +use parking_lot::RwLock; +use ssz_derive::{Decode, Encode}; +use std::marker::PhantomData; +use std::sync::OnceLock; use std::{ collections::{BTreeMap, HashMap}, sync::atomic::{AtomicU64, Ordering}, }; - -use parking_lot::RwLock; - -use ssz_derive::{Decode, Encode}; -use types::{ChainSpec, Epoch, EthSpec, Slot}; +use types::data_column_custody_group::{compute_columns_for_custody_group, CustodyIndex}; +use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot}; /// A delay before making the CGC change effective to the data availability checker. const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; @@ -120,7 +121,7 @@ fn get_validators_custody_requirement(validator_custody_units: u64, spec: &Chain /// Contains all the information the node requires to calculate the /// number of columns to be custodied when checking for DA. #[derive(Debug)] -pub struct CustodyContext { +pub struct CustodyContext { /// The Number of custody groups required based on the number of validators /// that is attached to this node. /// @@ -139,9 +140,13 @@ pub struct CustodyContext { persisted_is_supernode: bool, /// Maintains all the validators that this node is connected to currently validator_registrations: RwLock, + /// Stores an immutable, ordered list of all custody columns as determined by the node's NodeID + /// on startup. + all_custody_columns_ordered: OnceLock>, + _phantom_data: PhantomData, } -impl CustodyContext { +impl CustodyContext { /// Create a new custody default custody context object when no persisted object /// exists. /// @@ -152,6 +157,8 @@ impl CustodyContext { current_is_supernode: is_supernode, persisted_is_supernode: is_supernode, validator_registrations: Default::default(), + all_custody_columns_ordered: OnceLock::new(), + _phantom_data: PhantomData, } } @@ -170,16 +177,44 @@ impl CustodyContext { .into_iter() .collect(), }), + all_custody_columns_ordered: OnceLock::new(), + _phantom_data: PhantomData, } } + /// Initializes an ordered list of data columns based on provided custody groups. + /// + /// # Arguments + /// * `all_custody_groups_ordered` - Vector of custody group indices to map to columns + /// * `spec` - Chain specification containing custody parameters + /// + /// # Returns + /// Ok(()) if initialization succeeds, Err with description string if it fails + pub fn init_ordered_data_columns_from_custody_groups( + &self, + all_custody_groups_ordered: Vec, + spec: &ChainSpec, + ) -> Result<(), String> { + let mut ordered_custody_columns = vec![]; + for custody_index in all_custody_groups_ordered { + let columns = compute_columns_for_custody_group(custody_index, spec) + .map_err(|e| format!("Failed to compute columns for custody group {e:?}"))?; + ordered_custody_columns.extend(columns); + } + self.all_custody_columns_ordered + .set(ordered_custody_columns.into_boxed_slice()) + .map_err(|_| { + "Failed to initialise CustodyContext with computed custody columns".to_string() + }) + } + /// Register a new validator index and updates the list of validators if required. /// /// Also modifies the internal structures if the validator custody has changed to /// update the `custody_column_count`. /// /// Returns `Some` along with the updated custody group count if it has changed, otherwise returns `None`. - pub fn register_validators( + pub fn register_validators( &self, validators_and_balance: ValidatorsAndBalances, current_slot: Slot, @@ -215,8 +250,7 @@ impl CustodyContext { ); return Some(CustodyCountChanged { new_custody_group_count: updated_cgc, - sampling_count: self - .num_of_custody_groups_to_sample(Some(effective_epoch), spec), + sampling_count: self.num_of_custody_groups_to_sample(effective_epoch, spec), effective_epoch, }); } @@ -248,39 +282,48 @@ impl CustodyContext { /// minimum sampling size which may exceed the custody group count (CGC). /// /// See also: [`Self::num_of_custody_groups_to_sample`]. - fn custody_group_count_at_epoch(&self, epoch_opt: Option, spec: &ChainSpec) -> u64 { + fn custody_group_count_at_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { let custody_group_count = if self.current_is_supernode { spec.number_of_custody_groups - } else if let Some(epoch) = epoch_opt { + } else { self.validator_registrations .read() .custody_requirement_at_epoch(epoch) .unwrap_or(spec.custody_requirement) - } else { - self.custody_group_count_at_head(spec) }; custody_group_count } /// Returns the count of custody groups this node must _sample_ for a block at `epoch` to import. - /// If an `epoch` is not specified, returns the *current* validator custody requirement. - pub fn num_of_custody_groups_to_sample( - &self, - epoch_opt: Option, - spec: &ChainSpec, - ) -> u64 { - let custody_group_count = self.custody_group_count_at_epoch(epoch_opt, spec); + pub fn num_of_custody_groups_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> u64 { + let custody_group_count = self.custody_group_count_at_epoch(epoch, spec); spec.sampling_size_custody_groups(custody_group_count) .expect("should compute node sampling size from valid chain spec") } /// Returns the count of columns this node must _sample_ for a block at `epoch` to import. - /// If an `epoch` is not specified, returns the *current* validator custody requirement. - pub fn num_of_data_columns_to_sample(&self, epoch_opt: Option, spec: &ChainSpec) -> u64 { - let custody_group_count = self.custody_group_count_at_epoch(epoch_opt, spec); + pub fn num_of_data_columns_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> usize { + let custody_group_count = self.custody_group_count_at_epoch(epoch, spec); spec.sampling_size_columns(custody_group_count) .expect("should compute node sampling size from valid chain spec") } + + /// Returns the ordered list of column indices that should be sampled for data availability checking at the given epoch. + /// + /// # Parameters + /// * `epoch` - Epoch to determine sampling columns for + /// * `spec` - Chain specification containing sampling parameters + /// + /// # Returns + /// A slice of ordered column indices that should be sampled for this epoch based on the node's custody configuration + pub fn sampling_columns_for_epoch(&self, epoch: Epoch, spec: &ChainSpec) -> &[ColumnIndex] { + let num_of_columns_to_sample = self.num_of_data_columns_to_sample(epoch, spec); + let all_columns_ordered = self + .all_custody_columns_ordered + .get() + .expect("all_custody_columns_ordered should be initialized"); + &all_columns_ordered[..num_of_columns_to_sample] + } } /// The custody count changed because of a change in the @@ -299,8 +342,8 @@ pub struct CustodyContextSsz { pub epoch_validator_custody_requirements: Vec<(Epoch, u64)>, } -impl From<&CustodyContext> for CustodyContextSsz { - fn from(context: &CustodyContext) -> Self { +impl From<&CustodyContext> for CustodyContextSsz { + fn from(context: &CustodyContext) -> Self { CustodyContextSsz { validator_custody_at_head: context.validator_custody_count.load(Ordering::Relaxed), persisted_is_supernode: context.persisted_is_supernode, @@ -317,6 +360,8 @@ impl From<&CustodyContext> for CustodyContextSsz { #[cfg(test)] mod tests { + use rand::seq::SliceRandom; + use rand::thread_rng; use types::MainnetEthSpec; use super::*; @@ -325,21 +370,21 @@ mod tests { #[test] fn no_validators_supernode_default() { - let custody_context = CustodyContext::new(true); + let custody_context = CustodyContext::::new(true); let spec = E::default_spec(); assert_eq!( custody_context.custody_group_count_at_head(&spec), spec.number_of_custody_groups ); assert_eq!( - custody_context.num_of_custody_groups_to_sample(None, &spec), + custody_context.num_of_custody_groups_to_sample(Epoch::new(0), &spec), spec.number_of_custody_groups ); } #[test] fn no_validators_fullnode_default() { - let custody_context = CustodyContext::new(false); + let custody_context = CustodyContext::::new(false); let spec = E::default_spec(); assert_eq!( custody_context.custody_group_count_at_head(&spec), @@ -347,14 +392,14 @@ mod tests { "head custody count should be minimum spec custody requirement" ); assert_eq!( - custody_context.num_of_custody_groups_to_sample(None, &spec), + custody_context.num_of_custody_groups_to_sample(Epoch::new(0), &spec), spec.samples_per_slot ); } #[test] fn register_single_validator_should_update_cgc() { - let custody_context = CustodyContext::new(false); + let custody_context = CustodyContext::::new(false); let spec = E::default_spec(); let bal_per_additional_group = spec.balance_per_additional_custody_group; let min_val_custody_requirement = spec.validator_custody_requirement; @@ -369,7 +414,7 @@ mod tests { (vec![(0, 10 * bal_per_additional_group)], Some(10)), ]; - register_validators_and_assert_cgc( + register_validators_and_assert_cgc::( &custody_context, validators_and_expected_cgc_change, &spec, @@ -378,7 +423,7 @@ mod tests { #[test] fn register_multiple_validators_should_update_cgc() { - let custody_context = CustodyContext::new(false); + let custody_context = CustodyContext::::new(false); let spec = E::default_spec(); let bal_per_additional_group = spec.balance_per_additional_custody_group; let min_val_custody_requirement = spec.validator_custody_requirement; @@ -406,12 +451,16 @@ mod tests { ), ]; - register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + register_validators_and_assert_cgc::( + &custody_context, + validators_and_expected_cgc, + &spec, + ); } #[test] fn register_validators_should_not_update_cgc_for_supernode() { - let custody_context = CustodyContext::new(true); + let custody_context = CustodyContext::::new(true); let spec = E::default_spec(); let bal_per_additional_group = spec.balance_per_additional_custody_group; @@ -435,23 +484,29 @@ mod tests { ), ]; - register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + register_validators_and_assert_cgc::( + &custody_context, + validators_and_expected_cgc, + &spec, + ); + let current_epoch = Epoch::new(2); assert_eq!( - custody_context.num_of_custody_groups_to_sample(None, &spec), + custody_context.num_of_custody_groups_to_sample(current_epoch, &spec), spec.number_of_custody_groups ); } #[test] fn cgc_change_should_be_effective_to_sampling_after_delay() { - let custody_context = CustodyContext::new(false); + let custody_context = CustodyContext::::new(false); let spec = E::default_spec(); let current_slot = Slot::new(10); let current_epoch = current_slot.epoch(E::slots_per_epoch()); - let default_sampling_size = custody_context.num_of_custody_groups_to_sample(None, &spec); + let default_sampling_size = + custody_context.num_of_custody_groups_to_sample(current_epoch, &spec); let validator_custody_units = 10; - let _cgc_changed = custody_context.register_validators::( + let _cgc_changed = custody_context.register_validators( vec![( 0, validator_custody_units * spec.balance_per_additional_custody_group, @@ -462,26 +517,26 @@ mod tests { // CGC update is not applied for `current_epoch`. assert_eq!( - custody_context.num_of_custody_groups_to_sample(Some(current_epoch), &spec), + custody_context.num_of_custody_groups_to_sample(current_epoch, &spec), default_sampling_size ); // CGC update is applied for the next epoch. assert_eq!( - custody_context.num_of_custody_groups_to_sample(Some(current_epoch + 1), &spec), + custody_context.num_of_custody_groups_to_sample(current_epoch + 1, &spec), validator_custody_units ); } #[test] fn validator_dropped_after_no_registrations_within_expiry_should_not_reduce_cgc() { - let custody_context = CustodyContext::new(false); + let custody_context = CustodyContext::::new(false); let spec = E::default_spec(); let current_slot = Slot::new(10); let val_custody_units_1 = 10; let val_custody_units_2 = 5; // GIVEN val_1 and val_2 registered at `current_slot` - let _ = custody_context.register_validators::( + let _ = custody_context.register_validators( vec![ ( 1, @@ -497,7 +552,7 @@ mod tests { ); // WHEN val_1 re-registered, but val_2 did not re-register after `VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1` slots - let cgc_changed_opt = custody_context.register_validators::( + let cgc_changed_opt = custody_context.register_validators( vec![( 1, val_custody_units_1 * spec.balance_per_additional_custody_group, @@ -516,7 +571,7 @@ mod tests { #[test] fn validator_dropped_after_no_registrations_within_expiry() { - let custody_context = CustodyContext::new(false); + let custody_context = CustodyContext::::new(false); let spec = E::default_spec(); let current_slot = Slot::new(10); let val_custody_units_1 = 10; @@ -524,7 +579,7 @@ mod tests { let val_custody_units_3 = 6; // GIVEN val_1 and val_2 registered at `current_slot` - let _ = custody_context.register_validators::( + let _ = custody_context.register_validators( vec![ ( 1, @@ -540,7 +595,7 @@ mod tests { ); // WHEN val_1 and val_3 registered, but val_3 did not re-register after `VALIDATOR_REGISTRATION_EXPIRY_SLOTS + 1` slots - let cgc_changed = custody_context.register_validators::( + let cgc_changed = custody_context.register_validators( vec![ ( 1, @@ -564,9 +619,40 @@ mod tests { ); } - /// Update validator every epoch and assert cgc against expected values. - fn register_validators_and_assert_cgc( - custody_context: &CustodyContext, + #[test] + fn should_init_ordered_data_columns_and_return_sampling_columns() { + let spec = E::default_spec(); + let custody_context = CustodyContext::::new(false); + let sampling_size = custody_context.num_of_data_columns_to_sample(Epoch::new(0), &spec); + + // initialise ordered columns + let mut all_custody_groups_ordered = (0..spec.number_of_custody_groups).collect::>(); + all_custody_groups_ordered.shuffle(&mut thread_rng()); + + custody_context + .init_ordered_data_columns_from_custody_groups( + all_custody_groups_ordered.clone(), + &spec, + ) + .expect("should initialise ordered data columns"); + + let actual_sampling_columns = + custody_context.sampling_columns_for_epoch(Epoch::new(0), &spec); + + let expected_sampling_columns = &all_custody_groups_ordered + .iter() + .flat_map(|custody_index| { + compute_columns_for_custody_group(*custody_index, &spec) + .expect("should compute columns for custody group") + }) + .collect::>()[0..sampling_size]; + + assert_eq!(actual_sampling_columns, expected_sampling_columns) + } + + /// Update the validator every epoch and assert cgc against expected values. + fn register_validators_and_assert_cgc( + custody_context: &CustodyContext, validators_and_expected_cgc_changed: Vec<(ValidatorsAndBalances, Option)>, spec: &ChainSpec, ) { @@ -575,7 +661,7 @@ mod tests { { let epoch = Epoch::new(idx as u64); let updated_custody_count_opt = custody_context - .register_validators::( + .register_validators( validators_and_balance, epoch.start_slot(E::slots_per_epoch()), spec, diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9a6a789b42..92fea70a1d 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1311,7 +1311,7 @@ async fn verify_and_process_gossip_data_sidecars( ); harness.chain.verify_data_column_sidecar_for_gossip( column_sidecar.into_inner(), - *subnet_id, + subnet_id, ) }) .collect::, _>>() diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index 5d0f22e252..74a98b1183 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; use types::test_utils::TestRandom; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList, + BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList, Slot, }; type E = MinimalEthSpec; @@ -64,8 +64,17 @@ async fn data_column_sidecar_event_on_process_gossip_data_column() { // build and process a gossip verified data column let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); - let sidecar = Arc::new(DataColumnSidecar::random_for_test(&mut rng)); - let gossip_verified_data_column = GossipVerifiedDataColumn::__new_for_testing(sidecar); + let sidecar = { + // DA checker only accepts sampling columns, so we need to create one with a sampling index. + let mut random_sidecar = DataColumnSidecar::random_for_test(&mut rng); + let slot = Slot::new(10); + let epoch = slot.epoch(E::slots_per_epoch()); + random_sidecar.signed_block_header.message.slot = slot; + random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch)[0]; + random_sidecar + }; + let gossip_verified_data_column = + GossipVerifiedDataColumn::__new_for_testing(Arc::new(sidecar)); let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar( gossip_verified_data_column.as_data_column(), ); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 479b4b3192..7308a4775d 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -42,6 +42,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use store::database::interface::BeaconNodeBackend; use timer::spawn_timer; use tracing::{debug, info, warn}; +use types::data_column_custody_group::get_custody_groups_ordered; use types::{ test_utils::generate_deterministic_keypairs, BeaconState, BlobSidecarList, ChainSpec, EthSpec, ExecutionBlockHash, Hash256, SignedBeaconBlock, @@ -477,7 +478,7 @@ where }; let (network_globals, network_senders) = NetworkService::start( - beacon_chain, + beacon_chain.clone(), config, context.executor, libp2p_registry.as_mut(), @@ -486,6 +487,8 @@ where .await .map_err(|e| format!("Failed to start network: {:?}", e))?; + init_custody_context(beacon_chain, &network_globals)?; + self.network_globals = Some(network_globals); self.network_senders = Some(network_senders); self.libp2p_registry = libp2p_registry; @@ -787,6 +790,21 @@ where } } +fn init_custody_context( + chain: Arc>, + network_globals: &NetworkGlobals, +) -> Result<(), String> { + let node_id = network_globals.local_enr().node_id().raw(); + let spec = &chain.spec; + let custody_groups_ordered = + get_custody_groups_ordered(node_id, spec.number_of_custody_groups, spec) + .map_err(|e| format!("Failed to compute custody groups: {:?}", e))?; + chain + .data_availability_checker + .custody_context() + .init_ordered_data_columns_from_custody_groups(custody_groups_ordered, spec) +} + impl ClientBuilder> where diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index c66ddacdaf..870276c080 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1433,14 +1433,12 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |value: serde_json::Value, consensus_version: ForkName, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let request = PublishBlockRequest::::context_deserialize( &value, @@ -1456,7 +1454,6 @@ pub fn serve( &network_tx, BroadcastValidation::default(), duplicate_block_status_code, - network_globals, ) .await }) @@ -1472,14 +1469,12 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |block_bytes: Bytes, consensus_version: ForkName, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block_contents = PublishBlockRequest::::from_ssz_bytes( &block_bytes, @@ -1495,7 +1490,6 @@ pub fn serve( &network_tx, BroadcastValidation::default(), duplicate_block_status_code, - network_globals, ) .await }) @@ -1512,15 +1506,13 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, value: serde_json::Value, consensus_version: ForkName, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let request = PublishBlockRequest::::context_deserialize( &value, @@ -1537,7 +1529,6 @@ pub fn serve( &network_tx, validation_level.broadcast_validation, duplicate_block_status_code, - network_globals, ) .await }) @@ -1554,15 +1545,13 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, block_bytes: Bytes, consensus_version: ForkName, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block_contents = PublishBlockRequest::::from_ssz_bytes( &block_bytes, @@ -1578,7 +1567,6 @@ pub fn serve( &network_tx, validation_level.broadcast_validation, duplicate_block_status_code, - network_globals, ) .await }) @@ -1598,13 +1586,11 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |block_contents: Arc>, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_blinded_block( block_contents, @@ -1612,7 +1598,6 @@ pub fn serve( &network_tx, BroadcastValidation::default(), duplicate_block_status_code, - network_globals, ) .await }) @@ -1628,13 +1613,11 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |block_bytes: Bytes, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block = SignedBlindedBeaconBlock::::from_ssz_bytes( &block_bytes, @@ -1650,7 +1633,6 @@ pub fn serve( &network_tx, BroadcastValidation::default(), duplicate_block_status_code, - network_globals, ) .await }) @@ -1666,14 +1648,12 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, blinded_block: Arc>, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_blinded_block( blinded_block, @@ -1681,7 +1661,6 @@ pub fn serve( &network_tx, validation_level.broadcast_validation, duplicate_block_status_code, - network_globals, ) .await }) @@ -1697,14 +1676,12 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) - .and(network_globals.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, block_bytes: Bytes, task_spawner: TaskSpawner, chain: Arc>, - network_tx: UnboundedSender>, - network_globals: Arc>| { + network_tx: UnboundedSender>| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block = SignedBlindedBeaconBlock::::from_ssz_bytes( &block_bytes, @@ -1720,7 +1697,6 @@ pub fn serve( &network_tx, validation_level.broadcast_validation, duplicate_block_status_code, - network_globals, ) .await }) @@ -3839,11 +3815,8 @@ pub fn serve( if let Some(cgc_change) = chain .data_availability_checker .custody_context() - .register_validators::( - validators_and_balances, - current_slot, - &chain.spec, - ) { + .register_validators(validators_and_balances, current_slot, &chain.spec) + { chain.update_data_column_custody_info(Some( cgc_change .effective_epoch diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index c1b86416b1..352c06e174 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -15,7 +15,7 @@ use eth2::types::{ }; use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse}; use futures::TryFutureExt; -use lighthouse_network::{NetworkGlobals, PubsubMessage}; +use lighthouse_network::PubsubMessage; use network::NetworkMessage; use rand::prelude::SliceRandom; use slot_clock::SlotClock; @@ -82,7 +82,6 @@ pub async fn publish_block>( network_tx: &UnboundedSender>, validation_level: BroadcastValidation, duplicate_status_code: StatusCode, - network_globals: Arc>, ) -> Result { let seen_timestamp = timestamp_now(); let block_publishing_delay_for_testing = chain.config.block_publishing_delay; @@ -223,7 +222,8 @@ pub async fn publish_block>( publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(|_| { warp_utils::reject::custom_server_error("unable to publish data column sidecars".into()) })?; - let sampling_columns_indices = &network_globals.sampling_columns(); + let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns_indices = chain.sampling_columns_for_epoch(epoch); let sampling_columns = gossip_verified_columns .into_iter() .flatten() @@ -405,7 +405,7 @@ fn build_gossip_verified_data_columns( let column_index = data_column_sidecar.index; let subnet = DataColumnSubnetId::from_column_index(column_index, &chain.spec); let gossip_verified_column = - GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), chain); + GossipVerifiedDataColumn::new(data_column_sidecar, subnet, chain); match gossip_verified_column { Ok(blob) => Ok(Some(blob)), @@ -633,7 +633,6 @@ pub async fn publish_blinded_block( network_tx: &UnboundedSender>, validation_level: BroadcastValidation, duplicate_status_code: StatusCode, - network_globals: Arc>, ) -> Result { let block_root = blinded_block.canonical_root(); let full_block_opt = reconstruct_block(chain.clone(), block_root, blinded_block).await?; @@ -646,7 +645,6 @@ pub async fn publish_blinded_block( network_tx, validation_level, duplicate_status_code, - network_globals, ) .await } else { diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index 95c21d8fe2..78b52f86e3 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -372,7 +372,6 @@ pub async fn consensus_partial_pass_only_consensus() { /* submit `block_b` which should induce equivocation */ let channel = tokio::sync::mpsc::unbounded_channel(); - let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_block( None, @@ -381,7 +380,6 @@ pub async fn consensus_partial_pass_only_consensus() { &channel.0, validation_level, StatusCode::ACCEPTED, - network_globals, ) .await; @@ -663,7 +661,6 @@ pub async fn equivocation_consensus_late_equivocation() { assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); - let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_block( None, @@ -672,7 +669,6 @@ pub async fn equivocation_consensus_late_equivocation() { &channel.0, validation_level, StatusCode::ACCEPTED, - network_globals, ) .await; @@ -1302,7 +1298,6 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); - let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_blinded_block( block_b, @@ -1310,7 +1305,6 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { &channel.0, validation_level, StatusCode::ACCEPTED, - network_globals, ) .await; @@ -1487,7 +1481,7 @@ pub async fn block_seen_on_gossip_with_some_blobs_or_columns() { &block, partial_blobs.iter(), partial_kzg_proofs.iter(), - Some(get_custody_columns(&tester)), + Some(get_custody_columns(&tester, block.slot())), ) .await; @@ -1558,7 +1552,7 @@ pub async fn blobs_or_columns_seen_on_gossip_without_block() { &block, blobs.iter(), kzg_proofs.iter(), - Some(get_custody_columns(&tester)), + Some(get_custody_columns(&tester, block.slot())), ) .await; @@ -1629,7 +1623,7 @@ async fn blobs_or_columns_seen_on_gossip_without_block_and_no_http_blobs_or_colu &block, blobs.iter(), kzg_proofs.iter(), - Some(get_custody_columns(&tester)), + Some(get_custody_columns(&tester, block.slot())), ) .await; @@ -1703,7 +1697,7 @@ async fn slashable_blobs_or_columns_seen_on_gossip_cause_failure() { &block_b, blobs_b.iter(), kzg_proofs_b.iter(), - Some(get_custody_columns(&tester)), + Some(get_custody_columns(&tester, block_b.slot())), ) .await; @@ -1802,11 +1796,15 @@ fn assert_server_message_error(error_response: eth2::Error, expected_message: St assert_eq!(err.message, expected_message); } -fn get_custody_columns(tester: &InteractiveTester) -> HashSet { +fn get_custody_columns(tester: &InteractiveTester, slot: Slot) -> HashSet { + let epoch = slot.epoch(E::slots_per_epoch()); tester .ctx - .network_globals + .chain .as_ref() .unwrap() - .sampling_columns() + .sampling_columns_for_epoch(epoch) + .iter() + .copied() + .collect() } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 4a6f34c76d..2c95b59f4f 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -898,9 +898,9 @@ impl Network { name = "libp2p", skip_all )] - pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) { + pub fn subscribe_new_data_column_subnets(&mut self, sampling_column_count: u64) { self.network_globals - .update_data_column_subnets(custody_column_count); + .update_data_column_subnets(sampling_column_count); for column in self.network_globals.sampling_subnets() { let kind = GossipKind::DataColumnSidecar(column); diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index cc4d758b4a..082097f926 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -8,9 +8,7 @@ use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; use tracing::error; -use types::data_column_custody_group::{ - compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups, -}; +use types::data_column_custody_group::{compute_subnets_from_custody_group, get_custody_groups}; use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; pub struct NetworkGlobals { @@ -32,7 +30,6 @@ pub struct NetworkGlobals { pub backfill_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. pub sampling_subnets: RwLock>, - pub sampling_columns: RwLock>, /// Network-related configuration. Immutable after initialization. pub config: Arc, /// Ethereum chain configuration. Immutable after initialization. @@ -78,16 +75,8 @@ impl NetworkGlobals { sampling_subnets.extend(subnets); } - let mut sampling_columns = HashSet::new(); - for custody_index in &custody_groups { - let columns = compute_columns_for_custody_group(*custody_index, &spec) - .expect("should compute custody columns for node"); - sampling_columns.extend(columns); - } - tracing::debug!( cgc = custody_group_count, - ?sampling_columns, ?sampling_subnets, "Starting node with custody params" ); @@ -102,20 +91,15 @@ impl NetworkGlobals { sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), sampling_subnets: RwLock::new(sampling_subnets), - sampling_columns: RwLock::new(sampling_columns), config, spec, } } /// Update the sampling subnets based on an updated cgc. - pub fn update_data_column_subnets(&self, custody_group_count: u64) { + pub fn update_data_column_subnets(&self, sampling_size: u64) { // The below `expect` calls will panic on start up if the chain spec config values used // are invalid - let sampling_size = self - .spec - .sampling_size_custody_groups(custody_group_count) - .expect("should compute node sampling size from valid chain spec"); let custody_groups = get_custody_groups(self.local_enr().node_id().raw(), sampling_size, &self.spec) .expect("should compute node custody groups"); @@ -126,13 +110,6 @@ impl NetworkGlobals { .expect("should compute custody subnets for node"); sampling_subnets.extend(subnets); } - - let mut sampling_columns = self.sampling_columns.write(); - for custody_index in &custody_groups { - let columns = compute_columns_for_custody_group(*custody_index, &self.spec) - .expect("should compute custody columns for node"); - sampling_columns.extend(columns); - } } /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect @@ -248,10 +225,6 @@ impl NetworkGlobals { } } - pub fn sampling_columns(&self) -> HashSet { - self.sampling_columns.read().clone() - } - pub fn sampling_subnets(&self) -> HashSet { self.sampling_subnets.read().clone() } @@ -320,29 +293,6 @@ mod test { ); } - #[test] - fn test_sampling_columns() { - create_test_tracing_subscriber(); - let mut spec = E::default_spec(); - spec.fulu_fork_epoch = Some(Epoch::new(0)); - - let custody_group_count = spec.number_of_custody_groups / 2; - let expected_sampling_columns = spec.sampling_size_columns(custody_group_count).unwrap(); - let metadata = get_metadata(custody_group_count); - let config = Arc::new(NetworkConfig::default()); - - let globals = NetworkGlobals::::new_test_globals_with_metadata( - vec![], - metadata, - config, - Arc::new(spec), - ); - assert_eq!( - globals.sampling_columns.read().len(), - expected_sampling_columns as usize - ); - } - fn get_metadata(custody_group_count: u64) -> MetaData { MetaData::V3(MetaDataV3 { seq_number: 0, diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 349bfe66a3..caec40fa2f 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -188,8 +188,8 @@ impl std::fmt::Display for GossipKind { GossipKind::BlobSidecar(blob_index) => { write!(f, "{}{}", BLOB_SIDECAR_PREFIX, blob_index) } - GossipKind::DataColumnSidecar(column_index) => { - write!(f, "{}{}", DATA_COLUMN_SIDECAR_PREFIX, **column_index) + GossipKind::DataColumnSidecar(column_subnet_id) => { + write!(f, "{}{}", DATA_COLUMN_SIDECAR_PREFIX, **column_subnet_id) } x => f.write_str(x.as_ref()), } @@ -317,8 +317,8 @@ impl std::fmt::Display for GossipTopic { GossipKind::BlobSidecar(blob_index) => { format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index) } - GossipKind::DataColumnSidecar(index) => { - format!("{}{}", DATA_COLUMN_SIDECAR_PREFIX, *index) + GossipKind::DataColumnSidecar(column_subnet_id) => { + format!("{}{}", DATA_COLUMN_SIDECAR_PREFIX, *column_subnet_id) } GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(), GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 47d1546506..fa78163f99 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -607,7 +607,6 @@ impl NetworkBeaconProcessor { self: &Arc, message_id: MessageId, peer_id: PeerId, - _peer_client: Client, subnet_id: DataColumnSubnetId, column_sidecar: Arc>, seen_duration: Duration, @@ -623,7 +622,7 @@ impl NetworkBeaconProcessor { ); match self .chain - .verify_data_column_sidecar_for_gossip(column_sidecar.clone(), *subnet_id) + .verify_data_column_sidecar_for_gossip(column_sidecar.clone(), subnet_id) { Ok(gossip_verified_data_column) => { metrics::inc_counter( @@ -650,6 +649,7 @@ impl NetworkBeaconProcessor { duration, ); } + self.process_gossip_verified_data_column( peer_id, gossip_verified_data_column, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ea46c3d0d1..c4ffe8ad88 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -227,7 +227,6 @@ impl NetworkBeaconProcessor { self: &Arc, message_id: MessageId, peer_id: PeerId, - peer_client: Client, subnet_id: DataColumnSubnetId, column_sidecar: Arc>, seen_timestamp: Duration, @@ -238,7 +237,6 @@ impl NetworkBeaconProcessor { .process_gossip_data_column_sidecar( message_id, peer_id, - peer_client, subnet_id, column_sidecar, seen_timestamp, @@ -753,7 +751,8 @@ impl NetworkBeaconProcessor { block_root: Hash256, publish_blobs: bool, ) { - let custody_columns = self.network_globals.sampling_columns(); + let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let custody_columns = self.chain.sampling_columns_for_epoch(epoch); let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { if publish_blobs { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 109c361ebe..6408fcffd9 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -37,8 +37,9 @@ use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, - DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + DataColumnSubnetId, Epoch, EthSpec, Hash256, MainnetEthSpec, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, + SubnetId, }; type E = MainnetEthSpec; @@ -271,6 +272,8 @@ impl TestRig { let (blob_sidecars, data_columns) = if let Some((kzg_proofs, blobs)) = next_block_tuple.1 { if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let kzg = get_kzg(&chain.spec); + let epoch = block.slot().epoch(E::slots_per_epoch()); + let sampling_indices = chain.sampling_columns_for_epoch(epoch); let custody_columns: DataColumnSidecarList = blobs_to_data_column_sidecars( &blobs.iter().collect_vec(), kzg_proofs.clone().into_iter().collect_vec(), @@ -280,7 +283,7 @@ impl TestRig { ) .unwrap() .into_iter() - .filter(|c| network_globals.sampling_columns().contains(&c.index)) + .filter(|c| sampling_indices.contains(&c.index)) .collect::>(); (None, Some(custody_columns)) @@ -357,7 +360,6 @@ impl TestRig { .send_gossip_data_column_sidecar( junk_message_id(), junk_peer_id(), - Client::default(), DataColumnSubnetId::from_column_index(data_column.index, &self.chain.spec), data_column.clone(), Duration::from_secs(0), @@ -807,7 +809,8 @@ async fn accept_processed_gossip_data_columns_without_import() { .unwrap() .into_iter() .map(|data_column| { - let subnet_id = data_column.index; + let subnet_id = + DataColumnSubnetId::from_column_index(data_column.index, &rig.chain.spec); validate_data_column_sidecar_for_gossip::<_, DoNotObserve>( data_column, subnet_id, @@ -820,7 +823,7 @@ async fn accept_processed_gossip_data_columns_without_import() { let block_root = rig.next_block.canonical_root(); rig.chain .data_availability_checker - .put_gossip_verified_data_columns(block_root, verified_data_columns) + .put_gossip_verified_data_columns(block_root, rig.next_block.slot(), verified_data_columns) .expect("should put data columns into availability cache"); // WHEN an already processed but unobserved data column is received via gossip diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 5d5daae4ae..305629cf52 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -379,7 +379,6 @@ impl Router { .send_gossip_data_column_sidecar( message_id, peer_id, - self.network_globals.client(&peer_id), subnet_id, column_sidecar, timestamp_now(), diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index a62b8f7382..f541110fea 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -551,7 +551,13 @@ impl SyncNetworkContext { // Attempt to find all required custody peers before sending any request or creating an ID let columns_by_range_peers_to_request = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let column_indexes = self.network_globals().sampling_columns(); + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + let column_indexes = self + .chain + .sampling_columns_for_epoch(epoch) + .iter() + .cloned() + .collect(); Some(self.select_columns_by_range_peers_to_request( &column_indexes, peers, @@ -602,18 +608,14 @@ impl SyncNetworkContext { }) .transpose()?; + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); let info = RangeBlockComponentsRequest::new( blocks_req_id, blobs_req_id, data_column_requests.map(|data_column_requests| { ( data_column_requests, - self.network_globals() - .sampling_columns() - .clone() - .iter() - .copied() - .collect(), + self.chain.sampling_columns_for_epoch(epoch).to_vec(), ) }), ); @@ -1015,11 +1017,16 @@ impl SyncNetworkContext { .cached_data_column_indexes(&block_root) .unwrap_or_default(); + let current_epoch = self.chain.epoch().map_err(|e| { + RpcRequestSendError::InternalError(format!("Unable to read slot clock {:?}", e)) + })?; + // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = self - .network_globals() - .sampling_columns() - .into_iter() + .chain + .sampling_columns_for_epoch(current_epoch) + .iter() + .copied() .filter(|index| !custody_indexes_imported.contains(index)) .collect::>(); diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index e31930075a..5b48c30290 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -367,11 +367,11 @@ impl BatchInfo { pub fn processing_completed( &mut self, - procesing_result: BatchProcessingResult, + processing_result: BatchProcessingResult, ) -> Result { match self.state.poison() { BatchState::Processing(attempt) => { - self.state = match procesing_result { + self.state = match processing_result { BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt), BatchProcessingResult::FaultyFailure => { // register the failed attempt diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 9b7a255ad1..2c35e69426 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -780,7 +780,7 @@ impl ChainSpec { } /// Returns the number of column sidecars to sample per slot. - pub fn sampling_size_columns(&self, custody_group_count: u64) -> Result { + pub fn sampling_size_columns(&self, custody_group_count: u64) -> Result { let sampling_size_groups = self.sampling_size_custody_groups(custody_group_count)?; let columns_per_custody_group = self @@ -792,7 +792,7 @@ impl ChainSpec { .safe_mul(sampling_size_groups) .map_err(|_| "Computing sampling size should not overflow")?; - Ok(sampling_size_columns) + Ok(sampling_size_columns as usize) } /// Returns the number of custody groups to sample per slot. diff --git a/consensus/types/src/data_column_custody_group.rs b/consensus/types/src/data_column_custody_group.rs index 9e9505da9f..3e88ef210a 100644 --- a/consensus/types/src/data_column_custody_group.rs +++ b/consensus/types/src/data_column_custody_group.rs @@ -1,7 +1,6 @@ use crate::{ChainSpec, ColumnIndex, DataColumnSubnetId}; use alloy_primitives::U256; use itertools::Itertools; -use maplit::hashset; use safe_arith::{ArithError, SafeArith}; use std::collections::HashSet; @@ -25,13 +24,32 @@ pub fn get_custody_groups( custody_group_count: u64, spec: &ChainSpec, ) -> Result, DataColumnCustodyGroupError> { + get_custody_groups_ordered(raw_node_id, custody_group_count, spec) + .map(|custody_groups| custody_groups.into_iter().collect()) +} + +/// Returns a deterministically ordered list of custody groups assigned to a node, +/// preserving the order in which they were computed during iteration. +/// +/// # Arguments +/// * `raw_node_id` - 32-byte node identifier +/// * `custody_group_count` - Number of custody groups to generate +/// * `spec` - Chain specification containing custody group parameters +/// +/// # Returns +/// Vector of custody group indices in computation order or error if parameters are invalid +pub fn get_custody_groups_ordered( + raw_node_id: [u8; 32], + custody_group_count: u64, + spec: &ChainSpec, +) -> Result, DataColumnCustodyGroupError> { if custody_group_count > spec.number_of_custody_groups { return Err(DataColumnCustodyGroupError::InvalidCustodyGroupCount( custody_group_count, )); } - let mut custody_groups: HashSet = hashset![]; + let mut custody_groups = vec![]; let mut current_id = U256::from_be_slice(&raw_node_id); while custody_groups.len() < custody_group_count as usize { let mut node_id_bytes = [0u8; 32]; @@ -44,7 +62,9 @@ pub fn get_custody_groups( let custody_group = hash_prefix_u64 .safe_rem(spec.number_of_custody_groups) .expect("spec.number_of_custody_groups must not be zero"); - custody_groups.insert(custody_group); + if !custody_groups.contains(&custody_group) { + custody_groups.push(custody_group); + } current_id = current_id.wrapping_add(U256::from(1u64)); } diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index e6b55341fe..bd2499aa28 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -28,8 +28,8 @@ use std::time::Duration; use types::{ Attestation, AttestationRef, AttesterSlashing, AttesterSlashingRef, BeaconBlock, BeaconState, BlobSidecar, BlobsList, BlockImportSource, Checkpoint, DataColumnSidecarList, - ExecutionBlockHash, Hash256, IndexedAttestation, KzgProof, ProposerPreparationData, - SignedBeaconBlock, Slot, Uint256, + DataColumnSubnetId, ExecutionBlockHash, Hash256, IndexedAttestation, KzgProof, + ProposerPreparationData, SignedBeaconBlock, Slot, Uint256, }; // When set to true, cache any states fetched from the db. @@ -520,7 +520,8 @@ impl Tester { let gossip_verified_data_columns = columns .into_iter() .map(|column| { - GossipVerifiedDataColumn::new(column.clone(), column.index, &self.harness.chain) + let subnet_id = DataColumnSubnetId::from_column_index(column.index, &self.spec); + GossipVerifiedDataColumn::new(column.clone(), subnet_id, &self.harness.chain) .unwrap_or_else(|_| { data_column_success = false; GossipVerifiedDataColumn::__new_for_testing(column)