Update libp2p (#1865)

Updates libp2p to the latest version. 

This adds tokio 0.3 support and brings back yamux support. 

This also updates some discv5 configuration parameters for leaner discovery queries
This commit is contained in:
Age Manning
2020-11-06 04:14:14 +00:00
parent 4c4dad9fb5
commit e2ae5010a6
7 changed files with 272 additions and 242 deletions

View File

@@ -42,9 +42,9 @@ regex = "1.3.9"
[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "8c6ce6eb1228de568568f6cd72fb134dea5f9669"
rev = "de104a80c48f6e61bd7bdac8e17f809477fb4c4a"
default-features = false
features = ["websocket", "identify", "mplex", "noise", "gossipsub", "dns", "tcp-tokio"]
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]
[dev-dependencies]
tokio = { version = "0.2.22", features = ["full"] }

View File

@@ -149,13 +149,13 @@ impl Default for Config {
let discv5_config = Discv5ConfigBuilder::new()
.enable_packet_filter()
.session_cache_capacity(1000)
.request_timeout(Duration::from_secs(4))
.request_timeout(Duration::from_secs(1))
.query_peer_timeout(Duration::from_secs(2))
.query_timeout(Duration::from_secs(30))
.request_retries(1)
.enr_peer_update_min(10)
.query_parallelism(5)
.disable_report_discovered_peers()
.query_timeout(Duration::from_secs(30))
.query_peer_timeout(Duration::from_secs(2))
.ip_limit() // limits /24 IP's in buckets.
.ping_interval(Duration::from_secs(300))
.build();

View File

@@ -176,31 +176,6 @@ enum InboundState<TSpec: EthSpec> {
Poisoned,
}
impl<TSpec: EthSpec> InboundState<TSpec> {
/// Sends the given items over the underlying substream, if the state allows it, and returns the
/// final state.
fn send_items(
self,
pending_items: &mut Vec<RPCCodedResponse<TSpec>>,
remaining_chunks: u64,
) -> Self {
if let InboundState::Idle(substream) = self {
// only send on Idle
if !pending_items.is_empty() {
// take the items that we need to send
let to_send = std::mem::replace(pending_items, vec![]);
let fut = process_inbound_substream(substream, remaining_chunks, to_send).boxed();
InboundState::Busy(Box::pin(fut))
} else {
// nothing to do, keep waiting for responses
InboundState::Idle(substream)
}
} else {
self
}
}
}
/// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum OutboundSubstreamState<TSpec: EthSpec> {
/// A request has been sent, and we are awaiting a response. This future is driven in the
@@ -626,69 +601,99 @@ where
// drive inbound streams that need to be processed
let mut substreams_to_remove = Vec::new(); // Closed substreams that need to be removed
for (id, info) in self.inbound_substreams.iter_mut() {
match std::mem::replace(&mut info.state, InboundState::Poisoned) {
state @ InboundState::Idle(..) if !deactivated => {
info.state = state.send_items(&mut info.pending_items, info.remaining_chunks);
}
InboundState::Idle(mut substream) => {
// handler is deactivated, close the stream and mark it for removal
match substream.close().poll_unpin(cx) {
// if we can't close right now, put the substream back and try again later
Poll::Pending => info.state = InboundState::Idle(substream),
Poll::Ready(res) => {
substreams_to_remove.push(*id);
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
if let Err(error) = res {
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
error,
proto: info.protocol,
});
}
if info.pending_items.last().map(|l| l.close_after()) == Some(false) {
// if the request was still active, report back to cancel it
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
proto: info.protocol,
error: RPCError::HandlerRejected,
});
}
loop {
match std::mem::replace(&mut info.state, InboundState::Poisoned) {
InboundState::Idle(substream) if !deactivated => {
if !info.pending_items.is_empty() {
let to_send = std::mem::replace(&mut info.pending_items, vec![]);
let fut = process_inbound_substream(
substream,
info.remaining_chunks,
to_send,
)
.boxed();
info.state = InboundState::Busy(Box::pin(fut));
} else {
info.state = InboundState::Idle(substream);
break;
}
}
}
InboundState::Busy(mut fut) => {
// first check if sending finished
let state = match fut.poll_unpin(cx) {
Poll::Ready((substream, errors, remove, new_remaining_chunks)) => {
info.remaining_chunks = new_remaining_chunks;
// report any error
for error in errors {
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
error,
proto: info.protocol,
})
}
if remove {
InboundState::Idle(mut substream) => {
// handler is deactivated, close the stream and mark it for removal
match substream.close().poll_unpin(cx) {
// if we can't close right now, put the substream back and try again later
Poll::Pending => info.state = InboundState::Idle(substream),
Poll::Ready(res) => {
substreams_to_remove.push(*id);
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
if let Err(error) = res {
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
error,
proto: info.protocol,
});
}
if info.pending_items.last().map(|l| l.close_after()) == Some(false)
{
// if the request was still active, report back to cancel it
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
proto: info.protocol,
error: RPCError::HandlerRejected,
});
}
}
InboundState::Idle(substream)
}
Poll::Pending => InboundState::Busy(fut),
};
info.state = if !deactivated {
// if the last batch finished, send more.
state.send_items(&mut info.pending_items, info.remaining_chunks)
} else {
state
};
break;
}
InboundState::Busy(mut fut) => {
// first check if sending finished
match fut.poll_unpin(cx) {
Poll::Ready((substream, errors, remove, new_remaining_chunks)) => {
info.remaining_chunks = new_remaining_chunks;
// report any error
for error in errors {
self.pending_errors.push(HandlerErr::Inbound {
id: *id,
error,
proto: info.protocol,
})
}
if remove {
substreams_to_remove.push(*id);
if let Some(ref delay_key) = info.delay_key {
self.inbound_substreams_delay.remove(delay_key);
}
}
// The stream may be currently idle. Attempt to process more
// elements
if !deactivated && !info.pending_items.is_empty() {
let to_send =
std::mem::replace(&mut info.pending_items, vec![]);
let fut = process_inbound_substream(
substream,
info.remaining_chunks,
to_send,
)
.boxed();
info.state = InboundState::Busy(Box::pin(fut));
} else {
info.state = InboundState::Idle(substream);
break;
}
}
Poll::Pending => {
info.state = InboundState::Busy(fut);
break;
}
};
}
InboundState::Poisoned => unreachable!("Poisoned inbound substream"),
}
InboundState::Poisoned => unreachable!("Poisoned inbound substream"),
}
}

