From 9e6ae448a6041de8132995ed4c5f98edc4b280c4 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 1 May 2020 20:05:03 +1000 Subject: [PATCH] Finished first round of fighting RPC types --- beacon_node/eth2-libp2p/src/rpc/codec/base.rs | 26 +++-- beacon_node/eth2-libp2p/src/rpc/codec/mod.rs | 20 ++-- beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs | 8 +- beacon_node/eth2-libp2p/src/rpc/handler.rs | 6 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 94 ++++++++++++------- 5 files changed, 93 insertions(+), 61 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs index 4aa31ca366..245c66678b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/base.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/base.rs @@ -19,9 +19,9 @@ pub trait OutboundCodec: Encoder + Decoder { /* Global Inbound Codec */ // This deals with Decoding RPC Requests from other peers and encoding our responses -pub struct BaseInboundCodec +pub struct BaseInboundCodec where - TCodec: Encoder + Decoder, + TCodec: Encoder> + Decoder, TSpec: EthSpec, { /// Inner codec for handling various encodings @@ -29,9 +29,9 @@ where phantom: PhantomData, } -impl BaseInboundCodec +impl BaseInboundCodec where - TCodec: Encoder + Decoder, + TCodec: Encoder> + Decoder, TSpec: EthSpec, { pub fn new(codec: TCodec) -> Self { @@ -44,9 +44,9 @@ where /* Global Outbound Codec */ // This deals with Decoding RPC Responses from other peers and encoding our requests -pub struct BaseOutboundCodec +pub struct BaseOutboundCodec where - TOutboundCodec: OutboundCodec, + TOutboundCodec: OutboundCodec>, TSpec: EthSpec, { /// Inner codec for handling various encodings. @@ -56,10 +56,10 @@ where phantom: PhantomData, } -impl BaseOutboundCodec +impl BaseOutboundCodec where TSpec: EthSpec, - TOutboundCodec: OutboundCodec, + TOutboundCodec: OutboundCodec>, { pub fn new(codec: TOutboundCodec) -> Self { BaseOutboundCodec { @@ -75,8 +75,7 @@ where /* Base Inbound Codec */ // This Encodes RPC Responses sent to external peers -impl Encoder> - for BaseInboundCodec> +impl Encoder> for BaseInboundCodec where TSpec: EthSpec, TCodec: Decoder + Encoder>, @@ -100,7 +99,7 @@ where // This Decodes RPC Requests from external peers // TODO: check if the Item parameter is correct -impl Decoder for BaseInboundCodec> +impl Decoder for BaseInboundCodec where TSpec: EthSpec, // TODO: check if the Item parameter is correct @@ -117,8 +116,7 @@ where /* Base Outbound Codec */ // This Encodes RPC Requests sent to external peers -impl Encoder> - for BaseOutboundCodec> +impl Encoder> for BaseOutboundCodec where TSpec: EthSpec, TCodec: OutboundCodec> + Encoder>, @@ -131,7 +129,7 @@ where } // This decodes RPC Responses received from external peers -impl Decoder for BaseOutboundCodec> +impl Decoder for BaseOutboundCodec where TSpec: EthSpec, TCodec: OutboundCodec, ErrorType = ErrorMessage> diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs index d7ef037c48..0a33779526 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/mod.rs @@ -12,17 +12,17 @@ use tokio_util::codec::{Decoder, Encoder}; use types::EthSpec; // Known types of codecs -pub enum InboundCodec { - SSZSnappy(BaseInboundCodec, TSpec, TItem>), - SSZ(BaseInboundCodec, TSpec, TItem>), +pub enum InboundCodec { + SSZSnappy(BaseInboundCodec, TSpec>), + SSZ(BaseInboundCodec, TSpec>), } -pub enum OutboundCodec { - SSZSnappy(BaseOutboundCodec, TSpec, TItem>), - SSZ(BaseOutboundCodec, TSpec, TItem>), +pub enum OutboundCodec { + SSZSnappy(BaseOutboundCodec, TSpec>), + SSZ(BaseOutboundCodec, TSpec>), } -impl Encoder> for InboundCodec> { +impl Encoder> for InboundCodec { type Error = RPCError; fn encode(&mut self, item: RPCErrorResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -33,7 +33,7 @@ impl Encoder> for InboundCodec Decoder for InboundCodec> { +impl Decoder for InboundCodec { type Item = RPCRequest; type Error = RPCError; @@ -45,7 +45,7 @@ impl Decoder for InboundCodec> { } } -impl Encoder> for OutboundCodec> { +impl Encoder> for OutboundCodec { type Error = RPCError; fn encode(&mut self, item: RPCRequest, dst: &mut BytesMut) -> Result<(), Self::Error> { @@ -56,7 +56,7 @@ impl Encoder> for OutboundCodec Decoder for OutboundCodec> { +impl Decoder for OutboundCodec { type Item = RPCErrorResponse; type Error = RPCError; diff --git a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs index bec69e4347..7b7c8bf12b 100644 --- a/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2-libp2p/src/rpc/codec/ssz.rs @@ -19,7 +19,7 @@ pub struct SSZInboundCodec { phantom: PhantomData, } -impl SSZInboundCodec { +impl SSZInboundCodec { pub fn new(protocol: ProtocolId, max_packet_size: usize) -> Self { let mut uvi_codec = UviBytes::default(); uvi_codec.set_max_len(max_packet_size); @@ -39,7 +39,11 @@ impl SSZInboundCodec { impl Encoder> for SSZInboundCodec { type Error = RPCError; - fn encode(&mut self, item: RPCErrorResponse, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode( + &mut self, + item: RPCErrorResponse, + dst: &mut BytesMut, + ) -> Result<(), Self::Error> { let bytes = match item { RPCErrorResponse::Success(resp) => match resp { RPCResponse::Status(res) => res.as_ssz_bytes(), diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index e461363471..d3634608d2 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -256,7 +256,7 @@ where fn inject_fully_negotiated_inbound( &mut self, - substream: as InboundUpgrade>::Output, + substream: >::Output, ) { // update the keep alive timeout if there are no more remaining outbound streams if let KeepAlive::Until(_) = self.keep_alive { @@ -288,7 +288,7 @@ where fn inject_fully_negotiated_outbound( &mut self, - out: as OutboundUpgrade>::Output, + out: >::Output, rpc_event: Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; @@ -415,7 +415,7 @@ where &mut self, request: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr< - >::Error, + >::Error, >, ) { if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index b5c73cab03..ad1faed385 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -11,13 +11,14 @@ use crate::rpc::{ methods::ResponseTermination, }; use futures::future::*; +use futures::prelude::*; +use futures::prelude::{AsyncRead, AsyncWrite}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use std::io; use std::marker::PhantomData; use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio::time::Timeout; use tokio_io_timeout::TimeoutStream; use tokio_util::codec::Framed; use types::EthSpec; @@ -170,13 +171,14 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = - Framed, InboundCodec>>; + Framed>, InboundCodec>; type FnAndThen = fn( - (Option>, InboundFramed), + ( + Option, RPCError>>, + InboundFramed, + ), ) -> Ready, RPCError>>; -// TODO: Error doesn't take a generic parameter in new tokio -// Need to check implications -type FnMapErr = fn(tokio::time::Error) -> RPCError; +type FnMapErr = fn(tokio::time::Elapsed) -> RPCError; impl InboundUpgrade for RPCProtocol where @@ -189,6 +191,7 @@ where fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { let protocol_name = protocol.message_name.clone(); + let socket = TokioNegotiatedStream(socket); let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec = @@ -206,27 +209,24 @@ where let socket = Framed::new(timed_socket, codec); // MetaData requests should be empty, return the stream - match protocol_name { - Protocol::MetaData => futures::future::Either::A(futures::future::ok(( - RPCRequest::MetaData(PhantomData), - socket, - ))), + Box::pin(match protocol_name { + Protocol::MetaData => { + future::Either::Left(future::ok((RPCRequest::MetaData(PhantomData), socket))) + } - _ => futures::future::Either::B( - socket - .into_future() - .timeout(Duration::from_secs(REQUEST_TIMEOUT)) - .map_err(RPCError::from as FnMapErr) + _ => future::Either::Right( + tokio::time::timeout(Duration::from_secs(REQUEST_TIMEOUT), socket.into_future()) + .map_err(RPCError::from as FnMapErr) .and_then({ |(req, stream)| match req { - Some(request) => futures::future::ok((request, stream)), - None => futures::future::err(RPCError::Custom( - "Stream terminated early".into(), - )), + Some(Ok(request)) => future::ok((request, stream)), + Some(Err(_)) | None => { + err(RPCError::Custom("Stream terminated early".into())) + } } } as FnAndThen), ), - } + }) } } @@ -335,11 +335,12 @@ impl RPCRequest { /* Outbound upgrades */ -pub type OutboundFramed = Framed>>; +pub type OutboundFramed = + Framed, OutboundCodec>; impl OutboundUpgrade for RPCRequest where - TSpec: EthSpec, + TSpec: EthSpec + Send + 'static, TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = OutboundFramed; @@ -347,6 +348,7 @@ where type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { + let socket = TokioNegotiatedStream(socket); let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec = @@ -359,7 +361,9 @@ where OutboundCodec::SSZ(ssz_codec) } }; - Box::pin(Framed::new(socket, codec).send(self)) + + let socket = Framed::new(socket, codec); + Box::pin(future::join(socket.send(self), future::ok(socket)).map(|(_, socket)| socket)) } } @@ -397,13 +401,9 @@ impl From for RPCError { RPCError::SSZDecodeError(err) } } -impl From for RPCError { - fn from(err: tokio::time::Error) -> Self { - if err.is_elapsed() { - RPCError::StreamTimeout - } else { - RPCError::Custom("Stream timer failed".into()) - } +impl From for RPCError { + fn from(err: tokio::time::Elapsed) -> Self { + RPCError::StreamTimeout } } @@ -468,3 +468,33 @@ impl std::fmt::Display for RPCRequest { } } } + +/// Converts a futures AsyncRead + AsyncWrite object to a tokio::AsyncRead + tokio::AsyncWrite +/// object. +struct TokioNegotiatedStream(T); + +impl tokio::io::AsyncRead for TokioNegotiatedStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for TokioNegotiatedStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_close(cx) + } +}