From 5f208bb8582984bc762456eeb91e41d051fa9630 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 11 Jun 2025 11:10:06 -0700 Subject: [PATCH] Implement basic validator custody framework (no backfill) (#7578) Resolves #6767 This PR implements a basic version of validator custody. - It introduces a new `CustodyContext` object which contains info regarding number of validators attached to a node and the custody count they contribute to the cgc. - The `CustodyContext` is added in the da_checker and has methods for returning the current cgc and the number of columns to sample at head. Note that the logic for returning the cgc existed previously in the network globals. - To estimate the number of validators attached, we use the `beacon_committee_subscriptions` endpoint. This might overestimate the number of validators actually publishing attestations from the node in the case of multi BN setups. We could also potentially use the `publish_attestations` endpoint to get a more conservative estimate at a later point. - Anytime there's a change in the `custody_group_count` due to addition/removal of validators, the custody context should send an event on a broadcast channel. The only subscriber for the channel exists in the network service which simply subscribes to more subnets. There can be additional subscribers in sync that will start a backfill once the cgc changes. TODO - [ ] **NOT REQUIRED:** Currently, the logic only handles an increase in validator count and does not handle a decrease. We should ideally unsubscribe from subnets when the cgc has decreased. - [ ] **NOT REQUIRED:** Add a service in the `CustodyContext` that emits an event once `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS ` passes after updating the current cgc. This event should be picked up by a subscriber which updates the enr and metadata. 
- [x] Add more tests --- beacon_node/beacon_chain/src/beacon_chain.rs | 25 +- .../beacon_chain/src/block_verification.rs | 24 +- .../src/block_verification_types.rs | 16 - beacon_node/beacon_chain/src/builder.rs | 26 +- .../src/data_availability_checker.rs | 43 +- .../overflow_lru_cache.rs | 71 +-- .../state_lru_cache.rs | 8 - beacon_node/beacon_chain/src/lib.rs | 3 + .../beacon_chain/src/persisted_custody.rs | 46 ++ beacon_node/beacon_chain/src/test_utils.rs | 35 +- .../beacon_chain/src/validator_custody.rs | 447 ++++++++++++++++++ .../beacon_chain/tests/block_verification.rs | 95 +--- .../tests/payload_invalidation.rs | 15 +- beacon_node/beacon_chain/tests/store_tests.rs | 12 +- beacon_node/http_api/src/lib.rs | 47 +- beacon_node/http_api/src/publish_blocks.rs | 11 +- .../tests/broadcast_validation_tests.rs | 22 +- .../lighthouse_network/src/discovery/mod.rs | 10 + .../lighthouse_network/src/service/mod.rs | 66 ++- .../lighthouse_network/src/types/globals.rs | 72 ++- .../lighthouse_network/src/types/topics.rs | 10 +- .../lighthouse_network/tests/common.rs | 3 +- beacon_node/network/src/metrics.rs | 4 +- .../gossip_methods.rs | 5 +- .../src/network_beacon_processor/mod.rs | 9 +- .../src/network_beacon_processor/tests.rs | 20 +- beacon_node/network/src/service.rs | 26 +- .../src/sync/block_sidecar_coupling.rs | 12 +- .../network/src/sync/network_context.rs | 13 +- .../network/src/sync/range_sync/chain.rs | 2 +- beacon_node/network/src/sync/tests/lookups.rs | 8 +- beacon_node/network/src/sync/tests/range.rs | 12 +- beacon_node/store/src/lib.rs | 3 + consensus/types/src/chain_spec.rs | 34 +- scripts/local_testnet/network_params_das.yaml | 4 +- .../tests/checkpoint-sync-config-devnet.yaml | 8 +- scripts/tests/genesis-sync-config-fulu.yaml | 9 +- testing/ef_tests/src/cases/fork_choice.rs | 2 +- 38 files changed, 928 insertions(+), 350 deletions(-) create mode 100644 beacon_node/beacon_chain/src/persisted_custody.rs create mode 100644 
beacon_node/beacon_chain/src/validator_custody.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0c9eb33516..50efb367a8 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -58,12 +58,14 @@ use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; use crate::persisted_beacon_chain::PersistedBeaconChain; +use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; +use crate::validator_custody::CustodyContextSsz; use crate::validator_monitor::{ get_slot_delay_ms, timestamp_now, ValidatorMonitor, HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, @@ -670,6 +672,23 @@ impl BeaconChain { Ok(()) } + /// Persists the custody information to disk. + pub fn persist_custody_context(&self) -> Result<(), Error> { + let custody_context: CustodyContextSsz = self + .data_availability_checker + .custody_context() + .as_ref() + .into(); + debug!(?custody_context, "Persisting custody context to store"); + + persist_custody_context::( + self.store.clone(), + custody_context, + )?; + + Ok(()) + } + /// Returns the slot _right now_ according to `self.slot_clock`. Returns `Err` if the slot is /// unavailable. 
/// @@ -2990,7 +3009,6 @@ impl BeaconChain { pub async fn verify_block_for_gossip( self: &Arc, block: Arc>, - custody_columns_count: usize, ) -> Result, BlockError> { let chain = self.clone(); self.task_executor @@ -3000,7 +3018,7 @@ impl BeaconChain { let slot = block.slot(); let graffiti_string = block.message().body().graffiti().as_utf8_lossy(); - match GossipVerifiedBlock::new(block, &chain, custody_columns_count) { + match GossipVerifiedBlock::new(block, &chain) { Ok(verified) => { let commitments_formatted = verified.block.commitments_formatted(); debug!( @@ -7232,7 +7250,8 @@ impl Drop for BeaconChain { let drop = || -> Result<(), Error> { self.persist_fork_choice()?; self.persist_op_pool()?; - self.persist_eth1_cache() + self.persist_eth1_cache()?; + self.persist_custody_context() }; if let Err(e) = drop() { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 1bbf845fa5..ba501f617d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -685,7 +685,6 @@ pub struct GossipVerifiedBlock { pub block_root: Hash256, parent: Option>, consensus_context: ConsensusContext, - custody_columns_count: usize, } /// A wrapper around a `SignedBeaconBlock` that indicates that all signatures (except the deposit @@ -721,7 +720,6 @@ pub trait IntoGossipVerifiedBlock: Sized { fn into_gossip_verified_block( self, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result, BlockError>; fn inner_block(&self) -> Arc>; } @@ -730,7 +728,6 @@ impl IntoGossipVerifiedBlock for GossipVerifiedBlock fn into_gossip_verified_block( self, _chain: &BeaconChain, - _custody_columns_count: usize, ) -> Result, BlockError> { Ok(self) } @@ -743,9 +740,8 @@ impl IntoGossipVerifiedBlock for Arc, - custody_columns_count: usize, ) -> Result, BlockError> { - GossipVerifiedBlock::new(self, chain, custody_columns_count) + GossipVerifiedBlock::new(self, chain) } fn 
inner_block(&self) -> Arc> { @@ -821,7 +817,6 @@ impl GossipVerifiedBlock { pub fn new( block: Arc>, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result { // If the block is valid for gossip we don't supply it to the slasher here because // we assume it will be transformed into a fully verified block. We *do* need to supply @@ -831,14 +826,12 @@ impl GossipVerifiedBlock { // The `SignedBeaconBlock` and `SignedBeaconBlockHeader` have the same canonical root, // but it's way quicker to calculate root of the header since the hash of the tree rooted // at `BeaconBlockBody` is already computed in the header. - Self::new_without_slasher_checks(block, &header, chain, custody_columns_count).map_err( - |e| { - process_block_slash_info::<_, BlockError>( - chain, - BlockSlashInfo::from_early_error_block(header, e), - ) - }, - ) + Self::new_without_slasher_checks(block, &header, chain).map_err(|e| { + process_block_slash_info::<_, BlockError>( + chain, + BlockSlashInfo::from_early_error_block(header, e), + ) + }) } /// As for new, but doesn't pass the block to the slasher. @@ -846,7 +839,6 @@ impl GossipVerifiedBlock { block: Arc>, block_header: &SignedBeaconBlockHeader, chain: &BeaconChain, - custody_columns_count: usize, ) -> Result { // Ensure the block is the correct structure for the fork at `block.slot()`. 
block @@ -1054,7 +1046,6 @@ impl GossipVerifiedBlock { block_root, parent, consensus_context, - custody_columns_count, }) } @@ -1202,7 +1193,6 @@ impl SignatureVerifiedBlock { block: MaybeAvailableBlock::AvailabilityPending { block_root: from.block_root, block, - custody_columns_count: from.custody_columns_count, }, block_root: from.block_root, parent: Some(parent), diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index dab54dc823..f7002dcee1 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -31,7 +31,6 @@ use types::{ pub struct RpcBlock { block_root: Hash256, block: RpcBlockInner, - custody_columns_count: usize, } impl Debug for RpcBlock { @@ -45,10 +44,6 @@ impl RpcBlock { self.block_root } - pub fn custody_columns_count(&self) -> usize { - self.custody_columns_count - } - pub fn as_block(&self) -> &SignedBeaconBlock { match &self.block { RpcBlockInner::Block(block) => block, @@ -103,14 +98,12 @@ impl RpcBlock { pub fn new_without_blobs( block_root: Option, block: Arc>, - custody_columns_count: usize, ) -> Self { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); Self { block_root, block: RpcBlockInner::Block(block), - custody_columns_count, } } @@ -152,8 +145,6 @@ impl RpcBlock { Ok(Self { block_root, block: inner, - // Block is before PeerDAS - custody_columns_count: 0, }) } @@ -161,7 +152,6 @@ impl RpcBlock { block_root: Option, block: Arc>, custody_columns: Vec>, - custody_columns_count: usize, spec: &ChainSpec, ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); @@ -182,7 +172,6 @@ impl RpcBlock { Ok(Self { block_root, block: inner, - custody_columns_count, }) } @@ -250,12 +239,10 @@ impl ExecutedBlock { MaybeAvailableBlock::AvailabilityPending { block_root: _, block: pending_block, - custody_columns_count, } => 
Self::AvailabilityPending(AvailabilityPendingExecutedBlock::new( pending_block, import_data, payload_verification_outcome, - custody_columns_count, )), } } @@ -321,7 +308,6 @@ pub struct AvailabilityPendingExecutedBlock { pub block: Arc>, pub import_data: BlockImportData, pub payload_verification_outcome: PayloadVerificationOutcome, - pub custody_columns_count: usize, } impl AvailabilityPendingExecutedBlock { @@ -329,13 +315,11 @@ impl AvailabilityPendingExecutedBlock { block: Arc>, import_data: BlockImportData, payload_verification_outcome: PayloadVerificationOutcome, - custody_columns_count: usize, ) -> Self { Self { block, import_data, payload_verification_outcome, - custody_columns_count, } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2346aca00b..ae07584f27 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -13,10 +13,12 @@ use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::persisted_beacon_chain::PersistedBeaconChain; +use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; +use crate::CustodyContext; use crate::{ BeaconChain, BeaconChainTypes, BeaconForkChoiceStore, BeaconSnapshot, Eth1Chain, Eth1ChainBackend, ServerSentEventHandler, @@ -926,6 +928,20 @@ where } }; + // Load the persisted custody context from the db and initialize + // the context for this run + let custody_context = if let Some(custody) = + load_custody_context::(store.clone()) + { + Arc::new(CustodyContext::new_from_persisted_custody_context( + custody, + self.import_all_data_columns, + )) + } else { + 
Arc::new(CustodyContext::new(self.import_all_data_columns)) + }; + debug!(?custody_context, "Loading persisted custody context"); + let beacon_chain = BeaconChain { spec: self.spec.clone(), config: self.chain_config, @@ -999,8 +1015,14 @@ where validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, data_availability_checker: Arc::new( - DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, self.spec) - .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, + DataAvailabilityChecker::new( + slot_clock, + self.kzg.clone(), + store, + custody_context, + self.spec, + ) + .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 0fd417389b..91ff5fb644 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -5,7 +5,7 @@ use crate::block_verification_types::{ use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, }; -use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore}; +use crate::{metrics, BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext}; use kzg::Kzg; use slot_clock::SlotClock; use std::fmt; @@ -74,6 +74,7 @@ pub struct DataAvailabilityChecker { availability_cache: Arc>, slot_clock: T::SlotClock, kzg: Arc, + custody_context: Arc, spec: Arc, } @@ -111,17 +112,28 @@ impl DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Arc, store: BeaconStore, + custody_context: Arc, spec: Arc, ) -> Result { - let inner = DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; + let inner = DataAvailabilityCheckerInner::new( + OVERFLOW_LRU_CAPACITY, + store, + custody_context.clone(), + spec.clone(), + )?; Ok(Self { 
availability_cache: Arc::new(inner), slot_clock, kzg, + custody_context, spec, }) } + pub fn custody_context(&self) -> Arc { + self.custody_context.clone() + } + /// Checks if the block root is currenlty in the availability cache awaiting import because /// of missing components. pub fn get_execution_valid_block( @@ -297,7 +309,6 @@ impl DataAvailabilityChecker { &self, block: RpcBlock, ) -> Result, AvailabilityCheckError> { - let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); if self.blobs_required_for_block(&block) { return if let Some(blob_list) = blobs { @@ -311,11 +322,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - }) + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) }; } if self.data_columns_required_for_block(&block) { @@ -340,11 +347,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), })) } else { - Ok(MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - }) + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) }; } @@ -401,7 +404,6 @@ impl DataAvailabilityChecker { } for block in blocks { - let custody_columns_count = block.custody_columns_count(); let (block_root, block, blobs, data_columns) = block.deconstruct(); let maybe_available_block = if self.blobs_required_for_block(&block) { @@ -414,11 +416,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { - block_root, - block, - custody_columns_count, - } + MaybeAvailableBlock::AvailabilityPending { block_root, block } } } else if self.data_columns_required_for_block(&block) { if let Some(data_columns) = data_columns { @@ -432,11 +430,7 @@ impl DataAvailabilityChecker { spec: self.spec.clone(), }) } else { - MaybeAvailableBlock::AvailabilityPending { - block_root, - 
block, - custody_columns_count, - } + MaybeAvailableBlock::AvailabilityPending { block_root, block } } } else { MaybeAvailableBlock::Available(AvailableBlock { @@ -786,7 +780,6 @@ pub enum MaybeAvailableBlock { AvailabilityPending { block_root: Hash256, block: Arc>, - custody_columns_count: usize, }, } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 3478c183f3..36c4f2cdc1 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -8,6 +8,7 @@ use crate::block_verification_types::{ use crate::data_availability_checker::{Availability, AvailabilityCheckError}; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::BeaconChainTypes; +use crate::CustodyContext; use lru::LruCache; use parking_lot::RwLock; use std::cmp::Ordering; @@ -158,6 +159,7 @@ impl PendingComponents { pub fn make_available( &mut self, spec: &Arc, + num_expected_columns: u64, recover: R, ) -> Result>, AvailabilityCheckError> where @@ -171,12 +173,11 @@ impl PendingComponents { }; let num_expected_blobs = block.num_blobs_expected(); - + let num_expected_columns = num_expected_columns as usize; let blob_data = if num_expected_blobs == 0 { Some(AvailableBlockData::NoData) } else if spec.is_peer_das_enabled_for_epoch(block.epoch()) { let num_received_columns = self.verified_data_columns.len(); - let num_expected_columns = block.custody_columns_count(); match num_received_columns.cmp(&num_expected_columns) { Ordering::Greater => { // Should never happen @@ -254,7 +255,6 @@ impl PendingComponents { block, import_data, payload_verification_outcome, - custody_columns_count: _, } = recover(block.clone())?; let available_block = AvailableBlock { @@ -308,19 +308,21 @@ impl PendingComponents { }) } - pub fn status_str(&self, block_epoch: Epoch, spec: 
&ChainSpec) -> String { + pub fn status_str( + &self, + block_epoch: Epoch, + num_expected_columns: Option, + spec: &ChainSpec, + ) -> String { let block_count = if self.executed_block.is_some() { 1 } else { 0 }; if spec.is_peer_das_enabled_for_epoch(block_epoch) { - let custody_columns_count = if let Some(block) = self.get_cached_block() { - &block.custody_columns_count().to_string() - } else { - "?" - }; format!( "block {} data_columns {}/{}", block_count, self.verified_data_columns.len(), - custody_columns_count, + num_expected_columns + .map(|c| c.to_string()) + .unwrap_or("?".into()) ) } else { let num_expected_blobs = if let Some(block) = self.get_cached_block() { @@ -346,6 +348,7 @@ pub struct DataAvailabilityCheckerInner { /// This cache holds a limited number of states in memory and reconstructs them /// from disk when necessary. This is necessary until we merge tree-states state_cache: StateLRUCache, + custody_context: Arc, spec: Arc, } @@ -362,11 +365,13 @@ impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, + custody_context: Arc, spec: Arc, ) -> Result { Ok(Self { critical: RwLock::new(LruCache::new(capacity)), state_cache: StateLRUCache::new(beacon_store, spec.clone()), + custody_context, spec, }) } @@ -470,13 +475,15 @@ impl DataAvailabilityCheckerInner { debug!( component = "blobs", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, None, &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = pending_components.make_available( + &self.spec, + self.custody_context.sampling_size(Some(epoch), &self.spec), + |block| self.state_cache.recover_pending_executed_block(block), + )? 
{ // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -519,16 +526,19 @@ impl DataAvailabilityCheckerInner { // Merge in the data columns. pending_components.merge_data_columns(kzg_verified_data_columns)?; + let num_expected_columns = self.custody_context.sampling_size(Some(epoch), &self.spec); debug!( component = "data_columns", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, Some(num_expected_columns), &self.spec), "Component added to data availability checker" ); - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? { + if let Some(available_block) = + pending_components.make_available(&self.spec, num_expected_columns, |block| { + self.state_cache.recover_pending_executed_block(block) + })? + { // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -612,17 +622,20 @@ impl DataAvailabilityCheckerInner { // Merge in the block. pending_components.merge_block(diet_executed_block); + let num_expected_columns = self.custody_context.sampling_size(Some(epoch), &self.spec); debug!( component = "block", ?block_root, - status = pending_components.status_str(epoch, &self.spec), + status = pending_components.status_str(epoch, Some(num_expected_columns), &self.spec), "Component added to data availability checker" ); // Check if we have all components and entire set is consistent. - if let Some(available_block) = pending_components.make_available(&self.spec, |block| { - self.state_cache.recover_pending_executed_block(block) - })? 
{ + if let Some(available_block) = pending_components.make_available( + &self.spec, + self.custody_context.sampling_size(Some(epoch), &self.spec), + |block| self.state_cache.recover_pending_executed_block(block), + )? { // We keep the pending components in the availability cache during block import (#5845). write_lock.put(block_root, pending_components); drop(write_lock); @@ -700,7 +713,6 @@ mod test { use types::{ExecPayload, MinimalEthSpec}; const LOW_VALIDATOR_COUNT: usize = 32; - const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 8; fn get_store_with_spec( db_path: &TempDir, @@ -861,7 +873,6 @@ mod test { block, import_data, payload_verification_outcome, - custody_columns_count: DEFAULT_TEST_CUSTODY_COLUMN_COUNT, }; (availability_pending_block, gossip_verified_blobs) @@ -888,9 +899,15 @@ mod test { let spec = harness.spec.clone(); let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); + let custody_context = Arc::new(CustodyContext::new(false)); let cache = Arc::new( - DataAvailabilityCheckerInner::::new(capacity_non_zero, test_store, spec.clone()) - .expect("should create cache"), + DataAvailabilityCheckerInner::::new( + capacity_non_zero, + test_store, + custody_context, + spec.clone(), + ) + .expect("should create cache"), ); (harness, cache, chain_db_path) } @@ -1239,8 +1256,6 @@ mod pending_components_tests { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }, - // Default custody columns count, doesn't matter here - custody_columns_count: 8, }; (block.into(), blobs, invalid_blobs) } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 5fe674f30c..f73857f468 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -24,7 +24,6 @@ pub 
struct DietAvailabilityPendingExecutedBlock { parent_eth1_finalization_data: Eth1FinalizationData, consensus_context: OnDiskConsensusContext, payload_verification_outcome: PayloadVerificationOutcome, - custody_columns_count: usize, } /// just implementing the same methods as `AvailabilityPendingExecutedBlock` @@ -54,10 +53,6 @@ impl DietAvailabilityPendingExecutedBlock { .unwrap_or_default() } - pub fn custody_columns_count(&self) -> usize { - self.custody_columns_count - } - /// Returns the epoch corresponding to `self.slot()`. pub fn epoch(&self) -> Epoch { self.block.slot().epoch(E::slots_per_epoch()) @@ -107,7 +102,6 @@ impl StateLRUCache { executed_block.import_data.consensus_context, ), payload_verification_outcome: executed_block.payload_verification_outcome, - custody_columns_count: executed_block.custody_columns_count, } } @@ -137,7 +131,6 @@ impl StateLRUCache { .into_consensus_context(), }, payload_verification_outcome: diet_executed_block.payload_verification_outcome, - custody_columns_count: diet_executed_block.custody_columns_count, }) } @@ -224,7 +217,6 @@ impl From> value.import_data.consensus_context, ), payload_verification_outcome: value.payload_verification_outcome, - custody_columns_count: value.custody_columns_count, } } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 5b79312d37..0eec6dc770 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -48,6 +48,7 @@ pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; mod persisted_beacon_chain; +pub mod persisted_custody; mod persisted_fork_choice; mod pre_finalization_cache; pub mod proposer_prep_service; @@ -59,6 +60,7 @@ pub mod summaries_dag; pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; +pub mod validator_custody; pub mod validator_monitor; pub mod validator_pubkey_cache; @@ -100,3 +102,4 @@ pub use 
state_processing::per_block_processing::errors::{ }; pub use store; pub use types; +pub use validator_custody::CustodyContext; diff --git a/beacon_node/beacon_chain/src/persisted_custody.rs b/beacon_node/beacon_chain/src/persisted_custody.rs new file mode 100644 index 0000000000..6ede473b36 --- /dev/null +++ b/beacon_node/beacon_chain/src/persisted_custody.rs @@ -0,0 +1,46 @@ +use crate::validator_custody::CustodyContextSsz; +use ssz::{Decode, Encode}; +use std::sync::Arc; +use store::{DBColumn, Error as StoreError, HotColdDB, ItemStore, StoreItem}; +use types::{EthSpec, Hash256}; + +/// 32-byte key for accessing the `CustodyContext`. All zero because `CustodyContext` has its own column. +pub const CUSTODY_DB_KEY: Hash256 = Hash256::ZERO; + +pub struct PersistedCustody(CustodyContextSsz); + +pub fn load_custody_context, Cold: ItemStore>( + store: Arc>, +) -> Option { + let res: Result, _> = + store.get_item::(&CUSTODY_DB_KEY); + // Load context from the store + match res { + Ok(Some(c)) => Some(c.0), + _ => None, + } +} + +/// Attempt to persist the custody context object to `self.store`. 
+pub fn persist_custody_context, Cold: ItemStore>( + store: Arc>, + custody_context: CustodyContextSsz, +) -> Result<(), store::Error> { + store.put_item(&CUSTODY_DB_KEY, &PersistedCustody(custody_context)) +} + +impl StoreItem for PersistedCustody { + fn db_column() -> DBColumn { + DBColumn::CustodyContext + } + + fn as_store_bytes(&self) -> Vec { + self.0.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + let custody_context = CustodyContextSsz::from_ssz_bytes(bytes)?; + + Ok(PersistedCustody(custody_context)) + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index db6968b662..c2c5d8d626 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -609,12 +609,6 @@ where let chain = builder.build().expect("should build"); - let sampling_column_count = if self.import_all_data_columns { - chain.spec.number_of_custody_groups as usize - } else { - chain.spec.custody_requirement as usize - }; - BeaconChainHarness { spec: chain.spec.clone(), chain: Arc::new(chain), @@ -625,7 +619,6 @@ where mock_execution_layer: self.mock_execution_layer, mock_builder: None, rng: make_rng(), - sampling_column_count, } } } @@ -682,7 +675,6 @@ pub struct BeaconChainHarness { pub mock_execution_layer: Option>, pub mock_builder: Option>>, - pub sampling_column_count: usize, pub rng: Mutex, } @@ -785,7 +777,10 @@ where } pub fn get_sampling_column_count(&self) -> usize { - self.sampling_column_count + self.chain + .data_availability_checker + .custody_context() + .sampling_size(None, &self.chain.spec) as usize } pub fn slots_per_epoch(&self) -> u64 { @@ -2360,7 +2355,7 @@ where .blob_kzg_commitments() .is_ok_and(|c| !c.is_empty()); if !has_blobs { - return RpcBlock::new_without_blobs(Some(block_root), block, 0); + return RpcBlock::new_without_blobs(Some(block_root), block); } // Blobs are stored as data columns from Fulu (PeerDAS) @@ -2370,14 +2365,8 @@ where 
.into_iter() .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - custody_columns, - self.get_sampling_column_count(), - &self.spec, - ) - .unwrap() + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, &self.spec) + .unwrap() } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); RpcBlock::new(Some(block_root), block, blobs).unwrap() @@ -2403,15 +2392,9 @@ where .take(sampling_column_count) .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - columns, - sampling_column_count, - &self.spec, - )? + RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec)? } else { - RpcBlock::new_without_blobs(Some(block_root), block, 0) + RpcBlock::new_without_blobs(Some(block_root), block) } } else { let blobs = blob_items diff --git a/beacon_node/beacon_chain/src/validator_custody.rs b/beacon_node/beacon_chain/src/validator_custody.rs new file mode 100644 index 0000000000..160333b50e --- /dev/null +++ b/beacon_node/beacon_chain/src/validator_custody.rs @@ -0,0 +1,447 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::atomic::{AtomicU64, Ordering}, +}; + +use parking_lot::RwLock; + +use ssz_derive::{Decode, Encode}; +use types::{ChainSpec, Epoch, EthSpec, Slot}; + +/// A delay before making the CGC change effective to the data availability checker. +const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; + +type ValidatorsAndBalances = Vec<(usize, u64)>; + +/// This currently just registers increases in validator count. +/// Does not handle decreasing validator counts +#[derive(Default, Debug)] +struct ValidatorRegistrations { + /// Set of all validators that is registered to this node along with its effective balance + /// + /// Key is validator index and value is effective_balance. 
+ validators: HashMap, + /// Maintains the validator custody requirement at a given epoch. + /// + /// Note: Only stores the epoch value when there's a change in custody requirement. + /// So if epoch 10 and 11 has the same custody requirement, only 10 is stored. + /// This map is never pruned, because currently we never decrease custody requirement, so this + /// map size is contained at 128. + epoch_validator_custody_requirements: BTreeMap, +} + +impl ValidatorRegistrations { + /// Returns the validator custody requirement at the latest epoch. + fn latest_validator_custody_requirement(&self) -> Option { + self.epoch_validator_custody_requirements + .last_key_value() + .map(|(_, v)| *v) + } + + /// Lookup the active custody requirement at the given epoch. + fn custody_requirement_at_epoch(&self, epoch: Epoch) -> Option { + self.epoch_validator_custody_requirements + .range(..=epoch) + .last() + .map(|(_, custody_count)| *custody_count) + } + + /// Register a new validator index and updates the list of validators if required. + /// Returns `Some((effective_epoch, new_cgc))` if the registration results in a CGC update. + pub(crate) fn register_validators( + &mut self, + validators_and_balance: ValidatorsAndBalances, + slot: Slot, + spec: &ChainSpec, + ) -> Option<(Epoch, u64)> { + for (validator_index, effective_balance) in validators_and_balance { + self.validators.insert(validator_index, effective_balance); + } + + // Each `BALANCE_PER_ADDITIONAL_CUSTODY_GROUP` effectively contributes one unit of "weight". 
+ let validator_custody_units = + self.validators.values().sum::() / spec.balance_per_additional_custody_group; + let validator_custody_requirement = + get_validators_custody_requirement(validator_custody_units, spec); + + tracing::debug!( + validator_custody_units, + validator_custody_requirement, + "Registered validators" + ); + + // If registering the new validator increased the total validator "units", then + // add a new entry for the current epoch + if Some(validator_custody_requirement) != self.latest_validator_custody_requirement() { + // Apply the change from the next epoch after adding some delay buffer to ensure + // the node has enough time to subscribe to subnets etc, and to avoid having + // inconsistent column counts within an epoch. + let effective_delay_slots = + CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS / spec.seconds_per_slot; + let effective_epoch = (slot + effective_delay_slots).epoch(E::slots_per_epoch()) + 1; + self.epoch_validator_custody_requirements + .entry(effective_epoch) + .and_modify(|old_custody| *old_custody = validator_custody_requirement) + .or_insert(validator_custody_requirement); + Some((effective_epoch, validator_custody_requirement)) + } else { + None + } + } +} + +/// Given the `validator_custody_units`, return the custody requirement based on +/// the spec parameters. +/// +/// Note: a `validator_custody_units` here represents the number of 32 eth effective_balance +/// equivalent to `BALANCE_PER_ADDITIONAL_CUSTODY_GROUP`. +/// +/// For e.g. a validator with eb 32 eth is 1 unit. +/// a validator with eb 65 eth is 65 // 32 = 2 units. 
+/// +/// See https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/validator.md#validator-custody +fn get_validators_custody_requirement(validator_custody_units: u64, spec: &ChainSpec) -> u64 { + std::cmp::min( + std::cmp::max(validator_custody_units, spec.validator_custody_requirement), + spec.number_of_custody_groups, + ) +} + +/// Contains all the information the node requires to calculate the +/// number of columns to be custodied when checking for DA. +#[derive(Debug)] +pub struct CustodyContext { + /// The number of custody groups required based on the number of validators + /// that are attached to this node. + /// + /// This is the number that we use to compute the custody group count that + /// we require for the data availability check, and that we use to advertise to our peers in the metadata + /// and enr values. + validator_custody_count: AtomicU64, + /// Is the node run as a supernode based on current cli parameters. + pub current_is_supernode: bool, + /// The persisted value for `is_supernode` based on the previous run of this node. + /// + /// Note: We require this value because if a user restarts the node with a higher cli custody + /// count value than in the previous run, then we should continue advertising the custody + /// count based on the old value rather than the new one since we haven't backfilled the required + /// columns. + persisted_is_supernode: bool, + /// Maintains all the validators that this node is connected to currently + validator_registrations: RwLock, +} + +impl CustodyContext { + /// Create a new default custody context object when no persisted object + /// exists. + /// + /// The `is_supernode` value is based on current cli parameters. 
+ pub fn new(is_supernode: bool) -> Self { + Self { + validator_custody_count: AtomicU64::new(0), + current_is_supernode: is_supernode, + persisted_is_supernode: is_supernode, + validator_registrations: Default::default(), + } + } + + pub fn new_from_persisted_custody_context( + ssz_context: CustodyContextSsz, + is_supernode: bool, + ) -> Self { + CustodyContext { + validator_custody_count: AtomicU64::new(ssz_context.validator_custody_at_head), + current_is_supernode: is_supernode, + persisted_is_supernode: ssz_context.persisted_is_supernode, + validator_registrations: Default::default(), + } + } + + /// Register a new validator index and updates the list of validators if required. + /// + /// Also modifies the internal structures if the validator custody has changed to + /// update the `custody_column_count`. + /// + /// Returns `Some` along with the updated custody group count if it has changed, otherwise returns `None`. + pub fn register_validators( + &self, + validators_and_balance: ValidatorsAndBalances, + slot: Slot, + spec: &ChainSpec, + ) -> Option { + let Some((effective_epoch, new_validator_custody)) = self + .validator_registrations + .write() + .register_validators::(validators_and_balance, slot, spec) + else { + return None; + }; + + let current_cgc = self.custody_group_count_at_head(spec); + let validator_custody_count_at_head = self.validator_custody_count.load(Ordering::Relaxed); + + if new_validator_custody != validator_custody_count_at_head { + tracing::debug!( + old_count = validator_custody_count_at_head, + new_count = new_validator_custody, + "Validator count at head updated" + ); + self.validator_custody_count + .store(new_validator_custody, Ordering::Relaxed); + + let updated_cgc = self.custody_group_count_at_head(spec); + // Send the message to network only if there are more columns subnets to subscribe to + if updated_cgc > current_cgc { + tracing::debug!( + old_cgc = current_cgc, + updated_cgc, + "Custody group count updated" + ); + return 
Some(CustodyCountChanged { + new_custody_group_count: updated_cgc, + sampling_count: self.sampling_size(Some(effective_epoch), spec), + }); + } + } + + None + } + + /// This function is used to determine the custody group count at head ONLY. + /// Do NOT use this directly for data availability check, use `self.sampling_size` instead as + /// CGC can change over epochs. + pub fn custody_group_count_at_head(&self, spec: &ChainSpec) -> u64 { + if self.current_is_supernode { + return spec.number_of_custody_groups; + } + let validator_custody_count_at_head = self.validator_custody_count.load(Ordering::Relaxed); + + // If there are no validators, return the minimum custody_requirement + if validator_custody_count_at_head > 0 { + validator_custody_count_at_head + } else { + spec.custody_requirement + } + } + + /// Returns the count of custody columns this node must sample for a block at `epoch` to import. + /// If an `epoch` is not specified, returns the *current* validator custody requirement. + pub fn sampling_size(&self, epoch_opt: Option, spec: &ChainSpec) -> u64 { + let custody_group_count = if self.current_is_supernode { + spec.number_of_custody_groups + } else if let Some(epoch) = epoch_opt { + self.validator_registrations + .read() + .custody_requirement_at_epoch(epoch) + .unwrap_or(spec.custody_requirement) + } else { + self.custody_group_count_at_head(spec) + }; + + spec.sampling_size(custody_group_count) + .expect("should compute node sampling size from valid chain spec") + } +} + +/// The custody count changed because of a change in the +/// number of validators being managed. +pub struct CustodyCountChanged { + pub new_custody_group_count: u64, + pub sampling_count: u64, +} + +/// The custody information that gets persisted across runs. 
+#[derive(Debug, Encode, Decode, Clone)] +pub struct CustodyContextSsz { + validator_custody_at_head: u64, + persisted_is_supernode: bool, +} + +impl From<&CustodyContext> for CustodyContextSsz { + fn from(context: &CustodyContext) -> Self { + CustodyContextSsz { + validator_custody_at_head: context.validator_custody_count.load(Ordering::Relaxed), + persisted_is_supernode: context.persisted_is_supernode, + } + } +} + +#[cfg(test)] +mod tests { + use types::MainnetEthSpec; + + use super::*; + + type E = MainnetEthSpec; + + #[test] + fn no_validators_supernode_default() { + let custody_context = CustodyContext::new(true); + let spec = E::default_spec(); + assert_eq!( + custody_context.custody_group_count_at_head(&spec), + spec.number_of_custody_groups + ); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.number_of_custody_groups + ); + } + + #[test] + fn no_validators_fullnode_default() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + assert_eq!( + custody_context.custody_group_count_at_head(&spec), + spec.custody_requirement, + "head custody count should be minimum spec custody requirement" + ); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.samples_per_slot + ); + } + + #[test] + fn register_single_validator_should_update_cgc() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + let min_val_custody_requirement = spec.validator_custody_requirement; + // One single node increases its balance over 3 epochs. 
+ let validators_and_expected_cgc_change = vec![ + ( + vec![(0, bal_per_additional_group)], + Some(min_val_custody_requirement), + ), + // No CGC change at 8 custody units, as it's the minimum requirement + (vec![(0, 8 * bal_per_additional_group)], None), + (vec![(0, 10 * bal_per_additional_group)], Some(10)), + ]; + + register_validators_and_assert_cgc( + &custody_context, + validators_and_expected_cgc_change, + &spec, + ); + } + + #[test] + fn register_multiple_validators_should_update_cgc() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + let min_val_custody_requirement = spec.validator_custody_requirement; + // Add 3 validators over 3 epochs. + let validators_and_expected_cgc = vec![ + ( + vec![(0, bal_per_additional_group)], + Some(min_val_custody_requirement), + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + ], + // No CGC change at 8 custody units, as it's the minimum requirement + None, + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + (2, 2 * bal_per_additional_group), + ], + Some(10), + ), + ]; + + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + } + + #[test] + fn register_validators_should_not_update_cgc_for_supernode() { + let custody_context = CustodyContext::new(true); + let spec = E::default_spec(); + let bal_per_additional_group = spec.balance_per_additional_custody_group; + + // Add 3 validators over 3 epochs. 
+ let validators_and_expected_cgc = vec![ + (vec![(0, bal_per_additional_group)], None), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + ], + None, + ), + ( + vec![ + (0, bal_per_additional_group), + (1, 7 * bal_per_additional_group), + (2, 2 * bal_per_additional_group), + ], + None, + ), + ]; + + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); + assert_eq!( + custody_context.sampling_size(None, &spec), + spec.number_of_custody_groups + ); + } + + #[test] + fn cgc_change_should_be_effective_to_sampling_after_delay() { + let custody_context = CustodyContext::new(false); + let spec = E::default_spec(); + let current_slot = Slot::new(10); + let current_epoch = current_slot.epoch(E::slots_per_epoch()); + let default_sampling_size = custody_context.sampling_size(None, &spec); + let validator_custody_units = 10; + + let _cgc_changed = custody_context.register_validators::( + vec![( + 0, + validator_custody_units * spec.balance_per_additional_custody_group, + )], + current_slot, + &spec, + ); + + // CGC update is not applied for `current_epoch`. + assert_eq!( + custody_context.sampling_size(Some(current_epoch), &spec), + default_sampling_size + ); + // CGC update is applied for the next epoch. + assert_eq!( + custody_context.sampling_size(Some(current_epoch + 1), &spec), + validator_custody_units + ); + } + + /// Update validator every epoch and assert cgc against expected values. 
+ fn register_validators_and_assert_cgc( + custody_context: &CustodyContext, + validators_and_expected_cgc_changed: Vec<(ValidatorsAndBalances, Option)>, + spec: &ChainSpec, + ) { + for (idx, (validators_and_balance, expected_cgc_change)) in + validators_and_expected_cgc_changed.into_iter().enumerate() + { + let epoch = Epoch::new(idx as u64); + let updated_custody_count_opt = custody_context + .register_validators::( + validators_and_balance, + epoch.start_slot(E::slots_per_epoch()), + spec, + ) + .map(|c| c.new_custody_group_count); + + assert_eq!(updated_custody_count_opt, expected_cgc_change); + } + } +} diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 9225ffd9f4..3ff5f772aa 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -30,8 +30,6 @@ type E = MainnetEthSpec; const VALIDATOR_COUNT: usize = 24; const CHAIN_SEGMENT_LENGTH: usize = 64 * 5; const BLOCK_INDICES: &[usize] = &[0, 1, 32, 64, 68 + 1, 129, CHAIN_SEGMENT_LENGTH - 1]; -// Default custody group count for tests -const CGC: usize = 8; /// A cached set of keys. 
static KEYPAIRS: LazyLock> = @@ -144,10 +142,9 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone(), columns.len(), spec) - .unwrap() + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() } - None => RpcBlock::new_without_blobs(None, block, 0), + None => RpcBlock::new_without_blobs(None, block), } } @@ -370,7 +367,6 @@ async fn chain_segment_non_linear_parent_roots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -408,7 +404,6 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -436,7 +431,6 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RpcBlock::new_without_blobs( None, Arc::new(SignedBeaconBlock::from_block(block, signature)), - harness.sampling_column_count, ); assert!( @@ -578,11 +572,7 @@ async fn invalid_signature_gossip_block() { .into_block_error() .expect("should import all blocks prior to the one being tested"); let signed_block = SignedBeaconBlock::from_block(block, junk_signature()); - let rpc_block = RpcBlock::new_without_blobs( - None, - Arc::new(signed_block), - harness.sampling_column_count, - ); + let rpc_block = RpcBlock::new_without_blobs(None, Arc::new(signed_block)); let process_res = harness .chain .process_block( @@ -1002,7 +992,6 @@ async fn block_gossip_verification() { let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let block_index = CHAIN_SEGMENT_LENGTH - 2; - let cgc = harness.chain.spec.custody_requirement as usize; harness .chain @@ -1016,7 +1005,7 @@ async fn block_gossip_verification() { { let gossip_verified = harness .chain - 
.verify_block_for_gossip(snapshot.beacon_block.clone(), get_cgc(&blobs_opt)) + .verify_block_for_gossip(snapshot.beacon_block.clone()) .await .expect("should obtain gossip verified block"); @@ -1058,7 +1047,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_block_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::FutureSlot { present_slot, block_slot, @@ -1092,7 +1081,7 @@ async fn block_gossip_verification() { *block.slot_mut() = expected_finalized_slot; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::WouldRevertFinalizedSlot { block_slot, finalized_slot, @@ -1122,10 +1111,10 @@ async fn block_gossip_verification() { unwrap_err( harness .chain - .verify_block_for_gossip( - Arc::new(SignedBeaconBlock::from_block(block, junk_signature())), - cgc - ) + .verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block( + block, + junk_signature() + )),) .await ), BlockError::InvalidSignature(InvalidSignature::ProposerSignature) @@ -1150,7 +1139,7 @@ async fn block_gossip_verification() { *block.parent_root_mut() = parent_root; assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::ParentUnknown {parent_root: p} if p == parent_root ), @@ -1176,7 +1165,7 @@ async fn block_gossip_verification() { *block.parent_root_mut() = parent_root; assert!( matches!( - 
unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature)), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(SignedBeaconBlock::from_block(block, signature))).await), BlockError::NotFinalizedDescendant { block_parent_root } if block_parent_root == parent_root ), @@ -1213,7 +1202,7 @@ async fn block_gossip_verification() { ); assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), BlockError::IncorrectBlockProposer { block, local_shuffling, @@ -1225,7 +1214,7 @@ async fn block_gossip_verification() { // Check to ensure that we registered this is a valid block from this proposer. assert!( matches!( - unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone()), cgc).await), + unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), BlockError::DuplicateImportStatusUnknown(_), ), "should register any valid signature against the proposer, even if the block failed later verification" @@ -1233,11 +1222,7 @@ async fn block_gossip_verification() { let block = chain_segment[block_index].beacon_block.clone(); assert!( - harness - .chain - .verify_block_for_gossip(block, cgc) - .await - .is_ok(), + harness.chain.verify_block_for_gossip(block).await.is_ok(), "the valid block should be processed" ); @@ -1255,7 +1240,7 @@ async fn block_gossip_verification() { matches!( harness .chain - .verify_block_for_gossip(block.clone(), cgc) + .verify_block_for_gossip(block.clone()) .await .expect_err("should error when processing known block"), BlockError::DuplicateImportStatusUnknown(_) @@ -1331,17 +1316,8 @@ async fn verify_block_for_gossip_slashing_detection() { let state = harness.get_current_state(); let ((block1, blobs1), _) = harness.make_block(state.clone(), Slot::new(1)).await; let ((block2, _blobs2), _) = harness.make_block(state, 
Slot::new(1)).await; - let cgc = if block1.fork_name_unchecked().fulu_enabled() { - harness.get_sampling_column_count() - } else { - 0 - }; - let verified_block = harness - .chain - .verify_block_for_gossip(block1, cgc) - .await - .unwrap(); + let verified_block = harness.chain.verify_block_for_gossip(block1).await.unwrap(); if let Some((kzg_proofs, blobs)) = blobs1 { harness @@ -1364,7 +1340,7 @@ async fn verify_block_for_gossip_slashing_detection() { ) .await .unwrap(); - unwrap_err(harness.chain.verify_block_for_gossip(block2, CGC).await); + unwrap_err(harness.chain.verify_block_for_gossip(block2).await); // Slasher should have been handed the two conflicting blocks and crafted a slashing. slasher.process_queued(Epoch::new(0)).unwrap(); @@ -1388,11 +1364,7 @@ async fn verify_block_for_gossip_doppelganger_detection() { .attestations() .map(|att| att.clone_as_attestation()) .collect::>(); - let verified_block = harness - .chain - .verify_block_for_gossip(block, CGC) - .await - .unwrap(); + let verified_block = harness.chain.verify_block_for_gossip(block).await.unwrap(); harness .chain .process_block( @@ -1539,7 +1511,7 @@ async fn add_base_block_to_altair_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(base_block.clone()), CGC) + .verify_block_for_gossip(Arc::new(base_block.clone())) .await .expect_err("should error when processing base block"), BlockError::InconsistentFork(InconsistentFork { @@ -1549,7 +1521,7 @@ async fn add_base_block_to_altair_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. 
- let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone()), 0); + let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone())); assert!(matches!( harness .chain @@ -1573,7 +1545,7 @@ async fn add_base_block_to_altair_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(base_block), 0)], + vec![RpcBlock::new_without_blobs(None, Arc::new(base_block))], NotifyExecutionLayer::Yes, ) .await, @@ -1676,7 +1648,7 @@ async fn add_altair_block_to_base_chain() { assert!(matches!( harness .chain - .verify_block_for_gossip(Arc::new(altair_block.clone()), CGC) + .verify_block_for_gossip(Arc::new(altair_block.clone())) .await .expect_err("should error when processing altair block"), BlockError::InconsistentFork(InconsistentFork { @@ -1686,7 +1658,7 @@ async fn add_altair_block_to_base_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone()), 0); + let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone())); assert!(matches!( harness .chain @@ -1710,7 +1682,7 @@ async fn add_altair_block_to_base_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block), 0)], + vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block))], NotifyExecutionLayer::Yes ) .await, @@ -1771,11 +1743,7 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. 
let notify_execution_layer = NotifyExecutionLayer::Yes; - let rpc_block = RpcBlock::new_without_blobs( - Some(block_root), - block.clone(), - harness.sampling_column_count, - ); + let rpc_block = RpcBlock::new_without_blobs(Some(block_root), block.clone()); let verified_block1 = rpc_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) @@ -1846,14 +1814,3 @@ async fn import_execution_pending_block( } } } - -fn get_cgc(blobs_opt: &Option>) -> usize { - if let Some(data_sidecars) = blobs_opt.as_ref() { - match data_sidecars { - DataSidecars::Blobs(_) => 0, - DataSidecars::DataColumns(d) => d.len(), - } - } else { - 0 - } -} diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 6b9ff9d6ed..05fae7aa70 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -22,7 +22,6 @@ use task_executor::ShutdownReason; use types::*; const VALIDATOR_COUNT: usize = 32; -const CGC: usize = 8; type E = MainnetEthSpec; @@ -686,8 +685,7 @@ async fn invalidates_all_descendants() { assert_eq!(fork_parent_state.slot(), fork_parent_slot); let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); let fork_block_root = rig .harness .chain @@ -789,8 +787,7 @@ async fn switches_heads() { let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; let fork_parent_root = fork_block.parent_root(); - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); let fork_block_root = rig .harness .chain @@ -1054,14 
+1051,13 @@ async fn invalid_parent() { // Ensure the block built atop an invalid payload is invalid for gossip. assert!(matches!( - rig.harness.chain.clone().verify_block_for_gossip(block.clone(), CGC).await, + rig.harness.chain.clone().verify_block_for_gossip(block.clone()).await, Err(BlockError::ParentExecutionPayloadInvalid { parent_root: invalid_root }) if invalid_root == parent_root )); // Ensure the block built atop an invalid payload is invalid for import. - let rpc_block = - RpcBlock::new_without_blobs(None, block.clone(), rig.harness.sampling_column_count); + let rpc_block = RpcBlock::new_without_blobs(None, block.clone()); assert!(matches!( rig.harness.chain.process_block(rpc_block.block_root(), rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -1385,8 +1381,7 @@ async fn recover_from_invalid_head_by_importing_blocks() { } = InvalidHeadSetup::new().await; // Import the fork block, it should become the head. - let fork_rpc_block = - RpcBlock::new_without_blobs(None, fork_block.clone(), rig.harness.sampling_column_count); + let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); rig.harness .chain .process_block( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 51c7f0c289..d0f161ed56 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2644,11 +2644,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert_eq!(split.block_root, valid_fork_block.parent_root()); assert_ne!(split.state_root, unadvanced_split_state_root); - let invalid_fork_rpc_block = RpcBlock::new_without_blobs( - None, - invalid_fork_block.clone(), - harness.sampling_column_count, - ); + let invalid_fork_rpc_block = RpcBlock::new_without_blobs(None, invalid_fork_block.clone()); // Applying the invalid block should fail. 
let err = harness .chain @@ -2664,11 +2660,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert!(matches!(err, BlockError::WouldRevertFinalizedSlot { .. })); // Applying the valid block should succeed, but it should not become head. - let valid_fork_rpc_block = RpcBlock::new_without_blobs( - None, - valid_fork_block.clone(), - harness.sampling_column_count, - ); + let valid_fork_rpc_block = RpcBlock::new_without_blobs(None, valid_fork_block.clone()); harness .chain .process_block( diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b220685b86..a4ec41ac06 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -48,8 +48,8 @@ use directory::DEFAULT_ROOT_DIR; use either::Either; use eth2::types::{ self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice, - ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, - ValidatorId, ValidatorStatus, ValidatorsRequestBody, + ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, StateId as CoreStateId, + ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus, ValidatorsRequestBody, }; use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use health_metrics::observe::Observe; @@ -3765,15 +3765,17 @@ pub fn serve( .and(warp::path::end()) .and(warp_utils::json::json()) .and(validator_subscription_tx_filter.clone()) + .and(network_tx_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .then( - |subscriptions: Vec, + |committee_subscriptions: Vec, validator_subscription_tx: Sender, + network_tx: UnboundedSender>, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { - let subscriptions: std::collections::BTreeSet<_> = subscriptions + let subscriptions: std::collections::BTreeSet<_> = committee_subscriptions .iter() .map(|subscription| { chain @@ -3788,6 +3790,7 @@ 
pub fn serve( } }) .collect(); + let message = ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions }; if let Err(e) = validator_subscription_tx.try_send(message) { @@ -3802,6 +3805,42 @@ pub fn serve( )); } + if chain.spec.is_peer_das_scheduled() { + let (finalized_beacon_state, _, _) = + StateId(CoreStateId::Finalized).state(&chain)?; + let validators_and_balances = committee_subscriptions + .iter() + .filter_map(|subscription| { + if let Ok(effective_balance) = finalized_beacon_state + .get_effective_balance(subscription.validator_index as usize) + { + Some((subscription.validator_index as usize, effective_balance)) + } else { + None + } + }) + .collect::>(); + + let current_slot = + chain.slot().map_err(warp_utils::reject::unhandled_error)?; + if let Some(cgc_change) = chain + .data_availability_checker + .custody_context() + .register_validators::( + validators_and_balances, + current_slot, + &chain.spec, + ) { + network_tx.send(NetworkMessage::CustodyCountChanged { + new_custody_group_count: cgc_change.new_custody_group_count, + sampling_count: cgc_change.sampling_count, + }).unwrap_or_else(|e| { + debug!(error = %e, "Could not send message to the network service. \ + Likely shutdown") + }); + } + } + Ok(()) }) }, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 463f585f2c..75979bbb1d 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -138,8 +138,7 @@ pub async fn publish_block>( spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; // Gossip verify the block and blobs/data columns separately. 
- let gossip_verified_block_result = unverified_block - .into_gossip_verified_block(&chain, network_globals.custody_columns_count() as usize); + let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); let block_root = block_root.unwrap_or_else(|| { gossip_verified_block_result.as_ref().map_or_else( |_| block.canonical_root(), @@ -224,7 +223,7 @@ pub async fn publish_block>( publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(|_| { warp_utils::reject::custom_server_error("unable to publish data column sidecars".into()) })?; - let sampling_columns_indices = &network_globals.sampling_columns; + let sampling_columns_indices = &network_globals.sampling_columns(); let sampling_columns = gossip_verified_columns .into_iter() .flatten() @@ -303,11 +302,7 @@ pub async fn publish_block>( ); let import_result = Box::pin(chain.process_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - block.clone(), - network_globals.custody_columns_count() as usize, - ), + RpcBlock::new_without_blobs(Some(block_root), block.clone()), NotifyExecutionLayer::Yes, BlockImportSource::HttpApi, publish_fn, diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index cd590580be..843242c22f 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -39,9 +39,6 @@ type E = MainnetEthSpec; * */ -// Default custody group count for tests -const CGC: usize = 8; - /// This test checks that a block that is **invalid** from a gossip perspective gets rejected when using `broadcast_validation=gossip`. 
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] pub async fn gossip_invalid() { @@ -367,9 +364,9 @@ pub async fn consensus_partial_pass_only_consensus() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_a.is_err()); /* submit `block_b` which should induce equivocation */ @@ -657,10 +654,10 @@ pub async fn equivocation_consensus_late_equivocation() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain, CGC); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1294,9 +1291,9 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { ProvenancedBlock::Builder(b, _, _) => b, }; - let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain, CGC); + let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain, CGC); + let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1398,7 +1395,7 @@ pub async fn block_seen_on_gossip_without_blobs() { // Simulate the block being 
seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain, CGC) + .into_gossip_verified_block(&tester.harness.chain) .unwrap(); // It should not yet be added to fork choice because blobs have not been seen. @@ -1467,7 +1464,7 @@ pub async fn block_seen_on_gossip_with_some_blobs() { // Simulate the block being seen on gossip. block .clone() - .into_gossip_verified_block(&tester.harness.chain, CGC) + .into_gossip_verified_block(&tester.harness.chain) .unwrap(); // Simulate some of the blobs being seen on gossip. @@ -1786,6 +1783,5 @@ fn get_custody_columns(tester: &InteractiveTester) -> HashSet { .network_globals .as_ref() .unwrap() - .sampling_columns - .clone() + .sampling_columns() } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index ad54c6b8b1..ad4241c5b7 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -49,6 +49,7 @@ use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EnrForkId, EthSpec}; mod subnet_predicate; +use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY; pub use subnet_predicate::subnet_predicate; use types::non_zero_usize::new_non_zero_usize; @@ -476,6 +477,15 @@ impl Discovery { Ok(()) } + pub fn update_enr_cgc(&mut self, custody_group_count: u64) -> Result<(), String> { + self.discv5 + .enr_insert(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count) + .map_err(|e| format!("{:?}", e))?; + enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); + *self.network_globals.local_enr.write() = self.discv5.local_enr(); + Ok(()) + } + /// Adds/Removes a subnet from the ENR attnets/syncnets Bitfield pub fn update_enr_bitfield(&mut self, subnet: Subnet, value: bool) -> Result<(), String> { let local_enr = self.discv5.local_enr(); diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs 
index 23060df9e6..5f65a6c6d0 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -177,6 +177,7 @@ impl Network { pub async fn new( executor: task_executor::TaskExecutor, mut ctx: ServiceContext<'_>, + custody_group_count: u64, ) -> Result<(Self, Arc>), String> { let config = ctx.config.clone(); trace!("Libp2p Service starting"); @@ -201,11 +202,12 @@ impl Network { )?; // Construct the metadata - let custody_group_count = ctx.chain_spec.is_peer_das_scheduled().then(|| { - ctx.chain_spec - .custody_group_count(config.subscribe_all_data_column_subnets) - }); - let meta_data = utils::load_or_build_metadata(&config.network_dir, custody_group_count); + let custody_group_count_metadata = ctx + .chain_spec + .is_peer_das_scheduled() + .then_some(custody_group_count); + let meta_data = + utils::load_or_build_metadata(&config.network_dir, custody_group_count_metadata); let seq_number = *meta_data.seq_number(); let globals = NetworkGlobals::new( enr, @@ -885,6 +887,23 @@ impl Network { } } + /// Subscribe to all data columns determined by the cgc. + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + pub fn subscribe_new_data_column_subnets(&mut self, custody_column_count: u64) { + self.network_globals + .update_data_column_subnets(custody_column_count); + + for column in self.network_globals.sampling_subnets() { + let kind = GossipKind::DataColumnSidecar(column); + self.subscribe_kind(kind); + } + } + /// Returns the scoring parameters for a topic if set. #[instrument(parent = None, level = "trace", @@ -1254,6 +1273,21 @@ impl Network { self.update_metadata_bitfields(); } + /// Updates the cgc value in the ENR. 
+ #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + pub fn update_enr_cgc(&mut self, new_custody_group_count: u64) { + if let Err(e) = self.discovery_mut().update_enr_cgc(new_custody_group_count) { + crit!(error = e, "Could not update cgc in ENR"); + } + // update the local meta data which informs our peers of the update during PINGS + self.update_metadata_cgc(new_custody_group_count); + } + /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. #[instrument(parent = None, @@ -1368,6 +1402,28 @@ impl Network { utils::save_metadata_to_disk(&self.network_dir, meta_data); } + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + fn update_metadata_cgc(&mut self, custody_group_count: u64) { + let mut meta_data_w = self.network_globals.local_metadata.write(); + + *meta_data_w.seq_number_mut() += 1; + if let Ok(cgc) = meta_data_w.custody_group_count_mut() { + *cgc = custody_group_count; + } + let seq_number = *meta_data_w.seq_number(); + let meta_data = meta_data_w.clone(); + + drop(meta_data_w); + self.eth2_rpc_mut().update_seq_number(seq_number); + // Save the updated metadata to disk + utils::save_metadata_to_disk(&self.network_dir, meta_data); + } + /// Sends a Ping request to the peer. #[instrument(parent = None, level = "trace", diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index fd99d93589..d1ed1c33b0 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -31,10 +31,8 @@ pub struct NetworkGlobals { /// The current state of the backfill sync. pub backfill_state: RwLock, /// The computed sampling subnets and columns is stored to avoid re-computing. 
- pub sampling_subnets: HashSet, - pub sampling_columns: HashSet, - /// Constant custody group count (CGC) set at startup - custody_group_count: u64, + pub sampling_subnets: RwLock>, + pub sampling_columns: RwLock>, /// Network-related configuration. Immutable after initialization. pub config: Arc, /// Ethereum chain configuration. Immutable after initialization. @@ -87,6 +85,13 @@ impl NetworkGlobals { sampling_columns.extend(columns); } + tracing::debug!( + cgc = custody_group_count, + ?sampling_columns, + ?sampling_subnets, + "Starting node with custody params" + ); + NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), @@ -96,14 +101,40 @@ impl NetworkGlobals { gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::Paused), - sampling_subnets, - sampling_columns, - custody_group_count, + sampling_subnets: RwLock::new(sampling_subnets), + sampling_columns: RwLock::new(sampling_columns), config, spec, } } + /// Update the sampling subnets based on an updated cgc. 
+ pub fn update_data_column_subnets(&self, custody_group_count: u64) { + // The below `expect` calls will panic on start up if the chain spec config values used + // are invalid + let sampling_size = self + .spec + .sampling_size(custody_group_count) + .expect("should compute node sampling size from valid chain spec"); + let custody_groups = + get_custody_groups(self.local_enr().node_id().raw(), sampling_size, &self.spec) + .expect("should compute node custody groups"); + + let mut sampling_subnets = self.sampling_subnets.write(); + for custody_index in &custody_groups { + let subnets = compute_subnets_from_custody_group(*custody_index, &self.spec) + .expect("should compute custody subnets for node"); + sampling_subnets.extend(subnets); + } + + let mut sampling_columns = self.sampling_columns.write(); + for custody_index in &custody_groups { + let columns = compute_columns_for_custody_group(*custody_index, &self.spec) + .expect("should compute custody columns for node"); + sampling_columns.extend(columns); + } + } + /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. pub fn local_enr(&self) -> Enr { @@ -120,19 +151,6 @@ impl NetworkGlobals { self.listen_multiaddrs.read().clone() } - /// Returns true if this node is configured as a PeerDAS supernode - pub fn is_supernode(&self) -> bool { - self.custody_group_count == self.spec.number_of_custody_groups - } - - /// Returns the count of custody columns this node must sample for block import - pub fn custody_columns_count(&self) -> u64 { - // This only panics if the chain spec contains invalid values - self.spec - .sampling_size(self.custody_group_count) - .expect("should compute node sampling size from valid chain spec") - } - /// Returns the number of libp2p connected peers. 
pub fn connected_peers(&self) -> usize { self.peers.read().connected_peer_ids().count() @@ -226,10 +244,18 @@ impl NetworkGlobals { enable_light_client_server: self.config.enable_light_client_server, subscribe_all_subnets: self.config.subscribe_all_subnets, subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets, - sampling_subnets: &self.sampling_subnets, + sampling_subnets: self.sampling_subnets.read().clone(), } } + pub fn sampling_columns(&self) -> HashSet { + self.sampling_columns.read().clone() + } + + pub fn sampling_subnets(&self) -> HashSet { + self.sampling_subnets.read().clone() + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals( trusted_peers: Vec, @@ -283,7 +309,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_subnets.len(), + globals.sampling_subnets.read().len(), subnet_sampling_size as usize ); } @@ -306,7 +332,7 @@ mod test { Arc::new(spec), ); assert_eq!( - globals.sampling_columns.len(), + globals.sampling_columns.read().len(), subnet_sampling_size as usize ); } diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index 56b97303d3..349bfe66a3 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -26,11 +26,11 @@ pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; #[derive(Debug)] -pub struct TopicConfig<'a> { +pub struct TopicConfig { pub enable_light_client_server: bool, pub subscribe_all_subnets: bool, pub subscribe_all_data_column_subnets: bool, - pub sampling_subnets: &'a HashSet, + pub sampling_subnets: HashSet, } /// Returns all the topics the node should subscribe at `fork_name` @@ -85,7 +85,7 @@ pub fn core_topics_to_subscribe( topics.push(GossipKind::DataColumnSidecar(i.into())); } } else { - for subnet in 
opts.sampling_subnets { + for subnet in &opts.sampling_subnets { topics.push(GossipKind::DataColumnSidecar(*subnet)); } } @@ -126,7 +126,7 @@ pub fn all_topics_at_fork(fork: ForkName, spec: &ChainSpec) -> Vec(fork, &opts, spec) } @@ -521,7 +521,7 @@ mod tests { enable_light_client_server: false, subscribe_all_subnets: false, subscribe_all_data_column_subnets: false, - sampling_subnets, + sampling_subnets: sampling_subnets.clone(), } } diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index d979ef9265..0dac126909 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -118,6 +118,7 @@ pub async fn build_libp2p_instance( let (signal, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = task_executor::TaskExecutor::new(rt, exit, shutdown_tx, service_name); + let custody_group_count = chain_spec.custody_requirement; let libp2p_context = lighthouse_network::Context { config, enr_fork_id: EnrForkId::default(), @@ -126,7 +127,7 @@ pub async fn build_libp2p_instance( libp2p_registry: None, }; Libp2pInstance( - LibP2PService::new(executor, libp2p_context) + LibP2PService::new(executor, libp2p_context, custody_group_count) .await .expect("should build libp2p instance") .0, diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index b129b54841..05c7dc287b 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -780,7 +780,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) let all_column_subnets = (0..network_globals.spec.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new); - let custody_column_subnets = network_globals.sampling_subnets.iter(); + let custody_column_subnets = network_globals.sampling_subnets(); // Iterate all subnet values to set to zero the empty entries in peers_per_column_subnet for subnet in 
all_column_subnets { @@ -794,7 +794,7 @@ pub fn update_sync_metrics(network_globals: &Arc>) // Registering this metric is a duplicate for supernodes but helpful for fullnodes. This way // operators can monitor the health of only the subnets of their interest without complex // Grafana queries. - for subnet in custody_column_subnets { + for subnet in custody_column_subnets.iter() { set_gauge_entry( &PEERS_PER_CUSTODY_COLUMN_SUBNET, &[&format!("{subnet}")], diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 8757ab4383..87f657f935 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1272,10 +1272,7 @@ impl NetworkBeaconProcessor { let verification_result = self .chain .clone() - .verify_block_for_gossip( - block.clone(), - self.network_globals.custody_columns_count() as usize, - ) + .verify_block_for_gossip(block.clone()) .await; if verification_result.is_ok() { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f9390a2c7b..df9b656051 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -843,7 +843,7 @@ impl NetworkBeaconProcessor { block_root: Hash256, publish_blobs: bool, ) { - let custody_columns = self.network_globals.sampling_columns.clone(); + let custody_columns = self.network_globals.sampling_columns(); let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { if publish_blobs { @@ -930,7 +930,12 @@ impl NetworkBeaconProcessor { publish_columns: bool, ) -> Option { // Only supernodes attempt reconstruction - if !self.network_globals.is_supernode() { + if !self + .chain + .data_availability_checker + .custody_context() + .current_is_supernode + { return None; } diff --git 
a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index cb9c976404..f6a1069a7f 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -285,7 +285,7 @@ impl TestRig { ) .unwrap() .into_iter() - .filter(|c| network_globals.sampling_columns.contains(&c.index)) + .filter(|c| network_globals.sampling_columns().contains(&c.index)) .collect::>(); (None, Some(custody_columns)) @@ -371,22 +371,12 @@ impl TestRig { } } - pub fn custody_columns_count(&self) -> usize { - self.network_beacon_processor - .network_globals - .custody_columns_count() as usize - } - pub fn enqueue_rpc_block(&self) { let block_root = self.next_block.canonical_root(); self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - self.next_block.clone(), - self.custody_columns_count(), - ), + RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 0 }, ) @@ -398,11 +388,7 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs( - Some(block_root), - self.next_block.clone(), - self.custody_columns_count(), - ), + RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 1 }, ) diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 77204b455d..c9f89ad668 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -10,6 +10,7 @@ use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconPro use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; + use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::RequestType; use 
lighthouse_network::service::Network; @@ -105,6 +106,12 @@ pub enum NetworkMessage { ConnectTrustedPeer(Enr), /// Disconnect from a trusted peer and remove it from the `trusted_peers` mapping. DisconnectTrustedPeer(Enr), + /// Custody group count changed due to a change in validators' weight. + /// Subscribe to new subnets and update ENR metadata. + CustodyCountChanged { + new_custody_group_count: u64, + sampling_count: u64, + }, } /// Messages triggered by validators that may trigger a subscription to a subnet. @@ -270,7 +277,15 @@ impl NetworkService { }; // launch libp2p service - let (mut libp2p, network_globals) = Network::new(executor.clone(), service_context).await?; + let (mut libp2p, network_globals) = Network::new( + executor.clone(), + service_context, + beacon_chain + .data_availability_checker + .custody_context() + .custody_group_count_at_head(&beacon_chain.spec), + ) + .await?; // Repopulate the DHT with stored ENR's if discovery is not disabled. if !config.disable_discovery { @@ -745,6 +760,15 @@ impl NetworkService { ); } } + NetworkMessage::CustodyCountChanged { + new_custody_group_count, + sampling_count, + } => { + // subscribe to `sampling_count` subnets + self.libp2p + .subscribe_new_data_column_subnets(sampling_count); + self.libp2p.update_enr_cgc(new_custody_group_count); + } } } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 99428b0c80..0418ab4553 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -257,17 +257,11 @@ impl RangeBlockComponentsRequest { )); } - RpcBlock::new_with_custody_columns( - Some(block_root), - block, - custody_columns, - expects_custody_columns.len(), - spec, - ) - .map_err(|e| format!("{e:?}"))? + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) + .map_err(|e| format!("{e:?}"))? 
} else { // Block has no data, expects zero columns - RpcBlock::new_without_blobs(Some(block_root), block, 0) + RpcBlock::new_without_blobs(Some(block_root), block) }); } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 58641f8606..f6be39fa4a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -476,7 +476,7 @@ impl SyncNetworkContext { // Attempt to find all required custody peers before sending any request or creating an ID let columns_by_range_peers_to_request = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let column_indexes = self.network_globals().sampling_columns.clone(); + let column_indexes = self.network_globals().sampling_columns(); Some(self.select_columns_by_range_peers_to_request( &column_indexes, peers, @@ -534,7 +534,7 @@ impl SyncNetworkContext { ( data_column_requests, self.network_globals() - .sampling_columns + .sampling_columns() .clone() .iter() .copied() @@ -928,8 +928,7 @@ impl SyncNetworkContext { // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = self .network_globals() - .sampling_columns - .clone() + .sampling_columns() .into_iter() .filter(|index| !custody_indexes_imported.contains(index)) .collect::>(); @@ -1487,11 +1486,7 @@ impl SyncNetworkContext { .beacon_processor_if_enabled() .ok_or(SendErrorProcessor::ProcessorNotAvailable)?; - let block = RpcBlock::new_without_blobs( - Some(block_root), - block, - self.network_globals().custody_columns_count() as usize, - ); + let block = RpcBlock::new_without_blobs(Some(block_root), block); debug!(block = ?block_root, id, "Sending block for processing"); // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index be01734417..6100d322b8 
100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1032,7 +1032,7 @@ impl SyncingChain { // Require peers on all sampling column subnets before sending batches let peers_on_all_custody_subnets = network .network_globals() - .sampling_subnets + .sampling_subnets() .iter() .all(|subnet_id| { let peer_count = network diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index e7e6ff5970..a2c359c87e 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1205,12 +1205,8 @@ impl TestRig { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }; - let executed_block = AvailabilityPendingExecutedBlock::new( - block, - import_data, - payload_verification_outcome, - self.network_globals.custody_columns_count() as usize, - ); + let executed_block = + AvailabilityPendingExecutedBlock::new(block, import_data, payload_verification_outcome); match self .harness .chain diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 932f485dd0..c114eca555 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -449,18 +449,10 @@ fn build_rpc_block( RpcBlock::new(None, block, Some(blobs.clone())).unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns( - None, - block, - columns.clone(), - // TODO(das): Assumes CGC = max value. 
Change if we want to do more complex tests - columns.len(), - spec, - ) - .unwrap() + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() } // Block has no data, expects zero columns - None => RpcBlock::new_without_blobs(None, block, 0), + None => RpcBlock::new_without_blobs(None, block), } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 5b30971fd8..eda57b7d8b 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -339,6 +339,8 @@ pub enum DBColumn { BeaconRandaoMixes, #[strum(serialize = "dht")] DhtEnrs, + #[strum(serialize = "cus")] + CustodyContext, /// DEPRECATED. For Optimistically Imported Merge Transition Blocks #[strum(serialize = "otb")] OptimisticTransitionBlock, @@ -397,6 +399,7 @@ impl DBColumn { | Self::PubkeyCache | Self::BeaconRestorePoint | Self::DhtEnrs + | Self::CustodyContext | Self::OptimisticTransitionBlock => 32, Self::BeaconBlockRoots | Self::BeaconBlockRootsChunked diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 59472e2edc..b4fd5afe87 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -203,6 +203,8 @@ pub struct ChainSpec { pub data_column_sidecar_subnet_count: u64, pub samples_per_slot: u64, pub custody_requirement: u64, + pub validator_custody_requirement: u64, + pub balance_per_additional_custody_group: u64, /* * Networking @@ -731,14 +733,6 @@ impl ChainSpec { Ok(std::cmp::max(custody_column_count, self.samples_per_slot)) } - pub fn custody_group_count(&self, is_supernode: bool) -> u64 { - if is_supernode { - self.number_of_custody_groups - } else { - self.custody_requirement - } - } - pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator { (0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new) } @@ -975,6 +969,8 @@ impl ChainSpec { data_column_sidecar_subnet_count: 128, number_of_columns: 128, samples_per_slot: 8, + validator_custody_requirement: 
8, + balance_per_additional_custody_group: 32000000000, /* * Network specific @@ -1309,6 +1305,8 @@ impl ChainSpec { data_column_sidecar_subnet_count: 128, number_of_columns: 128, samples_per_slot: 8, + validator_custody_requirement: 8, + balance_per_additional_custody_group: 32000000000, /* * Network specific @@ -1650,6 +1648,12 @@ pub struct Config { #[serde(default = "BlobSchedule::default")] #[serde(skip_serializing_if = "BlobSchedule::is_empty")] blob_schedule: BlobSchedule, + #[serde(default = "default_validator_custody_requirement")] + #[serde(with = "serde_utils::quoted_u64")] + validator_custody_requirement: u64, + #[serde(default = "default_balance_per_additional_custody_group")] + #[serde(with = "serde_utils::quoted_u64")] + balance_per_additional_custody_group: u64, } fn default_bellatrix_fork_version() -> [u8; 4] { @@ -1815,6 +1819,14 @@ const fn default_samples_per_slot() -> u64 { 8 } +const fn default_validator_custody_requirement() -> u64 { + 8 +} + +const fn default_balance_per_additional_custody_group() -> u64 { + 32000000000 +} + fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize { let max_request_blocks = max_request_blocks as usize; RuntimeVariableList::::from_vec( @@ -2024,6 +2036,8 @@ impl Config { samples_per_slot: spec.samples_per_slot, custody_requirement: spec.custody_requirement, blob_schedule: spec.blob_schedule.clone(), + validator_custody_requirement: spec.validator_custody_requirement, + balance_per_additional_custody_group: spec.balance_per_additional_custody_group, } } @@ -2103,6 +2117,8 @@ impl Config { samples_per_slot, custody_requirement, ref blob_schedule, + validator_custody_requirement, + balance_per_additional_custody_group, } = self; if preset_base != E::spec_name().to_string().as_str() { @@ -2187,6 +2203,8 @@ impl Config { samples_per_slot, custody_requirement, blob_schedule: blob_schedule.clone(), + validator_custody_requirement, + balance_per_additional_custody_group, ..chain_spec.clone() }) diff 
--git a/scripts/local_testnet/network_params_das.yaml b/scripts/local_testnet/network_params_das.yaml index b16be34b89..c896b11330 100644 --- a/scripts/local_testnet/network_params_das.yaml +++ b/scripts/local_testnet/network_params_das.yaml @@ -2,7 +2,7 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 cl_extra_params: - --subscribe-all-data-column-subnets - --subscribe-all-subnets @@ -13,7 +13,7 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 cl_extra_params: # Note: useful for testing range sync (only produce block if the node is in sync to prevent forking) - --sync-tolerance-epochs=0 diff --git a/scripts/tests/checkpoint-sync-config-devnet.yaml b/scripts/tests/checkpoint-sync-config-devnet.yaml index c536d26b3b..de3020a884 100644 --- a/scripts/tests/checkpoint-sync-config-devnet.yaml +++ b/scripts/tests/checkpoint-sync-config-devnet.yaml @@ -3,18 +3,18 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: true - cl_type: lighthouse cl_image: lighthouse:local el_type: geth - el_image: ethpandaops/geth:fusaka-devnet-0 + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: false checkpoint_sync_enabled: true -checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-0.ethpandaops.io" +checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-1.ethpandaops.io" global_log_level: debug network_params: - network: fusaka-devnet-0 + network: fusaka-devnet-1 diff --git a/scripts/tests/genesis-sync-config-fulu.yaml b/scripts/tests/genesis-sync-config-fulu.yaml index ccdc09c0d3..91aa4d1ffd 100644 --- a/scripts/tests/genesis-sync-config-fulu.yaml +++ b/scripts/tests/genesis-sync-config-fulu.yaml @@ -2,19 
+2,26 @@ participants: - cl_type: lighthouse cl_image: lighthouse:local + el_type: geth + el_image: ethpandaops/geth:fusaka-devnet-1 count: 2 # nodes without validators, used for testing sync. - cl_type: lighthouse cl_image: lighthouse:local + el_type: geth + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: true validator_count: 0 - cl_type: lighthouse cl_image: lighthouse:local + el_type: geth + el_image: ethpandaops/geth:fusaka-devnet-1 supernode: false validator_count: 0 network_params: seconds_per_slot: 6 - fulu_fork_epoch: 0 + electra_fork_epoch: 0 + fulu_fork_epoch: 1 preset: "minimal" additional_services: - tx_fuzz diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index b507383190..af3b0bce2d 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -520,7 +520,7 @@ impl Tester { let result: Result, _> = self .block_on_dangerous(self.harness.chain.process_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone(), 0), + RpcBlock::new_without_blobs(Some(block_root), block.clone()), NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()),