protocol.rs compiles

This commit is contained in:
pawanjay176
2020-04-29 20:10:16 +05:30
parent 5ae53c9699
commit 5fa6b8d5e3
4 changed files with 36 additions and 34 deletions

View File

@@ -33,6 +33,7 @@ sha2 = "0.8.0"
base64 = "0.11.0" base64 = "0.11.0"
snap = "1" snap = "1"
void = "1.0.2" void = "1.0.2"
tokio-io-timeout = "0.4.0"
tokio-util = {version = "0.3.1", features = ["codec"]} tokio-util = {version = "0.3.1", features = ["codec"]}
[dev-dependencies] [dev-dependencies]

View File

@@ -38,8 +38,9 @@ type InboundRequestId = RequestId;
type OutboundRequestId = RequestId; type OutboundRequestId = RequestId;
/// Implementation of `ProtocolsHandler` for the RPC protocol. /// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<TSpec> pub struct RPCHandler<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// The upgrade for inbound substreams. /// The upgrade for inbound substreams.
@@ -61,7 +62,7 @@ where
inbound_substreams: FnvHashMap< inbound_substreams: FnvHashMap<
InboundRequestId, InboundRequestId,
( (
InboundSubstreamState<TSpec>, InboundSubstreamState<'a, TSubstream, TSpec>,
Option<delay_queue::Key>, Option<delay_queue::Key>,
), ),
>, >,
@@ -73,7 +74,7 @@ where
/// maintained by the application sending the request. /// maintained by the application sending the request.
outbound_substreams: FnvHashMap< outbound_substreams: FnvHashMap<
OutboundRequestId, OutboundRequestId,
(OutboundSubstreamState<TSpec>, delay_queue::Key), (OutboundSubstreamState<TSubstream, TSpec>, delay_queue::Key),
>, >,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
@@ -103,15 +104,15 @@ where
} }
/// State of an outbound substream. Either waiting for a response, or in the process of sending. /// State of an outbound substream. Either waiting for a response, or in the process of sending.
pub enum InboundSubstreamState<TSubstream, TSpec> pub enum InboundSubstreamState<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite + Unpin,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// A response has been sent, pending writing and flush. /// A response has been sent, pending writing and flush.
ResponsePendingSend { ResponsePendingSend {
/// The substream used to send the response /// The substream used to send the response
substream: futures::sink::Send<InboundFramed<TSubstream, TSpec>>, substream: futures::sink::Send<'a, InboundFramed<TSubstream, TSpec>, RPCErrorResponse<TSpec>>,
/// Whether a stream termination is requested. If true the stream will be closed after /// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is /// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached. /// requested or a timeout is reached.
@@ -141,9 +142,8 @@ pub enum OutboundSubstreamState<TSubstream, TSpec: EthSpec> {
Poisoned, Poisoned,
} }
impl<TSubstream, TSpec> InboundSubstreamState<TSubstream, TSpec> impl<'a, TSubstream, TSpec> InboundSubstreamState<'a, TSpec, TSubstream>
where where
TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
{ {
/// Moves the substream state to closing and informs the connected peer. The /// Moves the substream state to closing and informs the connected peer. The
@@ -188,7 +188,7 @@ where
} }
} }
impl<TSubstream, TSpec> RPCHandler<TSubstream, TSpec> impl<'a, TSubstream, TSpec> RPCHandler<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
@@ -215,7 +215,6 @@ where
inactive_timeout, inactive_timeout,
outbound_io_error_retries: 0, outbound_io_error_retries: 0,
log: log.clone(), log: log.clone(),
_phantom: PhantomData,
} }
} }
@@ -248,9 +247,9 @@ where
} }
} }
impl<TSubstream, TSpec> ProtocolsHandler for RPCHandler<TSubstream, TSpec> impl<'a, TSubstream, TSpec> ProtocolsHandler for RPCHandler<'a, TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite + Unpin,
TSpec: EthSpec, TSpec: EthSpec,
{ {
type InEvent = RPCEvent<TSpec>; type InEvent = RPCEvent<TSpec>;
@@ -777,11 +776,11 @@ where
} }
// Check for new items to send to the peer and update the underlying stream // Check for new items to send to the peer and update the underlying stream
fn apply_queued_responses<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>( fn apply_queued_responses<'a, TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>(
raw_substream: InboundFramed<TSubstream, TSpec>, raw_substream: InboundFramed<TSubstream, TSpec>,
queued_outbound_items: &mut Option<&mut Vec<RPCErrorResponse<TSpec>>>, queued_outbound_items: &mut Option<&mut Vec<RPCErrorResponse<TSpec>>>,
new_items_to_send: &mut bool, new_items_to_send: &mut bool,
) -> InboundSubstreamState<TSubstream, TSpec> { ) -> InboundSubstreamState<'a, TSubstream, TSpec> {
match queued_outbound_items { match queued_outbound_items {
Some(ref mut queue) if !queue.is_empty() => { Some(ref mut queue) if !queue.is_empty() => {
*new_items_to_send = true; *new_items_to_send = true;

View File

@@ -94,12 +94,12 @@ impl<TSubstream, TSpec: EthSpec> RPC<TSubstream, TSpec> {
} }
} }
impl<TSubstream, TSpec> NetworkBehaviour for RPC<TSubstream, TSpec> impl<'a, TSubstream, TSpec> NetworkBehaviour for RPC<TSubstream, TSpec>
where where
TSubstream: AsyncRead + AsyncWrite, TSubstream: AsyncRead + AsyncWrite,
TSpec: EthSpec, TSpec: EthSpec,
{ {
type ProtocolsHandler = RPCHandler<TSubstream, TSpec>; type ProtocolsHandler = RPCHandler<'a, TSubstream, TSpec>;
type OutEvent = RPCMessage<TSpec>; type OutEvent = RPCMessage<TSpec>;
fn new_handler(&mut self) -> Self::ProtocolsHandler { fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@@ -11,15 +11,17 @@ use crate::rpc::{
methods::ResponseTermination, methods::ResponseTermination,
}; };
use futures::future::*; use futures::future::*;
use futures::{future, sink, stream, Sink, Stream}; use futures::{future, sink, stream};
use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo}; use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeInfo};
use std::io; use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::time::Duration; use std::time::Duration;
use tokio_util::codec::Framed;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::Timeout; use tokio::time::Timeout;
use tokio_io_timeout::TimeoutStream;
use tokio_util::codec::Framed;
use types::EthSpec; use types::EthSpec;
use std::pin::Pin;
/// The maximum bytes that can be sent across the RPC. /// The maximum bytes that can be sent across the RPC.
const MAX_RPC_SIZE: usize = 1_048_576; // 1M const MAX_RPC_SIZE: usize = 1_048_576; // 1M
@@ -169,12 +171,13 @@ impl ProtocolName for ProtocolId {
pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>); pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>);
pub type InboundFramed<TSocket, TSpec> = pub type InboundFramed<TSocket, TSpec> =
Framed<Timeout<upgrade::Negotiated<TSocket>>, InboundCodec<TSpec>>; Framed<Timeout<upgrade::Negotiated<TSocket>>, InboundCodec<TSpec, RPCErrorResponse<TSpec>>>;
type FnAndThen<TSocket, TSpec> = fn( type FnAndThen<TSocket, TSpec> = fn(
(Option<RPCRequest<TSpec>>, InboundFramed<TSocket, TSpec>), (Option<RPCRequest<TSpec>>, InboundFramed<TSocket, TSpec>),
) -> FutureResult<InboundOutput<TSocket, TSpec>, RPCError>; ) -> Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>;
type FnMapErr<TSocket, TSpec> = // TODO: Error doesn't take a generic parameter in new tokio
fn(timeout::Error<(RPCError, InboundFramed<TSocket, TSpec>)>) -> RPCError; // Need to check implications
type FnMapErr = fn(tokio::time::Error) -> RPCError;
impl<TSocket, TSpec> InboundUpgrade<TSocket> for RPCProtocol<TSpec> impl<TSocket, TSpec> InboundUpgrade<TSocket> for RPCProtocol<TSpec>
where where
@@ -185,13 +188,10 @@ where
type Error = RPCError; type Error = RPCError;
type Future = future::Either< type Future = future::Either<
FutureResult<InboundOutput<TSocket, TSpec>, RPCError>, Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>,
future::AndThen< future::AndThen<
future::MapErr< future::MapErr<Timeout<stream::StreamFuture<InboundFramed<TSocket, TSpec>>>, FnMapErr>,
timeout::Timeout<stream::StreamFuture<InboundFramed<TSocket, TSpec>>>, Ready<Result<InboundOutput<TSocket, TSpec>, RPCError>>,
FnMapErr<TSocket, TSpec>,
>,
FutureResult<InboundOutput<TSocket, TSpec>, RPCError>,
FnAndThen<TSocket, TSpec>, FnAndThen<TSocket, TSpec>,
>, >,
>; >;
@@ -349,16 +349,18 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
/* Outbound upgrades */ /* Outbound upgrades */
pub type OutboundFramed<TSocket, TSpec> = pub type OutboundFramed<TSocket, TSpec> =
Framed<upgrade::Negotiated<TSocket>, OutboundCodec<TSpec>>; Framed<upgrade::Negotiated<TSocket>, OutboundCodec<TSpec, RPCRequest<TSpec>>>;
impl<TSocket, TSpec> OutboundUpgrade<TSocket> for RPCRequest<TSpec> impl<TSocket, TSpec> OutboundUpgrade<TSocket> for RPCRequest<TSpec>
where where
TSpec: EthSpec, TSpec: EthSpec,
TSocket: AsyncRead + AsyncWrite, TSocket: AsyncRead + AsyncWrite + Send,
{ {
type Output = OutboundFramed<TSocket, TSpec>; type Output = OutboundFramed<TSocket, TSpec>;
type Error = RPCError; type Error = RPCError;
type Future = sink::Send<OutboundFramed<TSocket, TSpec>>; // TODO: Send takes a mutable reference to the sink now, hence the lifetime parameter
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
// type Future = sink::Send<'a, &'a mut OutboundFramed<TSocket, TSpec>, RPCRequest<TSpec>>;
fn upgrade_outbound( fn upgrade_outbound(
self, self,
socket: upgrade::Negotiated<TSocket>, socket: upgrade::Negotiated<TSocket>,
@@ -376,7 +378,7 @@ where
OutboundCodec::SSZ(ssz_codec) OutboundCodec::SSZ(ssz_codec)
} }
}; };
Framed::new(socket, codec).send(self) Box::pin(Framed::new(socket, codec).send(self))
} }
} }
@@ -414,8 +416,8 @@ impl From<ssz::DecodeError> for RPCError {
RPCError::SSZDecodeError(err) RPCError::SSZDecodeError(err)
} }
} }
impl<T> From<tokio::timer::timeout::Error<T>> for RPCError { impl From<tokio::time::Error> for RPCError {
fn from(err: tokio::timer::timeout::Error<T>) -> Self { fn from(err: tokio::time::Error) -> Self {
if err.is_elapsed() { if err.is_elapsed() {
RPCError::StreamTimeout RPCError::StreamTimeout
} else { } else {