View File

@@ -9,7 +9,7 @@ use crate::EnrExt;
use crate::{NetworkConfig, NetworkGlobals, PeerAction};
use futures::prelude::*;
use libp2p::core::{
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::boxed::Boxed,
identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, transport::Boxed,
};
use libp2p::{
core, noise,
@@ -20,7 +20,6 @@ use slog::{crit, debug, info, o, trace, warn};
use ssz::Decode;
use std::fs::File;
use std::io::prelude::*;
use std::io::{Error, ErrorKind};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -323,9 +322,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, and
/// mplex as the multiplexing layer.
fn build_transport(
local_private_key: Keypair,
) -> Result<Boxed<(PeerId, StreamMuxerBox), Error>, Error> {
fn build_transport(local_private_key: Keypair) -> std::io::Result<Boxed<(PeerId, StreamMuxerBox)>> {
let transport = libp2p::tcp::TokioTcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport)?;
#[cfg(feature = "libp2p-websocket")]
@@ -333,15 +330,21 @@ fn build_transport(
let trans_clone = transport.clone();
transport.or_transport(libp2p::websocket::WsConfig::new(trans_clone))
};
// mplex config
let mut mplex_config = libp2p::mplex::MplexConfig::new();
mplex_config.max_buffer_len(256);
mplex_config.max_buffer_len_behaviour(libp2p::mplex::MaxBufferBehaviour::Block);
// Authentication
Ok(transport
.upgrade(core::upgrade::Version::V1)
.authenticate(generate_noise_config(&local_private_key))
.multiplex(libp2p::mplex::MplexConfig::new())
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.multiplex(core::upgrade::SelectUpgrade::new(
libp2p::yamux::Config::default(),
mplex_config,
))
.timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.map_err(|err| Error::new(ErrorKind::Other, err))
.boxed())
}

View File

@@ -40,4 +40,4 @@ igd = "0.11.1"
itertools = "0.9.0"
num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }
get_if_addrs = "0.5.3"
if-addrs = "0.6.4"

View File

@@ -4,7 +4,7 @@
//! - UPnP
use crate::{NetworkConfig, NetworkMessage};
use get_if_addrs::get_if_addrs;
use if_addrs::get_if_addrs;
use slog::{debug, info, warn};
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use tokio::sync::mpsc;