diff --git a/beacon_node/lighthouse_network/src/behaviour/mod.rs b/beacon_node/lighthouse_network/src/behaviour/mod.rs
index d3f9b40c42..c0a1fb3f71 100644
--- a/beacon_node/lighthouse_network/src/behaviour/mod.rs
+++ b/beacon_node/lighthouse_network/src/behaviour/mod.rs
@@ -2,7 +2,9 @@ use crate::behaviour::gossipsub_scoring_parameters::{
     lighthouse_gossip_thresholds, PeerScoreSettings,
 };
 use crate::config::gossipsub_config;
-use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent};
+use crate::discovery::{
+    subnet_predicate, Discovery, DiscoveryEvent, FIND_NODE_QUERY_CLOSEST_PEERS,
+};
 use crate::peer_manager::{
     config::Config as PeerManagerCfg, peerdb::score::PeerAction, peerdb::score::ReportSource,
     ConnectionDirection, PeerManager, PeerManagerEvent,
@@ -218,7 +220,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
         let mut discovery =
             Discovery::new(local_key, &config, network_globals.clone(), log).await?;
 
         // start searching for peers
-        discovery.discover_peers();
+        discovery.discover_peers(FIND_NODE_QUERY_CLOSEST_PEERS);
 
         // Grab our local ENR FORK ID
         let enr_fork_id = network_globals
@@ -1230,9 +1232,9 @@ impl<TSpec: EthSpec> NetworkBehaviourEventProcess<PeerManagerEvent> for Behaviour<TSpec>
                 // the network to send a status to this peer
                 self.add_event(BehaviourEvent::StatusPeer(peer_id));
             }
-            PeerManagerEvent::DiscoverPeers => {
+            PeerManagerEvent::DiscoverPeers(peers_to_find) => {
                 // Peer manager has requested a discovery query for more peers.
-                self.discovery.discover_peers();
+                self.discovery.discover_peers(peers_to_find);
             }
             PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover) => {
                 // Peer manager has requested a subnet discovery query for more peers.
diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs
index 4f7ec432b7..ab4d54a1e7 100644
--- a/beacon_node/lighthouse_network/src/discovery/mod.rs
+++ b/beacon_node/lighthouse_network/src/discovery/mod.rs
@@ -63,7 +63,7 @@ const MAX_SUBNETS_IN_QUERY: usize = 3;
 ///
 /// We could reduce this constant to speed up queries, however at the cost of security. It will
 /// make it easier for peers to eclipse this node. Kademlia suggests a value of 16.
-const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;
+pub const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;
 
 /// The threshold for updating `min_ttl` on a connected peer.
 const DURATION_DIFFERENCE: Duration = Duration::from_millis(1);
@@ -317,17 +317,18 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
     }
 
     /// This adds a new `FindPeers` query to the queue if one doesn't already exist.
-    pub fn discover_peers(&mut self) {
+    /// The `target_peers` parameter informs discovery to end the query once the target is found.
+    /// The maximum this can be is 16 (FIND_NODE_QUERY_CLOSEST_PEERS).
+    pub fn discover_peers(&mut self, target_peers: usize) {
         // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one.
         if !self.started || self.find_peer_active {
             return;
         }
         // Immediately start a FindNode query
-        debug!(self.log, "Starting a peer discovery request");
+        let target_peers = std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, target_peers);
+        debug!(self.log, "Starting a peer discovery request"; "target_peers" => target_peers);
         self.find_peer_active = true;
-        self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS, |_| {
-            true
-        });
+        self.start_query(QueryType::FindPeers, target_peers, |_| true);
     }
 
     /// Processes a request to search for more peers on a subnet.
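The change above turns the fixed-size FindNode query into a bounded request: callers say how many
peers they want and discovery caps the ask at FIND_NODE_QUERY_CLOSEST_PEERS. For illustration, a
minimal standalone sketch of the clamping rule (the constant's value is taken from this patch;
`clamped_target` is a hypothetical helper, not code from the repository):

    const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16;

    // Mirror of the clamp at the top of `discover_peers`.
    fn clamped_target(requested: usize) -> usize {
        std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, requested)
    }

    fn main() {
        assert_eq!(clamped_target(5), 5); // small requests pass through unchanged
        assert_eq!(clamped_target(40), 16); // large requests are capped for eclipse resistance
    }
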
diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs
index 6b8f6fff60..48edd3abb6 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs
@@ -9,6 +9,7 @@ use discv5::Enr;
 use hashset_delay::HashSetDelay;
 use libp2p::identify::IdentifyInfo;
 use peerdb::{client::ClientKind, BanOperation, BanResult, ScoreUpdateResult};
+use rand::seq::SliceRandom;
 use slog::{debug, error, warn};
 use smallvec::SmallVec;
 use std::{
@@ -37,17 +38,24 @@ mod network_behaviour;
 /// requests. This defines the interval in seconds.
 const HEARTBEAT_INTERVAL: u64 = 30;
 
+/// This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would
+/// lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
+/// peers.
+pub const MIN_SYNC_COMMITTEE_PEERS: u64 = 2;
 /// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of
 /// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and
 /// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e. 55.
 pub const PEER_EXCESS_FACTOR: f32 = 0.1;
-/// A fraction of `PeerManager::target_peers` that need to be outbound-only connections.
-pub const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.3;
+/// A fraction of `PeerManager::target_peers` that we want to be outbound-only connections.
+pub const TARGET_OUTBOUND_ONLY_FACTOR: f32 = 0.3;
+/// A fraction of `PeerManager::target_peers` that, if we fall below, triggers a discovery query
+/// to reach our target. MIN_OUTBOUND_ONLY_FACTOR must be < TARGET_OUTBOUND_ONLY_FACTOR.
+pub const MIN_OUTBOUND_ONLY_FACTOR: f32 = 0.2;
 /// The fraction of extra peers beyond the PEER_EXCESS_FACTOR that we allow us to dial for when
 /// requiring subnet peers. More specifically, if our target peer limit is 50, and our excess peer
 /// limit is 55, and we are at 55 peers, the following parameter provisions a few more slots of
 /// dialing priority peers we need for validator duties.
-pub const PRIORITY_PEER_EXCESS: f32 = 0.1;
+pub const PRIORITY_PEER_EXCESS: f32 = 0.2;
 
 /// The main struct that handles peer's reputation and connection status.
 pub struct PeerManager<TSpec: EthSpec> {
@@ -99,8 +107,8 @@ pub enum PeerManagerEvent {
     Banned(PeerId, Vec<IpAddr>),
     /// The peer should be unbanned with the associated ip addresses.
     UnBanned(PeerId, Vec<IpAddr>),
-    /// Request the behaviour to discover more peers.
-    DiscoverPeers,
+    /// Request the behaviour to discover more peers; the payload is the number of peers to discover.
+    DiscoverPeers(usize),
     /// Request the behaviour to discover peers on subnets.
     DiscoverSubnetPeers(Vec<SubnetDiscovery>),
 }
@@ -291,19 +299,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
         }
 
         // Queue another discovery if we need to
-        let peer_count = self.network_globals.connected_or_dialing_peers();
-        let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
-        let min_outbound_only_target =
-            (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize;
-
-        if self.discovery_enabled
-            && (peer_count < self.target_peers.saturating_sub(to_dial_peers.len())
-                || outbound_only_peer_count < min_outbound_only_target)
-        {
-            // We need more peers, re-queue a discovery lookup.
-            debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
-            self.events.push(PeerManagerEvent::DiscoverPeers);
-        }
+        self.maintain_peer_count(to_dial_peers.len());
 
         to_dial_peers
     }
@@ -342,6 +338,23 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
             as usize
     }
 
+    /// The minimum number of outbound-only peers that we allow before we start another discovery query.
+    fn min_outbound_only_peers(&self) -> usize {
+        (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize
+    }
+
+    /// The target number of outbound-only peers we aim to maintain.
+    fn target_outbound_peers(&self) -> usize {
+        (self.target_peers as f32 * TARGET_OUTBOUND_ONLY_FACTOR).ceil() as usize
+    }
+
+    /// The maximum number of peers that are connected or dialing before we refuse to do another
+    /// discovery search for more outbound peers. We can use up to half the priority peer excess allocation.
+    fn max_outbound_dialing_peers(&self) -> usize {
+        (self.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS / 2.0)).ceil()
+            as usize
+    }
+
     /* Notifications from the Swarm */
 
     // A peer is being dialed.
@@ -363,11 +376,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
     /// Reports whether the peer limit is reached in which case we stop allowing new incoming
     /// connections.
     pub fn peer_limit_reached(&self, count_dialing: bool) -> bool {
-        let max_peers = self.max_peers();
         if count_dialing {
            // This is an incoming connection so limit by the standard max peers
-            self.network_globals.connected_or_dialing_peers() >= max_peers
+            self.network_globals.connected_or_dialing_peers() >= self.max_peers()
         } else {
-            self.network_globals.connected_peers() >= max_peers
+            // We dialed this peer, allow up to max_outbound_dialing_peers
+            self.network_globals.connected_peers() >= self.max_outbound_dialing_peers()
         }
     }
 
@@ -819,6 +833,278 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
         }
     }
 
+    /// This function checks the status of our current peers and optionally requests a discovery
+    /// query if we need to find more peers to maintain the current number of peers.
+    fn maintain_peer_count(&mut self, dialing_peers: usize) {
+        // Check if we need to do a discovery lookup
+        if self.discovery_enabled {
+            let peer_count = self.network_globals.connected_or_dialing_peers();
+            let outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
+            let wanted_peers = if peer_count < self.target_peers.saturating_sub(dialing_peers) {
+                // We need more peers in general.
+                // The maximum discovery query is for 16 peers, but we can search for fewer if
+                // needed.
+                std::cmp::min(
+                    self.target_peers.saturating_sub(dialing_peers) - peer_count,
+                    16,
+                )
+            } else if outbound_only_peer_count < self.min_outbound_only_peers()
+                && peer_count < self.max_outbound_dialing_peers()
+            {
+                std::cmp::min(
+                    self.max_outbound_dialing_peers()
+                        .saturating_sub(dialing_peers)
+                        - peer_count,
+                    16,
+                )
+            } else {
+                0
+            };
+
+            if wanted_peers != 0 {
+                // We need more peers, re-queue a discovery lookup.
+                debug!(self.log, "Starting a new peer discovery query"; "connected" => peer_count, "target" => self.target_peers, "outbound" => outbound_only_peer_count, "wanted" => wanted_peers);
+                self.events
+                    .push(PeerManagerEvent::DiscoverPeers(wanted_peers));
+            }
+        }
+    }
+
+    /// Remove excess peers back down to our target values.
+    /// This prioritises peers with a good score and a uniform distribution of peers across
+    /// subnets.
+    ///
+    /// The logic for the peer pruning is as follows:
+    ///
+    /// Global rules:
+    /// - Always maintain peers we need for a validator duty.
+    /// - Do not prune outbound peers to exceed our outbound target.
+    /// - Do not prune more peers than our target peer count.
+    /// - If we have an option to remove a number of peers, remove the ones that have the fewest
+    ///   long-lived subnets.
+    /// - When pruning peers based on subnet count, if multiple peers can be chosen, choose a peer
+    ///   that is not subscribed to a long-lived sync committee subnet.
+    /// - When pruning peers based on subnet count, do not prune a peer that would lower us below
+    ///   the MIN_SYNC_COMMITTEE_PEERS peer count. To keep it simple, we favour a minimum number
+    ///   of sync-committee peers over subnet-peer uniformity. NOTE: We could apply more
+    ///   sophisticated logic, but the code is simpler and easier to maintain if we take this
+    ///   approach. If we prune subnet peers below MIN_SYNC_COMMITTEE_PEERS while maintaining the
+    ///   sync committee peers, this should be fine, as subnet peers are more likely to be found
+    ///   than sync-committee peers. Also, we're in a bit of trouble anyway if we have so few
+    ///   peers on subnets. The MIN_SYNC_COMMITTEE_PEERS number should be set low, as an absolute
+    ///   lower bound of peers to maintain on the sync committees.
+    ///
+    /// Prune peers in the following order:
+    /// 1. Remove the worst-scoring peers
+    /// 2. Remove peers that are not subscribed to a subnet (they have less value)
+    /// 3. Remove peers on subnets where we have too many peers
+    /// 4. Randomly remove peers if all the above are satisfied
+    ///
+    fn prune_excess_peers(&mut self) {
+        // The current number of connected peers.
+        let connected_peer_count = self.network_globals.connected_peers();
+        if connected_peer_count <= self.target_peers {
+            // No need to prune peers
+            return;
+        }
+
+        // Keep a list of peers we are pruning.
+        let mut peers_to_prune = std::collections::HashSet::new();
+        let connected_outbound_peer_count = self.network_globals.connected_outbound_only_peers();
+
+        // Keep track of the number of outbound peers we are pruning.
+        let mut outbound_peers_pruned = 0;
+
+        macro_rules! prune_peers {
+            ($filter: expr) => {
+                for (peer_id, info) in self
+                    .network_globals
+                    .peers
+                    .read()
+                    .worst_connected_peers()
+                    .iter()
+                    .filter(|(_, info)| !info.has_future_duty() && $filter(*info))
+                {
+                    if peers_to_prune.len()
+                        >= connected_peer_count.saturating_sub(self.target_peers)
+                    {
+                        // We have found all the peers we need to drop, end.
+                        break;
+                    }
+                    if peers_to_prune.contains(*peer_id) {
+                        continue;
+                    }
+                    // Only remove up to the target outbound peer count.
+                    if info.is_outbound_only() {
+                        if self.target_outbound_peers() + outbound_peers_pruned
+                            < connected_outbound_peer_count
+                        {
+                            outbound_peers_pruned += 1;
+                        } else {
+                            continue;
+                        }
+                    }
+                    peers_to_prune.insert(**peer_id);
+                }
+            };
+        }
+
+        // 1. Look through peers that have the worst score (ignoring non-penalized scored peers).
+        prune_peers!(|info: &PeerInfo<TSpec>| { info.score().score() < 0.0 });
+
+        // 2. Attempt to remove peers that are not subscribed to a subnet, if we still need to
+        //    prune more.
+        if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
+            prune_peers!(|info: &PeerInfo<TSpec>| { !info.has_long_lived_subnet() });
+        }
+
+        // 3. and 4. Remove peers that are too grouped on any given subnet. If all subnets are
+        //    uniformly distributed, remove random peers.
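+        // In outline: build per-subnet peer lists (attestation subnets only), then repeatedly
+        // pick the subnet with the most peers and remove its least-valuable member (fewest
+        // long-lived subnets, ties shuffled), skipping candidates protected by the outbound
+        // target or the sync-committee minimum.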
+        if peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
+            // Of our connected peers, build a map from subnet_id -> Vec<(PeerId, PeerInfo)>
+            let mut subnet_to_peer: HashMap<Subnet, Vec<(PeerId, PeerInfo<TSpec>)>> =
+                HashMap::new();
+            // These variables are used to track if a peer is in a long-lived sync-committee as we
+            // may wish to retain this peer over others when pruning.
+            let mut sync_committee_peer_count: HashMap<SyncSubnetId, u64> = HashMap::new();
+            let mut peer_to_sync_committee: HashMap<
+                PeerId,
+                std::collections::HashSet<SyncSubnetId>,
+            > = HashMap::new();
+
+            for (peer_id, info) in self.network_globals.peers.read().connected_peers() {
+                // Ignore peers we are already pruning
+                if peers_to_prune.contains(peer_id) {
+                    continue;
+                }
+
+                // Count based on long-lived subnets, not short-lived subnets
+                // NOTE: There are only 4 sync committees. These are likely to be denser than the
+                // subnets, so our priority here is to make the subnet peer count uniform, ignoring
+                // the dense sync committees.
+                for subnet in info.long_lived_subnets() {
+                    match subnet {
+                        Subnet::Attestation(_) => {
+                            subnet_to_peer
+                                .entry(subnet)
+                                .or_insert_with(Vec::new)
+                                .push((*peer_id, info.clone()));
+                        }
+                        Subnet::SyncCommittee(id) => {
+                            *sync_committee_peer_count.entry(id).or_default() += 1;
+                            peer_to_sync_committee
+                                .entry(*peer_id)
+                                .or_default()
+                                .insert(id);
+                        }
+                    }
+                }
+            }
+
+            // Add to the peers to prune mapping
+            while peers_to_prune.len() < connected_peer_count.saturating_sub(self.target_peers) {
+                if let Some((_, peers_on_subnet)) = subnet_to_peer
+                    .iter_mut()
+                    .max_by_key(|(_, peers)| peers.len())
+                {
+                    // and the subnet still contains peers
+                    if !peers_on_subnet.is_empty() {
+                        // Order the peers by the number of subnets they are long-lived
+                        // subscribed to; shuffle equal peers.
+                        peers_on_subnet.shuffle(&mut rand::thread_rng());
+                        peers_on_subnet.sort_by_key(|(_, info)| info.long_lived_subnet_count());
+
+                        // Try and find a candidate peer to remove from the subnet.
+                        // We ignore peers that would put us below our target outbound peers
+                        // and we currently ignore peers that would put us below our
+                        // sync-committee threshold, if we can avoid it.
+
+                        let mut removed_peer_index = None;
+                        for (index, (candidate_peer, info)) in peers_on_subnet.iter().enumerate() {
+                            // Ensure we don't remove too many outbound peers
+                            if info.is_outbound_only() {
+                                if self.target_outbound_peers()
+                                    < connected_outbound_peer_count
+                                        .saturating_sub(outbound_peers_pruned)
+                                {
+                                    outbound_peers_pruned += 1;
+                                } else {
+                                    // Skip this peer; pruning it would drop us below our
+                                    // target outbound count. If no candidate on this subnet
+                                    // can be pruned, the subnet's peer list is cleared below
+                                    // and a new subnet may be chosen. This can occur
+                                    // repeatedly until we have no peers left to choose from.
+                                    continue;
+                                }
+                            }
+
+                            // Check the sync committee
+                            if let Some(subnets) = peer_to_sync_committee.get(candidate_peer) {
+                                // The peer is subscribed to some long-lived sync-committees.
+                                // Of all the subnets this peer is subscribed to, the minimum
+                                // peer count of all of them is min_subnet_count
+                                if let Some(min_subnet_count) = subnets
+                                    .iter()
+                                    .filter_map(|v| sync_committee_peer_count.get(v).copied())
+                                    .min()
+                                {
+                                    // If the minimum count is our target or lower, we
+                                    // shouldn't remove this peer, because it drops us lower
+                                    // than our target
+                                    if min_subnet_count <= MIN_SYNC_COMMITTEE_PEERS {
+                                        // Do not drop this peer in this pruning interval
+                                        continue;
+                                    }
+                                }
+                            }
+
+                            // This peer is suitable to be pruned
+                            removed_peer_index = Some(index);
+                            break;
+                        }
+
+                        // If we have successfully found a candidate peer to prune, prune it;
+                        // otherwise none of the peers on this subnet can be removed, due to our
+                        // outbound limit or min_subnet_count. In this case, we remove all of the
+                        // subnet's peers from the pruning logic and try another subnet.
+                        if let Some(index) = removed_peer_index {
+                            let (candidate_peer, _) = peers_on_subnet.remove(index);
+                            // Remove pruned peers from other subnet counts
+                            for subnet_peers in subnet_to_peer.values_mut() {
+                                subnet_peers.retain(|(peer_id, _)| peer_id != &candidate_peer);
+                            }
+                            // Remove pruned peers from all sync-committee counts
+                            if let Some(known_sync_committees) =
+                                peer_to_sync_committee.get(&candidate_peer)
+                            {
+                                for sync_committee in known_sync_committees {
+                                    if let Some(sync_committee_count) =
+                                        sync_committee_peer_count.get_mut(sync_committee)
+                                    {
+                                        *sync_committee_count =
+                                            sync_committee_count.saturating_sub(1);
+                                    }
+                                }
+                            }
+                            peers_to_prune.insert(candidate_peer);
+                        } else {
+                            peers_on_subnet.clear();
+                        }
+                        continue;
+                    }
+                }
+                // If there are no peers left to prune, exit.
+                break;
+            }
+        }
+
+        // Disconnect the pruned peers.
+        for peer_id in peers_to_prune {
+            self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
+        }
+    }
+
     /// The Peer manager's heartbeat maintains the peer count and maintains peer reputations.
     ///
     /// It will request discovery queries if the peer count has not reached the desired number of
@@ -826,19 +1112,15 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
     ///
     /// NOTE: Discovery will only add a new query if one isn't already queued.
     fn heartbeat(&mut self) {
-        let peer_count = self.network_globals.connected_or_dialing_peers();
-        let mut outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
-        let min_outbound_only_target =
-            (self.target_peers as f32 * MIN_OUTBOUND_ONLY_FACTOR).ceil() as usize;
+        // Optionally run a discovery query if we need more peers.
+        self.maintain_peer_count(0);
 
-        if self.discovery_enabled
-            && (peer_count < self.target_peers
-                || outbound_only_peer_count < min_outbound_only_target)
-        {
-            // If we need more peers, queue a discovery lookup.
-            debug!(self.log, "Starting a new peer discovery query"; "connected_peers" => peer_count, "target_peers" => self.target_peers);
-            self.events.push(PeerManagerEvent::DiscoverPeers);
-        }
+        // Clean up the connection state of dialing peers.
+        // Libp2p dials peer-ids, but sometimes the response is from another peer-id or libp2p
+        // returns dial errors without a peer-id attached. This function reverts peers that have a
+        // dialing status longer than DIAL_TIMEOUT seconds to a disconnected status. This is
+        // important because we count the number of dialing peers in our inbound connections.
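+        // (DIAL_TIMEOUT is defined in peerdb.rs; this patch sets it to 15 seconds.)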
+        self.network_globals.peers.write().cleanup_dialing_peers();
 
         // Updates peers' scores and unbans any peers if required.
         let actions = self.network_globals.peers.write().update_scores();
@@ -852,40 +1134,9 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
         // Maintain minimum count for sync committee peers.
         self.maintain_sync_committee_peers();
 
-        // Keep a list of peers we are disconnecting
-        let mut disconnecting_peers = Vec::new();
-
-        let connected_peer_count = self.network_globals.connected_peers();
-        if connected_peer_count > self.target_peers {
-            // Remove excess peers with the worst scores, but keep subnet peers.
-            // Must also ensure that the outbound-only peer count does not go below the minimum threshold.
-            outbound_only_peer_count = self.network_globals.connected_outbound_only_peers();
-            let mut n_outbound_removed = 0;
-            for (peer_id, info) in self
-                .network_globals
-                .peers
-                .read()
-                .worst_connected_peers()
-                .iter()
-                .filter(|(_, info)| !info.has_future_duty())
-            {
-                if disconnecting_peers.len() == connected_peer_count - self.target_peers {
-                    break;
-                }
-                if info.is_outbound_only() {
-                    if min_outbound_only_target < outbound_only_peer_count - n_outbound_removed {
-                        n_outbound_removed += 1;
-                    } else {
-                        continue;
-                    }
-                }
-                disconnecting_peers.push(**peer_id);
-            }
-        }
-
-        for peer_id in disconnecting_peers {
-            self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers);
-        }
+        // Prune any excess peers back to our target in such a way that incentivises good scores
+        // and a uniform distribution of subnets.
+        self.prune_excess_peers();
     }
 
     // Update metrics related to peer scoring.
@@ -977,7 +1228,7 @@ enum ConnectingType {
 mod tests {
     use super::*;
     use slog::{o, Drain};
-    use types::MinimalEthSpec as E;
+    use types::MainnetEthSpec as E;
 
     pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger {
         let decorator = slog_term::TermDecorator::new().build();
@@ -1212,4 +1463,434 @@ mod tests {
         // the number of connected peers updates and we will not remove too many peers.
         assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
     }
+
+    #[tokio::test]
+    /// We want to test that the peer manager removes peers that are not subscribed to a subnet as
+    /// a priority over all else.
+    async fn test_peer_manager_remove_non_subnet_peers_when_all_healthy() {
+        let mut peer_manager = build_peer_manager(3).await;
+
+        // Create 5 peers to connect to.
+        let peer0 = PeerId::random();
+        let peer1 = PeerId::random();
+        let peer2 = PeerId::random();
+        let peer3 = PeerId::random();
+        let peer4 = PeerId::random();
+
+        println!("{}", peer0);
+        println!("{}", peer1);
+        println!("{}", peer2);
+        println!("{}", peer3);
+        println!("{}", peer4);
+
+        peer_manager.inject_connect_ingoing(&peer0, "/ip4/0.0.0.0".parse().unwrap(), None);
+        peer_manager.inject_connect_ingoing(&peer1, "/ip4/0.0.0.0".parse().unwrap(), None);
+        peer_manager.inject_connect_ingoing(&peer2, "/ip4/0.0.0.0".parse().unwrap(), None);
+        peer_manager.inject_connect_ingoing(&peer3, "/ip4/0.0.0.0".parse().unwrap(), None);
+        peer_manager.inject_connect_ingoing(&peer4, "/ip4/0.0.0.0".parse().unwrap(), None);
+
+        // Have some of the peers be on a long-lived subnet
+        let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
+        attnets.set(1, true).unwrap();
+        let metadata = crate::rpc::MetaDataV2 {
+            seq_number: 0,
+            attnets,
+            syncnets: Default::default(),
+        };
+        peer_manager
+            .network_globals
+            .peers
+            .write()
+            .peer_info_mut(&peer0)
+            .unwrap()
+            .set_meta_data(MetaData::V2(metadata));
+        peer_manager
+            .network_globals
+            .peers
+            .write()
+            .add_subscription(&peer0, Subnet::Attestation(1.into()));
+
+        let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
+        attnets.set(10, true).unwrap();
+        let metadata = crate::rpc::MetaDataV2 {
+            seq_number: 0,
+            attnets,
+            syncnets: Default::default(),
+        };
+        peer_manager
+            .network_globals
+            .peers
+            .write()
+            .peer_info_mut(&peer2)
+            .unwrap()
+            .set_meta_data(MetaData::V2(metadata));
+        peer_manager
+            .network_globals
+            .peers
+            .write()
+            .add_subscription(&peer2, Subnet::Attestation(10.into()));
+
+        let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
+        syncnets.set(3, true).unwrap();
+        let metadata = crate::rpc::MetaDataV2 {
+            seq_number: 0,
+            attnets: Default::default(),
+            syncnets,
+        };
+        peer_manager
+            .network_globals
+            .peers
+            .write()
+            .peer_info_mut(&peer4)
+            .unwrap()
+            .set_meta_data(MetaData::V2(metadata));
+        peer_manager
+            .network_globals
+            .peers
+            .write()
+            .add_subscription(&peer4, Subnet::SyncCommittee(3.into()));
+
+        // Perform the heartbeat.
+        peer_manager.heartbeat();
+
+        // Tests that when we are over the target peer limit, after disconnecting an unhealthy peer,
+        // the number of connected peers updates and we will not remove too many peers.
+        assert_eq!(peer_manager.network_globals.connected_or_dialing_peers(), 3);
+
+        // Check that we removed the peers that were not subscribed to any subnet
+        let mut peers_should_have_removed = std::collections::HashSet::new();
+        peers_should_have_removed.insert(peer1);
+        peers_should_have_removed.insert(peer3);
+        for (peer, _) in peer_manager
+            .network_globals
+            .peers
+            .read()
+            .peers()
+            .filter(|(_, info)| {
+                matches!(
+                    info.connection_status(),
+                    PeerConnectionStatus::Disconnecting { .. }
+                )
+            })
+        {
+            println!("{}", peer);
+            assert!(peers_should_have_removed.remove(peer));
+        }
+        // Ensure we removed all the peers
+        assert!(peers_should_have_removed.is_empty());
+    }
+
+    #[tokio::test]
+    /// Test the pruning logic to remove grouped subnet peers
+    async fn test_peer_manager_prune_grouped_subnet_peers() {
+        let target = 9;
+        let mut peer_manager = build_peer_manager(target).await;
+
+        // Create 20 peers to connect to.
+        let mut peers = Vec::new();
+        for x in 0..20 {
+            // Make 20 peers and group them by subnet as:
+            // id % 4
+            // except for the last 5 peers, which each go on their own subnet.
+            // So subnets 0-2 should have 4 peers each, subnet 3 should have 3, and subnets 15-19
+            // should have 1 each.
+            let subnet: u64 = {
+                if x < 15 {
+                    x % 4
+                } else {
+                    x
+                }
+            };
+
+            let peer = PeerId::random();
+            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);
+
+            // Have some of the peers be on a long-lived subnet
+            let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
+            attnets.set(subnet as usize, true).unwrap();
+            let metadata = crate::rpc::MetaDataV2 {
+                seq_number: 0,
+                attnets,
+                syncnets: Default::default(),
+            };
+            peer_manager
+                .network_globals
+                .peers
+                .write()
+                .peer_info_mut(&peer)
+                .unwrap()
+                .set_meta_data(MetaData::V2(metadata));
+            peer_manager
+                .network_globals
+                .peers
+                .write()
+                .add_subscription(&peer, Subnet::Attestation(subnet.into()));
+            println!("{},{},{}", x, subnet, peer);
+            peers.push(peer);
+        }
+
+        // Perform the heartbeat.
+        peer_manager.heartbeat();
+
+        // Tests that when we are over the target peer limit, after disconnecting the excess peers,
+        // the number of connected peers updates and we will not remove too many peers.
+        assert_eq!(
+            peer_manager.network_globals.connected_or_dialing_peers(),
+            target
+        );
+
+        // Check that we removed the peers that were grouped on subnets.
+        // Pruning should remove one peer from each of subnets 0-2 first; subnets 0-3 then have 3
+        // peers each. It should then remove two more peers from each of subnets 0-3 (8 peers),
+        // for 11 removed in total. Therefore the remaining peers should each be alone on their
+        // subnet. Let's check this:
+
+        let connected_peers: std::collections::HashSet<_> = peer_manager
+            .network_globals
+            .peers
+            .read()
+            .connected_or_dialing_peers()
+            .cloned()
+            .collect();
+
+        for peer in connected_peers.iter() {
+            let position = peers.iter().position(|peer_id| peer_id == peer).unwrap();
+            println!("{},{}", position, peer);
+        }
+
+        println!();
+
+        for peer in connected_peers.iter() {
+            let position = peers.iter().position(|peer_id| peer_id == peer).unwrap();
+            println!("{},{}", position, peer);
+
+            if position < 15 {
+                let y = position % 4;
+                for x in 0..4 {
+                    let alternative_index = y + 4 * x;
+                    if alternative_index != position && alternative_index < 15 {
+                        // Make sure a peer on the same subnet has been removed
+                        println!(
+                            "Check against: {}, {}",
+                            alternative_index, &peers[alternative_index]
+                        );
+                        assert!(!connected_peers.contains(&peers[alternative_index]));
+                    }
+                }
+            }
+        }
+    }
+
+    /// Test the pruning logic to prioritise peers with the most subnets
+    ///
+    /// Create 6 peers.
+    /// Peer0: None
+    /// Peer1: Subnet 1,2,3
+    /// Peer2: Subnet 1,2
+    /// Peer3: Subnet 3
+    /// Peer4: Subnet 1
+    /// Peer5: Subnet 2
+    ///
+    /// Prune 3 peers: should be Peer0, Peer4 and Peer5. Peer4 and Peer5 are both on the subnet
+    /// with the most peers and have the fewest subscribed long-lived subnets, and Peer0 has no
+    /// long-lived subnet at all.
+    #[tokio::test]
+    async fn test_peer_manager_prune_subnet_peers_most_subscribed() {
+        let target = 3;
+        let mut peer_manager = build_peer_manager(target).await;
+
+        // Create 6 peers to connect to.
+        let mut peers = Vec::new();
+        for x in 0..6 {
+            let peer = PeerId::random();
+            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);
+
+            // Have some of the peers be on a long-lived subnet
+            let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
+
+            match x {
+                0 => {}
+                1 => {
+                    attnets.set(1, true).unwrap();
+                    attnets.set(2, true).unwrap();
+                    attnets.set(3, true).unwrap();
+                }
+                2 => {
+                    attnets.set(1, true).unwrap();
+                    attnets.set(2, true).unwrap();
+                }
+                3 => {
+                    attnets.set(3, true).unwrap();
+                }
+                4 => {
+                    attnets.set(1, true).unwrap();
+                }
+                5 => {
+                    attnets.set(2, true).unwrap();
+                }
+                _ => unreachable!(),
+            }
+
+            let metadata = crate::rpc::MetaDataV2 {
+                seq_number: 0,
+                attnets,
+                syncnets: Default::default(),
+            };
+            peer_manager
+                .network_globals
+                .peers
+                .write()
+                .peer_info_mut(&peer)
+                .unwrap()
+                .set_meta_data(MetaData::V2(metadata));
+            let long_lived_subnets = peer_manager
+                .network_globals
+                .peers
+                .read()
+                .peer_info(&peer)
+                .unwrap()
+                .long_lived_subnets();
+            for subnet in long_lived_subnets {
+                println!("Subnet: {:?}", subnet);
+                peer_manager
+                    .network_globals
+                    .peers
+                    .write()
+                    .add_subscription(&peer, subnet);
+            }
+            println!("{},{}", x, peer);
+            peers.push(peer);
+        }
+
+        // Perform the heartbeat.
+        peer_manager.heartbeat();
+
+        // Tests that when we are over the target peer limit, after disconnecting the excess peers,
+        // the number of connected peers updates and we will not remove too many peers.
+        assert_eq!(
+            peer_manager.network_globals.connected_or_dialing_peers(),
+            target
+        );
+
+        // Check that we removed peers 0, 4 and 5
+        let connected_peers: std::collections::HashSet<_> = peer_manager
+            .network_globals
+            .peers
+            .read()
+            .connected_or_dialing_peers()
+            .cloned()
+            .collect();
+
+        assert!(!connected_peers.contains(&peers[0]));
+        assert!(!connected_peers.contains(&peers[4]));
+        assert!(!connected_peers.contains(&peers[5]));
+    }
+
+    /// Test the pruning logic to prioritise peers with the most subnets, but not at the expense
+    /// of removing our few sync-committee peers.
+    ///
+    /// Create 6 peers.
+    /// Peer0: None
+    /// Peer1: Subnet 1,2,3
+    /// Peer2: Subnet 1,2
+    /// Peer3: Subnet 3
+    /// Peer4: Subnet 1,2, Sync-committee-1
+    /// Peer5: Subnet 1,2, Sync-committee-2
+    ///
+    /// Prune 3 peers: should be Peer0, Peer1 and Peer2 (Peer4 and Peer5 are on a sync committee).
+    #[tokio::test]
+    async fn test_peer_manager_prune_subnet_peers_sync_committee() {
+        let target = 3;
+        let mut peer_manager = build_peer_manager(target).await;
+
+        // Create 6 peers to connect to.
+        let mut peers = Vec::new();
+        for x in 0..6 {
+            let peer = PeerId::random();
+            peer_manager.inject_connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None);
+
+            // Have some of the peers be on a long-lived subnet
+            let mut attnets = crate::types::EnrAttestationBitfield::<E>::new();
+            let mut syncnets = crate::types::EnrSyncCommitteeBitfield::<E>::new();
+
+            match x {
+                0 => {}
+                1 => {
+                    attnets.set(1, true).unwrap();
+                    attnets.set(2, true).unwrap();
+                    attnets.set(3, true).unwrap();
+                }
+                2 => {
+                    attnets.set(1, true).unwrap();
+                    attnets.set(2, true).unwrap();
+                }
+                3 => {
+                    attnets.set(3, true).unwrap();
+                }
+                4 => {
+                    attnets.set(1, true).unwrap();
+                    attnets.set(2, true).unwrap();
+                    syncnets.set(1, true).unwrap();
+                }
+                5 => {
+                    attnets.set(1, true).unwrap();
+                    attnets.set(2, true).unwrap();
+                    syncnets.set(2, true).unwrap();
+                }
+                _ => unreachable!(),
+            }
+
+            let metadata = crate::rpc::MetaDataV2 {
+                seq_number: 0,
+                attnets,
+                syncnets,
+            };
+            peer_manager
+                .network_globals
+                .peers
+                .write()
+                .peer_info_mut(&peer)
+                .unwrap()
+                .set_meta_data(MetaData::V2(metadata));
+            let long_lived_subnets = peer_manager
+                .network_globals
+                .peers
+                .read()
+                .peer_info(&peer)
+                .unwrap()
+                .long_lived_subnets();
+            println!("{},{}", x, peer);
+            for subnet in long_lived_subnets {
+                println!("Subnet: {:?}", subnet);
+                peer_manager
+                    .network_globals
+                    .peers
+                    .write()
+                    .add_subscription(&peer, subnet);
+            }
+            peers.push(peer);
+        }
+
+        // Perform the heartbeat.
+        peer_manager.heartbeat();
+
+        // Tests that when we are over the target peer limit, after disconnecting the excess peers,
+        // the number of connected peers updates and we will not remove too many peers.
+        assert_eq!(
+            peer_manager.network_globals.connected_or_dialing_peers(),
+            target
+        );
+
+        // Check that we removed peers 0, 1 and 2
+        let connected_peers: std::collections::HashSet<_> = peer_manager
+            .network_globals
+            .peers
+            .read()
+            .connected_or_dialing_peers()
+            .cloned()
+            .collect();
+
+        assert!(!connected_peers.contains(&peers[0]));
+        assert!(!connected_peers.contains(&peers[1]));
+        assert!(!connected_peers.contains(&peers[2]));
+    }
 }
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
index cddff1218c..1f44488a56 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
@@ -29,6 +29,9 @@ const BANNED_PEERS_PER_IP_THRESHOLD: usize = 5;
 /// Relative factor of peers that are allowed to have a negative gossipsub score without penalizing
 /// them in lighthouse.
 const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR: f32 = 0.1;
+/// The time we allow peers to be in the dialing state in our PeerDb before we revert them to a
+/// disconnected state.
+const DIAL_TIMEOUT: u64 = 15;
 
 /// Storage of known peers, their reputation and information
 pub struct PeerDB<TSpec: EthSpec> {
@@ -322,6 +325,32 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {
 
     /* Mutability */
 
+    /// Cleans up the connection state of dialing peers.
+    // Libp2p dials peer-ids, but sometimes the response is from another peer-id or libp2p
+    // returns dial errors without a peer-id attached. This function reverts peers that have a
+    // dialing status longer than DIAL_TIMEOUT seconds to a disconnected status. This is important
+    // because we count the number of dialing peers in our inbound connections.
+    pub fn cleanup_dialing_peers(&mut self) {
+        let peers_to_disconnect: Vec<_> = self
+            .peers
+            .iter()
+            .filter_map(|(peer_id, info)| {
+                if let PeerConnectionStatus::Dialing { since } = info.connection_status() {
+                    if (*since) + std::time::Duration::from_secs(DIAL_TIMEOUT)
+                        < std::time::Instant::now()
+                    {
+                        return Some(*peer_id);
+                    }
+                }
+                None
+            })
+            .collect();
+
+        for peer_id in peers_to_disconnect {
+            self.update_connection_state(&peer_id, NewConnectionState::Disconnected);
+        }
+    }
+
     /// Allows the sync module to update sync statuses of peers. Returns None if the peer doesn't
     /// exist and returns Some(bool) representing if the sync state was modified.
     pub fn update_sync_status(
diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
index 941ca7e6c9..6273356b8f 100644
--- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
+++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
@@ -1,6 +1,7 @@
 use super::client::Client;
 use super::score::{PeerAction, Score, ScoreState};
 use super::sync_status::SyncStatus;
+use crate::discovery::Eth2Enr;
 use crate::Multiaddr;
 use crate::{rpc::MetaData, types::Subnet};
 use discv5::Enr;
@@ -139,11 +140,92 @@ impl<TSpec: EthSpec> PeerInfo<TSpec> {
         self.enr.as_ref()
     }
 
+    /// An iterator over all the subnets this peer is subscribed to.
+    pub fn subnets(&self) -> impl Iterator<Item = &Subnet> {
+        self.subnets.iter()
+    }
+
+    /// Returns the number of long-lived subnets a peer is subscribed to.
+    // NOTE: This currently excludes sync committee subnets.
+    pub fn long_lived_subnet_count(&self) -> usize {
+        if let Some(meta_data) = self.meta_data.as_ref() {
+            return meta_data.attnets().num_set_bits();
+        } else if let Some(enr) = self.enr.as_ref() {
+            if let Ok(attnets) = enr.attestation_bitfield::<TSpec>() {
+                return attnets.num_set_bits();
+            }
+        }
+        0
+    }
+
+    /// Returns the long-lived subnets this peer is subscribed to, if any.
+    pub fn long_lived_subnets(&self) -> Vec<Subnet> {
+        let mut long_lived_subnets = Vec::new();
+        // Check the meta_data
+        if let Some(meta_data) = self.meta_data.as_ref() {
+            for subnet in 0..=meta_data.attnets().highest_set_bit().unwrap_or(0) {
+                if meta_data.attnets().get(subnet).unwrap_or(false) {
+                    long_lived_subnets.push(Subnet::Attestation((subnet as u64).into()));
+                }
+            }
+
+            if let Ok(syncnet) = meta_data.syncnets() {
+                for subnet in 0..=syncnet.highest_set_bit().unwrap_or(0) {
+                    if syncnet.get(subnet).unwrap_or(false) {
+                        long_lived_subnets.push(Subnet::SyncCommittee((subnet as u64).into()));
+                    }
+                }
+            }
+        } else if let Some(enr) = self.enr.as_ref() {
+            if let Ok(attnets) = enr.attestation_bitfield::<TSpec>() {
+                for subnet in 0..=attnets.highest_set_bit().unwrap_or(0) {
+                    if attnets.get(subnet).unwrap_or(false) {
+                        long_lived_subnets.push(Subnet::Attestation((subnet as u64).into()));
+                    }
+                }
+            }
+
+            if let Ok(syncnets) = enr.sync_committee_bitfield::<TSpec>() {
+                for subnet in 0..=syncnets.highest_set_bit().unwrap_or(0) {
+                    if syncnets.get(subnet).unwrap_or(false) {
+                        long_lived_subnets.push(Subnet::SyncCommittee((subnet as u64).into()));
+                    }
+                }
+            }
+        }
+        long_lived_subnets
+    }
+
     /// Returns if the peer is subscribed to a given `Subnet` from the gossipsub subscriptions.
     pub fn on_subnet_gossipsub(&self, subnet: &Subnet) -> bool {
         self.subnets.contains(subnet)
     }
 
+    /// Returns true if the peer is connected to a long-lived subnet.
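+    /// The peer's metadata is checked first; if metadata is unavailable, the peer's ENR
+    /// attestation bitfield is used as a fallback.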
+    pub fn has_long_lived_subnet(&self) -> bool {
+        // Check the meta_data
+        if let Some(meta_data) = self.meta_data.as_ref() {
+            if !meta_data.attnets().is_zero() && !self.subnets.is_empty() {
+                return true;
+            }
+            if let Ok(sync) = meta_data.syncnets() {
+                if !sync.is_zero() {
+                    return true;
+                }
+            }
+        }
+
+        // We may not have the metadata but may have an ENR. Let's check that.
+        if let Some(enr) = self.enr.as_ref() {
+            if let Ok(attnets) = enr.attestation_bitfield::<TSpec>() {
+                if !attnets.is_zero() && !self.subnets.is_empty() {
+                    return true;
+                }
+            }
+        }
+        false
+    }
+
     /// Returns the seen addresses of the peer.
     pub fn seen_addresses(&self) -> impl Iterator + '_ {
         self.seen_addresses.iter()
diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs
index 5c87062e2c..2b0fe6f55a 100644
--- a/beacon_node/network/src/subnet_service/attestation_subnets.rs
+++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs
@@ -23,7 +23,7 @@ use crate::metrics;
 
 /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
 /// slot is less than this number, skip the peer discovery process.
-/// Subnet discovery query takes atmost 30 secs, 2 slots take 24s.
+/// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
 const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
 /// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
 /// gossip topics that we subscribed to due to the validator connection.
diff --git a/beacon_node/network/src/subnet_service/sync_subnets.rs b/beacon_node/network/src/subnet_service/sync_subnets.rs
index 51fef235a1..9e92f62250 100644
--- a/beacon_node/network/src/subnet_service/sync_subnets.rs
+++ b/beacon_node/network/src/subnet_service/sync_subnets.rs
@@ -21,7 +21,7 @@ use crate::metrics;
 
 /// The minimum number of slots ahead that we attempt to discover peers for a subscription. If the
 /// slot is less than this number, skip the peer discovery process.
-/// Subnet discovery query takes atmost 30 secs, 2 slots take 24s.
+/// Subnet discovery query takes at most 30 secs, 2 slots take 24s.
 const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2;
 
 /// A particular subnet at a given slot.
@@ -115,7 +115,7 @@ impl<T: BeaconChainTypes> SyncCommitteeService<T> {
         metrics::inc_counter(&metrics::SYNC_COMMITTEE_SUBSCRIPTION_REQUESTS);
 
         //NOTE: We assume all subscriptions have been verified before reaching this service
-        // Registers the validator with the subnet service.
+        // Registers the validator with the subnet service.
         // This will subscribe to long-lived random subnets if required.
         trace!(self.log, "Sync committee subscription";
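
For illustration, the revised limits worked through for a target of 50 peers (a sketch: `ceil_pct`
is a hypothetical integer helper mirroring the f32 `ceil()` arithmetic the peer manager performs;
the percentages come from the constants in this patch):

    // ceil(target * pct / 100) using integer arithmetic.
    fn ceil_pct(target: usize, pct: usize) -> usize {
        (target * pct + 99) / 100
    }

    fn main() {
        let target = 50;
        assert_eq!(ceil_pct(target, 110), 55); // + PEER_EXCESS_FACTOR: general max peers
        assert_eq!(ceil_pct(target, 130), 65); // + PRIORITY_PEER_EXCESS: priority dials for duties
        assert_eq!(ceil_pct(target, 115), 58); // + half the priority excess: outbound dialing cap
        assert_eq!(ceil_pct(target, 30), 15); // TARGET_OUTBOUND_ONLY_FACTOR
        assert_eq!(ceil_pct(target, 20), 10); // MIN_OUTBOUND_ONLY_FACTOR: discovery trigger
    }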