diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 31c0a7f2d4..aded78b02f 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -8,7 +8,8 @@ edition = "2018" beacon_chain = { path = "../beacon_chain" } clap = "2.32.0" # SigP repository until PR is merged -libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "fb852bcc2b9b3935555cc93930e913cbec2b0688" } +#libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "fb852bcc2b9b3935555cc93930e913cbec2b0688" } +libp2p = { path = "../../../sharding/rust-libp2p" } types = { path = "../../eth2/types" } serde = "1.0" serde_derive = "1.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index b808818534..e952d1f81a 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -1,3 +1,4 @@ +use crate::discovery::Discovery; use crate::rpc::{RPCEvent, RPCMessage, Rpc}; use crate::NetworkConfig; use crate::{Topic, TopicHash}; @@ -9,7 +10,7 @@ use libp2p::{ }, gossipsub::{Gossipsub, GossipsubEvent}, identify::{protocol::IdentifyInfo, Identify, IdentifyEvent}, - kad::{Kademlia, KademliaOut}, + kad::KademliaOut, ping::{Ping, PingEvent}, tokio_io::{AsyncRead, AsyncWrite}, NetworkBehaviour, PeerId, @@ -18,12 +19,8 @@ use slog::{debug, o, trace, warn}; use ssz::{ssz_encode, Decode, DecodeError, Encode}; use std::time::{Duration, Instant}; use tokio_timer::Delay; ->>>>>>> Adds Kademlia for peer discovery use types::{Attestation, BeaconBlock}; -//TODO: Make this dynamic -const TIME_BETWEEN_KAD_REQUESTS: Duration = Duration::from_secs(30); - /// Builds the network behaviour for the libp2p Swarm. /// Implements gossipsub message routing. #[derive(NetworkBehaviour)] @@ -38,13 +35,10 @@ pub struct Behaviour { /// Keep regular connection to peers and disconnect if absent. ping: Ping, /// Kademlia for peer discovery. - kad: Kademlia, + discovery: Discovery, /// Queue of behaviour events to be processed. #[behaviour(ignore)] events: Vec, - /// The delay until we next search for more peers. - #[behaviour(ignore)] - kad_delay: Delay, /// Logger for behaviour actions. #[behaviour(ignore)] log: slog::Logger, @@ -116,6 +110,12 @@ impl NetworkBehaviourEventProcess format!("{:?}", peer_id), "Addresses" => format!("{:?}", info.listen_addrs)); + // inject the found addresses into our discovery behaviour + for address in &info.listen_addrs { + self.discovery + .add_connected_address(&peer_id, address.clone()); + } } IdentifyEvent::Error { .. } => {} IdentifyEvent::SendBack { .. } => {} @@ -131,31 +131,12 @@ impl NetworkBehaviourEventProcess } } -// implement the kademlia behaviour +// implement the discovery behaviour (currently kademlia) impl NetworkBehaviourEventProcess for Behaviour { - fn inject_event(&mut self, out: KademliaOut) { - match out { - KademliaOut::Discovered { peer_id, .. } => { - debug!(self.log, "Kademlia peer discovered: {:?}", peer_id); - // send this to our topology behaviour - } - KademliaOut::KBucketAdded { .. } => { - // send this to our topology behaviour - } - KademliaOut::FindNodeResult { closer_peers, .. } => { - debug!( - self.log, - "Kademlia query found {} peers", - closer_peers.len() - ); - if closer_peers.is_empty() { - warn!(self.log, "Kademlia random query yielded empty results"); - } - } - KademliaOut::GetProvidersResult { .. } => (), - } + fn inject_event(&mut self, _out: KademliaOut) { + // not interested in kademlia results at the moment } } @@ -168,7 +149,7 @@ impl Behaviour { Behaviour { serenity_rpc: Rpc::new(log), gossipsub: Gossipsub::new(local_peer_id.clone(), net_conf.gs_config.clone()), - kad: Kademlia::new(local_peer_id), + discovery: Discovery::new(local_peer_id, log), identify: Identify::new( identify_config.version, identify_config.user_agent, @@ -176,7 +157,6 @@ impl Behaviour { ), ping: Ping::new(), events: Vec::new(), - kad_delay: Delay::new(Instant::now()), log: behaviour_log, } } @@ -189,19 +169,6 @@ impl Behaviour { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } - // check to see if it's time to search for me peers with kademlia - loop { - match self.kad_delay.poll() { - Ok(Async::Ready(_)) => { - self.get_kad_peers(); - } - Ok(Async::NotReady) => break, - Err(e) => { - warn!(self.log, "Error getting peers from Kademlia. Err: {:?}", e); - } - } - } - Async::NotReady } } @@ -225,18 +192,6 @@ impl Behaviour { self.gossipsub.publish(topic, message_bytes.clone()); } } - - /// Queries for more peers randomly using Kademlia. - pub fn get_kad_peers(&mut self) { - // pick a random PeerId - let random_peer = PeerId::random(); - debug!(self.log, "Running kademlia random peer query"); - self.kad.find_node(random_peer); - - // update the kademlia timeout - self.kad_delay - .reset(Instant::now() + TIME_BETWEEN_KAD_REQUESTS); - } } /// The types of events than can be obtained from polling the behaviour. diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 1a3f3ad3d9..b6857cd378 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -36,6 +36,7 @@ impl Default for Config { gs_config: GossipsubConfigBuilder::new() .max_gossip_size(4_000_000) .inactivity_timeout(Duration::from_secs(90)) + .heartbeat_interval(Duration::from_secs(20)) .build(), identify_config: IdentifyConfig::default(), boot_nodes: vec![], diff --git a/beacon_node/eth2-libp2p/src/discovery.rs b/beacon_node/eth2-libp2p/src/discovery.rs new file mode 100644 index 0000000000..232590c05a --- /dev/null +++ b/beacon_node/eth2-libp2p/src/discovery.rs @@ -0,0 +1,182 @@ +/// This manages the discovery and management of peers. +/// +/// Currently using Kademlia for peer discovery. +/// +use futures::prelude::*; +use libp2p::core::swarm::{ + ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters, +}; +use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler}; +use libp2p::kad::{Kademlia, KademliaOut}; +use slog::{debug, o, warn}; +use std::collections::HashMap; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_timer::Delay; + +//TODO: Make this dynamic +const TIME_BETWEEN_KAD_REQUESTS: Duration = Duration::from_secs(30); + +/// Maintains a list of discovered peers and implements the discovery protocol to discover new +/// peers. +pub struct Discovery { + /// Queue of events to processed. + // TODO: Re-implement as discovery protocol grows + // events: Vec>, + /// The discovery behaviour used to discover new peers. + discovery: Kademlia, + /// The delay between peer discovery searches. + peer_discovery_delay: Delay, + /// Mapping of known addresses for peer ids. + known_peers: HashMap>, + /// Logger for the discovery behaviour. + log: slog::Logger, +} + +impl Discovery { + pub fn new(local_peer_id: PeerId, log: &slog::Logger) -> Self { + let log = log.new(o!("Service" => "Libp2p-Discovery")); + Self { + // events: Vec::new(), + discovery: Kademlia::new(local_peer_id), + peer_discovery_delay: Delay::new(Instant::now()), + known_peers: HashMap::new(), + log, + } + } + + /// Uses discovery to search for new peers. + pub fn find_peers(&mut self) { + // pick a random PeerId + let random_peer = PeerId::random(); + debug!(self.log, "Searching for peers..."); + self.discovery.find_node(random_peer); + + // update the kademlia timeout + self.peer_discovery_delay + .reset(Instant::now() + TIME_BETWEEN_KAD_REQUESTS); + } + + /// We have discovered an address for a peer, add it to known peers. + pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) { + let known_peers = self + .known_peers + .entry(peer_id.clone()) + .or_insert_with(|| vec![]); + if !known_peers.contains(&address) { + known_peers.push(address.clone()); + } + // pass the address on to kademlia + self.discovery.add_connected_address(peer_id, address); + } +} + +// Redirect all behaviour event to underlying discovery behaviour. +impl NetworkBehaviour for Discovery +where + TSubstream: AsyncRead + AsyncWrite, +{ + type ProtocolsHandler = as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = as NetworkBehaviour>::OutEvent; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + NetworkBehaviour::new_handler(&mut self.discovery) + } + + // TODO: we store all peers in known_peers, when upgrading to discv5 we will avoid duplication + // of peer storage. + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + if let Some(addresses) = self.known_peers.get(peer_id) { + addresses.clone() + } else { + debug!( + self.log, + "Tried to dial: {:?} but no address stored", peer_id + ); + Vec::new() + } + } + + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + NetworkBehaviour::inject_connected(&mut self.discovery, peer_id, endpoint) + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + NetworkBehaviour::inject_disconnected(&mut self.discovery, peer_id, endpoint) + } + + fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) { + NetworkBehaviour::inject_replaced(&mut self.discovery, peer_id, closed, opened) + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: ::OutEvent, + ) { + // TODO: Upgrade to discv5 + NetworkBehaviour::inject_node_event(&mut self.discovery, peer_id, event) + } + + fn poll( + &mut self, + params: &mut PollParameters, + ) -> Async< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + // check to see if it's time to search for peers + loop { + match self.peer_discovery_delay.poll() { + Ok(Async::Ready(_)) => { + self.find_peers(); + } + Ok(Async::NotReady) => break, + Err(e) => { + warn!( + self.log, + "Error getting peers from discovery behaviour. Err: {:?}", e + ); + } + } + } + // Poll discovery + match self.discovery.poll(params) { + Async::Ready(action) => { + match &action { + NetworkBehaviourAction::GenerateEvent(disc_output) => match disc_output { + KademliaOut::Discovered { + peer_id, addresses, .. + } => { + debug!(self.log, "Kademlia peer discovered"; "Peer"=> format!("{:?}", peer_id), "Addresses" => format!("{:?}", addresses)); + (*self + .known_peers + .entry(peer_id.clone()) + .or_insert_with(|| vec![])) + .extend(addresses.clone()); + } + KademliaOut::FindNodeResult { closer_peers, .. } => { + debug!( + self.log, + "Kademlia query found {} peers", + closer_peers.len() + ); + if closer_peers.is_empty() { + debug!(self.log, "Kademlia random query yielded empty results"); + } + return Async::Ready(action); + } + _ => {} + }, + _ => {} + }; + return Async::Ready(action); + } + Async::NotReady => (), + } + + Async::NotReady + } +} diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 5597f9107d..197c074df4 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -4,6 +4,7 @@ /// This crate builds and manages the libp2p services required by the beacon node. pub mod behaviour; mod config; +mod discovery; pub mod error; pub mod rpc; mod service;