diff --git a/Cargo.lock b/Cargo.lock index 3c3fa5840d..621199f17b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -178,6 +178,12 @@ dependencies = [ "syn", ] +[[package]] +name = "assert_approx_eq" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c07dab4369547dbe5114677b33fbbf724971019f3818172d59a97a61c774ffd" + [[package]] name = "assert_matches" version = "1.3.0" @@ -2936,6 +2942,7 @@ dependencies = [ name = "network" version = "0.1.2" dependencies = [ + "assert_approx_eq", "beacon_chain", "environment", "error-chain", diff --git a/beacon_node/eth2-libp2p/src/behaviour/mod.rs b/beacon_node/eth2-libp2p/src/behaviour/mod.rs index 0e025d573c..aadc9cbc28 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2-libp2p/src/behaviour/mod.rs @@ -25,6 +25,7 @@ use std::{ marker::PhantomData, sync::Arc, task::{Context, Poll}, + time::Instant, }; use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId}; @@ -459,9 +460,10 @@ impl Behaviour { self.update_metadata(); } - /// A request to search for peers connected to a long-lived subnet. - pub fn peers_request(&mut self, subnet_id: SubnetId) { - self.discovery.peers_request(subnet_id); + /// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we + /// would like to retain the peers for. + pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + self.discovery.discover_subnet_peers(subnet_id, min_ttl) } /// Updates the local ENR's "eth2" field with the latest EnrForkId. diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 7349fe5ba7..136b00a5d3 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -8,7 +8,7 @@ pub use enr_ext::{CombinedKeyExt, EnrExt}; use crate::metrics; use crate::{error, Enr, NetworkConfig, NetworkGlobals}; -use discv5::{enr::NodeId, Discv5, Discv5Event}; +use discv5::{enr::NodeId, Discv5, Discv5Event, QueryId}; use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use futures::prelude::*; use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId}; @@ -18,20 +18,24 @@ use libp2p::swarm::{ NetworkBehaviourAction, PollParameters, ProtocolsHandler, }; use lru::LruCache; -use slog::{crit, debug, info, warn}; +use slog::{crit, debug, info, trace, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; use std::{ - collections::{HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, net::SocketAddr, path::Path, sync::Arc, task::{Context, Poll}, - time::Duration, + time::{Duration, Instant}, }; -use tokio::time::{delay_until, Delay, Instant}; +use tokio::time::{delay_until, Delay}; use types::{EnrForkId, EthSpec, SubnetId}; +mod subnet_predicate; + +use subnet_predicate::subnet_predicate; + /// Maximum seconds before searching for extra peers. const MAX_TIME_BETWEEN_PEER_SEARCHES: u64 = 120; /// Initial delay between peer searches. @@ -41,7 +45,18 @@ const MINIMUM_PEERS_BEFORE_DELAY_INCREASE: usize = 5; /// Local ENR storage filename. pub const ENR_FILENAME: &str = "enr.dat"; /// Number of peers we'd like to have connected to a given long-lived subnet. -const TARGET_SUBNET_PEERS: u64 = 3; +const TARGET_SUBNET_PEERS: usize = 3; +/// Number of times to attempt a discovery request +const MAX_DISCOVERY_RETRY: u64 = 3; + +/// A struct representing the information associated with a single discovery request, +/// which can be retried with multiple queries +#[derive(Clone, Debug)] +pub struct Request { + pub query_id: Option, + pub min_ttl: Option, + pub retries: u64, +} /// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 /// libp2p protocol. @@ -79,6 +94,9 @@ pub struct Discovery { /// A collection of network constants that can be read from other threads. network_globals: Arc>, + /// A mapping of SubnetId that we are currently searching for to all information associated with each request. + subnet_queries: HashMap, + /// Logger for the discovery behaviour. log: slog::Logger, } @@ -139,11 +157,12 @@ impl Discovery { cached_enrs: LruCache::new(50), banned_peers: HashSet::new(), max_peers: config.max_peers, - peer_discovery_delay: delay_until(Instant::now()), + peer_discovery_delay: delay_until(tokio::time::Instant::now()), past_discovery_delay: INITIAL_SEARCH_DELAY, tcp_port: config.libp2p_port, discovery, network_globals, + subnet_queries: HashMap::new(), log, enr_dir, }) @@ -280,57 +299,93 @@ impl Discovery { } /// A request to find peers on a given subnet. - // TODO: This logic should be improved with added sophistication in peer management - // This currently checks for currently connected peers and if we don't have - // PEERS_WANTED_BEFORE_DISCOVERY connected to a given subnet we search for more. - pub fn peers_request(&mut self, subnet_id: SubnetId) { + pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { + // TODO: Extend this to an event once discovery becomes a thread managed by the peer + // manager + if let Some(min_ttl) = min_ttl { + self.network_globals + .peers + .write() + .extend_peers_on_subnet(subnet_id, min_ttl); + } + + // If there is already a discovery request in process for this subnet, ignore this request, + // but update the min_ttl. + if let Some(request) = self.subnet_queries.get_mut(&subnet_id) { + // update the min_ttl if required + if let Some(min_ttl) = min_ttl { + if request.min_ttl < Some(min_ttl) { + request.min_ttl = Some(min_ttl); + } + } + return; + } + + // Insert a request and start a query for the subnet + self.subnet_queries.insert( + subnet_id.clone(), + Request { + query_id: None, + min_ttl, + retries: 0, + }, + ); + self.run_subnet_query(subnet_id); + } + + /// Runs a discovery request for a given subnet_id if one already exists. + fn run_subnet_query(&mut self, subnet_id: SubnetId) { + let mut request = match self.subnet_queries.remove(&subnet_id) { + Some(v) => v, + None => return, // request doesn't exist + }; + + // increment the retry count + request.retries += 1; + let peers_on_subnet = self .network_globals .peers .read() .peers_on_subnet(subnet_id) - .count() as u64; + .count(); - if peers_on_subnet < TARGET_SUBNET_PEERS { - let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; - debug!(self.log, "Searching for peers for subnet"; - "subnet_id" => *subnet_id, - "connected_peers_on_subnet" => peers_on_subnet, - "target_subnet_peers" => TARGET_SUBNET_PEERS, - "peers_to_find" => target_peers - ); - - let log_clone = self.log.clone(); - - let subnet_predicate = move |enr: &Enr| { - if let Some(bitfield_bytes) = enr.get(BITFIELD_ENR_KEY) { - let bitfield = match BitVector::::from_ssz_bytes( - bitfield_bytes, - ) { - Ok(v) => v, - Err(e) => { - warn!(log_clone, "Could not decode ENR bitfield for peer"; "peer_id" => format!("{}", enr.peer_id()), "error" => format!("{:?}", e)); - return false; - } - }; - - return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| { - debug!(log_clone, "Peer found but not on desired subnet"; "peer_id" => format!("{}", enr.peer_id())); - false - }); - } - false - }; - - // start the query - self.start_query(subnet_predicate, target_peers as usize); - } else { - debug!(self.log, "Discovery ignored"; + if peers_on_subnet > TARGET_SUBNET_PEERS { + trace!(self.log, "Discovery ignored"; "reason" => "Already connected to desired peers", "connected_peers_on_subnet" => peers_on_subnet, "target_subnet_peers" => TARGET_SUBNET_PEERS, ); + return; } + + // remove the entry and complete the query if greater than the maximum search count + if request.retries >= MAX_DISCOVERY_RETRY { + debug!( + self.log, + "Subnet peer discovery did not find sufficient peers. Reached max retry limit" + ); + return; + } + + let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; + debug!(self.log, "Searching for peers for subnet"; + "subnet_id" => *subnet_id, + "connected_peers_on_subnet" => peers_on_subnet, + "target_subnet_peers" => TARGET_SUBNET_PEERS, + "peers_to_find" => target_peers, + "attempt" => request.retries, + ); + + // start the query, and update the queries map if necessary + let subnet_predicate = subnet_predicate::(subnet_id, &self.log); + if let Some(query_id) = self.start_query(subnet_predicate, target_peers) { + request.query_id = Some(query_id); + } else { + // ENR is not present remove the query + return; + } + self.subnet_queries.insert(subnet_id, request); } /* Internal Functions */ @@ -348,7 +403,7 @@ impl Discovery { /// This can optionally search for peers for a given predicate. Regardless of the predicate /// given, this will only search for peers on the same enr_fork_id as specified in the local /// ENR. - fn start_query(&mut self, enr_predicate: F, num_nodes: usize) + fn start_query(&mut self, enr_predicate: F, num_nodes: usize) -> Option where F: Fn(&Enr) -> bool + Send + 'static + Clone, { @@ -359,18 +414,54 @@ impl Discovery { Ok(v) => v, Err(e) => { crit!(self.log, "Local ENR has no fork id"; "error" => e); - return; + return None; } }; // predicate for finding nodes with a matching fork - let eth2_fork_predicate = move |enr: &Enr| { - enr.eth2().map(|enr| enr.fork_digest) == Ok(enr_fork_id.fork_digest.clone()) - }; + let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone()); let predicate = move |enr: &Enr| eth2_fork_predicate(enr) && enr_predicate(enr); // general predicate - self.discovery - .find_enr_predicate(random_node, predicate, num_nodes); + Some( + self.discovery + .find_enr_predicate(random_node, predicate, num_nodes), + ) + } + + /// Peers that are found during discovery are optionally dialed. + // TODO: Shift to peer manager. As its own service, discovery should spit out discovered nodes + // and the peer manager should decide about who to connect to. + fn dial_discovered_peers(&mut self, peers: Vec, min_ttl: Option) { + for enr in peers { + // cache known peers + let peer_id = enr.peer_id(); + self.cached_enrs.put(enr.peer_id(), enr); + + // if we need more peers, attempt a connection + if self.network_globals.connected_or_dialing_peers() < self.max_peers + && !self + .network_globals + .peers + .read() + .is_connected_or_dialing(&peer_id) + && !self.banned_peers.contains(&peer_id) + { + debug!(self.log, "Connecting to discovered peer"; "peer_id"=> peer_id.to_string()); + // TODO: Update output + // This should be updated with the peer dialing. In fact created once the peer is + // dialed + if let Some(min_ttl) = min_ttl { + self.network_globals + .peers + .write() + .update_min_ttl(&peer_id, min_ttl); + } + self.events.push_back(NetworkBehaviourAction::DialPeer { + peer_id, + condition: DialPeerCondition::Disconnected, + }); + } + } } } @@ -440,7 +531,8 @@ impl NetworkBehaviour for Discovery { } // Set to maximum, and update to earlier, once we get our results back. self.peer_discovery_delay.reset( - Instant::now() + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES), + tokio::time::Instant::now() + + Duration::from_secs(MAX_TIME_BETWEEN_PEER_SEARCHES), ); } Poll::Pending => break, @@ -477,7 +569,11 @@ impl NetworkBehaviour for Discovery { address, }); } - Discv5Event::FindNodeResult { closer_peers, .. } => { + Discv5Event::FindNodeResult { + closer_peers, + query_id, + .. + } => { debug!(self.log, "Discovery query completed"; "peers_found" => closer_peers.len()); // update the time to the next query if self.past_discovery_delay < MAX_TIME_BETWEEN_PEER_SEARCHES @@ -486,40 +582,30 @@ impl NetworkBehaviour for Discovery { { self.past_discovery_delay *= 2; } - let delay = std::cmp::min( + let delay = std::cmp::max( self.past_discovery_delay, MAX_TIME_BETWEEN_PEER_SEARCHES, ); self.peer_discovery_delay - .reset(Instant::now() + Duration::from_secs(delay)); + .reset(tokio::time::Instant::now() + Duration::from_secs(delay)); - for enr in closer_peers { - // cache known peers - let peer_id = enr.peer_id(); - self.cached_enrs.put(enr.peer_id(), enr); - - // if we need more peers, attempt a connection - if self.network_globals.connected_or_dialing_peers() - < self.max_peers - && !self - .network_globals - .peers - .read() - .is_connected_or_dialing(&peer_id) - && !self.banned_peers.contains(&peer_id) - { - // TODO: Debugging only - // NOTE: The peer manager will get updated by the global swarm. - let connection_status = self - .network_globals - .peers - .read() - .connection_status(&peer_id); - debug!(self.log, "Connecting to discovered peer"; "peer_id"=> peer_id.to_string(), "status" => format!("{:?}", connection_status)); - self.events.push_back(NetworkBehaviourAction::DialPeer { - peer_id, - condition: DialPeerCondition::Disconnected, - }); + // if this is a subnet query, run it to completion + if let Some((subnet_id, min_ttl)) = self + .subnet_queries + .iter() + .find(|(_, request)| request.query_id == Some(query_id)) + .map(|(subnet_id, request)| { + (subnet_id.clone(), request.min_ttl.clone()) + }) + { + debug!(self.log, "Peer subnet discovery request completed"; "peers_found" => closer_peers.len(), "subnet_id" => *subnet_id); + self.dial_discovered_peers(closer_peers, min_ttl); + self.run_subnet_query(subnet_id); + } else { + if closer_peers.is_empty() { + debug!(self.log, "Peer Discovery request yielded no results."); + } else { + self.dial_discovered_peers(closer_peers, None); } } } diff --git a/beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs b/beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs new file mode 100644 index 0000000000..89451d7f6b --- /dev/null +++ b/beacon_node/eth2-libp2p/src/discovery/subnet_predicate.rs @@ -0,0 +1,33 @@ +///! The subnet predicate used for searching for a particular subnet. +use super::*; + +/// Returns the predicate for a given subnet. +pub fn subnet_predicate( + subnet_id: SubnetId, + log: &slog::Logger, +) -> impl Fn(&Enr) -> bool + Send + 'static + Clone +where + TSpec: EthSpec, +{ + let log_clone = log.clone(); + + move |enr: &Enr| { + if let Some(bitfield_bytes) = enr.get(BITFIELD_ENR_KEY) { + let bitfield = match BitVector::::from_ssz_bytes( + bitfield_bytes, + ) { + Ok(v) => v, + Err(e) => { + warn!(log_clone, "Could not decode ENR bitfield for peer"; "peer_id" => format!("{}", enr.peer_id()), "error" => format!("{:?}", e)); + return false; + } + }; + + return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| { + debug!(log_clone, "Peer found but not on desired subnet"; "peer_id" => format!("{}", enr.peer_id())); + false + }); + } + false + } +} diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs index 4c97b2c081..825162662d 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs @@ -31,6 +31,10 @@ pub struct PeerInfo { /// The ENR subnet bitfield of the peer. This may be determined after it's initial /// connection. pub meta_data: Option>, + /// The time we would like to retain this peer. After this time, the peer is no longer + /// necessary. + #[serde(skip)] + pub min_ttl: Option, } impl Default for PeerInfo { @@ -43,6 +47,7 @@ impl Default for PeerInfo { listening_addresses: vec![], sync_status: PeerSyncStatus::Unknown, meta_data: None, + min_ttl: None, } } } diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs index 69b7b12152..38a1fa39b6 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs @@ -2,7 +2,7 @@ use super::peer_info::{PeerConnectionStatus, PeerInfo}; use super::peer_sync_status::PeerSyncStatus; use crate::rpc::methods::MetaData; use crate::PeerId; -use slog::{crit, debug, warn}; +use slog::{crit, debug, trace, warn}; use std::collections::{hash_map::Entry, HashMap}; use std::time::Instant; use types::{EthSpec, SubnetId}; @@ -236,6 +236,42 @@ impl PeerDB { debug!(self.log, "Peer dialing in db"; "peer_id" => peer_id.to_string(), "n_dc" => self.n_dc); } + /// Update min ttl of a peer. + pub fn update_min_ttl(&mut self, peer_id: &PeerId, min_ttl: Instant) { + let info = self.peers.entry(peer_id.clone()).or_default(); + + // only update if the ttl is longer + if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { + info.min_ttl = Some(min_ttl); + + let min_ttl_secs = min_ttl + .checked_duration_since(Instant::now()) + .map(|duration| duration.as_secs()) + .unwrap_or_else(|| 0); + debug!(self.log, "Updating the time a peer is required for"; "peer_id" => peer_id.to_string(), "future_min_ttl_secs" => min_ttl_secs); + } + } + + /// Extends the ttl of all peers on the given subnet that have a shorter + /// min_ttl than what's given. + pub fn extend_peers_on_subnet(&mut self, subnet_id: SubnetId, min_ttl: Instant) { + let log = &self.log; + self.peers.iter_mut() + .filter(move |(_, info)| { + info.connection_status.is_connected() && info.on_subnet(subnet_id) + }) + .for_each(|(peer_id,info)| { + if info.min_ttl.is_none() || Some(min_ttl) > info.min_ttl { + info.min_ttl = Some(min_ttl); + } + let min_ttl_secs = min_ttl + .checked_duration_since(Instant::now()) + .map(|duration| duration.as_secs()) + .unwrap_or_else(|| 0); + trace!(log, "Updating minimum duration a peer is required for"; "peer_id" => peer_id.to_string(), "min_ttl" => min_ttl_secs); + }); + } + /// Sets a peer as connected with an ingoing connection. pub fn connect_ingoing(&mut self, peer_id: &PeerId) { let info = self.peers.entry(peer_id.clone()).or_default(); diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index e3963efcb1..e9181e88b2 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -11,6 +11,7 @@ lazy_static = "1.4.0" matches = "0.1.8" tempfile = "3.1.0" exit-future = "0.2.0" +assert_approx_eq = "1.1.0" [dependencies] beacon_chain = { path = "../beacon_chain" } @@ -35,4 +36,4 @@ fnv = "1.0.6" rlp = "0.4.5" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } -environment = { path = "../../lighthouse/environment" } \ No newline at end of file +environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/network/src/attestation_service/mod.rs b/beacon_node/network/src/attestation_service/mod.rs index 90f67629f3..2fb8facea6 100644 --- a/beacon_node/network/src/attestation_service/mod.rs +++ b/beacon_node/network/src/attestation_service/mod.rs @@ -28,15 +28,19 @@ const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 1; const TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 6; /// The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random /// gossip topics that we subscribed to due to the validator connection. -const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150; // 30 mins at a 12s slot time +const LAST_SEEN_VALIDATOR_TIMEOUT: u32 = 150; +// 30 mins at a 12s slot time /// The fraction of a slot that we subscribe to a subnet before the required slot. /// /// Note: The time is calculated as `time = milliseconds_per_slot / ADVANCE_SUBSCRIPTION_TIME`. const ADVANCE_SUBSCRIBE_TIME: u32 = 3; /// The default number of slots before items in hash delay sets used by this class should expire. -const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3; // 36s at 12s slot time +const DEFAULT_EXPIRATION_TIMEOUT: u32 = 3; +// 36s at 12s slot time +/// The default number of slots before items in hash delay sets used by this class should expire. +const DURATION_DIFFERENCE: Duration = Duration::from_millis(1); -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Eq, Clone)] pub enum AttServiceMessage { /// Subscribe to the specified subnet id. Subscribe(SubnetId), @@ -47,12 +51,45 @@ pub enum AttServiceMessage { /// Remove the `SubnetId` from the ENR bitfield. EnrRemove(SubnetId), /// Discover peers for a particular subnet. - DiscoverPeers(SubnetId), + /// The includes the `Instant` we need the discovered peer until. + DiscoverPeers { + subnet_id: SubnetId, + min_ttl: Option, + }, +} + +impl PartialEq for AttServiceMessage { + fn eq(&self, other: &AttServiceMessage) -> bool { + match (self, other) { + (&AttServiceMessage::Subscribe(a), &AttServiceMessage::Subscribe(b)) => a == b, + (&AttServiceMessage::Unsubscribe(a), &AttServiceMessage::Unsubscribe(b)) => a == b, + (&AttServiceMessage::EnrAdd(a), &AttServiceMessage::EnrAdd(b)) => a == b, + (&AttServiceMessage::EnrRemove(a), &AttServiceMessage::EnrRemove(b)) => a == b, + ( + &AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }, + &AttServiceMessage::DiscoverPeers { + subnet_id: other_subnet_id, + min_ttl: other_min_ttl, + }, + ) => match (min_ttl, other_min_ttl) { + (Some(min_ttl_instant), Some(other_min_ttl_instant)) => { + min_ttl_instant.saturating_duration_since(other_min_ttl_instant) + < DURATION_DIFFERENCE + && other_min_ttl_instant.saturating_duration_since(min_ttl_instant) + < DURATION_DIFFERENCE + && subnet_id == other_subnet_id + } + (None, None) => subnet_id == other_subnet_id, + _ => false, + }, + _ => false, + } + } } /// A particular subnet at a given slot. -#[derive(PartialEq, Eq, Hash, Clone)] -struct ExactSubnet { +#[derive(PartialEq, Eq, Hash, Clone, Debug)] +pub struct ExactSubnet { /// The `SubnetId` associated with this subnet. pub subnet_id: SubnetId, /// The `Slot` associated with this subnet. @@ -244,24 +281,18 @@ impl AttestationService { return Ok(()); } - // check current event log to see if there is a discovery event queued - if self - .events - .iter() - .find(|event| event == &&AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id)) - .is_some() - { - // already queued a discovery event - return Ok(()); - } - // if the slot is more than epoch away, add an event to start looking for peers if exact_subnet.slot < current_slot.saturating_add(TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD) { - // then instantly add a discovery request - self.events - .push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id)); + // add one slot to ensure we keep the peer for the subscription slot + let min_ttl = self + .beacon_chain + .slot_clock + .duration_to_slot(exact_subnet.slot + 1) + .map(|duration| std::time::Instant::now() + duration); + + self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl); } else { // Queue the discovery event to be executed for // TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD @@ -296,6 +327,52 @@ impl AttestationService { Ok(()) } + /// Checks if we have a discover peers event already and sends a new event if necessary + /// + /// If a message exists for the same subnet, compare the `min_ttl` of the current and + /// existing messages and extend the existing message as necessary. + fn send_or_update_discovery_event(&mut self, subnet_id: SubnetId, min_ttl: Option) { + // track whether this message already exists in the event queue + let mut is_duplicate = false; + + self.events.iter_mut().for_each(|event| { + match event { + AttServiceMessage::DiscoverPeers { + subnet_id: other_subnet_id, + min_ttl: other_min_ttl, + } => { + if subnet_id == *other_subnet_id { + let other_min_ttl_clone = other_min_ttl.clone(); + match (min_ttl, other_min_ttl_clone) { + (Some(min_ttl_instant), Some(other_min_ttl_instant)) => + // only update the min_ttl if it is greater than the existing min_ttl and a DURATION_DIFFERENCE padding + { + if min_ttl_instant.saturating_duration_since(other_min_ttl_instant) + > DURATION_DIFFERENCE + { + *other_min_ttl = min_ttl; + } + } + (None, Some(_)) => { + // Update the min_ttl to None, because the new message is longer-lived. + *other_min_ttl = None; + } + (Some(_), None) => {} // Don't replace this because the existing message is for a longer-lived peer. + (None, None) => {} // Duplicate message, do nothing. + } + is_duplicate = true; + return; + } + } + _ => {} + }; + }); + if !is_duplicate { + self.events + .push_back(AttServiceMessage::DiscoverPeers { subnet_id, min_ttl }); + } + } + /// Checks the current random subnets and subscriptions to determine if a new subscription for this /// subnet is required for the given slot. /// @@ -436,18 +513,17 @@ impl AttestationService { // if we are not already subscribed, then subscribe let topic_kind = &GossipKind::CommitteeIndex(subnet_id); - if let None = self + let already_subscribed = self .network_globals .gossipsub_subscriptions .read() .iter() .find(|topic| topic.kind() == topic_kind) - { - // not already subscribed to the topic + .is_some(); + if !already_subscribed { // send a discovery request and a subscription - self.events - .push_back(AttServiceMessage::DiscoverPeers(subnet_id)); + self.send_or_update_discovery_event(subnet_id, None); self.events .push_back(AttServiceMessage::Subscribe(subnet_id)); } @@ -461,8 +537,15 @@ impl AttestationService { /// Request a discovery query to find peers for a particular subnet. fn handle_discover_peers(&mut self, exact_subnet: ExactSubnet) { debug!(self.log, "Searching for peers for subnet"; "subnet" => *exact_subnet.subnet_id, "target_slot" => exact_subnet.slot); - self.events - .push_back(AttServiceMessage::DiscoverPeers(exact_subnet.subnet_id)); + + // add one slot to ensure we keep the peer for the subscription slot + let min_ttl = self + .beacon_chain + .slot_clock + .duration_to_slot(exact_subnet.slot + 1) + .map(|duration| std::time::Instant::now() + duration); + + self.send_or_update_discovery_event(exact_subnet.subnet_id, min_ttl) } /// A queued subscription is ready. @@ -619,7 +702,7 @@ impl Stream for AttestationService { match self.discover_peers.poll_next_unpin(cx) { Poll::Ready(Some(Ok(exact_subnet))) => self.handle_discover_peers(exact_subnet), Poll::Ready(Some(Err(e))) => { - error!(self.log, "Failed to check for peer discovery requests"; "error"=> format!("{}", e)); + error!(self.log, "Failed to check for peer discovery requests"; "error"=> format ! ("{}", e)); } Poll::Ready(None) | Poll::Pending => {} } diff --git a/beacon_node/network/src/attestation_service/tests.rs b/beacon_node/network/src/attestation_service/tests/mod.rs similarity index 87% rename from beacon_node/network/src/attestation_service/tests.rs rename to beacon_node/network/src/attestation_service/tests/mod.rs index c0d32248f2..568a551c91 100644 --- a/beacon_node/network/src/attestation_service/tests.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -16,10 +16,9 @@ mod tests { use slog::Logger; use sloggers::{null::NullLoggerBuilder, Build}; use slot_clock::{SlotClock, SystemTimeSlotClock}; - use std::time::SystemTime; + use std::time::{Duration, SystemTime}; use store::MemoryStore; use tempfile::tempdir; - use tokio::time::Duration; use types::{CommitteeIndex, EnrForkId, EthSpec, MinimalEthSpec}; const SLOT_DURATION_MILLIS: u64 = 200; @@ -192,7 +191,10 @@ mod tests { assert_matches!( events[..3], [ - AttServiceMessage::DiscoverPeers(_any2), + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant + }, AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3) ] @@ -240,7 +242,10 @@ mod tests { assert_matches!( events[..3], [ - AttServiceMessage::DiscoverPeers(_any2), + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant + }, AttServiceMessage::Subscribe(_any1), AttServiceMessage::EnrAdd(_any3) ] @@ -278,16 +283,28 @@ mod tests { .validator_subscriptions(subscriptions) .unwrap(); + let min_ttl = Instant::now().checked_add( + attestation_service + .beacon_chain + .slot_clock + .duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1)) + .unwrap(), + ); + // just discover peers, don't subscribe yet - let expected = vec![AttServiceMessage::DiscoverPeers(SubnetId::new( - validator_index, - ))]; + let expected = vec![AttServiceMessage::DiscoverPeers { + subnet_id: SubnetId::new(validator_index), + min_ttl, + }]; let events = get_events(attestation_service, no_events_expected, 1).await; assert_matches!( events[..3], [ - AttServiceMessage::DiscoverPeers(_any1), + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant + }, AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3) ] @@ -325,9 +342,20 @@ mod tests { .validator_subscriptions(subscriptions) .unwrap(); + let min_ttl = Instant::now().checked_add( + attestation_service + .beacon_chain + .slot_clock + .duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1)) + .unwrap(), + ); + // we should discover peers, wait, then subscribe let expected = vec![ - AttServiceMessage::DiscoverPeers(SubnetId::new(validator_index)), + AttServiceMessage::DiscoverPeers { + subnet_id: SubnetId::new(validator_index), + min_ttl, + }, AttServiceMessage::Subscribe(SubnetId::new(validator_index)), ]; @@ -335,7 +363,10 @@ mod tests { assert_matches!( events[..3], [ - AttServiceMessage::DiscoverPeers(_any1), + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant + }, AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3) ] @@ -381,7 +412,10 @@ mod tests { assert_matches!( events[..3], [ - AttServiceMessage::DiscoverPeers(_any1), + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant + }, AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3) ] @@ -419,17 +453,29 @@ mod tests { .validator_subscriptions(subscriptions) .unwrap(); + let min_ttl = Instant::now().checked_add( + attestation_service + .beacon_chain + .slot_clock + .duration_to_slot(current_slot + Slot::new(subscription_slot) + Slot::new(1)) + .unwrap(), + ); + // expect discover peers because we will enter TARGET_PEER_DISCOVERY_SLOT_LOOK_AHEAD range - let expected: Vec = vec![AttServiceMessage::DiscoverPeers( - SubnetId::new(validator_index), - )]; + let expected: Vec = vec![AttServiceMessage::DiscoverPeers { + subnet_id: SubnetId::new(validator_index), + min_ttl, + }]; let events = get_events(attestation_service, no_events_expected, 5).await; assert_matches!( events[..3], [ - AttServiceMessage::DiscoverPeers(_any1), + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant + }, AttServiceMessage::Subscribe(_any2), AttServiceMessage::EnrAdd(_any3) ] @@ -470,9 +516,10 @@ mod tests { for event in events { match event { - AttServiceMessage::DiscoverPeers(_any_subnet) => { - discover_peer_count = discover_peer_count + 1 - } + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant, + } => discover_peer_count = discover_peer_count + 1, AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1, AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1, _ => unexpected_msg_count = unexpected_msg_count + 1, @@ -517,9 +564,10 @@ mod tests { for event in events { match event { - AttServiceMessage::DiscoverPeers(_any_subnet) => { - discover_peer_count = discover_peer_count + 1 - } + AttServiceMessage::DiscoverPeers { + subnet_id: _any_subnet, + min_ttl: _any_instant, + } => discover_peer_count = discover_peer_count + 1, AttServiceMessage::Subscribe(_any_subnet) => subscribe_count = subscribe_count + 1, AttServiceMessage::EnrAdd(_any_subnet) => enr_add_count = enr_add_count + 1, _ => unexpected_msg_count = unexpected_msg_count + 1, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index fad08ea901..efabaa23ba 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -258,8 +258,8 @@ fn spawn_service( AttServiceMessage::EnrRemove(subnet_id) => { service.libp2p.swarm.update_enr_subnet(subnet_id, false); } - AttServiceMessage::DiscoverPeers(subnet_id) => { - service.libp2p.swarm.peers_request(subnet_id); + AttServiceMessage::DiscoverPeers{subnet_id, min_ttl} => { + service.libp2p.swarm.discover_subnet_peers(subnet_id, min_ttl); } } }