From 661bfebdf0708b81d11409d26a13bd29e3898db3 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 4 Apr 2025 14:16:28 -0300 Subject: [PATCH] More progress --- beacon_node/beacon_chain/src/beacon_chain.rs | 1 - .../beacon_chain/src/validator_monitor.rs | 24 +++----- beacon_node/http_api/src/lib.rs | 61 ++++++++----------- .../lighthouse_network/src/discovery/enr.rs | 18 +++--- .../lighthouse_network/src/discovery/mod.rs | 20 ++++++ .../src/discovery/subnet_predicate.rs | 2 +- .../lighthouse_network/src/service/mod.rs | 19 ++++-- beacon_node/network/src/metrics.rs | 34 +++++++++++ .../src/network_beacon_processor/tests.rs | 19 +----- beacon_node/network/src/service.rs | 51 ++++++++++------ 10 files changed, 148 insertions(+), 101 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 91df0ccd23..bcb64691a6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7165,7 +7165,6 @@ impl BeaconChain { /// Register a local validator that has connected to the Beacon REST API pub fn register_local_validator(&self, validator_index: u64) { - // TODO(das): persist `last_seen_local_validators` to disk on shutdown and once in a while self.validator_monitor .write() .auto_register_local_validator(validator_index); diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index 137e2d5e71..a3d736c735 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -430,11 +430,14 @@ impl ValidatorMonitor { let mut s = Self { validators: <_>::default(), indices: <_>::default(), + // Don't persist the last seen time of indices, and reset them to just seen on start-up. + // Otherwise if the node crashes for more than `ttl` it will forget that it had indices. + // We will only prune an index if it is not seen during runtime for longer than `ttl`. last_seen_local_validators: HashMap::from_iter( persisted_local_indices .indices - .iter() - .map(|e| (e.index, Duration::from_secs(e.last_seen_timestamp_sec))), + .into_iter() + .map(|index| (index, timestamp_now())), ), auto_register, individual_tracking_threshold, @@ -465,7 +468,6 @@ impl ValidatorMonitor { /// Prunes all validators not seen for longer than `ttl` (time to live) pub fn prune_registered_local_validators(&mut self, ttl: Duration) { let now = timestamp_now(); - // Prune expired keys self.last_seen_local_validators .retain(|_, last_seen| now - *last_seen < ttl); } @@ -479,12 +481,8 @@ impl ValidatorMonitor { pub fn to_persisted_local_validators(&self) -> Result { Ok(PersistedLocalIndices { indices: VariableList::new( - self.last_seen_local_validators - .iter() - .map(|(index, last_seen)| PersistedLocalIndex { - index: *index, - last_seen_timestamp_sec: last_seen.as_secs(), - }) + self.get_registered_local_validators() + .copied() .collect::>(), )?, }) @@ -2452,13 +2450,7 @@ fn min_opt(a: Option, b: Option) -> Option { // Using 524288 as a really high limit that's never meant to be reached #[derive(Encode, Decode, Default)] pub struct PersistedLocalIndices { - indices: VariableList, -} - -#[derive(Encode, Decode)] -pub struct PersistedLocalIndex { - index: u64, - last_seen_timestamp_sec: u64, + indices: VariableList, } impl StoreItem for PersistedLocalIndices { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index d159b1794c..954d933872 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -87,11 +87,11 @@ use tokio_stream::{ use tracing::{debug, error, info, warn}; use types::{ fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId, - AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, - Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, - ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + AttesterSlashing, BeaconStateError, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, + EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, + RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, + SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -2907,11 +2907,9 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and(chain_filter.clone()) .then( |task_spawner: TaskSpawner, - network_globals: Arc>, - chain: Arc>| { + network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let enr = network_globals.local_enr(); let p2p_addresses = enr.multiaddr_p2p_tcp(); @@ -2921,10 +2919,7 @@ pub fn serve( enr, p2p_addresses, discovery_addresses, - metadata: from_meta_data::( - &network_globals.local_metadata(), - &chain.spec, - ), + metadata: from_meta_data::(&network_globals.local_metadata()), })) }) }, @@ -4815,32 +4810,26 @@ pub fn serve( Ok(http_server) } -fn from_meta_data(meta_data: &MetaData, spec: &ChainSpec) -> api_types::MetaData { +fn from_meta_data(meta_data: &MetaData) -> api_types::MetaData { let format_hex = |bytes: &[u8]| format!("0x{}", hex::encode(bytes)); - let seq_number = *meta_data.seq_number(); - let attnets = format_hex(&meta_data.attnets().clone().into_bytes()); - let syncnets = format_hex( - &meta_data - .syncnets() - .cloned() - .unwrap_or_default() - .into_bytes(), - ); - - if spec.is_peer_das_scheduled() { - api_types::MetaData::V3(api_types::MetaDataV3 { - seq_number, - attnets, - syncnets, - custody_group_count: meta_data.custody_group_count().cloned().unwrap_or_default(), - }) - } else { - api_types::MetaData::V2(api_types::MetaDataV2 { - seq_number, - attnets, - syncnets, - }) + match meta_data { + MetaData::V1(meta_data) => api_types::MetaData::V2(api_types::MetaDataV2 { + seq_number: meta_data.seq_number, + attnets: format_hex(meta_data.attnets.as_slice()), + syncnets: format_hex(<_>::default()), + }), + MetaData::V2(meta_data) => api_types::MetaData::V2(api_types::MetaDataV2 { + seq_number: meta_data.seq_number, + attnets: format_hex(meta_data.attnets.as_slice()), + syncnets: format_hex(meta_data.syncnets.as_slice()), + }), + MetaData::V3(meta_data) => api_types::MetaData::V3(api_types::MetaDataV3 { + seq_number: meta_data.seq_number, + attnets: format_hex(meta_data.attnets.as_slice()), + syncnets: format_hex(meta_data.syncnets.as_slice()), + custody_group_count: meta_data.custody_group_count, + }), } } diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index e70c8047e0..927c44458d 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -40,7 +40,7 @@ pub trait Eth2Enr { ) -> Result, &'static str>; /// The peerdas custody group count associated with the ENR. - fn custody_group_count(&self, spec: &ChainSpec) -> Result; + fn custody_group_count(&self, spec: &ChainSpec) -> Result; fn eth2(&self) -> Result; } @@ -68,17 +68,19 @@ impl Eth2Enr for Enr { .map_err(|_| "Could not decode the ENR syncnets bitfield") } - fn custody_group_count(&self, spec: &ChainSpec) -> Result { + fn custody_group_count(&self, spec: &ChainSpec) -> Result { let cgc = self .get_decodable::(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) .ok_or("ENR custody group count non-existent")? .map_err(|_| "Could not decode the ENR custody group count")?; - if (spec.custody_requirement..=spec.number_of_custody_groups).contains(&cgc) { - Ok(cgc) - } else { - Err("Invalid custody group count in ENR") + if cgc < spec.custody_requirement { + return Err("ENR CGC < custody_requirement"); } + if cgc > spec.number_of_custody_groups { + return Err("ENR CGC > number_of_custody_groups"); + } + Ok(cgc) } fn eth2(&self) -> Result { @@ -363,7 +365,7 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_group_count::(&spec).unwrap(), + enr.custody_group_count(&spec).unwrap(), spec.custody_requirement, ); } @@ -378,7 +380,7 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_group_count::(&spec).unwrap(), + enr.custody_group_count(&spec).unwrap(), spec.number_of_custody_groups, ); } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 816d6dead2..1a6d0e2561 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -594,6 +594,26 @@ impl Discovery { enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); } + /// Updates the `cgc` field of our local ENR. + pub fn update_cgc_enr(&mut self, cgc: u64) -> Result { + if let Ok(current_cgc) = self.local_enr().custody_group_count(&self.spec) { + if current_cgc == cgc { + return Ok(false); + } + } + + self.discv5 + .enr_insert(ETH2_ENR_KEY, &cgc) + .map_err(|e| format!("{:?}", e))?; + + // replace the global version with discovery version + self.network_globals.set_enr(self.discv5.local_enr()); + + // persist modified enr to disk + enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); + Ok(true) + } + // Bans a peer and it's associated seen IP addresses. pub fn ban_peer(&mut self, peer_id: &PeerId, ip_addresses: Vec) { // first try and convert the peer_id to a node_id. diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index 735ef5b0f2..f37bde3781 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -34,7 +34,7 @@ where .as_ref() .is_ok_and(|b| b.get(*s.deref() as usize).unwrap_or(false)), Subnet::DataColumn(s) => { - if let Ok(custody_group_count) = enr.custody_group_count::(&spec) { + if let Ok(custody_group_count) = enr.custody_group_count(&spec) { compute_subnets_for_node(enr.node_id().raw(), custody_group_count, &spec) .is_ok_and(|subnets| subnets.contains(s)) } else { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 9beb6c7d67..c3a86403f6 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -173,7 +173,7 @@ impl Network { pub async fn new( executor: task_executor::TaskExecutor, mut ctx: ServiceContext<'_>, - cgc_updates: Option, + initial_cgc_updates: Option, ) -> Result<(Self, Arc>), String> { let config = ctx.config.clone(); trace!("Libp2p Service starting"); @@ -197,8 +197,8 @@ impl Network { &ctx.chain_spec, )?; - // TODO: Load from disk, and check consistency with DB somewhere - let cgc_updates = cgc_updates.unwrap_or_else(|| { + // Load initial CGC updates from persisted source (DB) or default to minimum CGC + let cgc_updates = initial_cgc_updates.unwrap_or_else(|| { CGCUpdates::new( ctx.chain_spec .custody_group_count(config.subscribe_all_data_column_subnets), @@ -1259,12 +1259,23 @@ impl Network { crit!(error = e, "Could not update ENR bitfield"); } - // TODO: Can we deprecate this for a single source of truth? + // TODO(das): Can we deprecate this for a single source of truth? let metadata = self.network_globals.local_metadata(); self.eth2_rpc_mut() .update_seq_number(*metadata.seq_number()); } + /// Updates the CGC value in our local ENR + #[instrument(parent = None, + level = "trace", + fields(service = "libp2p"), + name = "libp2p", + skip_all + )] + pub fn update_enr_cgc(&mut self, cgc: u64) -> Result { + self.discovery_mut().update_cgc_enr(cgc) + } + /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we /// would like to retain the peers for. #[instrument(parent = None, diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index ee104930db..1e9763e97d 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -628,6 +628,40 @@ pub static SAMPLING_REQUEST_RESULT: LazyLock> = LazyLock:: ) }); +/* + * PeerDAS custody metrics + */ +pub static LOCAL_INDICES_COUNT: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "beacon_custody_local_indices_count", + "Current count of local indices that recently connected to the beacon API", + ) +}); +pub static LOCAL_INDICES_ETH_BALANCE: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "beacon_custody_local_indices_eth_balance", + "Current sum of local indices balance that recently connected to the beacon API", + ) +}); +pub static CGC_INTERNAL: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "beacon_custody_cgc_internal", + "Current internal Custody Group Count CGC", + ) +}); +pub static CGC_ANNOUNCED: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "beacon_custody_cgc_announced", + "Current announced Custody Group Count CGC", + ) +}); +pub static CGC_UPDATES: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "beacon_custody_cgc_updates", + "Total count of Custody Group Count CGC updates", + ) +}); + pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) { inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]); } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 69ba5c1dbd..510bc6916c 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -195,28 +195,13 @@ impl TestRig { let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); - // Default metadata - let meta_data = if spec.is_peer_das_scheduled() { - MetaData::V3(MetaDataV3 { - seq_number: SEQ_NUMBER, - attnets: EnrAttestationBitfield::::default(), - syncnets: EnrSyncCommitteeBitfield::::default(), - custody_group_count: spec.custody_requirement, - }) - } else { - MetaData::V2(MetaDataV2 { - seq_number: SEQ_NUMBER, - attnets: EnrAttestationBitfield::::default(), - syncnets: EnrSyncCommitteeBitfield::::default(), - }) - }; - let enr_key = CombinedKey::generate_secp256k1(); let enr = enr::Enr::builder().build(&enr_key).unwrap(); + let cgc_updates = CGCUpdates::new(spec.custody_requirement); let network_config = Arc::new(NetworkConfig::default()); let network_globals = Arc::new(NetworkGlobals::new( enr, - meta_data, + cgc_updates, vec![], false, network_config, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 8e65b3df25..8873b8819f 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -32,10 +32,9 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use tracing::{debug, error, info, info_span, trace, warn, Instrument}; -use types::CGCUpdates; use types::{ - ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, - Unsigned, ValidatorSubscription, + ChainSpec, Epoch, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, + SyncSubnetId, Unsigned, ValidatorSubscription, }; mod tests; @@ -277,22 +276,15 @@ impl NetworkService { // TODO(das): check that list of columns is compatible } - let cgc_udpates = if let Some(disk_cgc_updates) = store + // If there are no stored disk_cgc_updates `Network::new` will default to empty ones with + // the minimum CGC. + let initial_cgc_updates = store .get_cgc_updates() - .map_err(|e| format!("Unable to read cgc updates from the DB: {e:?}"))? - { - disk_cgc_updates - } else { - // Just return the new one - let initial_cgc = beacon_chain - .spec - .custody_group_count(config.subscribe_all_data_column_subnets); - CGCUpdates::new(initial_cgc) - }; + .map_err(|e| format!("Unable to read cgc updates from the DB: {e:?}"))?; // launch libp2p service let (mut libp2p, network_globals) = - Network::new(executor.clone(), service_context, Some(cgc_udpates)).await?; + Network::new(executor.clone(), service_context, initial_cgc_updates).await?; // Repopulate the DHT with stored ENR's if discovery is not disabled. if !config.disable_discovery { @@ -819,6 +811,7 @@ impl NetworkService { LOCAL_VALIDATOR_REGISTRY_TTL_SEC, )); + let mut local_indices_count = 0; let known_validators_balance = self .beacon_chain .validator_monitor @@ -826,6 +819,7 @@ impl NetworkService { .get_registered_local_validators() // TODO(das): should ignore non active validators? .map(|validator_index| { + local_indices_count += 1; cached_head .snapshot .beacon_state @@ -834,7 +828,9 @@ impl NetworkService { }) .sum::(); - // TODO(das): track connected balance as a metric + // TODO(das): Should persist the local indices here to dump to DB once in a while in case of + // improper shutdown? Not sure if we do the same for other persisted data. It sounds + // sensible but at the same time it will waste I/O. let next_cgc = self .beacon_chain @@ -849,7 +845,7 @@ impl NetworkService { // progressing. let next_epoch = clock_epoch + 1; info!(prev_cgc, next_cgc, %next_epoch, "Updating internal custody count"); - // TODO(das): Add CGC metrics for updates, current internal value and announced value + metrics::inc_counter(&metrics::CGC_UPDATES); // Add a new entry to the network globals if let Err(e) = self.network_globals.add_cgc_update( @@ -899,7 +895,26 @@ impl NetworkService { let cgc_to_announce = self .network_globals .custody_group_count(last_pruned_epoch.start_slot(T::EthSpec::slots_per_epoch())); - // TODO(das): Compare with current announced and update ENR + + // update_enr_cgc updates the NetworkGlobals ENR + match self.libp2p.update_enr_cgc(cgc_to_announce) { + Ok(updated) => { + if updated { + info!(cgc = cgc_to_announce, "Updated ENR custody group count"); + } + } + Err(e) => { + crit!(error = ?e, "Error updating local ENR custody group count"); + } + } + + metrics::set_gauge(&metrics::CGC_INTERNAL, next_cgc as i64); + metrics::set_gauge(&metrics::CGC_ANNOUNCED, cgc_to_announce as i64); + metrics::set_gauge(&metrics::LOCAL_INDICES_COUNT, local_indices_count as i64); + metrics::set_gauge( + &metrics::LOCAL_INDICES_ETH_BALANCE, + known_validators_balance as i64 / 1_000_000_000, + ); } fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {