diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index 7bd8b40d76..5cffb4e2fd 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -20,7 +20,7 @@ jobs: - name: Build Docker image run: | - docker build --build-arg FEATURES=portable -t lighthouse:local . + docker build --build-arg FEATURES=portable,spec-minimal -t lighthouse:local . docker save lighthouse:local -o lighthouse-docker.tar - name: Upload Docker image artifact @@ -213,6 +213,49 @@ jobs: scripts/local_testnet/logs retention-days: 3 + # Test syncing from genesis on a local testnet. Aims to cover forward syncing both short and long distances. + genesis-sync-test: + name: genesis-sync-test-${{ matrix.fork }}-${{ matrix.offline_secs }}s + runs-on: ubuntu-latest + needs: dockerfile-ubuntu + if: contains(github.event.pull_request.labels.*.name, 'syncing') + strategy: + matrix: + fork: [electra, fulu] + offline_secs: [120, 300] + steps: + - uses: actions/checkout@v4 + + - name: Install Kurtosis + run: | + echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list + sudo apt update + sudo apt install -y kurtosis-cli + kurtosis analytics disable + + - name: Download Docker image artifact + uses: actions/download-artifact@v4 + with: + name: lighthouse-docker + path: . + + - name: Load Docker image + run: docker load -i lighthouse-docker.tar + + - name: Run the genesis sync test script + run: | + ./genesis-sync.sh "sync-${{ matrix.fork }}-${{ matrix.offline_secs }}s" "genesis-sync-config-${{ matrix.fork }}.yaml" "${{ matrix.fork }}" "${{ matrix.offline_secs }}" + working-directory: scripts/tests + + - name: Upload logs artifact + if: always() + uses: actions/upload-artifact@v4 + with: + name: logs-genesis-sync-${{ matrix.fork }}-${{ matrix.offline_secs }}s + path: | + scripts/local_testnet/logs + retention-days: 3 + # This job succeeds ONLY IF all others succeed. It is used by the merge queue to determine whether # a PR is safe to merge. New jobs should be added here. 
local-testnet-success: @@ -228,5 +271,5 @@ jobs: - uses: actions/checkout@v4 - name: Check that success job is dependent on all others run: | - exclude_jobs='checkpoint-sync-test' + exclude_jobs='checkpoint-sync-test|genesis-sync-test' ./scripts/ci/check-success-job.sh ./.github/workflows/local-testnet.yml local-testnet-success "$exclude_jobs" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 990f4b6099..50efb367a8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -58,12 +58,14 @@ use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; use crate::persisted_beacon_chain::PersistedBeaconChain; +use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; +use crate::validator_custody::CustodyContextSsz; use crate::validator_monitor::{ get_slot_delay_ms, timestamp_now, ValidatorMonitor, HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, @@ -73,7 +75,9 @@ use crate::{ kzg_utils, metrics, AvailabilityPendingExecutedBlock, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead, }; -use eth2::types::{EventKind, SseBlobSidecar, SseBlock, SseExtendedPayloadAttributes}; +use eth2::types::{ + EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes, +}; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition, PayloadAttributes, PayloadStatus, @@ -668,6 +672,23 @@ impl BeaconChain { Ok(()) } + /// Persists the custody information to disk. + pub fn persist_custody_context(&self) -> Result<(), Error> { + let custody_context: CustodyContextSsz = self + .data_availability_checker + .custody_context() + .as_ref() + .into(); + debug!(?custody_context, "Persisting custody context to store"); + + persist_custody_context::( + self.store.clone(), + custody_context, + )?; + + Ok(()) + } + /// Returns the slot _right now_ according to `self.slot_clock`. Returns `Err` if the slot is /// unavailable. /// @@ -2988,7 +3009,6 @@ impl BeaconChain { pub async fn verify_block_for_gossip( self: &Arc, block: Arc>, - custody_columns_count: usize, ) -> Result, BlockError> { let chain = self.clone(); self.task_executor @@ -2998,7 +3018,7 @@ impl BeaconChain { let slot = block.slot(); let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); - match GossipVerifiedBlock::new(block, &chain, custody_columns_count) { + match GossipVerifiedBlock::new(block, &chain) { Ok(verified) => { let commitments_formatted = verified.block.commitments_formatted(); debug!( @@ -3087,6 +3107,11 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } + self.emit_sse_data_column_sidecar_events( + &block_root, + data_columns.iter().map(|column| column.as_data_column()), + ); + let r = self .check_gossip_data_columns_availability_and_import( slot, @@ -3158,10 +3183,16 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } - // process_engine_blobs is called for both pre and post PeerDAS. 
However, post PeerDAS - // consumers don't expect the blobs event to fire erratically. - if let EngineGetBlobsOutput::Blobs(blobs) = &engine_get_blobs_output { - self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); + match &engine_get_blobs_output { + EngineGetBlobsOutput::Blobs(blobs) => { + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); + } + EngineGetBlobsOutput::CustodyColumns(columns) => { + self.emit_sse_data_column_sidecar_events( + &block_root, + columns.iter().map(|column| column.as_data_column()), + ); + } } let r = self @@ -3191,6 +3222,31 @@ impl BeaconChain { } } + fn emit_sse_data_column_sidecar_events<'a, I>( + self: &Arc, + block_root: &Hash256, + data_columns_iter: I, + ) where + I: Iterator>, + { + if let Some(event_handler) = self.event_handler.as_ref() { + if event_handler.has_data_column_sidecar_subscribers() { + let imported_data_columns = self + .data_availability_checker + .cached_data_column_indexes(block_root) + .unwrap_or_default(); + let new_data_columns = + data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index)); + + for data_column in new_data_columns { + event_handler.register(EventKind::DataColumnSidecar( + SseDataColumnSidecar::from_data_column_sidecar(data_column), + )); + } + } + } + } + /// Caches the columns in the processing cache, processes them, then evicts them from the cache once /// they are imported or an error occurs. pub async fn process_rpc_custody_columns( @@ -3231,6 +3287,11 @@ impl BeaconChain { } } + self.emit_sse_data_column_sidecar_events( + &block_root, + custody_columns.iter().map(|column| column.as_ref()), + ); + let r = self .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) .await; @@ -7189,7 +7250,8 @@ impl Drop for BeaconChain { let drop = || -> Result<(), Error> { self.persist_fork_choice()?; self.persist_op_pool()?; - self.persist_eth1_cache() + self.persist_eth1_cache()?; + self.persist_custody_context() }; if let Err(e) = drop() { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 879a2907e9..39e6b4775b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -704,7 +704,6 @@ pub struct GossipVerifiedBlock { pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, - custody_columns_count: usize, } /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit @@ -740,7 +739,6 @@ pub trait IntoGossipVerifiedBlock: Sized { fn into_gossip_verified_block( self, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result, BlockError>; fn inner_block(&self) -> Arc>; } @@ -749,7 +747,6 @@ impl IntoGossipVerifiedBlock for GossipVerifiedBlock fn into_gossip_verified_block( self, _chain: &BeaconChain, - _custody_columns_count: usize, ) -> Result, BlockError> { Ok(self) } @@ -762,9 +759,8 @@ impl IntoGossipVerifiedBlock for Arc, - custody_columns_count: usize, ) -> Result, BlockError> { - GossipVerifiedBlock::new(self, chain, custody_columns_count) + GossipVerifiedBlock::new(self, chain) } fn inner_block(&self) -> Arc> { @@ -840,7 +836,6 @@ impl GossipVerifiedBlock { pub fn new( block: Arc>, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result { // If the block is valid for gossip we don't supply it to the slasher here because // we assume it will be transformed into a fully verified block.
We *do* need to supply @@ -850,14 +845,12 @@ impl GossipVerifiedBlock { // The `SignedBeaconBlock` and `SignedBeaconBlockHeader` have the same canonical root, // but it's way quicker to calculate root of the header since the hash of the tree rooted // at `BeaconBlockBody` is already computed in the header. - Self::new_without_slasher_checks(block, &header, chain, custody_columns_count).map_err( - |e| { - process_block_slash_info::<_, BlockError>( - chain, - BlockSlashInfo::from_early_error_block(header, e), - ) - }, - ) + Self::new_without_slasher_checks(block, &header, chain).map_err(|e| { + process_block_slash_info::<_, BlockError>( + chain, + BlockSlashInfo::from_early_error_block(header, e), + ) + }) } /// As for new, but doesn't pass the block to the slasher. @@ -865,7 +858,6 @@ impl GossipVerifiedBlock { block: Arc>, block_header: &SignedBeaconBlockHeader, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result { // Ensure the block is the correct structure for the fork at `block.slot()`. block @@ -1073,7 +1065,6 @@ impl GossipVerifiedBlock { block_root, parent, consensus_context, - custody_columns_count, }) } @@ -1221,7 +1212,6 @@ impl SignatureVerifiedBlock { block: MaybeAvailableBlock::AvailabilityPending { block_root: from.block_root, block, - custody_columns_count: from.custody_columns_count, }, block_root: from.block_root, parent: Some(parent), diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 84011e23ff..03452fbf6e 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -32,7 +32,6 @@ use types::{ pub struct RpcBlock { block_root: Hash256, block: RpcBlockInner, - custody_columns_count: usize, } impl Debug for RpcBlock { @@ -46,10 +45,6 @@ impl RpcBlock { self.block_root } - pub fn custody_columns_count(&self) -> usize { - self.custody_columns_count - } - pub fn as_block(&self) -> &SignedBeaconBlock { match &self.block { RpcBlockInner::Block(block) => block, @@ -130,7 +125,6 @@ enum RpcBlockInner { BlockAndCustodyColumns { block: Arc>, data_columns: CustodyDataColumnList, - expected_custody_indices: Vec, }, } @@ -139,14 +133,12 @@ impl RpcBlock { pub fn new_without_blobs( block_root: Option, block: Arc>, - custody_columns_count: usize, ) -> Self { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); Self { block_root, block: RpcBlockInner::Block(block), - custody_columns_count, } } @@ -188,8 +180,6 @@ impl RpcBlock { Ok(Self { block_root, block: inner, - // Block is before PeerDAS - custody_columns_count: 0, }) } @@ -197,12 +187,10 @@ impl RpcBlock { block_root: Option, block: Arc>, custody_columns: Vec>, - expected_custody_indices: Vec, spec: &ChainSpec, ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - let custody_columns_count = expected_custody_indices.len(); let inner = RpcBlockInner::BlockAndCustodyColumns { block, data_columns: RuntimeVariableList::new( @@ -210,12 +198,10 @@ impl RpcBlock { spec.number_of_columns as usize, ) .map_err(|e| format!("custody_columns len exceeds number_of_columns: {e:?}"))?, - expected_custody_indices, }; Ok(Self { block_root, block: inner, - custody_columns_count, }) } @@ -226,7 +212,7 @@ impl RpcBlock { Hash256, Arc>, Option>, - Option<(CustodyDataColumnList, Vec)>, + Option>, ) { let block_root = self.block_root(); match self.block { @@ -235,13 +221,7 @@ impl RpcBlock { RpcBlockInner::BlockAndCustodyColumns { 
block, data_columns, - expected_custody_indices, - } => ( - block_root, - block, - None, - Some((data_columns, expected_custody_indices)), - ), + } => (block_root, block, None, Some(data_columns)), } } pub fn n_blobs(&self) -> usize { @@ -290,12 +270,10 @@ impl ExecutedBlock { MaybeAvailableBlock::AvailabilityPending { block_root: _, block: pending_block, - custody_columns_count, } => Self::AvailabilityPending(AvailabilityPendingExecutedBlock::new( pending_block, import_data, payload_verification_outcome, - custody_columns_count, )), } } @@ -361,7 +339,6 @@ pub struct AvailabilityPendingExecutedBlock { pub block: Arc>, pub import_data: BlockImportData, pub payload_verification_outcome: PayloadVerificationOutcome, - pub custody_columns_count: usize, } impl AvailabilityPendingExecutedBlock { @@ -369,13 +346,11 @@ impl AvailabilityPendingExecutedBlock { block: Arc>, import_data: BlockImportData, payload_verification_outcome: PayloadVerificationOutcome, - custody_columns_count: usize, ) -> Self { Self { block, import_data, payload_verification_outcome, - custody_columns_count, } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2346aca00b..ae07584f27 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -13,10 +13,12 @@ use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::persisted_beacon_chain::PersistedBeaconChain; +use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; +use crate::CustodyContext; use crate::{ BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain, Eth1ChainBackend, ServerSentEventHandler, @@ -926,6 +928,20 @@ where } }; + // Load the persisted custody context from the db and initialize + // the context for this run + let custody_context = if let Some(custody) = + load_custody_context::(store.clone()) + { + Arc::new(CustodyContext::new_from_persisted_custody_context( + custody, + self.import_all_data_columns, + )) + } else { + Arc::new(CustodyContext::new(self.import_all_data_columns)) + }; + debug!(?custody_context, "Loading persisted custody context"); + let beacon_chain = BeaconChain { spec: self.spec.clone(), config: self.chain_config, @@ -999,8 +1015,14 @@ where validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, data_availability_checker: Arc::new( - DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, self.spec) - .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, + DataAvailabilityChecker::new( + slot_clock, + self.kzg.clone(), + store, + custody_context, + self.spec, + ) + .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 2b6da1fb1f..d20e5e4262 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -1,12 +1,12 @@ use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList}; use 
crate::block_verification_types::{ - match_block_and_blobs, match_block_and_data_columns, AvailabilityPendingExecutedBlock, + match_block_and_blobs, match_block_and_data_columns, AsBlock, AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, }; use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, }; -use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore}; +use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext}; use kzg::Kzg; use slot_clock::SlotClock; use std::collections::HashSet; @@ -76,6 +76,7 @@ pub struct DataAvailabilityChecker { availability_cache: Arc>, slot_clock: T::SlotClock, kzg: Arc, + custody_context: Arc<CustodyContext>, spec: Arc, } @@ -113,17 +114,28 @@ impl DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Arc, store: BeaconStore, + custody_context: Arc<CustodyContext>, spec: Arc, ) -> Result { - let inner = DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; + let inner = DataAvailabilityCheckerInner::new( + OVERFLOW_LRU_CAPACITY, + store, + custody_context.clone(), + spec.clone(), + )?; Ok(Self { availability_cache: Arc::new(inner), slot_clock, kzg, + custody_context, spec, }) } + pub fn custody_context(&self) -> Arc<CustodyContext> { + self.custody_context.clone() + } + /// Checks if the block root is currently in the availability cache awaiting import because /// of missing components. pub fn get_execution_valid_block( @@ -299,7 +311,6 @@ impl DataAvailabilityChecker { &self, block: RpcBlock, ) -> Result, AvailabilityCheckError> { - let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); if self.blobs_required_for_block(&block) { return if let Some(blob_list) = blobs { @@ -313,15 +324,11 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - }) + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) }; } if self.data_columns_required_for_block(&block) { - return if let Some((data_column_list, _)) = data_columns.as_ref() { + return if let Some(data_column_list) = data_columns.as_ref() { verify_kzg_for_data_column_list_with_scoring( data_column_list .iter() @@ -342,11 +349,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - }) + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) }; } @@ -405,7 +408,6 @@ impl DataAvailabilityChecker { // TODO(das): we could do the matching first before spending CPU cycles on KZG verification for block in blocks { - let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); let maybe_available_block = if self.blobs_required_for_block(&block) { @@ -418,25 +420,22 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - } + MaybeAvailableBlock::AvailabilityPending { block_root, block } } } else if self.data_columns_required_for_block(&block) { - if let Some((data_columns, expected_custody_indices)) = data_columns { + if let Some(data_columns) = data_columns { let received_indices = HashSet::::from_iter(data_columns.iter().map(|d| d.index())); - let missing_custody_columns = expected_custody_indices - .into_iter() - .filter(|index|
!received_indices.contains(index)) - .collect::>(); + let expected_custody_indices = self + .custody_context + .sampling_size(Some(block.epoch()), &self.spec); - if !missing_custody_columns.is_empty() { + if expected_custody_indices != received_indices.len() as u64 { + // FIXME: da checker does not have the exact columns + // Maybe we can move this logic to network? return Err(AvailabilityCheckError::MissingCustodyColumns( - missing_custody_columns, + received_indices.into_iter().collect::>(), )); } @@ -810,7 +809,6 @@ pub enum MaybeAvailableBlock { AvailabilityPending { block_root: Hash256, block: Arc>, - custody_columns_count: usize, }, } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 3478c183f3..36c4f2cdc1 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -8,6 +8,7 @@ use crate::block_verification_types::{ use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::BeaconChainTypes; +use crate::CustodyContext; use lru::LruCache; use parking_lot::RwLock; use std::cmp::Ordering; @@ -158,6 +159,7 @@ impl PendingComponents { pub fn make_available( &mut self, spec: &Arc, + num_expected_columns: u64, recover: R, ) -> Result>, AvailabilityCheckError> where @@ -171,12 +173,11 @@ impl PendingComponents { }; let num_expected_blobs = block.num_blobs_expected(); - + let num_expected_columns = num_expected_columns as usize; let blob_data = if num_expected_blobs == 0 { Some(AvailableBlockData::NoData) } else if spec.is_peer_das_enabled_for_epoch(block.epoch()) { let num_received_columns = self.verified_data_columns.len(); - let num_expected_columns = block.custody_columns_count(); match num_received_columns.cmp(&num_expected_columns) { Ordering::Greater => { // Should never happen @@ -254,7 +255,6 @@ impl PendingComponents { block, import_data, payload_verification_outcome, - custody_columns_count: _, } = recover(block.clone())?; let available_block = AvailableBlock { @@ -308,19 +308,21 @@ impl PendingComponents { }) } - pub fn status_str(&self, block_epoch: Epoch, spec: &ChainSpec) -> String { + pub fn status_str( + &self, + block_epoch: Epoch, + num_expected_columns: Option, + spec: &ChainSpec, + ) -> String { let block_count = if self.executed_block.is_some() { 1 } else { 0 }; if spec.is_peer_das_enabled_for_epoch(block_epoch) { - let custody_columns_count = if let Some(block) = self.get_cached_block() { - &block.custody_columns_count().to_string() - } else { - "?" - }; format!( "block {} data_columns {}/{}", block_count, self.verified_data_columns.len(), - custody_columns_count, + num_expected_columns + .map(|c| c.to_string()) + .unwrap_or("?".into()) ) } else { let num_expected_blobs = if let Some(block) = self.get_cached_block() { @@ -346,6 +348,7 @@ pub struct DataAvailabilityCheckerInner { /// This cache holds a limited number of states in memory and reconstructs them /// from disk when necessary. 
This is necessary until we merge tree-states state_cache: StateLRUCache, + custody_context: Arc, spec: Arc, } @@ -362,11 +365,13 @@ impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, + custody_context: Arc, spec: Arc, ) -> Result { Ok(Self { critical: RwLock::new(LruCache::new(capacity)), state_cache: StateLRUCache::new(beacon_store, spec.clone()), + custody_context, spec, }) } @@ -470,13 +475,15 @@ impl DataAvailabilityCheckerInner { debug!( component = "blobs", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, None, &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = pending_components.make_available( + &self.spec, + self.custody_context.sampling_size(Some(epoch), &self.spec), + |block| self.state_cache.recover_pending_executed_block(block), + )? { // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -519,16 +526,19 @@ impl DataAvailabilityCheckerInner { // Merge in the data columns. pending_components.merge_data_columns(kzg_verified_data_columns)?; + let num_expected_columns = self.custody_context.sampling_size(Some(epoch), &self.spec); debug!( component = "data_columns", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, Some(num_expected_columns), &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = + pending_components.make_available(&self.spec, num_expected_columns, |block| { + self.state_cache.recover_pending_executed_block(block) + })? + { // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -612,17 +622,20 @@ impl DataAvailabilityCheckerInner { // Merge in the block. pending_components.merge_block(diet_executed_block); + let num_expected_columns = self.custody_context.sampling_size(Some(epoch), &self.spec); debug!( component = "block", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, Some(num_expected_columns), &self.spec), "Component added to data availability checker" ); // Check if we have all components and entire set is consistent. - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = pending_components.make_available( + &self.spec, + self.custody_context.sampling_size(Some(epoch), &self.spec), + |block| self.state_cache.recover_pending_executed_block(block), + )? { // We keep the pending components in the availability cache during block import (#5845). 
write_lock.put(block_root, pending_components); drop(write_lock); @@ -700,7 +713,6 @@ mod test { use types::{ExecPayload, MinimalEthSpec}; const LOW_VALIDATOR_COUNT: usize = 32; - const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 8; fn get_store_with_spec( db_path: &TempDir, @@ -861,7 +873,6 @@ mod test { block, import_data, payload_verification_outcome, - custody_columns_count: DEFAULT_TEST_CUSTODY_COLUMN_COUNT, }; (availability_pending_block, gossip_verified_blobs) @@ -888,9 +899,15 @@ mod test { let spec = harness.spec.clone(); let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); + let custody_context = Arc::new(CustodyContext::new(false)); let cache = Arc::new( - DataAvailabilityCheckerInner::::new(capacity_non_zero, test_store, spec.clone()) - .expect("should create cache"), + DataAvailabilityCheckerInner::::new( + capacity_non_zero, + test_store, + custody_context, + spec.clone(), + ) + .expect("should create cache"), ); (harness, cache, chain_db_path) } @@ -1239,8 +1256,6 @@ mod pending_components_tests { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }, - // Default custody columns count, doesn't matter here - custody_columns_count: 8, }; (block.into(), blobs, invalid_blobs) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index fe8c89e6c3..9e53b03276 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -24,7 +24,6 @@ pub struct DietAvailabilityPendingExecutedBlock { parent_eth1_finalization_data: Eth1FinalizationData, consensus_context: OnDiskConsensusContext, payload_verification_outcome: PayloadVerificationOutcome, - custody_columns_count: usize, } /// just implementing the same methods as `AvailabilityPendingExecutedBlock` @@ -54,10 +53,6 @@ impl DietAvailabilityPendingExecutedBlock { .unwrap_or_default() } - pub fn custody_columns_count(&self) -> usize { - self.custody_columns_count - } - /// Returns the epoch corresponding to `self.slot()`. pub fn epoch(&self) -> Epoch { self.block.slot().epoch(E::slots_per_epoch()) @@ -107,7 +102,6 @@ impl StateLRUCache { executed_block.import_data.consensus_context, ), payload_verification_outcome: executed_block.payload_verification_outcome, - custody_columns_count: executed_block.custody_columns_count, } } @@ -137,7 +131,6 @@ impl StateLRUCache { .into_consensus_context(), }, payload_verification_outcome: diet_executed_block.payload_verification_outcome, - custody_columns_count: diet_executed_block.custody_columns_count, }) } @@ -224,7 +217,6 @@ impl From> value.import_data.consensus_context, ), payload_verification_outcome: value.payload_verification_outcome, - custody_columns_count: value.custody_columns_count, } } } diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 3e54ee9199..609e5bd796 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -215,8 +215,7 @@ impl GossipVerifiedDataColumn } /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. 
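/// (Exposed as `pub` below, rather than `#[cfg(test)]`, so tests outside this crate, such as the new SSE data column event test in `events.rs`, can construct one without running KZG verification.)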
- #[cfg(test)] - pub(crate) fn __new_for_testing(column_sidecar: Arc>) -> Self { + pub fn __new_for_testing(column_sidecar: Arc>) -> Self { Self { block_root: column_sidecar.block_root(), data_column: KzgVerifiedDataColumn::__new_for_testing(column_sidecar), @@ -268,7 +267,6 @@ impl KzgVerifiedDataColumn { } /// Create a `KzgVerifiedDataColumn` from `DataColumnSidecar` for testing ONLY. - #[cfg(test)] pub(crate) fn __new_for_testing(data_column: Arc>) -> Self { Self { data: data_column } } diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index d09b74e645..94ebfb4655 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -11,6 +11,7 @@ pub struct ServerSentEventHandler { single_attestation_tx: Sender>, block_tx: Sender>, blob_sidecar_tx: Sender>, + data_column_sidecar_tx: Sender>, finalized_tx: Sender>, head_tx: Sender>, exit_tx: Sender>, @@ -37,6 +38,7 @@ impl ServerSentEventHandler { let (single_attestation_tx, _) = broadcast::channel(capacity); let (block_tx, _) = broadcast::channel(capacity); let (blob_sidecar_tx, _) = broadcast::channel(capacity); + let (data_column_sidecar_tx, _) = broadcast::channel(capacity); let (finalized_tx, _) = broadcast::channel(capacity); let (head_tx, _) = broadcast::channel(capacity); let (exit_tx, _) = broadcast::channel(capacity); @@ -57,6 +59,7 @@ impl ServerSentEventHandler { single_attestation_tx, block_tx, blob_sidecar_tx, + data_column_sidecar_tx, finalized_tx, head_tx, exit_tx, @@ -99,6 +102,10 @@ impl ServerSentEventHandler { .blob_sidecar_tx .send(kind) .map(|count| log_count("blob sidecar", count)), + EventKind::DataColumnSidecar(_) => self + .data_column_sidecar_tx + .send(kind) + .map(|count| log_count("data_column_sidecar", count)), EventKind::FinalizedCheckpoint(_) => self .finalized_tx .send(kind) @@ -177,6 +184,10 @@ impl ServerSentEventHandler { self.blob_sidecar_tx.subscribe() } + pub fn subscribe_data_column_sidecar(&self) -> Receiver> { + self.data_column_sidecar_tx.subscribe() + } + pub fn subscribe_finalized(&self) -> Receiver> { self.finalized_tx.subscribe() } @@ -249,6 +260,10 @@ impl ServerSentEventHandler { self.blob_sidecar_tx.receiver_count() > 0 } + pub fn has_data_column_sidecar_subscribers(&self) -> bool { + self.data_column_sidecar_tx.receiver_count() > 0 + } + pub fn has_finalized_subscribers(&self) -> bool { self.finalized_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 5b79312d37..0eec6dc770 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -48,6 +48,7 @@ pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; mod persisted_beacon_chain; +pub mod persisted_custody; mod persisted_fork_choice; mod pre_finalization_cache; pub mod proposer_prep_service; @@ -59,6 +60,7 @@ pub mod summaries_dag; pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; +pub mod validator_custody; pub mod validator_monitor; pub mod validator_pubkey_cache; @@ -100,3 +102,4 @@ pub use state_processing::per_block_processing::errors::{ }; pub use store; pub use types; +pub use validator_custody::CustodyContext; diff --git a/beacon_node/beacon_chain/src/persisted_custody.rs b/beacon_node/beacon_chain/src/persisted_custody.rs new file mode 100644 index 0000000000..6ede473b36 --- /dev/null +++ b/beacon_node/beacon_chain/src/persisted_custody.rs @@ -0,0 +1,46 @@ +use 
crate::validator_custody::CustodyContextSsz; +use ssz::{Decode, Encode}; +use std::sync::Arc; +use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; +use types::{EthSpec, Hash256}; + +/// 32-byte key for accessing the `CustodyContext`. All zero because `CustodyContext` has its own column. +pub const CUSTODY_DB_KEY: Hash256 = Hash256::ZERO; + +pub struct PersistedCustody(CustodyContextSsz); + +pub fn load_custody_context<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>( + store: Arc<HotColdDB<E, Hot, Cold>>, +) -> Option<CustodyContextSsz> { + let res: Result<Option<PersistedCustody>, _> = + store.get_item::<PersistedCustody>(&CUSTODY_DB_KEY); + // Load context from the store + match res { + Ok(Some(c)) => Some(c.0), + _ => None, + } +} + +/// Attempts to persist the custody context object to the given `store`. +pub fn persist_custody_context<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>( + store: Arc<HotColdDB<E, Hot, Cold>>, + custody_context: CustodyContextSsz, +) -> Result<(), store::Error> { + store.put_item(&CUSTODY_DB_KEY, &PersistedCustody(custody_context)) +} + +impl StoreItem for PersistedCustody { + fn db_column() -> DBColumn { + DBColumn::CustodyContext + } + + fn as_store_bytes(&self) -> Vec<u8> { + self.0.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> { + let custody_context = CustodyContextSsz::from_ssz_bytes(bytes)?; + + Ok(PersistedCustody(custody_context)) + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index c4044c2ce7..369145a615 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -72,7 +72,7 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; // Pre-computed data column sidecar using a single static blob from: // `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` -const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = +pub const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz"); // Default target aggregators to set during testing; this ensures an aggregator at each slot.
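Not part of the diff: a minimal usage sketch of the persistence helpers above. The `store` handle, the type parameters `E`/`Hot`/`Cold`, and the `import_all_data_columns` flag are assumptions, stand-ins for whatever is in scope at the call site:

```rust
// Illustrative round-trip through the new helpers (assumed names noted above).
let ssz_ctx: CustodyContextSsz = custody_context.as_ref().into(); // custody_context: Arc<CustodyContext>
persist_custody_context::<E, Hot, Cold>(store.clone(), ssz_ctx)?;

// On the next startup the builder reloads it, falling back to a fresh context:
let ctx = match load_custody_context::<E, Hot, Cold>(store.clone()) {
    Some(persisted) => {
        CustodyContext::new_from_persisted_custody_context(persisted, import_all_data_columns)
    }
    None => CustodyContext::new(import_all_data_columns),
};
```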
@@ -609,12 +609,6 @@ where let chain = builder.build().expect("should build"); - let sampling_column_count = if self.import_all_data_columns { - chain.spec.number_of_custody_groups as usize - } else { - chain.spec.custody_requirement as usize - }; - BeaconChainHarness { spec: chain.spec.clone(), chain: Arc::new(chain), @@ -625,7 +619,6 @@ mock_execution_layer: self.mock_execution_layer, mock_builder: None, rng: make_rng(), - sampling_column_count, } } } @@ -682,7 +675,6 @@ pub struct BeaconChainHarness { pub mock_execution_layer: Option>, pub mock_builder: Option>>, - pub sampling_column_count: usize, pub rng: Mutex, } @@ -785,7 +777,10 @@ where } pub fn get_sampling_column_count(&self) -> usize { - self.sampling_column_count + self.chain + .data_availability_checker + .custody_context() + .sampling_size(None, &self.chain.spec) as usize } pub fn slots_per_epoch(&self) -> u64 { @@ -2360,25 +2355,18 @@ where .blob_kzg_commitments() .is_ok_and(|c| !c.is_empty()); if !has_blobs { - return RpcBlock::new_without_blobs(Some(block_root), block, 0); + return RpcBlock::new_without_blobs(Some(block_root), block); } // Blobs are stored as data columns from Fulu (PeerDAS) if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap(); - let expected_custody_indices = columns.iter().map(|d| d.index).collect::>(); let custody_columns = columns .into_iter() .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - custody_columns, - expected_custody_indices, - &self.spec, - ) - .unwrap() + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, &self.spec) + .unwrap() } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); RpcBlock::new(Some(block_root), block, blobs).unwrap() @@ -2404,18 +2392,10 @@ where .take(sampling_column_count) .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - let expected_custody_indices = - columns.iter().map(|d| d.index()).collect::>(); - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - columns, - expected_custody_indices, - &self.spec, - ) - .map_err(BlockError::InternalError)? + RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec) + .map_err(BlockError::InternalError)? } else { - RpcBlock::new_without_blobs(Some(block_root), block, sampling_column_count) + RpcBlock::new_without_blobs(Some(block_root), block) } } else { let blobs = blob_items diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs new file mode 100644 index 0000000000..160333b50e --- /dev/null +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -0,0 +1,447 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::atomic::{AtomicU64, Ordering}, +}; + +use parking_lot::RwLock; + +use ssz_derive::{Decode, Encode}; +use types::{ChainSpec, Epoch, EthSpec, Slot}; + +/// A delay before making the CGC change effective to the data availability checker. +const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; + +type ValidatorsAndBalances = Vec<(usize, u64)>; + +/// This currently just registers increases in validator count. +/// Does not handle decreasing validator counts. +#[derive(Default, Debug)] +struct ValidatorRegistrations { + /// Set of all validators that are registered to this node, along with their effective balances. + /// + /// The key is the validator index and the value is its effective balance.
+ validators: HashMap<usize, u64>, + /// Maintains the validator custody requirement at a given epoch. + /// + /// Note: Only stores the epoch value when there's a change in custody requirement. + /// So if epochs 10 and 11 have the same custody requirement, only 10 is stored. + /// This map is never pruned, because currently we never decrease the custody requirement, so the + /// map size is bounded at 128. + epoch_validator_custody_requirements: BTreeMap<Epoch, u64>, +} + +impl ValidatorRegistrations { + /// Returns the validator custody requirement at the latest epoch. + fn latest_validator_custody_requirement(&self) -> Option<u64> { + self.epoch_validator_custody_requirements + .last_key_value() + .map(|(_, v)| *v) + } + + /// Looks up the active custody requirement at the given epoch. + fn custody_requirement_at_epoch(&self, epoch: Epoch) -> Option<u64> { + self.epoch_validator_custody_requirements + .range(..=epoch) + .last() + .map(|(_, custody_count)| *custody_count) + } + + /// Registers new validator indices and updates the list of validators if required. + /// Returns `Some((effective_epoch, new_cgc))` if the registration results in a CGC update. + pub(crate) fn register_validators<E: EthSpec>( + &mut self, + validators_and_balance: ValidatorsAndBalances, + slot: Slot, + spec: &ChainSpec, + ) -> Option<(Epoch, u64)> { + for (validator_index, effective_balance) in validators_and_balance { + self.validators.insert(validator_index, effective_balance); + } + + // Each `BALANCE_PER_ADDITIONAL_CUSTODY_GROUP` effectively contributes one unit of "weight". + let validator_custody_units = + self.validators.values().sum::<u64>() / spec.balance_per_additional_custody_group; + let validator_custody_requirement = + get_validators_custody_requirement(validator_custody_units, spec); + + tracing::debug!( + validator_custody_units, + validator_custody_requirement, + "Registered validators" + ); + + // If registering the new validators changed the custody requirement, then + // record a new entry at the epoch where the change takes effect. + if Some(validator_custody_requirement) != self.latest_validator_custody_requirement() { + // Apply the change from the next epoch after adding some delay buffer to ensure + // the node has enough time to subscribe to subnets etc., and to avoid having + // inconsistent column counts within an epoch. + let effective_delay_slots = + CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS / spec.seconds_per_slot; + let effective_epoch = (slot + effective_delay_slots).epoch(E::slots_per_epoch()) + 1; + self.epoch_validator_custody_requirements + .entry(effective_epoch) + .and_modify(|old_custody| *old_custody = validator_custody_requirement) + .or_insert(validator_custody_requirement); + Some((effective_epoch, validator_custody_requirement)) + } else { + None + } + } +} + +/// Given the `validator_custody_units`, return the custody requirement based on +/// the spec parameters. +/// +/// Note: a validator custody unit here represents `BALANCE_PER_ADDITIONAL_CUSTODY_GROUP` +/// (32 ETH) worth of effective balance. +/// +/// For example, a validator with an effective balance of 32 ETH counts as 1 unit, +/// and a validator with an effective balance of 65 ETH counts as 65 // 32 = 2 units.
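+/// +/// Worked example (illustrative, assuming mainnet-style parameters where +/// `validator_custody_requirement = 8` and `number_of_custody_groups = 128`): +/// 4 units -> min(max(4, 8), 128) = 8 groups; 20 units -> 20 groups; 200 units -> capped at 128.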
+/// +/// See https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/validator.md#validator-custody +fn get_validators_custody_requirement(validator_custody_units: u64, spec: &ChainSpec) -> u64 { + std::cmp::min( + std::cmp::max(validator_custody_units, spec.validator_custody_requirement), + spec.number_of_custody_groups, + ) +} + +/// Contains all the information the node requires to calculate the +/// number of columns to be custodied when checking for DA. +#[derive(Debug)] +pub struct CustodyContext { + /// The number of custody groups required, based on the number of validators + /// attached to this node. + /// + /// This is the number we use to compute the custody group count required for the + /// data availability check, and the one we advertise to our peers in the metadata + /// and ENR values. + validator_custody_count: AtomicU64, + /// Whether the node is run as a supernode based on the current CLI parameters. + pub current_is_supernode: bool, + /// The persisted value for `is_supernode` based on the previous run of this node. + /// + /// Note: We require this value because if a user restarts the node with a higher CLI custody + /// count value than in the previous run, we should continue advertising the custody + /// count based on the old value rather than the new one, since we haven't backfilled the required + /// columns. + persisted_is_supernode: bool, + /// Maintains all the validators that are currently attached to this node. + validator_registrations: RwLock<ValidatorRegistrations>, +} + +impl CustodyContext { + /// Creates a new default custody context object when no persisted object + /// exists. + /// + /// The `is_supernode` value is based on the current CLI parameters. + pub fn new(is_supernode: bool) -> Self { + Self { + validator_custody_count: AtomicU64::new(0), + current_is_supernode: is_supernode, + persisted_is_supernode: is_supernode, + validator_registrations: Default::default(), + } + } + + pub fn new_from_persisted_custody_context( + ssz_context: CustodyContextSsz, + is_supernode: bool, + ) -> Self { + CustodyContext { + validator_custody_count: AtomicU64::new(ssz_context.validator_custody_at_head), + current_is_supernode: is_supernode, + persisted_is_supernode: ssz_context.persisted_is_supernode, + validator_registrations: Default::default(), + } + } + + /// Registers new validator indices and updates the list of validators if required. + /// + /// Also modifies the internal structures if the validator custody has changed to + /// update the `custody_column_count`. + /// + /// Returns `Some` along with the updated custody group count if it has changed, otherwise returns `None`.
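+ /// + /// Timing sketch (illustrative, assuming mainnet's 12-second slots): the 30-second + /// `CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS` buffer is 30 / 12 = 2 slots, so a registration at + /// slot 10 computes `effective_epoch = epoch(slot 12) + 1`, and the new sampling size only applies + /// from that epoch (exercised by `cgc_change_should_be_effective_to_sampling_after_delay` below).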
+ pub fn register_validators<E: EthSpec>( + &self, + validators_and_balance: ValidatorsAndBalances, + slot: Slot, + spec: &ChainSpec, + ) -> Option<CustodyCountChanged> { + let Some((effective_epoch, new_validator_custody)) = self + .validator_registrations + .write() + .register_validators::<E>(validators_and_balance, slot, spec) + else { + return None; + }; + + let current_cgc = self.custody_group_count_at_head(spec); + let validator_custody_count_at_head = self.validator_custody_count.load(Ordering::Relaxed); + + if new_validator_custody != validator_custody_count_at_head { + tracing::debug!( + old_count = validator_custody_count_at_head, + new_count = new_validator_custody, + "Validator count at head updated" + ); + self.validator_custody_count + .store(new_validator_custody, Ordering::Relaxed); + + let updated_cgc = self.custody_group_count_at_head(spec); + // Notify the network only if there are more column subnets to subscribe to. + if updated_cgc > current_cgc { + tracing::debug!( + old_cgc = current_cgc, + updated_cgc, + "Custody group count updated" + ); + return Some(CustodyCountChanged { + new_custody_group_count: updated_cgc, + sampling_count: self.sampling_size(Some(effective_epoch), spec), + }); + } + } + + None + } + + /// This function is used to determine the custody group count at head ONLY. + /// Do NOT use this directly for the data availability check; use `self.sampling_size` instead, as + /// the CGC can change over epochs. + pub fn custody_group_count_at_head(&self, spec: &ChainSpec) -> u64 { + if self.current_is_supernode { + return spec.number_of_custody_groups; + } + let validator_custody_count_at_head = self.validator_custody_count.load(Ordering::Relaxed); + + // If there are no validators, fall back to the minimum `custody_requirement`. + if validator_custody_count_at_head > 0 { + validator_custody_count_at_head + } else { + spec.custody_requirement + } + } + + /// Returns the number of custody columns this node must sample in order to import a block at `epoch`. + /// If an `epoch` is not specified, returns the *current* validator custody requirement. + pub fn sampling_size(&self, epoch_opt: Option<Epoch>, spec: &ChainSpec) -> u64 { + let custody_group_count = if self.current_is_supernode { + spec.number_of_custody_groups + } else if let Some(epoch) = epoch_opt { + self.validator_registrations + .read() + .custody_requirement_at_epoch(epoch) + .unwrap_or(spec.custody_requirement) + } else { + self.custody_group_count_at_head(spec) + }; + + spec.sampling_size(custody_group_count) + .expect("should compute node sampling size from valid chain spec") + } +} + +/// The custody count changed because of a change in the +/// number of validators being managed. +pub struct CustodyCountChanged { + pub new_custody_group_count: u64, + pub sampling_count: u64, +} + +/// The custody information that gets persisted across runs.
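+/// +/// Note that only the head validator custody count and the supernode flag survive a restart; the +/// per-epoch `ValidatorRegistrations` history is rebuilt at runtime once validators re-register.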
+#[derive(Debug, Encode, Decode, Clone)] +pub struct CustodyContextSsz { + validator_custody_at_head: u64, + persisted_is_supernode: bool, +} + +impl From<&CustodyContext> for CustodyContextSsz { + fn from(context: &CustodyContext) -> Self { + CustodyContextSsz { + validator_custody_at_head: context.validator_custody_count.load(Ordering::Relaxed), + persisted_is_supernode: context.persisted_is_supernode, + } + } +} + +#[cfg(test)] +mod tests { + use types::MainnetEthSpec; + + use super::*; + + type E = MainnetEthSpec; + + #[test] + fn no_validators_supernode_default() { + let custody_context = CustodyContext::new(true); + let spec = E::default_spec(); + assert_eq!( + custody_context.custody_group_count_at_head(&spec), + spec.number_of_custody_groups + ); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.number_of_custody_groups + ); + } + + #[test] + fn no_validators_fullnode_default() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + assert_eq!( + custody_context.custody_group_count_at_head(&spec), + spec.custody_requirement, + "head custody count should be minimum spec custody requirement" + ); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.samples_per_slot + ); + } + + #[test] + fn register_single_validator_should_update_cgc() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + let min_val_custody_requirement = spec.validator_custody_requirement; + // A single validator increases its balance over 3 epochs. + let validators_and_expected_cgc_change = vec![ + ( + vec![(0, bal_per_additional_group)], + Some(min_val_custody_requirement), + ), + // No CGC change at 8 custody units, as it's the minimum requirement + (vec![(0, 8 * bal_per_additional_group)], None), + (vec![(0, 10 * bal_per_additional_group)], Some(10)), + ]; + + register_validators_and_assert_cgc( + &custody_context, + validators_and_expected_cgc_change, + &spec, + ); + } + + #[test] + fn register_multiple_validators_should_update_cgc() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + let min_val_custody_requirement = spec.validator_custody_requirement; + // Add 3 validators over 3 epochs. + let validators_and_expected_cgc = vec![ + ( + vec![(0, bal_per_additional_group)], + Some(min_val_custody_requirement), + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + ], + // No CGC change at 8 custody units, as it's the minimum requirement + None, + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + (2, 2 * bal_per_additional_group), + ], + Some(10), + ), + ]; + + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + } + + #[test] + fn register_validators_should_not_update_cgc_for_supernode() { + let custody_context = CustodyContext::new(true); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + + // Add 3 validators over 3 epochs.
+ let validators_and_expected_cgc = vec![ + (vec![(0, bal_per_additional_group)], None), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + ], + None, + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + (2, 2 * bal_per_additional_group), + ], + None, + ), + ]; + + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.number_of_custody_groups + ); + } + + #[test] + fn cgc_change_should_be_effective_to_sampling_after_delay() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let current_slot = Slot::new(10); + let current_epoch = current_slot.epoch(E::slots_per_epoch()); + let default_sampling_size = custody_context.sampling_size(None, &spec); + let validator_custody_units = 10; + + let _cgc_changed = custody_context.register_validators::<E>( + vec![( + 0, + validator_custody_units * spec.balance_per_additional_custody_group, + )], + current_slot, + &spec, + ); + + // CGC update is not applied for `current_epoch`. + assert_eq!( + custody_context.sampling_size(Some(current_epoch), &spec), + default_sampling_size + ); + // CGC update is applied for the next epoch. + assert_eq!( + custody_context.sampling_size(Some(current_epoch + 1), &spec), + validator_custody_units + ); + } + + /// Updates validators every epoch and asserts the CGC against expected values. + fn register_validators_and_assert_cgc( + custody_context: &CustodyContext, + validators_and_expected_cgc_changed: Vec<(ValidatorsAndBalances, Option<u64>)>, + spec: &ChainSpec, + ) { + for (idx, (validators_and_balance, expected_cgc_change)) in + validators_and_expected_cgc_changed.into_iter().enumerate() + { + let epoch = Epoch::new(idx as u64); + let updated_custody_count_opt = custody_context + .register_validators::<E>( + validators_and_balance, + epoch.start_slot(E::slots_per_epoch()), + spec, + ) + .map(|c| c.new_custody_group_count); + + assert_eq!(updated_custody_count_opt, expected_cgc_change); + } + } +} diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 4f3556263f..3ff5f772aa 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -30,8 +30,6 @@ type E = MainnetEthSpec; const VALIDATOR_COUNT: usize = 24; const CHAIN_SEGMENT_LENGTH: usize = 64 * 5; const BLOCK_INDICES: &[usize] = &[0, 1, 32, 64, 68 + 1, 129, CHAIN_SEGMENT_LENGTH - 1]; -// Default custody group count for tests -const CGC: usize = 8; /// A cached set of keys.
static KEYPAIRS: LazyLock> = @@ -143,15 +141,10 @@ fn build_rpc_block( Some(DataSidecars::Blobs(blobs)) => { RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } - Some(DataSidecars::DataColumns(columns)) => RpcBlock::new_with_custody_columns( - None, - block, - columns.clone(), - columns.iter().map(|d| d.index()).collect(), - spec, - ) - .unwrap(), - None => RpcBlock::new_without_blobs(None, block, 0), + Some(DataSidecars::DataColumns(columns)) => { + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() + } + None => RpcBlock::new_without_blobs(None, block), } } @@ -374,7 +367,6 @@ async fn chain_segment_non_linear_parent_roots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -412,7 +404,6 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -440,7 +431,6 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -582,11 +572,7 @@ async fn invalid_signature_gossip_block() { .into_block_error() .expect("should import all blocks prior to the one being tested"); let signed_block = SignedBeaconBlock::from_block(block, junk_signature()); - let rpc_block = RpcBlock::new_without_blobs( - None, - Arc::new(signed_block), - harness.sampling_column_count, - ); + let rpc_block = RpcBlock::new_without_blobs(None, Arc::new(signed_block)); let process_res = harness .chain .process_block( @@ -1006,7 +992,6 @@ async fn block_gossip_verification() { let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let block_index = CHAIN_SEGMENT_LENGTH - 2; - let cgc = harness.chain.spec.custody_requirement as usize; harness .chain @@ -1020,7 +1005,7 @@ async fn block_gossip_verification() { { let gossip_verified = harness .chain - .verify_block_for_gossip(snapshot.beacon_block.clone(), get_cgc(&blobs_opt)) + .verify_block_for_gossip(snapshot.beacon_block.clone()) .await .expect("should obtain gossip verified block"); @@ -1062,7 +1047,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_block_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::FutureSlot { present_slot, block_slot, @@ -1096,7 +1081,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_finalized_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::WouldRevertFinalizedSlot { block_slot, finalized_slot, @@ -1126,10 +1111,10 @@ async fn block_gossip_verification() { unwrap_err( harness .chain - .verify_block_for_gossip( - Arc::new(SignedBeaconBlock::from_block(block, junk_signature())), - cgc - ) + .verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block( + block, + junk_signature() + )),) .await ), BlockError::InvalidSignature(InvalidSignature::ProposerSignature) @@ -1154,7 +1139,7 @@ async fn block_gossip_verification() { 
*block.parent_root_mut() = parent_root; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::ParentUnknown {parent_root: p} if p == parent_root ), @@ -1180,7 +1165,7 @@ async fn block_gossip_verification() { *block.parent_root_mut() = parent_root; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::NotFinalizedDescendant { block_parent_root } if block_parent_root == parent_root ), @@ -1217,7 +1202,7 @@ async fn block_gossip_verification() { ); assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), BlockError::IncorrectBlockProposer { block, local_shuffling, @@ -1229,7 +1214,7 @@ async fn block_gossip_verification() { // Check to ensure that we registered this is a valid block from this proposer. assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), BlockError::DuplicateImportStatusUnknown(_), ), "should register any valid signature against the proposer, even if the block failed later verification" @@ -1237,11 +1222,7 @@ async fn block_gossip_verification() { let block = chain_segment[block_index].beacon_block.clone(); assert!( - harness - .chain - .verify_block_for_gossip(block, cgc) - .await - .is_ok(), + harness.chain.verify_block_for_gossip(block).await.is_ok(), "the valid block should be processed" ); @@ -1259,7 +1240,7 @@ async fn block_gossip_verification() { matches!( harness .chain - .verify_block_for_gossip(block.clone(), cgc) + .verify_block_for_gossip(block.clone()) .await .expect_err("should error when processing known block"), BlockError::DuplicateImportStatusUnknown(_) @@ -1335,17 +1316,8 @@ async fn verify_block_for_gossip_slashing_detection() { let state = harness.get_current_state(); let ((block1, blobs1), _) = harness.make_block(state.clone(), Slot::new(1)).await; let ((block2, _blobs2), _) = harness.make_block(state, Slot::new(1)).await; - let cgc = if block1.fork_name_unchecked().fulu_enabled() { - harness.get_sampling_column_count() - } else { - 0 - }; - let verified_block = harness - .chain - .verify_block_for_gossip(block1, cgc) - .await - .unwrap(); + let verified_block = harness.chain.verify_block_for_gossip(block1).await.unwrap(); if let Some((kzg_proofs, blobs)) = blobs1 { harness @@ -1368,7 +1340,7 @@ async fn verify_block_for_gossip_slashing_detection() { ) .await .unwrap(); - unwrap_err(harness.chain.verify_block_for_gossip(block2, CGC).await); + unwrap_err(harness.chain.verify_block_for_gossip(block2).await); // Slasher should have been handed the two conflicting blocks and crafted a slashing. 
slasher.process_queued(Epoch::new(0)).unwrap(); @@ -1392,11 +1364,7 @@ async fn verify_block_for_gossip_doppelganger_detection() { .attestations() .map(|att| att.clone_as_attestation()) .collect::>(); - let verified_block = harness - .chain - .verify_block_for_gossip(block, CGC) - .await - .unwrap(); + let verified_block = harness.chain.verify_block_for_gossip(block).await.unwrap(); harness .chain .process_block( @@ -1543,7 +1511,7 @@ async fn add_base_block_to_altair_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(base_block.clone()), CGC) + .verify_block_for_gossip(Arc::new(base_block.clone())) .await .expect_err("should error when processing base block"), BlockError::InconsistentFork(InconsistentFork { @@ -1553,7 +1521,7 @@ async fn add_base_block_to_altair_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone()), 0); + let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone())); assert!(matches!( harness .chain @@ -1577,7 +1545,7 @@ async fn add_base_block_to_altair_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(base_block), 0)], + vec![RpcBlock::new_without_blobs(None, Arc::new(base_block))], NotifyExecutionLayer::Yes, ) .await, @@ -1680,7 +1648,7 @@ async fn add_altair_block_to_base_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(altair_block.clone()), CGC) + .verify_block_for_gossip(Arc::new(altair_block.clone())) .await .expect_err("should error when processing altair block"), BlockError::InconsistentFork(InconsistentFork { @@ -1690,7 +1658,7 @@ async fn add_altair_block_to_base_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone()), 0); + let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone())); assert!(matches!( harness .chain @@ -1714,7 +1682,7 @@ async fn add_altair_block_to_base_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block), 0)], + vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block))], NotifyExecutionLayer::Yes ) .await, @@ -1775,11 +1743,7 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. 
let notify_execution_layer = NotifyExecutionLayer::Yes; - let rpc_block = RpcBlock::new_without_blobs( - Some(block_root), - block.clone(), - harness.sampling_column_count, - ); + let rpc_block = RpcBlock::new_without_blobs(Some(block_root), block.clone()); let verified_block1 = rpc_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) @@ -1850,14 +1814,3 @@ async fn import_execution_pending_block( } } } - -fn get_cgc(blobs_opt: &Option>) -> usize { - if let Some(data_sidecars) = blobs_opt.as_ref() { - match data_sidecars { - DataSidecars::Blobs(_) => 0, - DataSidecars::DataColumns(d) => d.len(), - } - } else { - 0 - } -} diff --git a/beacon_node/beacon_chain/tests/events.rs index c9bd55e062..5d0f22e252 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -1,11 +1,15 @@ use beacon_chain::blob_verification::GossipVerifiedBlob; -use beacon_chain::test_utils::BeaconChainHarness; -use eth2::types::{EventKind, SseBlobSidecar}; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; +use beacon_chain::test_utils::{BeaconChainHarness, TEST_DATA_COLUMN_SIDECARS_SSZ}; +use eth2::types::{EventKind, SseBlobSidecar, SseDataColumnSidecar}; use rand::rngs::StdRng; use rand::SeedableRng; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; -use types::{BlobSidecar, EthSpec, ForkName, MinimalEthSpec}; +use types::test_utils::TestRandom; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, ForkName, MinimalEthSpec, RuntimeVariableList, +}; type E = MinimalEthSpec; @@ -43,6 +47,42 @@ async fn blob_sidecar_event_on_process_gossip_blob() { assert_eq!(sidecar_event, EventKind::BlobSidecar(expected_sse_blobs)); } +/// Verifies that a data column event is emitted when a gossip-verified data column is received via gossip or the publish block API. +#[tokio::test] +async fn data_column_sidecar_event_on_process_gossip_data_column() { + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to data column sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar(); + + // build and process a gossip-verified data column + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + let sidecar = Arc::new(DataColumnSidecar::random_for_test(&mut rng)); + let gossip_verified_data_column = GossipVerifiedDataColumn::__new_for_testing(sidecar); + let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar( + gossip_verified_data_column.as_data_column(), + ); + + let _ = harness + .chain + .process_gossip_data_columns(vec![gossip_verified_data_column], || Ok(())) + .await + .unwrap(); + + let sidecar_event = data_column_event_receiver.try_recv().unwrap(); + assert_eq!( + sidecar_event, + EventKind::DataColumnSidecar(expected_sse_data_column) + ); +} +
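The server-sent event added above mirrors `SseBlobSidecar`: one event is emitted per verified column, carrying the block root, column index, slot, KZG commitments and versioned hashes. A minimal client-side sketch, assuming the generic `EventKind` from `common/eth2` and an already-established event stream (the stream plumbing is not part of this change):

```rust
use eth2::types::EventKind;
use types::EthSpec;

// Hedged sketch: react to the new `data_column_sidecar` topic. The variant and
// field names come from this diff; the surrounding dispatch is illustrative.
fn on_event<E: EthSpec>(event: EventKind<E>) {
    if let EventKind::DataColumnSidecar(sidecar) = event {
        println!(
            "column {} for block {:?} at slot {} ({} commitments)",
            sidecar.index,
            sidecar.block_root,
            sidecar.slot,
            sidecar.kzg_commitments.len(),
        );
    }
}
```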
/// Verifies that a blob event is emitted when blobs are received via RPC. #[tokio::test] async fn blob_sidecar_event_on_process_rpc_blobs() { @@ -95,3 +135,41 @@ async fn blob_sidecar_event_on_process_rpc_blobs() { } assert_eq!(sse_blobs, expected_sse_blobs); } + +#[tokio::test] +async fn data_column_sidecar_event_on_process_rpc_columns() { + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone()) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // subscribe to data column sidecar events + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut data_column_event_receiver = event_handler.subscribe_data_column_sidecar(); + + // load a precomputed column sidecar to avoid computing one for every block in the tests. + let mut sidecar = RuntimeVariableList::>::from_ssz_bytes( + TEST_DATA_COLUMN_SIDECARS_SSZ, + spec.number_of_columns as usize, + ) + .unwrap()[0] + .clone(); + let parent_root = harness.chain.head().head_block_root(); + sidecar.signed_block_header.message.parent_root = parent_root; + let expected_sse_data_column = SseDataColumnSidecar::from_data_column_sidecar(&sidecar); + + let _ = harness + .chain + .process_rpc_custody_columns(vec![Arc::new(sidecar)]) + .await + .unwrap(); + + let sidecar_event = data_column_event_receiver.try_recv().unwrap(); + assert_eq!( + sidecar_event, + EventKind::DataColumnSidecar(expected_sse_data_column) + ); +} diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs index 6b9ff9d6ed..05fae7aa70 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -22,7 +22,6 @@ use task_executor::ShutdownReason; use types::*; const VALIDATOR_COUNT: usize = 32; -const CGC: usize = 8; type E = MainnetEthSpec; @@ -686,8 +685,7 @@ async fn invalidates_all_descendants() { assert_eq!(fork_parent_state.slot(), fork_parent_slot); let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); let fork_block_root = rig .harness .chain @@ -789,8 +787,7 @@ async fn switches_heads() { let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; let fork_parent_root = fork_block.parent_root(); - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); let fork_block_root = rig .harness .chain @@ -1054,14 +1051,13 @@ async fn invalid_parent() { // Ensure the block built atop an invalid payload is invalid for gossip. assert!(matches!( - rig.harness.chain.clone().verify_block_for_gossip(block.clone(), CGC).await, + rig.harness.chain.clone().verify_block_for_gossip(block.clone()).await, Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root }) if invalid_root == parent_root )); // Ensure the block built atop an invalid payload is invalid for import.
- let rpc_block = - RpcBlock::new_without_blobs(None, block.clone(), rig.harness.sampling_column_count); + let rpc_block = RpcBlock::new_without_blobs(None, block.clone()); assert!(matches!( rig.harness.chain.process_block(rpc_block.block_root(), rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -1385,8 +1381,7 @@ async fn recover_from_invalid_head_by_importing_blocks() { } = InvalidHeadSetup::new().await; // Import the fork block, it should become the head. - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); rig.harness .chain .process_block( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 610fb8b58b..9f8c14f339 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2791,11 +2791,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert_eq!(split.block_root, valid_fork_block.parent_root()); assert_ne!(split.state_root, unadvanced_split_state_root); - let invalid_fork_rpc_block = RpcBlock::new_without_blobs( - None, - invalid_fork_block.clone(), - harness.sampling_column_count, - ); + let invalid_fork_rpc_block = RpcBlock::new_without_blobs(None, invalid_fork_block.clone()); // Applying the invalid block should fail. let err = harness .chain @@ -2811,11 +2807,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert!(matches!(err, BlockError::WouldRevertFinalizedSlot { .. })); // Applying the valid block should succeed, but it should not become head. - let valid_fork_rpc_block = RpcBlock::new_without_blobs( - None, - valid_fork_block.clone(), - harness.sampling_column_count, - ); + let valid_fork_rpc_block = RpcBlock::new_without_blobs(None, valid_fork_block.clone()); harness .chain .process_block( diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 2eaa33a964..a4ec41ac06 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -48,8 +48,8 @@ use directory::DEFAULT_ROOT_DIR; use either::Either; use eth2::types::{ self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice, - ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, - ValidatorId, ValidatorStatus, ValidatorsRequestBody, + ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, StateId as CoreStateId, + ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus, ValidatorsRequestBody, }; use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use health_metrics::observe::Observe; @@ -3765,15 +3765,17 @@ pub fn serve( .and(warp::path::end()) .and(warp_utils::json::json()) .and(validator_subscription_tx_filter.clone()) + .and(network_tx_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |subscriptions: Vec, + |committee_subscriptions: Vec, validator_subscription_tx: Sender, + network_tx: UnboundedSender>, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { - let subscriptions: std::collections::BTreeSet<_> = subscriptions + let subscriptions: std::collections::BTreeSet<_> = committee_subscriptions .iter() .map(|subscription| { chain @@ -3788,6 +3790,7 @@ pub fn serve( } }) .collect(); + let message = ValidatorSubscriptionMessage::AttestationSubscribe { 
subscriptions }; if let Err(e) = validator_subscription_tx.try_send(message) { @@ -3802,6 +3805,42 @@ pub fn serve( )); } + if chain.spec.is_peer_das_scheduled() { + let (finalized_beacon_state, _, _) = + StateId(CoreStateId::Finalized).state(&chain)?; + let validators_and_balances = committee_subscriptions + .iter() + .filter_map(|subscription| { + if let Ok(effective_balance) = finalized_beacon_state + .get_effective_balance(subscription.validator_index as usize) + { + Some((subscription.validator_index as usize, effective_balance)) + } else { + None + } + }) + .collect::>(); + + let current_slot = + chain.slot().map_err(warp_utils::reject::unhandled_error)?; + if let Some(cgc_change) = chain + .data_availability_checker + .custody_context() + .register_validators::( + validators_and_balances, + current_slot, + &chain.spec, + ) { + network_tx.send(NetworkMessage::CustodyCountChanged { + new_custody_group_count: cgc_change.new_custody_group_count, + sampling_count: cgc_change.sampling_count, + }).unwrap_or_else(|e| { + debug!(error = %e, "Could not send message to the network service. \ + Likely shutdown") + }); + } + } + Ok(()) }) }, @@ -4741,6 +4780,9 @@ pub fn serve( api_types::EventTopic::BlobSidecar => { event_handler.subscribe_blob_sidecar() } + api_types::EventTopic::DataColumnSidecar => { + event_handler.subscribe_data_column_sidecar() + } api_types::EventTopic::Attestation => { event_handler.subscribe_attestation() } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 463f585f2c..75979bbb1d 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -138,8 +138,7 @@ pub async fn publish_block>( spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; // Gossip verify the block and blobs/data columns separately. 
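For context on the subscription handler above: registering validators with the custody context lets the node scale its custody group count (CGC) with the total effective balance it is responsible for, emitting `CustodyCountChanged` when the count grows. A self-contained sketch of the underlying calculation, assuming the consensus-spec validator custody formula that the new `validator_custody_requirement` and `balance_per_additional_custody_group` values (added to `ChainSpec` later in this diff) parameterize; the clamp to the total number of custody groups follows the upstream spec and is an assumption here, not shown in this change:

```rust
// Mainnet defaults, matching the values this diff adds to `ChainSpec`.
const VALIDATOR_CUSTODY_REQUIREMENT: u64 = 8;
const BALANCE_PER_ADDITIONAL_CUSTODY_GROUP: u64 = 32_000_000_000; // 32 ETH in Gwei
const NUMBER_OF_CUSTODY_GROUPS: u64 = 128; // assumption: mainnet group count

/// Hedged sketch: custody group count implied by the effective balances of
/// all validators attached to this node (one group per 32 ETH, floored, then
/// clamped to [VALIDATOR_CUSTODY_REQUIREMENT, NUMBER_OF_CUSTODY_GROUPS]).
fn validators_custody_requirement(effective_balances: &[u64]) -> u64 {
    let total_balance: u64 = effective_balances.iter().sum();
    (total_balance / BALANCE_PER_ADDITIONAL_CUSTODY_GROUP)
        .clamp(VALIDATOR_CUSTODY_REQUIREMENT, NUMBER_OF_CUSTODY_GROUPS)
}

fn main() {
    // 10 validators at 32 ETH -> 10 custody groups; 2 validators -> clamped to 8.
    assert_eq!(validators_custody_requirement(&[32_000_000_000; 10]), 10);
    assert_eq!(validators_custody_requirement(&[32_000_000_000; 2]), 8);
}
```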
- let gossip_verified_block_result = unverified_block - .into_gossip_verified_block(&chain, network_globals.custody_columns_count() as usize); + let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); let block_root = block_root.unwrap_or_else(|| { gossip_verified_block_result.as_ref().map_or_else( |_| block.canonical_root(), @@ -224,7 +223,7 @@ pub async fn publish_block>( publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(|_| { warp_utils::reject::custom_server_error("unable to publish data column sidecars".into()) })?; - let sampling_columns_indices = &network_globals.sampling_columns; + let sampling_columns_indices = &network_globals.sampling_columns(); let sampling_columns = gossip_verified_columns .into_iter() .flatten() @@ -303,11 +302,7 @@ pub async fn publish_block>( ); let import_result = Box::pin(chain.process_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - block.clone(), - network_globals.custody_columns_count() as usize, - ), + RpcBlock::new_without_blobs(Some(block_root), block.clone()), NotifyExecutionLayer::Yes, BlockImportSource::HttpApi, publish_fn, diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index cd590580be..843242c22f 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -39,9 +39,6 @@ type E = MainnetEthSpec; * */ -// Default custody group count for tests -const CGC: usize = 8; - /// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=gossip`. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn gossip_invalid() { @@ -367,9 +364,9 @@ pub async fn consensus_partial_pass_only_consensus() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_a.is_err()); /* submit `block_b` which should induce equivocation */ @@ -657,10 +654,10 @@ pub async fn equivocation_consensus_late_equivocation() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1294,9 +1291,9 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { ProvenancedBlock::Builder(b, _, _) => b, }; - let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain, CGC); + let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain, CGC); + let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain); assert!(gossip_block_a.is_err()); let 
channel = tokio::sync::mpsc::unbounded_channel(); @@ -1398,7 +1395,7 @@ pub async fn block_seen_on_gossip_without_blobs() { // Simulate the block being seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain, CGC) + .into_gossip_verified_block(&tester.harness.chain) .unwrap(); // It should not yet be added to fork choice because blobs have not been seen. @@ -1467,7 +1464,7 @@ pub async fn block_seen_on_gossip_with_some_blobs() { // Simulate the block being seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain, CGC) + .into_gossip_verified_block(&tester.harness.chain) .unwrap(); // Simulate some of the blobs being seen on gossip. @@ -1786,6 +1783,5 @@ fn get_custody_columns(tester: &InteractiveTester) -> HashSet { .network_globals .as_ref() .unwrap() - .sampling_columns - .clone() + .sampling_columns() } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index ad54c6b8b1..ad4241c5b7 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -49,6 +49,7 @@ use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EnrForkId, EthSpec}; mod subnet_predicate; +use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY; pub use subnet_predicate::subnet_predicate; use types::non_zero_usize::new_non_zero_usize; @@ -476,6 +477,15 @@ impl Discovery { Ok(()) } + pub fn update_enr_cgc(&mut self, custody_group_count: u64) -> Result<(), String> { + self.discv5 + .enr_insert(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count) + .map_err(|e| format!("{:?}", e))?; + enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); + *self.network_globals.local_enr.write() = self.discv5.local_enr(); + Ok(()) + } + /// Adds/Removes a subnet from the ENR attnets/syncnets Bitfield pub fn update_enr_bitfield(&mut self, subnet: Subnet, value: bool) -> Result<(), String> { let local_enr = self.discv5.local_enr(); diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 23060df9e6..5f65a6c6d0 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -177,6 +177,7 @@ impl Network { pub async fn new( executor: task_executor::TaskExecutor, mut ctx: ServiceContext<'_>, + custody_group_count: u64, ) -> Result<(Self, Arc>), String> { let config = ctx.config.clone(); trace!("Libp2p Service starting"); @@ -201,11 +202,12 @@ impl Network { )?; // Construct the metadata - let custody_group_count = ctx.chain_spec.is_peer_das_scheduled().then(|| { - ctx.chain_spec - .custody_group_count(config.subscribe_all_data_column_subnets) - }); - let meta_data = utils::load_or_build_metadata(&config.network_dir, custody_group_count); + let custody_group_count_metadata = ctx + .chain_spec + .is_peer_das_scheduled() + .then_some(custody_group_count); + let meta_data = + utils::load_or_build_metadata(&config.network_dir, custody_group_count_metadata); let seq_number = *meta_data.seq_number(); let globals = NetworkGlobals::new( enr, @@ -885,6 +887,23 @@ impl Network { } } + /// Subscribe to all data columns determined by the cgc. 
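+ // NOTE: this path is additive: `update_data_column_subnets` only ever extends the sampling sets, so existing data column subnet subscriptions are kept and only the newly required subnets are joined.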
+ #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) { + self.network_globals + .update_data_column_subnets(custody_column_count); + + for column in self.network_globals.sampling_subnets() { + let kind = GossipKind::DataColumnSidecar(column); + self.subscribe_kind(kind); + } + } + /// Returns the scoring parameters for a topic if set. #[instrument(parent = None, level = "trace", @@ -1254,6 +1273,21 @@ impl Network { self.update_metadata_bitfields(); } + /// Updates the cgc value in the ENR. + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + pub fn update_enr_cgc(&mut self, new_custody_group_count: u64) { + if let Err(e) = self.discovery_mut().update_enr_cgc(new_custody_group_count) { + crit!(error = e, "Could not update cgc in ENR"); + } + // update the local meta data which informs our peers of the update during PINGS + self.update_metadata_cgc(new_custody_group_count); + } + /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. #[instrument(parent = None, @@ -1368,6 +1402,28 @@ impl Network { utils::save_metadata_to_disk(&self.network_dir, meta_data); } + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + fn update_metadata_cgc(&mut self, custody_group_count: u64) { + let mut meta_data_w = self.network_globals.local_metadata.write(); + + *meta_data_w.seq_number_mut() += 1; + if let Ok(cgc) = meta_data_w.custody_group_count_mut() { + *cgc = custody_group_count; + } + let seq_number = *meta_data_w.seq_number(); + let meta_data = meta_data_w.clone(); + + drop(meta_data_w); + self.eth2_rpc_mut().update_seq_number(seq_number); + // Save the updated metadata to disk + utils::save_metadata_to_disk(&self.network_dir, meta_data); + } + /// Sends a Ping request to the peer. #[instrument(parent = None, level = "trace", diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 7fa751a905..0c02344208 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -31,10 +31,8 @@ pub struct NetworkGlobals { /// The current state of the backfill sync. pub backfill_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. - pub sampling_subnets: HashSet, - pub sampling_columns: HashSet, - /// Constant custody group count (CGC) set at startup - custody_group_count: u64, + pub sampling_subnets: RwLock>, + pub sampling_columns: RwLock>, /// Network-related configuration. Immutable after initialization. pub config: Arc, /// Ethereum chain configuration. Immutable after initialization. 
@@ -87,6 +85,13 @@ impl NetworkGlobals { sampling_columns.extend(columns); } + tracing::debug!( + cgc = custody_group_count, + ?sampling_columns, + ?sampling_subnets, + "Starting node with custody params" + ); + NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), @@ -96,14 +101,40 @@ impl NetworkGlobals { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), - sampling_subnets, - sampling_columns, - custody_group_count, + sampling_subnets: RwLock::new(sampling_subnets), + sampling_columns: RwLock::new(sampling_columns), config, spec, } } + /// Update the sampling subnets based on an updated cgc. + pub fn update_data_column_subnets(&self, custody_group_count: u64) { + // The below `expect` calls will panic on start up if the chain spec config values used + // are invalid + let sampling_size = self + .spec + .sampling_size(custody_group_count) + .expect("should compute node sampling size from valid chain spec"); + let custody_groups = + get_custody_groups(self.local_enr().node_id().raw(), sampling_size, &self.spec) + .expect("should compute node custody groups"); + + let mut sampling_subnets = self.sampling_subnets.write(); + for custody_index in &custody_groups { + let subnets = compute_subnets_from_custody_group(*custody_index, &self.spec) + .expect("should compute custody subnets for node"); + sampling_subnets.extend(subnets); + } + + let mut sampling_columns = self.sampling_columns.write(); + for custody_index in &custody_groups { + let columns = compute_columns_for_custody_group(*custody_index, &self.spec) + .expect("should compute custody columns for node"); + sampling_columns.extend(columns); + } + } + /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. pub fn local_enr(&self) -> Enr { @@ -120,19 +151,6 @@ impl NetworkGlobals { self.listen_multiaddrs.read().clone() } - /// Returns true if this node is configured as a PeerDAS supernode - pub fn is_supernode(&self) -> bool { - self.custody_group_count == self.spec.number_of_custody_groups - } - - /// Returns the count of custody columns this node must sample for block import - pub fn custody_columns_count(&self) -> u64 { - // This only panics if the chain spec contains invalid values - self.spec - .sampling_size(self.custody_group_count) - .expect("should compute node sampling size from valid chain spec") - } - /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { self.peers.read().connected_peer_ids().count() @@ -226,10 +244,18 @@ impl NetworkGlobals { enable_light_client_server: self.config.enable_light_client_server, subscribe_all_subnets: self.config.subscribe_all_subnets, subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, - sampling_subnets: &self.sampling_subnets, + sampling_subnets: self.sampling_subnets.read().clone(), } } + pub fn sampling_columns(&self) -> HashSet { + self.sampling_columns.read().clone() + } + + pub fn sampling_subnets(&self) -> HashSet { + self.sampling_subnets.read().clone() + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. 
pub fn new_test_globals( trusted_peers: Vec, @@ -302,7 +328,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_subnets.len(), + globals.sampling_subnets.read().len(), subnet_sampling_size as usize ); } @@ -325,7 +351,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_columns.len(), + globals.sampling_columns.read().len(), subnet_sampling_size as usize ); } diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 56b97303d3..349bfe66a3 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -26,11 +26,11 @@ pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; #[derive(Debug)] -pub struct TopicConfig<'a> { +pub struct TopicConfig { pub enable_light_client_server: bool, pub subscribe_all_subnets: bool, pub subscribe_all_data_column_subnets: bool, - pub sampling_subnets: &'a HashSet, + pub sampling_subnets: HashSet, } /// Returns all the topics the node should subscribe at `fork_name` @@ -85,7 +85,7 @@ pub fn core_topics_to_subscribe( topics.push(GossipKind::DataColumnSidecar(i.into())); } } else { - for subnet in opts.sampling_subnets { + for subnet in &opts.sampling_subnets { topics.push(GossipKind::DataColumnSidecar(*subnet)); } } @@ -126,7 +126,7 @@ pub fn all_topics_at_fork(fork: ForkName, spec: &ChainSpec) -> Vec(fork, &opts, spec) } @@ -521,7 +521,7 @@ mod tests { enable_light_client_server: false, subscribe_all_subnets: false, subscribe_all_data_column_subnets: false, - sampling_subnets, + sampling_subnets: sampling_subnets.clone(), } } diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index d979ef9265..0dac126909 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -118,6 +118,7 @@ pub async fn build_libp2p_instance( let (signal, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx, service_name); + let custody_group_count = chain_spec.custody_requirement; let libp2p_context = lighthouse_network::Context { config, enr_fork_id: EnrForkId::default(), @@ -126,7 +127,7 @@ pub async fn build_libp2p_instance( libp2p_registry: None, }; Libp2pInstance( - LibP2PService::new(executor, libp2p_context) + LibP2PService::new(executor, libp2p_context, custody_group_count) .await .expect("should build libp2p instance") .0, diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index b129b54841..05c7dc287b 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -780,7 +780,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) let all_column_subnets = (0..network_globals.spec.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new); - let custody_column_subnets = network_globals.sampling_subnets.iter(); + let custody_column_subnets = network_globals.sampling_subnets(); // Iterate all subnet values to set to zero the empty entries in peers_per_column_subnet for subnet in all_column_subnets { @@ -794,7 +794,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) // Registering this metric is a duplicate for supernodes but helpful for fullnodes. 
This way // operators can monitor the health of only the subnets of their interest without complex // Grafana queries. - for subnet in custody_column_subnets { + for subnet in custody_column_subnets.iter() { set_gauge_entry( &PEERS_PER_CUSTODY_COLUMN_SUBNET, &[&format!("{subnet}")], diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index c1a0e50cf7..9cae31facc 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1272,10 +1272,7 @@ impl NetworkBeaconProcessor { let verification_result = self .chain .clone() - .verify_block_for_gossip( - block.clone(), - self.network_globals.custody_columns_count() as usize, - ) + .verify_block_for_gossip(block.clone()) .await; if verification_result.is_ok() { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 5488e019b6..fd119824cf 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -843,7 +843,7 @@ impl NetworkBeaconProcessor { block_root: Hash256, publish_blobs: bool, ) { - let custody_columns = self.network_globals.sampling_columns.clone(); + let custody_columns = self.network_globals.sampling_columns(); let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { if publish_blobs { @@ -930,7 +930,12 @@ impl NetworkBeaconProcessor { publish_columns: bool, ) -> Option { // Only supernodes attempt reconstruction - if !self.network_globals.is_supernode() { + if !self + .chain + .data_availability_checker + .custody_context() + .current_is_supernode + { return None; } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index cb9c976404..f6a1069a7f 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -285,7 +285,7 @@ impl TestRig { ) .unwrap() .into_iter() - .filter(|c| network_globals.sampling_columns.contains(&c.index)) + .filter(|c| network_globals.sampling_columns().contains(&c.index)) .collect::>(); (None, Some(custody_columns)) @@ -371,22 +371,12 @@ impl TestRig { } } - pub fn custody_columns_count(&self) -> usize { - self.network_beacon_processor - .network_globals - .custody_columns_count() as usize - } - pub fn enqueue_rpc_block(&self) { let block_root = self.next_block.canonical_root(); self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - self.next_block.clone(), - self.custody_columns_count(), - ), + RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 0 }, ) @@ -398,11 +388,7 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - self.next_block.clone(), - self.custody_columns_count(), - ), + RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 1 }, ) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 77204b455d..c9f89ad668 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -10,6 +10,7 @@ use 
beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconPro use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; + use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::RequestType; use lighthouse_network::service::Network; @@ -105,6 +106,12 @@ pub enum NetworkMessage { ConnectTrustedPeer(Enr), /// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping. DisconnectTrustedPeer(Enr), + /// Custody group count changed due to a change in validators' weight. + /// Subscribe to new subnets and update ENR metadata. + CustodyCountChanged { + new_custody_group_count: u64, + sampling_count: u64, + }, } /// Messages triggered by validators that may trigger a subscription to a subnet. @@ -270,7 +277,15 @@ impl NetworkService { }; // launch libp2p service - let (mut libp2p, network_globals) = Network::new(executor.clone(), service_context).await?; + let (mut libp2p, network_globals) = Network::new( + executor.clone(), + service_context, + beacon_chain + .data_availability_checker + .custody_context() + .custody_group_count_at_head(&beacon_chain.spec), + ) + .await?; // Repopulate the DHT with stored ENR's if discovery is not disabled. if !config.disable_discovery { @@ -745,6 +760,15 @@ impl NetworkService { ); } } + NetworkMessage::CustodyCountChanged { + new_custody_group_count, + sampling_count, + } => { + // subscribe to `sampling_count` subnets + self.libp2p + .subscribe_new_data_column_subnets(sampling_count); + self.libp2p.update_enr_cgc(new_custody_group_count); + } } } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs new file mode 100644 index 0000000000..471f072e3f --- /dev/null +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -0,0 +1,581 @@ +use beacon_chain::{ + block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, +}; +use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + }, + PeerId, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use types::{ + BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, + Hash256, RuntimeVariableList, SignedBeaconBlock, Slot, +}; + +use super::range_sync::BatchPeers; + +pub struct RangeBlockComponentsRequest { + /// Blocks we have received awaiting for their corresponding sidecar. + blocks_request: ByRangeRequest>>>, + /// Sidecars we have received awaiting for their corresponding block. 
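+ // The two halves progress independently: blocks and their sidecars are requested in parallel, and `responses` couples them only once every internal request has completed.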
+ block_data_request: RangeBlockDataRequest, +} + +enum ByRangeRequest { + Active(I), + Complete(T, PeerId), +} + +enum RangeBlockDataRequest { + /// All pre-Deneb blocks + NoData, + /// All post-Deneb blocks, regardless of whether they have data + Blobs(ByRangeRequest>>>), + /// All post-Fulu blocks, regardless of whether they have data + DataColumns { + requests: HashMap< + DataColumnsByRangeRequestId, + ByRangeRequest>, + >, + expected_column_to_peer: HashMap, + }, +} + +impl RangeBlockComponentsRequest { + pub fn new( + blocks_req_id: BlocksByRangeRequestId, + blobs_req_id: Option, + data_columns: Option<( + Vec, + HashMap, + )>, + ) -> Self { + let block_data_request = if let Some(blobs_req_id) = blobs_req_id { + RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) + } else if let Some((requests, expected_column_to_peer)) = data_columns { + RangeBlockDataRequest::DataColumns { + requests: requests + .into_iter() + .map(|id| (id, ByRangeRequest::Active(id))) + .collect(), + expected_column_to_peer, + } + } else { + RangeBlockDataRequest::NoData + }; + + Self { + blocks_request: ByRangeRequest::Active(blocks_req_id), + block_data_request, + } + } + + pub fn add_blocks( + &mut self, + req_id: BlocksByRangeRequestId, + blocks: Vec>>, + peer_id: PeerId, + ) -> Result<(), String> { + self.blocks_request.finish(req_id, blocks, peer_id) + } + + pub fn add_blobs( + &mut self, + req_id: BlobsByRangeRequestId, + blobs: Vec>>, + peer_id: PeerId, + ) -> Result<(), String> { + match &mut self.block_data_request { + RangeBlockDataRequest::NoData => Err("received blobs but expected no data".to_owned()), + RangeBlockDataRequest::Blobs(ref mut req) => req.finish(req_id, blobs, peer_id), + RangeBlockDataRequest::DataColumns { .. } => { + Err("received blobs but expected data columns".to_owned()) + } + } + } + + pub fn add_custody_columns( + &mut self, + req_id: DataColumnsByRangeRequestId, + columns: Vec>>, + peer_id: PeerId, + ) -> Result<(), String> { + match &mut self.block_data_request { + RangeBlockDataRequest::NoData => { + Err("received data columns but expected no data".to_owned()) + } + RangeBlockDataRequest::Blobs(_) => { + Err("received data columns but expected blobs".to_owned()) + } + RangeBlockDataRequest::DataColumns { + ref mut requests, ..
+ } => { + let req = requests + .get_mut(&req_id) + .ok_or(format!("unknown data columns by range req_id {req_id}"))?; + req.finish(req_id, columns, peer_id) + } + } + } + + /// If all internal requests are complete, returns a Vec of coupled RpcBlocks + #[allow(clippy::type_complexity)] + pub fn responses( + &self, + spec: &ChainSpec, + ) -> Option>, BatchPeers), String>> { + let Some((blocks, &block_peer)) = self.blocks_request.to_finished() else { + return None; + }; + + match &self.block_data_request { + RangeBlockDataRequest::NoData => Some( + Self::responses_with_blobs(blocks.to_vec(), vec![], spec) + .map(|blocks| (blocks, BatchPeers::new_from_block_peer(block_peer))), + ), + RangeBlockDataRequest::Blobs(request) => { + let Some((blobs, _blob_peer)) = request.to_finished() else { + return None; + }; + Some( + Self::responses_with_blobs(blocks.to_vec(), blobs.to_vec(), spec) + .map(|blocks| (blocks, BatchPeers::new_from_block_peer(block_peer))), + ) + } + RangeBlockDataRequest::DataColumns { + requests, + expected_column_to_peer, + } => { + let mut data_columns = vec![]; + let mut column_peers = HashMap::new(); + for req in requests.values() { + let Some((resp_columns, column_peer)) = req.to_finished() else { + return None; + }; + data_columns.extend(resp_columns.clone()); + for column in resp_columns { + column_peers.insert(column.index, *column_peer); + } + } + + Some( + Self::responses_with_custody_columns( + blocks.to_vec(), + data_columns, + expected_column_to_peer.clone(), + spec, + ) + .map(|blocks| (blocks, BatchPeers::new(block_peer, column_peers))), + ) + } + } + } + + fn responses_with_blobs( + blocks: Vec>>, + blobs: Vec>>, + spec: &ChainSpec, + ) -> Result>, String> { + // There can't be more blobs than blocks, i.e., sending any blob (empty + included) for a skipped slot is not permitted. + let mut responses = Vec::with_capacity(blocks.len()); + let mut blob_iter = blobs.into_iter().peekable(); + for block in blocks.into_iter() { + let max_blobs_per_block = spec.max_blobs_per_block(block.epoch()) as usize; + let mut blob_list = Vec::with_capacity(max_blobs_per_block); + while { + let pair_next_blob = blob_iter + .peek() + .map(|sidecar| sidecar.slot() == block.slot()) + .unwrap_or(false); + pair_next_blob + } { + blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?); + } + + let mut blobs_buffer = vec![None; max_blobs_per_block]; + for blob in blob_list { + let blob_index = blob.index as usize; + let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { + return Err("Invalid blob index".to_string()); + }; + if blob_opt.is_some() { + return Err("Repeat blob index".to_string()); + } else { + *blob_opt = Some(blob); + } + } + let blobs = RuntimeVariableList::new( + blobs_buffer.into_iter().flatten().collect::>(), + max_blobs_per_block, + ) + .map_err(|_| "Blobs returned exceeds max length".to_string())?; + responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?) + } + + // If any sidecars are left unpaired, return an error.
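+ // Example: blocks at slots [1, 2, 4] with blobs at slots [1, 1, 4] pair as two blobs for the first block, none for the second, and one for the third; a blob at skipped slot 3 would never be consumed and trips the check below.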
+ if blob_iter.next().is_some() { + return Err("Received sidecars that don't pair well".to_string()); + } + + Ok(responses) + } + + fn responses_with_custody_columns( + blocks: Vec>>, + data_columns: DataColumnSidecarList, + expected_custody_columns: HashMap, + spec: &ChainSpec, + ) -> Result>, String> { + // Group data columns by block_root and index + let mut custody_columns_by_block = HashMap::>>::new(); + let mut block_roots_by_slot = HashMap::>::new(); + + for column in data_columns { + let block_root = column.block_root(); + let index = column.index; + + block_roots_by_slot + .entry(column.slot()) + .or_default() + .insert(block_root); + + // Sanity check before casting to `CustodyDataColumn`; this should never happen. + if !expected_custody_columns.contains_key(&index) { + return Err(format!( + "Received column not in expected custody indices {index}" + )); + } + + custody_columns_by_block + .entry(block_root) + .or_default() + .push(CustodyDataColumn::from_asserted_custody(column)); + } + + // Now iterate all blocks, ensuring that the block roots of each block and data column match + // and that we have columns for our custody requirements. + let rpc_blocks = blocks + .into_iter() + .map(|block| { + let block_root = get_block_root(&block); + block_roots_by_slot + .entry(block.slot()) + .or_default() + .insert(block_root); + + let custody_columns = custody_columns_by_block + .remove(&block_root) + .unwrap_or_default(); + + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) + .map_err(|e| format!("{e:?}")) + }) + .collect::, _>>()?; + + // Assert that there are no columns left for other blocks + if !custody_columns_by_block.is_empty() { + let remaining_roots = custody_columns_by_block.keys().collect::>(); + return Err(format!("Not all columns consumed: {remaining_roots:?}")); + } + + for (_slot, block_roots) in block_roots_by_slot { + if block_roots.len() > 1 { + // TODO: Some peer(s) are faulty or malicious. This batch will fail processing, but + we still want to send it for processing to better attribute fault. Maybe warn log for + now and track it in a metric?
+ } + } + + Ok(rpc_blocks) + } +} + +impl ByRangeRequest { + fn finish(&mut self, id: I, data: T, peer_id: PeerId) -> Result<(), String> { + match self { + Self::Active(expected_id) => { + if expected_id != &id { + return Err(format!("unexpected req_id expected {expected_id} got {id}")); + } + *self = Self::Complete(data, peer_id); + Ok(()) + } + Self::Complete(_, _) => Err("request already complete".to_owned()), + } + } + + fn to_finished(&self) -> Option<(&T, &PeerId)> { + match self { + Self::Active(_) => None, + Self::Complete(data, peer_id) => Some((data, peer_id)), + } + } +} + +#[cfg(test)] +mod tests { + use super::RangeBlockComponentsRequest; + use beacon_chain::test_utils::{ + generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs, + }; + use lighthouse_network::{ + service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, + DataColumnsByRangeRequestId, Id, RangeRequestId, + }, + PeerId, + }; + use rand::SeedableRng; + use std::{collections::HashMap, sync::Arc}; + use types::{test_utils::XorShiftRng, Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock}; + + fn components_id() -> ComponentsByRangeRequestId { + ComponentsByRangeRequestId { + id: 0, + requester: RangeRequestId::RangeSync { + chain_id: 1, + batch_id: Epoch::new(0), + }, + } + } + + fn blocks_id(parent_request_id: ComponentsByRangeRequestId) -> BlocksByRangeRequestId { + BlocksByRangeRequestId { + id: 1, + parent_request_id, + } + } + + fn blobs_id(parent_request_id: ComponentsByRangeRequestId) -> BlobsByRangeRequestId { + BlobsByRangeRequestId { + id: 1, + parent_request_id, + } + } + + fn columns_id( + id: Id, + parent_request_id: ComponentsByRangeRequestId, + ) -> DataColumnsByRangeRequestId { + DataColumnsByRangeRequestId { + id, + parent_request_id, + } + } + + fn is_finished(info: &RangeBlockComponentsRequest) -> bool { + let spec = test_spec::(); + info.responses(&spec).is_some() + } + + #[test] + fn no_blobs_into_responses() { + let spec = test_spec::(); + let peer = PeerId::random(); + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..4) + .map(|_| { + generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng, &spec) + .0 + .into() + }) + .collect::>>>(); + + let blocks_req_id = blocks_id(components_id()); + let mut info = RangeBlockComponentsRequest::::new(blocks_req_id, None, None); + + // Send blocks and complete terminate response + info.add_blocks(blocks_req_id, blocks, peer).unwrap(); + + // Assert response is finished and RpcBlocks can be constructed + info.responses(&test_spec::()).unwrap().unwrap(); + } + + #[test] + fn empty_blobs_into_responses() { + let spec = test_spec::(); + let peer = PeerId::random(); + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..4) + .map(|_| { + // Always generate some blobs. + generate_rand_block_and_blobs::( + ForkName::Deneb, + NumBlobs::Number(3), + &mut rng, + &spec, + ) + .0 + .into() + }) + .collect::>>>(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let blobs_req_id = blobs_id(components_id); + let mut info = + RangeBlockComponentsRequest::::new(blocks_req_id, Some(blobs_req_id), None); + + // Send blocks and complete terminate response + info.add_blocks(blocks_req_id, blocks, peer).unwrap(); + // Expect no blobs returned + info.add_blobs(blobs_req_id, vec![], peer).unwrap(); + + // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. 
+ // This makes sure we don't expect blobs here when they have expired. That check should + be handled elsewhere. + info.responses(&test_spec::()).unwrap().unwrap(); + } + + #[test] + fn rpc_block_with_custody_columns() { + let spec = test_spec::(); + let peer = PeerId::random(); + let expects_custody_columns = [1, 2, 3, 4]; + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..4) + .map(|_| { + generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ) + }) + .collect::>(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let columns_req_id = expects_custody_columns + .iter() + .enumerate() + .map(|(i, _)| columns_id(i as Id, components_id)) + .collect::>(); + + let column_to_peer = expects_custody_columns + .iter() + .map(|index| (*index, peer)) + .collect::>(); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + Some((columns_req_id.clone(), column_to_peer)), + ); + // Send blocks and complete terminate response + info.add_blocks( + blocks_req_id, + blocks.iter().map(|b| b.0.clone().into()).collect(), + peer, + ) + .unwrap(); + // Assert response is not finished + assert!(!is_finished(&info)); + + // Send data columns + for (i, &column_index) in expects_custody_columns.iter().enumerate() { + info.add_custody_columns( + columns_req_id.get(i).copied().unwrap(), + blocks + .iter() + .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned()) + .collect(), + peer, + ) + .unwrap(); + + if i < expects_custody_columns.len() - 1 { + assert!( + !is_finished(&info), + "request should not be finished at loop {i}" + ); + } + } + + // All completed construct response + info.responses(&spec).unwrap().unwrap(); + } + + #[test] + fn rpc_block_with_custody_columns_batched() { + let spec = test_spec::(); + let peer = PeerId::random(); + let batched_column_requests = [vec![1_u64, 2], vec![3, 4]]; + let expects_custody_columns = batched_column_requests + .iter() + .flatten() + .map(|index| (*index, peer)) + .collect::>(); + let custody_column_request_ids = + (0..batched_column_requests.len() as u32).collect::>(); + let num_of_data_column_requests = custody_column_request_ids.len(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let columns_req_id = batched_column_requests + .iter() + .enumerate() + .map(|(i, _)| columns_id(i as Id, components_id)) + .collect::>(); + + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + Some((columns_req_id.clone(), expects_custody_columns.clone())), + ); + + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..4) + .map(|_| { + generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ) + }) + .collect::>(); + + // Send blocks and complete terminate response + info.add_blocks( + blocks_req_id, + blocks.iter().map(|b| b.0.clone().into()).collect(), + peer, + ) + .unwrap(); + // Assert response is not finished + assert!(!is_finished(&info)); + + for (i, column_indices) in batched_column_requests.iter().enumerate() { + // Send the set of columns in the same batch request + info.add_custody_columns( + columns_req_id.get(i).copied().unwrap(), + blocks + .iter() + .flat_map(|b| { + b.1.iter() + .filter(|d| column_indices.contains(&d.index)) + .cloned() + }) + .collect::>(), + peer, + ) + .unwrap(); + + if i < num_of_data_column_requests - 1 { + assert!( + !is_finished(&info), + "request should 
not be finished at loop {i}" + ); + } + } + + // All completed construct response + info.responses(&spec).unwrap().unwrap(); + } +} diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index a81591e58f..dd141286ff 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -774,8 +774,7 @@ impl SyncNetworkContext { // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = self .network_globals() - .sampling_columns - .clone() + .sampling_columns() .into_iter() .filter(|index| !custody_indexes_imported.contains(index)) .collect::>(); @@ -1508,11 +1507,7 @@ impl SyncNetworkContext { .beacon_processor_if_enabled() .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; - let block = RpcBlock::new_without_blobs( - Some(block_root), - block, - self.network_globals().custody_columns_count() as usize, - ); + let block = RpcBlock::new_without_blobs(Some(block_root), block); debug!(block = ?block_root, id, "Sending block for processing"); // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a419a74621..8132269f71 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1313,12 +1313,8 @@ impl TestRig { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }; - let executed_block = AvailabilityPendingExecutedBlock::new( - block, - import_data, - payload_verification_outcome, - self.network_globals.custody_columns_count() as usize, - ); + let executed_block = + AvailabilityPendingExecutedBlock::new(block, import_data, payload_verification_outcome); match self .harness .chain diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 382965ec97..6de5902bb1 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -924,19 +924,10 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - // TODO(das): Assumes CGC = max value. Change if we want to do more complex tests - let expected_custody_indices = columns.iter().map(|d| d.index()).collect::>(); - RpcBlock::new_with_custody_columns( - None, - block, - columns.clone(), - expected_custody_indices, - spec, - ) - .unwrap() + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() } // Block has no data, expects zero columns - None => RpcBlock::new_without_blobs(None, block, 0), + None => RpcBlock::new_without_blobs(None, block), } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 5b30971fd8..eda57b7d8b 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -339,6 +339,8 @@ pub enum DBColumn { BeaconRandaoMixes, #[strum(serialize = "dht")] DhtEnrs, + #[strum(serialize = "cus")] + CustodyContext, /// DEPRECATED. 
For Optimistically Imported Merge Transition Blocks #[strum(serialize = "otb")] OptimisticTransitionBlock, @@ -397,6 +399,7 @@ impl DBColumn { | Self::PubkeyCache | Self::BeaconRestorePoint | Self::DhtEnrs + | Self::CustodyContext | Self::OptimisticTransitionBlock => 32, Self::BeaconBlockRoots | Self::BeaconBlockRootsChunked diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b8c74d4dcd..f7bda17eb1 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -960,6 +960,35 @@ impl SseBlobSidecar { } } +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SseDataColumnSidecar { + pub block_root: Hash256, + #[serde(with = "serde_utils::quoted_u64")] + pub index: u64, + pub slot: Slot, + pub kzg_commitments: Vec, + pub versioned_hashes: Vec, +} + +impl SseDataColumnSidecar { + pub fn from_data_column_sidecar( + data_column_sidecar: &DataColumnSidecar, + ) -> SseDataColumnSidecar { + let kzg_commitments = data_column_sidecar.kzg_commitments.to_vec(); + let versioned_hashes = kzg_commitments + .iter() + .map(|c| c.calculate_versioned_hash()) + .collect(); + SseDataColumnSidecar { + block_root: data_column_sidecar.block_root(), + index: data_column_sidecar.index, + slot: data_column_sidecar.slot(), + kzg_commitments, + versioned_hashes, + } + } +} + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseFinalizedCheckpoint { pub block: Hash256, @@ -1110,6 +1139,7 @@ pub enum EventKind { SingleAttestation(Box), Block(SseBlock), BlobSidecar(SseBlobSidecar), + DataColumnSidecar(SseDataColumnSidecar), FinalizedCheckpoint(SseFinalizedCheckpoint), Head(SseHead), VoluntaryExit(SignedVoluntaryExit), @@ -1133,6 +1163,7 @@ impl EventKind { EventKind::Head(_) => "head", EventKind::Block(_) => "block", EventKind::BlobSidecar(_) => "blob_sidecar", + EventKind::DataColumnSidecar(_) => "data_column_sidecar", EventKind::Attestation(_) => "attestation", EventKind::SingleAttestation(_) => "single_attestation", EventKind::VoluntaryExit(_) => "voluntary_exit", @@ -1168,6 +1199,11 @@ impl EventKind { "blob_sidecar" => Ok(EventKind::BlobSidecar(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Blob Sidecar: {:?}", e)), )?)), + "data_column_sidecar" => Ok(EventKind::DataColumnSidecar( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Data Column Sidecar: {:?}", e)) + })?, + )), "chain_reorg" => Ok(EventKind::ChainReorg(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Chain Reorg: {:?}", e)), )?)), @@ -1257,6 +1293,7 @@ pub enum EventTopic { Head, Block, BlobSidecar, + DataColumnSidecar, Attestation, SingleAttestation, VoluntaryExit, @@ -1283,6 +1320,7 @@ impl FromStr for EventTopic { "head" => Ok(EventTopic::Head), "block" => Ok(EventTopic::Block), "blob_sidecar" => Ok(EventTopic::BlobSidecar), + "data_column_sidecar" => Ok(EventTopic::DataColumnSidecar), "attestation" => Ok(EventTopic::Attestation), "single_attestation" => Ok(EventTopic::SingleAttestation), "voluntary_exit" => Ok(EventTopic::VoluntaryExit), @@ -1310,6 +1348,7 @@ impl fmt::Display for EventTopic { EventTopic::Head => write!(f, "head"), EventTopic::Block => write!(f, "block"), EventTopic::BlobSidecar => write!(f, "blob_sidecar"), + EventTopic::DataColumnSidecar => write!(f, "data_column_sidecar"), EventTopic::Attestation => write!(f, "attestation"), EventTopic::SingleAttestation => write!(f, "single_attestation"), EventTopic::VoluntaryExit => write!(f, "voluntary_exit"), diff 
diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs
index 59472e2edc..b4fd5afe87 100644
--- a/consensus/types/src/chain_spec.rs
+++ b/consensus/types/src/chain_spec.rs
@@ -203,6 +203,8 @@ pub struct ChainSpec {
     pub data_column_sidecar_subnet_count: u64,
     pub samples_per_slot: u64,
     pub custody_requirement: u64,
+    pub validator_custody_requirement: u64,
+    pub balance_per_additional_custody_group: u64,
     /*
      * Networking
@@ -731,14 +733,6 @@ impl ChainSpec {
         Ok(std::cmp::max(custody_column_count, self.samples_per_slot))
     }
-    pub fn custody_group_count(&self, is_supernode: bool) -> u64 {
-        if is_supernode {
-            self.number_of_custody_groups
-        } else {
-            self.custody_requirement
-        }
-    }
-
     pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator<Item = DataColumnSubnetId> {
         (0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new)
     }
@@ -975,6 +969,8 @@ impl ChainSpec {
             data_column_sidecar_subnet_count: 128,
             number_of_columns: 128,
             samples_per_slot: 8,
+            validator_custody_requirement: 8,
+            balance_per_additional_custody_group: 32000000000,
             /*
              * Network specific
@@ -1309,6 +1305,8 @@ impl ChainSpec {
             data_column_sidecar_subnet_count: 128,
             number_of_columns: 128,
             samples_per_slot: 8,
+            validator_custody_requirement: 8,
+            balance_per_additional_custody_group: 32000000000,
             /*
              * Network specific
@@ -1650,6 +1648,12 @@ pub struct Config {
     #[serde(default = "BlobSchedule::default")]
     #[serde(skip_serializing_if = "BlobSchedule::is_empty")]
     blob_schedule: BlobSchedule,
+    #[serde(default = "default_validator_custody_requirement")]
+    #[serde(with = "serde_utils::quoted_u64")]
+    validator_custody_requirement: u64,
+    #[serde(default = "default_balance_per_additional_custody_group")]
+    #[serde(with = "serde_utils::quoted_u64")]
+    balance_per_additional_custody_group: u64,
 }
 fn default_bellatrix_fork_version() -> [u8; 4] {
@@ -1815,6 +1819,14 @@ const fn default_samples_per_slot() -> u64 {
     8
 }
+const fn default_validator_custody_requirement() -> u64 {
+    8
+}
+
+const fn default_balance_per_additional_custody_group() -> u64 {
+    32000000000
+}
+
 fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize {
     let max_request_blocks = max_request_blocks as usize;
     RuntimeVariableList::<Hash256>::from_vec(
@@ -2024,6 +2036,8 @@ impl Config {
             samples_per_slot: spec.samples_per_slot,
             custody_requirement: spec.custody_requirement,
             blob_schedule: spec.blob_schedule.clone(),
+            validator_custody_requirement: spec.validator_custody_requirement,
+            balance_per_additional_custody_group: spec.balance_per_additional_custody_group,
         }
     }
@@ -2103,6 +2117,8 @@ impl Config {
             samples_per_slot,
             custody_requirement,
             ref blob_schedule,
+            validator_custody_requirement,
+            balance_per_additional_custody_group,
         } = self;
         if preset_base != E::spec_name().to_string().as_str() {
@@ -2187,6 +2203,8 @@ impl Config {
             samples_per_slot,
             custody_requirement,
             blob_schedule: blob_schedule.clone(),
+            validator_custody_requirement,
+            balance_per_additional_custody_group,
             ..chain_spec.clone()
         })
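The diff above only threads the two new constants through `ChainSpec` and `Config`; the logic that consumes them lives in the validator-custody code, which is not part of this hunk. For orientation, a sketch of how the PeerDAS spec combines them (the function name and call shape here are assumptions, not Lighthouse's actual API):

```rust
use types::ChainSpec;

// Illustrative only: the PeerDAS validator-custody rule derives a custody-group
// count from the total balance of a node's attached validators:
//   max(validator_custody_requirement, balance / balance_per_additional_custody_group),
// capped at the total number of custody groups.
fn validator_custody_groups(total_validator_balance_gwei: u64, spec: &ChainSpec) -> u64 {
    let groups_by_balance =
        total_validator_balance_gwei / spec.balance_per_additional_custody_group;
    groups_by_balance
        .max(spec.validator_custody_requirement)
        .min(spec.number_of_custody_groups)
}
```

With the mainnet defaults added above (a floor of 8 groups, one additional group per 32 ETH), a node's custody requirement only grows beyond the floor once its validators' combined balance exceeds 8 × 32 ETH.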
diff --git a/scripts/local_testnet/network_params_das.yaml b/scripts/local_testnet/network_params_das.yaml
index b16be34b89..c896b11330 100644
--- a/scripts/local_testnet/network_params_das.yaml
+++ b/scripts/local_testnet/network_params_das.yaml
@@ -2,7 +2,7 @@ participants:
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-0
+    el_image: ethpandaops/geth:fusaka-devnet-1
     cl_extra_params:
       - --subscribe-all-data-column-subnets
       - --subscribe-all-subnets
@@ -13,7 +13,7 @@ participants:
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-0
+    el_image: ethpandaops/geth:fusaka-devnet-1
     cl_extra_params:
       # Note: useful for testing range sync (only produce block if the node is in sync to prevent forking)
       - --sync-tolerance-epochs=0
diff --git a/scripts/local_testnet/start_local_testnet.sh b/scripts/local_testnet/start_local_testnet.sh
index 8e8859ca0e..442e6fd98d 100755
--- a/scripts/local_testnet/start_local_testnet.sh
+++ b/scripts/local_testnet/start_local_testnet.sh
@@ -81,7 +81,7 @@ fi
 if [ "$BUILD_IMAGE" = true ]; then
     echo "Building Lighthouse Docker image."
     ROOT_DIR="$SCRIPT_DIR/../.."
-    docker build --build-arg FEATURES=portable -f $ROOT_DIR/Dockerfile -t $LH_IMAGE_NAME $ROOT_DIR
+    docker build --build-arg FEATURES=portable,spec-minimal -f $ROOT_DIR/Dockerfile -t $LH_IMAGE_NAME $ROOT_DIR
 else
     echo "Not rebuilding Lighthouse Docker image."
 fi
diff --git a/scripts/tests/checkpoint-sync-config-devnet.yaml b/scripts/tests/checkpoint-sync-config-devnet.yaml
index c536d26b3b..de3020a884 100644
--- a/scripts/tests/checkpoint-sync-config-devnet.yaml
+++ b/scripts/tests/checkpoint-sync-config-devnet.yaml
@@ -3,18 +3,18 @@ participants:
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-0
+    el_image: ethpandaops/geth:fusaka-devnet-1
     supernode: true
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-0
+    el_image: ethpandaops/geth:fusaka-devnet-1
     supernode: false
 checkpoint_sync_enabled: true
-checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-0.ethpandaops.io"
+checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-1.ethpandaops.io"
 global_log_level: debug
 network_params:
-  network: fusaka-devnet-0
+  network: fusaka-devnet-1
diff --git a/scripts/tests/genesis-sync-config-electra.yaml b/scripts/tests/genesis-sync-config-electra.yaml
new file mode 100644
index 0000000000..153f754c94
--- /dev/null
+++ b/scripts/tests/genesis-sync-config-electra.yaml
@@ -0,0 +1,22 @@
+# Kurtosis config file for testing sync on a local devnet.
+participants:
+  - cl_type: lighthouse
+    cl_image: lighthouse:local
+    count: 2
+  # Nodes without validators, used for testing sync.
+  - cl_type: lighthouse
+    cl_image: lighthouse:local
+    supernode: true # supernodes don't exist in Electra; kept for future-proofing
+    validator_count: 0
+  - cl_type: lighthouse
+    cl_image: lighthouse:local
+    supernode: false
+    validator_count: 0
+network_params:
+  seconds_per_slot: 6
+  electra_fork_epoch: 0
+  preset: "minimal"
+additional_services:
+  - tx_fuzz
+  - spamoor
+global_log_level: debug
diff --git a/scripts/tests/genesis-sync-config-fulu.yaml b/scripts/tests/genesis-sync-config-fulu.yaml
new file mode 100644
index 0000000000..91aa4d1ffd
--- /dev/null
+++ b/scripts/tests/genesis-sync-config-fulu.yaml
@@ -0,0 +1,29 @@
+# Kurtosis config file for testing sync on a local devnet.
+participants:
+  - cl_type: lighthouse
+    cl_image: lighthouse:local
+    el_type: geth
+    el_image: ethpandaops/geth:fusaka-devnet-1
+    count: 2
+  # Nodes without validators, used for testing sync.
+  - cl_type: lighthouse
+    cl_image: lighthouse:local
+    el_type: geth
+    el_image: ethpandaops/geth:fusaka-devnet-1
+    supernode: true
+    validator_count: 0
+  - cl_type: lighthouse
+    cl_image: lighthouse:local
+    el_type: geth
+    el_image: ethpandaops/geth:fusaka-devnet-1
+    supernode: false
+    validator_count: 0
+network_params:
+  seconds_per_slot: 6
+  electra_fork_epoch: 0
+  fulu_fork_epoch: 1
+  preset: "minimal"
+additional_services:
+  - tx_fuzz
+  - spamoor
+global_log_level: debug
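The test driver below polls Lighthouse's `/lighthouse/syncing` endpoint, whose `data` field is either a bare string (e.g. `"Synced"`) or a single-key object naming the current sync state. A small Rust sketch of the same discrimination the script performs with jq (illustrative; the payload shapes are inferred from the script's own parsing logic):

```rust
use serde_json::Value;

// Mirrors poll_node's jq checks: `.data` is a string such as "Synced", or an
// object such as {"SyncingFinalized": {"start_slot": "0", "target_slot": "96"}}.
fn is_synced(response: &Value) -> bool {
    matches!(response.get("data"), Some(Value::String(s)) if s == "Synced")
}

fn sync_state_name(response: &Value) -> Option<String> {
    match response.get("data")? {
        Value::String(s) => Some(s.clone()),
        // For object payloads, the single key is the state name.
        Value::Object(map) => map.keys().next().cloned(),
        _ => None,
    }
}
```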
diff --git a/scripts/tests/genesis-sync.sh b/scripts/tests/genesis-sync.sh
new file mode 100755
index 0000000000..39628c9e73
--- /dev/null
+++ b/scripts/tests/genesis-sync.sh
@@ -0,0 +1,151 @@
+#!/usr/bin/env bash
+#
+# Genesis sync test on a local network.
+#
+# Start a local testnet, shut down the non-validator nodes for a period, then restart them
+# and monitor their sync progress from genesis to head.
+SCRIPT_DIR="$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
+
+ENCLAVE_NAME=${1:-genesis-sync-testnet}
+CONFIG=${2:-$SCRIPT_DIR/genesis-sync-config-electra.yaml}
+FORK_TYPE=${3:-electra} # electra or fulu
+OFFLINE_DURATION_SECS=${4:-120} # how long the non-validating nodes stay stopped
+
+# Test configuration
+# ------------------------------------------------------
+# Interval for polling the /lighthouse/syncing endpoint for sync status.
+# Poll frequently so that sync progress is visible in the logs.
+POLL_INTERVAL_SECS=0.5
+# Timeout for this test; if the nodes fail to sync in time, fail the test.
+TIMEOUT_MINS=5
+TIMEOUT_SECS=$((TIMEOUT_MINS * 60))
+# ------------------------------------------------------
+
+echo "Starting genesis sync test with:"
+echo "  Fork: $FORK_TYPE"
+echo "  Offline duration: ${OFFLINE_DURATION_SECS}s"
+
+# Polls a node's sync status
+poll_node() {
+    local node_type=$1
+    local url=${node_urls[$node_type]}
+
+    response=$(curl -s "${url}/lighthouse/syncing" 2>/dev/null)
+
+    if [ -z "$response" ] || [ "$response" = "null" ]; then
+        echo "${node_type} status: No response or null response"
+        return
+    fi
+
+    # Print syncing status
+    sync_state=$(echo "$response" | jq -r 'if (.data | type) == "object" then "object" else "string" end' 2>/dev/null)
+
+    if [ "$sync_state" = "object" ]; then
+        status=$(echo "$response" | jq -r '.data | keys[0] // "Unknown"')
+        fields=$(echo "$response" | jq -r ".data.${status} | to_entries | map(\"\(.key): \(.value)\") | join(\", \")")
+        echo "${node_type} status: ${status}, ${fields}"
+    else
+        status=$(echo "$response" | jq -r '.data' 2>/dev/null)
+        echo "${node_type} status: ${status:-Unknown}"
+
+        # The test is complete when the node is synced
+        if [ "$status" = "Synced" ]; then
+            mark_node_complete "$node_type"
+        fi
+    fi
+}
+
+# Marks a node as complete and records its completion time
+mark_node_complete() {
+    local node_type=$1
+    if [ "${node_completed[$node_type]}" = false ]; then
+        node_completed[$node_type]=true
+        node_complete_time[$node_type]=$(date +%s)
+        echo "${node_type} completed sync in $((node_complete_time[$node_type] - sync_start_time)) seconds"
+    fi
+}
+
+exit_and_dump_logs() {
+    local exit_code=$1
+    echo "Shutting down..."
+    $SCRIPT_DIR/../local_testnet/stop_local_testnet.sh $ENCLAVE_NAME
+    echo "Test completed with exit code $exit_code."
+    exit $exit_code
+}
+
+# Start the nodes
+$SCRIPT_DIR/../local_testnet/start_local_testnet.sh -e $ENCLAVE_NAME -b false -n $CONFIG
+if [ $? -ne 0 ]; then
+    echo "Failed to start local testnet"
+    exit_and_dump_logs 1
+fi
+
+# Wait 10s before stopping the non-validating nodes
+sleep 10
+
+# The non-validating nodes
+supernode="cl-3-lighthouse-geth"
+fullnode="cl-4-lighthouse-geth"
+
+# Stop the non-validator nodes
+kurtosis service stop $ENCLAVE_NAME $supernode
+kurtosis service stop $ENCLAVE_NAME $fullnode
+
+echo "Non-validator nodes stopped. Waiting ${OFFLINE_DURATION_SECS} seconds..."
+
+# Report the elapsed time every 10s while the nodes are stopped
+remaining_time=$OFFLINE_DURATION_SECS
+while [ $remaining_time -gt 0 ]; do
+    sleep 10
+    remaining_time=$((remaining_time - 10))
+    echo "Nodes have been stopped for $((OFFLINE_DURATION_SECS - remaining_time))s, ${remaining_time}s remaining..."
+done
+
+echo "Resuming non-validator nodes..."
+
+# Resume the non-validating nodes
+kurtosis service start $ENCLAVE_NAME $supernode
+kurtosis service start $ENCLAVE_NAME $fullnode
+
+# The time at which syncing starts, after the nodes were stopped
+sync_start_time=$(date +%s)
+
+# Get the beacon API URLs for querying the non-validating nodes
+supernode_url=$(kurtosis port print $ENCLAVE_NAME $supernode http)
+fullnode_url=$(kurtosis port print $ENCLAVE_NAME $fullnode http)
+
+# Initialize statuses
+declare -A node_completed
+declare -A node_complete_time
+declare -A node_urls
+
+node_urls["supernode"]="$supernode_url"
+node_urls["fullnode"]="$fullnode_url"
+node_completed["supernode"]=false
+node_completed["fullnode"]=false
+
+echo "Polling sync status until both nodes are synced or the ${TIMEOUT_MINS}-minute timeout is reached"
+
+while [ "${node_completed[supernode]}" = false ] || [ "${node_completed[fullnode]}" = false ]; do
+    current_time=$(date +%s)
+    elapsed=$((current_time - sync_start_time))
+
+    if [ "$elapsed" -ge "$TIMEOUT_SECS" ]; then
+        echo "ERROR: Nodes timed out syncing after ${TIMEOUT_MINS} minutes. Exiting."
+        exit_and_dump_logs 1
+    fi
+
+    # Poll each node that hasn't completed yet
+    for node in "supernode" "fullnode"; do
+        if [ "${node_completed[$node]}" = false ]; then
+            poll_node "$node"
+        fi
+    done
+
+    sleep $POLL_INTERVAL_SECS
+done
+
+echo "Genesis sync test complete! Both supernode and fullnode have synced successfully."
+echo "Supernode time: $((node_complete_time[supernode] - sync_start_time)) seconds"
+echo "Fullnode time: $((node_complete_time[fullnode] - sync_start_time)) seconds"
+exit_and_dump_logs 0
\ No newline at end of file
diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs
index b507383190..af3b0bce2d 100644
--- a/testing/ef_tests/src/cases/fork_choice.rs
+++ b/testing/ef_tests/src/cases/fork_choice.rs
@@ -520,7 +520,7 @@ impl<E: EthSpec> Tester<E> {
         let result: Result, _> = self
            .block_on_dangerous(self.harness.chain.process_block(
                block_root,
-                RpcBlock::new_without_blobs(Some(block_root), block.clone(), 0),
+                RpcBlock::new_without_blobs(Some(block_root), block.clone()),
                NotifyExecutionLayer::Yes,
                BlockImportSource::Lookup,
                || Ok(()),