From f6fa2380c73d691d15ce56766e3b7c1a20588699 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Fri, 4 Apr 2025 16:27:58 -0300 Subject: [PATCH] Pre-compute MetaData when setting ENR --- beacon_node/http_api/src/test_utils.rs | 19 ++-- .../lighthouse_network/src/discovery/enr.rs | 19 ++-- .../lighthouse_network/src/discovery/mod.rs | 101 ++++++++++-------- .../src/discovery/subnet_predicate.rs | 3 + .../lighthouse_network/src/service/mod.rs | 4 +- .../lighthouse_network/src/types/globals.rs | 99 ++++++++++------- 6 files changed, 143 insertions(+), 102 deletions(-) diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 7208d19e8a..f65adcf516 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -141,14 +141,17 @@ pub async fn create_api_server_with_config( let enr = Enr::builder().build(&enr_key).unwrap(); let network_config = Arc::new(NetworkConfig::default()); let cgc_updates = CGCUpdates::new(chain.spec.custody_requirement); - let network_globals = Arc::new(NetworkGlobals::new( - enr.clone(), - cgc_updates, - vec![], - false, - network_config, - chain.spec.clone(), - )); + let network_globals = Arc::new( + NetworkGlobals::new( + enr.clone(), + cgc_updates, + vec![], + false, + network_config, + chain.spec.clone(), + ) + .unwrap(), + ); // Only a peer manager can add peers, so we create a dummy manager. let config = lighthouse_network::peer_manager::config::Config::default(); diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index 927c44458d..9938353b66 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -40,7 +40,7 @@ pub trait Eth2Enr { ) -> Result, &'static str>; /// The peerdas custody group count associated with the ENR. - fn custody_group_count(&self, spec: &ChainSpec) -> Result; + fn custody_group_count(&self, spec: &ChainSpec) -> Result, &'static str>; fn eth2(&self) -> Result; } @@ -68,11 +68,14 @@ impl Eth2Enr for Enr { .map_err(|_| "Could not decode the ENR syncnets bitfield") } - fn custody_group_count(&self, spec: &ChainSpec) -> Result { - let cgc = self + fn custody_group_count(&self, spec: &ChainSpec) -> Result, &'static str> { + let Some(cgc) = self .get_decodable::(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) - .ok_or("ENR custody group count non-existent")? - .map_err(|_| "Could not decode the ENR custody group count")?; + .transpose() + .map_err(|_| "Could not decode the ENR custody group count")? + else { + return Ok(None); + }; if cgc < spec.custody_requirement { return Err("ENR CGC < custody_requirement"); @@ -80,7 +83,7 @@ impl Eth2Enr for Enr { if cgc > spec.number_of_custody_groups { return Err("ENR CGC > number_of_custody_groups"); } - Ok(cgc) + Ok(Some(cgc)) } fn eth2(&self) -> Result { @@ -365,7 +368,7 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_group_count(&spec).unwrap(), + enr.custody_group_count(&spec).unwrap().unwrap(), spec.custody_requirement, ); } @@ -380,7 +383,7 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_group_count(&spec).unwrap(), + enr.custody_group_count(&spec).unwrap().unwrap(), spec.number_of_custody_groups, ); } diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 1a6d0e2561..976d3fd51b 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -418,11 +418,8 @@ impl Discovery { self.discv5 .enr_insert(enr_field, &port) .map_err(|e| format!("{:?}", e))?; + self.on_enr_updated(); - // replace the global version - self.network_globals.set_enr(self.discv5.local_enr()); - // persist modified enr to disk - enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); Ok(true) } @@ -454,11 +451,8 @@ impl Discovery { self.discv5 .enr_insert(enr_field, &port) .map_err(|e| format!("{:?}", e))?; + self.on_enr_updated(); - // replace the global version - self.network_globals.set_enr(self.discv5.local_enr()); - // persist modified enr to disk - enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); Ok(true) } @@ -469,10 +463,8 @@ impl Discovery { pub fn update_enr_udp_socket(&mut self, socket_addr: SocketAddr) -> Result<(), String> { const IS_TCP: bool = false; if self.discv5.update_local_enr_socket(socket_addr, IS_TCP) { - // persist modified enr to disk - enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); + self.on_enr_updated(); } - self.network_globals.set_enr(self.discv5.local_enr()); Ok(()) } @@ -480,7 +472,7 @@ impl Discovery { pub fn update_enr_bitfield(&mut self, subnet: Subnet, value: bool) -> Result<(), String> { let local_enr = self.discv5.local_enr(); - match subnet { + let updated_enr = match subnet { Subnet::Attestation(id) => { let id = *id as usize; let mut current_bitfield = local_enr.attestation_bitfield::()?; @@ -507,12 +499,10 @@ impl Discovery { })?; // insert the bitfield into the ENR record - self.discv5 - .enr_insert::( - ATTESTATION_BITFIELD_ENR_KEY, - ¤t_bitfield.as_ssz_bytes().into(), - ) - .map_err(|e| format!("{:?}", e))?; + self.enr_insert_bytes( + ATTESTATION_BITFIELD_ENR_KEY, + ¤t_bitfield.as_ssz_bytes().into(), + )? } Subnet::SyncCommittee(id) => { let id = *id as usize; @@ -541,22 +531,19 @@ impl Discovery { })?; // insert the bitfield into the ENR record - self.discv5 - .enr_insert::( - SYNC_COMMITTEE_BITFIELD_ENR_KEY, - ¤t_bitfield.as_ssz_bytes().into(), - ) - .map_err(|e| format!("{:?}", e))?; + self.enr_insert_bytes( + SYNC_COMMITTEE_BITFIELD_ENR_KEY, + ¤t_bitfield.as_ssz_bytes().into(), + )? } // Data column subnets are computed from node ID. No subnet bitfield in the ENR. Subnet::DataColumn(_) => return Ok(()), + }; + + if updated_enr { + self.on_enr_updated(); } - // replace the global version - self.network_globals.set_enr(self.discv5.local_enr()); - - // persist modified enr to disk - enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); Ok(()) } @@ -587,16 +574,12 @@ impl Discovery { ) }); - // replace the global version with discovery version - self.network_globals.set_enr(self.discv5.local_enr()); - - // persist modified enr to disk - enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); + self.on_enr_updated(); } /// Updates the `cgc` field of our local ENR. pub fn update_cgc_enr(&mut self, cgc: u64) -> Result { - if let Ok(current_cgc) = self.local_enr().custody_group_count(&self.spec) { + if let Ok(Some(current_cgc)) = self.local_enr().custody_group_count(&self.spec) { if current_cgc == cgc { return Ok(false); } @@ -605,12 +588,8 @@ impl Discovery { self.discv5 .enr_insert(ETH2_ENR_KEY, &cgc) .map_err(|e| format!("{:?}", e))?; + self.on_enr_updated(); - // replace the global version with discovery version - self.network_globals.set_enr(self.discv5.local_enr()); - - // persist modified enr to disk - enr::save_enr_to_disk(Path::new(&self.enr_dir), &self.local_enr()); Ok(true) } @@ -975,6 +954,32 @@ impl Discovery { } None } + + /// Inserts Bytes into our local ENR. Returns true if we inserted a new value. + fn enr_insert_bytes(&self, key: &str, value: &Bytes) -> Result { + Ok( + match self + .discv5 + .enr_insert(key, value) + .map_err(|e| format!("{e:?}"))? + { + // If previous value is distinct we inserted + Some(prev_value) => prev_value != value.to_vec(), + // No previous value, we inserted + None => true, + }, + ) + } + + fn on_enr_updated(&mut self) { + let enr = self.discv5.local_enr(); + // persist modified enr to disk + enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr); + // replace the global version + if let Err(e) = self.network_globals.set_enr(enr) { + crit!(error = ?e, "Updated local ENR field to an invalid value"); + } + } } /* NetworkBehaviour Implementation */ @@ -1029,6 +1034,8 @@ impl NetworkBehaviour for Discovery { return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers })); } + let mut update_enr = false; + // Process the server event stream match self.event_stream { EventStream::Awaiting(ref mut fut) => { @@ -1076,11 +1083,8 @@ impl NetworkBehaviour for Discovery { { // Update the TCP port in the ENR self.discv5.update_local_enr_socket(socket_addr, true); + update_enr = true; } - let enr = self.discv5.local_enr(); - enr::save_enr_to_disk(Path::new(&self.enr_dir), &enr); - // update network globals - self.network_globals.set_enr(enr); // A new UDP socket has been detected. // NOTE: We assume libp2p itself can keep track of IP changes and we do // not inform it about IP changes found via discovery. @@ -1090,6 +1094,12 @@ impl NetworkBehaviour for Discovery { } } } + + // Need to do this call after the `self.event_stream` match to release its mut ref. + if update_enr { + self.on_enr_updated(); + } + Poll::Pending } @@ -1235,7 +1245,8 @@ mod tests { false, config.clone(), spec.clone(), - ); + ) + .unwrap(); let keypair = keypair.into(); Discovery::new(keypair, &config, Arc::new(globals), &spec) .await diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index f37bde3781..fd7bf8307a 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -35,6 +35,9 @@ where .is_ok_and(|b| b.get(*s.deref() as usize).unwrap_or(false)), Subnet::DataColumn(s) => { if let Ok(custody_group_count) = enr.custody_group_count(&spec) { + // Default to custody_requirement if user does not specify any CGC + let custody_group_count = + custody_group_count.unwrap_or(spec.custody_requirement); compute_subnets_for_node(enr.node_id().raw(), custody_group_count, &spec) .is_ok_and(|subnets| subnets.contains(s)) } else { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index c3a86403f6..1e22afe7c3 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1,5 +1,6 @@ use self::gossip_cache::GossipCache; use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad}; +use crate::discovery::enr::Eth2Enr; use crate::discovery::{ subnet_predicate, DiscoveredPeers, Discovery, FIND_NODE_QUERY_CLOSEST_PEERS, }; @@ -18,7 +19,6 @@ use crate::types::{ GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, }; use crate::EnrExt; -use crate::Eth2Enr; use crate::{metrics, Enr, NetworkGlobals, PubsubMessage, TopicHash}; use api_types::{AppRequestId, PeerRequestId, RequestId, Response}; use futures::stream::StreamExt; @@ -213,7 +213,7 @@ impl Network { config.disable_peer_scoring, config.clone(), ctx.chain_spec.clone(), - ); + )?; let network_globals = Arc::new(globals); // Grab our local ENR FORK ID diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 8216587d91..a0a57ce8c6 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,10 +1,10 @@ //! A collection of variables that are accessible outside of the network thread itself. use super::TopicConfig; -use crate::discovery::enr::Eth2Enr; use crate::peer_manager::peerdb::PeerDB; -use crate::rpc::{MetaData, MetaDataV2, MetaDataV3}; +use crate::rpc::MetaData; use crate::types::{BackFillState, SyncState}; use crate::{Client, Enr, EnrExt, GossipTopic, Multiaddr, NetworkConfig, PeerId}; +use local_metadata::LocalMetadata; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; @@ -15,7 +15,7 @@ use types::{CGCUpdates, ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec, Slo pub struct NetworkGlobals { /// The current local ENR. - local_enr: RwLock, + local_enr: RwLock>, /// The local peer_id. pub peer_id: RwLock, /// Listening multiaddrs. @@ -47,7 +47,7 @@ impl NetworkGlobals { disable_peer_scoring: bool, config: Arc, spec: Arc, - ) -> Self { + ) -> Result { let node_id = enr.node_id().raw(); // The below `expect` calls will panic on start up if the chain spec config values used @@ -69,8 +69,8 @@ impl NetworkGlobals { all_sampling_columns.extend(columns); } - NetworkGlobals { - local_enr: RwLock::new(enr.clone()), + Ok(NetworkGlobals { + local_enr: RwLock::new(LocalMetadata::new(enr.clone(), &spec)?), peer_id: RwLock::new(enr.peer_id()), listen_multiaddrs: RwLock::new(Vec::new()), peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring)), @@ -82,18 +82,19 @@ impl NetworkGlobals { cgc_updates: RwLock::new(cgc_updates), config, spec, - } + }) } /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. /// TODO: This contains duplicate metadata. Test who is consuming this method pub fn local_enr(&self) -> Enr { - self.local_enr.read().clone() + self.local_enr.read().enr().clone() } - pub fn set_enr(&self, enr: Enr) { - *self.local_enr.write() = enr; + pub fn set_enr(&self, enr: Enr) -> Result<(), String> { + *self.local_enr.write() = LocalMetadata::new(enr, &self.spec)?; + Ok(()) } /// Returns the local libp2p PeerID. @@ -104,28 +105,7 @@ impl NetworkGlobals { // TODO: Must keep consistency between the persisted `local_enr` and the return of this // function. Otherwise peers may downscore us and the network will have issues. pub fn local_metadata(&self) -> MetaData { - let enr = self.local_enr(); - let attnets = enr - .attestation_bitfield::() - .unwrap_or(Default::default()); - let syncnets = enr - .sync_committee_bitfield::() - .unwrap_or(Default::default()); - - if self.spec.is_peer_das_scheduled() { - MetaData::V3(MetaDataV3 { - seq_number: enr.seq(), - attnets, - syncnets, - custody_group_count: self.public_custody_group_count(), - }) - } else { - MetaData::V2(MetaDataV2 { - seq_number: enr.seq(), - attnets, - syncnets, - }) - } + self.local_enr.read().metadata().clone() } /// Returns the list of `Multiaddr` that the underlying libp2p instance is listening on. @@ -154,12 +134,6 @@ impl NetworkGlobals { &self.all_sampling_subnets[..self.all_sampling_subnets.len().min(cgc as usize)] } - fn public_custody_group_count(&self) -> u64 { - // TODO(das): delay announcing the public custody count for the duration of the pruning - // period - self.cgc_updates.read().at_slot(Slot::new(u64::MAX)) - } - /// Returns the custody group count (CGC) pub fn custody_group_count(&self, slot: Slot) -> u64 { self.cgc_updates.read().at_slot(slot) @@ -276,7 +250,54 @@ impl NetworkGlobals { let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair); let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap(); let cgc_updates = CGCUpdates::new(initial_cgc); - NetworkGlobals::new(enr, cgc_updates, trusted_peers, false, config, spec) + NetworkGlobals::new(enr, cgc_updates, trusted_peers, false, config, spec).unwrap() + } +} + +mod local_metadata { + use crate::discovery::enr::Eth2Enr; + use crate::rpc::{MetaData, MetaDataV2, MetaDataV3}; + use crate::Enr; + use types::{ChainSpec, EthSpec}; + + /// Ensures that the cached local ENR and its parsed MetaData are updated atomically. + pub struct LocalMetadata { + enr: Enr, + metadata: MetaData, + } + + impl LocalMetadata { + pub fn new(enr: Enr, spec: &ChainSpec) -> Result { + let attnets = enr.attestation_bitfield::()?; + let syncnets = enr.sync_committee_bitfield::()?; + + let metadata = if spec.is_peer_das_scheduled() { + MetaData::V3(MetaDataV3 { + seq_number: enr.seq(), + attnets, + syncnets, + custody_group_count: enr + .custody_group_count(spec)? + .unwrap_or(spec.custody_requirement), + }) + } else { + MetaData::V2(MetaDataV2 { + seq_number: enr.seq(), + attnets, + syncnets, + }) + }; + + Ok(Self { enr, metadata }) + } + + pub fn enr(&self) -> &Enr { + &self.enr + } + + pub fn metadata(&self) -> &MetaData { + &self.metadata + } } }