From f3e707c3db3174d2fafc700d631c647c01756288 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 1 May 2020 22:53:33 +1000 Subject: [PATCH] Further progress towards porting eth2-libp2p adds caching to discovery --- Cargo.lock | 1 + beacon_node/eth2-libp2p/Cargo.toml | 1 + beacon_node/eth2-libp2p/src/discovery/enr.rs | 9 +- .../eth2-libp2p/src/discovery/enr_ext.rs | 163 ++++++++++++++++++ beacon_node/eth2-libp2p/src/discovery/mod.rs | 86 +++++---- beacon_node/eth2-libp2p/src/lib.rs | 1 + 6 files changed, 217 insertions(+), 44 deletions(-) create mode 100644 beacon_node/eth2-libp2p/src/discovery/enr_ext.rs diff --git a/Cargo.lock b/Cargo.lock index 05ad0662b1..d239912ca3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,6 +1315,7 @@ dependencies = [ "smallvec 1.4.0", "snap", "tempdir", + "tiny-keccak 2.0.2", "tokio 0.2.20", "tokio-io-timeout", "tokio-util", diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index f1e22a90e3..652ec4c475 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -34,6 +34,7 @@ tokio-io-timeout = "0.4.0" tokio-util = { version = "0.3.1", features = ["codec"] } libp2p = "0.18.1" discv5 = "0.1.0-alpha.1" +tiny-keccak = "2.0.2" [dev-dependencies] slog-stdlog = "4.0.0" diff --git a/beacon_node/eth2-libp2p/src/discovery/enr.rs b/beacon_node/eth2-libp2p/src/discovery/enr.rs index 53cdc4d441..3f3cbe12f7 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr.rs +++ b/beacon_node/eth2-libp2p/src/discovery/enr.rs @@ -1,15 +1,15 @@ //! Helper functions and an extension trait for Ethereum 2 ENRs. -pub use discv5::enr::{CombinedKey, EnrBuilder}; +pub use discv5::enr::{self, CombinedKey, EnrBuilder}; pub use libp2p::core::identity::Keypair; use super::ENR_FILENAME; use crate::types::{Enr, EnrBitfield}; +use crate::CombinedKeyExt; use crate::NetworkConfig; use slog::{debug, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; -use std::convert::TryInto; use std::fs::File; use std::io::prelude::*; use std::path::Path; @@ -62,10 +62,7 @@ pub fn build_or_load_enr( // Build the local ENR. // Note: Discovery should update the ENR record's IP to the external IP as seen by the // majority of our peers, if the CLI doesn't expressly forbid it. - let enr_key: CombinedKey = local_key - .try_into() - .map_err(|_| "Invalid key type for ENR records")?; - + let enr_key = CombinedKey::from_libp2p(&local_key)?; let mut local_enr = build_enr::(&enr_key, config, enr_fork_id)?; let enr_f = config.network_dir.join(ENR_FILENAME); diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs new file mode 100644 index 0000000000..c32008b624 --- /dev/null +++ b/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs @@ -0,0 +1,163 @@ +//! ENR extension trait to support libp2p integration. +use crate::{Enr, Multiaddr, PeerId}; +use discv5::enr::{CombinedKey, CombinedPublicKey}; +use libp2p::core::{identity::Keypair, multiaddr::Protocol}; +use tiny_keccak::{Hasher, Keccak}; + +/// Extend ENR for libp2p types. +pub trait ENRExt { + /// The libp2p `PeerId` for the record. + fn peer_id(&self) -> PeerId; + + /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. + /// The vector remains empty if these fields are not defined. + fn multiaddr(&self) -> Vec; +} + +/// Extend ENR CombinedPublicKey for libp2p types. +pub trait CombinedKeyPublicExt { + /// Converts the publickey into a peer id, without consuming the key. + fn into_peer_id(&self) -> PeerId; +} + +/// Extend ENR CombinedKey for conversion to libp2p keys. +pub trait CombinedKeyExt { + /// Converts a libp2p key into an ENR combined key. + fn from_libp2p(key: &libp2p::core::identity::Keypair) -> Result; +} + +impl ENRExt for Enr { + /// The libp2p `PeerId` for the record. + fn peer_id(&self) -> PeerId { + self.public_key().into_peer_id() + } + + /// Returns a list of multiaddrs if the ENR has an `ip` and either a `tcp` or `udp` key **or** an `ip6` and either a `tcp6` or `udp6`. + /// The vector remains empty if these fields are not defined. + /// + /// Note: Only available with the `libp2p` feature flag. + fn multiaddr(&self) -> Vec { + let mut multiaddrs: Vec = Vec::new(); + if let Some(ip) = self.ip() { + if let Some(udp) = self.udp() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Udp(udp)); + multiaddrs.push(multiaddr); + } + + if let Some(tcp) = self.tcp() { + let mut multiaddr: Multiaddr = ip.into(); + multiaddr.push(Protocol::Tcp(tcp)); + multiaddrs.push(multiaddr); + } + } + if let Some(ip6) = self.ip6() { + if let Some(udp6) = self.udp6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Udp(udp6)); + multiaddrs.push(multiaddr); + } + + if let Some(tcp6) = self.tcp6() { + let mut multiaddr: Multiaddr = ip6.into(); + multiaddr.push(Protocol::Tcp(tcp6)); + multiaddrs.push(multiaddr); + } + } + multiaddrs + } +} + +impl CombinedKeyPublicExt for CombinedPublicKey { + /// Converts the publickey into a peer id, without consuming the key. + /// + /// This is only available with the `libp2p` feature flag. + fn into_peer_id(&self) -> PeerId { + match self { + Self::Secp256k1(pk) => { + let pk_bytes = pk.serialize_compressed(); + let libp2p_pk = libp2p::core::PublicKey::Secp256k1( + libp2p::core::identity::secp256k1::PublicKey::decode(&pk_bytes) + .expect("valid public key"), + ); + PeerId::from_public_key(libp2p_pk) + } + Self::Ed25519(pk) => { + let pk_bytes = pk.to_bytes(); + let libp2p_pk = libp2p::core::PublicKey::Ed25519( + libp2p::core::identity::ed25519::PublicKey::decode(&pk_bytes) + .expect("valid public key"), + ); + PeerId::from_public_key(libp2p_pk) + } + } + } +} + +impl CombinedKeyExt for CombinedKey { + fn from_libp2p(key: &libp2p::core::identity::Keypair) -> Result { + match key { + Keypair::Secp256k1(key) => { + let secret = discv5::enr::secp256k1::SecretKey::parse(&key.secret().to_bytes()) + .expect("libp2p key must be valid"); + Ok(CombinedKey::Secp256k1(secret)) + } + Keypair::Ed25519(key) => { + let ed_keypair = + discv5::enr::ed25519_dalek::SecretKey::from_bytes(&key.encode()[..32]) + .expect("libp2p key must be valid"); + Ok(CombinedKey::from(ed_keypair)) + } + _ => Err("ENR: Unsupported libp2p key type"), + } + } +} + +// helper function to convert a peer_id to a node_id. This is only possible for secp256k1 libp2p +// peer_ids +fn peer_id_to_node_id(peer_id: &PeerId) -> Option { + let bytes = peer_id.as_bytes(); + // must be the identity hash + if bytes.len() == 34 && bytes[0] == 0x00 { + // left over is potentially secp256k1 key + + if let Ok(key) = discv5::enr::secp256k1::PublicKey::parse(&bytes[1..]) { + let uncompressed_key_bytes = key.serialize(); + let mut output = [0_u8; 32]; + let mut hasher = Keccak::v256(); + hasher.update(&uncompressed_key_bytes); + hasher.finalize(&mut output); + return Some(discv5::enr::NodeId::parse(&output).expect("Must be correct length")); + } + } + None +} + +mod tests { + use super::*; + use std::convert::TryInto; + + #[test] + fn test_peer_id_conversion() { + let key = discv5::enr::secp256k1::SecretKey::parse_slice( + &hex::decode("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + .unwrap(), + ) + .unwrap(); + + let peer_id: PeerId = + hex::decode("1220dd86cd1b9414f4b9b42a1b1258390ee9097298126df92a61789483ac90801ed6") + .unwrap() + .try_into() + .unwrap(); + + let node_id = peer_id_to_node_id(&peer_id).unwrap(); + + let enr = { + let mut builder = discv5::enr::EnrBuilder::new("v4"); + builder.build(&key).unwrap() + }; + + assert_eq!(enr.node_id(), node_id); + } +} diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index b0a06f62c0..b937537760 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -1,29 +1,34 @@ ///! This manages the discovery and management of peers. pub(crate) mod enr; +pub mod enr_ext; // Allow external use of the lighthouse ENR builder pub use enr::{build_enr, CombinedKey, Keypair}; +use enr_ext::{CombinedKeyExt, ENRExt}; use crate::metrics; use crate::{error, Enr, NetworkConfig, NetworkGlobals}; use discv5::{enr::NodeId, Discv5, Discv5Event}; use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use futures::prelude::*; -use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; +use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId}; use libp2p::multiaddr::Protocol; use libp2p::swarm::{ protocols_handler::DummyProtocolsHandler, DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, }; +use lru::LruCache; use slog::{crit, debug, info, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; -use std::collections::{HashSet, VecDeque}; -use std::net::SocketAddr; -use std::path::Path; -use std::sync::Arc; -use std::task::Poll; -use std::time::Duration; +use std::{ + collections::{HashSet, VecDeque}, + net::SocketAddr, + path::Path, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; use tokio::time::{delay_until, Delay, Instant}; use types::{EnrForkId, EthSpec, SubnetId}; @@ -42,6 +47,9 @@ pub struct Discovery { /// Events to be processed by the behaviour. events: VecDeque>, + /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. + cached_enrs: LruCache, + /// The currently banned peers. banned_peers: HashSet, @@ -94,9 +102,7 @@ impl Discovery { let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port); // convert the keypair into an ENR key - let enr_key: CombinedKey = local_key - .try_into() - .map_err(|_| "Invalid key type for ENR records")?; + let enr_key: CombinedKey = CombinedKey::from_libp2p(&local_key)?; let mut discovery = Discv5::new( local_enr, @@ -128,6 +134,7 @@ impl Discovery { Ok(Self { events: VecDeque::with_capacity(16), + cached_enrs: LruCache::new(50), banned_peers: HashSet::new(), max_peers: config.max_peers, peer_discovery_delay: delay_until(Instant::now()), @@ -154,6 +161,9 @@ impl Discovery { /// Add an ENR to the routing table of the discovery mechanism. pub fn add_enr(&mut self, enr: Enr) { + // add the enr to seen caches + self.cached_enrs.put(enr.peer_id(), enr.clone()); + let _ = self.discovery.add_enr(enr).map_err(|e| { warn!( self.log, @@ -181,7 +191,20 @@ impl Discovery { /// Returns the ENR of a known peer if it exists. pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option { - self.discovery.enr_of_peer(peer_id) + // first search the local cache + if let Some(enr) = self.cached_enrs.get(peer_id) { + return Some(enr.clone()); + } + // not in the local cache, look in the routing table + /* TODO: Correct this function + if let Some(node_id) = peer_id_to_node_id(peer_id) { + // TODO: Need to update discv5 + // self.discovery.find_enr(&node_id) + } else { + None + } + */ + None } /// Adds/Removes a subnet from the ENR Bitfield @@ -359,23 +382,11 @@ impl NetworkBehaviour for Discovery { } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - // TODO - // Addresses are ordered by decreasing likelyhood of connectivity, so start with - // the addresses of that peer in the k-buckets. - - /* - if let Some(node_id) = self.known_peer_ids.get(peer_id) { - let key = kbucket::Key::from(node_id.clone()); - let mut out_list = - if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) { - entry.value().multiaddr().to_vec() - } else { - Vec::new() - }; - + if let Some(enr) = self.enr_of_peer(peer_id) { // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP // port is removed, which is assumed to be associated with the discv5 protocol (and // therefore irrelevant for other libp2p components). + let out_list = enr.multiaddr(); out_list.retain(|addr| { addr.iter() .find(|v| match v { @@ -390,15 +401,13 @@ impl NetworkBehaviour for Discovery { // PeerId is not known Vec::new() } - */ - Vec::new() } // ignore libp2p connections/streams - fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + fn inject_connected(&mut self, _: &PeerId) {} // ignore libp2p connections/streams - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, _: &PeerId) {} // no libp2p discv5 events - event originate from the session_service. fn inject_event( @@ -412,7 +421,8 @@ impl NetworkBehaviour for Discovery { fn poll( &mut self, - params: &mut impl PollParameters, + cx: &mut Context, + _: &mut impl PollParameters, ) -> Poll< NetworkBehaviourAction< ::InEvent, @@ -421,7 +431,7 @@ impl NetworkBehaviour for Discovery { > { // search for peers if it is time loop { - match self.peer_discovery_delay.poll() { + match self.peer_discovery_delay.poll_unpin(cx) { Poll::Ready(_) => { if self.network_globals.connected_peers() < self.max_peers { self.find_peers(); @@ -432,16 +442,13 @@ impl NetworkBehaviour for Discovery { ); } Poll::Pending => break, - Err(e) => { - warn!(self.log, "Discovery peer search failed"; "error" => format!("{:?}", e)); - } } } // Poll discovery loop { - match self.discovery.poll(params) { - Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + match self.discovery.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => { match event { Discv5Event::Discovered(_enr) => { // peers that get discovered during a query but are not contactable or @@ -481,9 +488,12 @@ impl NetworkBehaviour for Discovery { self.peer_discovery_delay .reset(Instant::now() + Duration::from_secs(delay)); - for peer_id in closer_peers { - // if we need more peers, attempt a connection + for enr in closer_peers { + // cache known peers + let peer_id = enr.peer_id(); + self.cached_enrs.put(enr.peer_id(), enr); + // if we need more peers, attempt a connection if self.network_globals.connected_or_dialing_peers() < self.max_peers && !self diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 9230a4afb0..7346866b11 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -17,6 +17,7 @@ pub mod types; pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; pub use behaviour::BehaviourEvent; pub use config::Config as NetworkConfig; +pub use discovery::enr_ext::{CombinedKeyExt, ENRExt}; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{multiaddr, Multiaddr}; pub use libp2p::{PeerId, Swarm};