Upgrade to tokio 0.3 (#1839)

## Description

This PR updates Lighthouse to tokio 0.3. It includes a number of dependency updates and some structural changes as to how we create and spawn tasks.

This also brings with it a number of various improvements:

- Discv5 update
- Libp2p update
- Fix for recompilation issues
- Improved UPnP port mapping handling
- Futures dependency update
- Log downgrade to traces for rejecting peers when we've reached our max



Co-authored-by: blacktemplar <blacktemplar@a1.net>
This commit is contained in:
Age Manning
2020-11-28 05:30:57 +00:00
parent 5a3b94cbb4
commit a567f788bd
81 changed files with 3666 additions and 2762 deletions

View File

@@ -5,7 +5,8 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"
[dependencies]
discv5 = { git = "https://github.com/sigp/discv5", rev = "fba7ceb5cfebd219ebbad6ffdb5d8c31dc8e4bc0", features = ["libp2p"] }
discv5 = { version = "0.1.0-beta.2", features = ["libp2p"] }
unsigned-varint = { git = "https://github.com/sigp/unsigned-varint", branch = "dep-update", features = ["codec"] }
types = { path = "../../consensus/types" }
hashset_delay = { path = "../../common/hashset_delay" }
eth2_ssz_types = { path = "../../consensus/ssz_types" }
@@ -15,15 +16,15 @@ eth2_ssz = "0.1.2"
eth2_ssz_derive = "0.1.0"
slog = { version = "2.5.2", features = ["max_level_trace"] }
lighthouse_version = { path = "../../common/lighthouse_version" }
tokio = { version = "0.2.22", features = ["time", "macros"] }
futures = "0.3.5"
tokio = { version = "0.3.2", features = ["time", "macros"] }
futures = "0.3.7"
error-chain = "0.12.4"
dirs = "3.0.1"
fnv = "1.0.7"
unsigned-varint = { git = "https://github.com/sigp/unsigned-varint", branch = "latest-codecs", features = ["codec"] }
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
smallvec = "1.4.2"
tokio-io-timeout = "0.5.0"
lru = "0.6.0"
parking_lot = "0.11.0"
sha2 = "0.9.1"
@@ -31,8 +32,7 @@ base64 = "0.13.0"
snap = "1.0.1"
void = "1.0.2"
hex = "0.4.2"
tokio-io-timeout = "0.4.0"
tokio-util = { version = "0.3.1", features = ["codec", "compat"] }
tokio-util = { version = "0.4.0", features = ["codec", "compat"] }
tiny-keccak = "2.0.2"
task_executor = { path = "../../common/task_executor" }
rand = "0.7.3"
@@ -42,12 +42,12 @@ regex = "1.3.9"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "f53d02bc873fef2bf52cd31e3d5ce366a41d8a8c"
rev = "e3caf9e0e5e78c9d51c6dccf0d6277cef553bb25"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]
[dev-dependencies]
tokio = { version = "0.2.22", features = ["full"] }
tokio = { version = "0.3.2", features = ["full"] }
slog-term = "2.6.0"
slog-async = "2.5.0"
tempdir = "0.3.7"

View File

