Compiling version of eth2-libp2p

This commit is contained in:
Age Manning
2020-05-06 16:40:39 +10:00
parent c363ffc236
commit c6dad814d4
11 changed files with 54 additions and 31 deletions

27
Cargo.lock generated
View File

@@ -1341,7 +1341,7 @@ dependencies = [
"tokio-io-timeout", "tokio-io-timeout",
"tokio-util", "tokio-util",
"types", "types",
"unsigned-varint", "unsigned-varint 0.3.3 (git+https://github.com/sigp/unsigned-varint?branch=latest-codecs)",
"version", "version",
"void", "void",
] ]
@@ -2379,7 +2379,7 @@ dependencies = [
"sha2", "sha2",
"smallvec 1.4.0", "smallvec 1.4.0",
"thiserror", "thiserror",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"void", "void",
"zeroize", "zeroize",
] ]
@@ -2454,7 +2454,7 @@ dependencies = [
"rand 0.7.3", "rand 0.7.3",
"sha2", "sha2",
"smallvec 1.4.0", "smallvec 1.4.0",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"wasm-timer", "wasm-timer",
] ]
@@ -2496,7 +2496,7 @@ dependencies = [
"sha2", "sha2",
"smallvec 1.4.0", "smallvec 1.4.0",
"uint", "uint",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"void", "void",
"wasm-timer", "wasm-timer",
] ]
@@ -2536,7 +2536,7 @@ dependencies = [
"libp2p-core", "libp2p-core",
"log 0.4.8", "log 0.4.8",
"parking_lot 0.10.2", "parking_lot 0.10.2",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@@ -2589,7 +2589,7 @@ dependencies = [
"prost", "prost",
"prost-build", "prost-build",
"rw-stream-sink", "rw-stream-sink",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"void", "void",
] ]
@@ -3019,7 +3019,7 @@ dependencies = [
"sha-1", "sha-1",
"sha2", "sha2",
"sha3", "sha3",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@@ -3039,7 +3039,7 @@ dependencies = [
"log 0.4.8", "log 0.4.8",
"pin-project", "pin-project",
"smallvec 1.4.0", "smallvec 1.4.0",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]] [[package]]
@@ -3272,7 +3272,7 @@ dependencies = [
"percent-encoding 2.1.0", "percent-encoding 2.1.0",
"serde", "serde",
"static_assertions", "static_assertions",
"unsigned-varint", "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"url 2.1.1", "url 2.1.1",
] ]
@@ -5513,6 +5513,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
[[package]]
name = "unsigned-varint"
version = "0.3.3"
source = "git+https://github.com/sigp/unsigned-varint?branch=latest-codecs#76fc423494e59f1ec4c8948bd0d3ae3c09851909"
dependencies = [
"bytes 0.5.4",
"tokio-util",
]
[[package]] [[package]]
name = "unsigned-varint" name = "unsigned-varint"
version = "0.3.3" version = "0.3.3"

View File

@@ -20,7 +20,7 @@ futures = "0.3.4"
error-chain = "0.12.2" error-chain = "0.12.2"
dirs = "2.0.2" dirs = "2.0.2"
fnv = "1.0.6" fnv = "1.0.6"
unsigned-varint = "0.3.3" unsigned-varint = { git = "https://github.com/sigp/unsigned-varint", branch = "latest-codecs", features = ["codec"] }
lazy_static = "1.4.0" lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" }
smallvec = "1.4.0" smallvec = "1.4.0"

View File

