diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index d3e5779391..5711621a1a 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -33,6 +33,7 @@ sha2 = "0.8.0" base64 = "0.11.0" snap = "1" void = "1.0.2" +tokio-io-timeout = "0.4.0" tokio-util = {version = "0.3.1", features = ["codec"]} [dev-dependencies] diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 8ab319a461..0d224a400e 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -38,8 +38,9 @@ type InboundRequestId = RequestId; type OutboundRequestId = RequestId; /// Implementation of `ProtocolsHandler` for the RPC protocol. -pub struct RPCHandler +pub struct RPCHandler<'a, TSubstream, TSpec> where + TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec, { /// The upgrade for inbound substreams. @@ -61,7 +62,7 @@ where inbound_substreams: FnvHashMap< InboundRequestId, ( - InboundSubstreamState, + InboundSubstreamState<'a, TSubstream, TSpec>, Option, ), >, @@ -73,7 +74,7 @@ where /// maintained by the application sending the request. outbound_substreams: FnvHashMap< OutboundRequestId, - (OutboundSubstreamState, delay_queue::Key), + (OutboundSubstreamState, delay_queue::Key), >, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. @@ -103,15 +104,15 @@ where } /// State of an outbound substream. Either waiting for a response, or in the process of sending. -pub enum InboundSubstreamState +pub enum InboundSubstreamState<'a, TSubstream, TSpec> where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, TSpec: EthSpec, { /// A response has been sent, pending writing and flush. ResponsePendingSend { /// The substream used to send the response - substream: futures::sink::Send>, + substream: futures::sink::Send<'a, InboundFramed, RPCErrorResponse>, /// Whether a stream termination is requested. If true the stream will be closed after /// this send. Otherwise it will transition to an idle state until a stream termination is /// requested or a timeout is reached. @@ -141,9 +142,8 @@ pub enum OutboundSubstreamState { Poisoned, } -impl InboundSubstreamState +impl<'a, TSubstream, TSpec> InboundSubstreamState<'a, TSpec, TSubstream> where - TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec, { /// Moves the substream state to closing and informs the connected peer. The @@ -188,7 +188,7 @@ where } } -impl RPCHandler +impl<'a, TSubstream, TSpec> RPCHandler<'a, TSubstream, TSpec> where TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec, @@ -215,7 +215,6 @@ where inactive_timeout, outbound_io_error_retries: 0, log: log.clone(), - _phantom: PhantomData, } } @@ -248,9 +247,9 @@ where } } -impl ProtocolsHandler for RPCHandler +impl<'a, TSubstream, TSpec> ProtocolsHandler for RPCHandler<'a, TSubstream, TSpec> where - TSubstream: AsyncRead + AsyncWrite, + TSubstream: AsyncRead + AsyncWrite + Unpin, TSpec: EthSpec, { type InEvent = RPCEvent; @@ -777,11 +776,11 @@ where } // Check for new items to send to the peer and update the underlying stream -fn apply_queued_responses( +fn apply_queued_responses<'a, TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>( raw_substream: InboundFramed, queued_outbound_items: &mut Option<&mut Vec>>, new_items_to_send: &mut bool, -) -> InboundSubstreamState { +) -> InboundSubstreamState<'a, TSubstream, TSpec> { match queued_outbound_items { Some(ref mut queue) if !queue.is_empty() => { *new_items_to_send = true; diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 46f47fcafa..6f78ec2ea5 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -94,12 +94,12 @@ impl RPC { } } -impl NetworkBehaviour for RPC +impl<'a, TSubstream, TSpec> NetworkBehaviour for RPC where TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec, { - type ProtocolsHandler = RPCHandler; + type ProtocolsHandler = RPCHandler<'a, TSubstream, TSpec>; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 421f9d0a6b..ecb28f4877 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -11,15 +11,17 @@ use crate::rpc::{ methods::ResponseTermination, }; use futures::future::*; -use futures::{future, sink, stream, Sink, Stream}; +use futures::{future, sink, stream}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use std::io; use std::marker::PhantomData; use std::time::Duration; -use tokio_util::codec::Framed; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::time::Timeout; +use tokio_io_timeout::TimeoutStream; +use tokio_util::codec::Framed; use types::EthSpec; +use std::pin::Pin; /// The maximum bytes that can be sent across the RPC. const MAX_RPC_SIZE: usize = 1_048_576; // 1M @@ -169,12 +171,13 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = - Framed>, InboundCodec>; + Framed>, InboundCodec>>; type FnAndThen = fn( (Option>, InboundFramed), -) -> FutureResult, RPCError>; -type FnMapErr = - fn(timeout::Error<(RPCError, InboundFramed)>) -> RPCError; +) -> Ready, RPCError>>; +// TODO: Error doesn't take a generic parameter in new tokio +// Need to check implications +type FnMapErr = fn(tokio::time::Error) -> RPCError; impl InboundUpgrade for RPCProtocol where @@ -185,13 +188,10 @@ where type Error = RPCError; type Future = future::Either< - FutureResult, RPCError>, + Ready, RPCError>>, future::AndThen< - future::MapErr< - timeout::Timeout>>, - FnMapErr, - >, - FutureResult, RPCError>, + future::MapErr>>, FnMapErr>, + Ready, RPCError>>, FnAndThen, >, >; @@ -349,16 +349,18 @@ impl RPCRequest { /* Outbound upgrades */ pub type OutboundFramed = - Framed, OutboundCodec>; + Framed, OutboundCodec>>; impl OutboundUpgrade for RPCRequest where TSpec: EthSpec, - TSocket: AsyncRead + AsyncWrite, + TSocket: AsyncRead + AsyncWrite + Send, { type Output = OutboundFramed; type Error = RPCError; - type Future = sink::Send>; + // TODO: Send takes a mutable reference to the sink now, hence the lifetime parameter + type Future = Pin> + Send>>; + // type Future = sink::Send<'a, &'a mut OutboundFramed, RPCRequest>; fn upgrade_outbound( self, socket: upgrade::Negotiated, @@ -376,7 +378,7 @@ where OutboundCodec::SSZ(ssz_codec) } }; - Framed::new(socket, codec).send(self) + Box::pin(Framed::new(socket, codec).send(self)) } } @@ -414,8 +416,8 @@ impl From for RPCError { RPCError::SSZDecodeError(err) } } -impl From> for RPCError { - fn from(err: tokio::timer::timeout::Error) -> Self { +impl From for RPCError { + fn from(err: tokio::time::Error) -> Self { if err.is_elapsed() { RPCError::StreamTimeout } else {