@@ -983,7 +983,14 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
};
if let Some(goodbye_reason) = goodbye_reason {
debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.to_string());
match goodbye_reason {
GoodbyeReason::Banned => {
debug!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.to_string())
}
_ => {
trace!(self.log, "Disconnecting newly connected peer"; "peer_id" => peer_id.to_string(), "reason" => goodbye_reason.to_string())
}
}
self.peers_to_dc
.push_back((peer_id.clone(), Some(goodbye_reason)));
// NOTE: We don't inform the peer manager that this peer is disconnecting. It is simply
@@ -1079,6 +1086,8 @@ impl<TSpec: EthSpec> NetworkBehaviour for Behaviour<TSpec> {
// Inform the behaviour.
delegate_to_behaviours!(self, inject_disconnected, peer_id);
debug!(self.log, "Peer disconnected"; "peer_id" => %peer_id);
// Decrement the PEERS_PER_CLIENT metric
if let Some(kind) = self
.network_globals

View File

@@ -212,7 +212,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// Start the discv5 service and obtain an event stream
let event_stream = if !config.disable_discovery {
discv5.start(listen_socket).map_err(|e| e.to_string())?;
discv5
.start(listen_socket)
.map_err(|e| e.to_string())
.await?;
debug!(log, "Discovery service started");
EventStream::Awaiting(Box::pin(discv5.event_stream()))
} else {
@@ -712,8 +715,10 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
return;
}
};
// predicate for finding nodes with a matching fork
let eth2_fork_predicate = move |enr: &Enr| enr.eth2() == Ok(enr_fork_id.clone());
// predicate for finding nodes with a matching fork and valid tcp port
let eth2_fork_predicate = move |enr: &Enr| {
enr.eth2() == Ok(enr_fork_id.clone()) && (enr.tcp().is_some() || enr.tcp6().is_some())
};
// General predicate
let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
@@ -743,7 +748,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
Ok(r) => {
debug!(self.log, "Discovery query completed"; "peers_found" => r.len());
let mut results: HashMap<PeerId, Option<Instant>> = HashMap::new();
let mut results: HashMap<_, Option<Instant>> = HashMap::new();
r.iter().for_each(|enr| {
// cache the found ENR's
self.cached_enrs.put(enr.peer_id(), enr.clone());
@@ -766,7 +771,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Ok(r) => {
debug!(self.log, "Peer grouped subnet discovery request completed"; "peers_found" => r.len(), "subnets_searched_for" => format!("{:?}",subnets_searched_for));
let mut mapped_results: HashMap<PeerId, Option<Instant>> = HashMap::new();
let mut mapped_results = HashMap::new();
// cache the found ENR's
for enr in r.iter().cloned() {

View File

@@ -7,6 +7,8 @@ extern crate lazy_static;
pub mod behaviour;
mod config;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
pub mod discovery;
mod metrics;
mod peer_manager;
@@ -64,6 +66,7 @@ pub use config::Config as NetworkConfig;
pub use config::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage};
pub use discovery::{CombinedKeyExt, EnrExt, Eth2Enr};
pub use discv5;
pub use libp2p::bandwidth::BandwidthSinks;
pub use libp2p::gossipsub::{MessageAcceptance, MessageId, Topic, TopicHash};
pub use libp2p::{core::ConnectedPoint, PeerId, Swarm};
pub use libp2p::{multiaddr, Multiaddr};

View File

@@ -27,6 +27,7 @@ pub use libp2p::core::{identity::Keypair, Multiaddr};
pub mod client;
mod peer_info;
mod peer_sync_status;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
mod peerdb;
pub(crate) mod score;
@@ -639,6 +640,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
/// with a new `PeerId` which involves a discovery routing table lookup. We could dial the
/// multiaddr here, however this could relate to duplicate PeerId's etc. If the lookup
/// proves resource constraining, we should switch to multiaddr dialling here.
#[allow(clippy::mutable_key_type)]
fn peers_discovered(&mut self, results: HashMap<PeerId, Option<Instant>>) {
let mut to_dial_peers = Vec::new();

View File

@@ -22,7 +22,8 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use tokio::time::{delay_queue, delay_until, Delay, DelayQueue, Instant as TInstant};
use tokio::time::{sleep_until, Instant as TInstant, Sleep};
use tokio_util::time::{delay_queue, DelayQueue};
use types::EthSpec;
/// The time (in seconds) before a substream that is awaiting a response from the user times out.
@@ -132,7 +133,7 @@ enum HandlerState {
///
/// While in this state the handler rejects new requests but tries to finish existing ones.
/// Once the timer expires, all messages are killed.
ShuttingDown(Delay),
ShuttingDown(Sleep),
/// The handler is deactivated. A goodbye has been sent and no more messages are sent or
/// received.
Deactivated,
@@ -255,7 +256,7 @@ where
self.dial_queue.push((id, req));
}
self.state = HandlerState::ShuttingDown(delay_until(
self.state = HandlerState::ShuttingDown(sleep_until(
TInstant::now() + Duration::from_secs(SHUTDOWN_TIMEOUT_SECS as u64),
));
}
@@ -540,7 +541,7 @@ where
// purge expired inbound substreams and send an error
loop {
match self.inbound_substreams_delay.poll_next_unpin(cx) {
match self.inbound_substreams_delay.poll_expired(cx) {
Poll::Ready(Some(Ok(inbound_id))) => {
// handle a stream timeout for various states
if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) {
@@ -574,7 +575,7 @@ where
// purge expired outbound substreams
loop {
match self.outbound_substreams_delay.poll_next_unpin(cx) {
match self.outbound_substreams_delay.poll_expired(cx) {
Poll::Ready(Some(Ok(outbound_id))) => {
if let Some(OutboundInfo { proto, req_id, .. }) =
self.outbound_substreams.remove(outbound_id.get_ref())
@@ -672,6 +673,7 @@ where
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
break;
} else {
// If we are not removing this substream, we reset the timer.
// Each chunk is allowed RESPONSE_TIMEOUT to be sent.

View File

@@ -503,8 +503,8 @@ impl From<ssz::DecodeError> for RPCError {
RPCError::SSZDecodeError(err)
}
}
impl From<tokio::time::Elapsed> for RPCError {
fn from(_: tokio::time::Elapsed) -> Self {
impl From<tokio::time::error::Elapsed> for RPCError {
fn from(_: tokio::time::error::Elapsed) -> Self {
RPCError::StreamTimeout
}
}

View File

@@ -12,6 +12,7 @@ use libp2p::core::{
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed,
};
use libp2p::{
bandwidth::{BandwidthLogging, BandwidthSinks},
core, noise,
swarm::{SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
@@ -48,10 +49,10 @@ pub enum Libp2pEvent<TSpec: EthSpec> {
pub struct Service<TSpec: EthSpec> {
/// The libp2p Swarm handler.
pub swarm: Swarm<Behaviour<TSpec>>,
/// The bandwidth logger for the underlying libp2p transport.
pub bandwidth: Arc<BandwidthSinks>,
/// This node's PeerId.
pub local_peer_id: PeerId,
/// The libp2p logger handle.
pub log: Logger,
}
@@ -100,10 +101,11 @@ impl<TSpec: EthSpec> Service<TSpec> {
};
debug!(log, "Attempting to open listening ports"; "address" => format!("{}", config.listen_address), "tcp_port" => config.libp2p_port, "udp_port" => discovery_string);
let mut swarm = {
let (mut swarm, bandwidth) = {
// Set up the transport - tcp/ws with noise and mplex
let transport = build_transport(local_keypair.clone())
let (transport, bandwidth) = build_transport(local_keypair.clone())
.map_err(|e| format!("Failed to build transport: {:?}", e))?;
// Lighthouse network behaviour
let behaviour = Behaviour::new(
&local_keypair,
@@ -121,14 +123,17 @@ impl<TSpec: EthSpec> Service<TSpec> {
self.0.spawn(f, "libp2p");
}
}
SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.notify_handler_buffer_size(std::num::NonZeroUsize::new(32).expect("Not zero"))
.connection_event_buffer_size(64)
.incoming_connection_limit(10)
.outgoing_connection_limit(config.target_peers * 2)
.peer_connection_limit(MAX_CONNECTIONS_PER_PEER)
.executor(Box::new(Executor(executor)))
.build()
(
SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.notify_handler_buffer_size(std::num::NonZeroUsize::new(32).expect("Not zero"))
.connection_event_buffer_size(64)
.incoming_connection_limit(10)
.outgoing_connection_limit(config.target_peers * 2)
.peer_connection_limit(MAX_CONNECTIONS_PER_PEER)
.executor(Box::new(Executor(executor)))
.build(),
bandwidth,
)
};
// listen on the specified address
@@ -221,6 +226,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
let service = Service {
local_peer_id,
bandwidth,
swarm,
log,
};
@@ -273,7 +279,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
endpoint: _,
num_established,
} => {
debug!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => format!("{:?}", cause), "connections" => num_established);
trace!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => format!("{:?}", cause), "connections" => num_established);
}
SwarmEvent::NewListenAddr(multiaddr) => {
return Libp2pEvent::NewListenAddr(multiaddr)
@@ -282,7 +288,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
local_addr,
send_back_addr,
} => {
debug!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string())
trace!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string())
}
SwarmEvent::IncomingConnectionError {
local_addr,
@@ -329,9 +335,13 @@ impl<TSpec: EthSpec> Service<TSpec> {
}
}
type BoxedTransport = Boxed<(PeerId, StreamMuxerBox)>;
/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and
/// mplex as the multiplexing layer.
fn build_transport(local_private_key: Keypair) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
fn build_transport(
local_private_key: Keypair,
) -> std::io::Result<(BoxedTransport, Arc<BandwidthSinks>)> {
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport)?;
#[cfg(feature = "libp2p-websocket")]
@@ -340,21 +350,26 @@ fn build_transport(local_private_key: Keypair) -> std::io::Result<Boxed<(PeerId,
transport.or_transport(libp2p::websocket::WsConfig::new(trans_clone))
};
let (transport, bandwidth) = BandwidthLogging::new(transport);
// mplex config
let mut mplex_config = libp2p::mplex::MplexConfig::new();
mplex_config.max_buffer_len(256);
mplex_config.max_buffer_len_behaviour(libp2p::mplex::MaxBufferBehaviour::Block);
mplex_config.set_max_buffer_size(256);
mplex_config.set_max_buffer_behaviour(libp2p::mplex::MaxBufferBehaviour::Block);
// Authentication
Ok(transport
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
mplex_config,
))
.timeout(Duration::from_secs(10))
.boxed())
Ok((
transport
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(core::upgrade::SelectUpgrade::new(
libp2p::yamux::YamuxConfig::default(),
mplex_config,
))
.timeout(Duration::from_secs(10))
.boxed(),
bandwidth,
))
}
// Useful helper functions for debugging. Currently not used in the client.

View File

@@ -6,7 +6,9 @@ use eth2_libp2p::Service as LibP2PService;
use eth2_libp2p::{GossipsubConfigBuilder, Libp2pEvent, NetworkConfig};
use slog::{debug, error, o, Drain};
use std::net::{TcpListener, UdpSocket};
use std::sync::Weak;
use std::time::Duration;
use tokio::runtime::Runtime;
use types::{ChainSpec, EnrForkId, MinimalEthSpec};
type E = MinimalEthSpec;
@@ -91,19 +93,18 @@ pub fn build_config(port: u16, mut boot_nodes: Vec<Enr>) -> NetworkConfig {
config
}
pub async fn build_libp2p_instance(boot_nodes: Vec<Enr>, log: slog::Logger) -> Libp2pInstance {
pub async fn build_libp2p_instance(
rt: Weak<Runtime>,
boot_nodes: Vec<Enr>,
log: slog::Logger,
) -> Libp2pInstance {
let port = unused_port("tcp").unwrap();
let config = build_config(port, boot_nodes);
// launch libp2p service
let (signal, exit) = exit_future::signal();
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let executor = task_executor::TaskExecutor::new(
tokio::runtime::Handle::current(),
exit,
log.clone(),
shutdown_tx,
);
let executor = task_executor::TaskExecutor::new(rt, exit, log.clone(), shutdown_tx);
Libp2pInstance(
LibP2PService::new(
executor,
@@ -127,10 +128,14 @@ pub fn get_enr(node: &LibP2PService<E>) -> Enr {
// Returns `n` libp2p peers in fully connected topology.
#[allow(dead_code)]
pub async fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
pub async fn build_full_mesh(
rt: Weak<Runtime>,
log: slog::Logger,
n: usize,
) -> Vec<Libp2pInstance> {
let mut nodes = Vec::with_capacity(n);
for _ in 0..n {
nodes.push(build_libp2p_instance(vec![], log.clone()).await);
nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await);
}
let multiaddrs: Vec<Multiaddr> = nodes
.iter()
@@ -153,12 +158,15 @@ pub async fn build_full_mesh(log: slog::Logger, n: usize) -> Vec<Libp2pInstance>
// Constructs a pair of nodes with separate loggers. The sender dials the receiver.
// This returns a (sender, receiver) pair.
#[allow(dead_code)]
pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInstance) {
pub async fn build_node_pair(
rt: Weak<Runtime>,
log: &slog::Logger,
) -> (Libp2pInstance, Libp2pInstance) {
let sender_log = log.new(o!("who" => "sender"));
let receiver_log = log.new(o!("who" => "receiver"));
let mut sender = build_libp2p_instance(vec![], sender_log).await;
let mut receiver = build_libp2p_instance(vec![], receiver_log).await;
let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await;
let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await;
let receiver_multiaddr = receiver.swarm.local_enr().multiaddr()[1].clone();
@@ -182,7 +190,7 @@ pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInsta
// wait for either both nodes to listen or a timeout
tokio::select! {
_ = tokio::time::delay_for(Duration::from_millis(500)) => {}
_ = tokio::time::sleep(Duration::from_millis(500)) => {}
_ = joined => {}
}
@@ -197,10 +205,10 @@ pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInsta
// Returns `n` peers in a linear topology
#[allow(dead_code)]
pub async fn build_linear(log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
pub async fn build_linear(rt: Weak<Runtime>, log: slog::Logger, n: usize) -> Vec<Libp2pInstance> {
let mut nodes = Vec::with_capacity(n);
for _ in 0..n {
nodes.push(build_libp2p_instance(vec![], log.clone()).await);
nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await);
}
let multiaddrs: Vec<Multiaddr> = nodes

File diff suppressed because it is too large Load Diff