//! Implementation of Lighthouse's peer management system.

use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RpcErrorResponse};
use crate::service::TARGET_SUBNET_PEERS;
use crate::{Gossipsub, NetworkGlobals, PeerId, Subnet, SubnetDiscovery, metrics};
use delay_map::HashSetDelay;
use discv5::Enr;
use libp2p::identify::Info as IdentifyInfo;
use lru_cache::LRUTimeCache;
use peerdb::{BanOperation, BanResult, ScoreUpdateResult};
use rand::seq::SliceRandom;
use smallvec::SmallVec;
use std::{
    sync::Arc,
    time::{Duration, Instant},
};
use tracing::{debug, error, trace, warn};
use types::{DataColumnSubnetId, EthSpec, SubnetId, SyncSubnetId};

pub use libp2p::core::Multiaddr;
pub use libp2p::identity::Keypair;

pub mod peerdb;

use crate::peer_manager::peerdb::client::ClientKind;
use crate::types::GossipKind;
use libp2p::multiaddr;
use network_utils::discovery_metrics;
use network_utils::enr_ext::{EnrExt, peer_id_to_node_id};
pub use peerdb::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::net::IpAddr;
use strum::IntoEnumIterator;
use types::data::{CustodyIndex, compute_subnets_from_custody_group, get_custody_groups};

/// Unified peer subnet information structure for pruning logic.
struct PeerSubnetInfo<E: EthSpec> {
    info: PeerInfo<E>,
    attestation_subnets: HashSet<SubnetId>,
    sync_committees: HashSet<SyncSubnetId>,
    custody_subnets: HashSet<DataColumnSubnetId>,
}

pub mod config;
mod network_behaviour;

/// The heartbeat performs regular updates such as updating reputations and performing discovery
/// requests. This defines the interval in seconds.
const HEARTBEAT_INTERVAL: u64 = 30;

/// The minimum amount of time we allow peers to reconnect to us after a disconnect when we are
/// saturated with peers. This effectively looks like a swarm BAN for this amount of time.
pub const PEER_RECONNECTION_TIMEOUT: Duration = Duration::from_secs(600);

/// This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would
/// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
/// peers.
pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2;

/// Avoid pruning sampling peers if the subnet peer count is below this number.
pub const MIN_SAMPLING_COLUMN_SUBNET_PEERS: u64 = 2;

/// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
/// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and
/// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e. 55.
pub const PEER_EXCESS_FACTOR: f32 = 0.1;

/// A fraction of `PeerManager::target_peers` that we want to be outbound-only connections.
pub const TARGET_OUTBOUND_ONLY_FACTOR: f32 = 0.3;

/// A fraction of `PeerManager::target_peers` that, if we fall below it, triggers a discovery
/// query to reach our target. MIN_OUTBOUND_ONLY_FACTOR must be < TARGET_OUTBOUND_ONLY_FACTOR.
pub const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.2;

/// The fraction of extra peers beyond PEER_EXCESS_FACTOR that we allow ourselves to dial when
/// requiring subnet peers. More specifically, if our target peer limit is 50, and our excess peer
/// limit is 55, and we are at 55 peers, the following parameter provisions a few more slots for
/// dialing priority peers we need for validator duties.
pub const PRIORITY_PEER_EXCESS: f32 = 0.2;

/// The number of inbound libp2p peers we have seen before we consider our NAT to be open.
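// Illustrative arithmetic for the limits derived from the fractions above
// (assuming a hypothetical target_peers = 50, matching the doc example):
//
//   max_peers                = ceil(50 * (1.0 + 0.1))           = 55
//   max_priority_peers       = ceil(50 * (1.0 + 0.1 + 0.2))     = 65
//   max_outbound_dialing     = ceil(50 * (1.0 + 0.1 + 0.2 / 2)) = 60
//   min_outbound_only_peers  = ceil(50 * 0.2)                   = 10
//   target_outbound_peers    = ceil(50 * 0.3)                   = 15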
pub const LIBP2P_NAT_OPEN_THRESHOLD: usize = 3; /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. network_globals: Arc>, /// A queue of events that the `PeerManager` is waiting to produce. events: SmallVec<[PeerManagerEvent; 16]>, /// A collection of inbound-connected peers awaiting to be Ping'd. inbound_ping_peers: HashSetDelay, /// A collection of outbound-connected peers awaiting to be Ping'd. outbound_ping_peers: HashSetDelay, /// A collection of peers awaiting to be Status'd. status_peers: HashSetDelay, /// The target number of peers we would like to connect to. target_peers: usize, /// Peers queued to be dialed. peers_to_dial: Vec, /// The number of temporarily banned peers. This is used to prevent instantaneous /// reconnection. // NOTE: This just prevents re-connections. The state of the peer is otherwise unaffected. A // peer can be in a disconnected state and new connections will be refused and logged as if the // peer is banned without it being reflected in the peer's state. // Also the banned state can out-last the peer's reference in the peer db. So peers that are // unknown to us can still be temporarily banned. This is fundamentally a relationship with // the swarm. Regardless of our knowledge of the peer in the db, it will be temporarily banned // at the swarm layer. // NOTE: An LRUTimeCache is used compared to a structure that needs to be polled to avoid very // frequent polling to unban peers. Instead, this cache piggy-backs the PeerManager heartbeat // to update and clear the cache. Therefore the PEER_RECONNECTION_TIMEOUT only has a resolution // of the HEARTBEAT_INTERVAL. temporary_banned_peers: LRUTimeCache, /// A collection of sync committee subnets that we need to stay subscribed to. /// Sync committee subnets are longer term (256 epochs). Hence, we need to re-run /// discovery queries for subnet peers if we disconnect from existing sync /// committee subnet peers. sync_committee_subnets: HashMap, /// A mapping of all custody groups to column subnets to avoid re-computation. subnets_by_custody_group: HashMap>, /// The heartbeat interval to perform routine maintenance. heartbeat: tokio::time::Interval, /// Keeps track of whether the discovery service is enabled or not. discovery_enabled: bool, /// Keeps track if the current instance is reporting metrics or not. metrics_enabled: bool, /// Keeps track of whether the QUIC protocol is enabled or not. quic_enabled: bool, trusted_peers: HashSet, } /// The events that the `PeerManager` outputs (requests). #[derive(Debug)] pub enum PeerManagerEvent { /// A peer has dialed us. PeerConnectedIncoming(PeerId), /// A peer has been dialed. PeerConnectedOutgoing(PeerId), /// A peer has disconnected. PeerDisconnected(PeerId), /// Sends a STATUS to a peer. Status(PeerId), /// Sends a PING to a peer. Ping(PeerId), /// Request METADATA from a peer. MetaData(PeerId), /// The peer should be disconnected. DisconnectPeer(PeerId, GoodbyeReason), /// Inform the behaviour to ban this peer and associated ip addresses. Banned(PeerId, Vec), /// The peer should be unbanned with the associated ip addresses. UnBanned(PeerId, Vec), /// Request the behaviour to discover more peers and the amount of peers to discover. DiscoverPeers(usize), /// Request the behaviour to discover peers on subnets. DiscoverSubnetPeers(Vec), } impl PeerManager { // NOTE: Must be run inside a tokio executor. 
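    // A minimal construction sketch (assuming a prepared `network_globals`;
    // error handling elided):
    //
    //     let peer_manager = PeerManager::new(config::Config::default(), network_globals)?;
    //
    // The heartbeat interval is created with `tokio::time::interval`, hence the
    // note above about requiring a tokio executor.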
pub fn new( cfg: config::Config, network_globals: Arc>, ) -> Result { let config::Config { discovery_enabled, metrics_enabled, target_peer_count, status_interval, ping_interval_inbound, ping_interval_outbound, quic_enabled, } = cfg; // Set up the peer manager heartbeat interval let heartbeat = tokio::time::interval(Duration::from_secs(HEARTBEAT_INTERVAL)); // Compute subnets for all custody groups let subnets_by_custody_group = if network_globals.spec.is_peer_das_scheduled() { (0..network_globals.spec.number_of_custody_groups) .map(|custody_index| { let subnets = compute_subnets_from_custody_group::( custody_index, &network_globals.spec, ) .expect("Should compute subnets for all custody groups") .collect(); (custody_index, subnets) }) .collect::>>() } else { HashMap::new() }; Ok(PeerManager { network_globals, events: SmallVec::new(), peers_to_dial: Default::default(), inbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_inbound)), outbound_ping_peers: HashSetDelay::new(Duration::from_secs(ping_interval_outbound)), status_peers: HashSetDelay::new(Duration::from_secs(status_interval)), target_peers: target_peer_count, temporary_banned_peers: LRUTimeCache::new(PEER_RECONNECTION_TIMEOUT), sync_committee_subnets: Default::default(), subnets_by_custody_group, heartbeat, discovery_enabled, metrics_enabled, quic_enabled, trusted_peers: Default::default(), }) } /* Public accessible functions */ /// The application layer wants to disconnect from a peer for a particular reason. /// /// All instant disconnections are fatal and we ban the associated peer. /// /// This will send a goodbye and disconnect the peer if it is connected or dialing. pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) { // Update the sync status if required if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) { debug!(%peer_id, %reason, score = %info.score(), "Sending goodbye to peer"); if matches!(reason, GoodbyeReason::IrrelevantNetwork) { info.update_sync_status(SyncStatus::IrrelevantPeer); } } self.report_peer( peer_id, PeerAction::Fatal, source, Some(reason), "goodbye_peer", ); } /// Reports a peer for some action. /// /// If the peer doesn't exist, log a warning and insert defaults. pub fn report_peer( &mut self, peer_id: &PeerId, action: PeerAction, source: ReportSource, reason: Option, msg: &'static str, ) { let action = self .network_globals .peers .write() .report_peer(peer_id, action, source, msg); self.handle_score_action(peer_id, action, reason); } /// Upon adjusting a Peer's score, there are times the peer manager must pass messages up to /// libp2p. This function handles the conditional logic associated with each score update /// result. fn handle_score_action( &mut self, peer_id: &PeerId, action: ScoreUpdateResult, reason: Option, ) { match action { ScoreUpdateResult::Ban(ban_operation) => { // The peer has been banned and we need to handle the banning operation // NOTE: When we ban a peer, its IP address can be banned. We do not recursively search // through all our connected peers banning all other peers that are using this IP address. // If these peers are behaving fine, we permit their current connections. However, if any new // nodes or current nodes try to reconnect on a banned IP, they will be instantly banned // and disconnected. self.handle_ban_operation(peer_id, ban_operation, reason); } ScoreUpdateResult::Disconnect => { // The peer has transitioned to a disconnect state and has been marked as such in // the peer db. 
We must inform libp2p to disconnect this peer.
                self.inbound_ping_peers.remove(peer_id);
                self.outbound_ping_peers.remove(peer_id);
                self.events.push(PeerManagerEvent::DisconnectPeer(
                    *peer_id,
                    GoodbyeReason::BadScore,
                ));
            }
            ScoreUpdateResult::NoAction => {
                // The report had no effect on the peer and there is nothing to do.
            }
            ScoreUpdateResult::Unbanned(unbanned_ips) => {
                // Inform the Swarm to unban the peer
                self.events
                    .push(PeerManagerEvent::UnBanned(*peer_id, unbanned_ips));
            }
        }
    }

    /// If a peer is being banned, this handles the banning operation.
    fn handle_ban_operation(
        &mut self,
        peer_id: &PeerId,
        ban_operation: BanOperation,
        reason: Option<GoodbyeReason>,
    ) {
        match ban_operation {
            BanOperation::TemporaryBan => {
                // The peer could be temporarily banned. We only do this if we have
                // currently reached our peer target limit.
                if self.network_globals.connected_peers() >= self.target_peers {
                    // We have enough peers, prevent this reconnection.
                    self.temporary_banned_peers.raw_insert(*peer_id);
                    self.events.push(PeerManagerEvent::Banned(*peer_id, vec![]));
                }
            }
            BanOperation::DisconnectThePeer => {
                // The peer is currently connected, so we start a disconnection.
                // Once the peer has disconnected, its connection state will transition to a
                // banned state.
                self.events.push(PeerManagerEvent::DisconnectPeer(
                    *peer_id,
                    reason.unwrap_or(GoodbyeReason::BadScore),
                ));
            }
            BanOperation::PeerDisconnecting => {
                // The peer is currently being disconnected and will be banned once the
                // disconnection completes.
            }
            BanOperation::ReadyToBan(banned_ips) => {
                // The peer is not currently connected; we can safely ban it at the swarm
                // level.

                // If a peer is being banned, this trumps any temporary ban the peer might be
                // under. We no longer track it in the temporary ban list.
                if !self.temporary_banned_peers.raw_remove(peer_id) {
                    // If the peer is not already banned, inform the Swarm to ban the peer
                    self.events
                        .push(PeerManagerEvent::Banned(*peer_id, banned_ips));
                    // If the peer was in the process of being un-banned, remove it (a rare race
                    // condition)
                    self.events.retain(|event| {
                        if let PeerManagerEvent::UnBanned(unbanned_peer_id, _) = event {
                            unbanned_peer_id != peer_id // Remove matching peer ids
                        } else {
                            true
                        }
                    });
                }
            }
        }
    }

    /// Peers that have been returned by discovery requests and that are suitable for dialing are
    /// returned here.
    ///
    /// This function decides whether or not to dial these peers.
    pub fn peers_discovered(&mut self, results: HashMap<Enr, Option<Instant>>) {
        let mut to_dial_peers = 0;
        let results_count = results.len();
        let connected_or_dialing = self.network_globals.connected_or_dialing_peers();
        for (enr, min_ttl) in results {
            // There are two conditions in deciding whether to dial this peer.
            // 1. If we have fewer than our max connections. Discovery queries are executed to
            //    reach our target peers, so it's fine to dial up to our max peers (which will get
            //    pruned in the next heartbeat down to our target).
            // 2. If the peer is one our validators require for a specific subnet, then it is
            //    considered a priority. We have pre-allocated some extra priority slots for these
            //    peers as specified by PRIORITY_PEER_EXCESS. Therefore we dial these peers, even
            //    if we are already at our max_peer limit.
            if !self.peers_to_dial.contains(&enr)
                && ((min_ttl.is_some()
                    && connected_or_dialing + to_dial_peers < self.max_priority_peers())
                    || connected_or_dialing + to_dial_peers < self.max_peers())
            {
                // This should be updated with the peer dialing.
                // In fact, the entry is created once the peer is dialed.
                let peer_id = enr.peer_id();
                if let Some(min_ttl) = min_ttl {
                    self.network_globals
                        .peers
                        .write()
                        .update_min_ttl(&peer_id, min_ttl);
                }
                if self.dial_peer(enr) {
                    debug!(%peer_id, "Added discovered ENR peer to dial queue");
                    to_dial_peers += 1;
                }
            }
        }

        // The heartbeat will attempt new discovery queries every N seconds if the node needs more
        // peers. As an optimization, this function can recursively trigger new discovery queries
        // immediately if we don't fulfill our peer needs after completing a query. This
        // recursiveness results in an infinite loop in networks where there are not enough peers
        // to reach our target. To prevent the infinite loop, if a query returns no useful peers,
        // we cancel the recursion and wait for the heartbeat to trigger another query later.
        if results_count > 0 && to_dial_peers == 0 {
            debug!(
                results = results_count,
                "Skipping recursive discovery query after finding no useful results"
            );
            metrics::inc_counter(&metrics::DISCOVERY_NO_USEFUL_ENRS);
        } else {
            // Queue another discovery if we need to
            self.maintain_peer_count(to_dial_peers);
        }
    }

    /// A STATUS message has been received from a peer. This resets the status timer.
    pub fn peer_statusd(&mut self, peer_id: &PeerId) {
        self.status_peers.insert(*peer_id);
    }

    /// Insert the sync subnet into the list of long-lived sync committee subnets that we need to
    /// maintain an adequate number of peers for.
    pub fn add_sync_subnet(&mut self, subnet_id: SyncSubnetId, min_ttl: Instant) {
        match self.sync_committee_subnets.entry(subnet_id) {
            Entry::Vacant(_) => {
                self.sync_committee_subnets.insert(subnet_id, min_ttl);
            }
            Entry::Occupied(old) => {
                if *old.get() < min_ttl {
                    self.sync_committee_subnets.insert(subnet_id, min_ttl);
                }
            }
        }
    }

    /// The maximum number of peers we allow to connect to us. This is `target_peers` * (1 +
    /// PEER_EXCESS_FACTOR).
    fn max_peers(&self) -> usize {
        (self.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize
    }

    /// The maximum number of peers we allow when dialing a priority peer (i.e. a peer that is
    /// subscribed to subnets that our validators require). This is `target_peers` * (1 +
    /// PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS).
    fn max_priority_peers(&self) -> usize {
        (self.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS)).ceil()
            as usize
    }

    /// The minimum number of outbound-only peers; if we fall below this, we start another
    /// discovery query.
    fn min_outbound_only_peers(&self) -> usize {
        (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize
    }

    /// The target number of outbound-only peers we aim to maintain.
    fn target_outbound_peers(&self) -> usize {
        (self.target_peers as f32 * TARGET_OUTBOUND_ONLY_FACTOR).ceil() as usize
    }

    /// The maximum number of peers that are connected or dialing before we refuse to do another
    /// discovery search for more outbound peers. We can use up to half the priority peer excess
    /// allocation.
    fn max_outbound_dialing_peers(&self) -> usize {
        (self.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS / 2.0)).ceil()
            as usize
    }

    /* Notifications from the Swarm */

    /// A peer is being dialed.
    /// Returns true if this peer will be dialed.
    pub fn dial_peer(&mut self, peer: Enr) -> bool {
        if self
            .network_globals
            .peers
            .read()
            .should_dial(&peer.peer_id())
        {
            self.peers_to_dial.push(peer);
            true
        } else {
            false
        }
    }

    /// Reports if a peer is banned or not.
    ///
    /// This is used to determine if we should accept incoming connections.
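    // Hedged caller-side sketch (the swarm-side handler is not shown in this
    // module): an incoming connection would be refused when a ban is reported,
    // e.g.
    //
    //     if peer_manager.ban_status(&peer_id).is_some() {
    //         // refuse the incoming connection
    //     }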
    pub fn ban_status(&self, peer_id: &PeerId) -> Option<BanResult> {
        self.network_globals.peers.read().ban_status(peer_id)
    }

    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.network_globals.peers.read().is_connected(peer_id)
    }

    /// Updates `PeerInfo` with `identify` information.
    pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) {
        if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
            let previous_kind = peer_info.client().kind;
            let previous_listening_addresses =
                peer_info.set_listening_addresses(info.listen_addrs.clone());
            peer_info.set_client(peerdb::client::Client::from_identify_info(info));

            if previous_kind != peer_info.client().kind
                || *peer_info.listening_addresses() != previous_listening_addresses
            {
                debug!(
                    %peer_id,
                    protocol_version = &info.protocol_version,
                    agent_version = &info.agent_version,
                    listening_addresses = ?info.listen_addrs,
                    observed_address = ?info.observed_addr,
                    protocols = ?info.protocols,
                    "Identified Peer"
                );
            }
        } else {
            error!(
                peer_id = peer_id.to_string(),
                "Received an Identify response from an unknown peer"
            );
        }
    }

    /// An error has occurred in the RPC.
    ///
    /// This adjusts a peer's score based on the error.
    pub fn handle_rpc_error(
        &mut self,
        peer_id: &PeerId,
        protocol: Protocol,
        err: &RPCError,
        direction: ConnectionDirection,
    ) {
        let client = self.network_globals.client(peer_id);
        let score = self.network_globals.peers.read().score(peer_id);
        debug!(%protocol, %err, %client, %peer_id, %score, ?direction, "RPC Error");
        metrics::inc_counter_vec(
            &metrics::TOTAL_RPC_ERRORS_PER_CLIENT,
            &[
                client.kind.as_ref(),
                err.as_static_str(),
                direction.as_ref(),
            ],
        );

        // Map this error to a `PeerAction` (if any)
        let peer_action = match err {
            RPCError::IncompleteStream => {
                // They closed early; this could mean a poor connection
                PeerAction::MidToleranceError
            }
            RPCError::InternalError(e) => {
                debug!(error = %e, %peer_id, "Internal RPC Error");
                return;
            }
            RPCError::HandlerRejected => PeerAction::Fatal,
            RPCError::InvalidData(_) => {
                // The peer is not complying with the protocol. This is considered a malicious
                // action
                PeerAction::Fatal
            }
            RPCError::IoError(_e) => {
                // This could be their fault or ours, so we tolerate this
                PeerAction::HighToleranceError
            }
            RPCError::ErrorResponse(code, _) => match code {
                RpcErrorResponse::Unknown => PeerAction::HighToleranceError,
                RpcErrorResponse::ResourceUnavailable => {
                    // Don't ban on this because we want to retry with a block by root request.
                    if matches!(
                        protocol,
                        Protocol::BlobsByRoot | Protocol::DataColumnsByRoot
                    ) {
                        return;
                    }

                    // NOTE: This error only makes sense for the `BlocksByRange` and `BlocksByRoot`
                    // protocols.
                    //
                    // If we are syncing, there is no point keeping these peers around and
                    // continually failing to request blocks. We instantly ban them and hope that
                    // by the time the ban lifts, the peers will have completed their backfill
                    // sync.
                    //
                    // TODO: Potentially a more graceful way of handling such peers would be to
                    // implement a new sync type which tracks these peers and prevents the sync
                    // algorithms from requesting blocks from them (at least for a set period of
                    // time; multiple failures would then lead to a ban).
                    match direction {
                        // If the blocks request was initiated by us, then we have no use for this
                        // peer and so we ban it.
                        ConnectionDirection::Outgoing => PeerAction::Fatal,
                        // If the blocks request was initiated by the peer, then we let the peer
                        // decide if it wants to continue talking to us; we do not ban the peer.
ConnectionDirection::Incoming => return, } } RpcErrorResponse::ServerError => PeerAction::MidToleranceError, RpcErrorResponse::InvalidRequest => PeerAction::LowToleranceError, RpcErrorResponse::RateLimited => match protocol { Protocol::Ping => PeerAction::MidToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, // Lighthouse does not currently make light client requests; therefore, this // is an unexpected scenario. We do not ban the peer for rate limiting. Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, Protocol::LightClientUpdatesByRange => return, Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRange => PeerAction::MidToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, }, RpcErrorResponse::BlobsNotFoundForBlock => PeerAction::LowToleranceError, }, RPCError::SSZDecodeError(_) => PeerAction::Fatal, RPCError::UnsupportedProtocol => { // Not supporting a protocol shouldn't be considered a malicious action, but // it is an action that in some cases will make the peer unfit to continue // communicating. match protocol { Protocol::Ping => PeerAction::Fatal, Protocol::BlocksByRange => return, Protocol::BlocksByRoot => return, Protocol::BlobsByRange => return, Protocol::BlobsByRoot => return, Protocol::DataColumnsByRoot => return, Protocol::DataColumnsByRange => return, Protocol::Goodbye => return, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, Protocol::LightClientUpdatesByRange => return, Protocol::MetaData => PeerAction::Fatal, Protocol::Status => PeerAction::Fatal, } } RPCError::StreamTimeout => match direction { ConnectionDirection::Incoming => { // There was a timeout responding to a peer. debug!(%peer_id, "Timed out responding to RPC Request"); return; } ConnectionDirection::Outgoing => match protocol { Protocol::Ping => PeerAction::LowToleranceError, Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRange => PeerAction::MidToleranceError, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, Protocol::LightClientUpdatesByRange => return, Protocol::Goodbye => return, Protocol::MetaData => return, Protocol::Status => return, }, }, RPCError::NegotiationTimeout => PeerAction::LowToleranceError, RPCError::Disconnected => return, // No penalty for a graceful disconnection }; self.report_peer( peer_id, peer_action, ReportSource::RPC, None, "handle_rpc_error", ); } /// A ping request has been received. 
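    // Sequence-number rule shared by the ping/pong handlers below: a peer
    // advertises its current metadata seq number in PING/PONG; whenever that
    // number exceeds the seq we have stored (or we hold no metadata at all),
    // a MetaData request is queued to refresh our view of the peer.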
    // NOTE: The behaviour responds with a PONG automatically
    pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) {
        if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) {
            // Received a ping: reset the to-ping timer for this peer
            trace!(%peer_id, seq_no = seq, "Received a ping request");
            match peer_info.connection_direction() {
                Some(ConnectionDirection::Incoming) => {
                    self.inbound_ping_peers.insert(*peer_id);
                }
                Some(ConnectionDirection::Outgoing) => {
                    self.outbound_ping_peers.insert(*peer_id);
                }
                None => {
                    warn!(%peer_id, "Received a ping from a peer with an unknown connection direction");
                }
            }

            // If the sequence number is newer than the one we know, request updated metadata
            // from the peer.
            if let Some(meta_data) = &peer_info.meta_data() {
                if *meta_data.seq_number() < seq {
                    trace!(%peer_id, known_seq_no = meta_data.seq_number(), ping_seq_no = seq, "Requesting new metadata from peer");
                    self.events.push(PeerManagerEvent::MetaData(*peer_id));
                }
            } else {
                // If we don't know the meta-data, request it
                debug!(%peer_id, "Requesting first metadata from peer");
                self.events.push(PeerManagerEvent::MetaData(*peer_id));
            }
        } else {
            error!(%peer_id, "Received a PING from an unknown peer");
        }
    }

    /// A PONG has been returned from a peer.
    pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) {
        if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) {
            // Received a pong: if the sequence number is newer than the one we know, request
            // updated metadata from the peer.
            if let Some(meta_data) = &peer_info.meta_data() {
                if *meta_data.seq_number() < seq {
                    trace!(%peer_id, known_seq_no = meta_data.seq_number(), pong_seq_no = seq, "Requesting new metadata from peer");
                    self.events.push(PeerManagerEvent::MetaData(*peer_id));
                }
            } else {
                // If we don't know the meta-data, request it
                trace!(%peer_id, "Requesting first metadata from peer");
                self.events.push(PeerManagerEvent::MetaData(*peer_id));
            }
        } else {
            error!(%peer_id, "Received a PONG from an unknown peer");
        }
    }

    /// Received a metadata response from a peer.
    pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) -> bool {
        let mut invalid_meta_data = false;
        let mut updated_cgc = false;

        if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
            if let Some(known_meta_data) = &peer_info.meta_data() {
                if *known_meta_data.seq_number() < *meta_data.seq_number() {
                    trace!(%peer_id, known_seq_no = known_meta_data.seq_number(), new_seq_no = meta_data.seq_number(), "Updating peer's metadata");
                } else {
                    trace!(%peer_id, known_seq_no = known_meta_data.seq_number(), new_seq_no = meta_data.seq_number(), "Received old metadata");
                    // We update the metadata even in this case to prevent storing
                    // incorrect `attnets/syncnets` for a peer
                }
            } else {
                // We have no meta-data for this peer; update it
                let cgc = meta_data
                    .custody_group_count()
                    .map(|&count| count.to_string())
                    .unwrap_or_else(|_| "unknown".to_string());
                debug!(
                    %peer_id,
                    new_seq_no = meta_data.seq_number(),
                    cgc,
                    "Obtained peer's metadata"
                );
            }

            let known_custody_group_count = peer_info
                .meta_data()
                .and_then(|meta_data| meta_data.custody_group_count().copied().ok());
            let custody_group_count_opt = meta_data.custody_group_count().copied().ok();
            peer_info.set_meta_data(meta_data);

            if self.network_globals.spec.is_peer_das_scheduled() {
                // Gracefully ignore metadata/v2 peers.
// We only send metadata v3 requests when PeerDAS is scheduled if let Some(custody_group_count) = custody_group_count_opt { match self.compute_peer_custody_groups(peer_id, custody_group_count) { Ok(custody_groups) => { let custody_subnets = custody_groups .into_iter() .flat_map(|custody_index| { self.subnets_by_custody_group .get(&custody_index) .cloned() .unwrap_or_else(|| { warn!( %custody_index, %peer_id, "Custody group not found in subnet mapping" ); vec![] }) }) .collect(); peer_info.set_custody_subnets(custody_subnets); updated_cgc = Some(custody_group_count) != known_custody_group_count; } Err(err) => { debug!( info = "Sending goodbye to peer", peer_id = %peer_id, custody_group_count, error = ?err, "Unable to compute peer custody groups from metadata" ); invalid_meta_data = true; } }; } } } else { error!(%peer_id, "Received METADATA from an unknown peer"); } // Disconnect peers with invalid metadata and find other peers instead. if invalid_meta_data { self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager) } updated_cgc } /// Updates the gossipsub scores for all known peers in gossipsub. pub(crate) fn update_gossipsub_scores(&mut self, gossipsub: &Gossipsub) { let actions = self .network_globals .peers .write() .update_gossipsub_scores(self.target_peers, gossipsub); for (peer_id, score_action) in actions { self.handle_score_action(&peer_id, score_action, None); } } /* Internal functions */ /// Sets a peer as connected as long as their reputation allows it /// Informs if the peer was accepted fn inject_connect_ingoing( &mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option, ) -> bool { self.inject_peer_connection(peer_id, ConnectingType::IngoingConnected { multiaddr }, enr) } /// Sets a peer as connected as long as their reputation allows it /// Informs if the peer was accepted fn inject_connect_outgoing( &mut self, peer_id: &PeerId, multiaddr: Multiaddr, enr: Option, ) -> bool { self.inject_peer_connection( peer_id, ConnectingType::OutgoingConnected { multiaddr }, enr, ) } /// Updates the state of the peer as disconnected. /// /// This is also called when dialing a peer fails. fn inject_disconnect(&mut self, peer_id: &PeerId) { let (ban_operation, purged_peers) = self .network_globals .peers .write() .inject_disconnect(peer_id); if let Some(ban_operation) = ban_operation { // The peer was awaiting a ban, continue to ban the peer. self.handle_ban_operation(peer_id, ban_operation, None); } // Remove the ping and status timer for the peer self.inbound_ping_peers.remove(peer_id); self.outbound_ping_peers.remove(peer_id); self.status_peers.remove(peer_id); self.events.extend( purged_peers .into_iter() .map(|(peer_id, unbanned_ips)| PeerManagerEvent::UnBanned(peer_id, unbanned_ips)), ); } /// Registers a peer as connected. The `ingoing` parameter determines if the peer is being /// dialed or connecting to us. /// /// This is called by `connect_ingoing` and `connect_outgoing`. /// /// Informs if the peer was accepted in to the db or not. 
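    // State sketch of the three `ConnectingType` paths handled below:
    //   Dialing            -> recorded in the peerdb only, no timers started
    //   IngoingConnected   -> inbound ping timer + status timer started
    //   OutgoingConnected  -> outbound ping timer + status timer started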
    fn inject_peer_connection(
        &mut self,
        peer_id: &PeerId,
        connection: ConnectingType,
        enr: Option<Enr>,
    ) -> bool {
        {
            let mut peerdb = self.network_globals.peers.write();
            if peerdb.ban_status(peer_id).is_some() {
                // don't connect if the peer is banned
                error!(%peer_id, "Connection has been allowed to a banned peer");
            }

            match connection {
                ConnectingType::Dialing => {
                    peerdb.dialing_peer(peer_id, enr);
                    return true;
                }
                ConnectingType::IngoingConnected { multiaddr } => {
                    peerdb.connect_ingoing(peer_id, multiaddr, enr);
                    // start a timer to ping inbound peers.
                    self.inbound_ping_peers.insert(*peer_id);
                }
                ConnectingType::OutgoingConnected { multiaddr } => {
                    peerdb.connect_outgoing(peer_id, multiaddr, enr);
                    // start a timer to ping outbound peers.
                    self.outbound_ping_peers.insert(*peer_id);
                }
            }
        }

        // start a status timer for the peer
        self.status_peers.insert(*peer_id);

        true
    }

    // Gracefully disconnects a peer without banning them.
    pub fn disconnect_peer(&mut self, peer_id: PeerId, reason: GoodbyeReason) {
        self.events
            .push(PeerManagerEvent::DisconnectPeer(peer_id, reason));
        self.network_globals
            .peers
            .write()
            .notify_disconnecting(&peer_id, false);
    }

    /// Run a discovery query for additional sync committee peers if we fall below
    /// `TARGET_SUBNET_PEERS`.
    fn maintain_sync_committee_peers(&mut self) {
        // Remove expired entries
        self.sync_committee_subnets
            .retain(|_, v| *v > Instant::now());

        let subnets_to_discover: Vec<SubnetDiscovery> = self
            .sync_committee_subnets
            .iter()
            .filter_map(|(k, v)| {
                if self
                    .network_globals
                    .peers
                    .read()
                    .good_peers_on_subnet(Subnet::SyncCommittee(*k))
                    .count()
                    < TARGET_SUBNET_PEERS
                {
                    Some(SubnetDiscovery {
                        subnet: Subnet::SyncCommittee(*k),
                        min_ttl: Some(*v),
                    })
                } else {
                    None
                }
            })
            .collect();

        // Request the subnet query from discovery
        if !subnets_to_discover.is_empty() {
            debug!(
                subnets = ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>(),
                "Making subnet queries for maintaining sync committee peers"
            );
            self.events
                .push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover));
        }
    }

    /// Run a discovery query for additional custody peers if we fall below
    /// `MIN_SAMPLING_COLUMN_SUBNET_PEERS`.
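    // Both subnet maintainers follow the same pattern, sketched in pseudocode:
    //
    //     for subnet in subnets_we_must_serve:
    //         if good_peers_on(subnet) < threshold:
    //             queue SubnetDiscovery { subnet, min_ttl }
    //
    // where threshold is TARGET_SUBNET_PEERS for sync committees and
    // MIN_SAMPLING_COLUMN_SUBNET_PEERS for custody column subnets.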
fn maintain_custody_peers(&mut self) { let subnets_to_discover: Vec = self .network_globals .sampling_subnets() .iter() .filter_map(|custody_subnet| { if self .network_globals .peers .read() .has_good_peers_in_custody_subnet( custody_subnet, MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize, ) { None } else { Some(SubnetDiscovery { subnet: Subnet::DataColumn(*custody_subnet), min_ttl: None, }) } }) .collect(); // request the subnet query from discovery if !subnets_to_discover.is_empty() { debug!( subnets = ?subnets_to_discover.iter().map(|s| s.subnet).collect::>(), "Making subnet queries for maintaining custody peers" ); self.events .push(PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover)); } } fn maintain_trusted_peers(&mut self) { let trusted_peers = self.trusted_peers.clone(); for trusted_peer in trusted_peers { self.dial_peer(trusted_peer); } } /// This function checks the status of our current peers and optionally requests a discovery /// query if we need to find more peers to maintain the current number of peers fn maintain_peer_count(&mut self, dialing_peers: usize) { // Check if we need to do a discovery lookup if self.discovery_enabled { let peer_count = self.network_globals.connected_or_dialing_peers(); let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers(); let wanted_peers = if peer_count < self.target_peers.saturating_sub(dialing_peers) { // We need more peers in general. self.max_peers().saturating_sub(dialing_peers) - peer_count } else if outbound_only_peer_count < self.min_outbound_only_peers() && peer_count < self.max_outbound_dialing_peers() { self.max_outbound_dialing_peers() .saturating_sub(dialing_peers) .saturating_sub(peer_count) } else { 0 }; if wanted_peers != 0 { // We need more peers, re-queue a discovery lookup. debug!( connected = peer_count, target = self.target_peers, outbound = outbound_only_peer_count, wanted = wanted_peers, "Starting a new peer discovery query" ); self.events .push(PeerManagerEvent::DiscoverPeers(wanted_peers)); } } } /// Build unified peer subnet information from connected peers. /// /// This creates a unified structure containing all subnet information for each peer, /// excluding trusted peers and peers already marked for pruning. fn build_peer_subnet_info( &self, peers_to_prune: &HashSet, ) -> HashMap> { let mut peer_subnet_info: HashMap> = HashMap::new(); for (peer_id, info) in self.network_globals.peers.read().connected_peers() { // Ignore peers we trust or that we are already pruning if info.is_trusted() || peers_to_prune.contains(peer_id) { continue; } let mut peer_info = PeerSubnetInfo { info: info.clone(), attestation_subnets: HashSet::new(), sync_committees: HashSet::new(), custody_subnets: HashSet::new(), }; // Populate subnet information from long-lived subnets for subnet in info.long_lived_subnets() { match subnet { Subnet::Attestation(subnet_id) => { peer_info.attestation_subnets.insert(subnet_id); } Subnet::SyncCommittee(id) => { peer_info.sync_committees.insert(id); } Subnet::DataColumn(id) => { peer_info.custody_subnets.insert(id); } } } peer_subnet_info.insert(*peer_id, peer_info); } peer_subnet_info } /// Build reverse lookup from custody subnets to peer lists. 
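    // Illustrative shape of the lookup built below (hypothetical peers):
    //   peer_a custodies subnet {3}, peer_b custodies subnets {3, 7}
    //   => { 3 -> [peer_a, peer_b], 7 -> [peer_b] }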
fn build_custody_subnet_lookup( peer_subnet_info: &HashMap>, ) -> HashMap> { let mut custody_subnet_to_peers: HashMap> = HashMap::new(); for (peer_id, peer_info) in peer_subnet_info { for &custody_subnet in &peer_info.custody_subnets { custody_subnet_to_peers .entry(custody_subnet) .or_default() .push(*peer_id); } } custody_subnet_to_peers } /// Determine if a peer should be protected from pruning based on various criteria. /// /// Protection criteria: /// - Outbound peers: don't prune if it would drop below target outbound peer count /// - Data column sampling: ≤ MIN_SAMPLING_COLUMN_SUBNET_PEERS (2) peers per subnet /// - Sync committees: ≤ MIN_SYNC_COMMITTEE_PEERS (2) peers per committee /// - Attestation subnets: protect peers on the scarcest attestation subnets /// /// Returns true if the peer should be protected (not pruned). fn should_protect_peer( &self, candidate_info: &PeerSubnetInfo, sampling_subnets: &HashSet, custody_subnet_to_peers: &HashMap>, peer_subnet_info: &HashMap>, connected_outbound_peer_count: usize, outbound_peers_pruned: usize, ) -> bool { // Ensure we don't remove too many outbound peers if candidate_info.info.is_outbound_only() && self.target_outbound_peers() >= connected_outbound_peer_count.saturating_sub(outbound_peers_pruned) { return true; } // Check data column sampling subnets // If the peer exists in a sampling subnet that is less than or equal to MIN_SAMPLING_COLUMN_SUBNET_PEERS, we keep it let should_protect_sampling = candidate_info .custody_subnets .iter() .filter(|subnet| sampling_subnets.contains(subnet)) .any(|subnet| { let count = custody_subnet_to_peers .get(subnet) .map(|peers| peers.len()) .unwrap_or(0); count <= MIN_SAMPLING_COLUMN_SUBNET_PEERS as usize }); if should_protect_sampling { return true; } // Check sync committee protection let should_protect_sync = candidate_info.sync_committees.iter().any(|sync_committee| { let count = peer_subnet_info .values() .filter(|p| p.sync_committees.contains(sync_committee)) .count(); count <= MIN_SYNC_COMMITTEE_PEERS as usize }); if should_protect_sync { return true; } // Check attestation subnet to avoid pruning from subnets with the lowest peer count let attestation_subnet_counts: HashMap = peer_subnet_info .values() .flat_map(|p| &p.attestation_subnets) .fold(HashMap::new(), |mut acc, &subnet| { *acc.entry(subnet).or_insert(0) += 1; acc }); if let Some(&least_dense_size) = attestation_subnet_counts.values().min() { let is_on_least_dense = candidate_info .attestation_subnets .iter() .any(|subnet| attestation_subnet_counts.get(subnet) == Some(&least_dense_size)); if is_on_least_dense { return true; } } false } /// Find the best candidate for removal from the densest custody subnet. /// /// Returns the PeerId of the candidate to remove, or None if no suitable candidate found. 
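    // Ordering sketch: candidates are shuffled (to break ties randomly), then
    // sorted ascending by (custody_subnet_count, is_synced_or_advanced), so an
    // unsynced peer custodying few column subnets is considered for removal
    // before a synced peer custodying many.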
    fn find_prune_candidate(
        &self,
        column_subnet: DataColumnSubnetId,
        column_subnet_to_peers: &HashMap<DataColumnSubnetId, Vec<PeerId>>,
        peer_subnet_info: &HashMap<PeerId, PeerSubnetInfo<E>>,
        sampling_subnets: &HashSet<DataColumnSubnetId>,
        connected_outbound_peer_count: usize,
        outbound_peers_pruned: usize,
    ) -> Option<PeerId> {
        let peers_on_subnet_clone = column_subnet_to_peers.get(&column_subnet)?.clone();

        // Create a sorted list of peers prioritized for removal
        let mut sorted_peers = peers_on_subnet_clone;
        sorted_peers.shuffle(&mut rand::rng());
        sorted_peers.sort_by_key(|peer_id| {
            if let Some(peer_info) = peer_subnet_info.get(peer_id) {
                (
                    peer_info.info.custody_subnet_count(),
                    peer_info.info.is_synced_or_advanced(),
                )
            } else {
                (0, false)
            }
        });

        // Try to find a candidate peer to remove from the subnet
        for candidate_peer in &sorted_peers {
            let Some(candidate_info) = peer_subnet_info.get(candidate_peer) else {
                continue;
            };

            // Check if this peer should be protected
            if self.should_protect_peer(
                candidate_info,
                sampling_subnets,
                column_subnet_to_peers,
                peer_subnet_info,
                connected_outbound_peer_count,
                outbound_peers_pruned,
            ) {
                continue;
            }

            // Found a suitable candidate
            return Some(*candidate_peer);
        }

        None
    }

    /// Remove excess peers back down to our target values.
    /// This prioritises peers with a good score and a uniform distribution of peers across
    /// data column subnets.
    ///
    /// The logic for the peer pruning is as follows:
    ///
    /// Global rules:
    /// - Always maintain peers we need for a validator duty.
    /// - Do not prune outbound peers to exceed our outbound target.
    /// - Do not prune more peers than our target peer count.
    /// - If we have an option to remove a number of peers, remove the ones that have the fewest
    ///   long-lived subnets.
    /// - When pruning peers based on subnet count, if multiple peers can be chosen, choose a peer
    ///   that is not subscribed to a long-lived sync committee subnet.
    /// - When pruning peers based on subnet count, do not prune a peer that would lower us below
    ///   the MIN_SYNC_COMMITTEE_PEERS peer count. To keep it simple, we favour a minimum number
    ///   of sync-committee peers over uniformity of subnet peers. NOTE: We could apply more
    ///   sophisticated logic, but the code is simpler and easier to maintain if we take this
    ///   approach. If we are pruning subnet peers below MIN_SYNC_COMMITTEE_PEERS while
    ///   maintaining the sync committee peers, this should be fine, as subnet peers are more
    ///   likely to be found than sync-committee peers. Also, we're in a bit of trouble anyway if
    ///   we have so few peers on subnets. The MIN_SYNC_COMMITTEE_PEERS number should be set low,
    ///   as an absolute lower bound for maintaining peers on the sync committees.
    /// - Do not prune trusted peers. NOTE: This means that if a user has more trusted peers than
    ///   the excess peer limit, all of the following logic is subverted, as we will not prune any
    ///   peers. Also, the more trusted peers a user has, the less room Lighthouse has to
    ///   efficiently manage its peers across the subnets.
    ///
    /// Prune peers in the following order:
    /// 1. Remove the worst-scoring peers.
    /// 2. Remove peers that are not subscribed to a subnet (they have less value).
    /// 3. Remove peers of which we have many on any particular subnet, with some exceptions:
    ///    - Don't remove peers needed for data column sampling (never drop a sampling subnet
    ///      below MIN_SAMPLING_COLUMN_SUBNET_PEERS peers).
    ///    - Don't remove peers needed for sync committees (never drop a committee below
    ///      MIN_SYNC_COMMITTEE_PEERS peers).
    ///    - Don't remove peers from the lowest-density attestation subnets.
    /// 4.
Randomly remove peers if all the above are satisfied until we reach `target_peers`, or /// until we can't prune any more peers due to the above constraints. fn prune_excess_peers(&mut self) { // The current number of connected peers. let connected_peer_count = self.network_globals.connected_peers(); if connected_peer_count <= self.target_peers { // No need to prune peers return; } // Keep a list of peers we are pruning. let mut peers_to_prune = HashSet::new(); let connected_outbound_peer_count = self.network_globals.connected_outbound_only_peers(); // Keep track of the number of outbound peers we are pruning. let mut outbound_peers_pruned = 0; macro_rules! prune_peers { ($filter: expr) => { let filter = $filter; for (peer_id, info) in self .network_globals .peers .read() .worst_connected_peers() .iter() .filter(|(_, info)| { !info.has_future_duty() && !info.is_trusted() && filter(*info) }) { if peers_to_prune.len() >= connected_peer_count.saturating_sub(self.target_peers) { // We have found all the peers we need to drop, end. break; } if peers_to_prune.contains(*peer_id) { continue; } // Only remove up to the target outbound peer count. if info.is_outbound_only() { if self.target_outbound_peers() + outbound_peers_pruned < connected_outbound_peer_count { outbound_peers_pruned += 1; } else { continue; } } peers_to_prune.insert(**peer_id); } }; } // 1. Look through peers that have the worst score (ignoring non-penalized scored peers). prune_peers!(|info: &PeerInfo| { info.score().score() < 0.0 }); // 2. Attempt to remove peers that are not subscribed to a subnet, if we still need to // prune more. if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) { prune_peers!(|info: &PeerInfo| { !info.has_long_lived_subnet() }); } // 3. and 4. Remove peers that are too grouped on any given data column subnet. If all subnets are // uniformly distributed, remove random peers. if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) { let sampling_subnets = self.network_globals.sampling_subnets(); let mut peer_subnet_info = self.build_peer_subnet_info(&peers_to_prune); let mut custody_subnet_to_peers = Self::build_custody_subnet_lookup(&peer_subnet_info); // Attempt to prune peers to `target_peers`, or until we run out of peers to prune. while peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) { let custody_subnet_with_most_peers = custody_subnet_to_peers .iter() .filter(|(_, peers)| !peers.is_empty()) .max_by_key(|(_, peers)| peers.len()) .map(|(subnet_id, _)| *subnet_id); if let Some(densest_subnet) = custody_subnet_with_most_peers { // If we have successfully found a candidate peer to prune, prune it, // otherwise all peers on this subnet should not be removed due to our // outbound limit or min_subnet_count. In this case, we remove all // peers from the pruning logic and try another subnet. if let Some(candidate_peer) = self.find_prune_candidate( densest_subnet, &custody_subnet_to_peers, &peer_subnet_info, &sampling_subnets, connected_outbound_peer_count, outbound_peers_pruned, ) { // Update outbound peer count if needed if let Some(candidate_info) = peer_subnet_info.get(&candidate_peer) && candidate_info.info.is_outbound_only() { outbound_peers_pruned += 1; } // Remove the candidate peer from the maps, so we don't account for them // when finding the next prune candidate. 
                        for subnet_peers in custody_subnet_to_peers.values_mut() {
                            subnet_peers.retain(|peer_id| peer_id != &candidate_peer);
                        }
                        peer_subnet_info.remove(&candidate_peer);
                        peers_to_prune.insert(candidate_peer);
                    } else if let Some(peers) = custody_subnet_to_peers.get_mut(&densest_subnet) {
                        // If we can't find a prune candidate in this subnet, remove all peers in
                        // this subnet from consideration
                        peers.clear()
                    }
                } else {
                    // If there are no peers left to prune, exit.
                    break;
                }
            }
        }

        // Disconnect the pruned peers.
        for peer_id in peers_to_prune {
            self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
        }
    }

    /// Unbans any temporarily banned peers that have served their timeout.
    fn unban_temporary_banned_peers(&mut self) {
        for peer_id in self.temporary_banned_peers.remove_expired() {
            self.events
                .push(PeerManagerEvent::UnBanned(peer_id, Vec::new()));
        }
    }

    /// The peer manager's heartbeat maintains the peer count and peer reputations.
    ///
    /// It will request discovery queries if the peer count has not reached the desired number of
    /// overall peers, as well as the desired number of outbound-only peers.
    ///
    /// NOTE: Discovery will only add a new query if one isn't already queued.
    fn heartbeat(&mut self) {
        // Optionally run a discovery query if we need more peers.
        self.maintain_peer_count(0);
        self.maintain_trusted_peers();

        // Clean up the connection state of dialing peers.
        // Libp2p dials peer-ids, but sometimes the response is from another peer-id, or libp2p
        // returns dial errors without a peer-id attached. This function reverts peers that have
        // had a dialing status for longer than DIAL_TIMEOUT seconds to a disconnected status.
        // This is important because we count the number of dialing peers in our inbound
        // connections.
        self.network_globals.peers.write().cleanup_dialing_peers();

        // Update peers' scores and unban any peers if required.
        let actions = self.network_globals.peers.write().update_scores();
        for (peer_id, action) in actions {
            self.handle_score_action(&peer_id, action, None);
        }

        // Update peer score metrics.
        self.update_peer_score_metrics();

        // Maintain the minimum count of custody peers if we are subscribed to any data column
        // topics (i.e. PeerDAS activated).
        let peerdas_enabled = self
            .network_globals
            .gossipsub_subscriptions
            .read()
            .iter()
            .any(|topic| matches!(topic.kind(), &GossipKind::DataColumnSidecar(_)));
        if peerdas_enabled {
            self.maintain_custody_peers();
        }

        // Maintain the minimum count of sync committee peers.
        self.maintain_sync_committee_peers();

        // Prune any excess peers back to our target in a way that incentivises good scores and
        // a uniform distribution of subnets.
        self.prune_excess_peers();

        // Unban any peers that have served their temporary ban timeout.
        self.unban_temporary_banned_peers();

        // Maintain memory by shrinking mappings.
        self.shrink_mappings();
    }

    // Reduce memory footprint by routinely shrinking associated mappings.
    fn shrink_mappings(&mut self) {
        self.inbound_ping_peers.shrink_to(5);
        self.outbound_ping_peers.shrink_to(5);
        self.status_peers.shrink_to(5);
        self.temporary_banned_peers.shrink_to_fit();
        self.sync_committee_subnets.shrink_to_fit();
    }

    // Update metrics related to peer scoring.
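    // Distribution sketch (assuming `best_peers_by_status` yields peers ordered
    // best-to-worst): the gauges below record the best score ("1st"), the scores
    // at the 1/4, 1/2 and 3/4 rank positions, and the worst connected score
    // ("last"), while per-client sums are averaged at the end of the pass.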
fn update_peer_score_metrics(&self) { if !self.metrics_enabled { return; } // reset the gauges let _ = metrics::PEER_SCORE_DISTRIBUTION .as_ref() .map(|gauge| gauge.reset()); let _ = metrics::PEER_SCORE_PER_CLIENT .as_ref() .map(|gauge| gauge.reset()); let mut avg_score_per_client: HashMap = HashMap::with_capacity(5); { let peers_db_read_lock = self.network_globals.peers.read(); let connected_peers = peers_db_read_lock.best_peers_by_status(PeerInfo::is_connected); let total_peers = connected_peers.len(); for (id, (_peer, peer_info)) in connected_peers.into_iter().enumerate() { // First quartile if id == 0 { metrics::set_gauge_vec( &metrics::PEER_SCORE_DISTRIBUTION, &["1st"], peer_info.score().score() as i64, ); } else if id == (total_peers * 3 / 4).saturating_sub(1) { metrics::set_gauge_vec( &metrics::PEER_SCORE_DISTRIBUTION, &["3/4"], peer_info.score().score() as i64, ); } else if id == (total_peers / 2).saturating_sub(1) { metrics::set_gauge_vec( &metrics::PEER_SCORE_DISTRIBUTION, &["1/2"], peer_info.score().score() as i64, ); } else if id == (total_peers / 4).saturating_sub(1) { metrics::set_gauge_vec( &metrics::PEER_SCORE_DISTRIBUTION, &["1/4"], peer_info.score().score() as i64, ); } else if id == total_peers.saturating_sub(1) { metrics::set_gauge_vec( &metrics::PEER_SCORE_DISTRIBUTION, &["last"], peer_info.score().score() as i64, ); } let score_peers: &mut (f64, usize) = avg_score_per_client .entry(peer_info.client().kind.to_string()) .or_default(); score_peers.0 += peer_info.score().score(); score_peers.1 += 1; } } // read lock ended for (client, (score, peers)) in avg_score_per_client { metrics::set_float_gauge_vec( &metrics::PEER_SCORE_PER_CLIENT, &[&client.to_string()], score / (peers as f64), ); } } // Update peer count related metrics. fn update_peer_count_metrics(&self) { let mut peers_connected = 0; let mut clients_per_peer = HashMap::new(); let mut inbound_ipv4_peers_connected: usize = 0; let mut inbound_ipv6_peers_connected: usize = 0; let mut peers_connected_multi: HashMap<(&str, &str), i32> = HashMap::new(); let mut peers_per_custody_group_count: HashMap = HashMap::new(); for (_, peer_info) in self.network_globals.peers.read().connected_peers() { peers_connected += 1; *clients_per_peer .entry(peer_info.client().kind.to_string()) .or_default() += 1; let direction = match peer_info.connection_direction() { Some(ConnectionDirection::Incoming) => "inbound", Some(ConnectionDirection::Outgoing) => "outbound", None => "none", }; // Note: the `transport` is set to `unknown` if the `listening_addresses` list is empty. // This situation occurs when the peer is initially registered in PeerDB, but the peer // info has not yet been updated at `PeerManager::identify`. 
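            // Illustrative mapping (hypothetical addresses):
            //   "/ip4/1.2.3.4/udp/9000/quic-v1" -> "quic"
            //   "/ip4/1.2.3.4/tcp/9000"         -> "tcp"
            //   no listening addresses          -> "unknown"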
let transport = peer_info .listening_addresses() .iter() .find_map(|addr| { addr.iter().find_map(|proto| match proto { multiaddr::Protocol::QuicV1 => Some("quic"), multiaddr::Protocol::Tcp(_) => Some("tcp"), _ => None, }) }) .unwrap_or("unknown"); *peers_connected_multi .entry((direction, transport)) .or_default() += 1; if let Some(MetaData::V3(meta_data)) = peer_info.meta_data() { *peers_per_custody_group_count .entry(meta_data.custody_group_count) .or_default() += 1; } // Check if incoming peer is ipv4 if peer_info.is_incoming_ipv4_connection() { inbound_ipv4_peers_connected += 1; } // Check if incoming peer is ipv6 if peer_info.is_incoming_ipv6_connection() { inbound_ipv6_peers_connected += 1; } } // Set ipv4 nat_open metric flag if threshold of peercount is met, unset if below threshold if inbound_ipv4_peers_connected >= LIBP2P_NAT_OPEN_THRESHOLD { metrics::set_gauge_vec(&discovery_metrics::NAT_OPEN, &["libp2p_ipv4"], 1); } else { metrics::set_gauge_vec(&discovery_metrics::NAT_OPEN, &["libp2p_ipv4"], 0); } // Set ipv6 nat_open metric flag if threshold of peercount is met, unset if below threshold if inbound_ipv6_peers_connected >= LIBP2P_NAT_OPEN_THRESHOLD { metrics::set_gauge_vec(&discovery_metrics::NAT_OPEN, &["libp2p_ipv6"], 1); } else { metrics::set_gauge_vec(&discovery_metrics::NAT_OPEN, &["libp2p_ipv6"], 0); } // PEERS_CONNECTED metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected); // CUSTODY_GROUP_COUNT for (custody_group_count, peer_count) in peers_per_custody_group_count.into_iter() { metrics::set_gauge_vec( &metrics::PEERS_PER_CUSTODY_GROUP_COUNT, &[&custody_group_count.to_string()], peer_count, ) } // PEERS_PER_CLIENT for client_kind in ClientKind::iter() { let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0); metrics::set_gauge_vec( &metrics::PEERS_PER_CLIENT, &[client_kind.as_ref()], *value as i64, ); } // PEERS_CONNECTED_MULTI for direction in ["inbound", "outbound", "none"] { for transport in ["quic", "tcp", "unknown"] { metrics::set_gauge_vec( &metrics::PEERS_CONNECTED_MULTI, &[direction, transport], *peers_connected_multi .get(&(direction, transport)) .unwrap_or(&0) as i64, ); } } } fn compute_peer_custody_groups( &self, peer_id: &PeerId, custody_group_count: u64, ) -> Result, String> { // If we don't have a node id, we cannot compute the custody duties anyway let node_id = peer_id_to_node_id(peer_id)?; let spec = &self.network_globals.spec; if !(spec.custody_requirement..=spec.number_of_custody_groups) .contains(&custody_group_count) { return Err("Invalid custody group count in metadata: out of range".to_string()); } get_custody_groups(node_id.raw(), custody_group_count, spec).map_err(|e| { format!( "Error computing peer custody groups for node {} with cgc={}: {:?}", node_id, custody_group_count, e ) }) } pub fn add_trusted_peer(&mut self, enr: Enr) { self.trusted_peers.insert(enr); } pub fn remove_trusted_peer(&mut self, enr: Enr) { self.trusted_peers.remove(&enr); } #[cfg(test)] fn custody_subnet_count_for_peer(&self, peer_id: &PeerId) -> Option { self.network_globals .peers .read() .peer_info(peer_id) .map(|peer_info| peer_info.custody_subnets_iter().count()) } } enum ConnectingType { /// We are in the process of dialing this peer. Dialing, /// A peer has dialed us. IngoingConnected { // The multiaddr the peer connected to us on. multiaddr: Multiaddr, }, /// We have successfully dialed a peer. OutgoingConnected { /// The multiaddr we dialed to reach the peer. 
multiaddr: Multiaddr, }, } #[cfg(test)] mod tests { use super::*; use crate::NetworkConfig; use crate::rpc::MetaDataV3; use types::{ChainSpec, ForkName, MainnetEthSpec as E}; async fn build_peer_manager(target_peer_count: usize) -> PeerManager { build_peer_manager_with_trusted_peers(vec![], target_peer_count).await } async fn build_peer_manager_with_trusted_peers( trusted_peers: Vec, target_peer_count: usize, ) -> PeerManager { let spec = Arc::new(E::default_spec()); build_peer_manager_with_opts(trusted_peers, target_peer_count, spec).await } async fn build_peer_manager_with_opts( trusted_peers: Vec, target_peer_count: usize, spec: Arc, ) -> PeerManager { let config = config::Config { target_peer_count, discovery_enabled: false, ..Default::default() }; let network_config = Arc::new(NetworkConfig { target_peers: target_peer_count, ..Default::default() }); let globals = NetworkGlobals::new_test_globals(trusted_peers, network_config, spec); PeerManager::new(config, Arc::new(globals)).unwrap() } fn empty_synced_status() -> SyncStatus { SyncStatus::Synced { info: empty_sync_info(), } } fn empty_sync_info() -> SyncInfo { SyncInfo { head_slot: Default::default(), head_root: Default::default(), finalized_epoch: Default::default(), finalized_root: Default::default(), earliest_available_slot: None, } } #[tokio::test] async fn test_peer_manager_disconnects_correctly_during_heartbeat() { // Create 6 peers to connect to with a target of 3. // 2 will be outbound-only, and have the lowest score. // 1 will be a trusted peer. // The other 3 will be ingoing peers. // We expect this test to disconnect from 3 peers. 1 from the outbound peer (the other must // remain due to the outbound peer limit) and 2 from the ingoing peers (the trusted peer // should remain connected). let peer0 = PeerId::random(); let peer1 = PeerId::random(); let peer2 = PeerId::random(); let outbound_only_peer1 = PeerId::random(); let outbound_only_peer2 = PeerId::random(); let trusted_peer = PeerId::random(); let mut peer_manager = build_peer_manager_with_trusted_peers(vec![trusted_peer], 3).await; peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None); peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None); peer_manager.inject_connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap(), None); peer_manager.inject_connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None); peer_manager.inject_connect_outgoing( &outbound_only_peer1, "/ip4/0.0.0.0".parse().unwrap(), None, ); peer_manager.inject_connect_outgoing( &outbound_only_peer2, "/ip4/0.0.0.0".parse().unwrap(), None, ); // Set the outbound-only peers to have the lowest score. peer_manager .network_globals .peers .write() .peer_info_mut(&outbound_only_peer1) .unwrap() .add_to_score(-1.0); peer_manager .network_globals .peers .write() .peer_info_mut(&outbound_only_peer2) .unwrap() .add_to_score(-2.0); // Check initial connected peers. assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 6); peer_manager.heartbeat(); // Check that we disconnected from two peers. // Check that one outbound-only peer was removed because it had the worst score // and that we did not disconnect the other outbound peer due to the minimum outbound quota. 
        assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
        assert!(
            peer_manager
                .network_globals
                .peers
                .read()
                .is_connected(&outbound_only_peer1)
        );
        assert!(
            !peer_manager
                .network_globals
                .peers
                .read()
                .is_connected(&outbound_only_peer2)
        );
        // The trusted peer remains connected.
        assert!(
            peer_manager
                .network_globals
                .peers
                .read()
                .is_connected(&trusted_peer)
        );

        peer_manager.heartbeat();

        // The trusted peer remains connected, even after subsequent heartbeats.
        assert!(
            peer_manager
                .network_globals
                .peers
                .read()
                .is_connected(&trusted_peer)
        );

        // Check that if we are at the target number of peers, we do not disconnect any.
        assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
    }

    #[tokio::test]
    async fn test_peer_manager_not_enough_outbound_peers_no_panic_during_heartbeat() {
        let mut peer_manager = build_peer_manager(20).await;

        // Connect 19 ingoing-only peers.
        for _i in 0..19 {
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);
        }

        // Connect an outbound-only peer.
        // Give it the lowest score so that it is evaluated first in the disconnect list iterator.
        let outbound_only_peer = PeerId::random();
        peer_manager.inject_connect_outgoing(
            &outbound_only_peer,
            "/ip4/0.0.0.0".parse().unwrap(),
            None,
        );
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&outbound_only_peer)
            .unwrap()
            .add_to_score(-1.0);
        // We are at the target of 20 peers, so the heartbeat should not remove any.
        // Having fewer outbound-only peers than the minimum must not cause a panic when the
        // outbound-only peer is considered for disconnection.
        peer_manager.heartbeat();
        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            20
        );
    }

    #[tokio::test]
    async fn test_peer_manager_remove_unhealthy_peers_brings_peers_below_target() {
        let mut peer_manager = build_peer_manager(3).await;

        // Create 4 peers to connect to.
        // Two of them will be unhealthy: one inbound-only and one outbound-only peer.
        let peer0 = PeerId::random();
        let peer1 = PeerId::random();
        let inbound_only_peer1 = PeerId::random();
        let outbound_only_peer1 = PeerId::random();

        peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None);
        peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0/tcp/8000".parse().unwrap(), None);

        // Connect to two peers that are on the threshold of being disconnected.
        peer_manager.inject_connect_ingoing(
            &inbound_only_peer1,
            "/ip4/0.0.0.0/tcp/8000".parse().unwrap(),
            None,
        );
        peer_manager.inject_connect_outgoing(
            &outbound_only_peer1,
            "/ip4/0.0.0.0/tcp/8000".parse().unwrap(),
            None,
        );
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&inbound_only_peer1)
            .unwrap()
            .add_to_score(-19.8);
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&outbound_only_peer1)
            .unwrap()
            .add_to_score(-19.8);
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&inbound_only_peer1)
            .unwrap()
            .set_gossipsub_score(-85.0);
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&outbound_only_peer1)
            .unwrap()
            .set_gossipsub_score(-85.0);
        peer_manager.heartbeat();
        // Tests that when we are over the target peer limit, after disconnecting one unhealthy
        // peer, the loop that selects peers for disconnection stops because we have already
        // removed enough peers (only 1 needed to be removed to reach the target).
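        // A sketch of why -19.8 is "on the threshold". Peer scoring (see `peerdb::score`)
        // disconnects a peer once its score falls to the disconnect threshold, assumed here to
        // be around -20.0 (the exact constant lives in `peerdb::score`, not in this test):
        //
        //     // hypothetical illustration only
        //     let score = -19.8_f64;
        //     let assumed_disconnect_threshold = -20.0_f64;
        //     assert!(score > assumed_disconnect_threshold); // still (barely) healthy alone
        //
        // The added gossipsub score of -85.0 is what tips the aggregate score over the edge.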
        assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
    }

    #[tokio::test]
    async fn test_peer_manager_removes_enough_peers_when_one_is_unhealthy() {
        let mut peer_manager = build_peer_manager(3).await;

        // Create 5 peers to connect to.
        // One inbound-only peer will be unhealthy; the outbound-only peer stays healthy.
        let peer0 = PeerId::random();
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let inbound_only_peer1 = PeerId::random();
        let outbound_only_peer1 = PeerId::random();

        peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
        peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None);
        peer_manager.inject_connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap(), None);
        peer_manager.inject_connect_outgoing(
            &outbound_only_peer1,
            "/ip4/0.0.0.0".parse().unwrap(),
            None,
        );
        // Have one peer be on the verge of disconnection.
        peer_manager.inject_connect_ingoing(
            &inbound_only_peer1,
            "/ip4/0.0.0.0".parse().unwrap(),
            None,
        );
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&inbound_only_peer1)
            .unwrap()
            .add_to_score(-19.9);
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&inbound_only_peer1)
            .unwrap()
            .set_gossipsub_score(-85.0);

        peer_manager.heartbeat();
        // Tests that when we are over the target peer limit, after disconnecting the unhealthy
        // peer, the number of connected peers updates and we do not remove too many peers.
        assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
    }

    #[tokio::test]
    /// We want to test that the peer manager removes peers that are not subscribed to a subnet
    /// as a priority over all else.
    async fn test_peer_manager_remove_non_subnet_peers_when_all_healthy() {
        let mut peer_manager = build_peer_manager(3).await;
        let spec = peer_manager.network_globals.spec.clone();

        // Create 5 peers to connect to.
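        // The subnet tests below mark a peer as "on a subnet" in two steps. A sketch of the
        // pattern (illustrative only; the real calls appear verbatim in the test bodies):
        //
        //     // 1. Advertise the subnet in the peer's MetaData bitfield:
        //     let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
        //     attnets.set(1, true).unwrap();
        //     // 2. Record the gossipsub subscription in the peer DB:
        //     peers.add_subscription(&peer, Subnet::Attestation(1.into()));
        //
        // The tests set both because the pruning heuristics are assumed to consult the
        // long-lived (metadata) subnets as well as the live subscriptions.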
        let peer0 = PeerId::random();
        let peer1 = PeerId::random();
        let peer2 = PeerId::random();
        let peer3 = PeerId::random();
        let peer4 = PeerId::random();

        println!("{}", peer0);
        println!("{}", peer1);
        println!("{}", peer2);
        println!("{}", peer3);
        println!("{}", peer4);

        peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
        peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None);
        peer_manager.inject_connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap(), None);
        peer_manager.inject_connect_ingoing(&peer3, "/ip4/0.0.0.0".parse().unwrap(), None);
        peer_manager.inject_connect_ingoing(&peer4, "/ip4/0.0.0.0".parse().unwrap(), None);

        // Have some of the peers be on a long-lived subnet.
        let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
        attnets.set(1, true).unwrap();
        let metadata = MetaDataV3 {
            seq_number: 0,
            attnets,
            syncnets: Default::default(),
            custody_group_count: spec.custody_requirement,
        };
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&peer0)
            .unwrap()
            .set_meta_data(MetaData::V3(metadata));
        peer_manager
            .network_globals
            .peers
            .write()
            .add_subscription(&peer0, Subnet::Attestation(1.into()));

        let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
        attnets.set(10, true).unwrap();
        let metadata = MetaDataV3 {
            seq_number: 0,
            attnets,
            syncnets: Default::default(),
            custody_group_count: spec.custody_requirement,
        };
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&peer2)
            .unwrap()
            .set_meta_data(MetaData::V3(metadata));
        peer_manager
            .network_globals
            .peers
            .write()
            .add_subscription(&peer2, Subnet::Attestation(10.into()));

        let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
        syncnets.set(3, true).unwrap();
        let metadata = MetaDataV3 {
            seq_number: 0,
            attnets: Default::default(),
            syncnets,
            custody_group_count: spec.custody_requirement,
        };
        peer_manager
            .network_globals
            .peers
            .write()
            .peer_info_mut(&peer4)
            .unwrap()
            .set_meta_data(MetaData::V3(metadata));
        peer_manager
            .network_globals
            .peers
            .write()
            .add_subscription(&peer4, Subnet::SyncCommittee(3.into()));

        // Perform the heartbeat.
        peer_manager.heartbeat();

        // All peers are healthy here; check that pruning took us down to the target and no
        // further.
        assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);

        // Check that we removed the peers that were not subscribed to any subnet.
        let mut peers_should_have_removed = HashSet::new();
        peers_should_have_removed.insert(peer1);
        peers_should_have_removed.insert(peer3);
        for (peer, _) in peer_manager
            .network_globals
            .peers
            .read()
            .peers()
            .filter(|(_, info)| {
                matches!(
                    info.connection_status(),
                    PeerConnectionStatus::Disconnecting { .. }
                )
            })
        {
            println!("{}", peer);
            assert!(peers_should_have_removed.remove(peer));
        }
        // Ensure we removed all the expected peers.
        assert!(peers_should_have_removed.is_empty());
    }

    #[tokio::test]
    /// Test that a metadata response should update custody subnets.
    async fn test_peer_manager_update_custody_subnets() {
        // PeerDAS is enabled from Fulu.
        let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
        let mut peer_manager = build_peer_manager_with_opts(vec![], 1, spec).await;

        let pubkey = Keypair::generate_secp256k1().public();
        let peer_id = PeerId::from_public_key(&pubkey);
        peer_manager.inject_connect_ingoing(
            &peer_id,
            Multiaddr::empty().with_p2p(peer_id).unwrap(),
            None,
        );

        // A newly connected peer should have no custody subnets before metadata is received.
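        // Sketch of the expected custody-subnet derivation (assumption for illustration: on
        // the mainnet-based spec used here, `number_of_custody_groups` equals
        // `data_column_sidecar_subnet_count`, so each custody group maps to exactly one
        // column subnet):
        //
        //     // cgc = 4  =>  4 custody groups  =>  4 custody subnets
        //     let subnets_per_group =
        //         spec.data_column_sidecar_subnet_count / spec.number_of_custody_groups; // = 1
        //     assert_eq!(4 * subnets_per_group, 4);
        //
        // This is why the assertions below expect `custody_subnet_count == peer_cgc`.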
        let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
        assert_eq!(custody_subnet_count, Some(0));

        // Metadata should update the custody subnets.
        let peer_cgc = 4;
        let meta_data = MetaData::V3(MetaDataV3 {
            seq_number: 0,
            attnets: Default::default(),
            syncnets: Default::default(),
            custody_group_count: peer_cgc,
        });
        let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data.clone());
        assert!(cgc_updated);
        let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
        assert_eq!(custody_subnet_count, Some(peer_cgc as usize));

        // Make another update and assert that the CGC is not updated.
        let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data);
        assert!(!cgc_updated);
        let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
        assert_eq!(custody_subnet_count, Some(peer_cgc as usize));
    }

    #[tokio::test]
    /// Test the pruning logic to remove grouped data column subnet peers.
    async fn test_peer_manager_prune_grouped_data_column_subnet_peers() {
        let target = 9;
        let mut peer_manager = build_peer_manager(target).await;
        // Override sampling subnets to prevent sampling peer protection from interfering with
        // this test.
        *peer_manager.network_globals.sampling_subnets.write() = HashSet::new();

        // Create 20 peers to connect to.
        let mut peers = Vec::new();
        for x in 0..20 {
            // Group the first 15 peers by `x % 4`; the last 5 peers each go on their own
            // subnet. So subnets 0-2 have 4 peers each, subnet 3 has 3, and subnets 15-19
            // have 1 each.
            let subnet: u64 = { if x < 15 { x % 4 } else { x } };
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

            // Have some of the peers be on a long-lived subnet.
            {
                let mut peers_db = peer_manager.network_globals.peers.write();
                let peer_info = peers_db.peer_info_mut(&peer).unwrap();
                peer_info.set_custody_subnets(HashSet::from([DataColumnSubnetId::new(subnet)]));
                peer_info.update_sync_status(empty_synced_status());
            }
            peer_manager
                .network_globals
                .peers
                .write()
                .add_subscription(&peer, Subnet::DataColumn(subnet.into()));
            println!("{},{},{}", x, subnet, peer);

            peers.push(peer);
        }

        // Perform the heartbeat.
        peer_manager.heartbeat();

        // Check that pruning took us down to the target and no further.
        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            target
        );

        // Check that we removed peers from the most populated subnets first.
        // Pruning should remove one peer each from subnets 0-2, so that subnets 0-3 each have
        // 3 peers, and should then remove 2 more peers from each of subnets 0-3 (8 peers).
        // That removes 11 peers in total, leaving 9.
        // Therefore the remaining peer set should be each on their own subnet.
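        // Worked subnet populations, before -> after (an illustration of the comment above):
        //
        //     subnet 0: 4 -> 1      subnet 3: 3 -> 1
        //     subnet 1: 4 -> 1      subnets 15-19: 1 -> 1 (each)
        //     subnet 2: 4 -> 1
        //
        // 20 peers - 11 pruned = 9 = `target`.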
        // Let's check this:
        let connected_peers: std::collections::HashSet<_> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();

        for peer in connected_peers.iter() {
            let position = peers.iter().position(|peer_id| peer_id == peer).unwrap();
            println!("{},{}", position, peer);
        }
        println!();

        for peer in connected_peers.iter() {
            let position = peers.iter().position(|peer_id| peer_id == peer).unwrap();
            println!("{},{}", position, peer);
            if position < 15 {
                let y = position % 4;
                for x in 0..4 {
                    let alternative_index = y + 4 * x;
                    if alternative_index != position && alternative_index < 15 {
                        // Make sure a peer on the same subnet has been removed.
                        println!(
                            "Check against: {}, {}",
                            alternative_index, &peers[alternative_index]
                        );
                        assert!(!connected_peers.contains(&peers[alternative_index]));
                    }
                }
            }
        }
    }

    /// Test the pruning logic to prioritise peers with the most subnets.
    ///
    /// Create 6 peers.
    /// Peer0: None
    /// Peer1: Subnet 1,2,3
    /// Peer2: Subnet 1,2
    /// Peer3: Subnet 3
    /// Peer4: Subnet 1
    /// Peer5: Subnet 2
    ///
    /// Prune 3 peers: these should be Peer0, Peer4 and Peer5, because Peer4 and Peer5 are both
    /// on the subnets with the most peers and have the fewest subscribed long-lived subnets,
    /// and Peer0 has no long-lived subnet at all.
    #[tokio::test]
    async fn test_peer_manager_prune_data_column_subnet_peers_most_subscribed() {
        let target = 3;
        let mut peer_manager = build_peer_manager(target).await;

        // Create 6 peers to connect to.
        let mut peers = Vec::new();
        for x in 0..6 {
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

            // Have some of the peers be on a long-lived subnet.
            let custody_subnets = match x {
                0 => HashSet::new(),
                1 => HashSet::from([
                    DataColumnSubnetId::new(1),
                    DataColumnSubnetId::new(2),
                    DataColumnSubnetId::new(3),
                ]),
                2 => HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]),
                3 => HashSet::from([DataColumnSubnetId::new(3)]),
                4 => HashSet::from([DataColumnSubnetId::new(1)]),
                5 => HashSet::from([DataColumnSubnetId::new(2)]),
                _ => unreachable!(),
            };
            {
                let mut peer_db = peer_manager.network_globals.peers.write();
                let peer_info = peer_db.peer_info_mut(&peer).unwrap();
                peer_info.set_custody_subnets(custody_subnets);
                peer_info.update_sync_status(empty_synced_status());
            }
            let long_lived_subnets = peer_manager
                .network_globals
                .peers
                .read()
                .peer_info(&peer)
                .unwrap()
                .long_lived_subnets();
            for subnet in long_lived_subnets {
                println!("Subnet: {:?}", subnet);
                peer_manager
                    .network_globals
                    .peers
                    .write()
                    .add_subscription(&peer, subnet);
            }
            println!("{},{}", x, peer);
            peers.push(peer);
        }

        // Perform the heartbeat.
        peer_manager.heartbeat();

        // Check that pruning took us down to the target and no further.
        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            target
        );

        // Check that we removed peers 0, 4 and 5.
        let connected_peers: std::collections::HashSet<_> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();
        assert!(!connected_peers.contains(&peers[0]));
        assert!(!connected_peers.contains(&peers[4]));
        assert!(!connected_peers.contains(&peers[5]));
    }

    /// Test the pruning logic to prioritise peers with the most data column subnets, but not at
    /// the expense of removing our few sync-committee subnets.
    ///
    /// Create 6 peers.
    /// Peer0: None
    /// Peer1: Column subnet 1,2,3
    /// Peer2: Column subnet 1,2
    /// Peer3: Column subnet 3
    /// Peer4: Column subnet 1,2, Sync-committee-1
    /// Peer5: Column subnet 1,2, Sync-committee-2
    ///
    /// Prune 3 peers: these should be Peer0, Peer1 and Peer2, because Peer4 and Peer5 are on a
    /// sync committee.
    #[tokio::test]
    async fn test_peer_manager_prune_subnet_peers_sync_committee() {
        let target = 3;
        let mut peer_manager = build_peer_manager(target).await;
        // Override sampling subnets to prevent sampling peer protection from interfering with
        // this test.
        *peer_manager.network_globals.sampling_subnets.write() = HashSet::new();

        // Create 6 peers to connect to.
        let mut peers = Vec::new();
        for x in 0..6 {
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

            // Have some of the peers be on a long-lived subnet.
            let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
            let custody_subnets = match x {
                0 => HashSet::new(),
                1 => HashSet::from([
                    DataColumnSubnetId::new(1),
                    DataColumnSubnetId::new(2),
                    DataColumnSubnetId::new(3),
                ]),
                2 => HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]),
                3 => HashSet::from([DataColumnSubnetId::new(3)]),
                4 => {
                    syncnets.set(1, true).unwrap();
                    HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)])
                }
                5 => {
                    syncnets.set(2, true).unwrap();
                    HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)])
                }
                _ => unreachable!(),
            };
            {
                let mut peer_db = peer_manager.network_globals.peers.write();
                let peer_info = peer_db.peer_info_mut(&peer).unwrap();
                peer_info.set_meta_data(MetaData::V3(MetaDataV3 {
                    seq_number: 0,
                    attnets: Default::default(),
                    syncnets,
                    // Unused in this test, as the pruning logic uses `custody_subnets`.
                    custody_group_count: 0,
                }));
                peer_info.set_custody_subnets(custody_subnets);
                peer_info.update_sync_status(empty_synced_status());
            }
            let long_lived_subnets = peer_manager
                .network_globals
                .peers
                .read()
                .peer_info(&peer)
                .unwrap()
                .long_lived_subnets();
            println!("{},{}", x, peer);
            for subnet in long_lived_subnets {
                println!("Subnet: {:?}", subnet);
                peer_manager
                    .network_globals
                    .peers
                    .write()
                    .add_subscription(&peer, subnet);
            }
            peers.push(peer);
        }

        // Perform the heartbeat.
        peer_manager.heartbeat();

        // Check that pruning took us down to the target and no further.
        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            target
        );

        // Check that we removed peers 0, 1 and 2.
        let connected_peers: std::collections::HashSet<_> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();
        assert!(!connected_peers.contains(&peers[0]));
        assert!(!connected_peers.contains(&peers[1]));
        assert!(!connected_peers.contains(&peers[2]));
    }

    /// Test that peers on custody subnets whose peer count is below the
    /// `MIN_SAMPLING_COLUMN_SUBNET_PEERS` (2) threshold are protected from pruning.
    ///
    /// Create 8 peers.
    /// Peer0: None (can be pruned)
    /// Peer1: Subnet 1,4,5
    /// Peer2: Subnet 1,4
    /// Peer3: Subnet 2
    /// Peer4: Subnet 2
    /// Peer5: Subnet 1 (can be pruned)
    /// Peer6: Subnet 3
    /// Peer7: Subnet 5 (can be pruned)
    ///
    /// Sampling subnets: 1, 2
    ///
    /// Prune 3 peers: these should be Peer0, Peer5 and Peer7 because:
    /// - Peer0 has no long-lived subnet.
    /// - Peer5 is on the subnet with the most peers and has the fewest subscribed long-lived
    ///   subnets.
    /// - Peer7 is on a non-sampling subnet and has the fewest subscribed long-lived subnets.
    #[tokio::test]
    async fn test_peer_manager_protect_sampling_subnet_peers_below_threshold() {
        let target = 5;
        let mut peer_manager = build_peer_manager(target).await;
        *peer_manager.network_globals.sampling_subnets.write() =
            HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(2)]);

        // Create 8 peers to connect to.
        let mut peers = Vec::new();
        for peer_idx in 0..8 {
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

            // Have some of the peers be on a long-lived subnet.
            let custody_subnets = match peer_idx {
                0 => HashSet::new(),
                1 => HashSet::from([
                    DataColumnSubnetId::new(1),
                    DataColumnSubnetId::new(4),
                    DataColumnSubnetId::new(5),
                ]),
                2 => HashSet::from([DataColumnSubnetId::new(1), DataColumnSubnetId::new(4)]),
                3 => HashSet::from([DataColumnSubnetId::new(2)]),
                4 => HashSet::from([DataColumnSubnetId::new(2)]),
                5 => HashSet::from([DataColumnSubnetId::new(1)]),
                6 => HashSet::from([DataColumnSubnetId::new(3)]),
                7 => HashSet::from([DataColumnSubnetId::new(5)]),
                _ => unreachable!(),
            };
            {
                let mut peer_db = peer_manager.network_globals.peers.write();
                let peer_info = peer_db.peer_info_mut(&peer).unwrap();
                peer_info.set_custody_subnets(custody_subnets);
                peer_info.update_sync_status(empty_synced_status());
            }
            let long_lived_subnets = peer_manager
                .network_globals
                .peers
                .read()
                .peer_info(&peer)
                .unwrap()
                .long_lived_subnets();
            for subnet in long_lived_subnets {
                println!("Subnet: {:?}", subnet);
                peer_manager
                    .network_globals
                    .peers
                    .write()
                    .add_subscription(&peer, subnet);
            }
            println!("{},{}", peer_idx, peer);
            peers.push(peer);
        }

        // Perform the heartbeat.
        peer_manager.heartbeat();

        // Check that pruning took us down to the target and no further.
        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            target
        );

        // Check that we removed peers 0, 5 and 7.
        let connected_peers: std::collections::HashSet<_> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();
        println!("Connected peers: {:?}", connected_peers);
        assert!(!connected_peers.contains(&peers[0]));
        assert!(!connected_peers.contains(&peers[5]));
        assert!(!connected_peers.contains(&peers[7]));
    }

    /// This test is for reproducing the issue:
    /// https://github.com/sigp/lighthouse/pull/3236#issue-1256432659
    ///
    /// Whether the issue happens depends on `custody_subnet_to_peers` (a HashMap), since a
    /// HashMap doesn't guarantee a particular order of iteration. So we repeat the test case to
    /// try to reproduce the issue.
    #[tokio::test]
    async fn test_peer_manager_prune_based_on_subnet_count_repeat() {
        for _ in 0..100 {
            test_peer_manager_prune_based_on_subnet_count().await;
        }
    }

    /// Test the pruning logic to prioritize peers with the most column subnets. This test
    /// specifies the connection direction for the peers.
    /// Either Peer4 or Peer5 is expected to be removed in this test case.
    ///
    /// Create 8 peers.
    /// Peer0 (out): Column subnet 1, Sync-committee-1
    /// Peer1 (out): Column subnet 1, Sync-committee-1
    /// Peer2 (out): Column subnet 2, Sync-committee-2
    /// Peer3 (out): Column subnet 2, Sync-committee-2
    /// Peer4 (out): Column subnet 3
    /// Peer5 (out): Column subnet 3
    /// Peer6 (in): Column subnet 4
    /// Peer7 (in): Column subnet 5
    async fn test_peer_manager_prune_based_on_subnet_count() {
        let target = 7;
        let mut peer_manager = build_peer_manager(target).await;
        // Override sampling subnets to prevent sampling peer protection from interfering with
        // this test.
        *peer_manager.network_globals.sampling_subnets.write() = HashSet::new();

        // Create 8 peers to connect to.
        let mut peers = Vec::new();
        for peer_idx in 0..8 {
            let peer = PeerId::random();

            // Have some of the peers be on a long-lived subnet.
            let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
            let custody_subnets = match peer_idx {
                0 => {
                    peer_manager.inject_connect_outgoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    syncnets.set(1, true).unwrap();
                    HashSet::from([DataColumnSubnetId::new(1)])
                }
                1 => {
                    peer_manager.inject_connect_outgoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    syncnets.set(1, true).unwrap();
                    HashSet::from([DataColumnSubnetId::new(1)])
                }
                2 => {
                    peer_manager.inject_connect_outgoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    syncnets.set(2, true).unwrap();
                    HashSet::from([DataColumnSubnetId::new(2)])
                }
                3 => {
                    peer_manager.inject_connect_outgoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    syncnets.set(2, true).unwrap();
                    HashSet::from([DataColumnSubnetId::new(2)])
                }
                4 => {
                    peer_manager.inject_connect_outgoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    HashSet::from([DataColumnSubnetId::new(3)])
                }
                5 => {
                    peer_manager.inject_connect_outgoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    HashSet::from([DataColumnSubnetId::new(3)])
                }
                6 => {
                    peer_manager.inject_connect_ingoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    HashSet::from([DataColumnSubnetId::new(4)])
                }
                7 => {
                    peer_manager.inject_connect_ingoing(
                        &peer,
                        "/ip4/0.0.0.0".parse().unwrap(),
                        None,
                    );
                    HashSet::from([DataColumnSubnetId::new(5)])
                }
                _ => unreachable!(),
            };

            let metadata = MetaDataV3 {
                seq_number: 0,
                attnets: Default::default(),
                syncnets,
                // Unused in this test, as the pruning logic uses `custody_subnets`.
                custody_group_count: 0,
            };
            {
                let mut peer_db_write = peer_manager.network_globals.peers.write();
                let peer_info = peer_db_write.peer_info_mut(&peer).unwrap();
                peer_info.set_meta_data(MetaData::V3(metadata));
                peer_info.set_custody_subnets(custody_subnets);
                peer_info.update_sync_status(empty_synced_status());
            }
            let long_lived_subnets = peer_manager
                .network_globals
                .peers
                .read()
                .peer_info(&peer)
                .unwrap()
                .long_lived_subnets();
            println!("{},{}", peer_idx, peer);
            for subnet in long_lived_subnets {
                println!("Subnet: {:?}", subnet);
                peer_manager
                    .network_globals
                    .peers
                    .write()
                    .add_subscription(&peer, subnet);
            }
            peers.push(peer);
        }

        // Perform the heartbeat.
        peer_manager.heartbeat();

        // Check that pruning took us down to the target and no further.
        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            target
        );

        let connected_peers: std::collections::HashSet<_> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();

        // Either peer 4 or peer 5 should be removed.
        // Check that peers 6 and 7 are kept: each is the only peer on its subnet.
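        // Worked subnet populations for the layout above (illustrative reasoning about the
        // expected outcome; the authoritative selection logic is the pruning implementation):
        //
        //     subnet 1: {Peer0, Peer1}   subnet 3: {Peer4, Peer5}  <- prune one of these
        //     subnet 2: {Peer2, Peer3}   subnet 4: {Peer6}, subnet 5: {Peer7}
        //
        // Peers 0-3 are assumed protected by their sync-committee membership, and subnets 4
        // and 5 each have a single peer, so the one pruned peer should come from subnet 3.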
        assert!(connected_peers.contains(&peers[6]));
        assert!(connected_peers.contains(&peers[7]));
    }

    /// Test that peers on the sparsest attestation subnets are protected from pruning.
    ///
    /// Create 7 peers:
    /// - 4 on attnet 0
    /// - 1 on attnet 1 (least dense)
    /// - 2 on attnet 2
    ///
    /// Prune 3 peers: 2 peers from subnet 0 and 1 from either subnet 0 or 2, but never from
    /// attnet 1.
    #[tokio::test]
    async fn test_peer_manager_not_prune_sparsest_attestation_subnet() {
        let target = 4;
        let mut peer_manager = build_peer_manager(target).await;
        let spec = peer_manager.network_globals.spec.clone();

        let mut peers = Vec::new();
        let subnet_assignments = [0, 0, 0, 0, 1, 2, 2];

        for &subnet in subnet_assignments.iter() {
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

            let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
            attnets.set(subnet, true).unwrap();
            let metadata = MetaDataV3 {
                seq_number: 0,
                attnets,
                syncnets: Default::default(),
                custody_group_count: spec.custody_requirement,
            };
            peer_manager
                .network_globals
                .peers
                .write()
                .peer_info_mut(&peer)
                .unwrap()
                .set_meta_data(MetaData::V3(metadata));
            peer_manager
                .network_globals
                .peers
                .write()
                .add_subscription(&peer, Subnet::Attestation((subnet as u64).into()));
            peers.push(peer);
        }

        peer_manager.heartbeat();

        // The pruning logic should avoid removing peers from the subnets with the lowest peer
        // counts: peer 4 (on the least dense subnet, 1) should be protected, and peers should
        // preferentially be removed from subnet 0 (the most dense).
        let connected_peers: HashSet<_> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();

        // Peer 4 (on the least dense attestation subnet, 1) should be kept.
        assert!(connected_peers.contains(&peers[4]));

        // Attestation subnet uniformity should protect peers on the least dense subnets.
        // Count the remaining peers on subnet 1 (the least dense).
        let subnet_1_count = peers
            .iter()
            .filter(|&peer| connected_peers.contains(peer))
            .filter(|&peer| {
                peer_manager
                    .network_globals
                    .peers
                    .read()
                    .peer_info(peer)
                    .unwrap()
                    .long_lived_subnets()
                    .iter()
                    .any(|subnet| matches!(subnet, Subnet::Attestation(id) if id == &1u64.into()))
            })
            .count();
        assert!(subnet_1_count > 0, "Least dense subnet should be protected");
    }

    /// Test that the pruning logic prioritizes synced and advanced peers over behind/unknown
    /// peers.
    ///
    /// Create 6 peers with different sync statuses:
    /// Peer0: Behind
    /// Peer1: Unknown
    /// Peer2: Synced
    /// Peer3: Advanced
    /// Peer4: Synced
    /// Peer5: Unknown
    ///
    /// Target: 3 peers. Should prune peers 0, 1, 5 (behind/unknown) and keep 2, 3, 4
    /// (synced/advanced).
    #[tokio::test]
    async fn test_peer_manager_prune_should_prioritize_synced_advanced_peers() {
        let target = 3;
        let mut peer_manager = build_peer_manager(target).await;
        // Override sampling subnets to prevent sampling peer protection from interfering with
        // this test.
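        // Sketch of the expected partition for the assertions below (illustrative only):
        //
        //     let keep:  [usize; 3] = [2, 3, 4]; // Synced / Advanced
        //     let prune: [usize; 3] = [0, 1, 5]; // Behind / Unknown
        //
        // The pruning pass is assumed to prefer keeping Synced/Advanced peers over
        // Behind/Unknown ones when everything else (score, subnets) is equal.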
        *peer_manager.network_globals.sampling_subnets.write() = HashSet::new();

        let mut peers = Vec::new();
        let current_peer_count = 6;

        for i in 0..current_peer_count {
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);

            let sync_status = match i {
                0 => SyncStatus::Behind {
                    info: empty_sync_info(),
                },
                1 | 5 => SyncStatus::Unknown,
                2 | 4 => SyncStatus::Synced {
                    info: empty_sync_info(),
                },
                3 => SyncStatus::Advanced {
                    info: empty_sync_info(),
                },
                _ => unreachable!(),
            };

            {
                let mut peer_db = peer_manager.network_globals.peers.write();
                let peer_info = peer_db.peer_info_mut(&peer).unwrap();
                peer_info.update_sync_status(sync_status);
                // Make sure all the peers have some long-lived subnets that are not protected.
                peer_info.set_custody_subnets(HashSet::from([DataColumnSubnetId::new(2)]))
            }
            let long_lived_subnets = peer_manager
                .network_globals
                .peers
                .read()
                .peer_info(&peer)
                .unwrap()
                .long_lived_subnets();
            for subnet in long_lived_subnets {
                println!("Subnet: {:?}", subnet);
                peer_manager
                    .network_globals
                    .peers
                    .write()
                    .add_subscription(&peer, subnet);
            }
            peers.push(peer);
        }

        // Perform the heartbeat to trigger pruning.
        peer_manager.heartbeat();

        // We should have exactly the target number of peers.
        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            target
        );

        let connected_peers: std::collections::HashSet<_> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();

        // Count how many synced/advanced peers were kept vs behind/unknown peers.
        let synced_advanced_kept = [&peers[2], &peers[3], &peers[4]]
            .iter()
            .filter(|peer| connected_peers.contains(peer))
            .count();
        let behind_unknown_kept = [&peers[0], &peers[1], &peers[5]]
            .iter()
            .filter(|peer| connected_peers.contains(peer))
            .count();
        assert_eq!(synced_advanced_kept, target);
        assert_eq!(behind_unknown_kept, 0);
    }

    /// Test that `peer_subnet_info` is properly cleaned up during pruning iterations.
    ///
    /// Without proper cleanup, stale peer data affects the protection logic for sync committees
    /// and we may end up pruning more peers than expected.
    #[tokio::test]
    async fn test_peer_manager_prune_mixed_custody_subnet_protection() {
        let target = 6;
        let mut peer_manager = build_peer_manager(target).await;
        // Override sampling subnets to prevent sampling peer protection from interfering.
        *peer_manager.network_globals.sampling_subnets.write() = HashSet::new();

        // Create 12 peers:
        // * 4 on custody subnet 0, all on the sync committee 0 subnet as well (at most 2 of
        //   these should be pruned),
        // * 3 on subnet 1,
        // * 2 on subnet 2,
        // * 3 scattered.
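        // Sketch of the sync-committee protection arithmetic for this layout (illustrative;
        // the constant is defined at the top of this module):
        //
        //     // 4 sync-committee peers, floor of MIN_SYNC_COMMITTEE_PEERS = 2
        //     let max_prunable_sync_peers = 4 - MIN_SYNC_COMMITTEE_PEERS; // = 2
        //
        // Pruning 12 -> 6 removes 6 peers, of which at most 2 may come from the four
        // sync-committee peers on subnet 0.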
        let mut peers = Vec::new();
        for i in 0..12 {
            let peer = PeerId::random();
            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);
            let custody_subnet = match i {
                ..4 => 0,
                4..7 => 1,
                7..9 => 2,
                _ => i - 6,
            };
            let on_sync_committee = i < 4;

            {
                let mut peers_db = peer_manager.network_globals.peers.write();
                let peer_info = peers_db.peer_info_mut(&peer).unwrap();
                peer_info
                    .set_custody_subnets(HashSet::from([DataColumnSubnetId::new(custody_subnet)]));
                peer_info.update_sync_status(empty_synced_status());

                if on_sync_committee {
                    let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
                    syncnets.set(0, true).unwrap();
                    peer_info.set_meta_data(MetaData::V3(MetaDataV3 {
                        seq_number: 0,
                        attnets: Default::default(),
                        syncnets,
                        custody_group_count: 0,
                    }));
                }

                for subnet in peer_info.long_lived_subnets() {
                    peers_db.add_subscription(&peer, subnet);
                }
                peers.push(peer);
            }
        }

        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            12
        );

        peer_manager.heartbeat();

        assert_eq!(
            peer_manager.network_globals.connected_or_dialing_peers(),
            target
        );

        let connected_peers: HashSet<PeerId> = peer_manager
            .network_globals
            .peers
            .read()
            .connected_or_dialing_peers()
            .cloned()
            .collect();

        // Only 2 peers should be pruned from the 4 peers on subnet 0.
        let remaining_sync_peers = connected_peers
            .iter()
            .filter(|peer| peers[0..4].contains(peer))
            .count();
        assert_eq!(
            remaining_sync_peers, 2,
            "Sync committee protection should preserve exactly MIN_SYNC_COMMITTEE_PEERS (2)"
        );
    }

    // Test properties the PeerManager should have, using randomly generated input.
    #[cfg(test)]
    mod property_based_tests {
        use crate::peer_manager::config::DEFAULT_TARGET_PEERS;
        use crate::peer_manager::tests::build_peer_manager_with_trusted_peers;
        use crate::rpc::{MetaData, MetaDataV3};
        use libp2p::PeerId;
        use proptest::prelude::*;
        use std::collections::HashSet;
        use tokio::runtime::Runtime;
        use typenum::Unsigned;
        use types::DataColumnSubnetId;
        use types::{EthSpec, MainnetEthSpec as E};

        #[derive(Clone, Debug)]
        struct PeerCondition {
            peer_id: PeerId,
            outgoing: bool,
            attestation_net_bitfield: Vec<bool>,
            sync_committee_net_bitfield: Vec<bool>,
            score: f64,
            trusted: bool,
            gossipsub_score: f64,
            custody_subnets: HashSet<DataColumnSubnetId>,
        }

        fn peer_condition_strategy() -> impl Strategy<Value = PeerCondition> {
            let attestation_len = <E as EthSpec>::SubnetBitfieldLength::to_usize();
            let sync_committee_len = <E as EthSpec>::SyncCommitteeSubnetCount::to_usize();
            let spec = E::default_spec();
            let total_subnet_count = spec.data_column_sidecar_subnet_count;
            let custody_requirement = spec.custody_requirement;

            // Create the pool of available subnet IDs.
            let available_subnets: Vec<u64> = (custody_requirement..total_subnet_count).collect();
            let max_custody_subnets = available_subnets.len();

            // Trusted peer probability constants - 1 in 5 peers should be trusted (20%).
            const TRUSTED_PEER_WEIGHT_FALSE: u32 = 4;
            const TRUSTED_PEER_WEIGHT_TRUE: u32 = 1;

            (
                proptest::collection::vec(any::<bool>(), attestation_len),
                proptest::collection::vec(any::<bool>(), sync_committee_len),
                any::<f64>(),
                any::<bool>(),
                any::<f64>(),
                // Weight trusted peers to avoid test rejection due to too many trusted peers.
                prop_oneof![
                    TRUSTED_PEER_WEIGHT_FALSE => Just(false),
                    TRUSTED_PEER_WEIGHT_TRUE => Just(true),
                ],
                0..=max_custody_subnets,
            )
                .prop_flat_map(
                    move |(
                        attestation_net_bitfield,
                        sync_committee_net_bitfield,
                        score,
                        outgoing,
                        gossipsub_score,
                        trusted,
                        custody_subnet_count,
                    )| {
                        // Use proptest's subsequence to select a random subset of subnets.
                        let custody_subnets_strategy = proptest::sample::subsequence(
                            available_subnets.clone(),
                            custody_subnet_count,
                        );
                        (
                            Just(attestation_net_bitfield),
                            Just(sync_committee_net_bitfield),
                            Just(score),
                            Just(outgoing),
                            Just(gossipsub_score),
                            Just(trusted),
                            custody_subnets_strategy,
                        )
                    },
                )
                .prop_map(
                    |(
                        attestation_net_bitfield,
                        sync_committee_net_bitfield,
                        score,
                        outgoing,
                        gossipsub_score,
                        trusted,
                        custody_subnets_vec,
                    )| {
                        let custody_subnets: HashSet<DataColumnSubnetId> = custody_subnets_vec
                            .into_iter()
                            .map(DataColumnSubnetId::new)
                            .collect();
                        PeerCondition {
                            peer_id: PeerId::random(),
                            outgoing,
                            attestation_net_bitfield,
                            sync_committee_net_bitfield,
                            score,
                            trusted,
                            gossipsub_score,
                            custody_subnets,
                        }
                    },
                )
        }

        // Upper bound for the number of peers used when testing peer pruning - we test with at
        // least the target number of peers, and up to `MAX_TEST_PEERS` to exercise the pruning
        // behaviour.
        const MAX_TEST_PEERS: usize = 300;

        proptest! {
            #[test]
            fn prune_excess_peers(peer_conditions in proptest::collection::vec(peer_condition_strategy(), DEFAULT_TARGET_PEERS..=MAX_TEST_PEERS)) {
                let target_peer_count = DEFAULT_TARGET_PEERS;
                let spec = E::default_spec();

                let trusted_peers: Vec<_> = peer_conditions
                    .iter()
                    .filter_map(|p| if p.trusted { Some(p.peer_id) } else { None })
                    .collect();
                // If we have a high percentage of trusted peers, it is very difficult to reason
                // about the expected results of the pruning.
                prop_assume!(trusted_peers.len() <= peer_conditions.len() / 3_usize);
                let rt = Runtime::new().unwrap();
                let result = rt.block_on(async move {
                    // Collect all the trusted peers.
                    let mut peer_manager =
                        build_peer_manager_with_trusted_peers(trusted_peers, target_peer_count)
                            .await;
                    // Create peers based on the randomly generated conditions.
                    for condition in &peer_conditions {
                        let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
                        let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();

                        if condition.outgoing {
                            peer_manager.inject_connect_outgoing(
                                &condition.peer_id,
                                "/ip4/0.0.0.0".parse().unwrap(),
                                None,
                            );
                        } else {
                            peer_manager.inject_connect_ingoing(
                                &condition.peer_id,
                                "/ip4/0.0.0.0".parse().unwrap(),
                                None,
                            );
                        }

                        for (i, value) in condition.attestation_net_bitfield.iter().enumerate() {
                            attnets.set(i, *value).unwrap();
                        }

                        for (i, value) in condition.sync_committee_net_bitfield.iter().enumerate() {
                            syncnets.set(i, *value).unwrap();
                        }

                        let subnets_per_custody_group =
                            spec.data_column_sidecar_subnet_count / spec.number_of_custody_groups;
                        let metadata = MetaDataV3 {
                            seq_number: 0,
                            attnets,
                            syncnets,
                            custody_group_count: condition.custody_subnets.len() as u64
                                / subnets_per_custody_group,
                        };

                        let mut peer_db = peer_manager.network_globals.peers.write();
                        let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap();
                        peer_info.set_meta_data(MetaData::V3(metadata));
                        peer_info.set_gossipsub_score(condition.gossipsub_score);
                        peer_info.add_to_score(condition.score);
                        peer_info.set_custody_subnets(condition.custody_subnets.clone());

                        for subnet in peer_info.long_lived_subnets() {
                            peer_db.add_subscription(&condition.peer_id, subnet);
                        }
                    }

                    // Perform the heartbeat.
                    peer_manager.heartbeat();

                    // The number of connected peers after pruning cannot be less than the
                    // target peer count, nor more than the number of submitted peers.
                    let expected_peer_count = target_peer_count.min(peer_conditions.len());
                    // Trusted peers could make this larger, however.
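                    // Worked example of the bound computed here (illustrative numbers only):
                    // with a target of 100, 140 generated peers and 10 trusted peers, we
                    // expect min(100, 140) = 100 connected peers after pruning, raised to
                    // max(100, 10) = 100 by the trusted-peer floor below.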
                    let no_of_trusted_peers = peer_conditions
                        .iter()
                        .filter(|condition| condition.trusted)
                        .count();
                    let expected_peer_count = expected_peer_count.max(no_of_trusted_peers);

                    let target_peer_condition =
                        peer_manager.network_globals.connected_or_dialing_peers()
                            == expected_peer_count;

                    // It could be that we reach our target outbound limit and are unable to
                    // prune any extra peers, which violates the target_peer_condition.
                    let outbound_peers =
                        peer_manager.network_globals.connected_outbound_only_peers();
                    let hit_outbound_limit = outbound_peers == peer_manager.target_outbound_peers();

                    // No trusted peers should be disconnected.
                    let trusted_peer_disconnected = peer_conditions.iter().any(|condition| {
                        condition.trusted
                            && !peer_manager
                                .network_globals
                                .peers
                                .read()
                                .is_connected(&condition.peer_id)
                    });

                    (target_peer_condition || hit_outbound_limit) && !trusted_peer_disconnected
                });

                prop_assert!(result);
            }
        }
    }

    #[tokio::test]
    async fn test_custody_peer_logic_only_runs_when_peerdas_enabled() {
        use crate::types::{GossipEncoding, GossipTopic};

        let mut peer_manager = build_peer_manager(5).await;

        // Set up sampling subnets so `maintain_custody_peers` would have work to do.
        *peer_manager.network_globals.sampling_subnets.write() =
            std::collections::HashSet::from([
                DataColumnSubnetId::new(0),
                DataColumnSubnetId::new(1),
            ]);

        // Test 1: No data column subscriptions - the custody peer logic should NOT run.
        peer_manager.heartbeat();

        // There should be no new DiscoverSubnetPeers events since PeerDAS is not enabled.
        let discovery_events: Vec<_> = peer_manager
            .events
            .iter()
            .filter(|event| matches!(event, PeerManagerEvent::DiscoverSubnetPeers(_)))
            .collect();
        assert!(
            discovery_events.is_empty(),
            "Should not generate discovery events when PeerDAS is disabled, but found: {:?}",
            discovery_events
        );

        // Test 2: Add a data column subscription - the custody peer logic should run.
        let data_column_topic = GossipTopic::new(
            GossipKind::DataColumnSidecar(DataColumnSubnetId::new(0)),
            GossipEncoding::SSZSnappy,
            [0, 0, 0, 0], // fork_digest
        );
        peer_manager
            .network_globals
            .gossipsub_subscriptions
            .write()
            .insert(data_column_topic);

        // Clear any existing events to isolate the test.
        peer_manager.events.clear();
        peer_manager.heartbeat();

        // We should now have DiscoverSubnetPeers events since PeerDAS is enabled.
        let discovery_events: Vec<_> = peer_manager
            .events
            .iter()
            .filter(|event| matches!(event, PeerManagerEvent::DiscoverSubnetPeers(_)))
            .collect();
        assert!(
            !discovery_events.is_empty(),
            "Should generate discovery events when PeerDAS is enabled, but found no discovery events"
        );
    }
}
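
// A minimal, self-contained sketch (not part of the upstream test suite) showing how the
// peer-limit constants declared at the top of this module compose for a 50-peer target. The
// `max_peers` / `max_priority_peers` formulas and the use of `round()` are assumptions made
// for illustration; the authoritative arithmetic lives in the `PeerManager` implementation.
#[cfg(test)]
mod peer_limit_arithmetic_sketch {
    use super::{PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};

    #[test]
    fn example_peer_limits_for_a_50_peer_target() {
        let target: usize = 50;
        // General excess: 10% over the target, i.e. 55 peers may be connected.
        let max_peers = (target as f32 * (1.0 + PEER_EXCESS_FACTOR)).round() as usize;
        // Priority excess: a further 20% of the target is provisioned for dialing peers we
        // need for validator duties, i.e. 65 peers.
        let max_priority_peers =
            (target as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS)).round() as usize;
        assert_eq!(max_peers, 55);
        assert_eq!(max_priority_peers, 65);
    }
}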