diff --git a/Cargo.lock b/Cargo.lock index f61f5f18a1..12eb07ed6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1522,6 +1522,7 @@ dependencies = [ "lighthouse_metrics", "lru 0.5.3", "parking_lot 0.11.0", + "rand 0.7.3", "serde", "serde_derive", "sha2 0.9.1", diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index cb172d59c6..2b49c61400 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -35,6 +35,8 @@ tokio-util = { version = "0.3.1", features = ["codec", "compat"] } discv5 = { version = "0.1.0-alpha.7", features = ["libp2p"] } tiny-keccak = "2.0.2" environment = { path = "../../lighthouse/environment" } +# TODO: Remove rand crate for mainnet +rand = "0.7.3" [dependencies.libp2p] #version = "0.19.1" diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index 414b828e8f..d3f91ae4cf 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -610,7 +610,7 @@ impl Behaviour { } PeerManagerEvent::DisconnectPeer(peer_id, reason) => { debug!(self.log, "PeerManager requested to disconnect a peer"; - "peer_id" => peer_id.to_string()); + "peer_id" => peer_id.to_string(), "reason" => reason.to_string()); // queue for disabling self.peers_to_dc.push_back(peer_id.clone()); // send one goodbye @@ -731,8 +731,25 @@ impl NetworkBehaviour for Behaviour { conn_id: &ConnectionId, endpoint: &ConnectedPoint, ) { - // If the peer is banned, send a goodbye and disconnect. - if self.peer_manager.is_banned(peer_id) { + let goodbye_reason: Option = if self.peer_manager.is_banned(peer_id) { + // If the peer is banned, send goodbye with reason banned. + Some(GoodbyeReason::Banned) + } else if self.peer_manager.peer_limit_reached() + && self + .network_globals + .peers + .read() + .peer_info(peer_id) + .map_or(true, |i| !i.has_future_duty()) + { + //If we are at our peer limit and we don't need the peer for a future validator + //duty, send goodbye with reason TooManyPeers + Some(GoodbyeReason::TooManyPeers) + } else { + None + }; + + if let Some(reason) = goodbye_reason { self.peers_to_dc.push_back(peer_id.clone()); // send a goodbye on all possible handlers for this peer self.handler_events.push_back(NBAction::NotifyHandler { @@ -740,7 +757,7 @@ impl NetworkBehaviour for Behaviour { handler: NotifyHandler::All, event: BehaviourHandlerIn::Shutdown(Some(( RequestId::Behaviour, - RPCRequest::Goodbye(GoodbyeReason::Banned), + RPCRequest::Goodbye(reason), ))), }); return; @@ -773,7 +790,16 @@ impl NetworkBehaviour for Behaviour { fn inject_connected(&mut self, peer_id: &PeerId) { // Drop any connection from a banned peer. The goodbye and disconnects are handled in // `inject_connection_established()`, which gets called first. - if self.peer_manager.is_banned(peer_id) { + // The same holds if we reached the peer limit and the connected peer has no future duty. + if self.peer_manager.is_banned(peer_id) + || (self.peer_manager.peer_limit_reached() + && self + .network_globals + .peers + .read() + .peer_info(peer_id) + .map_or(true, |i| !i.has_future_duty())) + { return; } @@ -828,7 +854,16 @@ impl NetworkBehaviour for Behaviour { event: ::OutEvent, ) { // All events from banned peers are rejected - if self.peer_manager.is_banned(&peer_id) { + // The same holds if we reached the peer limit and the connected peer has no future duty. + if self.peer_manager.is_banned(&peer_id) + || (self.peer_manager.peer_limit_reached() + && self + .network_globals + .peers + .read() + .peer_info(&peer_id) + .map_or(true, |i| !i.has_future_duty())) + { return; } diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index d428362b0c..6b75253e05 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -37,7 +37,7 @@ pub struct Config { pub enr_tcp_port: Option, /// Target number of connected peers. - pub max_peers: usize, + pub target_peers: usize, /// Gossipsub configuration parameters. #[serde(skip)] @@ -122,7 +122,7 @@ impl Default for Config { enr_address: None, enr_udp_port: None, enr_tcp_port: None, - max_peers: 50, + target_peers: 50, gs_config, discv5_config, boot_nodes: vec![], diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 58824a28c0..b04fd7338a 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -42,6 +42,11 @@ const PING_INTERVAL: u64 = 30; /// requests. This defines the interval in seconds. const HEARTBEAT_INTERVAL: u64 = 30; +/// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of +/// `PeerManager::target_peers`. For clarity, if `PeerManager::target_peers` is 50 and +/// PEER_EXCESS_FACTOR = 0.1 we allow 10% more nodes, i.e 55. +const PEER_EXCESS_FACTOR: f32 = 0.1; + /// The main struct that handles peer's reputation and connection status. pub struct PeerManager { /// Storage of network globals to access the `PeerDB`. @@ -54,6 +59,8 @@ pub struct PeerManager { status_peers: HashSetDelay, /// The target number of peers we would like to connect to. target_peers: usize, + /// The maximum number of peers we allow (exceptions for subnet peers) + max_peers: usize, /// The discovery service. discovery: Discovery, /// The heartbeat interval to perform routine maintenance. @@ -99,7 +106,8 @@ impl PeerManager { events: SmallVec::new(), ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)), status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)), - target_peers: config.max_peers, //TODO: Add support for target peers and max peers + target_peers: config.target_peers, + max_peers: (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as usize, discovery, heartbeat, log: log.clone(), @@ -278,6 +286,12 @@ impl PeerManager { self.network_globals.peers.read().is_banned(peer_id) } + /// Reports whether the peer limit is reached in which case we stop allowing new incoming + /// connections. + pub fn peer_limit_reached(&self) -> bool { + self.network_globals.connected_or_dialing_peers() >= self.max_peers + } + /// Updates `PeerInfo` with `identify` information. pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { @@ -478,11 +492,13 @@ impl PeerManager { fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option) { let mut to_dial_peers = Vec::new(); + let connected_or_dialing = self.network_globals.connected_or_dialing_peers(); for enr in peers { let peer_id = enr.peer_id(); - // if we need more peers, attempt a connection - if self.network_globals.connected_or_dialing_peers() < self.target_peers + // we attempt a connection if this peer is a subnet peer or if the max peer count + // is not yet filled (including dialling peers) + if (min_ttl.is_some() || connected_or_dialing + to_dial_peers.len() < self.max_peers) && !self .network_globals .peers @@ -514,7 +530,6 @@ impl PeerManager { /// This is called by `connect_ingoing` and `connect_outgoing`. /// /// This informs if the peer was accepted in to the db or not. - // TODO: Drop peers if over max_peer limit fn connect_peer(&mut self, peer_id: &PeerId, connection: ConnectingType) -> bool { // TODO: remove after timed updates //self.update_reputations(); @@ -673,11 +688,30 @@ impl PeerManager { self.discovery.discover_peers(); } - // TODO: If we have too many peers, remove peers that are not required for subnet - // validation. - // Updates peer's scores. self.update_peer_scores(); + + let connected_peer_count = self.network_globals.connected_peers(); + if connected_peer_count > self.target_peers { + //remove excess peers with the worst scores, but keep subnet peers + for (peer_id, _) in self + .network_globals + .peers + .read() + .worst_connected_peers() + .iter() + .filter(|(_, info)| !info.has_future_duty()) + .take(connected_peer_count - self.target_peers) + //we only need to disconnect peers with healthy scores, since the others got already + //disconnected in update_peer_scores + .filter(|(_, info)| info.score.state() == ScoreState::Healthy) + { + self.events.push(PeerManagerEvent::DisconnectPeer( + (*peer_id).clone(), + GoodbyeReason::TooManyPeers, + )); + } + } } } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs index f7635a4d36..fa9def34f4 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs @@ -63,6 +63,11 @@ impl PeerInfo { } false } + + /// Reports if this peer has some future validator duty in which case it is valuable to keep it. + pub fn has_future_duty(&self) -> bool { + self.min_ttl.map_or(false, |i| i >= Instant::now()) + } } #[derive(Clone, Debug, Serialize)] diff --git a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs index f87ddbc89e..fe7fffadbb 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs @@ -3,6 +3,7 @@ use super::peer_sync_status::PeerSyncStatus; use super::score::Score; use crate::rpc::methods::MetaData; use crate::PeerId; +use rand::seq::SliceRandom; use slog::{crit, debug, trace, warn}; use std::collections::HashMap; use std::time::Instant; @@ -168,6 +169,20 @@ impl PeerDB { .map(|(peer_id, _)| peer_id) } + /// Returns a vector of all connected peers sorted by score beginning with the worst scores. + /// Ties get broken randomly. + pub fn worst_connected_peers(&self) -> Vec<(&PeerId, &PeerInfo)> { + let mut connected = self + .peers + .iter() + .filter(|(_, info)| info.connection_status.is_connected()) + .collect::>(); + + connected.shuffle(&mut rand::thread_rng()); + connected.sort_by_key(|(_, info)| info.score); + connected + } + /// Returns a vector containing peers (their ids and info), sorted by /// score from highest to lowest, and filtered using `is_status` pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 2c52d66015..588ce25cdc 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -67,9 +67,9 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( - Arg::with_name("max-peers") - .long("max-peers") - .help("The maximum number of peers.") + Arg::with_name("target-peers") + .long("target-peers") + .help("The target number of peers.") .default_value("50") .takes_value(true), ) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 79ec6c6f87..7bbc81f23b 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -90,10 +90,10 @@ pub fn get_config( client_config.network.listen_address = listen_address; } - if let Some(max_peers_str) = cli_args.value_of("max-peers") { - client_config.network.max_peers = max_peers_str + if let Some(target_peers_str) = cli_args.value_of("target-peers") { + client_config.network.target_peers = target_peers_str .parse::() - .map_err(|_| format!("Invalid number of max peers: {}", max_peers_str))?; + .map_err(|_| format!("Invalid number of target peers: {}", target_peers_str))?; } if let Some(port_str) = cli_args.value_of("port") {