diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index c3a44d941a..01cc161105 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -712,8 +712,9 @@ impl PeerManager { } /// Received a metadata response from a peer. - pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { + pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) -> bool { let mut invalid_meta_data = false; + let mut updated_cgc = false; if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { if let Some(known_meta_data) = &peer_info.meta_data() { @@ -729,12 +730,16 @@ impl PeerManager { debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata"); } + let known_custody_group_count = peer_info + .meta_data() + .and_then(|meta_data| meta_data.custody_group_count().copied().ok()); + let custody_group_count_opt = meta_data.custody_group_count().copied().ok(); peer_info.set_meta_data(meta_data); if self.network_globals.spec.is_peer_das_scheduled() { - // Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to - // prioritize PeerDAS peers. + // Gracefully ignore metadata/v2 peers. + // We only send metadata v3 requests when PeerDAS is scheduled if let Some(custody_group_count) = custody_group_count_opt { match self.compute_peer_custody_groups(peer_id, custody_group_count) { Ok(custody_groups) => { @@ -755,6 +760,8 @@ impl PeerManager { }) .collect(); peer_info.set_custody_subnets(custody_subnets); + + updated_cgc = Some(custody_group_count) != known_custody_group_count; } Err(err) => { debug!( @@ -777,6 +784,8 @@ impl PeerManager { if invalid_meta_data { self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager) } + + updated_cgc } /// Updates the gossipsub scores for all known peers in gossipsub. @@ -1487,6 +1496,15 @@ impl PeerManager { pub fn remove_trusted_peer(&mut self, enr: Enr) { self.trusted_peers.remove(&enr); } + + #[cfg(test)] + fn custody_subnet_count_for_peer(&self, peer_id: &PeerId) -> Option { + self.network_globals + .peers + .read() + .peer_info(peer_id) + .map(|peer_info| peer_info.custody_subnets_iter().count()) + } } enum ConnectingType { @@ -1507,8 +1525,9 @@ enum ConnectingType { #[cfg(test)] mod tests { use super::*; + use crate::rpc::MetaDataV3; use crate::NetworkConfig; - use types::MainnetEthSpec as E; + use types::{ChainSpec, ForkName, MainnetEthSpec as E}; async fn build_peer_manager(target_peer_count: usize) -> PeerManager { build_peer_manager_with_trusted_peers(vec![], target_peer_count).await @@ -1517,6 +1536,15 @@ mod tests { async fn build_peer_manager_with_trusted_peers( trusted_peers: Vec, target_peer_count: usize, + ) -> PeerManager { + let spec = Arc::new(E::default_spec()); + build_peer_manager_with_opts(trusted_peers, target_peer_count, spec).await + } + + async fn build_peer_manager_with_opts( + trusted_peers: Vec, + target_peer_count: usize, + spec: Arc, ) -> PeerManager { let config = config::Config { target_peer_count, @@ -1527,7 +1555,6 @@ mod tests { target_peers: target_peer_count, ..Default::default() }); - let spec = Arc::new(E::default_spec()); let globals = NetworkGlobals::new_test_globals(trusted_peers, network_config, spec); PeerManager::new(config, Arc::new(globals)).unwrap() } @@ -1878,6 +1905,44 @@ mod tests { assert!(peers_should_have_removed.is_empty()); } + #[tokio::test] + /// Test a metadata response should update custody subnets + async fn test_peer_manager_update_custody_subnets() { + // PeerDAS is enabled from Fulu. + let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); + let mut peer_manager = build_peer_manager_with_opts(vec![], 1, spec).await; + let pubkey = Keypair::generate_secp256k1().public(); + let peer_id = PeerId::from_public_key(&pubkey); + peer_manager.inject_connect_ingoing( + &peer_id, + Multiaddr::empty().with_p2p(peer_id).unwrap(), + None, + ); + + // A newly connected peer should have no custody subnets before metadata is received. + let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id); + assert_eq!(custody_subnet_count, Some(0)); + + // Metadata should update the custody subnets. + let peer_cgc = 4; + let meta_data = MetaData::V3(MetaDataV3 { + seq_number: 0, + attnets: Default::default(), + syncnets: Default::default(), + custody_group_count: peer_cgc, + }); + let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data.clone()); + assert!(cgc_updated); + let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id); + assert_eq!(custody_subnet_count, Some(peer_cgc as usize)); + + // Make another update and assert that CGC is not updated. + let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data); + assert!(!cgc_updated); + let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id); + assert_eq!(custody_subnet_count, Some(peer_cgc as usize)); + } + #[tokio::test] /// Test the pruning logic to remove grouped subnet peers async fn test_peer_manager_prune_grouped_subnet_peers() { diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 86da517e21..23060df9e6 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -103,6 +103,8 @@ pub enum NetworkEvent { StatusPeer(PeerId), NewListenAddr(Multiaddr), ZeroListeners, + /// A peer has an updated custody group count from MetaData. + PeerUpdatedCustodyGroupCount(PeerId), } pub type Gossipsub = gossipsub::Behaviour; @@ -1655,7 +1657,7 @@ impl Network { return None; } - // The METADATA and PING RPC responses are handled within the behaviour and not propagated + // The PING RPC responses are handled within the behaviour and not propagated match event.message { Err(handler_err) => { match handler_err { @@ -1858,9 +1860,11 @@ impl Network { None } RpcSuccessResponse::MetaData(meta_data) => { - self.peer_manager_mut() + let updated_cgc = self + .peer_manager_mut() .meta_data_response(&peer_id, meta_data.as_ref().clone()); - None + // Send event after calling into peer_manager so the PeerDB is updated. + updated_cgc.then(|| NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id)) } /* Network propagated protocols */ RpcSuccessResponse::Status(msg) => { diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 05c00b76af..2a7bc597c2 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -73,6 +73,8 @@ pub enum RouterMessage { PubsubMessage(MessageId, PeerId, PubsubMessage, bool), /// The peer manager has requested we re-status a peer. StatusPeer(PeerId), + /// The peer has an updated custody group count from METADATA. + PeerUpdatedCustodyGroupCount(PeerId), } impl Router { @@ -155,6 +157,10 @@ impl Router { RouterMessage::PeerDisconnected(peer_id) => { self.send_to_sync(SyncMessage::Disconnect(peer_id)); } + // A peer has updated CGC + RouterMessage::PeerUpdatedCustodyGroupCount(peer_id) => { + self.send_to_sync(SyncMessage::UpdatedPeerCgc(peer_id)); + } RouterMessage::RPCRequestReceived { peer_id, inbound_request_id, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7afd62ab2e..77204b455d 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -485,6 +485,9 @@ impl NetworkService { NetworkEvent::PeerDisconnected(peer_id) => { self.send_to_router(RouterMessage::PeerDisconnected(peer_id)); } + NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id) => { + self.send_to_router(RouterMessage::PeerUpdatedCustodyGroupCount(peer_id)); + } NetworkEvent::RequestReceived { peer_id, inbound_request_id, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 9119b1652c..473881f182 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -106,6 +106,9 @@ pub enum SyncMessage { head_slot: Option, }, + /// Peer manager has received a MetaData of a peer with a new or updated CGC value. + UpdatedPeerCgc(PeerId), + /// A block has been received from the RPC. RpcBlock { sync_request_id: SyncRequestId, @@ -476,6 +479,16 @@ impl SyncManager { } } + fn updated_peer_cgc(&mut self, _peer_id: PeerId) { + // Try to make progress on custody requests that are waiting for peers + for (id, result) in self.network.continue_custody_by_root_requests() { + self.on_custody_by_root_result(id, result); + } + + // Attempt to resume range sync too + self.range_sync.resume(&mut self.network); + } + /// Handles RPC errors related to requests that were emitted from the sync manager. fn inject_error(&mut self, peer_id: PeerId, sync_request_id: SyncRequestId, error: RPCError) { trace!("Sync manager received a failed RPC"); @@ -748,6 +761,13 @@ impl SyncManager { } => { self.add_peers_force_range_sync(&peers, head_root, head_slot); } + SyncMessage::UpdatedPeerCgc(peer_id) => { + debug!( + peer_id = ?peer_id, + "Received updated peer CGC message" + ); + self.updated_peer_cgc(peer_id); + } SyncMessage::RpcBlock { sync_request_id, peer_id,