diff --git a/Cargo.lock b/Cargo.lock index 5fccb1f978..c5e1713363 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1341,7 +1341,7 @@ dependencies = [ "tokio-io-timeout", "tokio-util", "types", - "unsigned-varint", + "unsigned-varint 0.3.3 (git+https://github.com/sigp/unsigned-varint?branch=latest-codecs)", "version", "void", ] @@ -2379,7 +2379,7 @@ dependencies = [ "sha2", "smallvec 1.4.0", "thiserror", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "void", "zeroize", ] @@ -2454,7 +2454,7 @@ dependencies = [ "rand 0.7.3", "sha2", "smallvec 1.4.0", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "wasm-timer", ] @@ -2496,7 +2496,7 @@ dependencies = [ "sha2", "smallvec 1.4.0", "uint", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "void", "wasm-timer", ] @@ -2536,7 +2536,7 @@ dependencies = [ "libp2p-core", "log 0.4.8", "parking_lot 0.10.2", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2589,7 +2589,7 @@ dependencies = [ "prost", "prost-build", "rw-stream-sink", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "void", ] @@ -3019,7 +3019,7 @@ dependencies = [ "sha-1", "sha2", "sha3", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3039,7 +3039,7 @@ dependencies = [ "log 0.4.8", "pin-project", "smallvec 1.4.0", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3272,7 +3272,7 @@ dependencies = [ "percent-encoding 2.1.0", "serde", "static_assertions", - "unsigned-varint", + "unsigned-varint 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "url 2.1.1", ] @@ -5513,6 +5513,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" +[[package]] +name = "unsigned-varint" +version = "0.3.3" +source = "git+https://github.com/sigp/unsigned-varint?branch=latest-codecs#76fc423494e59f1ec4c8948bd0d3ae3c09851909" +dependencies = [ + "bytes 0.5.4", + "tokio-util", +] + [[package]] name = "unsigned-varint" version = "0.3.3" diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index b3dd49ebd7..fe8f5547db 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -20,7 +20,7 @@ futures = "0.3.4" error-chain = "0.12.2" dirs = "2.0.2" fnv = "1.0.6" -unsigned-varint = "0.3.3" +unsigned-varint = { git = "https://github.com/sigp/unsigned-varint", branch = "latest-codecs", features = ["codec"] } lazy_static = "1.4.0" lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } smallvec = "1.4.0" diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index e029ddc1ff..fecdd32ec2 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -4,6 +4,7 @@ use crate::rpc::*; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; use discv5::Discv5Event; +use futures::prelude::*; use libp2p::{ core::{identity::Keypair, ConnectedPoint}, gossipsub::{Gossipsub, GossipsubEvent, MessageId}, diff --git a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs b/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs index 514fb3a8e2..8de09196ff 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs +++ b/beacon_node/eth2-libp2p/src/discovery/enr_ext.rs @@ -118,6 +118,7 @@ impl CombinedKeyExt for CombinedKey { fn peer_id_to_node_id(peer_id: &PeerId) -> Option { let bytes = peer_id.as_bytes(); // must be the identity hash + /* To be updated if bytes.len() == 34 && bytes[0] == 0x00 { // left over is potentially secp256k1 key @@ -130,9 +131,11 @@ fn peer_id_to_node_id(peer_id: &PeerId) -> Option { return Some(discv5::enr::NodeId::parse(&output).expect("Must be correct length")); } } + */ None } +/* mod tests { use super::*; use std::convert::TryInto; @@ -161,3 +164,4 @@ mod tests { assert_eq!(enr.node_id(), node_id); } } +*/ diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index f46338600e..bc43f62597 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -386,7 +386,7 @@ impl NetworkBehaviour for Discovery { // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP // port is removed, which is assumed to be associated with the discv5 protocol (and // therefore irrelevant for other libp2p components). - let out_list = enr.multiaddr(); + let mut out_list = enr.multiaddr(); out_list.retain(|addr| { addr.iter() .find(|v| match v { diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index b6b400fa18..9bbe0ed5d0 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -453,7 +453,7 @@ impl PeerManager { impl Stream for PeerManager { type Item = PeerManagerEvent; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // poll the timeouts for pings and status' loop { match self.ping_peers.poll_next_unpin(cx) { diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index 94d8a7df8a..0f763e997d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -203,7 +203,7 @@ impl Decoder for SSZOutboundCodec { match self.inner.decode(src).map_err(RPCError::from) { Ok(Some(mut packet)) => { // take the bytes from the buffer - let raw_bytes = packet.take(); + let raw_bytes = packet.split(); match self.protocol.message_name { Protocol::Status => match self.protocol.version { diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs index fe658ebc2a..6c6e09f4db 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz_snappy.rs @@ -119,7 +119,7 @@ impl Decoder for SSZSnappyInboundCodec { // `n` is how many bytes the reader read in the compressed stream let n = reader.get_ref().position(); self.len = None; - src.split_to(n as usize); + let _read_bytes = src.split_to(n as usize); match self.protocol.message_name { Protocol::Status => match self.protocol.version { Version::V1 => Ok(Some(RPCRequest::Status(StatusMessage::from_ssz_bytes( @@ -264,7 +264,7 @@ impl Decoder for SSZSnappyOutboundCodec { // `n` is how many bytes the reader read in the compressed stream let n = reader.get_ref().position(); self.len = None; - src.split_to(n as usize); + let _read_byts = src.split_to(n as usize); match self.protocol.message_name { Protocol::Status => match self.protocol.version { Version::V1 => Ok(Some(RPCResponse::Status( @@ -336,7 +336,7 @@ impl OutboundCodec> for SSZSnappyOutboundCodec // `n` is how many bytes the reader read in the compressed stream let n = reader.get_ref().position(); self.len = None; - src.split_to(n as usize); + let _read_bytes = src.split_to(n as usize); Ok(Some(ErrorMessage::from_ssz_bytes(&decoded_buffer)?)) } Err(e) => match e.kind() { diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index e0830da547..5ff20dc296 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -466,6 +466,13 @@ where }; error!(self.log, "Attempted sending multiple responses to a single response request"); } + InboundSubstreamState::ResponsePendingFlush { substream, .. } => { + *substream_state = InboundSubstreamState::ResponsePendingFlush { + substream, + closing: true, + }; + error!(self.log, "Attempted sending multiple responses to a single response request"); + } InboundSubstreamState::Poisoned => { crit!(self.log, "Poisoned inbound substream"); unreachable!("Coding error: Poisoned substream"); @@ -510,7 +517,7 @@ where ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::ProtocolError(e), )) => match e { - ProtocolError::IoError(io_err) => RPCError::IoError(io_err), + ProtocolError::IoError(io_err) => RPCError::IoError(io_err.to_string()), ProtocolError::InvalidProtocol => { RPCError::InternalError("Protocol was deemed invalid") } @@ -578,7 +585,7 @@ where "Could not poll inbound stream timer", ))); } - Poll::Pending => break, + Poll::Pending | Poll::Ready(None) => break, } } @@ -605,7 +612,7 @@ where "Could not poll outbound stream timer", ))); } - Poll::Pending => break, + Poll::Pending | Poll::Ready(None) => break, } } @@ -641,7 +648,7 @@ where } Err(e) => { // error with sending in the codec - error!(self.log, "Error sending RPC message"; "message" => message.to_string()); + error!(self.log, "Error sending RPC message"; "error" => e.to_string()); // keep connection with the peer and return the // stream to awaiting response if this message // wasn't closing the stream @@ -701,7 +708,7 @@ where } Poll::Ready(Err(e)) => { // error during flush - error!(self.log, "Error sending flushing RPC message"); + error!(self.log, "Error sending flushing RPC message"; "error" => e.to_string()); // close the stream if required // TODO: Duplicate code if closing { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index a89c03c4e4..0b352c58e5 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -10,19 +10,18 @@ use crate::rpc::{ }, methods::ResponseTermination, }; -use futures::future::*; +use futures::future::Ready; use futures::prelude::*; use futures::prelude::{AsyncRead, AsyncWrite}; -use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; +use libp2p::core::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use std::io; use std::marker::PhantomData; use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::Duration; use tokio_io_timeout::TimeoutStream; use tokio_util::{ codec::Framed, - compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}, + compat::{Compat, FuturesAsyncReadCompatExt}, }; use types::EthSpec; @@ -375,8 +374,10 @@ where } }; - let socket = Framed::new(socket, codec); - Box::pin(future::join(socket.send(self), future::ok(socket)).map(|(_, socket)| socket)) + let mut socket = Framed::new(socket, codec); + + let future = async { socket.send(self).await.map(|_| socket) }; + Box::pin(future) } } @@ -411,7 +412,7 @@ impl From for RPCError { } } impl From for RPCError { - fn from(err: tokio::time::Elapsed) -> Self { + fn from(_: tokio::time::Elapsed) -> Self { RPCError::StreamTimeout } } @@ -444,7 +445,7 @@ impl std::error::Error for RPCError { match *self { // NOTE: this does have a source RPCError::SSZDecodeError(_) => None, - RPCError::IoError(ref err) => Some(err), + RPCError::IoError(_) => None, RPCError::StreamTimeout => None, RPCError::UnsupportedProtocol => None, RPCError::IncompleteStream => None, @@ -469,6 +470,7 @@ impl std::fmt::Display for RPCRequest { } } +/* /// Converts a futures AsyncRead + AsyncWrite object to a tokio::AsyncRead + tokio::AsyncWrite /// object. struct TokioNegotiatedStream(T); @@ -498,3 +500,4 @@ impl tokio::io::AsyncWrite for TokioNegotiate Pin::new(&mut self.0).poll_close(cx) } } +*/ diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index b5252d7a1c..9a14a0fd86 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -182,7 +182,7 @@ impl Service { impl Stream for Service { type Item = Result, error::Error>; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { match self.swarm.poll_next_unpin(cx) { Poll::Ready(Some(event)) => { @@ -190,7 +190,6 @@ impl Stream for Service { } Poll::Ready(None) => unreachable!("Swarm stream shouldn't end"), Poll::Pending => break, - _ => break, } }