diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9692441aba..91df0ccd23 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -141,6 +141,7 @@ pub const BEACON_CHAIN_DB_KEY: Hash256 = Hash256::ZERO; pub const OP_POOL_DB_KEY: Hash256 = Hash256::ZERO; pub const ETH1_CACHE_DB_KEY: Hash256 = Hash256::ZERO; pub const FORK_CHOICE_DB_KEY: Hash256 = Hash256::ZERO; +pub const LOCAL_INDICES_DB_KEY: Hash256 = Hash256::ZERO; /// Defines how old a block can be before it's no longer a candidate for the early attester cache. const EARLY_ATTESTER_CACHE_HISTORIC_SLOTS: u64 = 4; @@ -712,6 +713,17 @@ impl BeaconChain { Ok(()) } + pub fn persist_local_indices(&self) -> Result<(), Error> { + let persisted_local_validators = self + .validator_monitor + .read() + .to_persisted_local_validators()?; + self.store + .put_item(&LOCAL_INDICES_DB_KEY, &persisted_local_validators)?; + + Ok(()) + } + /// Returns the slot _right now_ according to `self.slot_clock`. Returns `Err` if the slot is /// unavailable. 
/// @@ -7151,6 +7163,14 @@ impl BeaconChain { ) } + /// Register a local validator that has connected to the Beacon REST API + pub fn register_local_validator(&self, validator_index: u64) { + // TODO(das): persist `last_seen_local_validators` to disk on shutdown and once in a while + self.validator_monitor + .write() + .auto_register_local_validator(validator_index); + } + pub fn metrics(&self) -> BeaconChainMetrics { BeaconChainMetrics { reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(), @@ -7210,7 +7230,8 @@ impl Drop for BeaconChain { let drop = || -> Result<(), Error> { self.persist_head_and_fork_choice()?; self.persist_op_pool()?; - self.persist_eth1_cache() + self.persist_eth1_cache()?; + self.persist_local_indices() }; if let Err(e) = drop() { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index de66a8179d..19cf568808 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,5 +1,6 @@ use crate::beacon_chain::{ - CanonicalHead, LightClientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY, + CanonicalHead, LightClientProducerEvent, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, + LOCAL_INDICES_DB_KEY, OP_POOL_DB_KEY, }; use crate::beacon_proposer_cache::BeaconProposerCache; use crate::data_availability_checker::DataAvailabilityChecker; @@ -15,7 +16,7 @@ use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; -use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; +use crate::validator_monitor::{PersistedLocalIndices, ValidatorMonitor, ValidatorMonitorConfig}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; use crate::{ @@ -732,8 +733,16 @@ where let head_tracker = 
Arc::new(self.head_tracker.unwrap_or_default()); let beacon_proposer_cache: Arc> = <_>::default(); - let mut validator_monitor = - ValidatorMonitor::new(validator_monitor_config, beacon_proposer_cache.clone()); + let persisted_local_indices = store + .get_item::(&LOCAL_INDICES_DB_KEY) + .map_err(|e| format!("DB error whilst reading persisted local indices: {e:?}"))? + .unwrap_or_default(); + + let mut validator_monitor = ValidatorMonitor::new( + validator_monitor_config, + beacon_proposer_cache.clone(), + persisted_local_indices, + ); let current_slot = if slot_clock .is_prior_to_genesis() diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index 16f4e3f143..137e2d5e71 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -10,6 +10,8 @@ use parking_lot::{Mutex, RwLock}; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; use smallvec::SmallVec; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; use state_processing::common::get_attestation_participation_flag_indices; use state_processing::per_epoch_processing::{ errors::EpochProcessingError, EpochProcessingSummary, @@ -20,16 +22,17 @@ use std::marker::PhantomData; use std::str::Utf8Error; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use store::AbstractExecPayload; +use store::{AbstractExecPayload, DBColumn, Error as StoreError, StoreItem}; use tracing::{debug, error, info, instrument, warn}; use types::consts::altair::{ TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX, }; +use types::typenum::U524288; use types::{ Attestation, AttestationData, AttesterSlashingRef, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, IndexedAttestationRef, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, - SignedContributionAndProof, Slot, SyncCommitteeMessage, 
VoluntaryExit, + SignedContributionAndProof, Slot, SyncCommitteeMessage, VariableList, VoluntaryExit, }; /// Used for Prometheus labels. /// @@ -388,6 +391,9 @@ pub struct ValidatorMonitor { validators: HashMap, /// A map of validator index (state.validators) to a validator public key. indices: HashMap, + /// Additional map to indices that bypasses `auto_register` config and tracks the time a + /// validator was last seen as a UNIX timestamp + last_seen_local_validators: HashMap, /// If true, allow the automatic registration of validators. auto_register: bool, /// Once the number of monitored validators goes above this threshold, we @@ -413,6 +419,7 @@ impl ValidatorMonitor { pub fn new( config: ValidatorMonitorConfig, beacon_proposer_cache: Arc>, + persisted_local_indices: PersistedLocalIndices, ) -> Self { let ValidatorMonitorConfig { auto_register, @@ -423,6 +430,12 @@ impl ValidatorMonitor { let mut s = Self { validators: <_>::default(), indices: <_>::default(), + last_seen_local_validators: HashMap::from_iter( + persisted_local_indices + .indices + .iter() + .map(|e| (e.index, Duration::from_secs(e.last_seen_timestamp_sec))), + ), auto_register, individual_tracking_threshold, missed_blocks: <_>::default(), @@ -449,6 +462,34 @@ impl ValidatorMonitor { self.validators.len() <= self.individual_tracking_threshold } + /// Prunes all validators not seen for longer than `ttl` (time to live) + pub fn prune_registered_local_validators(&mut self, ttl: Duration) { + let now = timestamp_now(); + // Prune expired keys + self.last_seen_local_validators + .retain(|_, last_seen| now - *last_seen < ttl); + } + + /// Returns an iterator over registered local validators indices + pub fn get_registered_local_validators(&self) -> impl Iterator { + self.last_seen_local_validators.keys() + } + + /// Returns local indices to persist to the DB + pub fn to_persisted_local_validators(&self) -> Result { + Ok(PersistedLocalIndices { + indices: VariableList::new( + 
self.last_seen_local_validators + .iter() + .map(|(index, last_seen)| PersistedLocalIndex { + index: *index, + last_seen_timestamp_sec: last_seen.as_secs(), + }) + .collect::>(), + )?, + }) + } + /// Add some validators to `self` for additional monitoring. #[instrument(parent = None, level = "info", @@ -1198,6 +1239,9 @@ impl ValidatorMonitor { skip_all )] pub fn auto_register_local_validator(&mut self, validator_index: u64) { + self.last_seen_local_validators + .insert(validator_index, timestamp_now()); + if !self.auto_register { return; } @@ -2404,3 +2448,29 @@ fn min_opt(a: Option, b: Option) -> Option { _ => None, } } + +// Using 524288 as a really high limit that's never meant to be reached +#[derive(Encode, Decode, Default)] +pub struct PersistedLocalIndices { + indices: VariableList, +} + +#[derive(Encode, Decode)] +pub struct PersistedLocalIndex { + index: u64, + last_seen_timestamp_sec: u64, +} + +impl StoreItem for PersistedLocalIndices { + fn db_column() -> DBColumn { + DBColumn::LocalIndices + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index be11a075f4..d159b1794c 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3623,10 +3623,7 @@ pub fn serve( let subscriptions: std::collections::BTreeSet<_> = subscriptions .iter() .map(|subscription| { - chain - .validator_monitor - .write() - .auto_register_local_validator(subscription.validator_index); + chain.register_local_validator(subscription.validator_index); api_types::ValidatorSubscription { attestation_committee_index: subscription.committee_index, slot: subscription.slot, @@ -3789,6 +3786,11 @@ pub fn serve( }) .unzip(); + // Update the network about registered validators + for (r, _) in &preparation_data { + chain.register_local_validator(r.validator_index); + } 
+ // Update the prepare beacon proposer cache based on this request. execution_layer .update_proposer_preparation( @@ -3901,10 +3903,7 @@ pub fn serve( | { task_spawner.blocking_json_task(Priority::P0, move || { for subscription in subscriptions { - chain - .validator_monitor - .write() - .auto_register_local_validator(subscription.validator_index); + chain.register_local_validator(subscription.validator_index); let message = ValidatorSubscriptionMessage::SyncCommitteeSubscribe { subscriptions: vec![subscription], diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 930ab5e4af..7208d19e8a 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -14,7 +14,7 @@ use lighthouse_network::{ behaviour::{ConnectionEstablished, FromSwarm}, ConnectionId, NetworkBehaviour, }, - types::{CGCUpdates, SyncState}, + types::SyncState, ConnectedPoint, Enr, NetworkConfig, NetworkGlobals, PeerId, PeerManager, }; use network::{NetworkReceivers, NetworkSenders}; @@ -25,7 +25,7 @@ use std::sync::Arc; use std::time::Duration; use store::MemoryStore; use task_executor::test_utils::TestRuntime; -use types::{ChainSpec, EthSpec}; +use types::{CGCUpdates, ChainSpec, EthSpec}; pub const TCP_PORT: u16 = 42; pub const UDP_PORT: u16 = 42; diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 3ed538a9ba..816d6dead2 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -1194,9 +1194,8 @@ impl Discovery { #[cfg(test)] mod tests { use super::*; - use crate::types::CGCUpdates; use libp2p::identity::secp256k1; - use types::{BitVector, MinimalEthSpec, SubnetId}; + use types::{BitVector, CGCUpdates, MinimalEthSpec, SubnetId}; type E = MinimalEthSpec; diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs 
index 8c642ec91f..6af14ddb7a 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -31,7 +31,7 @@ pub use peerdb::peer_info::{ }; use peerdb::score::{PeerAction, ReportSource}; pub use peerdb::sync_status::{SyncInfo, SyncStatus}; -use std::collections::{hash_map::Entry, HashMap, HashSet}; +use std::collections::{hash_map::Entry, HashMap}; use std::net::IpAddr; use strum::IntoEnumIterator; use types::data_column_custody_group::{ @@ -1451,7 +1451,7 @@ impl PeerManager { &self, peer_id: &PeerId, custody_group_count: u64, - ) -> Result, String> { + ) -> Result, String> { // If we don't have a node id, we cannot compute the custody duties anyway let node_id = peer_id_to_node_id(peer_id)?; let spec = &self.network_globals.spec; diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index ac5e7a4ff0..9beb6c7d67 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -15,7 +15,7 @@ use crate::rpc::{ }; use crate::types::{ all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, - CGCUpdates, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, + GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, }; use crate::EnrExt; use crate::Eth2Enr; @@ -42,7 +42,7 @@ use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, Epoch, EthSpec, ForkContext, Slot, SubnetId, }; -use types::{ChainSpec, ForkName}; +use types::{CGCUpdates, ChainSpec, ForkName}; use utils::{build_transport, strip_peer_id, Context as ServiceContext}; pub mod api_types; @@ -173,6 +173,7 @@ impl Network { pub async fn new( executor: task_executor::TaskExecutor, mut ctx: ServiceContext<'_>, + cgc_updates: Option, ) -> Result<(Self, Arc>), String> { let config = ctx.config.clone(); trace!("Libp2p Service 
starting"); @@ -197,10 +198,12 @@ impl Network { )?; // TODO: Load from disk, and check consistency with DB somewhere - let initial_cgc = ctx - .chain_spec - .custody_group_count(config.subscribe_all_data_column_subnets); - let cgc_updates = CGCUpdates::new(initial_cgc); + let cgc_updates = cgc_updates.unwrap_or_else(|| { + CGCUpdates::new( + ctx.chain_spec + .custody_group_count(config.subscribe_all_data_column_subnets), + ) + }); // Construct the metadata let globals = NetworkGlobals::new( diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 994d695369..8216587d91 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use types::data_column_custody_group::{ compute_columns_for_custody_group, compute_subnets_from_custody_group, get_custody_groups, }; -use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slot}; +use types::{CGCUpdates, ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slot}; pub struct NetworkGlobals { /// The current local ENR. 
@@ -39,12 +39,6 @@ pub struct NetworkGlobals { pub spec: Arc, } -pub struct CGCUpdates { - initial_value: u64, - updates: Vec<(Slot, u64)>, - // TODO(das): Track backfilled CGC -} - impl NetworkGlobals { pub fn new( enr: Enr, @@ -145,9 +139,8 @@ impl NetworkGlobals { } pub fn sampling_subnets(&self, slot: Slot) -> &[DataColumnSubnetId] { - let cgc = self.custody_group_count(slot) as usize; - // Returns as many elements as possible, can't panic as it's upper bounded by len - &self.all_sampling_subnets[..self.all_sampling_subnets.len().min(cgc)] + let cgc = self.custody_group_count(slot); + self.sampling_subnets_for_cgc(cgc) } pub fn sampling_columns(&self, slot: Slot) -> &[ColumnIndex] { @@ -156,6 +149,11 @@ impl NetworkGlobals { &self.all_sampling_columns[..self.all_sampling_columns.len().min(cgc)] } + pub fn sampling_subnets_for_cgc(&self, cgc: u64) -> &[DataColumnSubnetId] { + // Returns as many elements as possible, can't panic as it's upper bounded by len + &self.all_sampling_subnets[..self.all_sampling_subnets.len().min(cgc as usize)] + } + fn public_custody_group_count(&self) -> u64 { // TODO(das): delay announcing the public custody count for the duration of the pruning // period @@ -163,7 +161,7 @@ impl NetworkGlobals { } /// Returns the custody group count (CGC) - fn custody_group_count(&self, slot: Slot) -> u64 { + pub fn custody_group_count(&self, slot: Slot) -> u64 { self.cgc_updates.read().at_slot(slot) } @@ -176,8 +174,14 @@ impl NetworkGlobals { } /// Adds a new CGC value update - pub fn add_cgc_update(&self, update: (Slot, u64)) { - self.cgc_updates.write().add_latest_update(update); + pub fn add_cgc_update(&self, update_start_slot: Slot, cgc: u64) -> Result<(), String> { + self.cgc_updates + .write() + .add_latest_update((update_start_slot, cgc)) + } + + pub fn dump_cgc_updates(&self) -> CGCUpdates { + self.cgc_updates.read().clone() } /// Returns the number of libp2p connected peers. 
@@ -276,30 +280,6 @@ impl NetworkGlobals { } } -impl CGCUpdates { - pub fn new(initial_value: u64) -> Self { - Self { - initial_value, - updates: vec![], - } - } - - fn at_slot(&self, slot: Slot) -> u64 { - // TODO: Test and fix logic - for (update_slot, cgc) in &self.updates { - if slot > *update_slot { - return *cgc; - } - } - - self.initial_value - } - - fn add_latest_update(&mut self, update: (Slot, u64)) { - self.updates.push(update); - } -} - #[cfg(test)] mod test { use super::*; diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index 56fe030046..868cdb6eb9 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -11,7 +11,7 @@ pub type EnrSyncCommitteeBitfield = BitVector<::SyncCommitteeSu pub type Enr = discv5::enr::Enr; pub use eth2::lighthouse::sync_state::{BackFillState, SyncState}; -pub use globals::{CGCUpdates, NetworkGlobals}; +pub use globals::NetworkGlobals; pub use pubsub::{PubsubMessage, SnappyTransform}; pub use subnet::{Subnet, SubnetDiscovery}; pub use topics::{ diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index d686885ff7..234edad098 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -117,7 +117,7 @@ pub async fn build_libp2p_instance( libp2p_registry: None, }; Libp2pInstance( - LibP2PService::new(executor, libp2p_context) + LibP2PService::new(executor, libp2p_context, None) .await .expect("should build libp2p instance") .0, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 68a029babd..8e65b3df25 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -32,6 +32,7 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; +use 
types::CGCUpdates; use types::{ ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription, @@ -48,6 +49,8 @@ const UNSUBSCRIBE_DELAY_EPOCHS: u64 = 2; /// Size of the queue for validator subnet subscriptions. The number is chosen so that we may be /// able to run tens of thousands of validators on one BN. const VALIDATOR_SUBSCRIPTION_MESSAGE_QUEUE_SIZE: usize = 65_536; +/// Seconds to expire a local validator registration +const LOCAL_VALIDATOR_REGISTRY_TTL_SEC: u64 = 60 * 60; /// Types of messages that the network service can receive. #[derive(Debug, IntoStaticStr)] @@ -191,6 +194,8 @@ pub struct NetworkService { gossipsub_parameter_update: tokio::time::Interval, /// Provides fork specific info. fork_context: Arc, + /// A timer to trigger CGC updates + cgc_update_interval: tokio::time::Interval, } impl NetworkService { @@ -265,8 +270,29 @@ impl NetworkService { libp2p_registry, }; + if let Some(_disk_custody_info) = store + .get_custody_info() + .map_err(|e| format!("Unable to read custody info from the DB: {e:?}"))? + { + // TODO(das): check that list of columns is compatible + } + + let cgc_updates = if let Some(disk_cgc_updates) = store + .get_cgc_updates() + .map_err(|e| format!("Unable to read cgc updates from the DB: {e:?}"))? + { + disk_cgc_updates + } else { + // Just return the new one + let initial_cgc = beacon_chain + .spec + .custody_group_count(config.subscribe_all_data_column_subnets); + CGCUpdates::new(initial_cgc) + }; + // launch libp2p service - let (mut libp2p, network_globals) = Network::new(executor.clone(), service_context).await?; + let (mut libp2p, network_globals) = + Network::new(executor.clone(), service_context, Some(cgc_updates)).await?; // Repopulate the DHT with stored ENR's if discovery is not disabled. 
if !config.disable_discovery { @@ -318,6 +344,8 @@ impl NetworkService { validator_subscription_recv, } = network_receivers; + let epoch_in_seconds = fork_context.spec.seconds_per_slot * T::EthSpec::slots_per_epoch(); + // create the network service and spawn the task let network_service = NetworkService { beacon_chain, @@ -335,6 +363,7 @@ impl NetworkService { metrics_enabled: config.metrics_enabled, metrics_update, gossipsub_parameter_update, + cgc_update_interval: tokio::time::interval(Duration::from_secs(epoch_in_seconds)), fork_context, }; @@ -427,6 +456,10 @@ impl NetworkService { _ = self.gossipsub_parameter_update.tick() => self.update_gossipsub_parameters(), + // TODO(das) should align this tick to be in the middle of an epoch, and ideally + // outside a busy slot period + _ = self.cgc_update_interval.tick() => self.on_cgc_update_interval(), + // handle a message sent to the network Some(msg) = self.network_recv.recv() => self.on_network_msg(msg, &mut shutdown_sender).await, @@ -772,22 +805,107 @@ impl NetworkService { } } + fn on_cgc_update_interval(&mut self) { + let prev_cgc = self.network_globals.custody_group_count(Slot::max_value()); + let Ok(clock_epoch) = self.beacon_chain.epoch() else { + return; + }; + let cached_head = self.beacon_chain.canonical_head.cached_head(); + + self.beacon_chain + .validator_monitor + .write() + .prune_registered_local_validators(Duration::from_secs( + LOCAL_VALIDATOR_REGISTRY_TTL_SEC, + )); + + let known_validators_balance = self + .beacon_chain + .validator_monitor + .read() + .get_registered_local_validators() + // TODO(das): should ignore non active validators? 
+ .map(|validator_index| { + cached_head + .snapshot + .beacon_state + .get_effective_balance(*validator_index as usize) + .unwrap_or(0) + }) + .sum::(); + + // TODO(das): track connected balance as a metric + + let next_cgc = self + .beacon_chain + .spec + .custody_group_by_balance(known_validators_balance); + + // TODO(das): check the backfilled CGC and potentially update the network globals state + + if next_cgc != prev_cgc { + // TODO(das): Should we consider the case where the clock is almost at the end of the epoch? + // If I/O is slow we may update the in-memory map for an epoch that's already + // progressing. + let next_epoch = clock_epoch + 1; + info!(prev_cgc, next_cgc, %next_epoch, "Updating internal custody count"); + // TODO(das): Add CGC metrics for updates, current internal value and announced value + + // Add a new entry to the network globals + if let Err(e) = self.network_globals.add_cgc_update( + next_epoch.start_slot(T::EthSpec::slots_per_epoch()), + next_cgc, + ) { + error!("List of CGC updates full: {e:?}"); + return; + } + let cgc_updates = self.network_globals.dump_cgc_updates(); + + // Persist the entry to the store + if let Err(e) = self.beacon_chain.store.put_cgc_updates(cgc_updates) { + // Do not update the memory value unless it's persisted to disk + error!(error = ?e, "Unable to persist CGC updates to disk"); + } + + // Update the gossip subscriptions + let next_sampling_subnets = self + .network_globals + .sampling_subnets_for_cgc(next_cgc) + .iter() + .copied() + .collect::>(); + let prev_sampling_subnets = self + .network_globals + .sampling_subnets_for_cgc(prev_cgc) + .iter() + .copied() + .collect::>(); + if next_cgc > prev_cgc { + for subnet in next_sampling_subnets.difference(&prev_sampling_subnets) { + self.subscribe(Subnet::DataColumn(*subnet).into()); + } + } + if next_cgc < prev_cgc { + for subnet in prev_sampling_subnets.difference(&next_sampling_subnets) { + self.unsubscribe(Subnet::DataColumn(*subnet).into()); + } + } + 
} + + // Schedule an advertise CGC update for later + // TODO(das): use min_epochs_for_data_columns + let last_pruned_epoch = + clock_epoch - Epoch::new(self.beacon_chain.spec.min_epochs_for_blob_sidecars_requests); + let cgc_to_announce = self + .network_globals + .custody_group_count(last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch())); + // TODO(das): Compare with current announced and update ENR + } + fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) { match msg { - SubnetServiceMessage::Subscribe(subnet) => { - for fork_digest in self.required_gossip_fork_digests() { - let topic = - GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - self.libp2p.subscribe(topic); - } - } - SubnetServiceMessage::Unsubscribe(subnet) => { - for fork_digest in self.required_gossip_fork_digests() { - let topic = - GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest); - self.libp2p.unsubscribe(topic); - } - } + SubnetServiceMessage::Subscribe(subnet) => self.subscribe(subnet.into()), + SubnetServiceMessage::Unsubscribe(subnet) => self.unsubscribe(subnet.into()), SubnetServiceMessage::EnrAdd(subnet) => { self.libp2p.update_enr_subnet(subnet, true); } @@ -801,6 +919,20 @@ impl NetworkService { } } + fn subscribe(&mut self, kind: GossipKind) { + for fork_digest in self.required_gossip_fork_digests() { + let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest); + self.libp2p.subscribe(topic); + } + } + + fn unsubscribe(&mut self, kind: GossipKind) { + for fork_digest in self.required_gossip_fork_digests() { + let topic = GossipTopic::new(kind.clone(), GossipEncoding::default(), fork_digest); + self.libp2p.unsubscribe(topic); + } + } + fn update_next_fork(&mut self) { let new_enr_fork_id = self.beacon_chain.enr_fork_id(); let new_fork_digest = new_enr_fork_id.fork_digest; @@ -901,6 +1033,16 @@ impl Drop for NetworkService { ), Ok(_) => info!("Saved DHT state"), } + // Persist CGC updates + match self 
+ .beacon_chain + .store + .put_cgc_updates(self.network_globals.dump_cgc_updates()) + { + Ok(_) => info!("Saved CGC updates"), + Err(e) => error!(error = ?e, "Failed to persist CGC updates"), + } + info!("Network service shutdown"); } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6a30d8a428..2f737319a7 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -7,10 +7,11 @@ use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator}; use crate::memory_store::MemoryStore; use crate::metadata::{ - AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion, - ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, - COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, - PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, + AnchorInfo, BlobInfo, CGCUpdatesStore, CompactionTimestamp, CustodyInfo, DataColumnInfo, + PruningCheckpoint, SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_INFO_KEY, + ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, CGC_UPDATES_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, + CURRENT_SCHEMA_VERSION, CUSTODY_INFO_KEY, DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, + SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, }; use crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ @@ -374,6 +375,14 @@ impl HotColdDB, BeaconNodeBackend> { if let Some(disk_config) = db.load_config()? { let split = db.get_split_info(); let anchor = db.get_anchor_info(); + // TODO(das): We need to check that the persisted columns already in the DB are + // compatible with a new PeerID if applicable. We don't want to compare the full PeerId + // of the existing DB and the runtime PeerId. 
Instead we want to assert that the + // **sorted** set of columns of the persisted post-peerdas blocks with data is the same + // as the runtime sorted set. With validator custody blocks may have more or less + // columns. We need to compare the largest set of the non-pruned columns. In a single + // runtime the PeerID does not change so all stored blocks are consistent with each + // other. db.config .check_compatibility(&disk_config, &split, &anchor)?; @@ -2451,6 +2460,30 @@ impl, Cold: ItemStore> HotColdDB data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY) } + /// Load custody info from disk. + pub fn get_custody_info(&self) -> Result, Error> { + self.hot_db.get(&CUSTODY_INFO_KEY) + } + + /// Store the given `custody_info` to disk. + pub fn put_custody_info_in_batch(&self, custody_info: &CustodyInfo) -> Result<(), Error> { + let kv_store_op = custody_info.as_kv_store_op(CUSTODY_INFO_KEY); + self.hot_db.do_atomically(vec![kv_store_op]) + } + + /// Load `cgc_updates` from disk. + pub fn get_cgc_updates(&self) -> Result, Error> { + self.hot_db + .get::(&CGC_UPDATES_KEY) + .map(|r| r.map(|r| r.value)) + } + + /// Store the given `cgc_updates` to disk. + pub fn put_cgc_updates(&self, cgc_updates: CGCUpdates) -> Result<(), Error> { + let kv_store_op = CGCUpdatesStore { value: cgc_updates }.as_kv_store_op(CGC_UPDATES_KEY); + self.hot_db.do_atomically(vec![kv_store_op]) + } + /// Return the slot-window describing the available historic states. /// /// Returns `(lower_limit, upper_limit)`. diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 2b5be03489..6585abe6ab 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -305,6 +305,8 @@ pub enum DBColumn { ForkChoice, #[strum(serialize = "pkc")] PubkeyCache, + #[strum(serialize = "lix")] + LocalIndices, /// For the legacy table mapping restore point numbers to state roots. /// /// DEPRECATED. Can be removed once schema v22 is buried by a hard fork. 
@@ -396,6 +398,7 @@ impl DBColumn { | Self::Eth1Cache | Self::ForkChoice | Self::PubkeyCache + | Self::LocalIndices | Self::BeaconRestorePoint | Self::DhtEnrs | Self::OptimisticTransitionBlock => 32, diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 1d70e105b9..1f73983a5b 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem}; use serde::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use types::{Checkpoint, Hash256, Slot}; +use types::{typenum::U4096, CGCUpdates, Checkpoint, Hash256, Slot, VariableList}; pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22); @@ -17,6 +17,8 @@ pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4); pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5); pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6); pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7); +pub const CUSTODY_INFO_KEY: Hash256 = Hash256::repeat_byte(8); +pub const CGC_UPDATES_KEY: Hash256 = Hash256::repeat_byte(9); /// State upper limit value used to indicate that a node is not storing historic states. pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX); @@ -246,3 +248,46 @@ impl StoreItem for DataColumnInfo { Ok(Self::from_ssz_bytes(bytes)?) } } + +/// Database parameters relevant to data column sync. +#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)] +pub struct CustodyInfo { + /// Given a PeerID, compute the set of custody columns with the maximum CGC value, then sort + /// them numerically. 
+ /// 4096 is an arbitrary max limit that will never be reached + pub ordered_custody_columns: VariableList, +} + +impl StoreItem for CustodyInfo { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +} + +/// On-disk wrapper around the history of custody group count (CGC) updates. +#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)] +pub struct CGCUpdatesStore { + pub value: CGCUpdates, +} + +impl StoreItem for CGCUpdatesStore { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +} diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 1650001db6..e67f1ab3c3 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -203,6 +203,8 @@ pub struct ChainSpec { pub data_column_sidecar_subnet_count: u64, pub samples_per_slot: u64, pub custody_requirement: u64, + pub validator_custody_requirement: u64, + pub balance_per_additional_custody_group: u64, /* * Networking @@ -263,6 +265,12 @@ pub struct ChainSpec { } impl ChainSpec { + /// Panics if `self` contains some illegal value + pub fn assert_valid(&self) { + // Panics (via the expect in custody_group_by_balance) if + // balance_per_additional_custody_group is zero + self.custody_group_by_balance(1); + } + /// Construct a `ChainSpec` from a standard config.
pub fn from_config(config: &Config) -> Option { let spec = E::default_spec(); @@ -704,6 +712,7 @@ impl ChainSpec { Ok(std::cmp::max(custody_column_count, self.samples_per_slot)) } + // TODO(das): delete in favor of custody_group_by_balance pub fn custody_group_count(&self, is_supernode: bool) -> u64 { if is_supernode { self.number_of_custody_groups @@ -712,6 +721,22 @@ impl ChainSpec { } } + pub fn custody_group_by_balance(&self, balance_gwei: u64) -> u64 { + if balance_gwei == 0 { + self.custody_requirement + } else { + std::cmp::min( + std::cmp::max( + balance_gwei + .safe_div(self.balance_per_additional_custody_group) + .expect("balance_per_additional_custody_group must be greater than 0"), + self.validator_custody_requirement, + ), + self.number_of_custody_groups, + ) + } + } + pub fn all_data_column_sidecar_subnets(&self) -> impl Iterator { (0..self.data_column_sidecar_subnet_count).map(DataColumnSubnetId::new) } @@ -915,6 +940,8 @@ impl ChainSpec { fulu_fork_version: [0x06, 0x00, 0x00, 0x00], fulu_fork_epoch: None, custody_requirement: 4, + validator_custody_requirement: 8, + balance_per_additional_custody_group: 32000000000, number_of_custody_groups: 128, data_column_sidecar_subnet_count: 128, number_of_columns: 128, @@ -1245,6 +1272,8 @@ impl ChainSpec { fulu_fork_version: [0x06, 0x00, 0x00, 0x64], fulu_fork_epoch: None, custody_requirement: 4, + validator_custody_requirement: 8, + balance_per_additional_custody_group: 32000000000, number_of_custody_groups: 128, data_column_sidecar_subnet_count: 128, number_of_columns: 128, diff --git a/consensus/types/src/custody.rs b/consensus/types/src/custody.rs new file mode 100644 index 0000000000..4fc0642615 --- /dev/null +++ b/consensus/types/src/custody.rs @@ -0,0 +1,36 @@ +use crate::*; +use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; + +#[derive(Eq, PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode)] +pub struct CGCUpdates { + initial_value: u64, + updates: 
VariableList<(Slot, u64), ssz_types::typenum::U131072>, + // TODO(das): Track backfilled CGC +} + +impl CGCUpdates { + pub fn new(initial_value: u64) -> Self { + Self { + initial_value, + updates: VariableList::empty(), + } + } + + pub fn at_slot(&self, slot: Slot) -> u64 { + // TODO: Test and fix logic + for (update_slot, cgc) in &self.updates { + if slot > *update_slot { + return *cgc; + } + } + + self.initial_value + } + + pub fn add_latest_update(&mut self, update: (Slot, u64)) -> Result<(), String> { + self.updates + .push(update) + .map_err(|e| format!("Updates list full: {e:?}")) + } +} diff --git a/consensus/types/src/data_column_custody_group.rs b/consensus/types/src/data_column_custody_group.rs index 9e9505da9f..931dfe096a 100644 --- a/consensus/types/src/data_column_custody_group.rs +++ b/consensus/types/src/data_column_custody_group.rs @@ -1,9 +1,8 @@ use crate::{ChainSpec, ColumnIndex, DataColumnSubnetId}; use alloy_primitives::U256; use itertools::Itertools; -use maplit::hashset; use safe_arith::{ArithError, SafeArith}; -use std::collections::HashSet; +use std::collections::{BTreeSet, HashSet}; pub type CustodyIndex = u64; @@ -24,14 +23,14 @@ pub fn get_custody_groups( raw_node_id: [u8; 32], custody_group_count: u64, spec: &ChainSpec, -) -> Result, DataColumnCustodyGroupError> { +) -> Result, DataColumnCustodyGroupError> { if custody_group_count > spec.number_of_custody_groups { return Err(DataColumnCustodyGroupError::InvalidCustodyGroupCount( custody_group_count, )); } - let mut custody_groups: HashSet = hashset![]; + let mut custody_groups: BTreeSet = <_>::default(); let mut current_id = U256::from_be_slice(&raw_node_id); while custody_groups.len() < custody_group_count as usize { let mut node_id_bytes = [0u8; 32]; @@ -49,7 +48,7 @@ pub fn get_custody_groups( current_id = current_id.wrapping_add(U256::from(1u64)); } - Ok(custody_groups) + Ok(custody_groups.into_iter().collect::>()) } /// Returns the columns that are associated with a given custody 
group. diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 73a50b4ef3..616bac2b9b 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -30,6 +30,7 @@ pub mod checkpoint; pub mod consolidation_request; pub mod consts; pub mod contribution_and_proof; +pub mod custody; pub mod deposit; pub mod deposit_data; pub mod deposit_message; @@ -148,6 +149,7 @@ pub use crate::config_and_preset::{ }; pub use crate::consolidation_request::ConsolidationRequest; pub use crate::contribution_and_proof::ContributionAndProof; +pub use crate::custody::CGCUpdates; pub use crate::data_column_sidecar::{ ColumnIndex, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, }; diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 2b7387e076..ee3f943d0f 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -641,6 +641,9 @@ fn run( .eth2_network_config(eth2_network_config)? .build()?; + // Panic early if the spec contains illegal values + environment.eth2_config.spec.assert_valid(); + // Log panics properly. { std::panic::set_hook(Box::new(move |info| {