From 09b40b7a5e791e11893be598194fb69e00e9bc7c Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 29 Jul 2020 02:43:50 +0000 Subject: [PATCH] Discover query grouping (#1364) ## Issue Addressed #1281 ## Proposed Changes Groups queries for specific subnets into groups of up to 3. ## Additional Info --- beacon_node/eth2_libp2p/Cargo.toml | 2 +- beacon_node/eth2_libp2p/src/discovery/mod.rs | 348 +++++++++++------- .../src/discovery/subnet_predicate.rs | 28 +- .../eth2_libp2p/src/peer_manager/mod.rs | 15 +- .../network/src/attestation_service/mod.rs | 2 +- 5 files changed, 256 insertions(+), 139 deletions(-) diff --git a/beacon_node/eth2_libp2p/Cargo.toml b/beacon_node/eth2_libp2p/Cargo.toml index 2b49c61400..5e10675ab1 100644 --- a/beacon_node/eth2_libp2p/Cargo.toml +++ b/beacon_node/eth2_libp2p/Cargo.toml @@ -43,7 +43,7 @@ rand = "0.7.3" git = "https://github.com/sigp/rust-libp2p" rev = "147bb43fa56c1b84253606eabedb0794eeed8b94" default-features = false -features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "secio", "tcp-tokio"] +features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "secio", "tcp-tokio"] [dev-dependencies] tokio = { version = "0.2.21", features = ["full"] } diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index 174ecfabd9..a1b50f48e1 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -19,13 +19,13 @@ use slog::{crit, debug, info, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; use std::{ - collections::VecDeque, + collections::{HashMap, VecDeque}, net::SocketAddr, path::Path, pin::Pin, sync::Arc, task::{Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; use tokio::sync::mpsc; use types::{EnrForkId, EthSpec, SubnetId}; @@ -37,33 +37,50 @@ use subnet_predicate::subnet_predicate; pub const ENR_FILENAME: &str = "enr.dat"; /// Target number of peers we'd like to have connected to a given long-lived subnet. const TARGET_SUBNET_PEERS: usize = 3; -/// Number of times to attempt a discovery request +/// Target number of peers to search for given a grouped subnet query. +const TARGET_PEERS_FOR_GROUPED_QUERY: usize = 6; +/// Number of times to attempt a discovery request. const MAX_DISCOVERY_RETRY: usize = 3; /// The maximum number of concurrent discovery queries. -const MAX_CONCURRENT_QUERIES: usize = 1; +const MAX_CONCURRENT_QUERIES: usize = 2; +/// The max number of subnets to search for in a single subnet discovery query. +const MAX_SUBNETS_IN_QUERY: usize = 3; /// The number of closest peers to search for when doing a regular peer search. /// /// We could reduce this constant to speed up queries however at the cost of security. It will /// make it easier to peers to eclipse this node. Kademlia suggests a value of 16. const FIND_NODE_QUERY_CLOSEST_PEERS: usize = 16; +/// The threshold for updating `min_ttl` on a connected peer. +const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); /// The events emitted by polling discovery. pub enum DiscoveryEvent { - /// A query has completed. The first parameter is the `min_ttl` of the peers if it is specified - /// and the second parameter are the discovered peers. - QueryResult(Option, Vec), + /// A query has completed. This result contains a mapping of discovered peer IDs to the `min_ttl` + /// of the peer if it is specified. + QueryResult(HashMap>), /// This indicates that our local UDP socketaddr has been updated and we should inform libp2p. SocketUpdated(SocketAddr), } +#[derive(Debug, Clone, PartialEq)] +struct SubnetQuery { + subnet_id: SubnetId, + min_ttl: Option, + retries: usize, +} + #[derive(Debug, Clone, PartialEq)] enum QueryType { /// We are searching for subnet peers. - Subnet { - subnet_id: SubnetId, - min_ttl: Option, - retries: usize, - }, + Subnet(SubnetQuery), + /// We are searching for more peers without ENR or time constraints. + FindPeers, +} + +#[derive(Debug, Clone, PartialEq)] +enum GroupedQueryType { + /// We are searching for peers on one of a few subnets. + Subnet(Vec), /// We are searching for more peers without ENR or time constraints. FindPeers, } @@ -73,30 +90,19 @@ impl QueryType { pub fn expired(&self) -> bool { match self { Self::FindPeers => false, - Self::Subnet { min_ttl, .. } => { - if let Some(ttl) = min_ttl { - ttl < &Instant::now() + Self::Subnet(subnet_query) => { + if let Some(ttl) = subnet_query.min_ttl { + ttl < Instant::now() } else { true } } } } - - /// Returns the min_ttl of the query if one exists - /// - /// This is required for returning to the peer manager. The peer manager will update newly - /// connected peers with this `min_ttl` - pub fn min_ttl(&self) -> Option { - match self { - Self::FindPeers => None, - Self::Subnet { min_ttl, .. } => *min_ttl, - } - } } /// The result of a query. -struct QueryResult(QueryType, Result, discv5::QueryError>); +struct QueryResult(GroupedQueryType, Result, discv5::QueryError>); // Awaiting the event stream future enum EventStream { @@ -381,18 +387,13 @@ impl Discovery { // already exists let mut found = false; for query in self.queued_queries.iter_mut() { - if let QueryType::Subnet { - subnet_id: ref mut q_subnet_id, - min_ttl: ref mut q_min_ttl, - retries: ref mut q_retries, - } = query - { - if *q_subnet_id == subnet_id { - if *q_min_ttl < min_ttl { - *q_min_ttl = min_ttl; + if let QueryType::Subnet(ref mut subnet_query) = query { + if subnet_query.subnet_id == subnet_id { + if subnet_query.min_ttl < min_ttl { + subnet_query.min_ttl = min_ttl; } // update the number of retries - *q_retries = retries; + subnet_query.retries = retries; // mimic an `Iter::Find()` and short-circuit the loop found = true; break; @@ -401,11 +402,11 @@ impl Discovery { } if !found { // Set up the query and add it to the queue - let query = QueryType::Subnet { + let query = QueryType::Subnet(SubnetQuery { subnet_id, min_ttl, retries, - }; + }); // update the metrics and insert into the queue. debug!(self.log, "Queuing subnet query"; "subnet" => *subnet_id, "retries" => retries); self.queued_queries.push_back(query); @@ -420,28 +421,46 @@ impl Discovery { // Sanitize the queue, removing any out-dated subnet queries self.queued_queries.retain(|query| !query.expired()); + // use this to group subnet queries together for a single discovery request + let mut subnet_queries: Vec = Vec::new(); + // Check that we are within our query concurrency limit while !self.at_capacity() && !self.queued_queries.is_empty() { // consume and process the query queue match self.queued_queries.pop_front() { Some(QueryType::FindPeers) => { - // Only permit one FindPeers query at a time - if self.find_peer_active { + // Only start a find peers query if it is the last message in the queue. + // We want to prioritize subnet queries, so we don't miss attestations. + if self.queued_queries.is_empty() { + // This is a regular request to find additional peers + debug!(self.log, "Discovery query started"); + self.find_peer_active = true; + self.start_query( + GroupedQueryType::FindPeers, + FIND_NODE_QUERY_CLOSEST_PEERS, + |_| true, + ); + } else { self.queued_queries.push_back(QueryType::FindPeers); - continue; } - // This is a regular request to find additional peers - debug!(self.log, "Discovery query started"); - self.find_peer_active = true; - self.start_query(QueryType::FindPeers, FIND_NODE_QUERY_CLOSEST_PEERS); } - Some(QueryType::Subnet { - subnet_id, - min_ttl, - retries, - }) => { - // This query is for searching for peers of a particular subnet - self.start_subnet_query(subnet_id, min_ttl, retries); + Some(QueryType::Subnet(subnet_query)) => { + subnet_queries.push(subnet_query); + + // We want to start a grouped subnet query if: + // 1. We've grouped MAX_SUBNETS_IN_QUERY subnets together. + // 2. There are no more messages in the queue. + // 3. There is exactly one message in the queue and it is FindPeers. + if subnet_queries.len() == MAX_SUBNETS_IN_QUERY + || self.queued_queries.is_empty() + || (self.queued_queries.front() == Some(&QueryType::FindPeers) + && self.queued_queries.len() == 1) + { + // This query is for searching for peers of a particular subnet + // Drain subnet_queries so we can re-use it as we continue to process the queue + let grouped_queries: Vec = subnet_queries.drain(..).collect(); + self.start_subnet_query(grouped_queries); + } } None => {} // Queue is empty } @@ -456,47 +475,57 @@ impl Discovery { self.active_queries.len() >= MAX_CONCURRENT_QUERIES } - /// Runs a discovery request for a given subnet_id if one already exists. - fn start_subnet_query( - &mut self, - subnet_id: SubnetId, - min_ttl: Option, - retries: usize, - ) { - // Determine if we have sufficient peers, which may make this discovery unnecessary. - let peers_on_subnet = self - .network_globals - .peers - .read() - .peers_on_subnet(subnet_id) - .count(); + /// Runs a discovery request for a given group of subnets. + fn start_subnet_query(&mut self, subnet_queries: Vec) { + let mut filtered_subnet_ids: Vec = Vec::new(); - if peers_on_subnet > TARGET_SUBNET_PEERS { - debug!(self.log, "Discovery ignored"; - "reason" => "Already connected to desired peers", - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, + // find subnet queries that are still necessary + let filtered_subnet_queries: Vec = subnet_queries + .into_iter() + .filter(|subnet_query| { + // Determine if we have sufficient peers, which may make this discovery unnecessary. + let peers_on_subnet = self + .network_globals + .peers + .read() + .peers_on_subnet(subnet_query.subnet_id) + .count(); + + if peers_on_subnet > TARGET_SUBNET_PEERS { + debug!(self.log, "Discovery ignored"; + "reason" => "Already connected to desired peers", + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + ); + return false; + } + + let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; + debug!(self.log, "Discovery query started for subnet"; + "subnet_id" => *subnet_query.subnet_id, + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + "peers_to_find" => target_peers, + "attempt" => subnet_query.retries, + "min_ttl" => format!("{:?}", subnet_query.min_ttl), + ); + + filtered_subnet_ids.push(subnet_query.subnet_id); + true + }) + .collect(); + + // Only start a discovery query if we have a subnet to look for. + if !filtered_subnet_queries.is_empty() { + // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate + let subnet_predicate = subnet_predicate::(filtered_subnet_ids, &self.log); + + self.start_query( + GroupedQueryType::Subnet(filtered_subnet_queries), + TARGET_PEERS_FOR_GROUPED_QUERY, + subnet_predicate, ); - return; } - - let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; - debug!(self.log, "Discovery query started for subnet"; - "subnet_id" => *subnet_id, - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, - "peers_to_find" => target_peers, - "attempt" => retries, - "min_ttl" => format!("{:?}", min_ttl), - ); - - // start the query, and update the queries map if necessary - let query = QueryType::Subnet { - subnet_id, - min_ttl, - retries, - }; - self.start_query(query, target_peers); } /// Search for a specified number of new peers using the underlying discovery mechanism. @@ -504,7 +533,26 @@ impl Discovery { /// This can optionally search for peers for a given predicate. Regardless of the predicate /// given, this will only search for peers on the same enr_fork_id as specified in the local /// ENR. - fn start_query(&mut self, query: QueryType, target_peers: usize) { + fn start_query( + &mut self, + grouped_query: GroupedQueryType, + target_peers: usize, + additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, + ) { + // Make sure there are subnet queries included + let contains_queries = match &grouped_query { + GroupedQueryType::Subnet(queries) => !queries.is_empty(), + GroupedQueryType::FindPeers => true, + }; + + if !contains_queries { + debug!( + self.log, + "No subnets included in this request. Skipping discovery request." + ); + return; + } + // Generate a random target node id. let random_node = NodeId::random(); @@ -519,31 +567,24 @@ impl Discovery { let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone()); // General predicate - let predicate: Box bool + Send> = match &query { - QueryType::FindPeers => Box::new(eth2_fork_predicate), - QueryType::Subnet { subnet_id, .. } => { - // build the subnet predicate as a combination of the eth2_fork_predicate and the - // subnet predicate - let subnet_predicate = subnet_predicate::(*subnet_id, &self.log); - Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && subnet_predicate(enr)) - } - }; + let predicate: Box bool + Send> = + Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); // Build the future let query_future = self .discv5 .find_node_predicate(random_node, predicate, target_peers) - .map(|v| QueryResult(query, v)); + .map(|v| QueryResult(grouped_query, v)); // Add the future to active queries, to be executed. self.active_queries.push(Box::pin(query_future)); } /// Drives the queries returning any results from completed queries. - fn poll_queries(&mut self, cx: &mut Context) -> Option<(Option, Vec)> { + fn poll_queries(&mut self, cx: &mut Context) -> Option>> { while let Poll::Ready(Some(query_future)) = self.active_queries.poll_next_unpin(cx) { match query_future.0 { - QueryType::FindPeers => { + GroupedQueryType::FindPeers => { self.find_peer_active = false; match query_future.1 { Ok(r) if r.is_empty() => { @@ -551,31 +592,96 @@ impl Discovery { } Ok(r) => { debug!(self.log, "Discovery query completed"; "peers_found" => r.len()); - return Some((None, r)); + let mut results: HashMap> = HashMap::new(); + r.iter().for_each(|enr| { + // cache the found ENR's + self.cached_enrs.put(enr.peer_id(), enr.clone()); + results.insert(enr.peer_id(), None); + }); + return Some(results); } Err(e) => { warn!(self.log, "Discovery query failed"; "error" => e.to_string()); } } } - QueryType::Subnet { - subnet_id, - min_ttl, - retries, - } => { + GroupedQueryType::Subnet(queries) => { + let subnets_searched_for: Vec = + queries.iter().map(|query| query.subnet_id).collect(); match query_future.1 { Ok(r) if r.is_empty() => { - debug!(self.log, "Subnet discovery query yielded no results."; "subnet_id" => *subnet_id, "retries" => retries); + debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => format!("{:?}",subnets_searched_for)); } Ok(r) => { - debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => r.len(), "subnet_id" => *subnet_id); - // A subnet query has completed. Add back to the queue, incrementing retries. - self.add_subnet_query(subnet_id, min_ttl, retries + 1); - // Report the results back to the peer manager. - return Some((query_future.0.min_ttl(), r)); + debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => format!("{:?}",subnets_searched_for)); + + let mut mapped_results: HashMap> = + HashMap::new(); + + // cache the found ENR's + for enr in r.iter().cloned() { + self.cached_enrs.put(enr.peer_id(), enr); + } + + // Map each subnet query's min_ttl to the set of ENR's returned for that subnet. + queries.iter().for_each(|query| { + // A subnet query has completed. Add back to the queue, incrementing retries. + self.add_subnet_query( + query.subnet_id, + query.min_ttl, + query.retries + 1, + ); + + // Check the specific subnet against the enr + let subnet_predicate = + subnet_predicate::(vec![query.subnet_id], &self.log); + + r.iter() + .filter(|enr| subnet_predicate(enr)) + .map(|enr| enr.peer_id()) + .for_each(|peer_id| { + let other_min_ttl = mapped_results.get_mut(&peer_id); + + // map peer IDs to the min_ttl furthest in the future + match (query.min_ttl, other_min_ttl) { + // update the mapping if the min_ttl is greater + ( + Some(min_ttl_instant), + Some(Some(other_min_ttl_instant)), + ) => { + if min_ttl_instant.saturating_duration_since( + *other_min_ttl_instant, + ) > DURATION_DIFFERENCE + { + *other_min_ttl_instant = min_ttl_instant; + } + } + // update the mapping if we have a specified min_ttl + (Some(min_ttl), Some(None)) => { + mapped_results.insert(peer_id, Some(min_ttl)); + } + // first seen min_ttl for this enr + (Some(min_ttl), None) => { + mapped_results.insert(peer_id, Some(min_ttl)); + } + // first seen min_ttl for this enr + (None, None) => { + mapped_results.insert(peer_id, None); + } + (None, Some(Some(_))) => {} // Don't replace the existing specific min_ttl + (None, Some(None)) => {} // No-op because this is a duplicate + } + }); + }); + + if mapped_results.is_empty() { + return None; + } else { + return Some(mapped_results); + } } Err(e) => { - warn!(self.log,"Subnet Discovery query failed"; "subnet_id" => *subnet_id, "error" => e.to_string()); + warn!(self.log,"Grouped subnet discovery query failed"; "subnets_searched_for" => format!("{:?}",subnets_searched_for), "error" => e.to_string()); } } } @@ -594,13 +700,9 @@ impl Discovery { self.process_queue(); // Drive the queries and return any results from completed queries - if let Some((min_ttl, result)) = self.poll_queries(cx) { - // cache the found ENR's - for enr in result.iter().cloned() { - self.cached_enrs.put(enr.peer_id(), enr); - } + if let Some(results) = self.poll_queries(cx) { // return the result to the peer manager - return Poll::Ready(DiscoveryEvent::QueryResult(min_ttl, result)); + return Poll::Ready(DiscoveryEvent::QueryResult(results)); } // Process the server event stream diff --git a/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs index 2e74ff32ef..e38fd4cf89 100644 --- a/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs +++ b/beacon_node/eth2_libp2p/src/discovery/subnet_predicate.rs @@ -1,9 +1,10 @@ ///! The subnet predicate used for searching for a particular subnet. use super::*; +use std::ops::Deref; /// Returns the predicate for a given subnet. pub fn subnet_predicate( - subnet_id: SubnetId, + subnet_ids: Vec, log: &slog::Logger, ) -> impl Fn(&Enr) -> bool + Send where @@ -23,10 +24,27 @@ where } }; - return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| { - debug!(log_clone, "Peer found but not on desired subnet"; "peer_id" => format!("{}", enr.peer_id())); - false - }); + let matches: Vec<&SubnetId> = subnet_ids + .iter() + .filter(|id| bitfield.get(**id.deref() as usize).unwrap_or(false)) + .collect(); + + if matches.is_empty() { + debug!( + log_clone, + "Peer found but not on any of the desired subnets"; + "peer_id" => format!("{}", enr.peer_id()) + ); + return false; + } else { + debug!( + log_clone, + "Peer found on desired subnet(s)"; + "peer_id" => format!("{}", enr.peer_id()), + "subnets" => format!("{:?}", matches.as_slice()) + ); + return true; + } } false } diff --git a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs index a57fa90b20..e27afbe431 100644 --- a/beacon_node/eth2_libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2_libp2p/src/peer_manager/mod.rs @@ -4,7 +4,7 @@ pub use self::peerdb::*; use crate::discovery::{Discovery, DiscoveryEvent}; use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode}; use crate::{error, metrics}; -use crate::{Enr, EnrExt, NetworkConfig, NetworkGlobals, PeerId}; +use crate::{EnrExt, NetworkConfig, NetworkGlobals, PeerId}; use futures::prelude::*; use futures::Stream; use hashset_delay::HashSetDelay; @@ -32,6 +32,7 @@ pub(crate) mod score; pub use peer_info::{PeerConnectionStatus::*, PeerInfo}; pub use peer_sync_status::{PeerSyncStatus, SyncInfo}; use score::{PeerAction, ScoreState}; +use std::collections::HashMap; /// The time in seconds between re-status's peers. const STATUS_INTERVAL: u64 = 300; /// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within @@ -39,7 +40,7 @@ const STATUS_INTERVAL: u64 = 300; const PING_INTERVAL: u64 = 30; /// The heartbeat performs regular updates such as updating reputations and performing discovery -/// requests. This defines the interval in seconds. +/// requests. This defines the interval in seconds. const HEARTBEAT_INTERVAL: u64 = 30; /// A fraction of `PeerManager::target_peers` that we allow to connect to us in excess of @@ -487,13 +488,11 @@ impl PeerManager { /// with a new `PeerId` which involves a discovery routing table lookup. We could dial the /// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup /// proves resource constraining, we should switch to multiaddr dialling here. - fn peers_discovered(&mut self, peers: &[Enr], min_ttl: Option) { + fn peers_discovered(&mut self, results: HashMap>) { let mut to_dial_peers = Vec::new(); let connected_or_dialing = self.network_globals.connected_or_dialing_peers(); - for enr in peers { - let peer_id = enr.peer_id(); - + for (peer_id, min_ttl) in results { // we attempt a connection if this peer is a subnet peer or if the max peer count // is not yet filled (including dialling peers) if (min_ttl.is_some() || connected_or_dialing + to_dial_peers.len() < self.max_peers) @@ -726,9 +725,7 @@ impl Stream for PeerManager { while let Poll::Ready(event) = self.discovery.poll(cx) { match event { DiscoveryEvent::SocketUpdated(socket_addr) => self.socket_updated(socket_addr), - DiscoveryEvent::QueryResult(min_ttl, peers) => { - self.peers_discovered(&peers, min_ttl) - } + DiscoveryEvent::QueryResult(results) => self.peers_discovered(results), } } diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 5673f73fd5..8ba791c169 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -41,7 +41,7 @@ const ADVANCE_SUBSCRIBE_TIME: u32 = 3; /// The default number of slots before items in hash delay sets used by this class should expire. const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3; // 36s at 12s slot time -/// The default number of slots before items in hash delay sets used by this class should expire. +/// The duration difference between two instance for them to be considered equal. const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); #[derive(Debug, Eq, Clone)]