From 08838fca2387c745852dc926cfff0eeebc5db069 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 30 Apr 2020 23:09:48 +1000 Subject: [PATCH] Partial eth2-libp2p stable future upgrade --- Cargo.lock | 107 ++++++++++++++++-- beacon_node/eth2-libp2p/src/behaviour.rs | 29 ++--- beacon_node/eth2-libp2p/src/config.rs | 2 +- beacon_node/eth2-libp2p/src/discovery/enr.rs | 4 +- beacon_node/eth2-libp2p/src/discovery/mod.rs | 77 ++++++++++--- beacon_node/eth2-libp2p/src/rpc/codec/base.rs | 15 ++- beacon_node/eth2-libp2p/src/rpc/handler.rs | 99 +++++++--------- beacon_node/eth2-libp2p/src/rpc/mod.rs | 13 +-- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 37 ++---- beacon_node/eth2-libp2p/src/service.rs | 11 +- beacon_node/eth2-libp2p/src/types/mod.rs | 2 +- 11 files changed, 245 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c2f6b4bf75..05ad0662b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -272,6 +272,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" +[[package]] +name = "base64" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d5ca2cd0adc3f48f9e9ea5a6bbdf9ccc0bfade884847e484d452414c7ccffb3" + [[package]] name = "beacon_chain" version = "0.2.0" @@ -347,6 +353,16 @@ dependencies = [ "version", ] +[[package]] +name = "bigint" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebecac13b3c745150d7b6c3ea7572d372f09d627c2077e893bf26c5c7f70d282" +dependencies = [ + "byteorder 1.3.4", + "crunchy 0.1.6", +] + [[package]] name = "bincode" version = "1.2.1" @@ -894,6 +910,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crunchy" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2f4a431c5c9f662e1200b7c7f02c34e91361150e382089a8f2dec3ba680cbda" + [[package]] name = "crunchy" version = "0.2.2" @@ -1052,6 +1074,32 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "discv5" +version = "0.1.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58f722f5ee08f95ee8e223e8766ce55fa6ea9782e8588a68170b1b969fb5d88d" +dependencies = [ + "arrayvec 0.5.1", + "bigint", + "digest", + "enr", + "fnv", + "futures 0.3.4", + "hex 0.4.2", + "hkdf", + "libsecp256k1", + "log 0.4.8", + "net2", + "openssl", + "rand 0.7.3", + "rlp", + "sha2", + "smallvec 1.4.0", + "tokio 0.2.20", + "zeroize", +] + [[package]] name = "dns-parser" version = "0.8.0" @@ -1119,6 +1167,25 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enr" +version = "0.1.0-alpha.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d4c8e39a1c41e3ffd5048e0617888cdc1316c87720ede64c83f3914056a58e" +dependencies = [ + "base64 0.12.0", + "bs58", + "ed25519-dalek", + "hex 0.4.2", + "libsecp256k1", + "log 0.4.8", + "rand 0.7.3", + "rlp", + "serde", + "tiny-keccak 2.0.2", + "zeroize", +] + [[package]] name = "env_logger" version = "0.6.2" @@ -1222,8 +1289,9 @@ dependencies = [ name = "eth2-libp2p" version = "0.2.0" dependencies = [ - "base64 0.11.0", + "base64 0.12.0", "dirs", + "discv5", "error-chain", "eth2_ssz", "eth2_ssz_derive", @@ -1231,12 +1299,12 @@ dependencies = [ "fnv", "futures 0.3.4", "hashmap_delay", - "hex 0.3.2", + "hex 0.4.2", "lazy_static", "libp2p", "lighthouse_metrics", "lru", - "parking_lot 0.9.0", + "parking_lot 0.10.2", "serde", "serde_derive", "sha2", @@ -1348,7 +1416,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "tiny-keccak", + "tiny-keccak 1.5.0", ] [[package]] @@ -1361,7 +1429,7 @@ dependencies = [ "rustc-hex", "serde", "serde_json", - "tiny-keccak", + "tiny-keccak 1.5.0", "uint", ] @@ -1371,11 +1439,11 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32cfe1c169414b709cf28aa30c74060bdb830a03a8ba473314d079ac79d80a5f" dependencies = [ - "crunchy", + "crunchy 0.2.2", "fixed-hash", "impl-rlp", "impl-serde 0.2.3", - "tiny-keccak", + "tiny-keccak 1.5.0", ] [[package]] @@ -1803,6 +1871,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" +[[package]] +name = "hkdf" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fa08a006102488bd9cd5b8013aabe84955cf5ae22e304c2caf655b633aefae3" +dependencies = [ + "digest", + "hmac", +] + [[package]] name = "hmac" version = "0.7.1" @@ -2656,7 +2734,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fc1e2c808481a63dc6da2074752fdd4336a3c8fcc68b83db6f1fd5224ae7962" dependencies = [ "arrayref", - "crunchy", + "crunchy 0.2.2", "digest", "hmac-drbg", "rand 0.7.3", @@ -4862,7 +4940,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d8a021c69bb74a44ccedb824a046447e2c84a01df9e5c20779750acb38e11b2" dependencies = [ - "crunchy", + "crunchy 0.2.2", +] + +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy 0.2.2", ] [[package]] @@ -5339,7 +5426,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "173cd16430c206dc1a430af8a89a0e9c076cf15cb42b4aedb10e8cc8fee73681" dependencies = [ "byteorder 1.3.4", - "crunchy", + "crunchy 0.2.2", "rustc-hex", "static_assertions", ] diff --git a/beacon_node/eth2-libp2p/src/behaviour.rs b/beacon_node/eth2-libp2p/src/behaviour.rs index 1da11910cc..0d704c6f14 100644 --- a/beacon_node/eth2-libp2p/src/behaviour.rs +++ b/beacon_node/eth2-libp2p/src/behaviour.rs @@ -3,22 +3,21 @@ use crate::peer_manager::{PeerManager, PeerManagerEvent}; use crate::rpc::*; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use crate::{error, Enr, NetworkConfig, NetworkGlobals, PubsubMessage, TopicHash}; +use discv5::Discv5Event; use futures::prelude::*; use libp2p::{ core::{identity::Keypair, ConnectedPoint}, - discv5::Discv5Event, gossipsub::{Gossipsub, GossipsubEvent, MessageId}, identify::{Identify, IdentifyEvent}, swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}, NetworkBehaviour, PeerId, }; -use tokio::io::{AsyncRead, AsyncWrite}; use lru::LruCache; use slog::{crit, debug, o, warn}; use std::marker::PhantomData; use std::sync::Arc; -use types::{EnrForkId, EthSpec, SubnetId}; use std::task::Poll; +use types::{EnrForkId, EthSpec, SubnetId}; const MAX_IDENTIFY_ADDRESSES: usize = 10; @@ -27,11 +26,11 @@ const MAX_IDENTIFY_ADDRESSES: usize = 10; /// behaviours. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourEvent", poll_method = "poll")] -pub struct Behaviour { +pub struct Behaviour { /// The routing pub-sub mechanism for eth2. gossipsub: Gossipsub, /// The Eth2 RPC specified in the wire-0 protocol. - eth2_rpc: RPC, + eth2_rpc: RPC, /// Keep regular connection to peers and disconnect if absent. // TODO: Using id for initial interop. This will be removed by mainnet. /// Provides IP addresses and peer information. @@ -66,7 +65,7 @@ pub struct Behaviour { } /// Implements the combined behaviour for the libp2p service. -impl Behaviour { +impl Behaviour { pub fn new( local_key: &Keypair, net_conf: &NetworkConfig, @@ -330,9 +329,7 @@ impl Behaviour - NetworkBehaviourEventProcess for Behaviour -{ +impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: GossipsubEvent) { match event { GossipsubEvent::Message(propagation_source, id, gs_msg) => { @@ -373,9 +370,7 @@ impl } } -impl - NetworkBehaviourEventProcess> for Behaviour -{ +impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: RPCMessage) { match event { // TODO: These are temporary methods to give access to injected behaviour @@ -462,7 +457,7 @@ impl } } -impl Behaviour { +impl Behaviour { /// Consumes the events list when polled. fn poll( &mut self, @@ -508,9 +503,7 @@ impl Behaviour NetworkBehaviourEventProcess - for Behaviour -{ +impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: IdentifyEvent) { match event { IdentifyEvent::Received { @@ -542,9 +535,7 @@ impl NetworkBehaviourEventPr } } -impl NetworkBehaviourEventProcess - for Behaviour -{ +impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, _event: Discv5Event) { // discv5 has no events to inject } diff --git a/beacon_node/eth2-libp2p/src/config.rs b/beacon_node/eth2-libp2p/src/config.rs index 7a5182baf1..369d0477e9 100644 --- a/beacon_node/eth2-libp2p/src/config.rs +++ b/beacon_node/eth2-libp2p/src/config.rs @@ -1,6 +1,6 @@ use crate::types::GossipKind; use crate::Enr; -use libp2p::discv5::{Discv5Config, Discv5ConfigBuilder}; +use discv5::{Discv5Config, Discv5ConfigBuilder}; use libp2p::gossipsub::{GossipsubConfig, GossipsubConfigBuilder, GossipsubMessage, MessageId}; use libp2p::Multiaddr; use serde_derive::{Deserialize, Serialize}; diff --git a/beacon_node/eth2-libp2p/src/discovery/enr.rs b/beacon_node/eth2-libp2p/src/discovery/enr.rs index edd08bc9ac..53cdc4d441 100644 --- a/beacon_node/eth2-libp2p/src/discovery/enr.rs +++ b/beacon_node/eth2-libp2p/src/discovery/enr.rs @@ -1,11 +1,11 @@ //! Helper functions and an extension trait for Ethereum 2 ENRs. -pub use libp2p::{core::identity::Keypair, discv5::enr::CombinedKey}; +pub use discv5::enr::{CombinedKey, EnrBuilder}; +pub use libp2p::core::identity::Keypair; use super::ENR_FILENAME; use crate::types::{Enr, EnrBitfield}; use crate::NetworkConfig; -use libp2p::discv5::enr::EnrBuilder; use slog::{debug, warn}; use ssz::{Decode, Encode}; use ssz_types::BitVector; diff --git a/beacon_node/eth2-libp2p/src/discovery/mod.rs b/beacon_node/eth2-libp2p/src/discovery/mod.rs index e4a9f1c055..b0a06f62c0 100644 --- a/beacon_node/eth2-libp2p/src/discovery/mod.rs +++ b/beacon_node/eth2-libp2p/src/discovery/mod.rs @@ -6,14 +6,14 @@ pub use enr::{build_enr, CombinedKey, Keypair}; use crate::metrics; use crate::{error, Enr, NetworkConfig, NetworkGlobals}; +use discv5::{enr::NodeId, Discv5, Discv5Event}; use enr::{Eth2Enr, BITFIELD_ENR_KEY, ETH2_ENR_KEY}; use futures::prelude::*; -use libp2p::core::{Multiaddr, PeerId}; -use libp2p::discv5::enr::NodeId; -use libp2p::discv5::{Discv5, Discv5Event}; +use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p::multiaddr::Protocol; use libp2p::swarm::{ - DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, + protocols_handler::DummyProtocolsHandler, DialPeerCondition, NetworkBehaviour, + NetworkBehaviourAction, PollParameters, ProtocolsHandler, }; use slog::{crit, debug, info, warn}; use ssz::{Decode, Encode}; @@ -74,7 +74,7 @@ pub struct Discovery { } impl Discovery { - pub async fn new( + pub fn new( local_key: &Keypair, config: &NetworkConfig, network_globals: Arc>, @@ -93,13 +93,17 @@ impl Discovery { let listen_socket = SocketAddr::new(config.listen_address, config.discovery_port); + // convert the keypair into an ENR key + let enr_key: CombinedKey = local_key + .try_into() + .map_err(|_| "Invalid key type for ENR records")?; + let mut discovery = Discv5::new( local_enr, - local_key.clone(), + enr_key, config.discv5_config.clone(), listen_socket, ) - .await .map_err(|e| format!("Discv5 service failed. Error: {:?}", e))?; // Add bootnodes to routing table @@ -345,23 +349,66 @@ impl Discovery { } } -// Redirect all behaviour events to underlying discovery behaviour. +// Build a dummy Network behaviour around the discv5 server impl NetworkBehaviour for Discovery { - type ProtocolsHandler = ::ProtocolsHandler; - type OutEvent = ::OutEvent; + type ProtocolsHandler = DummyProtocolsHandler; + type OutEvent = Discv5Event; fn new_handler(&mut self) -> Self::ProtocolsHandler { - NetworkBehaviour::new_handler(&mut self.discovery) + DummyProtocolsHandler::default() } fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - // Let discovery track possible known peers. - self.discovery.addresses_of_peer(peer_id) + // TODO + // Addresses are ordered by decreasing likelyhood of connectivity, so start with + // the addresses of that peer in the k-buckets. + + /* + if let Some(node_id) = self.known_peer_ids.get(peer_id) { + let key = kbucket::Key::from(node_id.clone()); + let mut out_list = + if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) { + entry.value().multiaddr().to_vec() + } else { + Vec::new() + }; + + // ENR's may have multiple Multiaddrs. The multi-addr associated with the UDP + // port is removed, which is assumed to be associated with the discv5 protocol (and + // therefore irrelevant for other libp2p components). + out_list.retain(|addr| { + addr.iter() + .find(|v| match v { + Protocol::Udp(_) => true, + _ => false, + }) + .is_none() + }); + + out_list + } else { + // PeerId is not known + Vec::new() + } + */ + Vec::new() } - fn inject_connected(&mut self, _peer_id: &PeerId) {} + // ignore libp2p connections/streams + fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} - fn inject_disconnected(&mut self, _peer_id: &PeerId) {} + // ignore libp2p connections/streams + fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + + // no libp2p discv5 events - event originate from the session_service. + fn inject_event( + &mut self, + _: PeerId, + _: ConnectionId, + _event: ::OutEvent, + ) { + void::unreachable(_event) + } fn poll( &mut self, diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index f9feb6196f..4aa31ca366 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -75,14 +75,19 @@ where /* Base Inbound Codec */ // This Encodes RPC Responses sent to external peers -impl Encoder> for BaseInboundCodec> +impl Encoder> + for BaseInboundCodec> where TSpec: EthSpec, TCodec: Decoder + Encoder>, { type Error = >>::Error; - fn encode(&mut self, item: RPCErrorResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode( + &mut self, + item: RPCErrorResponse, + dst: &mut BytesMut, + ) -> Result<(), Self::Error> { dst.clear(); dst.reserve(1); dst.put_u8( @@ -112,7 +117,8 @@ where /* Base Outbound Codec */ // This Encodes RPC Requests sent to external peers -impl Encoder> for BaseOutboundCodec> +impl Encoder> + for BaseOutboundCodec> where TSpec: EthSpec, TCodec: OutboundCodec> + Encoder>, @@ -128,7 +134,8 @@ where impl Decoder for BaseOutboundCodec> where TSpec: EthSpec, - TCodec: OutboundCodec, ErrorType = ErrorMessage> + Decoder>, + TCodec: OutboundCodec, ErrorType = ErrorMessage> + + Decoder>, { type Item = RPCErrorResponse; type Error = ::Error; diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 0d224a400e..e461363471 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -5,19 +5,21 @@ use super::methods::{ErrorMessage, RPCErrorResponse, RequestId, ResponseTerminat use super::protocol::{RPCError, RPCProtocol, RPCRequest}; use super::RPCEvent; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; -use core::marker::PhantomData; use fnv::FnvHashMap; use futures::prelude::*; use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError}; use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; +use libp2p::swarm::NegotiatedSubstream; use slog::{crit, debug, error, trace, warn}; use smallvec::SmallVec; -use std::collections::hash_map::Entry; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use tokio::io::{AsyncRead, AsyncWrite}; +use std::{ + collections::hash_map::Entry, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; use tokio::time::{delay_queue, DelayQueue}; use types::EthSpec; @@ -38,9 +40,8 @@ type InboundRequestId = RequestId; type OutboundRequestId = RequestId; /// Implementation of `ProtocolsHandler` for the RPC protocol. -pub struct RPCHandler<'a, TSubstream, TSpec> +pub struct RPCHandler where - TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec, { /// The upgrade for inbound substreams. @@ -59,23 +60,16 @@ where dial_negotiated: u32, /// Current inbound substreams awaiting processing. - inbound_substreams: FnvHashMap< - InboundRequestId, - ( - InboundSubstreamState<'a, TSubstream, TSpec>, - Option, - ), - >, + inbound_substreams: + FnvHashMap, Option)>, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. inbound_substreams_delay: DelayQueue, /// Map of outbound substreams that need to be driven to completion. The `RequestId` is /// maintained by the application sending the request. - outbound_substreams: FnvHashMap< - OutboundRequestId, - (OutboundSubstreamState, delay_queue::Key), - >, + outbound_substreams: + FnvHashMap, delay_queue::Key)>, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. outbound_substreams_delay: DelayQueue, @@ -103,16 +97,14 @@ where log: slog::Logger, } -/// State of an outbound substream. Either waiting for a response, or in the process of sending. -pub enum InboundSubstreamState<'a, TSubstream, TSpec> +pub enum InboundSubstreamState where - TSubstream: AsyncRead + AsyncWrite + Unpin, TSpec: EthSpec, { /// A response has been sent, pending writing and flush. ResponsePendingSend { /// The substream used to send the response - substream: futures::sink::Send<'a, InboundFramed, RPCErrorResponse>, + substream: InboundFramed, /// Whether a stream termination is requested. If true the stream will be closed after /// this send. Otherwise it will transition to an idle state until a stream termination is /// requested or a timeout is reached. @@ -120,29 +112,30 @@ where }, /// The response stream is idle and awaiting input from the application to send more chunked /// responses. - ResponseIdle(InboundFramed), + ResponseIdle(InboundFramed), /// The substream is attempting to shutdown. - Closing(InboundFramed), + Closing(InboundFramed), /// Temporary state during processing Poisoned, } -pub enum OutboundSubstreamState { +/// State of an outbound substream. Either waiting for a response, or in the process of sending. +pub enum OutboundSubstreamState { /// A request has been sent, and we are awaiting a response. This future is driven in the /// handler because GOODBYE requests can be handled and responses dropped instantly. RequestPendingResponse { /// The framed negotiated substream. - substream: OutboundFramed, + substream: OutboundFramed, /// Keeps track of the actual request sent. request: RPCRequest, }, /// Closing an outbound substream> - Closing(OutboundFramed), + Closing(OutboundFramed), /// Temporary state during processing Poisoned, } -impl<'a, TSubstream, TSpec> InboundSubstreamState<'a, TSpec, TSubstream> +impl InboundSubstreamState where TSpec: EthSpec, { @@ -188,9 +181,8 @@ where } } -impl<'a, TSubstream, TSpec> RPCHandler<'a, TSubstream, TSpec> +impl RPCHandler where - TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec, { pub fn new( @@ -247,9 +239,8 @@ where } } -impl<'a, TSubstream, TSpec> ProtocolsHandler for RPCHandler<'a, TSubstream, TSpec> +impl ProtocolsHandler for RPCHandler where - TSubstream: AsyncRead + AsyncWrite + Unpin, TSpec: EthSpec, { type InEvent = RPCEvent; @@ -265,14 +256,14 @@ where fn inject_fully_negotiated_inbound( &mut self, - out: as InboundUpgrade>::Output, + substream: as InboundUpgrade>::Output, ) { // update the keep alive timeout if there are no more remaining outbound streams if let KeepAlive::Until(_) = self.keep_alive { self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); } - let (req, substream) = out; + let (req, substream) = substream; // drop the stream and return a 0 id for goodbye "requests" if let r @ RPCRequest::Goodbye(_) = req { self.events_out.push(RPCEvent::Request(0, r)); @@ -297,7 +288,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - out: as OutboundUpgrade>::Output, + out: as OutboundUpgrade>::Output, rpc_event: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; @@ -452,6 +443,7 @@ where fn poll( &mut self, + cx: &mut Context<'_>, ) -> Poll< ProtocolsHandlerEvent< Self::OutboundProtocol, @@ -533,21 +525,18 @@ where } // purge expired outbound substreams - if let Poll::Ready(Some(d)) = - self.outbound_substreams_delay.poll() { - let stream_id = d.map_err(|e| { + if let Poll::Ready(Some(d)) = self.outbound_substreams_delay.poll() { + let stream_id = d.map_err(|e| { warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e)); ProtocolsHandlerUpgrErr::Timer })?; - + self.outbound_substreams.remove(stream_id.get_ref()); // notify the user - return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - *stream_id.get_ref(), - RPCError::Custom("Stream timed out".into()), - ), - ))); + return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(RPCEvent::Error( + *stream_id.get_ref(), + RPCError::Custom("Stream timed out".into()), + )))); } // drive inbound streams that need to be processed @@ -614,7 +603,7 @@ where InboundSubstreamState::Closing(mut substream) => { // TODO: check if this is supposed to be a stream match substream.close() { - Poll::Ready(_) => { + Poll::Ready(_) => { //trace!(self.log, "Inbound stream dropped"); if let Some(delay_key) = &entry.get().1 { self.inbound_substreams_delay.remove(delay_key); @@ -729,7 +718,7 @@ where }, OutboundSubstreamState::Closing(mut substream) => match substream.close() { // TODO: check if this is supposed to be a stream - Poll::Ready(_)=> { + Poll::Ready(_) => { //trace!(self.log, "Outbound stream dropped"); // drop the stream let delay_key = &entry.get().1; @@ -763,12 +752,10 @@ where let rpc_event = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); if let RPCEvent::Request(id, req) = rpc_event { - return Poll::Ready(Ok( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone()), - info: RPCEvent::Request(id, req), - }, - )); + return Poll::Ready(Ok(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: RPCEvent::Request(id, req), + })); } } Poll::Pending @@ -776,11 +763,11 @@ where } // Check for new items to send to the peer and update the underlying stream -fn apply_queued_responses<'a, TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>( - raw_substream: InboundFramed, +fn apply_queued_responses( + raw_substream: InboundFramed, queued_outbound_items: &mut Option<&mut Vec>>, new_items_to_send: &mut bool, -) -> InboundSubstreamState<'a, TSubstream, TSpec> { +) -> InboundSubstreamState { match queued_outbound_items { Some(ref mut queue) if !queue.is_empty() => { *new_items_to_send = true; diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 6f78ec2ea5..7cfc769622 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -18,10 +18,10 @@ pub use methods::{ pub use protocol::{RPCError, RPCProtocol, RPCRequest}; use slog::{debug, o}; use std::marker::PhantomData; +use std::task::{Context, Poll}; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; use types::EthSpec; -use std::task::{Poll, Context}; pub(crate) mod codec; mod handler; @@ -64,16 +64,14 @@ impl std::fmt::Display for RPCEvent { /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. -pub struct RPC { +pub struct RPC { /// Queue of events to processed. events: Vec, RPCMessage>>, - /// Pins the generic substream. - marker: PhantomData, /// Slog logger for RPC behaviour. log: slog::Logger, } -impl RPC { +impl RPC { pub fn new(log: slog::Logger) -> Self { let log = log.new(o!("service" => "libp2p_rpc")); RPC { @@ -94,12 +92,11 @@ impl RPC { } } -impl<'a, TSubstream, TSpec> NetworkBehaviour for RPC +impl NetworkBehaviour for RPC where - TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec, { - type ProtocolsHandler = RPCHandler<'a, TSubstream, TSpec>; + type ProtocolsHandler = RPCHandler; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index ecb28f4877..b5c73cab03 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -11,17 +11,16 @@ use crate::rpc::{ methods::ResponseTermination, }; use futures::future::*; -use futures::{future, sink, stream}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use std::io; use std::marker::PhantomData; +use std::pin::Pin; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::Timeout; use tokio_io_timeout::TimeoutStream; use tokio_util::codec::Framed; use types::EthSpec; -use std::pin::Pin; /// The maximum bytes that can be sent across the RPC. const MAX_RPC_SIZE: usize = 1_048_576; // 1M @@ -171,7 +170,7 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = - Framed>, InboundCodec>>; + Framed, InboundCodec>>; type FnAndThen = fn( (Option>, InboundFramed), ) -> Ready, RPCError>>; @@ -181,26 +180,14 @@ type FnMapErr = fn(tokio::time::Error) -> RPCError; impl InboundUpgrade for RPCProtocol where - TSocket: AsyncRead + AsyncWrite, + TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, TSpec: EthSpec, { type Output = InboundOutput; type Error = RPCError; + type Future = Pin> + Send>>; - type Future = future::Either< - Ready, RPCError>>, - future::AndThen< - future::MapErr>>, FnMapErr>, - Ready, RPCError>>, - FnAndThen, - >, - >; - - fn upgrade_inbound( - self, - socket: upgrade::Negotiated, - protocol: ProtocolId, - ) -> Self::Future { + fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { let protocol_name = protocol.message_name.clone(); let codec = match protocol.encoding { Encoding::SSZSnappy => { @@ -348,24 +335,18 @@ impl RPCRequest { /* Outbound upgrades */ -pub type OutboundFramed = - Framed, OutboundCodec>>; +pub type OutboundFramed = Framed>>; impl OutboundUpgrade for RPCRequest where TSpec: EthSpec, - TSocket: AsyncRead + AsyncWrite + Send, + TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = OutboundFramed; type Error = RPCError; - // TODO: Send takes a mutable reference to the sink now, hence the lifetime parameter type Future = Pin> + Send>>; - // type Future = sink::Send<'a, &'a mut OutboundFramed, RPCRequest>; - fn upgrade_outbound( - self, - socket: upgrade::Negotiated, - protocol: Self::Info, - ) -> Self::Future { + + fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec = diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 34dad7d54a..435d525664 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -6,10 +6,10 @@ use crate::{NetworkConfig, NetworkGlobals}; use futures::prelude::*; use futures::Stream; use libp2p::core::{ + connection::Substream, identity::Keypair, multiaddr::Multiaddr, muxing::StreamMuxerBox, - connection::Substream, transport::boxed::Boxed, upgrade::{InboundUpgradeExt, OutboundUpgradeExt}, ConnectedPoint, @@ -19,15 +19,12 @@ use slog::{crit, debug, error, info, trace, warn}; use std::fs::File; use std::io::prelude::*; use std::io::{Error, ErrorKind}; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use std::time::Duration; use tokio::time::DelayQueue; use types::{EnrForkId, EthSpec}; -use std::pin::Pin; -use std::task::{Poll, Context}; - -type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; -type Libp2pBehaviour = Behaviour, TSpec>; pub const NETWORK_KEY_FILENAME: &str = "key"; /// The time in milliseconds to wait before banning a peer. This allows for any Goodbye messages to be @@ -38,7 +35,7 @@ const BAN_PEER_WAIT_TIMEOUT: u64 = 200; pub struct Service { /// The libp2p Swarm handler. //TODO: Make this private - pub swarm: Swarm>, + pub swarm: Swarm>, /// This node's PeerId. pub local_peer_id: PeerId, diff --git a/beacon_node/eth2-libp2p/src/types/mod.rs b/beacon_node/eth2-libp2p/src/types/mod.rs index 94d24bad6e..8f9b07fd33 100644 --- a/beacon_node/eth2-libp2p/src/types/mod.rs +++ b/beacon_node/eth2-libp2p/src/types/mod.rs @@ -9,7 +9,7 @@ use types::{BitVector, EthSpec}; #[allow(type_alias_bounds)] pub type EnrBitfield = BitVector; -pub type Enr = libp2p::discv5::enr::Enr; +pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::PubsubMessage;