From 2a9c718a20acba2cd7a88cdc37dc847ad5430e82 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 19 Feb 2020 22:12:25 +1100 Subject: [PATCH] Remove network lock (#840) * Initial work on removing libp2p lock * Removes lock from libp2p service * Completed network lock removal * Correct network termination future * Correct fmt issues * Remove Drop implementation for network service * Address reviewers suggestions * Fix dht persistence test (#844) * Fix persistence test * Block until dht is persisted * Fix libp2p test * Correct test ordering check * Remove expensive tests from debug testing Co-authored-by: Pawan Dhananjay --- Cargo.lock | 2 + beacon_node/beacon_chain/src/builder.rs | 1 + beacon_node/client/src/lib.rs | 2 +- beacon_node/client/src/notifier.rs | 13 +- beacon_node/eth2-libp2p/Cargo.toml | 1 + beacon_node/eth2-libp2p/src/behaviour.rs | 12 +- beacon_node/eth2-libp2p/src/discovery.rs | 57 +++- beacon_node/eth2-libp2p/src/globals.rs | 30 ++ beacon_node/eth2-libp2p/src/lib.rs | 2 + beacon_node/eth2-libp2p/src/service.rs | 31 +- beacon_node/eth2-libp2p/tests/common/mod.rs | 2 +- beacon_node/network/Cargo.toml | 1 + beacon_node/network/src/persisted_dht.rs | 28 +- beacon_node/network/src/service.rs | 350 ++++++-------------- beacon_node/network/src/service/tests.rs | 102 ++++++ beacon_node/network/src/store.rs | 0 beacon_node/rest_api/src/network.rs | 7 +- 17 files changed, 346 insertions(+), 295 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/globals.rs create mode 100644 beacon_node/network/src/service/tests.rs create mode 100644 beacon_node/network/src/store.rs diff --git a/Cargo.lock b/Cargo.lock index 24461d835e..f9f7f57ad5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1123,6 +1123,7 @@ dependencies = [ "libp2p 0.13.2 (git+https://github.com/SigP/rust-libp2p/?rev=49c95c4c4242f1c9f08558a3daac5e9ecac290d5)", "lighthouse_metrics 0.1.0", "lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2654,6 +2655,7 @@ dependencies = [ "sloggers 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "store 0.1.0", + "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "tree_hash 0.1.1", "types 0.1.0", diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a5108c49b3..e7052b5398 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -529,6 +529,7 @@ fn genesis_block( Ok(genesis_block) } +#[cfg(not(debug_assertions))] #[cfg(test)] mod test { use super::*; diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index a396e8646a..22d9c7c6ff 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -58,6 +58,6 @@ impl Client { /// Returns the local libp2p ENR of this node, for network discovery. pub fn enr(&self) -> Option { - self.libp2p_network.as_ref().map(|n| n.local_enr()) + self.libp2p_network.as_ref()?.local_enr() } } diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index ec77930c9b..eddebce1f3 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -22,9 +22,6 @@ const DAYS_PER_WEEK: f64 = 7.0; const HOURS_PER_DAY: f64 = 24.0; const MINUTES_PER_HOUR: f64 = 60.0; -/// How long to wait for the lock on `network.libp2p_service()` before we give up. -const LIBP2P_LOCK_TIMEOUT: Duration = Duration::from_millis(50); - /// The number of historical observations that should be used to determine the average sync time. const SPEEDO_OBSERVATIONS: usize = 4; @@ -60,15 +57,7 @@ pub fn spawn_notifier( .for_each(move |_| { let log = log_2.clone(); - let connected_peer_count = if let Some(libp2p) = network - .libp2p_service() - .try_lock_until(Instant::now() + LIBP2P_LOCK_TIMEOUT) - { - libp2p.swarm.connected_peers() - } else { - // Use max_value here and we'll print something pretty later. - usize::max_value() - }; + let connected_peer_count = network.connected_peers(); let head_info = beacon_chain.head_info() .map_err(|e| error!( diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 7948663d5e..73cfaa022d 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -28,6 +28,7 @@ lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } tokio-io-timeout = "0.3.1" smallvec = "1.0.0" lru = "0.4.3" +parking_lot = "0.9.0" sha2 = "0.8.0" base64 = "0.11.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 51924be084..66cf6f0a76 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,8 +1,6 @@ use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, RPC}; -use crate::GossipTopic; -use crate::{error, NetworkConfig}; -use crate::{Topic, TopicHash}; +use crate::{error, GossipTopic, NetworkConfig, NetworkGlobals, Topic, TopicHash}; use enr::Enr; use futures::prelude::*; use libp2p::{ @@ -18,6 +16,7 @@ use libp2p::{ use lru::LruCache; use slog::{debug, o}; use std::num::NonZeroU32; +use std::sync::Arc; use std::time::Duration; const MAX_IDENTIFY_ADDRESSES: usize = 20; @@ -47,8 +46,8 @@ pub struct Behaviour { /// duplicates that may still be seen over gossipsub. #[behaviour(ignore)] seen_gossip_messages: LruCache, - /// Logger for behaviour actions. #[behaviour(ignore)] + /// Logger for behaviour actions. log: slog::Logger, } @@ -56,6 +55,7 @@ impl Behaviour { pub fn new( local_key: &Keypair, net_conf: &NetworkConfig, + network_globals: Arc, log: &slog::Logger, ) -> error::Result { let local_peer_id = local_key.public().into_peer_id(); @@ -76,11 +76,11 @@ impl Behaviour { Ok(Behaviour { eth2_rpc: RPC::new(log.clone()), gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), - discovery: Discovery::new(local_key, net_conf, log)?, + discovery: Discovery::new(local_key, net_conf, network_globals, log)?, ping: Ping::new(ping_config), identify, - seen_gossip_messages: LruCache::new(100_000), events: Vec::new(), + seen_gossip_messages: LruCache::new(100_000), log: behaviour_log, }) } diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs index a1d1fa5c9b..235fd71947 100644 --- a/beacon_node/eth2-libp2p/src/discovery.rs +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -1,5 +1,5 @@ use crate::metrics; -use crate::{error, NetworkConfig}; +use crate::{error, NetworkConfig, NetworkGlobals}; /// This manages the discovery and management of peers. /// /// Currently using discv5 for peer discovery. @@ -16,6 +16,7 @@ use std::fs::File; use std::io::prelude::*; use std::path::Path; use std::str::FromStr; +use std::sync::{atomic::Ordering, Arc}; use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::timer::Delay; @@ -30,9 +31,6 @@ const ENR_FILENAME: &str = "enr.dat"; /// Lighthouse discovery behaviour. This provides peer management and discovery using the Discv5 /// libp2p protocol. pub struct Discovery { - /// The peers currently connected to libp2p streams. - connected_peers: HashSet, - /// The currently banned peers. banned_peers: HashSet, @@ -57,6 +55,9 @@ pub struct Discovery { /// The discovery behaviour used to discover new peers. discovery: Discv5, + /// A collection of network constants that can be read from other threads. + network_globals: Arc, + /// Logger for the discovery behaviour. log: slog::Logger, } @@ -65,6 +66,7 @@ impl Discovery { pub fn new( local_key: &Keypair, config: &NetworkConfig, + network_globals: Arc, log: &slog::Logger, ) -> error::Result { let log = log.clone(); @@ -72,6 +74,8 @@ impl Discovery { // checks if current ENR matches that found on disk let local_enr = load_enr(local_key, config, &log)?; + *network_globals.local_enr.write() = Some(local_enr.clone()); + let enr_dir = match config.network_dir.to_str() { Some(path) => String::from(path), None => String::from(""), @@ -98,13 +102,13 @@ impl Discovery { } Ok(Self { - connected_peers: HashSet::new(), banned_peers: HashSet::new(), max_peers: config.max_peers, peer_discovery_delay: Delay::new(Instant::now()), past_discovery_delay: INITIAL_SEARCH_DELAY, tcp_port: config.libp2p_port, discovery, + network_globals, log, enr_dir, }) @@ -129,12 +133,17 @@ impl Discovery { /// The current number of connected libp2p peers. pub fn connected_peers(&self) -> usize { - self.connected_peers.len() + self.network_globals.connected_peers.load(Ordering::Relaxed) } /// The current number of connected libp2p peers. - pub fn connected_peer_set(&self) -> &HashSet { - &self.connected_peers + pub fn connected_peer_set(&self) -> Vec { + self.network_globals + .connected_peer_set + .read() + .iter() + .cloned() + .collect::>() } /// The peer has been banned. Add this peer to the banned list to prevent any future @@ -180,7 +189,14 @@ where } fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { - self.connected_peers.insert(peer_id); + self.network_globals + .connected_peer_set + .write() + .insert(peer_id); + self.network_globals.connected_peers.store( + self.network_globals.connected_peer_set.read().len(), + Ordering::Relaxed, + ); // TODO: Drop peers if over max_peer limit metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); @@ -188,7 +204,14 @@ where } fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) { - self.connected_peers.remove(peer_id); + self.network_globals + .connected_peer_set + .write() + .remove(peer_id); + self.network_globals.connected_peers.store( + self.network_globals.connected_peer_set.read().len(), + Ordering::Relaxed, + ); metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); metrics::set_gauge(&metrics::PEERS_CONNECTED, self.connected_peers() as i64); @@ -224,7 +247,8 @@ where loop { match self.peer_discovery_delay.poll() { Ok(Async::Ready(_)) => { - if self.connected_peers.len() < self.max_peers { + if self.network_globals.connected_peers.load(Ordering::Relaxed) < self.max_peers + { self.find_peers(); } // Set to maximum, and update to earlier, once we get our results back. @@ -278,8 +302,15 @@ where } for peer_id in closer_peers { // if we need more peers, attempt a connection - if self.connected_peers.len() < self.max_peers - && self.connected_peers.get(&peer_id).is_none() + + if self.network_globals.connected_peers.load(Ordering::Relaxed) + < self.max_peers + && self + .network_globals + .connected_peer_set + .read() + .get(&peer_id) + .is_none() && !self.banned_peers.contains(&peer_id) { debug!(self.log, "Peer discovered"; "peer_id"=> format!("{:?}", peer_id)); diff --git a/beacon_node/eth2-libp2p/src/globals.rs b/beacon_node/eth2-libp2p/src/globals.rs new file mode 100644 index 0000000000..901550034b --- /dev/null +++ b/beacon_node/eth2-libp2p/src/globals.rs @@ -0,0 +1,30 @@ +//! A collection of variables that are accessible outside of the network thread itself. +use crate::{Enr, Multiaddr, PeerId}; +use parking_lot::RwLock; +use std::collections::HashSet; +use std::sync::atomic::AtomicUsize; + +pub struct NetworkGlobals { + /// The current local ENR. + pub local_enr: RwLock>, + /// The local peer_id. + pub peer_id: RwLock, + /// Listening multiaddrs. + pub listen_multiaddrs: RwLock>, + /// Current number of connected libp2p peers. + pub connected_peers: AtomicUsize, + /// The collection of currently connected peers. + pub connected_peer_set: RwLock>, +} + +impl NetworkGlobals { + pub fn new(peer_id: PeerId) -> Self { + NetworkGlobals { + local_enr: RwLock::new(None), + peer_id: RwLock::new(peer_id), + listen_multiaddrs: RwLock::new(Vec::new()), + connected_peers: AtomicUsize::new(0), + connected_peer_set: RwLock::new(HashSet::new()), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 4f05fd9cbf..d09921b2d2 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -9,6 +9,7 @@ pub mod behaviour; mod config; mod discovery; pub mod error; +mod globals; mod metrics; pub mod rpc; mod service; @@ -16,6 +17,7 @@ mod topics; pub use behaviour::PubsubMessage; pub use config::Config as NetworkConfig; +pub use globals::NetworkGlobals; pub use libp2p::enr::Enr; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::multiaddr; diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 11505194bb..8e8a59719f 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -3,7 +3,7 @@ use crate::error; use crate::multiaddr::Protocol; use crate::rpc::RPCEvent; use crate::NetworkConfig; -use crate::{Topic, TopicHash}; +use crate::{NetworkGlobals, Topic, TopicHash}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ @@ -16,6 +16,7 @@ use slog::{crit, debug, error, info, trace, warn}; use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; +use std::sync::Arc; use std::time::Duration; use tokio::timer::DelayQueue; @@ -47,24 +48,30 @@ pub struct Service { } impl Service { - pub fn new(config: NetworkConfig, log: slog::Logger) -> error::Result { + pub fn new( + config: &NetworkConfig, + log: slog::Logger, + ) -> error::Result<(Arc, Self)> { trace!(log, "Libp2p Service starting"); let local_keypair = if let Some(hex_bytes) = &config.secret_key_hex { keypair_from_hex(hex_bytes)? } else { - load_private_key(&config, &log) + load_private_key(config, &log) }; // load the private key from CLI flag, disk or generate a new one let local_peer_id = PeerId::from(local_keypair.public()); info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", local_peer_id)); + // set up a collection of variables accessible outside of the network crate + let network_globals = Arc::new(NetworkGlobals::new(local_peer_id.clone())); + let mut swarm = { // Set up the transport - tcp/ws with secio and mplex/yamux let transport = build_transport(local_keypair.clone()); // Lighthouse network behaviour - let behaviour = Behaviour::new(&local_keypair, &config, &log)?; + let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; Swarm::new(transport, behaviour, local_peer_id.clone()) }; @@ -93,7 +100,7 @@ impl Service { }; // helper closure for dialing peers - let mut dial_addr = |multiaddr: Multiaddr| { + let mut dial_addr = |multiaddr: &Multiaddr| { match Swarm::dial_addr(&mut swarm, multiaddr.clone()) { Ok(()) => debug!(log, "Dialing libp2p peer"; "address" => format!("{}", multiaddr)), Err(err) => debug!( @@ -104,13 +111,13 @@ impl Service { }; // attempt to connect to user-input libp2p nodes - for multiaddr in config.libp2p_nodes { + for multiaddr in &config.libp2p_nodes { dial_addr(multiaddr); } // attempt to connect to any specified boot-nodes - for bootnode_enr in config.boot_nodes { - for multiaddr in bootnode_enr.multiaddr() { + for bootnode_enr in &config.boot_nodes { + for multiaddr in &bootnode_enr.multiaddr() { // ignore udp multiaddr if it exists let components = multiaddr.iter().collect::>(); if let Protocol::Udp(_) = components[1] { @@ -121,7 +128,7 @@ impl Service { } let mut subscribed_topics: Vec = vec![]; - for topic in config.topics { + for topic in config.topics.clone() { let raw_topic: Topic = topic.into(); let topic_string = raw_topic.no_hash(); if swarm.subscribe(raw_topic.clone()) { @@ -133,13 +140,15 @@ impl Service { } info!(log, "Subscribed to topics"; "topics" => format!("{:?}", subscribed_topics)); - Ok(Service { + let service = Service { local_peer_id, swarm, peers_to_ban: DelayQueue::new(), peer_ban_timeout: DelayQueue::new(), log, - }) + }; + + Ok((network_globals, service)) } /// Adds a peer to be banned for a period of time, specified by a timeout. diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2-libp2p/tests/common/mod.rs index d945383ae9..18b9c42f9b 100644 --- a/beacon_node/eth2-libp2p/tests/common/mod.rs +++ b/beacon_node/eth2-libp2p/tests/common/mod.rs @@ -43,7 +43,7 @@ pub fn build_libp2p_instance( ) -> LibP2PService { let config = build_config(port, boot_nodes, secret_key); // launch libp2p service - LibP2PService::new(config, log.clone()).unwrap() + LibP2PService::new(&config, log.clone()).unwrap().1 } #[allow(dead_code)] diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index da8c7712c2..b299c4a272 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dev-dependencies] sloggers = "0.3.4" genesis = { path = "../genesis" } +tempdir = "0.3" [dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/network/src/persisted_dht.rs b/beacon_node/network/src/persisted_dht.rs index dd894742b5..c43e5d84fe 100644 --- a/beacon_node/network/src/persisted_dht.rs +++ b/beacon_node/network/src/persisted_dht.rs @@ -1,11 +1,37 @@ +use beacon_chain::BeaconChainTypes; use eth2_libp2p::Enr; use rlp; +use std::sync::Arc; +use store::Store; use store::{DBColumn, Error as StoreError, SimpleStoreItem}; +use types::Hash256; /// 32-byte key for accessing the `DhtEnrs`. pub const DHT_DB_KEY: &str = "PERSISTEDDHTPERSISTEDDHTPERSISTE"; -/// Wrapper around dht for persistence to disk. +pub fn load_dht(store: Arc) -> Vec { + // Load DHT from store + let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); + match store.get(&key) { + Ok(Some(p)) => { + let p: PersistedDht = p; + p.enrs + } + _ => Vec::new(), + } +} + +/// Attempt to persist the ENR's in the DHT to `self.store`. +pub fn persist_dht( + store: Arc, + enrs: Vec, +) -> Result<(), store::Error> { + let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); + store.put(&key, &PersistedDht { enrs })?; + Ok(()) +} + +/// Wrapper around DHT for persistence to disk. pub struct PersistedDht { pub enrs: Vec, } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index e10ab4a45f..bf3b4eaddb 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -1,31 +1,33 @@ use crate::error; use crate::message_handler::{HandlerMessage, MessageHandler}; -use crate::persisted_dht::{PersistedDht, DHT_DB_KEY}; +use crate::persisted_dht::{load_dht, persist_dht}; use crate::NetworkConfig; use beacon_chain::{BeaconChain, BeaconChainTypes}; use core::marker::PhantomData; use eth2_libp2p::Service as LibP2PService; -use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, MessageId, Multiaddr, PeerId, Swarm, Topic}; +use eth2_libp2p::{ + rpc::RPCRequest, Enr, Libp2pEvent, MessageId, Multiaddr, NetworkGlobals, PeerId, Swarm, Topic, +}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; -use parking_lot::Mutex; use slog::{debug, error, info, trace}; -use std::sync::Arc; -use store::Store; +use std::collections::HashSet; +use std::sync::{atomic::Ordering, Arc}; +use std::time::{Duration, Instant}; use tokio::runtime::TaskExecutor; use tokio::sync::{mpsc, oneshot}; -use types::Hash256; +use tokio::timer::Delay; + +mod tests; /// The time in seconds that a peer will be banned and prevented from reconnecting. const BAN_PEER_TIMEOUT: u64 = 30; -/// Service that handles communication between internal services and the eth2_libp2p network service. +/// Service that handles communication between internal services and the `eth2_libp2p` network service. pub struct Service { - libp2p_service: Arc>, libp2p_port: u16, - store: Arc, - log: slog::Logger, + network_globals: Arc, _libp2p_exit: oneshot::Sender<()>, _network_send: mpsc::UnboundedSender, _phantom: PhantomData, @@ -40,9 +42,8 @@ impl Service { ) -> error::Result<(Arc, mpsc::UnboundedSender)> { // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::(); - // Get a reference to the beacon chain store - let store = beacon_chain.store.clone(); // launch message handler thread + let store = beacon_chain.store.clone(); let message_handler_send = MessageHandler::spawn( beacon_chain, network_send.clone(), @@ -50,38 +51,34 @@ impl Service { network_log.clone(), )?; + let propagation_percentage = config.propagation_percentage; // launch libp2p service - let libp2p_service = Arc::new(Mutex::new(LibP2PService::new( - config.clone(), - network_log.clone(), - )?)); + let (network_globals, mut libp2p_service) = + LibP2PService::new(config, network_log.clone())?; - // Load DHT from store - let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); - let enrs: Vec = match store.get(&key) { - Ok(Some(p)) => { - let p: PersistedDht = p; - p.enrs - } - _ => Vec::new(), - }; - for enr in enrs { - libp2p_service.lock().swarm.add_enr(enr); + for enr in load_dht::(store.clone()) { + libp2p_service.swarm.add_enr(enr); } - let libp2p_exit = spawn_service( - libp2p_service.clone(), + // A delay used to initialise code after the network has started + // This is currently used to obtain the listening addresses from the libp2p service. + let initial_delay = Delay::new(Instant::now() + Duration::from_secs(1)); + + let libp2p_exit = spawn_service::( + libp2p_service, network_recv, message_handler_send, executor, - network_log.clone(), - config.propagation_percentage, - )?; - let network_service = Service { - libp2p_service, - libp2p_port: config.libp2p_port, store, - log: network_log, + network_globals.clone(), + initial_delay, + network_log.clone(), + propagation_percentage, + )?; + + let network_service = Service { + libp2p_port: config.libp2p_port, + network_globals, _libp2p_exit: libp2p_exit, _network_send: network_send.clone(), _phantom: PhantomData, @@ -92,25 +89,18 @@ impl Service { /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. - pub fn local_enr(&self) -> Enr { - self.libp2p_service - .lock() - .swarm - .discovery() - .local_enr() - .clone() + pub fn local_enr(&self) -> Option { + self.network_globals.local_enr.read().clone() } /// Returns the local libp2p PeerID. pub fn local_peer_id(&self) -> PeerId { - self.libp2p_service.lock().local_peer_id.clone() + self.network_globals.peer_id.read().clone() } /// Returns the list of `Multiaddr` that the underlying libp2p instance is listening on. pub fn listen_multiaddrs(&self) -> Vec { - Swarm::listeners(&self.libp2p_service.lock().swarm) - .cloned() - .collect() + self.network_globals.listen_multiaddrs.read().clone() } /// Returns the libp2p port that this node has been configured to listen using. @@ -120,85 +110,66 @@ impl Service { /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { - self.libp2p_service.lock().swarm.connected_peers() + self.network_globals.connected_peers.load(Ordering::Relaxed) } /// Returns the set of `PeerId` that are connected via libp2p. - pub fn connected_peer_set(&self) -> Vec { - self.libp2p_service - .lock() - .swarm - .discovery() - .connected_peer_set() - .iter() - .cloned() - .collect() - } - - /// Provides a reference to the underlying libp2p service. - pub fn libp2p_service(&self) -> Arc> { - self.libp2p_service.clone() - } - - /// Attempt to persist the enrs in the DHT to `self.store`. - pub fn persist_dht(&self) -> Result<(), store::Error> { - let enrs: Vec = self - .libp2p_service() - .lock() - .swarm - .enr_entries() - .map(|x| x.clone()) - .collect(); - info!( - self.log, - "Persisting DHT to store"; - "Number of peers" => format!("{}", enrs.len()), - ); - let key = Hash256::from_slice(&DHT_DB_KEY.as_bytes()); - self.store.put(&key, &PersistedDht { enrs })?; - Ok(()) + pub fn connected_peer_set(&self) -> HashSet { + self.network_globals.connected_peer_set.read().clone() } } -fn spawn_service( - libp2p_service: Arc>, - network_recv: mpsc::UnboundedReceiver, - message_handler_send: mpsc::UnboundedSender, +fn spawn_service( + mut libp2p_service: LibP2PService, + mut network_recv: mpsc::UnboundedReceiver, + mut message_handler_send: mpsc::UnboundedSender, executor: &TaskExecutor, + store: Arc, + network_globals: Arc, + mut initial_delay: Delay, log: slog::Logger, propagation_percentage: Option, ) -> error::Result> { - let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); + let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel(); // spawn on the current executor executor.spawn( - network_service( - libp2p_service, - network_recv, - message_handler_send, - log.clone(), - propagation_percentage, - ) - // allow for manual termination - .select(exit_rx.then(|_| Ok(()))) - .then(move |_| { - info!(log.clone(), "Network service shutdown"); - Ok(()) - }), - ); + futures::future::poll_fn(move || -> Result<_, ()> { - Ok(network_exit) -} -//TODO: Potentially handle channel errors -fn network_service( - libp2p_service: Arc>, - mut network_recv: mpsc::UnboundedReceiver, - mut message_handler_send: mpsc::UnboundedSender, - log: slog::Logger, - propagation_percentage: Option, -) -> impl futures::Future { - futures::future::poll_fn(move || -> Result<_, eth2_libp2p::error::Error> { + if !initial_delay.is_elapsed() { + if let Ok(Async::Ready(_)) = initial_delay.poll() { + let multi_addrs = Swarm::listeners(&libp2p_service.swarm).cloned().collect(); + *network_globals.listen_multiaddrs.write() = multi_addrs; + } + } + + // perform termination tasks when the network is being shutdown + if let Ok(Async::Ready(_)) | Err(_) = exit_rx.poll() { + // network thread is terminating + let enrs: Vec = libp2p_service.swarm.enr_entries().cloned().collect(); + debug!( + log, + "Persisting DHT to store"; + "Number of peers" => format!("{}", enrs.len()), + ); + + match persist_dht::(store.clone(), enrs) { + Err(e) => error!( + log, + "Failed to persist DHT on drop"; + "error" => format!("{:?}", e) + ), + Ok(_) => info!( + log, + "Saved DHT state"; + ), + } + + info!(log.clone(), "Network service shutdown"); + return Ok(Async::Ready(())); + } + // processes the network channel before processing the libp2p swarm loop { // poll the network channel @@ -206,7 +177,7 @@ fn network_service( Ok(Async::Ready(Some(message))) => match message { NetworkMessage::RPC(peer_id, rpc_event) => { trace!(log, "Sending RPC"; "rpc" => format!("{}", rpc_event)); - libp2p_service.lock().swarm.send_rpc(peer_id, rpc_event); + libp2p_service.swarm.send_rpc(peer_id, rpc_event); } NetworkMessage::Propagate { propagation_source, @@ -231,7 +202,6 @@ fn network_service( "message_id" => message_id.to_string(), ); libp2p_service - .lock() .swarm .propagate_message(&propagation_source, message_id); } @@ -252,11 +222,11 @@ fn network_service( info!(log, "Random filter did not publish message"); } else { debug!(log, "Sending pubsub message"; "topics" => format!("{:?}",topics)); - libp2p_service.lock().swarm.publish(&topics, message); + libp2p_service.swarm.publish(&topics, message); } } NetworkMessage::Disconnect { peer_id } => { - libp2p_service.lock().disconnect_and_ban_peer( + libp2p_service.disconnect_and_ban_peer( peer_id, std::time::Duration::from_secs(BAN_PEER_TIMEOUT), ); @@ -264,18 +234,20 @@ fn network_service( }, Ok(Async::NotReady) => break, Ok(Async::Ready(None)) => { - return Err(eth2_libp2p::error::Error::from("Network channel closed")); + debug!(log, "Network channel closed"); + return Err(()); } - Err(_) => { - return Err(eth2_libp2p::error::Error::from("Network channel error")); + Err(e) => { + debug!(log, "Network channel error"; "error" => format!("{}", e)); + return Err(()); } } } - // poll the swarm let mut peers_to_ban = Vec::new(); + // poll the swarm loop { - match libp2p_service.lock().poll() { + match libp2p_service.poll() { Ok(Async::Ready(Some(event))) => match event { Libp2pEvent::RPC(peer_id, rpc_event) => { // trace!(log, "Received RPC"; "rpc" => format!("{}", rpc_event)); @@ -286,19 +258,19 @@ fn network_service( }; message_handler_send .try_send(HandlerMessage::RPC(peer_id, rpc_event)) - .map_err(|_| "Failed to send RPC to handler")?; + .map_err(|_| { debug!(log, "Failed to send RPC to handler");} )?; } Libp2pEvent::PeerDialed(peer_id) => { debug!(log, "Peer Dialed"; "peer_id" => format!("{:?}", peer_id)); message_handler_send .try_send(HandlerMessage::PeerDialed(peer_id)) - .map_err(|_| "Failed to send PeerDialed to handler")?; + .map_err(|_| { debug!(log, "Failed to send peer dialed to handler");})?; } Libp2pEvent::PeerDisconnected(peer_id) => { debug!(log, "Peer Disconnected"; "peer_id" => format!("{:?}", peer_id)); message_handler_send .try_send(HandlerMessage::PeerDisconnected(peer_id)) - .map_err(|_| "Failed to send PeerDisconnected to handler")?; + .map_err(|_| { debug!(log, "Failed to send peer disconnect to handler");})?; } Libp2pEvent::PubsubMessage { id, @@ -308,7 +280,7 @@ fn network_service( } => { message_handler_send .try_send(HandlerMessage::PubsubMessage(id, source, message)) - .map_err(|_| "Failed to send pubsub message to handler")?; + .map_err(|_| { debug!(log, "Failed to send pubsub message to handler");})?; } Libp2pEvent::PeerSubscribed(_, _) => {} }, @@ -320,7 +292,7 @@ fn network_service( // ban and disconnect any peers that sent Goodbye requests while let Some(peer_id) = peers_to_ban.pop() { - libp2p_service.lock().disconnect_and_ban_peer( + libp2p_service.disconnect_and_ban_peer( peer_id.clone(), std::time::Duration::from_secs(BAN_PEER_TIMEOUT), ); @@ -328,6 +300,10 @@ fn network_service( Ok(Async::NotReady) }) + + ); + + Ok(network_exit) } /// Types of messages that the network service can receive. @@ -348,127 +324,3 @@ pub enum NetworkMessage { /// Disconnect and bans a peer id. Disconnect { peer_id: PeerId }, } - -impl Drop for Service { - fn drop(&mut self) { - if let Err(e) = self.persist_dht() { - error!( - self.log, - "Failed to persist DHT on drop"; - "error" => format!("{:?}", e) - ) - } else { - info!( - self.log, - "Saved DHT state"; - ) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use beacon_chain::builder::BeaconChainBuilder; - use eth2_libp2p::Enr; - use genesis::{generate_deterministic_keypairs, interop_genesis_state}; - use slog::Logger; - use sloggers::{null::NullLoggerBuilder, Build}; - use std::str::FromStr; - use store::{migrate::NullMigrator, SimpleDiskStore}; - use tokio::runtime::Runtime; - use types::{EthSpec, MinimalEthSpec}; - - fn get_logger() -> Logger { - let builder = NullLoggerBuilder; - builder.build().expect("should build logger") - } - - #[test] - fn test_dht_persistence() { - // Create new LevelDB store - let path = "/tmp"; - let store = Arc::new(SimpleDiskStore::open(&std::path::PathBuf::from(path)).unwrap()); - // Create a `BeaconChain` object to pass to `Service` - let validator_count = 8; - let genesis_time = 13371337; - - let log = get_logger(); - let spec = MinimalEthSpec::default_spec(); - - let genesis_state = interop_genesis_state( - &generate_deterministic_keypairs(validator_count), - genesis_time, - &spec, - ) - .expect("should create interop genesis state"); - let chain = BeaconChainBuilder::new(MinimalEthSpec) - .logger(log.clone()) - .store(store) - .store_migrator(NullMigrator) - .genesis_state(genesis_state) - .expect("should build state using recent genesis") - .dummy_eth1_backend() - .expect("should build the dummy eth1 backend") - .null_event_handler() - .testing_slot_clock(std::time::Duration::from_secs(1)) - .expect("should configure testing slot clock") - .reduced_tree_fork_choice() - .expect("should add fork choice to builder") - .build() - .expect("should build"); - let beacon_chain = Arc::new(chain); - let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap(); - let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap(); - let enrs = vec![enr1, enr2]; - - let runtime = Runtime::new().unwrap(); - - // Create new network service - let (service, _) = Service::new( - beacon_chain.clone(), - &NetworkConfig::default(), - &runtime.executor(), - log.clone(), - ) - .unwrap(); - - // Add enrs manually to dht - for enr in enrs.iter() { - service.libp2p_service().lock().swarm.add_enr(enr.clone()); - } - assert_eq!( - enrs.len(), - service - .libp2p_service() - .lock() - .swarm - .enr_entries() - .collect::>() - .len(), - "DHT should have 2 enrs" - ); - // Drop the service value - std::mem::drop(service); - - // Recover the network service from beacon chain store and fresh network config - let (recovered_service, _) = Service::new( - beacon_chain, - &NetworkConfig::default(), - &runtime.executor(), - log.clone(), - ) - .unwrap(); - assert_eq!( - enrs.len(), - recovered_service - .libp2p_service() - .lock() - .swarm - .enr_entries() - .collect::>() - .len(), - "Recovered DHT should have 2 enrs" - ); - } -} diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs new file mode 100644 index 0000000000..0ed061d2bb --- /dev/null +++ b/beacon_node/network/src/service/tests.rs @@ -0,0 +1,102 @@ +#[cfg(not(debug_assertions))] +#[cfg(test)] +mod tests { + use crate::persisted_dht::load_dht; + use crate::{NetworkConfig, Service}; + use beacon_chain::builder::BeaconChainBuilder; + use beacon_chain::slot_clock::TestingSlotClock; + use eth2_libp2p::Enr; + use futures::{Future, IntoFuture}; + use genesis::{generate_deterministic_keypairs, interop_genesis_state}; + use slog::Logger; + use sloggers::{null::NullLoggerBuilder, Build}; + use std::str::FromStr; + use std::sync::Arc; + use store::{migrate::NullMigrator, SimpleDiskStore}; + use tempdir::TempDir; + use tokio::runtime::Runtime; + use types::{EthSpec, MinimalEthSpec}; + + fn get_logger() -> Logger { + let builder = NullLoggerBuilder; + builder.build().expect("should build logger") + } + + #[test] + fn test_dht_persistence() { + // Create new LevelDB store + let path = TempDir::new("persistence_test").unwrap(); + let store = Arc::new(SimpleDiskStore::open(&path.into_path()).unwrap()); + // Create a `BeaconChain` object to pass to `Service` + let validator_count = 1; + let genesis_time = 13371337; + + let log = get_logger(); + let spec = MinimalEthSpec::default_spec(); + + let genesis_state = interop_genesis_state( + &generate_deterministic_keypairs(validator_count), + genesis_time, + &spec, + ) + .expect("should create interop genesis state"); + let chain = BeaconChainBuilder::new(MinimalEthSpec) + .logger(log.clone()) + .store(store.clone()) + .store_migrator(NullMigrator) + .genesis_state(genesis_state) + .expect("should build state using recent genesis") + .dummy_eth1_backend() + .expect("should build the dummy eth1 backend") + .null_event_handler() + .testing_slot_clock(std::time::Duration::from_secs(1)) + .expect("should configure testing slot clock") + .reduced_tree_fork_choice() + .expect("should add fork choice to builder") + .build() + .expect("should build"); + + let beacon_chain = Arc::new(chain); + let enr1 = Enr::from_str("enr:-IS4QHCYrYZbAKWCBRlAy5zzaDZXJBGkcnh4MHcBFZntXNFrdvJjX04jRzjzCBOonrkTfj499SZuOh8R33Ls8RRcy5wBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQPKY0yuDUmstAHYpMa2_oxVtw0RW_QAdpzBQA8yWM0xOIN1ZHCCdl8").unwrap(); + let enr2 = Enr::from_str("enr:-IS4QJ2d11eu6dC7E7LoXeLMgMP3kom1u3SE8esFSWvaHoo0dP1jg8O3-nx9ht-EO3CmG7L6OkHcMmoIh00IYWB92QABgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIB_c-jQMOXsbjWkbN-Oj99H57gfId5pfb4wa1qxwV4CIN1ZHCCIyk").unwrap(); + let enrs = vec![enr1, enr2]; + + let runtime = Runtime::new().unwrap(); + let executor = runtime.executor(); + + let mut config = NetworkConfig::default(); + config.boot_nodes = enrs.clone(); + runtime + .block_on_all( + // Create a new network service which implicitly gets dropped at the + // end of the block. + Service::new(beacon_chain.clone(), &config, &executor, log.clone()) + .into_future() + .and_then(move |(_service, _)| Ok(())), + ) + .unwrap(); + + // Load the persisted dht from the store + let persisted_enrs = load_dht::< + beacon_chain::builder::Witness< + SimpleDiskStore, + store::migrate::NullMigrator, + TestingSlotClock, + beacon_chain::eth1_chain::CachingEth1Backend< + types::eth_spec::MinimalEthSpec, + SimpleDiskStore, + >, + types::eth_spec::MinimalEthSpec, + beacon_chain::events::NullEventHandler, + >, + >(store); + assert!( + persisted_enrs.contains(&enrs[0]), + "should have persisted the first ENR to store" + ); + assert!( + persisted_enrs.contains(&enrs[1]), + "should have persisted the second ENR to store" + ); + } +} diff --git a/beacon_node/network/src/store.rs b/beacon_node/network/src/store.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/beacon_node/rest_api/src/network.rs b/beacon_node/rest_api/src/network.rs index 171d59beda..1d8df29a35 100644 --- a/beacon_node/rest_api/src/network.rs +++ b/beacon_node/rest_api/src/network.rs @@ -34,7 +34,12 @@ pub fn get_enr( req: Request, network: Arc>, ) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz(&network.local_enr().to_base64()) + ResponseBuilder::new(&req)?.body_no_ssz( + &network + .local_enr() + .map(|enr| enr.to_base64()) + .unwrap_or_else(|| "".into()), + ) } /// HTTP handler to return the `PeerId` from the client's libp2p service.