From da6ab85e9908959f370af9f3fb2a33c7460bac7a Mon Sep 17 00:00:00 2001 From: Age Manning Date: Tue, 23 Jun 2020 13:45:40 +1000 Subject: [PATCH] Optional Discovery (#1280) * Remove beacon-chain config file * Makes discovery optional * Remove unused dep --- beacon_node/eth2_libp2p/src/config.rs | 6 +- beacon_node/eth2_libp2p/src/discovery/mod.rs | 134 +++++++++++-------- beacon_node/eth2_libp2p/src/service.rs | 7 +- beacon_node/src/cli.rs | 9 +- beacon_node/src/config.rs | 5 + 5 files changed, 101 insertions(+), 60 deletions(-) diff --git a/beacon_node/eth2_libp2p/src/config.rs b/beacon_node/eth2_libp2p/src/config.rs index 6facb7750b..a9af3eea69 100644 --- a/beacon_node/eth2_libp2p/src/config.rs +++ b/beacon_node/eth2_libp2p/src/config.rs @@ -56,6 +56,9 @@ pub struct Config { /// Client version pub client_version: String, + /// Disables the discovery protocol from starting. + pub disable_discovery: bool, + /// List of extra topics to initially subscribe to as strings. pub topics: Vec, } @@ -104,7 +107,7 @@ impl Default for Config { .request_retries(2) .enr_peer_update_min(2) // prevents NAT's should be raised for mainnet .query_parallelism(5) - .query_timeout(Duration::from_secs(60)) + .query_timeout(Duration::from_secs(30)) .query_peer_timeout(Duration::from_secs(2)) .ip_limit() // limits /24 IP's in buckets. .ping_interval(Duration::from_secs(300)) @@ -125,6 +128,7 @@ impl Default for Config { boot_nodes: vec![], libp2p_nodes: vec![], client_version: version::version(), + disable_discovery: false, topics, } } diff --git a/beacon_node/eth2_libp2p/src/discovery/mod.rs b/beacon_node/eth2_libp2p/src/discovery/mod.rs index ca6cd9fe84..5b703d3149 100644 --- a/beacon_node/eth2_libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2_libp2p/src/discovery/mod.rs @@ -112,10 +112,12 @@ enum EventStream { ), /// The future has completed. Present(mpsc::Receiver), - // The future has failed, there are no events from discv5. - Failed, + // The future has failed or discv5 has been disabled. There are no events from discv5. + InActive, } +/// The main discovery service. This can be disabled via CLI arguements. When disabled the +/// underlying processes are not started, but this struct still maintains our current ENR. pub struct Discovery { /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. cached_enrs: LruCache, @@ -145,6 +147,10 @@ pub struct Discovery { /// The discv5 event stream. event_stream: EventStream, + /// Indicates if the discovery service has been started. When the service is disabled, this is + /// always false. + started: bool, + /// Logger for the discovery behaviour. log: slog::Logger, } @@ -196,12 +202,16 @@ impl Discovery { }); } - // Start the discv5 service. - discv5.start(listen_socket); - debug!(log, "Discovery service started"); + // Start the discv5 service and obtain an event stream + let event_stream = if !config.disable_discovery { + discv5.start(listen_socket); + debug!(log, "Discovery service started"); + EventStream::Awaiting(Box::pin(discv5.event_stream())) + } else { + EventStream::InActive + }; // Obtain the event stream - let event_stream = EventStream::Awaiting(Box::pin(discv5.event_stream())); Ok(Self { cached_enrs: LruCache::new(50), @@ -211,6 +221,7 @@ impl Discovery { active_queries: FuturesUnordered::new(), discv5, event_stream, + started: !config.disable_discovery, log, enr_dir, }) @@ -223,10 +234,11 @@ impl Discovery { /// This adds a new `FindPeers` query to the queue if one doesn't already exist. pub fn discover_peers(&mut self) { - // If we are in the process of a query, don't bother queuing a new one. - if self.find_peer_active { + // If the discv5 service isn't running or we are in the process of a query, don't bother queuing a new one. + if !self.started || self.find_peer_active { return; } + // If there is not already a find peer's query queued, add one let query = QueryType::FindPeers; if !self.queued_queries.contains(&query) { @@ -239,54 +251,11 @@ impl Discovery { /// Processes a request to search for more peers on a subnet. pub fn discover_subnet_peers(&mut self, subnet_id: SubnetId, min_ttl: Option) { - self.add_subnet_query(subnet_id, min_ttl, 0); - } - - /// Adds a subnet query if one doesn't exist. If a subnet query already exists, this - /// updates the min_ttl field. - fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option, retries: usize) { - // remove the entry and complete the query if greater than the maximum search count - if retries >= MAX_DISCOVERY_RETRY { - debug!( - self.log, - "Subnet peer discovery did not find sufficient peers. Reached max retry limit" - ); + // If the discv5 service isn't running, ignore queries + if !self.started { return; } - - // Search through any queued requests and update the timeout if a query for this subnet - // already exists - let mut found = false; - for query in self.queued_queries.iter_mut() { - if let QueryType::Subnet { - subnet_id: ref mut q_subnet_id, - min_ttl: ref mut q_min_ttl, - retries: ref mut q_retries, - } = query - { - if *q_subnet_id == subnet_id { - if *q_min_ttl < min_ttl { - *q_min_ttl = min_ttl; - } - // update the number of retries - *q_retries = retries; - // mimic an `Iter::Find()` and short-circuit the loop - found = true; - break; - } - } - } - if !found { - // Set up the query and add it to the queue - let query = QueryType::Subnet { - subnet_id, - min_ttl, - retries, - }; - // update the metrics and insert into the queue. - metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); - self.queued_queries.push_back(query); - } + self.add_subnet_query(subnet_id, min_ttl, 0); } /// Add an ENR to the routing table of the discovery mechanism. @@ -295,7 +264,7 @@ impl Discovery { self.cached_enrs.put(enr.peer_id(), enr.clone()); if let Err(e) = self.discv5.add_enr(enr) { - warn!( + debug!( self.log, "Could not add peer to the local routing table"; "error" => format!("{}", e) @@ -396,6 +365,53 @@ impl Discovery { /* Internal Functions */ + /// Adds a subnet query if one doesn't exist. If a subnet query already exists, this + /// updates the min_ttl field. + fn add_subnet_query(&mut self, subnet_id: SubnetId, min_ttl: Option, retries: usize) { + // remove the entry and complete the query if greater than the maximum search count + if retries >= MAX_DISCOVERY_RETRY { + debug!( + self.log, + "Subnet peer discovery did not find sufficient peers. Reached max retry limit" + ); + return; + } + + // Search through any queued requests and update the timeout if a query for this subnet + // already exists + let mut found = false; + for query in self.queued_queries.iter_mut() { + if let QueryType::Subnet { + subnet_id: ref mut q_subnet_id, + min_ttl: ref mut q_min_ttl, + retries: ref mut q_retries, + } = query + { + if *q_subnet_id == subnet_id { + if *q_min_ttl < min_ttl { + *q_min_ttl = min_ttl; + } + // update the number of retries + *q_retries = retries; + // mimic an `Iter::Find()` and short-circuit the loop + found = true; + break; + } + } + } + if !found { + // Set up the query and add it to the queue + let query = QueryType::Subnet { + subnet_id, + min_ttl, + retries, + }; + // update the metrics and insert into the queue. + metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); + self.queued_queries.push_back(query); + } + } + /// Consume the discovery queue and initiate queries when applicable. /// /// This also sanitizes the queue removing out-dated queries. @@ -572,6 +588,10 @@ impl Discovery { // Main execution loop to be driven by the peer manager. pub fn poll(&mut self, cx: &mut Context) -> Poll { + if !self.started { + return Poll::Pending; + } + // Process the query queue self.process_queue(); @@ -594,12 +614,12 @@ impl Discovery { Ok(stream) => self.event_stream = EventStream::Present(stream), Err(e) => { slog::crit!(self.log, "Discv5 event stream failed"; "error" => e.to_string()); - self.event_stream = EventStream::Failed; + self.event_stream = EventStream::InActive; } } } } - EventStream::Failed => {} // ignore checking the stream + EventStream::InActive => {} // ignore checking the stream EventStream::Present(ref mut stream) => { while let Ok(event) = stream.try_recv() { match event { diff --git a/beacon_node/eth2_libp2p/src/service.rs b/beacon_node/eth2_libp2p/src/service.rs index f2168345df..e5c056ac8f 100644 --- a/beacon_node/eth2_libp2p/src/service.rs +++ b/beacon_node/eth2_libp2p/src/service.rs @@ -110,7 +110,12 @@ impl Service { )); info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", enr.peer_id())); - debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => config.discovery_port); + let discovery_string = if config.disable_discovery { + "None".into() + } else { + config.discovery_port.to_string() + }; + debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => discovery_string); let mut swarm = { // Set up the transport - tcp/ws with noise/secio and mplex/yamux diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 50f635d0df..07cc44fabc 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -61,7 +61,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true), ) .arg( - Arg::with_name("max_peers") + Arg::with_name("max-peers") .long("max-peers") .help("The maximum number of peers.") .default_value("50") @@ -125,6 +125,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { without an ENR.") .takes_value(true), ) + .arg( + Arg::with_name("disable-discovery") + .long("disable-discovery") + .help("Disables the discv5 discovery protocol. The node will not search for new peers or participate in the discovery protocol.") + .takes_value(false), + ) + /* REST API related arguments */ .arg( Arg::with_name("http") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 38b057d0eb..932708a36c 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -195,6 +195,11 @@ pub fn get_config( client_config.network.discv5_config.enr_update = false; } + if cli_args.is_present("disable-discovery") { + client_config.network.disable_discovery = true; + slog::warn!(log, "Discovery is disabled. New peers will not be found"); + } + /* * Http server */