Further progress towards porting eth2-libp2p adds caching to discovery

This commit is contained in:
Age Manning
2020-05-01 22:53:33 +10:00
parent 9e6ae448a6
commit f3e707c3db
6 changed files with 217 additions and 44 deletions

View File

@@ -1,29 +1,34 @@
///! This manages the discovery and management of peers.
pub(crate) mod enr;
pub mod enr_ext;
// Allow external use of the lighthouse ENR builder
pub use enr::{build_enr, CombinedKey, Keypair};
use enr_ext::{CombinedKeyExt, ENRExt};
use crate::metrics;
use crate::{error, Enr, NetworkConfig, NetworkGlobals};
use discv5::{enr::NodeId, Discv5, Discv5Event};
use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY};
use futures::prelude::*;
use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p::core::{connection::ConnectionId, Multiaddr, PeerId};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::{
protocols_handler::DummyProtocolsHandler, DialPeerCondition, NetworkBehaviour,
NetworkBehaviourAction, PollParameters, ProtocolsHandler,
};
use lru::LruCache;
use slog::{crit, debug, info, warn};
use ssz::{Decode, Encode};
use ssz_types::BitVector;
use std::collections::{HashSet, VecDeque};
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use std::{
collections::{HashSet, VecDeque},
net::SocketAddr,
path::Path,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::time::{delay_until, Delay, Instant};
use types::{EnrForkId, EthSpec, SubnetId};
@@ -42,6 +47,9 @@ pub struct Discovery<TSpec: EthSpec> {
/// Events to be processed by the behaviour.
events: VecDeque<NetworkBehaviourAction<void::Void, Discv5Event>>,
/// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs.
cached_enrs: LruCache<PeerId, Enr>,
/// The currently banned peers.
banned_peers: HashSet<PeerId>,
@@ -94,9 +102,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port);
// convert the keypair into an ENR key
let enr_key: CombinedKey = local_key
.try_into()
.map_err(|_| "Invalid key type for ENR records")?;
let enr_key: CombinedKey = CombinedKey::from_libp2p(&local_key)?;
let mut discovery = Discv5::new(
local_enr,
@@ -128,6 +134,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Ok(Self {
events: VecDeque::with_capacity(16),
cached_enrs: LruCache::new(50),
banned_peers: HashSet::new(),
max_peers: config.max_peers,
peer_discovery_delay: delay_until(Instant::now()),
@@ -154,6 +161,9 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// Add an ENR to the routing table of the discovery mechanism.
pub fn add_enr(&mut self, enr: Enr) {
// add the enr to seen caches
self.cached_enrs.put(enr.peer_id(), enr.clone());
let _ = self.discovery.add_enr(enr).map_err(|e| {
warn!(
self.log,
@@ -181,7 +191,20 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// Returns the ENR of a known peer if it exists.
pub fn enr_of_peer(&mut self, peer_id: &PeerId) -> Option<Enr> {
self.discovery.enr_of_peer(peer_id)
// first search the local cache
if let Some(enr) = self.cached_enrs.get(peer_id) {
return Some(enr.clone());
}
// not in the local cache, look in the routing table
/* TODO: Correct this function
if let Some(node_id) = peer_id_to_node_id(peer_id) {
// TODO: Need to update discv5
// self.discovery.find_enr(&node_id)
} else {
None
}
*/
None
}
/// Adds/Removes a subnet from the ENR Bitfield
@@ -359,23 +382,11 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
// TODO
// Addresses are ordered by decreasing likelyhood of connectivity, so start with
// the addresses of that peer in the k-buckets.
/*
if let Some(node_id) = self.known_peer_ids.get(peer_id) {
let key = kbucket::Key::from(node_id.clone());
let mut out_list =
if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
entry.value().multiaddr().to_vec()
} else {
Vec::new()
};
if let Some(enr) = self.enr_of_peer(peer_id) {
// ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP
// port is removed, which is assumed to be associated with the discv5 protocol (and
// therefore irrelevant for other libp2p components).
let out_list = enr.multiaddr();
out_list.retain(|addr| {
addr.iter()
.find(|v| match v {
@@ -390,15 +401,13 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// PeerId is not known
Vec::new()
}
*/
Vec::new()
}
// ignore libp2p connections/streams
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_connected(&mut self, _: &PeerId) {}
// ignore libp2p connections/streams
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId) {}
// no libp2p discv5 events - event originate from the session_service.
fn inject_event(
@@ -412,7 +421,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
fn poll(
&mut self,
params: &mut impl PollParameters,
cx: &mut Context,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
@@ -421,7 +431,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
> {
// search for peers if it is time
loop {
match self.peer_discovery_delay.poll() {
match self.peer_discovery_delay.poll_unpin(cx) {
Poll::Ready(_) => {
if self.network_globals.connected_peers() < self.max_peers {
self.find_peers();
@@ -432,16 +442,13 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
);
}
Poll::Pending => break,
Err(e) => {
warn!(self.log, "Discovery peer search failed"; "error" => format!("{:?}", e));
}
}
}
// Poll discovery
loop {
match self.discovery.poll(params) {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
match self.discovery.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
match event {
Discv5Event::Discovered(_enr) => {
// peers that get discovered during a query but are not contactable or
@@ -481,9 +488,12 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
self.peer_discovery_delay
.reset(Instant::now() + Duration::from_secs(delay));
for peer_id in closer_peers {
// if we need more peers, attempt a connection
for enr in closer_peers {
// cache known peers
let peer_id = enr.peer_id();
self.cached_enrs.put(enr.peer_id(), enr);
// if we need more peers, attempt a connection
if self.network_globals.connected_or_dialing_peers()
< self.max_peers
&& !self