@@ -4,6 +4,7 @@ use crate::rpc::*;
use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::types::{GossipEncoding, GossipKind, GossipTopic};
use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash};
use discv5::Discv5Event; use discv5::Discv5Event;
use futures::prelude::*;
use libp2p::{ use libp2p::{
core::{identity::Keypair, ConnectedPoint}, core::{identity::Keypair, ConnectedPoint},
gossipsub::{Gossipsub, GossipsubEvent, MessageId}, gossipsub::{Gossipsub, GossipsubEvent, MessageId},

View File

@@ -118,6 +118,7 @@ impl CombinedKeyExt for CombinedKey {
fn peer_id_to_node_id(peer_id: &PeerId) -> Option<discv5::enr::NodeId> { fn peer_id_to_node_id(peer_id: &PeerId) -> Option<discv5::enr::NodeId> {
let bytes = peer_id.as_bytes(); let bytes = peer_id.as_bytes();
// must be the identity hash // must be the identity hash
/* To be updated
if bytes.len() == 34 && bytes[0] == 0x00 { if bytes.len() == 34 && bytes[0] == 0x00 {
// left over is potentially secp256k1 key // left over is potentially secp256k1 key
@@ -130,9 +131,11 @@ fn peer_id_to_node_id(peer_id: &PeerId) -> Option<discv5::enr::NodeId> {
return Some(discv5::enr::NodeId::parse(&output).expect("Must be correct length")); return Some(discv5::enr::NodeId::parse(&output).expect("Must be correct length"));
} }
} }
*/
None None
} }
/*
mod tests { mod tests {
use super::*; use super::*;
use std::convert::TryInto; use std::convert::TryInto;
@@ -161,3 +164,4 @@ mod tests {
assert_eq!(enr.node_id(), node_id); assert_eq!(enr.node_id(), node_id);
} }
} }
*/

View File

@@ -386,7 +386,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
// ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP
// port is removed, which is assumed to be associated with the discv5 protocol (and // port is removed, which is assumed to be associated with the discv5 protocol (and
// therefore irrelevant for other libp2p components). // therefore irrelevant for other libp2p components).
let out_list = enr.multiaddr(); let mut out_list = enr.multiaddr();
out_list.retain(|addr| { out_list.retain(|addr| {
addr.iter() addr.iter()
.find(|v| match v { .find(|v| match v {

View File

@@ -453,7 +453,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
impl<TSpec: EthSpec> Stream for PeerManager<TSpec> { impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
type Item = PeerManagerEvent; type Item = PeerManagerEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// poll the timeouts for pings and status' // poll the timeouts for pings and status'
loop { loop {
match self.ping_peers.poll_next_unpin(cx) { match self.ping_peers.poll_next_unpin(cx) {

View File

@@ -203,7 +203,7 @@ impl<TSpec: EthSpec> Decoder for SSZOutboundCodec<TSpec> {
match self.inner.decode(src).map_err(RPCError::from) { match self.inner.decode(src).map_err(RPCError::from) {
Ok(Some(mut packet)) => { Ok(Some(mut packet)) => {
// take the bytes from the buffer // take the bytes from the buffer
let raw_bytes = packet.take(); let raw_bytes = packet.split();
match self.protocol.message_name { match self.protocol.message_name {
Protocol::Status => match self.protocol.version { Protocol::Status => match self.protocol.version {

View File

@@ -119,7 +119,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyInboundCodec<TSpec> {
// `n` is how many bytes the reader read in the compressed stream // `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position(); let n = reader.get_ref().position();
self.len = None; self.len = None;
src.split_to(n as usize); let _read_bytes = src.split_to(n as usize);
match self.protocol.message_name { match self.protocol.message_name {
Protocol::Status => match self.protocol.version { Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes(
@@ -264,7 +264,7 @@ impl<TSpec: EthSpec> Decoder for SSZSnappyOutboundCodec<TSpec> {
// `n` is how many bytes the reader read in the compressed stream // `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position(); let n = reader.get_ref().position();
self.len = None; self.len = None;
src.split_to(n as usize); let _read_byts = src.split_to(n as usize);
match self.protocol.message_name { match self.protocol.message_name {
Protocol::Status => match self.protocol.version { Protocol::Status => match self.protocol.version {
Version::V1 => Ok(Some(RPCResponse::Status( Version::V1 => Ok(Some(RPCResponse::Status(
@@ -336,7 +336,7 @@ impl<TSpec: EthSpec> OutboundCodec<RPCRequest<TSpec>> for SSZSnappyOutboundCodec
// `n` is how many bytes the reader read in the compressed stream // `n` is how many bytes the reader read in the compressed stream
let n = reader.get_ref().position(); let n = reader.get_ref().position();
self.len = None; self.len = None;
src.split_to(n as usize); let _read_bytes = src.split_to(n as usize);
Ok(Some(ErrorMessage::from_ssz_bytes(&decoded_buffer)?)) Ok(Some(ErrorMessage::from_ssz_bytes(&decoded_buffer)?))
} }
Err(e) => match e.kind() { Err(e) => match e.kind() {

View File

@@ -466,6 +466,13 @@ where
}; };
error!(self.log, "Attempted sending multiple responses to a single response request"); error!(self.log, "Attempted sending multiple responses to a single response request");
} }
InboundSubstreamState::ResponsePendingFlush { substream, .. } => {
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing: true,
};
error!(self.log, "Attempted sending multiple responses to a single response request");
}
InboundSubstreamState::Poisoned => { InboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned inbound substream"); crit!(self.log, "Poisoned inbound substream");
unreachable!("Coding error: Poisoned substream"); unreachable!("Coding error: Poisoned substream");
@@ -510,7 +517,7 @@ where
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e), NegotiationError::ProtocolError(e),
)) => match e { )) => match e {
ProtocolError::IoError(io_err) => RPCError::IoError(io_err), ProtocolError::IoError(io_err) => RPCError::IoError(io_err.to_string()),
ProtocolError::InvalidProtocol => { ProtocolError::InvalidProtocol => {
RPCError::InternalError("Protocol was deemed invalid") RPCError::InternalError("Protocol was deemed invalid")
} }
@@ -578,7 +585,7 @@ where
"Could not poll inbound stream timer", "Could not poll inbound stream timer",
))); )));
} }
Poll::Pending => break, Poll::Pending | Poll::Ready(None) => break,
} }
} }
@@ -605,7 +612,7 @@ where
"Could not poll outbound stream timer", "Could not poll outbound stream timer",
))); )));
} }
Poll::Pending => break, Poll::Pending | Poll::Ready(None) => break,
} }
} }
@@ -641,7 +648,7 @@ where
} }
Err(e) => { Err(e) => {
// error with sending in the codec // error with sending in the codec
error!(self.log, "Error sending RPC message"; "message" => message.to_string()); error!(self.log, "Error sending RPC message"; "error" => e.to_string());
// keep connection with the peer and return the // keep connection with the peer and return the
// stream to awaiting response if this message // stream to awaiting response if this message
// wasn't closing the stream // wasn't closing the stream
@@ -701,7 +708,7 @@ where
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
// error during flush // error during flush
error!(self.log, "Error sending flushing RPC message"); error!(self.log, "Error sending flushing RPC message"; "error" => e.to_string());
// close the stream if required // close the stream if required
// TODO: Duplicate code // TODO: Duplicate code
if closing { if closing {

View File

@@ -10,19 +10,18 @@ use crate::rpc::{
}, },
methods::ResponseTermination, methods::ResponseTermination,
}; };
use futures::future::*; use futures::future::Ready;
use futures::prelude::*; use futures::prelude::*;
use futures::prelude::{AsyncRead, AsyncWrite}; use futures::prelude::{AsyncRead, AsyncWrite};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use std::io; use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use tokio_io_timeout::TimeoutStream; use tokio_io_timeout::TimeoutStream;
use tokio_util::{ use tokio_util::{
codec::Framed, codec::Framed,
compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}, compat::{Compat, FuturesAsyncReadCompatExt},
}; };
use types::EthSpec; use types::EthSpec;
@@ -375,8 +374,10 @@ where
} }
}; };
let socket = Framed::new(socket, codec); let mut socket = Framed::new(socket, codec);
Box::pin(future::join(socket.send(self), future::ok(socket)).map(|(_, socket)| socket))
let future = async { socket.send(self).await.map(|_| socket) };
Box::pin(future)
} }
} }
@@ -411,7 +412,7 @@ impl From<ssz::DecodeError> for RPCError {
} }
} }
impl From<tokio::time::Elapsed> for RPCError { impl From<tokio::time::Elapsed> for RPCError {
fn from(err: tokio::time::Elapsed) -> Self { fn from(_: tokio::time::Elapsed) -> Self {
RPCError::StreamTimeout RPCError::StreamTimeout
} }
} }
@@ -444,7 +445,7 @@ impl std::error::Error for RPCError {
match *self { match *self {
// NOTE: this does have a source // NOTE: this does have a source
RPCError::SSZDecodeError(_) => None, RPCError::SSZDecodeError(_) => None,
RPCError::IoError(ref err) => Some(err), RPCError::IoError(_) => None,
RPCError::StreamTimeout => None, RPCError::StreamTimeout => None,
RPCError::UnsupportedProtocol => None, RPCError::UnsupportedProtocol => None,
RPCError::IncompleteStream => None, RPCError::IncompleteStream => None,
@@ -469,6 +470,7 @@ impl<TSpec: EthSpec> std::fmt::Display for RPCRequest<TSpec> {
} }
} }
/*
/// Converts a futures AsyncRead + AsyncWrite object to a tokio::AsyncRead + tokio::AsyncWrite /// Converts a futures AsyncRead + AsyncWrite object to a tokio::AsyncRead + tokio::AsyncWrite
/// object. /// object.
struct TokioNegotiatedStream<T: AsyncRead + AsyncWrite + Unpin>(T); struct TokioNegotiatedStream<T: AsyncRead + AsyncWrite + Unpin>(T);
@@ -498,3 +500,4 @@ impl<T: AsyncRead + AsyncWrite + Unpin> tokio::io::AsyncWrite for TokioNegotiate
Pin::new(&mut self.0).poll_close(cx) Pin::new(&mut self.0).poll_close(cx)
} }
} }
*/

View File

@@ -182,7 +182,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
impl<TSpec: EthSpec> Stream for Service<TSpec> { impl<TSpec: EthSpec> Stream for Service<TSpec> {
type Item = Result<BehaviourEvent<TSpec>, error::Error>; type Item = Result<BehaviourEvent<TSpec>, error::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { loop {
match self.swarm.poll_next_unpin(cx) { match self.swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => { Poll::Ready(Some(event)) => {
@@ -190,7 +190,6 @@ impl<TSpec: EthSpec> Stream for Service<TSpec> {
} }
Poll::Ready(None) => unreachable!("Swarm stream shouldn't end"), Poll::Ready(None) => unreachable!("Swarm stream shouldn't end"),
Poll::Pending => break, Poll::Pending => break,
_ => break,
} }
} }