From 0525876882e9474443b145812a416d4f47bee8b5 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Fri, 11 Sep 2020 00:52:27 +0000 Subject: [PATCH] Dial cached enr's before making subnet discovery query (#1376) ## Issue Addressed Closes #1365 ## Proposed Changes Dial peers in the `cached_enrs` who aren't connected, aren't banned and satisfy the subnet predicate before making a subnet discovery query. --- beacon_node/eth2_libp2p/src/discovery/mod.rs | 11 ++- .../eth2_libp2p/src/peer_manager/mod.rs | 73 ++++++++++++++++--- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 8ca6e7fe5f..57b0b8abf2 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -31,12 +31,12 @@ use tokio::sync::mpsc; use types::{EnrForkId, EthSpec, SubnetId}; mod subnet_predicate; -use subnet_predicate::subnet_predicate; +pub use subnet_predicate::subnet_predicate; /// Local ENR storage filename. pub const ENR_FILENAME: &str = "enr.dat"; /// Target number of peers we'd like to have connected to a given long-lived subnet. -const TARGET_SUBNET_PEERS: usize = 3; +pub const TARGET_SUBNET_PEERS: usize = 3; /// Target number of peers to search for given a grouped subnet query. const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6; /// Number of times to attempt a discovery request. @@ -287,6 +287,11 @@ impl Discovery { self.discv5.local_enr() } + /// Return the cached enrs. + pub fn cached_enrs(&self) -> impl Iterator { + self.cached_enrs.iter() + } + /// This adds a new `FindPeers` query to the queue if one doesn't already exist. pub fn discover_peers(&mut self) { // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one. @@ -558,7 +563,7 @@ impl Discovery { .peers_on_subnet(subnet_query.subnet_id) .count(); - if peers_on_subnet > TARGET_SUBNET_PEERS { + if peers_on_subnet >= TARGET_SUBNET_PEERS { debug!(self.log, "Discovery ignored"; "reason" => "Already connected to desired peers", "connected_peers_on_subnet" => peers_on_subnet, diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index 2d40f8a0d4..211db22633 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -1,7 +1,7 @@ //! Implementation of a Lighthouse's peer management system. pub use self::peerdb::*; -use crate::discovery::{Discovery, DiscoveryEvent}; +use crate::discovery::{subnet_predicate, Discovery, DiscoveryEvent, TARGET_SUBNET_PEERS}; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::{error, metrics}; use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId, SubnetDiscovery}; @@ -19,7 +19,7 @@ use std::{ task::{Context, Poll}, time::{Duration, Instant}, }; -use types::EthSpec; +use types::{EthSpec, SubnetId}; pub use libp2p::core::{identity::Keypair, Multiaddr}; @@ -214,18 +214,45 @@ impl PeerManager { /// A request to find peers on a given subnet. pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { - // Extend the time to maintain peers if required. - for s in subnets_to_discover.iter() { - if let Some(min_ttl) = s.min_ttl { - self.network_globals + let filtered: Vec = subnets_to_discover + .into_iter() + .filter(|s| { + // Extend min_ttl of connected peers on required subnets + if let Some(min_ttl) = s.min_ttl { + self.network_globals + .peers + .write() + .extend_peers_on_subnet(s.subnet_id, min_ttl); + } + // Already have target number of peers, no need for subnet discovery + let peers_on_subnet = self + .network_globals .peers - .write() - .extend_peers_on_subnet(s.subnet_id, min_ttl); - } - } + .read() + .peers_on_subnet(s.subnet_id) + .count(); + if peers_on_subnet >= TARGET_SUBNET_PEERS { + debug!( + self.log, + "Discovery query ignored"; + "subnet_id" => format!("{:?}",s.subnet_id), + "reason" => "Already connected to desired peers", + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + ); + false + // Queue an outgoing connection request to the cached peers that are on `s.subnet_id`. + // If we connect to the cached peers before the discovery query starts, then we potentially + // save a costly discovery query. + } else { + self.dial_cached_enrs_in_subnet(s.subnet_id); + true + } + }) + .collect(); // request the subnet query from discovery - self.discovery.discover_subnet_peers(subnets_to_discover); + self.discovery.discover_subnet_peers(filtered); } /// A STATUS message has been received from a peer. This resets the status timer. @@ -531,6 +558,30 @@ impl PeerManager { self.events.push(PeerManagerEvent::SocketUpdated(multiaddr)); } + /// Dial cached enrs in discovery service that are in the given `subnet_id` and aren't + /// in Connected, Dialing or Banned state. + fn dial_cached_enrs_in_subnet(&mut self, subnet_id: SubnetId) { + let predicate = subnet_predicate::(vec![subnet_id], &self.log); + let peers_to_dial: Vec = self + .discovery() + .cached_enrs() + .filter_map(|(peer_id, enr)| { + let peers = self.network_globals.peers.read(); + if predicate(enr) + && !peers.is_connected_or_dialing(peer_id) + && !peers.is_banned(peer_id) + { + Some(peer_id.clone()) + } else { + None + } + }) + .collect(); + for peer in &peers_to_dial { + self.dial_peer(peer); + } + } + /// Peers that have been returned by discovery requests are dialed here if they are suitable. /// /// NOTE: By dialing `PeerId`s and not multiaddrs, libp2p requests the multiaddr associated