From b23f19272d70017d07e7a374e253a9b53e6bcbbe Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 8 Apr 2020 01:08:05 +1000 Subject: [PATCH] v0.11.1 Network update (#989) * Minor log bumps * Initial building of extended RPC methods * Wire in extended RPC methods * Merge initial peer management template * Add a PeerDB and give the peer manager some basic functions * Initial connection of peer manager * Add peer manager to lighthouse * Connect peer manager with new RPC methods * Correct tests and metadata RPC Co-authored-by: Diva --- Cargo.lock | 1 + beacon_node/client/src/lib.rs | 2 +- beacon_node/eth2-libp2p/Cargo.toml | 1 + beacon_node/eth2-libp2p/src/behaviour.rs | 165 ++++++- .../src/discovery/{enr_helpers.rs => enr.rs} | 35 +- beacon_node/eth2-libp2p/src/discovery/mod.rs | 131 +++--- beacon_node/eth2-libp2p/src/lib.rs | 12 +- .../eth2-libp2p/src/peer_manager/mod.rs | 287 ++++++++++++ .../eth2-libp2p/src/peer_manager/peer_info.rs | 188 ++++++++ .../eth2-libp2p/src/peer_manager/peerdb.rs | 427 ++++++++++++++++++ beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 50 +- beacon_node/eth2-libp2p/src/rpc/methods.rs | 27 ++ beacon_node/eth2-libp2p/src/rpc/mod.rs | 16 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 89 ++-- beacon_node/eth2-libp2p/src/service.rs | 81 +--- beacon_node/eth2-libp2p/src/types/globals.rs | 39 +- beacon_node/eth2-libp2p/src/types/mod.rs | 9 +- .../eth2-libp2p/src/types/peer_info.rs | 45 -- .../eth2-libp2p/tests/gossipsub_tests.rs | 8 +- beacon_node/eth2-libp2p/tests/noise.rs | 16 +- beacon_node/eth2-libp2p/tests/rpc_tests.rs | 220 ++++----- beacon_node/network/src/router/mod.rs | 18 +- beacon_node/network/src/router/processor.rs | 11 +- beacon_node/network/src/service.rs | 22 +- beacon_node/rest_api/src/network.rs | 11 +- eth2/utils/hashmap_delay/src/hashset_delay.rs | 20 +- 26 files changed, 1522 insertions(+), 409 deletions(-) rename beacon_node/eth2-libp2p/src/discovery/{enr_helpers.rs => enr.rs} (84%) create mode 100644 beacon_node/eth2-libp2p/src/peer_manager/mod.rs create mode 100644 beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs create mode 100644 beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs delete mode 100644 beacon_node/eth2-libp2p/src/types/peer_info.rs diff --git a/Cargo.lock b/Cargo.lock index 42304d508b..b745d7ca85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1193,6 +1193,7 @@ dependencies = [ "eth2_ssz_types", "fnv", "futures", + "hashmap_delay", "hex 0.3.2", "lazy_static", "libp2p", diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index ab2e0556c0..7f665b9cb1 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -57,6 +57,6 @@ impl Client { /// Returns the local libp2p ENR of this node, for network discovery. pub fn enr(&self) -> Option { - self.network_globals.as_ref()?.local_enr() + self.network_globals.as_ref().map(|n| n.local_enr()) } } diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 2432b34208..cf74cbed96 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -10,6 +10,7 @@ hex = "0.3" # `libp2p/rust-libp2p` repository. libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "4e3003d5283040fee10da1299252dd060a838d97" } types = { path = "../../eth2/types" } +hashmap_delay = { path = "../../eth2/utils/hashmap_delay" } eth2_ssz_types = { path = "../../eth2/utils/ssz_types" } serde = "1.0.102" serde_derive = "1.0.102" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 4878106f55..bd9a8d6c2b 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,5 +1,6 @@ -use crate::discovery::Discovery; -use crate::rpc::{RPCEvent, RPCMessage, RPC}; +use crate::discovery::{enr::Eth2Enr, Discovery}; +use crate::peer_manager::{PeerManager, PeerManagerEvent}; +use crate::rpc::*; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use futures::prelude::*; @@ -14,6 +15,7 @@ use libp2p::{ }; use lru::LruCache; use slog::{crit, debug, o, warn}; +use std::marker::PhantomData; use std::sync::Arc; use types::{EnrForkId, EthSpec, SubnetId}; @@ -35,9 +37,15 @@ pub struct Behaviour { identify: Identify, /// Discovery behaviour. discovery: Discovery, + /// The peer manager that keeps track of peer's reputation and status. + #[behaviour(ignore)] + peer_manager: PeerManager, /// The events generated by this behaviour to be consumed in the swarm poll. #[behaviour(ignore)] events: Vec>, + /// The current meta data of the node, so respond to pings and get metadata + #[behaviour(ignore)] + meta_data: MetaData, /// A cache of recently seen gossip messages. This is used to filter out any possible /// duplicates that may still be seen over gossipsub. #[behaviour(ignore)] @@ -47,18 +55,20 @@ pub struct Behaviour { network_globals: Arc>, #[behaviour(ignore)] /// Keeps track of the current EnrForkId for upgrading gossipsub topics. + // NOTE: This can be accessed via the network_globals ENR. However we keep it here for quick + // lookups for every gossipsub message send. enr_fork_id: EnrForkId, #[behaviour(ignore)] /// Logger for behaviour actions. log: slog::Logger, } +/// Implements the combined behaviour for the libp2p service. impl Behaviour { pub fn new( local_key: &Keypair, net_conf: &NetworkConfig, network_globals: Arc>, - enr_fork_id: EnrForkId, log: &slog::Logger, ) -> error::Result { let local_peer_id = local_key.public().into_peer_id(); @@ -70,36 +80,48 @@ impl Behaviour() + .expect("Local ENR must have subnet bitfield"); + + let meta_data = MetaData { + seq_number: 1, + attnets, + }; + Ok(Behaviour { eth2_rpc: RPC::new(log.clone()), gossipsub: Gossipsub::new(local_peer_id, net_conf.gs_config.clone()), - discovery: Discovery::new( - local_key, - net_conf, - enr_fork_id.clone(), - network_globals.clone(), - log, - )?, + discovery: Discovery::new(local_key, net_conf, network_globals.clone(), log)?, identify, + peer_manager: PeerManager::new(network_globals.clone(), log), events: Vec::new(), seen_gossip_messages: LruCache::new(100_000), + meta_data, network_globals, enr_fork_id, log: behaviour_log, }) } + /// Obtain a reference to the discovery protocol. pub fn discovery(&self) -> &Discovery { &self.discovery } + /// Obtain a reference to the gossipsub protocol. pub fn gs(&self) -> &Gossipsub { &self.gossipsub } -} -/// Implements the combined behaviour for the libp2p service. -impl Behaviour { /* Pubsub behaviour functions */ /// Subscribes to a gossipsub topic kind, letting the network service determine the @@ -225,6 +247,8 @@ impl Behaviour e); } + // update the local meta data which informs our peers of the update during PINGS + self.update_metadata(); } /// A request to search for peers connected to a long-lived subnet. @@ -259,6 +283,45 @@ impl Behaviour() + .expect("Local discovery must have bitfield"); + } + + /// Sends a PING/PONG request/response to a peer. + fn send_ping(&mut self, id: RequestId, peer_id: PeerId) { + let pong_response = RPCEvent::Response( + id, + RPCErrorResponse::Success(RPCResponse::Pong(crate::rpc::methods::Ping { + data: self.meta_data.seq_number, + })), + ); + self.send_rpc(peer_id, pong_response); + } + + /// Sends a METADATA request to a peer. + fn send_meta_data_request(&mut self, peer_id: PeerId) { + let metadata_request = + RPCEvent::Request(RequestId::from(0usize), RPCRequest::MetaData(PhantomData)); + self.send_rpc(peer_id, metadata_request); + } + + /// Sends a METADATA response to a peer. + fn send_meta_data_response(&mut self, id: RequestId, peer_id: PeerId) { + let metadata_response = RPCEvent::Response( + id, + RPCErrorResponse::Success(RPCResponse::MetaData(self.meta_data.clone())), + ); + self.send_rpc(peer_id, metadata_response); + } } // Implement the NetworkBehaviourEventProcess trait so that we can derive NetworkBehaviour for Behaviour @@ -277,7 +340,7 @@ impl } Ok(msg) => { // if this message isn't a duplicate, notify the network - self.events.push(BehaviourEvent::GossipMessage { + self.events.push(BehaviourEvent::PubsubMessage { id, source: propagation_source, topics: gs_msg.topics, @@ -310,7 +373,41 @@ impl self.events.push(BehaviourEvent::PeerDisconnected(peer_id)) } RPCMessage::RPC(peer_id, rpc_event) => { - self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) + // The METADATA and PING RPC responses are handled within the behaviour and not + // propagated + // TODO: Improve the RPC types to better handle this logic discrepancy + match rpc_event { + RPCEvent::Request(id, RPCRequest::Ping(ping)) => { + // inform the peer manager and send the response + self.peer_manager.ping_request(&peer_id, ping.data); + self.send_ping(id, peer_id); + } + RPCEvent::Request(id, RPCRequest::MetaData(_)) => { + // send the requested meta-data + self.send_meta_data_response(id, peer_id); + } + RPCEvent::Response(_, RPCErrorResponse::Success(RPCResponse::Pong(ping))) => { + self.peer_manager.pong_response(&peer_id, ping.data); + } + RPCEvent::Response( + _, + RPCErrorResponse::Success(RPCResponse::MetaData(meta_data)), + ) => { + self.peer_manager.meta_data_response(&peer_id, meta_data); + } + RPCEvent::Request(_, RPCRequest::Status(_)) + | RPCEvent::Response(_, RPCErrorResponse::Success(RPCResponse::Status(_))) => { + // inform the peer manager that we have received a status from a peer + self.peer_manager.peer_statusd(&peer_id); + // propagate the STATUS message upwards + self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)); + } + + _ => { + // propagate all other RPC messages upwards + self.events.push(BehaviourEvent::RPC(peer_id, rpc_event)) + } + } } } } @@ -321,6 +418,39 @@ impl Behaviour( &mut self, ) -> Async>> { + // check the peer manager for events + loop { + match self.peer_manager.poll() { + Ok(Async::Ready(Some(event))) => match event { + PeerManagerEvent::Status(peer_id) => { + // it's time to status. We don't keep a beacon chain reference here, so we inform + // the network to send a status to this peer + return Async::Ready(NetworkBehaviourAction::GenerateEvent( + BehaviourEvent::StatusPeer(peer_id), + )); + } + PeerManagerEvent::Ping(peer_id) => { + // send a ping to this peer + self.send_ping(RequestId::from(0usize), peer_id); + } + PeerManagerEvent::MetaData(peer_id) => { + self.send_meta_data_request(peer_id); + } + PeerManagerEvent::DisconnectPeer(_peer_id) => { + //TODO: Implement + } + PeerManagerEvent::BanPeer(_peer_id) => { + //TODO: Implement + } + }, + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) | Err(_) => { + crit!(self.log, "Error polling peer manager"); + break; + } + } + } + if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } @@ -369,6 +499,7 @@ impl NetworkBehaviourEventPr } /// The types of events than can be obtained from polling the behaviour. +#[derive(Debug)] pub enum BehaviourEvent { /// A received RPC event and the peer that it was received from. RPC(PeerId, RPCEvent), @@ -377,7 +508,7 @@ pub enum BehaviourEvent { /// A peer has disconnected. PeerDisconnected(PeerId), /// A gossipsub message has been received. - GossipMessage { + PubsubMessage { /// The gossipsub message id. Used when propagating blocks after validation. id: MessageId, /// The peer from which we received this message, not the peer that published it. @@ -389,4 +520,6 @@ pub enum BehaviourEvent { }, /// Subscribed to peer for given topic PeerSubscribed(PeerId, TopicHash), + /// Inform the network to send a Status to this peer. + StatusPeer(PeerId), } diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs b/beacon_node/eth2-libp2p/src/discovery/enr.rs similarity index 84% rename from beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs rename to beacon_node/eth2-libp2p/src/discovery/enr.rs index de3960b9bf..6cd3beac41 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr_helpers.rs +++ b/beacon_node/eth2-libp2p/src/discovery/enr.rs @@ -1,10 +1,12 @@ +//! Helper functions and an extension trait for Ethereum 2 ENRs. + use super::ENR_FILENAME; -use crate::Enr; +use crate::types::{Enr, EnrBitfield}; use crate::NetworkConfig; use libp2p::core::identity::Keypair; use libp2p::discv5::enr::{CombinedKey, EnrBuilder}; use slog::{debug, warn}; -use ssz::Encode; +use ssz::{Decode, Encode}; use ssz_types::BitVector; use std::convert::TryInto; use std::fs::File; @@ -18,6 +20,33 @@ pub const ETH2_ENR_KEY: &'static str = "eth2"; /// The ENR field specifying the subnet bitfield. pub const BITFIELD_ENR_KEY: &'static str = "attnets"; +/// Extension trait for ENR's within Eth2. +pub trait Eth2Enr { + /// The subnet bitfield associated with the ENR. + fn bitfield(&self) -> Result, &'static str>; + + fn eth2(&self) -> Result; +} + +impl Eth2Enr for Enr { + fn bitfield(&self) -> Result, &'static str> { + let bitfield_bytes = self + .get(BITFIELD_ENR_KEY) + .ok_or_else(|| "ENR bitfield non-existent")?; + + BitVector::::from_ssz_bytes(bitfield_bytes) + .map_err(|_| "Could not decode the ENR SSZ bitfield") + } + + fn eth2(&self) -> Result { + let eth2_bytes = self + .get(ETH2_ENR_KEY) + .ok_or_else(|| "ENR has no eth2 field")?; + + EnrForkId::from_ssz_bytes(eth2_bytes).map_err(|_| "Could not decode EnrForkId") + } +} + /// Loads an ENR from file if it exists and matches the current NodeId and sequence number. If none /// exists, generates a new one. /// @@ -76,7 +105,7 @@ pub fn build_or_load_enr( } /// Builds a lighthouse ENR given a `NetworkConfig`. -fn build_enr( +pub fn build_enr( enr_key: &CombinedKey, config: &NetworkConfig, enr_fork_id: EnrForkId, diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index 466a3b7e0c..f7b7fea93f 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -1,11 +1,13 @@ ///! This manages the discovery and management of peers. -mod enr_helpers; +pub(crate) mod enr; + +// Allow external use of the lighthouse ENR builder +pub use enr::build_enr; use crate::metrics; -use crate::types::EnrBitfield; -use crate::Enr; -use crate::{error, NetworkConfig, NetworkGlobals, PeerInfo}; -use enr_helpers::{BITFIELD_ENR_KEY, ETH2_ENR_KEY}; +use crate::rpc::MetaData; +use crate::{error, Enr, NetworkConfig, NetworkGlobals}; +use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use futures::prelude::*; use libp2p::core::{identity::Keypair, ConnectedPoint, Multiaddr, PeerId}; use libp2p::discv5::enr::NodeId; @@ -71,23 +73,18 @@ impl Discovery { pub fn new( local_key: &Keypair, config: &NetworkConfig, - enr_fork_id: EnrForkId, network_globals: Arc>, log: &slog::Logger, ) -> error::Result { let log = log.clone(); - // checks if current ENR matches that found on disk - let local_enr = - enr_helpers::build_or_load_enr::(local_key.clone(), config, enr_fork_id, &log)?; - - *network_globals.local_enr.write() = Some(local_enr.clone()); - let enr_dir = match config.network_dir.to_str() { Some(path) => String::from(path), None => String::from(""), }; + let local_enr = network_globals.local_enr.read().clone(); + info!(log, "ENR Initialised"; "enr" => local_enr.to_base64(), "seq" => local_enr.seq(), "id"=> format!("{}",local_enr.node_id()), "ip" => format!("{:?}", local_enr.ip()), "udp"=> format!("{:?}", local_enr.udp()), "tcp" => format!("{:?}", local_enr.tcp())); let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port); @@ -174,13 +171,7 @@ impl Discovery { let id = *subnet_id as usize; let local_enr = self.discovery.local_enr(); - let bitfield_bytes = local_enr - .get(BITFIELD_ENR_KEY) - .ok_or_else(|| "ENR bitfield non-existent")?; - - let mut current_bitfield = - BitVector::::from_ssz_bytes(bitfield_bytes) - .map_err(|_| "Could not decode local ENR SSZ bitfield")?; + let mut current_bitfield = local_enr.bitfield::()?; if id >= current_bitfield.len() { return Err(format!( @@ -211,6 +202,8 @@ impl Discovery { .discovery .enr_insert(BITFIELD_ENR_KEY, current_bitfield.as_ssz_bytes()); + // replace the global version + *self.network_globals.local_enr.write() = self.discovery.local_enr().clone(); Ok(()) } @@ -240,6 +233,9 @@ impl Discovery { "error" => format!("{:?}", e) ) }); + + // replace the global version with discovery version + *self.network_globals.local_enr.write() = self.discovery.local_enr().clone(); } /// A request to find peers on a given subnet. @@ -247,20 +243,12 @@ impl Discovery { // This currently checks for currently connected peers and if we don't have // PEERS_WANTED_BEFORE_DISCOVERY connected to a given subnet we search for more. pub fn peers_request(&mut self, subnet_id: SubnetId) { - // TODO: Add PeerManager struct to do this loop for us - let peers_on_subnet = self .network_globals - .connected_peer_set + .peers .read() - .values() - .fold(0, |found_peers, peer_info| { - if peer_info.on_subnet(subnet_id) { - found_peers + 1 - } else { - found_peers - } - }); + .peers_on_subnet(&subnet_id) + .count() as u64; if peers_on_subnet < TARGET_SUBNET_PEERS { let target_peers = TARGET_SUBNET_PEERS - peers_on_subnet; @@ -325,27 +313,21 @@ impl Discovery { // pick a random NodeId let random_node = NodeId::random(); - let enr_fork_id = self.enr_fork_id().to_vec(); + let enr_fork_id = match self.local_enr().eth2() { + Ok(v) => v, + Err(e) => { + crit!(self.log, "Local ENR has no fork id"; "error" => e); + return; + } + }; // predicate for finding nodes with a matching fork - let eth2_fork_predicate = move |enr: &Enr| enr.get(ETH2_ENR_KEY) == Some(&enr_fork_id); + let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone()); let predicate = move |enr: &Enr| eth2_fork_predicate(enr) && enr_predicate(enr); // general predicate self.discovery .find_enr_predicate(random_node, predicate, num_nodes); } - - /// Returns our current `eth2` field as SSZ bytes, associated with the local ENR. We only search for peers - /// that have this field. - fn enr_fork_id(&self) -> Vec { - self.local_enr() - .get(ETH2_ENR_KEY) - .map(|bytes| bytes.clone()) - .unwrap_or_else(|| { - crit!(self.log, "Local ENR has no eth2 field"); - Vec::new() - }) - } } // Redirect all behaviour events to underlying discovery behaviour. @@ -365,36 +347,44 @@ where self.discovery.addresses_of_peer(peer_id) } - fn inject_connected(&mut self, peer_id: PeerId, _endpoint: ConnectedPoint) { + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + // TODO: Replace with PeerManager with custom behvaviour // Find ENR info about a peer if possible. - let mut peer_info = PeerInfo::new(); + + match endpoint { + ConnectedPoint::Dialer { .. } => { + self.network_globals + .peers + .write() + .connect_outgoing(&peer_id); + } + ConnectedPoint::Listener { .. } => { + self.network_globals.peers.write().connect_ingoing(&peer_id); + } + } + if let Some(enr) = self.discovery.enr_of_peer(&peer_id) { - let bitfield = match enr.get(BITFIELD_ENR_KEY) { - Some(bitfield_bytes) => { - match EnrBitfield::::from_ssz_bytes(bitfield_bytes) { - Ok(bitfield) => bitfield, - Err(e) => { - warn!(self.log, "Peer had invalid ENR bitfield"; + let bitfield = match enr.bitfield::() { + Ok(v) => v, + Err(e) => { + warn!(self.log, "Peer has invalid ENR bitfield"; "peer_id" => format!("{}", peer_id), "error" => format!("{:?}", e)); - return; - } - } - } - None => { - warn!(self.log, "Peer has no ENR bitfield"; - "peer_id" => format!("{}", peer_id)); return; } }; - peer_info.enr_bitfield = Some(bitfield); + // use this as a baseline, until we get the actual meta-data + let meta_data = MetaData { + seq_number: 0, + attnets: bitfield, + }; + self.network_globals + .peers + .write() + .add_metadata(&peer_id, meta_data); } - self.network_globals - .connected_peer_set - .write() - .insert(peer_id, peer_info); // TODO: Drop peers if over max_peer limit metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); @@ -405,10 +395,7 @@ where } fn inject_disconnected(&mut self, peer_id: &PeerId, _endpoint: ConnectedPoint) { - self.network_globals - .connected_peer_set - .write() - .remove(peer_id); + self.network_globals.peers.write().disconnect(peer_id); metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); metrics::set_gauge( @@ -471,7 +458,7 @@ where // peers that get discovered during a query but are not contactable or // don't match a predicate can end up here. For debugging purposes we // log these to see if we are unnecessarily dropping discovered peers - if enr.get(ETH2_ENR_KEY) == Some(&self.enr_fork_id().to_vec()) { + if enr.eth2() == self.local_enr().eth2() { trace!(self.log, "Peer found in process of query"; "peer_id" => format!("{}", enr.peer_id()), "tcp_socket" => enr.tcp_socket()); } else { // this is temporary warning for debugging the DHT @@ -484,7 +471,7 @@ where let mut address = Multiaddr::from(socket.ip()); address.push(Protocol::Tcp(self.tcp_port)); let enr = self.discovery.local_enr(); - enr_helpers::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log); + enr::save_enr_to_disk(Path::new(&self.enr_dir), enr, &self.log); return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address, @@ -513,9 +500,9 @@ where if self.network_globals.connected_peers() < self.max_peers && self .network_globals - .connected_peer_set + .peers .read() - .get(&peer_id) + .peer_info(&peer_id) .is_none() && !self.banned_peers.contains(&peer_id) { diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 37e8aebfed..e751760a40 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -7,19 +7,19 @@ extern crate lazy_static; pub mod behaviour; mod config; -mod discovery; +pub mod discovery; mod metrics; +mod peer_manager; pub mod rpc; mod service; pub mod types; -// shift this type into discv5 -pub type Enr = libp2p::discv5::enr::Enr; - -pub use crate::types::{error, GossipTopic, NetworkGlobals, PeerInfo, PubsubMessage}; +pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; +pub use behaviour::BehaviourEvent; pub use config::Config as NetworkConfig; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{PeerId, Swarm}; +pub use peer_manager::{PeerDB, PeerInfo}; pub use rpc::RPCEvent; -pub use service::{Libp2pEvent, Service}; +pub use service::Service; diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs new file mode 100644 index 0000000000..78bb22493f --- /dev/null +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -0,0 +1,287 @@ +//! Implementation of a Lighthouse's peer management system. + +pub use self::peerdb::*; +use crate::rpc::MetaData; +use crate::{NetworkGlobals, PeerId}; +use futures::prelude::*; +use futures::Stream; +use hashmap_delay::HashSetDelay; +use slog::{crit, debug, error, warn}; +use smallvec::SmallVec; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use types::EthSpec; + +mod peer_info; +mod peerdb; + +pub use peer_info::PeerInfo; +/// The minimum reputation before a peer is disconnected. +// Most likely this needs tweaking +const MINIMUM_REPUTATION_BEFORE_BAN: Rep = 20; +/// The time in seconds between re-status's peers. +const STATUS_INTERVAL: u64 = 300; +/// The time in seconds between PING events. We do not send a ping if the other peer as PING'd us within +/// this time frame (Seconds) +const PING_INTERVAL: u64 = 30; + +/// The main struct that handles peer's reputation and connection status. +pub struct PeerManager { + /// Storage of network globals to access the PeerDB. + network_globals: Arc>, + /// A queue of events that the `PeerManager` is waiting to produce. + events: SmallVec<[PeerManagerEvent; 5]>, + /// A collection of peers awaiting to be Ping'd. + ping_peers: HashSetDelay, + /// A collection of peers awaiting to be Status'd. + status_peers: HashSetDelay, + /// Last updated moment. + last_updated: Instant, + /// The logger associated with the `PeerManager`. + log: slog::Logger, +} + +/// A collection of actions a peer can perform which will adjust its reputation +/// Each variant has an associated reputation change. +pub enum PeerAction { + /// The peer timed out on an RPC request/response. + TimedOut = -10, + /// The peer sent and invalid request/response or encoding. + InvalidMessage = -20, + /// The peer sent something objectively malicious. + Malicious = -50, + /// Received an expected message. + ValidMessage = 20, + /// Peer disconnected. + Disconnected = -30, +} + +/// The events that the PeerManager outputs (requests). +pub enum PeerManagerEvent { + /// Sends a STATUS to a peer. + Status(PeerId), + /// Sends a PING to a peer. + Ping(PeerId), + /// Request METADATA from a peer. + MetaData(PeerId), + /// The peer should be disconnected. + DisconnectPeer(PeerId), + /// The peer should be disconnected and banned. + BanPeer(PeerId), +} + +impl PeerManager { + pub fn new(network_globals: Arc>, log: &slog::Logger) -> Self { + PeerManager { + network_globals, + events: SmallVec::new(), + last_updated: Instant::now(), + ping_peers: HashSetDelay::new(Duration::from_secs(PING_INTERVAL)), + status_peers: HashSetDelay::new(Duration::from_secs(STATUS_INTERVAL)), + log: log.clone(), + } + } + + /* Public accessible functions */ + + /// A ping request has been received. + // NOTE: The behaviour responds with a PONG automatically + pub fn ping_request(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + // received a ping + // reset the to-ping timer for this peer + self.ping_peers.insert(peer_id.clone()); + + // if the sequence number is unknown send update the meta data of the peer. + if let Some(meta_data) = &peer_info.meta_data { + if meta_data.seq_number < seq { + debug!(self.log, "Requesting new metadata from peer"; "peer_id" => format!("{}", peer_id), "known_seq_no" => meta_data.seq_number, "ping_seq_no" => seq); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + // if we don't know the meta-data, request it + debug!(self.log, "Requesting first metadata from peer"; "peer_id" => format!("{}", peer_id)); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + crit!(self.log, "Received a PING from an unknown peer"; "peer_id" => format!("{}", peer_id)); + } + } + + /// A PONG has been returned from a peer. + pub fn pong_response(&mut self, peer_id: &PeerId, seq: u64) { + if let Some(peer_info) = self.network_globals.peers.read().peer_info(peer_id) { + // received a pong + + // if the sequence number is unknown send update the meta data of the peer. + if let Some(meta_data) = &peer_info.meta_data { + if meta_data.seq_number < seq { + debug!(self.log, "Requesting new metadata from peer"; "peer_id" => format!("{}", peer_id), "known_seq_no" => meta_data.seq_number, "pong_seq_no" => seq); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + // if we don't know the meta-data, request it + debug!(self.log, "Requesting first metadata from peer"; "peer_id" => format!("{}", peer_id)); + self.events + .push(PeerManagerEvent::MetaData(peer_id.clone())); + } + } else { + crit!(self.log, "Received a PONG from an unknown peer"; "peer_id" => format!("{}", peer_id)); + } + } + + /// Received a metadata response from a peer. + pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { + if let Some(known_meta_data) = &peer_info.meta_data { + if known_meta_data.seq_number < meta_data.seq_number { + debug!(self.log, "Updating peer's metadata"; "peer_id" => format!("{}", peer_id), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + } else { + warn!(self.log, "Received old metadata"; "peer_id" => format!("{}", peer_id), "known_seq_no" => known_meta_data.seq_number, "new_seq_no" => meta_data.seq_number); + } + } else { + // we have no meta-data for this peer, update + debug!(self.log, "Obtained peer's metadata"; "peer_id" => format!("{}", peer_id), "new_seq_no" => meta_data.seq_number); + peer_info.meta_data = Some(meta_data); + } + } else { + crit!(self.log, "Received METADATA from an unknown peer"; "peer_id" => format!("{}", peer_id)); + } + } + + /// A STATUS message has been received from a peer. This resets the status timer. + pub fn peer_statusd(&mut self, peer_id: &PeerId) { + self.status_peers.insert(peer_id.clone()); + } + + /// Checks the reputation of a peer and if it is too low, bans it and + /// sends the corresponding event. Informs if it got banned + fn gets_banned(&mut self, peer_id: &PeerId) -> bool { + // if the peer was already banned don't inform again + let mut peerdb = self.network_globals.peers.write(); + if peerdb.reputation(peer_id) < MINIMUM_REPUTATION_BEFORE_BAN + && !peerdb.connection_status(peer_id).is_banned() + { + peerdb.ban(peer_id); + self.events.push(PeerManagerEvent::BanPeer(peer_id.clone())); + return true; + } + false + } + + /// Sets a peer as disconnected. If its reputation gets too low requests + /// the peer to be banned and to be disconnected otherwise + pub fn disconnect(&mut self, peer_id: &PeerId) { + self.update_reputations(); + { + let mut peerdb = self.network_globals.peers.write(); + peerdb.disconnect(peer_id); + peerdb.add_reputation(peer_id, PeerAction::Disconnected as Rep); + } + if !self.gets_banned(peer_id) { + self.events + .push(PeerManagerEvent::DisconnectPeer(peer_id.clone())); + } + + // remove the ping and status timer for the peer + self.ping_peers.remove(peer_id); + self.status_peers.remove(peer_id); + } + + /// Sets a peer as connected as long as their reputation allows it + /// Informs if the peer was accepted + pub fn connect_ingoing(&mut self, peer_id: &PeerId) -> bool { + self.update_reputations(); + let mut peerdb = self.network_globals.peers.write(); + peerdb.new_peer(peer_id); + if !peerdb.connection_status(peer_id).is_banned() { + peerdb.connect_ingoing(peer_id); + return true; + } + // start a ping and status timer for the peer + self.ping_peers.insert(peer_id.clone()); + self.status_peers.insert(peer_id.clone()); + + false + } + + /// Sets a peer as connected as long as their reputation allows it + /// Informs if the peer was accepted + pub fn connect_outgoing(&mut self, peer_id: &PeerId) -> bool { + self.update_reputations(); + let mut peerdb = self.network_globals.peers.write(); + peerdb.new_peer(peer_id); + if !peerdb.connection_status(peer_id).is_banned() { + peerdb.connect_outgoing(peer_id); + return true; + } + // start a ping and status timer for the peer + self.ping_peers.insert(peer_id.clone()); + self.status_peers.insert(peer_id.clone()); + + false + } + + /// Provides a given peer's reputation if it exists. + pub fn get_peer_rep(&self, peer_id: &PeerId) -> Rep { + self.network_globals.peers.read().reputation(peer_id) + } + + /// Updates the reputation of known peers according to their connection + /// status and the time that has passed. + pub fn update_reputations(&mut self) { + let now = Instant::now(); + let elapsed = (now - self.last_updated).as_secs(); + // 0 seconds means now - last_updated < 0, but (most likely) not = 0. + // In this case, do nothing (updating last_updated would propagate + // rounding errors) + if elapsed > 0 { + self.last_updated = now; + // TODO decide how reputations change with time. If they get too low + // set the peers as banned + } + } + + /// Reports a peer for some action. + /// + /// If the peer doesn't exist, log a warning and insert defaults. + pub fn report_peer(&mut self, peer_id: &PeerId, action: PeerAction) { + self.update_reputations(); + self.network_globals + .peers + .write() + .add_reputation(peer_id, action as Rep); + self.update_reputations(); + } +} + +impl Stream for PeerManager { + type Item = PeerManagerEvent; + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + // poll the timeouts for pings and status' + while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| { + error!(self.log, "Failed to check for peers to ping"; "error" => format!("{}",e)); + })? { + self.events.push(PeerManagerEvent::Ping(peer_id)); + } + + while let Async::Ready(Some(peer_id)) = self.ping_peers.poll().map_err(|e| { + error!(self.log, "Failed to check for peers to status"; "error" => format!("{}",e)); + })? { + self.events.push(PeerManagerEvent::Status(peer_id)); + } + + if !self.events.is_empty() { + return Ok(Async::Ready(Some(self.events.remove(0)))); + } else { + self.events.shrink_to_fit(); + } + + Ok(Async::NotReady) + } +} diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs new file mode 100644 index 0000000000..8a6ca98cd0 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/peer_manager/peer_info.rs @@ -0,0 +1,188 @@ +use super::peerdb::{Rep, DEFAULT_REPUTATION}; +use crate::rpc::MetaData; +use std::time::Instant; +use types::{EthSpec, SubnetId}; +use PeerConnectionStatus::*; + +/// Information about a given connected peer. +#[derive(Debug)] +pub struct PeerInfo { + /// The connection status of the peer + _status: PeerStatus, + /// The peers reputation + pub reputation: Rep, + /// Client managing this peer + _client: Client, + /// Connection status of this peer + pub connection_status: PeerConnectionStatus, + /// The current syncing state of the peer. The state may be determined after it's initial + /// connection. + pub syncing_status: PeerSyncingStatus, + /// The ENR subnet bitfield of the peer. This may be determined after it's initial + /// connection. + pub meta_data: Option>, +} + +impl Default for PeerInfo { + fn default() -> PeerInfo { + PeerInfo { + reputation: DEFAULT_REPUTATION, + _status: Default::default(), + _client: Client { + _client_name: "Unknown".into(), + _version: vec![0], + }, + connection_status: Default::default(), + syncing_status: PeerSyncingStatus::Unknown, + meta_data: None, + } + } +} + +impl PeerInfo { + /// Returns if the peer is subscribed to a given `SubnetId` + pub fn on_subnet(&self, subnet_id: SubnetId) -> bool { + if let Some(meta_data) = &self.meta_data { + return meta_data + .attnets + .get(*subnet_id as usize) + .unwrap_or_else(|_| false); + } + false + } +} + +#[derive(Debug)] +pub enum PeerStatus { + /// The peer is healthy + Healthy, + /// The peer is clogged. It has not been responding to requests on time + Clogged, +} + +impl Default for PeerStatus { + fn default() -> Self { + PeerStatus::Healthy + } +} + +/// Representation of the client managing a peer +#[derive(Debug)] +pub struct Client { + /// The client's name (Ex: lighthouse, prism, nimbus, etc) + _client_name: String, + /// The client's version + _version: Vec, +} + +/// Connection Status of the peer +#[derive(Debug, Clone)] +pub enum PeerConnectionStatus { + Connected { + /// number of ingoing connections + n_in: u8, + /// number of outgoing connections + n_out: u8, + }, + Disconnected { + /// last time the peer was connected or discovered + since: Instant, + }, + Banned { + /// moment when the peer was banned + since: Instant, + }, + Unknown { + /// time since we know of this peer + since: Instant, + }, +} + +#[derive(Debug, Clone)] +pub enum PeerSyncingStatus { + /// At the current state as our node. + Synced, + /// The peer is further ahead than our node and useful for block downloads. + Ahead, + /// Is behind our current head and not useful for block downloads. + Behind, + /// Not currently known as a STATUS handshake has not occurred. + Unknown, +} + +impl Default for PeerConnectionStatus { + fn default() -> Self { + PeerConnectionStatus::Unknown { + since: Instant::now(), + } + } +} + +impl PeerConnectionStatus { + /// Checks if the status is connected + pub fn is_connected(&self) -> bool { + match self { + PeerConnectionStatus::Connected { .. } => true, + _ => false, + } + } + + /// Checks if the status is banned + pub fn is_banned(&self) -> bool { + match self { + PeerConnectionStatus::Banned { .. } => true, + _ => false, + } + } + + /// Checks if the status is disconnected + pub fn is_disconnected(&self) -> bool { + match self { + Disconnected { .. } => true, + _ => false, + } + } + + /// Modifies the status to Connected and increases the number of ingoing + /// connections by one + pub fn connect_ingoing(&mut self) { + match self { + Connected { n_in, .. } => *n_in += 1, + Disconnected { .. } | Banned { .. } | Unknown { .. } => { + *self = Connected { n_in: 1, n_out: 0 } + } + } + } + + /// Modifies the status to Connected and increases the number of outgoing + /// connections by one + pub fn connect_outgoing(&mut self) { + match self { + Connected { n_out, .. } => *n_out += 1, + Disconnected { .. } | Banned { .. } | Unknown { .. } => { + *self = Connected { n_in: 0, n_out: 1 } + } + } + } + + /// Modifies the status to Disconnected and sets the last seen instant to now + pub fn disconnect(&mut self) { + *self = Disconnected { + since: Instant::now(), + }; + } + + /// Modifies the status to Banned + pub fn ban(&mut self) { + *self = Banned { + since: Instant::now(), + }; + } + + pub fn connections(&self) -> (u8, u8) { + match self { + Connected { n_in, n_out } => (*n_in, *n_out), + _ => (0, 0), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs new file mode 100644 index 0000000000..f2c792c2ea --- /dev/null +++ b/beacon_node/eth2-libp2p/src/peer_manager/peerdb.rs @@ -0,0 +1,427 @@ +use super::peer_info::{PeerConnectionStatus, PeerInfo}; +use crate::rpc::methods::MetaData; +use crate::PeerId; +use slog::warn; +use std::collections::HashMap; +use types::{EthSpec, SubnetId}; + +/// A peer's reputation. +pub type Rep = i32; + +/// Max number of disconnected nodes to remember +const MAX_DC_PEERS: usize = 30; +/// The default starting reputation for an unknown peer. +pub const DEFAULT_REPUTATION: Rep = 50; + +/// Storage of known peers, their reputation and information +pub struct PeerDB { + /// The collection of known connected peers, their status and reputation + peers: HashMap>, + /// Tracking of number of disconnected nodes + n_dc: usize, + /// PeerDB's logger + log: slog::Logger, +} + +impl PeerDB { + pub fn new(log: &slog::Logger) -> Self { + Self { + log: log.clone(), + n_dc: 0, + peers: HashMap::new(), + } + } + /// Gives the reputation of a peer, or DEFAULT_REPUTATION if it is unknown. + pub fn reputation(&self, peer_id: &PeerId) -> Rep { + self.peers + .get(peer_id) + .map_or(DEFAULT_REPUTATION, |info| info.reputation) + } + + /// Gives the ids of all known peers. + pub fn peers(&self) -> impl Iterator { + self.peers.keys() + } + + /// Returns a peer's info, if known. + pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peers.get(peer_id) + } + + /// Returns a mutable reference to a peer's info if known. + pub fn peer_info_mut(&mut self, peer_id: &PeerId) -> Option<&mut PeerInfo> { + self.peers.get_mut(peer_id) + } + + /// Gives the ids of all known connected peers. + pub fn connected_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.connection_status.is_connected()) + .map(|(peer_id, _)| peer_id) + } + + /// Gives an iterator of all peers on a given subnet. + pub fn peers_on_subnet(&self, subnet_id: &SubnetId) -> impl Iterator { + let subnet_id_filter = subnet_id.clone(); + self.peers + .iter() + .filter(move |(_, info)| { + info.connection_status.is_connected() && info.on_subnet(subnet_id_filter) + }) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the ids of all known disconnected peers. + pub fn disconnected_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.connection_status.is_disconnected()) + .map(|(peer_id, _)| peer_id) + } + + /// Gives the ids of all known banned peers. + pub fn banned_peers(&self) -> impl Iterator { + self.peers + .iter() + .filter(|(_, info)| info.connection_status.is_banned()) + .map(|(peer_id, _)| peer_id) + } + + /// Returns a vector containing peers (their ids and info), sorted by + /// reputation from highest to lowest, and filtered using `is_status` + pub fn best_peers_by_status(&self, is_status: F) -> Vec<(&PeerId, &PeerInfo)> + where + F: Fn(&PeerConnectionStatus) -> bool, + { + let mut by_status = self + .peers + .iter() + .filter(|(_, info)| is_status(&info.connection_status)) + .collect::>(); + by_status.sort_by_key(|(_, info)| Rep::max_value() - info.reputation); + by_status + } + + /// Returns the peer with highest reputation that satisfies `is_status` + pub fn best_by_status(&self, is_status: F) -> Option<&PeerId> + where + F: Fn(&PeerConnectionStatus) -> bool, + { + self.peers + .iter() + .filter(|(_, info)| is_status(&info.connection_status)) + .max_by_key(|(_, info)| info.reputation) + .map(|(id, _)| id) + } + + /// Sets a peer as connected with an ingoing connection + pub fn connect_ingoing(&mut self, peer_id: &PeerId) { + let info = self + .peers + .entry(peer_id.clone()) + .or_insert_with(|| Default::default()); + + if info.connection_status.is_disconnected() { + self.n_dc -= 1; + } + info.connection_status.connect_ingoing(); + } + + /// Add the meta data of a peer. + pub fn add_metadata(&mut self, peer_id: &PeerId, meta_data: MetaData) { + if let Some(peer_info) = self.peers.get_mut(peer_id) { + peer_info.meta_data = Some(meta_data); + } else { + warn!(self.log, "Tried to add meta data for a non-existant peer"; "peer_id" => format!("{}", peer_id)); + } + } + + /// Sets a peer as connected with an outgoing connection + pub fn connect_outgoing(&mut self, peer_id: &PeerId) { + let info = self + .peers + .entry(peer_id.clone()) + .or_insert_with(|| Default::default()); + + if info.connection_status.is_disconnected() { + self.n_dc -= 1; + } + info.connection_status.connect_outgoing(); + } + + /// Sets the peer as disconnected + pub fn disconnect(&mut self, peer_id: &PeerId) { + let log_ref = &self.log; + let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { + warn!(log_ref, "Disconnecting unknown peer"; + "peer_id" => format!("{:?}",peer_id)); + PeerInfo::default() + }); + + if !info.connection_status.is_disconnected() { + info.connection_status.disconnect(); + self.n_dc += 1; + } + self.shrink_to_fit(); + } + + /// Drops the peers with the lowest reputation so that the number of + /// disconnected peers is less than MAX_DC_PEERS + pub fn shrink_to_fit(&mut self) { + // for caution, but the difference should never be > 1 + while self.n_dc > MAX_DC_PEERS { + let to_drop = self + .peers + .iter() + .filter(|(_, info)| info.connection_status.is_disconnected()) + .min_by_key(|(_, info)| info.reputation) + .map(|(id, _)| id.clone()) + .unwrap(); // should be safe since n_dc > MAX_DC_PEERS > 0 + self.peers.remove(&to_drop); + self.n_dc -= 1; + } + } + + /// Sets a peer as banned + pub fn ban(&mut self, peer_id: &PeerId) { + let log_ref = &self.log; + let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { + warn!(log_ref, "Banning unknown peer"; + "peer_id" => format!("{:?}",peer_id)); + PeerInfo::default() + }); + if info.connection_status.is_disconnected() { + self.n_dc -= 1; + } + info.connection_status.ban(); + } + + /// Inserts a new peer with the default PeerInfo if it is not already present + /// Returns if the peer was new to the PeerDB + pub fn new_peer(&mut self, peer_id: &PeerId) -> bool { + if !self.peers.contains_key(peer_id) { + self.peers.insert(peer_id.clone(), Default::default()); + return true; + } + false + } + + /// Sets the reputation of peer + pub fn set_reputation(&mut self, peer_id: &PeerId, rep: Rep) { + let log_ref = &self.log; + self.peers + .entry(peer_id.clone()) + .or_insert_with(|| { + warn!(log_ref, "Setting the reputation of an unknown peer"; + "peer_id" => format!("{:?}",peer_id)); + PeerInfo::default() + }) + .reputation = rep; + } + + /// Adds to a peer's reputation by `change`. If the reputation exceeds Rep's + /// upper (lower) bounds, it stays at the maximum (minimum) value + pub fn add_reputation(&mut self, peer_id: &PeerId, change: Rep) { + let log_ref = &self.log; + let info = self.peers.entry(peer_id.clone()).or_insert_with(|| { + warn!(log_ref, "Adding to the reputation of an unknown peer"; + "peer_id" => format!("{:?}",peer_id)); + PeerInfo::default() + }); + info.reputation = info.reputation.saturating_add(change); + } + + pub fn connection_status(&self, peer_id: &PeerId) -> PeerConnectionStatus { + self.peer_info(peer_id) + .map_or(PeerConnectionStatus::default(), |info| { + info.connection_status.clone() + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use slog::{o, Drain}; + use types::MinimalEthSpec; + type M = MinimalEthSpec; + + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + if enabled { + slog::Logger::root(drain.filter_level(level).fuse(), o!()) + } else { + slog::Logger::root(drain.filter(|_| false).fuse(), o!()) + } + } + + fn get_db() -> PeerDB { + let log = build_log(slog::Level::Debug, true); + PeerDB::new(&log) + } + + #[test] + fn test_peer_connected_successfully() { + let mut pdb = get_db(); + let random_peer = PeerId::random(); + + let (n_in, n_out) = (10, 20); + for _ in 0..n_in { + pdb.connect_ingoing(&random_peer); + } + for _ in 0..n_out { + pdb.connect_outgoing(&random_peer); + } + + // the peer is known + let peer_info = pdb.peer_info(&random_peer); + assert!(peer_info.is_some()); + // this is the only peer + assert_eq!(pdb.peers().count(), 1); + // the peer has the default reputation + assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION); + // it should be connected, and therefore not counted as disconnected + assert_eq!(pdb.n_dc, 0); + assert!(peer_info.unwrap().connection_status.is_connected()); + assert_eq!( + peer_info.unwrap().connection_status.connections(), + (n_in, n_out) + ); + } + + #[test] + fn test_set_reputation() { + let mut pdb = get_db(); + let random_peer = PeerId::random(); + pdb.connect_ingoing(&random_peer); + + let mut rep = Rep::min_value(); + pdb.set_reputation(&random_peer, rep); + assert_eq!(pdb.reputation(&random_peer), rep); + + rep = Rep::max_value(); + pdb.set_reputation(&random_peer, rep); + assert_eq!(pdb.reputation(&random_peer), rep); + + rep = Rep::max_value() / 100; + pdb.set_reputation(&random_peer, rep); + assert_eq!(pdb.reputation(&random_peer), rep); + } + + #[test] + fn test_reputation_change() { + let mut pdb = get_db(); + + // 0 change does not change de reputation + let random_peer = PeerId::random(); + let change: Rep = 0; + pdb.connect_ingoing(&random_peer); + pdb.add_reputation(&random_peer, change); + assert_eq!(pdb.reputation(&random_peer), DEFAULT_REPUTATION); + + // overflowing change is capped + let random_peer = PeerId::random(); + let change = Rep::max_value(); + pdb.connect_ingoing(&random_peer); + pdb.add_reputation(&random_peer, change); + assert_eq!(pdb.reputation(&random_peer), Rep::max_value()); + } + + #[test] + fn test_disconnected_are_bounded() { + let mut pdb = get_db(); + + for _ in 0..MAX_DC_PEERS + 1 { + let p = PeerId::random(); + pdb.connect_ingoing(&p); + } + assert_eq!(pdb.n_dc, 0); + + for p in pdb.connected_peers().cloned().collect::>() { + pdb.disconnect(&p); + } + + assert_eq!(pdb.n_dc, MAX_DC_PEERS); + } + + #[test] + fn test_best_peers() { + let mut pdb = get_db(); + + let p0 = PeerId::random(); + let p1 = PeerId::random(); + let p2 = PeerId::random(); + pdb.new_peer(&p0); + pdb.new_peer(&p1); + pdb.new_peer(&p2); + pdb.connect_ingoing(&p0); + pdb.connect_ingoing(&p1); + pdb.connect_ingoing(&p2); + pdb.set_reputation(&p0, 70); + pdb.set_reputation(&p1, 100); + pdb.set_reputation(&p2, 50); + + let best_peers = pdb.best_peers_by_status(PeerConnectionStatus::is_connected); + assert!(vec![&p1, &p0, &p2] + .into_iter() + .eq(best_peers.into_iter().map(|p| p.0))); + } + + #[test] + fn test_the_best_peer() { + let mut pdb = get_db(); + + let p0 = PeerId::random(); + let p1 = PeerId::random(); + let p2 = PeerId::random(); + pdb.new_peer(&p0); + pdb.new_peer(&p1); + pdb.new_peer(&p2); + pdb.connect_ingoing(&p0); + pdb.connect_ingoing(&p1); + pdb.connect_ingoing(&p2); + pdb.set_reputation(&p0, 70); + pdb.set_reputation(&p1, 100); + pdb.set_reputation(&p2, 50); + + let the_best = pdb.best_by_status(PeerConnectionStatus::is_connected); + assert!(the_best.is_some()); + // Consistency check + let best_peers = pdb.best_peers_by_status(PeerConnectionStatus::is_connected); + assert_eq!(the_best, best_peers.into_iter().map(|p| p.0).next()); + } + + #[test] + fn test_disconnected_consistency() { + let mut pdb = get_db(); + + let random_peer = PeerId::random(); + + pdb.new_peer(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + + pdb.connect_ingoing(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + pdb.disconnect(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + + pdb.connect_outgoing(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + pdb.disconnect(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + + pdb.ban(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + pdb.disconnect(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + + pdb.disconnect(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + pdb.disconnect(&random_peer); + assert_eq!(pdb.n_dc, pdb.disconnected_peers().count()); + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index 893af554ce..9fd430acd1 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -2,7 +2,8 @@ use crate::rpc::methods::*; use crate::rpc::{ codec::base::OutboundCodec, protocol::{ - ProtocolId, RPCError, RPC_BLOCKS_BY_RANGE, RPC_BLOCKS_BY_ROOT, RPC_GOODBYE, RPC_STATUS, + ProtocolId, RPCError, RPC_BLOCKS_BY_RANGE, RPC_BLOCKS_BY_ROOT, RPC_GOODBYE, RPC_META_DATA, + RPC_PING, RPC_STATUS, }, }; use crate::rpc::{ErrorMessage, RPCErrorResponse, RPCRequest, RPCResponse}; @@ -48,6 +49,8 @@ impl Encoder for SSZInboundCodec { RPCResponse::Status(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRange(res) => res.as_ssz_bytes(), RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), + RPCResponse::Pong(res) => res.data.as_ssz_bytes(), + RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, RPCErrorResponse::InvalidRequest(err) => err.as_ssz_bytes(), RPCErrorResponse::ServerError(err) => err.as_ssz_bytes(), @@ -103,6 +106,24 @@ impl Decoder for SSZInboundCodec { }))), _ => unreachable!("Cannot negotiate an unknown version"), }, + RPC_PING => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( + &packet, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_META_DATA => match self.protocol.version.as_str() { + "1" => { + if packet.len() > 0 { + Err(RPCError::Custom( + "Get metadata request should be empty".into(), + )) + } else { + Ok(Some(RPCRequest::MetaData(PhantomData))) + } + } + _ => unreachable!("Cannot negotiate an unknown version"), + }, _ => unreachable!("Cannot negotiate an unknown protocol"), }, Ok(None) => Ok(None), @@ -146,7 +167,8 @@ impl Encoder for SSZOutboundCodec { RPCRequest::Goodbye(req) => req.as_ssz_bytes(), RPCRequest::BlocksByRange(req) => req.as_ssz_bytes(), RPCRequest::BlocksByRoot(req) => req.block_roots.as_ssz_bytes(), - RPCRequest::Phantom(_) => unreachable!("Never encode phantom data"), + RPCRequest::Ping(req) => req.as_ssz_bytes(), + RPCRequest::MetaData(_) => return Ok(()), // no metadata to encode }; // length-prefix self.inner @@ -189,6 +211,18 @@ impl Decoder for SSZOutboundCodec { )), // cannot have an empty block message. _ => unreachable!("Cannot negotiate an unknown version"), }, + RPC_PING => match self.protocol.version.as_str() { + "1" => Err(RPCError::Custom( + "PING stream terminated unexpectedly".into(), + )), // cannot have an empty block message. + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_META_DATA => match self.protocol.version.as_str() { + "1" => Err(RPCError::Custom( + "Metadata stream terminated unexpectedly".into(), + )), // cannot have an empty block message. + _ => unreachable!("Cannot negotiate an unknown version"), + }, _ => unreachable!("Cannot negotiate an unknown protocol"), } } else { @@ -219,6 +253,18 @@ impl Decoder for SSZOutboundCodec { )))), _ => unreachable!("Cannot negotiate an unknown version"), }, + RPC_PING => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::Pong(Ping { + data: u64::from_ssz_bytes(&raw_bytes)?, + }))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, + RPC_META_DATA => match self.protocol.version.as_str() { + "1" => Ok(Some(RPCResponse::MetaData(MetaData::from_ssz_bytes( + &raw_bytes, + )?))), + _ => unreachable!("Cannot negotiate an unknown version"), + }, _ => unreachable!("Cannot negotiate an unknown protocol"), } } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 4f1258ef16..fbfecaad2d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -1,5 +1,6 @@ //! Available RPC methods types and ids. +use crate::types::EnrBitfield; use ssz_derive::{Decode, Encode}; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; @@ -28,6 +29,22 @@ pub struct StatusMessage { pub head_slot: Slot, } +/// The PING request/response message. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct Ping { + /// The metadata sequence number. + pub data: u64, +} + +/// The METADATA response structure. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct MetaData { + /// A sequential counter indicating when data gets modified. + pub seq_number: u64, + /// The persistent subnet bitfield. + pub attnets: EnrBitfield, +} + /// The reason given for a `Goodbye` message. /// /// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`, @@ -136,6 +153,12 @@ pub enum RPCResponse { /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Box>), + + /// A PONG response to a PING request. + Pong(Ping), + + /// A response to a META_DATA request. + MetaData(MetaData), } /// Indicates which response is being terminated by a stream termination response. @@ -202,6 +225,8 @@ impl RPCErrorResponse { RPCResponse::Status(_) => false, RPCResponse::BlocksByRange(_) => true, RPCResponse::BlocksByRoot(_) => true, + RPCResponse::Pong(_) => false, + RPCResponse::MetaData(_) => false, }, RPCErrorResponse::InvalidRequest(_) => true, RPCErrorResponse::ServerError(_) => true, @@ -249,6 +274,8 @@ impl std::fmt::Display for RPCResponse { RPCResponse::BlocksByRoot(block) => { write!(f, "BlocksByRoot: BLock slot: {}", block.message.slot) } + RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), + RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number), } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 2beae12d08..bb9e2e711f 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -13,10 +13,11 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId}; pub use methods::{ - ErrorMessage, RPCErrorResponse, RPCResponse, RequestId, ResponseTermination, StatusMessage, + ErrorMessage, MetaData, RPCErrorResponse, RPCResponse, RequestId, ResponseTermination, + StatusMessage, }; pub use protocol::{RPCError, RPCProtocol, RPCRequest}; -use slog::o; +use slog::{debug, o}; use std::marker::PhantomData; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; @@ -120,9 +121,18 @@ where // if initialised the connection, report this upwards to send the HELLO request if let ConnectedPoint::Dialer { .. } = connected_point { self.events.push(NetworkBehaviourAction::GenerateEvent( - RPCMessage::PeerDialed(peer_id), + RPCMessage::PeerDialed(peer_id.clone()), )); } + + // find the peer's meta-data + debug!(self.log, "Requesting new peer's metadata"; "peer_id" => format!("{}",peer_id)); + let rpc_event = + RPCEvent::Request(RequestId::from(0usize), RPCRequest::MetaData(PhantomData)); + self.events.push(NetworkBehaviourAction::SendEvent { + peer_id, + event: rpc_event, + }); } fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index e077fd7404..0f93759fef 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -9,24 +9,21 @@ use crate::rpc::{ }, methods::ResponseTermination, }; -use futures::{ - future::{self, FutureResult}, - sink, stream, Sink, Stream, -}; +use futures::future::*; +use futures::{future, sink, stream, Sink, Stream}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use std::io; use std::marker::PhantomData; use std::time::Duration; use tokio::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::prelude::*; use tokio::timer::timeout; use tokio::util::FutureExt; use tokio_io_timeout::TimeoutStream; use types::EthSpec; /// The maximum bytes that can be sent across the RPC. -const MAX_RPC_SIZE: usize = 4_194_304; // 4M +const MAX_RPC_SIZE: usize = 1_048_576; // 1M /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req"; /// Time allowed for the first byte of a request to arrive before we time out (Time To First Byte). @@ -44,6 +41,10 @@ pub const RPC_GOODBYE: &str = "goodbye"; pub const RPC_BLOCKS_BY_RANGE: &str = "beacon_blocks_by_range"; /// The `BlocksByRoot` protocol name. pub const RPC_BLOCKS_BY_ROOT: &str = "beacon_blocks_by_root"; +/// The `Ping` protocol name. +pub const RPC_PING: &str = "ping"; +/// The `MetaData` protocol name. +pub const RPC_META_DATA: &str = "metadata"; #[derive(Debug, Clone)] pub struct RPCProtocol { @@ -54,18 +55,21 @@ impl UpgradeInfo for RPCProtocol { type Info = ProtocolId; type InfoIter = Vec; + /// The list of supported RPC protocols for Lighthouse. fn protocol_info(&self) -> Self::InfoIter { vec![ ProtocolId::new(RPC_STATUS, "1", "ssz"), ProtocolId::new(RPC_GOODBYE, "1", "ssz"), ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz"), ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz"), + ProtocolId::new(RPC_PING, "1", "ssz"), + ProtocolId::new(RPC_META_DATA, "1", "ssz"), ] } } /// Tracks the types in a protocol id. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ProtocolId { /// The rpc message type/name. pub message_name: String, @@ -125,13 +129,16 @@ where type Output = InboundOutput; type Error = RPCError; - type Future = future::AndThen< - future::MapErr< - timeout::Timeout>>, - FnMapErr, - >, + type Future = future::Either< FutureResult, RPCError>, - FnAndThen, + future::AndThen< + future::MapErr< + timeout::Timeout>>, + FnMapErr, + >, + FutureResult, RPCError>, + FnAndThen, + >, >; fn upgrade_inbound( @@ -141,22 +148,36 @@ where ) -> Self::Future { match protocol.encoding.as_str() { "ssz" | _ => { + let protocol_name = protocol.message_name.clone(); let ssz_codec = BaseInboundCodec::new(SSZInboundCodec::new(protocol, MAX_RPC_SIZE)); let codec = InboundCodec::SSZ(ssz_codec); let mut timed_socket = TimeoutStream::new(socket); timed_socket.set_read_timeout(Some(Duration::from_secs(TTFB_TIMEOUT))); - Framed::new(timed_socket, codec) - .into_future() - .timeout(Duration::from_secs(REQUEST_TIMEOUT)) - .map_err(RPCError::from as FnMapErr) - .and_then({ - |(req, stream)| match req { - Some(req) => futures::future::ok((req, stream)), - None => futures::future::err(RPCError::Custom( - "Stream terminated early".into(), - )), - } - } as FnAndThen) + + let socket = Framed::new(timed_socket, codec); + + // MetaData requests should be empty, return the stream + if protocol_name == RPC_META_DATA { + futures::future::Either::A(futures::future::ok(( + RPCRequest::MetaData(PhantomData), + socket, + ))) + } else { + futures::future::Either::B( + socket + .into_future() + .timeout(Duration::from_secs(REQUEST_TIMEOUT)) + .map_err(RPCError::from as FnMapErr) + .and_then({ + |(req, stream)| match req { + Some(request) => futures::future::ok((request, stream)), + None => futures::future::err(RPCError::Custom( + "Stream terminated early".into(), + )), + } + } as FnAndThen), + ) + } } } } @@ -173,7 +194,8 @@ pub enum RPCRequest { Goodbye(GoodbyeReason), BlocksByRange(BlocksByRangeRequest), BlocksByRoot(BlocksByRootRequest), - Phantom(PhantomData), + Ping(Ping), + MetaData(PhantomData), } impl UpgradeInfo for RPCRequest { @@ -195,7 +217,8 @@ impl RPCRequest { RPCRequest::Goodbye(_) => vec![ProtocolId::new(RPC_GOODBYE, "1", "ssz")], RPCRequest::BlocksByRange(_) => vec![ProtocolId::new(RPC_BLOCKS_BY_RANGE, "1", "ssz")], RPCRequest::BlocksByRoot(_) => vec![ProtocolId::new(RPC_BLOCKS_BY_ROOT, "1", "ssz")], - RPCRequest::Phantom(_) => Vec::new(), + RPCRequest::Ping(_) => vec![ProtocolId::new(RPC_PING, "1", "ssz")], + RPCRequest::MetaData(_) => vec![ProtocolId::new(RPC_META_DATA, "1", "ssz")], } } @@ -209,7 +232,8 @@ impl RPCRequest { RPCRequest::Goodbye(_) => false, RPCRequest::BlocksByRange(_) => true, RPCRequest::BlocksByRoot(_) => true, - RPCRequest::Phantom(_) => unreachable!("Phantom should never be initialised"), + RPCRequest::Ping(_) => true, + RPCRequest::MetaData(_) => true, } } @@ -221,7 +245,8 @@ impl RPCRequest { RPCRequest::Goodbye(_) => false, RPCRequest::BlocksByRange(_) => true, RPCRequest::BlocksByRoot(_) => true, - RPCRequest::Phantom(_) => unreachable!("Phantom should never be initialised"), + RPCRequest::Ping(_) => false, + RPCRequest::MetaData(_) => false, } } @@ -235,7 +260,8 @@ impl RPCRequest { RPCRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, RPCRequest::Status(_) => unreachable!(), RPCRequest::Goodbye(_) => unreachable!(), - RPCRequest::Phantom(_) => unreachable!("Phantom should never be initialised"), + RPCRequest::Ping(_) => unreachable!(), + RPCRequest::MetaData(_) => unreachable!(), } } } @@ -361,7 +387,8 @@ impl std::fmt::Display for RPCRequest { RPCRequest::Goodbye(reason) => write!(f, "Goodbye: {}", reason), RPCRequest::BlocksByRange(req) => write!(f, "Blocks by range: {}", req), RPCRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), - RPCRequest::Phantom(_) => unreachable!("Phantom should never be initialised"), + RPCRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), + RPCRequest::MetaData(_) => write!(f, "MetaData request"), } } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 9f265c2a93..3de905473e 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -1,8 +1,8 @@ use crate::behaviour::{Behaviour, BehaviourEvent}; +use crate::discovery::enr; use crate::multiaddr::Protocol; -use crate::rpc::RPCEvent; use crate::types::{error, GossipKind}; -use crate::{NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; +use crate::{NetworkConfig, NetworkGlobals}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ @@ -14,7 +14,6 @@ use libp2p::core::{ upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, ConnectedPoint, }; -use libp2p::gossipsub::MessageId; use libp2p::{core, noise, secio, swarm::NetworkBehaviour, PeerId, Swarm, Transport}; use slog::{crit, debug, error, info, trace, warn}; use std::fs::File; @@ -60,34 +59,33 @@ impl Service { ) -> error::Result<(Arc>, Self)> { trace!(log, "Libp2p Service starting"); + // initialise the node's ID let local_keypair = if let Some(hex_bytes) = &config.secret_key_hex { keypair_from_hex(hex_bytes)? } else { load_private_key(config, &log) }; - // load the private key from CLI flag, disk or generate a new one - let local_peer_id = PeerId::from(local_keypair.public()); - info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", local_peer_id)); + // Create an ENR or load from disk if appropriate + let enr = + enr::build_or_load_enr::(local_keypair.clone(), config, enr_fork_id, &log)?; + let local_peer_id = enr.peer_id(); // set up a collection of variables accessible outside of the network crate let network_globals = Arc::new(NetworkGlobals::new( - local_peer_id.clone(), + enr.clone(), config.libp2p_port, config.discovery_port, + &log, )); + info!(log, "Libp2p Service"; "peer_id" => format!("{:?}", enr.peer_id())); + let mut swarm = { // Set up the transport - tcp/ws with noise/secio and mplex/yamux let transport = build_transport(local_keypair.clone()); // Lighthouse network behaviour - let behaviour = Behaviour::new( - &local_keypair, - config, - network_globals.clone(), - enr_fork_id, - &log, - )?; + let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; Swarm::new(transport, behaviour, local_peer_id.clone()) }; @@ -176,42 +174,15 @@ impl Service { } impl Stream for Service { - type Item = Libp2pEvent; + type Item = BehaviourEvent; type Error = error::Error; fn poll(&mut self) -> Poll, Self::Error> { loop { match self.swarm.poll() { - Ok(Async::Ready(Some(event))) => match event { - BehaviourEvent::GossipMessage { - id, - source, - topics, - message, - } => { - trace!(self.log, "Gossipsub message received"; "service" => "Swarm"); - return Ok(Async::Ready(Some(Libp2pEvent::PubsubMessage { - id, - source, - topics, - message, - }))); - } - BehaviourEvent::RPC(peer_id, event) => { - return Ok(Async::Ready(Some(Libp2pEvent::RPC(peer_id, event)))); - } - BehaviourEvent::PeerDialed(peer_id) => { - return Ok(Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id)))); - } - BehaviourEvent::PeerDisconnected(peer_id) => { - return Ok(Async::Ready(Some(Libp2pEvent::PeerDisconnected(peer_id)))); - } - BehaviourEvent::PeerSubscribed(peer_id, topic) => { - return Ok(Async::Ready(Some(Libp2pEvent::PeerSubscribed( - peer_id, topic, - )))); - } - }, + Ok(Async::Ready(Some(event))) => { + return Ok(Async::Ready(Some(event))); + } Ok(Async::Ready(None)) => unreachable!("Swarm stream shouldn't end"), Ok(Async::NotReady) => break, _ => break, @@ -319,26 +290,6 @@ fn build_transport(local_private_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox) transport } -#[derive(Debug)] -/// Events that can be obtained from polling the Libp2p Service. -pub enum Libp2pEvent { - /// An RPC response request has been received on the swarm. - RPC(PeerId, RPCEvent), - /// Initiated the connection to a new peer. - PeerDialed(PeerId), - /// A peer has disconnected. - PeerDisconnected(PeerId), - /// Received pubsub message. - PubsubMessage { - id: MessageId, - source: PeerId, - topics: Vec, - message: PubsubMessage, - }, - /// Subscribed to peer for a topic hash. - PeerSubscribed(PeerId, TopicHash), -} - fn keypair_from_hex(hex_bytes: &str) -> error::Result { let hex_bytes = if hex_bytes.starts_with("0x") { hex_bytes[2..].to_string() diff --git a/beacon_node/eth2-libp2p/src/types/globals.rs b/beacon_node/eth2-libp2p/src/types/globals.rs index 93574f9d88..1dce8129f4 100644 --- a/beacon_node/eth2-libp2p/src/types/globals.rs +++ b/beacon_node/eth2-libp2p/src/types/globals.rs @@ -1,43 +1,56 @@ //! A collection of variables that are accessible outside of the network thread itself. -use crate::{Enr, GossipTopic, Multiaddr, PeerId, PeerInfo}; +use crate::peer_manager::PeerDB; +use crate::rpc::methods::MetaData; +use crate::{discovery::enr::Eth2Enr, Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::sync::atomic::{AtomicU16, Ordering}; use types::EthSpec; pub struct NetworkGlobals { /// The current local ENR. - pub local_enr: RwLock>, + pub local_enr: RwLock, + /// The current node's meta-data. + pub meta_data: RwLock>, /// The local peer_id. pub peer_id: RwLock, /// Listening multiaddrs. pub listen_multiaddrs: RwLock>, - /// The tcp port that the libp2p service is listening on + /// The TCP port that the libp2p service is listening on pub listen_port_tcp: AtomicU16, - /// The udp port that the discovery service is listening on + /// The UDP port that the discovery service is listening on pub listen_port_udp: AtomicU16, - /// The collection of currently connected peers. - pub connected_peer_set: RwLock>>, + /// The collection of known peers. + pub peers: RwLock>, /// The current gossipsub topic subscriptions. pub gossipsub_subscriptions: RwLock>, } impl NetworkGlobals { - pub fn new(peer_id: PeerId, tcp_port: u16, udp_port: u16) -> Self { + pub fn new(enr: Enr, tcp_port: u16, udp_port: u16, log: &slog::Logger) -> Self { + // set up the local meta data of the node + let meta_data = RwLock::new(MetaData { + seq_number: 0, + attnets: enr + .bitfield::() + .expect("Local ENR must have a bitfield specified"), + }); + NetworkGlobals { - local_enr: RwLock::new(None), - peer_id: RwLock::new(peer_id), + local_enr: RwLock::new(enr.clone()), + meta_data, + peer_id: RwLock::new(enr.peer_id()), listen_multiaddrs: RwLock::new(Vec::new()), listen_port_tcp: AtomicU16::new(tcp_port), listen_port_udp: AtomicU16::new(udp_port), - connected_peer_set: RwLock::new(HashMap::new()), + peers: RwLock::new(PeerDB::new(log)), gossipsub_subscriptions: RwLock::new(HashSet::new()), } } /// Returns the local ENR from the underlying Discv5 behaviour that external peers may connect /// to. - pub fn local_enr(&self) -> Option { + pub fn local_enr(&self) -> Enr { self.local_enr.read().clone() } @@ -63,6 +76,6 @@ impl NetworkGlobals { /// Returns the number of libp2p connected peers. pub fn connected_peers(&self) -> usize { - self.connected_peer_set.read().len() + self.peers.read().connected_peers().count() } } diff --git a/beacon_node/eth2-libp2p/src/types/mod.rs b/beacon_node/eth2-libp2p/src/types/mod.rs index 13fa844a50..410df08df6 100644 --- a/beacon_node/eth2-libp2p/src/types/mod.rs +++ b/beacon_node/eth2-libp2p/src/types/mod.rs @@ -1,10 +1,15 @@ pub mod error; mod globals; -mod peer_info; mod pubsub; mod topics; +use types::{BitVector, EthSpec}; + +#[allow(type_alias_bounds)] +pub type EnrBitfield = BitVector; + +pub type Enr = libp2p::discv5::enr::Enr; + pub use globals::NetworkGlobals; -pub use peer_info::{EnrBitfield, PeerInfo}; pub use pubsub::PubsubMessage; pub use topics::{GossipEncoding, GossipKind, GossipTopic}; diff --git a/beacon_node/eth2-libp2p/src/types/peer_info.rs b/beacon_node/eth2-libp2p/src/types/peer_info.rs deleted file mode 100644 index 1db953c130..0000000000 --- a/beacon_node/eth2-libp2p/src/types/peer_info.rs +++ /dev/null @@ -1,45 +0,0 @@ -//NOTE: This should be removed in favour of the PeerManager PeerInfo, once built. - -use types::{BitVector, EthSpec, SubnetId}; - -#[allow(type_alias_bounds)] -pub type EnrBitfield = BitVector; - -/// Information about a given connected peer. -#[derive(Debug, Clone)] -pub struct PeerInfo { - /// The current syncing state of the peer. The state may be determined after it's initial - /// connection. - pub syncing_state: Option, - /// The ENR subnet bitfield of the peer. This may be determined after it's initial - /// connection. - pub enr_bitfield: Option>, -} - -#[derive(Debug, Clone)] -pub enum PeerSyncingState { - /// At the current state as our node. - Synced, - /// The peer is further ahead than our node and useful for block downloads. - Ahead, - /// Is behind our current head and not useful for block downloads. - Behind, -} - -impl PeerInfo { - /// Creates a new PeerInfo, specifying it's - pub fn new() -> Self { - PeerInfo { - syncing_state: None, - enr_bitfield: None, - } - } - - /// Returns if the peer is subscribed to a given `SubnetId` - pub fn on_subnet(&self, subnet_id: SubnetId) -> bool { - if let Some(bitfield) = &self.enr_bitfield { - return bitfield.get(*subnet_id as usize).unwrap_or_else(|_| false); - } - false - } -} diff --git a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs index 49d0c538db..8268fc5d87 100644 --- a/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs +++ b/beacon_node/eth2-libp2p/tests/gossipsub_tests.rs @@ -45,7 +45,7 @@ fn test_gossipsub_forward() { for node in nodes.iter_mut() { loop { match node.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::PubsubMessage { + Async::Ready(Some(BehaviourEvent::PubsubMessage { topics, message, source, @@ -68,7 +68,7 @@ fn test_gossipsub_forward() { return Ok(Async::Ready(())); } } - Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) => { + Async::Ready(Some(BehaviourEvent::PeerSubscribed(_, topic))) => { // Publish on beacon block topic if topic == TopicHash::from_raw(publishing_topic.clone()) { subscribed_count += 1; @@ -117,7 +117,7 @@ fn test_gossipsub_full_mesh_publish() { let mut received_count = 0; tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { for node in nodes.iter_mut() { - while let Async::Ready(Some(Libp2pEvent::PubsubMessage { + while let Async::Ready(Some(BehaviourEvent::PubsubMessage { topics, message, .. })) = node.poll().unwrap() { @@ -135,7 +135,7 @@ fn test_gossipsub_full_mesh_publish() { } } } - while let Async::Ready(Some(Libp2pEvent::PeerSubscribed(_, topic))) = + while let Async::Ready(Some(BehaviourEvent::PeerSubscribed(_, topic))) = publishing_node.poll().unwrap() { // Publish on beacon block topic diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2-libp2p/tests/noise.rs index 38713903b6..ac29f3959b 100644 --- a/beacon_node/eth2-libp2p/tests/noise.rs +++ b/beacon_node/eth2-libp2p/tests/noise.rs @@ -2,6 +2,7 @@ use crate::behaviour::{Behaviour, BehaviourEvent}; use crate::multiaddr::Protocol; use ::types::{EnrForkId, MinimalEthSpec}; +use eth2_libp2p::discovery::build_enr; use eth2_libp2p::*; use futures::prelude::*; use libp2p::core::identity::Keypair; @@ -11,6 +12,7 @@ use libp2p::{ secio, PeerId, Swarm, Transport, }; use slog::{crit, debug, info, Level}; +use std::convert::TryInto; use std::io::{Error, ErrorKind}; use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::Arc; @@ -31,24 +33,20 @@ fn build_secio_swarm( ) -> error::Result> { let local_keypair = Keypair::generate_secp256k1(); let local_peer_id = PeerId::from(local_keypair.public()); - + let enr_key: libp2p::discv5::enr::CombinedKey = local_keypair.clone().try_into().unwrap(); + let enr = build_enr::(&enr_key, config, EnrForkId::default()).unwrap(); let network_globals = Arc::new(NetworkGlobals::new( - local_peer_id.clone(), + enr, config.libp2p_port, config.discovery_port, + &log, )); let mut swarm = { // Set up the transport - tcp/ws with secio and mplex/yamux let transport = build_secio_transport(local_keypair.clone()); // Lighthouse network behaviour - let behaviour = Behaviour::new( - &local_keypair, - config, - network_globals.clone(), - EnrForkId::default(), - &log, - )?; + let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; Swarm::new(transport, behaviour, local_peer_id.clone()) }; diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 48a57e31e5..ddf31a37e4 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -1,7 +1,7 @@ #![cfg(test)] use eth2_libp2p::rpc::methods::*; use eth2_libp2p::rpc::*; -use eth2_libp2p::{Libp2pEvent, RPCEvent}; +use eth2_libp2p::{BehaviourEvent, RPCEvent}; use slog::{warn, Level}; use std::sync::atomic::{AtomicBool, Ordering::Relaxed}; use std::sync::{Arc, Mutex}; @@ -53,29 +53,29 @@ fn test_status_rpc() { let sender_future = future::poll_fn(move || -> Poll { loop { match sender.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => { // Send a STATUS message warn!(sender_log, "Sending RPC"); sender .swarm .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); } - Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event { // Should receive the RPC response RPCEvent::Response(id, response @ RPCErrorResponse::Success(_)) => { - warn!(sender_log, "Sender Received"); - assert_eq!(id, 1); + if id == 1 { + warn!(sender_log, "Sender Received"); + let response = { + match response { + RPCErrorResponse::Success(r) => r, + _ => unreachable!(), + } + }; + assert_eq!(response, sender_response.clone()); - let response = { - match response { - RPCErrorResponse::Success(r) => r, - _ => unreachable!(), - } - }; - assert_eq!(response, sender_response.clone()); - - warn!(sender_log, "Sender Completed"); - return Ok(Async::Ready(true)); + warn!(sender_log, "Sender Completed"); + return Ok(Async::Ready(true)); + } } e => panic!("Received invalid RPC message {}", e), }, @@ -89,18 +89,20 @@ fn test_status_rpc() { let receiver_future = future::poll_fn(move || -> Poll { loop { match receiver.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))) => match event { + Async::Ready(Some(BehaviourEvent::RPC(peer_id, event))) => match event { // Should receive sent RPC request RPCEvent::Request(id, request) => { - assert_eq!(id, 1); - assert_eq!(rpc_request.clone(), request); - - // send the response - warn!(log, "Receiver Received"); - receiver.swarm.send_rpc( - peer_id, - RPCEvent::Response(id, RPCErrorResponse::Success(rpc_response.clone())), - ); + if request == rpc_request { + // send the response + warn!(log, "Receiver Received"); + receiver.swarm.send_rpc( + peer_id, + RPCEvent::Response( + id, + RPCErrorResponse::Success(rpc_response.clone()), + ), + ); + } } e => panic!("Received invalid RPC message {}", e), }, @@ -166,33 +168,37 @@ fn test_blocks_by_range_chunked_rpc() { let sender_future = future::poll_fn(move || -> Poll { loop { match sender.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => { // Send a BlocksByRange request warn!(sender_log, "Sender sending RPC request"); sender .swarm .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); } - Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event { // Should receive the RPC response RPCEvent::Response(id, response) => { - warn!(sender_log, "Sender received a response"); - assert_eq!(id, 1); - match response { - RPCErrorResponse::Success(res) => { - assert_eq!(res, sender_response.clone()); - *messages_received.lock().unwrap() += 1; - warn!(sender_log, "Chunk received"); + if id == 1 { + warn!(sender_log, "Sender received a response"); + match response { + RPCErrorResponse::Success(res) => { + assert_eq!(res, sender_response.clone()); + *messages_received.lock().unwrap() += 1; + warn!(sender_log, "Chunk received"); + } + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ) => { + // should be exactly 10 messages before terminating + assert_eq!( + *messages_received.lock().unwrap(), + messages_to_send + ); + // end the test + return Ok(Async::Ready(true)); + } + _ => panic!("Invalid RPC received"), } - RPCErrorResponse::StreamTermination( - ResponseTermination::BlocksByRange, - ) => { - // should be exactly 10 messages before terminating - assert_eq!(*messages_received.lock().unwrap(), messages_to_send); - // end the test - return Ok(Async::Ready(true)); - } - _ => panic!("Invalid RPC received"), } } _ => panic!("Received invalid RPC message"), @@ -207,34 +213,33 @@ fn test_blocks_by_range_chunked_rpc() { let receiver_future = future::poll_fn(move || -> Poll { loop { match receiver.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))) => match event { + Async::Ready(Some(BehaviourEvent::RPC(peer_id, event))) => match event { // Should receive the sent RPC request RPCEvent::Request(id, request) => { - assert_eq!(id, 1); - assert_eq!(rpc_request.clone(), request); + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); - // send the response - warn!(log, "Receiver got request"); - - for _ in 1..=messages_to_send { + for _ in 1..=messages_to_send { + receiver.swarm.send_rpc( + peer_id.clone(), + RPCEvent::Response( + id, + RPCErrorResponse::Success(rpc_response.clone()), + ), + ); + } + // send the stream termination receiver.swarm.send_rpc( - peer_id.clone(), + peer_id, RPCEvent::Response( id, - RPCErrorResponse::Success(rpc_response.clone()), + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ), ), ); } - // send the stream termination - receiver.swarm.send_rpc( - peer_id, - RPCEvent::Response( - id, - RPCErrorResponse::StreamTermination( - ResponseTermination::BlocksByRange, - ), - ), - ); } _ => panic!("Received invalid RPC message"), }, @@ -298,33 +303,34 @@ fn test_blocks_by_range_single_empty_rpc() { let sender_future = future::poll_fn(move || -> Poll { loop { match sender.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => { // Send a BlocksByRange request warn!(sender_log, "Sender sending RPC request"); sender .swarm .send_rpc(peer_id, RPCEvent::Request(1, sender_request.clone())); } - Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event { // Should receive the RPC response RPCEvent::Response(id, response) => { - warn!(sender_log, "Sender received a response"); - assert_eq!(id, 1); - match response { - RPCErrorResponse::Success(res) => { - assert_eq!(res, sender_response.clone()); - *messages_received.lock().unwrap() += 1; - warn!(sender_log, "Chunk received"); + if id == 1 { + warn!(sender_log, "Sender received a response"); + match response { + RPCErrorResponse::Success(res) => { + assert_eq!(res, sender_response.clone()); + *messages_received.lock().unwrap() += 1; + warn!(sender_log, "Chunk received"); + } + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ) => { + // should be exactly 1 messages before terminating + assert_eq!(*messages_received.lock().unwrap(), 1); + // end the test + return Ok(Async::Ready(true)); + } + _ => panic!("Invalid RPC received"), } - RPCErrorResponse::StreamTermination( - ResponseTermination::BlocksByRange, - ) => { - // should be exactly 1 messages before terminating - assert_eq!(*messages_received.lock().unwrap(), 1); - // end the test - return Ok(Async::Ready(true)); - } - _ => panic!("Invalid RPC received"), } } m => panic!("Received invalid RPC message: {}", m), @@ -339,29 +345,31 @@ fn test_blocks_by_range_single_empty_rpc() { let receiver_future = future::poll_fn(move || -> Poll { loop { match receiver.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::RPC(peer_id, event))) => match event { + Async::Ready(Some(BehaviourEvent::RPC(peer_id, event))) => match event { // Should receive the sent RPC request RPCEvent::Request(id, request) => { - assert_eq!(id, 1); - assert_eq!(rpc_request.clone(), request); + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); - // send the response - warn!(log, "Receiver got request"); - - receiver.swarm.send_rpc( - peer_id.clone(), - RPCEvent::Response(id, RPCErrorResponse::Success(rpc_response.clone())), - ); - // send the stream termination - receiver.swarm.send_rpc( - peer_id, - RPCEvent::Response( - id, - RPCErrorResponse::StreamTermination( - ResponseTermination::BlocksByRange, + receiver.swarm.send_rpc( + peer_id.clone(), + RPCEvent::Response( + id, + RPCErrorResponse::Success(rpc_response.clone()), ), - ), - ); + ); + // send the stream termination + receiver.swarm.send_rpc( + peer_id, + RPCEvent::Response( + id, + RPCErrorResponse::StreamTermination( + ResponseTermination::BlocksByRange, + ), + ), + ); + } } _ => panic!("Received invalid RPC message"), }, @@ -409,7 +417,7 @@ fn test_goodbye_rpc() { let sender_future = future::poll_fn(move || -> Poll { loop { match sender.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::PeerDialed(peer_id))) => { + Async::Ready(Some(BehaviourEvent::PeerDialed(peer_id))) => { // Send a Goodbye request warn!(sender_log, "Sender sending RPC request"); sender @@ -426,13 +434,15 @@ fn test_goodbye_rpc() { let receiver_future = future::poll_fn(move || -> Poll { loop { match receiver.poll().unwrap() { - Async::Ready(Some(Libp2pEvent::RPC(_, event))) => match event { + Async::Ready(Some(BehaviourEvent::RPC(_, event))) => match event { // Should receive the sent RPC request RPCEvent::Request(id, request) => { - assert_eq!(id, 0); - assert_eq!(rpc_request.clone(), request); - // receives the goodbye. Nothing left to do - return Ok(Async::Ready(true)); + if request == rpc_request { + assert_eq!(id, 0); + assert_eq!(rpc_request.clone(), request); + // receives the goodbye. Nothing left to do + return Ok(Async::Ready(true)); + } } _ => panic!("Received invalid RPC message"), }, diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index df9a364b33..e987a71682 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -47,6 +47,8 @@ pub enum RouterMessage { /// A gossip message has been received. The fields are: message id, the peer that sent us this /// message and the message itself. PubsubMessage(MessageId, PeerId, PubsubMessage), + /// The peer manager has requested we re-status a peer. + StatusPeer(PeerId), } impl Router { @@ -87,9 +89,10 @@ impl Router { /// Handle all messages incoming from the network service. fn handle_message(&mut self, message: RouterMessage) { match message { - // we have initiated a connection to a peer - RouterMessage::PeerDialed(peer_id) => { - self.processor.on_connect(peer_id); + // we have initiated a connection to a peer or the peer manager has requested a + // re-status + RouterMessage::PeerDialed(peer_id) | RouterMessage::StatusPeer(peer_id) => { + self.processor.send_status(peer_id); } // A peer has disconnected RouterMessage::PeerDisconnected(peer_id) => { @@ -143,7 +146,8 @@ impl Router { RPCRequest::BlocksByRoot(request) => self .processor .on_blocks_by_root_request(peer_id, request_id, request), - RPCRequest::Phantom(_) => unreachable!("Phantom never initialised"), + RPCRequest::Ping(_) => unreachable!("Ping MUST be handled in the behaviour"), + RPCRequest::MetaData(_) => unreachable!("MetaData MUST be handled in the behaviour"), } } @@ -187,6 +191,12 @@ impl Router { Some(beacon_block), ); } + RPCResponse::Pong(_) => { + unreachable!("Ping must be handled in the behaviour"); + } + RPCResponse::MetaData(_) => { + unreachable!("Meta data must be handled in the behaviour"); + } }, RPCErrorResponse::StreamTermination(response_type) => { // have received a stream termination, notify the processing functions diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index c297bf5e18..ca996b86b1 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -114,10 +114,11 @@ impl Processor { self.send_to_sync(SyncMessage::RPCError(peer_id, request_id)); } - /// Handle the connection of a new peer. - /// /// Sends a `Status` message to the peer. - pub fn on_connect(&mut self, peer_id: PeerId) { + /// + /// Called when we first connect to a peer, or when the PeerManager determines we need to + /// re-status. + pub fn send_status(&mut self, peer_id: PeerId) { if let Some(status_message) = status_message(&self.chain) { debug!( self.log, @@ -521,7 +522,7 @@ impl Processor { } BlockProcessingOutcome::ParentUnknown { .. } => { // Inform the sync manager to find parents for this block - trace!(self.log, "Block with unknown parent received"; + debug!(self.log, "Block with unknown parent received"; "peer_id" => format!("{:?}",peer_id)); self.send_to_sync(SyncMessage::UnknownBlock(peer_id, block)); } @@ -592,7 +593,7 @@ impl Processor { } AttestationProcessingOutcome::UnknownHeadBlock { beacon_block_root } => { // TODO: Maintain this attestation and re-process once sync completes - trace!( + debug!( self.log, "Attestation for unknown block"; "peer_id" => format!("{:?}", peer_id), diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 409ea70ca9..6db0a21a77 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -7,7 +7,7 @@ use crate::{ }; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; -use eth2_libp2p::{rpc::RPCRequest, Enr, Libp2pEvent, MessageId, NetworkGlobals, PeerId, Swarm}; +use eth2_libp2p::{rpc::RPCRequest, BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId, Swarm}; use eth2_libp2p::{PubsubMessage, RPCEvent}; use futures::prelude::*; use futures::Stream; @@ -280,7 +280,7 @@ fn spawn_service( loop { match service.libp2p.poll() { Ok(Async::Ready(Some(event))) => match event { - Libp2pEvent::RPC(peer_id, rpc_event) => { + BehaviourEvent::RPC(peer_id, rpc_event) => { // if we received a Goodbye message, drop and ban the peer if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { peers_to_ban.push(peer_id.clone()); @@ -289,19 +289,25 @@ fn spawn_service( .try_send(RouterMessage::RPC(peer_id, rpc_event)) .map_err(|_| { debug!(log, "Failed to send RPC to router");} )?; } - Libp2pEvent::PeerDialed(peer_id) => { - debug!(log, "Peer Dialed"; "peer_id" => format!("{:?}", peer_id)); + BehaviourEvent::PeerDialed(peer_id) => { + debug!(log, "Peer Dialed"; "peer_id" => format!("{}", peer_id)); service.router_send .try_send(RouterMessage::PeerDialed(peer_id)) .map_err(|_| { debug!(log, "Failed to send peer dialed to router");})?; } - Libp2pEvent::PeerDisconnected(peer_id) => { - debug!(log, "Peer Disconnected"; "peer_id" => format!("{:?}", peer_id)); + BehaviourEvent::PeerDisconnected(peer_id) => { + debug!(log, "Peer Disconnected"; "peer_id" => format!("{}", peer_id)); service.router_send .try_send(RouterMessage::PeerDisconnected(peer_id)) .map_err(|_| { debug!(log, "Failed to send peer disconnect to router");})?; } - Libp2pEvent::PubsubMessage { + BehaviourEvent::StatusPeer(peer_id) => { + debug!(log, "Re-status peer"; "peer_id" => format!("{}", peer_id)); + service.router_send + .try_send(RouterMessage::StatusPeer(peer_id)) + .map_err(|_| { debug!(log, "Failed to send re-status peer to router");})?; + } + BehaviourEvent::PubsubMessage { id, source, message, @@ -329,7 +335,7 @@ fn spawn_service( } } } - Libp2pEvent::PeerSubscribed(_, _) => {} + BehaviourEvent::PeerSubscribed(_, _) => {} }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), Ok(Async::NotReady) => break, diff --git a/beacon_node/rest_api/src/network.rs b/beacon_node/rest_api/src/network.rs index 1589c0368f..43b48b2a12 100644 --- a/beacon_node/rest_api/src/network.rs +++ b/beacon_node/rest_api/src/network.rs @@ -34,12 +34,7 @@ pub fn get_enr( req: Request, network: Arc>, ) -> ApiResult { - ResponseBuilder::new(&req)?.body_no_ssz( - &network - .local_enr() - .map(|enr| enr.to_base64()) - .unwrap_or_else(|| "".into()), - ) + ResponseBuilder::new(&req)?.body_no_ssz(&network.local_enr().to_base64()) } /// HTTP handler to return the `PeerId` from the client's libp2p service. @@ -68,9 +63,9 @@ pub fn get_peer_list( network: Arc>, ) -> ApiResult { let connected_peers: Vec = network - .connected_peer_set + .peers .read() - .keys() + .connected_peers() .map(PeerId::to_string) .collect(); ResponseBuilder::new(&req)?.body_no_ssz(&connected_peers) diff --git a/eth2/utils/hashmap_delay/src/hashset_delay.rs b/eth2/utils/hashmap_delay/src/hashset_delay.rs index 1ef0bdf658..bd93d6c8e7 100644 --- a/eth2/utils/hashmap_delay/src/hashset_delay.rs +++ b/eth2/utils/hashmap_delay/src/hashset_delay.rs @@ -57,14 +57,20 @@ where self.insert_at(key, self.default_entry_timeout); } - /// Inserts an entry that will expire at a given instant. + /// Inserts an entry that will expire at a given instant. If the entry already exists, the + /// timeout is updated. pub fn insert_at(&mut self, key: K, entry_duration: Duration) { - let delay_key = self.expirations.insert(key.clone(), entry_duration.clone()); - let entry = MapEntry { - key: delay_key, - value: Instant::now() + entry_duration, - }; - self.entries.insert(key, entry); + if self.contains(&key) { + // update the timeout + self.update_timeout(&key, entry_duration); + } else { + let delay_key = self.expirations.insert(key.clone(), entry_duration.clone()); + let entry = MapEntry { + key: delay_key, + value: Instant::now() + entry_duration, + }; + self.entries.insert(key, entry); + } } /// Gets a reference to an entry if it exists.