RPC handler to stable futures

This commit is contained in:
Age Manning
2020-05-04 17:35:41 +10:00
parent 0d4ee680b5
commit a43381e3d5
4 changed files with 270 additions and 126 deletions

View File

@@ -25,7 +25,6 @@ use std::{
use tokio::time::{delay_queue, DelayQueue};
use types::EthSpec;
//TODO: Implement close() on the substream types to improve the poll code.
//TODO: Implement check_timeout() on the substream types
/// The time (in seconds) before a substream that is awaiting a response from the user times out.
@@ -65,7 +64,7 @@ where
inbound_substreams: FnvHashMap<
InboundRequestId,
(
InboundSubstreamState<TSubstream, TSpec>,
InboundSubstreamState<TSpec>,
Option<delay_queue::Key>,
Protocol,
),
@@ -76,14 +75,8 @@ where
/// Map of outbound substreams that need to be driven to completion. The `RequestId` is
/// maintained by the application sending the request.
outbound_substreams: FnvHashMap<
OutboundRequestId,
(
OutboundSubstreamState<TSubstream, TSpec>,
delay_queue::Key,
Protocol,
),
>,
outbound_substreams:
FnvHashMap<OutboundRequestId, (OutboundSubstreamState<TSpec>, delay_queue::Key, Protocol)>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
outbound_substreams_delay: DelayQueue<OutboundRequestId>,
@@ -115,8 +108,19 @@ pub enum InboundSubstreamState<TSpec>
where
TSpec: EthSpec,
{
/// A response has been sent, pending writing and flush.
/// A response has been sent, pending writing.
ResponsePendingSend {
/// The substream used to send the response
substream: InboundFramed<NegotiatedSubstream, TSpec>,
/// The message that is attempting to be sent.
message: RPCCodedResponse<TSpec>,
/// Whether a stream termination is requested. If true the stream will be closed after
/// this send. Otherwise it will transition to an idle state until a stream termination is
/// requested or a timeout is reached.
closing: bool,
},
/// A response has been sent, pending flush.
ResponsePendingFlush {
/// The substream used to send the response
substream: InboundFramed<NegotiatedSubstream, TSpec>,
/// Whether a stream termination is requested. If true the stream will be closed after
@@ -169,18 +173,37 @@ where
RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange);
match std::mem::replace(self, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
// if we are busy awaiting a send/flush add the termination to the queue
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} => {
if !closing {
outbound_queue.push(error);
outbound_queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*self = InboundSubstreamState::ResponsePendingSend { substream, closing }
*self = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
}
}
// if we are busy awaiting a send/flush add the termination to the queue
InboundSubstreamState::ResponsePendingFlush { substream, closing } => {
if !closing {
outbound_queue.push(error);
outbound_queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*self = InboundSubstreamState::ResponsePendingFlush { substream, closing }
}
InboundSubstreamState::ResponseIdle(substream) => {
*self = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(error),
substream: substream,
message: error,
closing: true,
};
}
@@ -259,7 +282,7 @@ where
{
type InEvent = RPCEvent<TSpec>;
type OutEvent = RPCEvent<TSpec>;
type Error = ProtocolsHandlerUpgrErr<RPCError>;
type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request
@@ -387,15 +410,18 @@ where
// if it's a single rpc request or an error, close the stream after
*substream_state =
InboundSubstreamState::ResponsePendingSend {
substream: substream.send(response),
substream: substream,
message: response,
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
};
}
}
}
InboundSubstreamState::ResponsePendingSend { substream, closing }
if res_is_multiple =>
{
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} if res_is_multiple => {
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(rpc_id)
@@ -404,6 +430,22 @@ where
// return the state
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
InboundSubstreamState::ResponsePendingFlush { substream, closing }
if res_is_multiple =>
{
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(rpc_id)
.or_insert_with(Vec::new)
.push(response);
// return the state
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
};
@@ -412,9 +454,14 @@ where
*substream_state = InboundSubstreamState::Closing(substream);
debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response));
}
InboundSubstreamState::ResponsePendingSend { substream, .. } => {
InboundSubstreamState::ResponsePendingSend {
substream,
message,
..
} => {
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing: true,
};
error!(self.log, "Attempted sending multiple responses to a single response request");
@@ -494,57 +541,71 @@ where
> {
if !self.pending_error.is_empty() {
let (id, protocol, err) = self.pending_error.remove(0);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(id, protocol, err),
return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
id, protocol, err,
)));
}
// return any events that need to be reported
if !self.events_out.is_empty() {
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))));
return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));
} else {
self.events_out.shrink_to_fit();
}
// purge expired inbound substreams and send an error
// TODO: check if this pattern is equivalent to
// while let Async::Ready() = stream.poll().map_err(..)
while let Poll::Ready(Some(d)) = self.inbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?;
let rpc_id = stream_id.get_ref();
loop {
match self.inbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(stream_id))) => {
// handle a stream timeout for various states
if let Some((substream_state, delay_key, _)) =
self.inbound_substreams.get_mut(stream_id.get_ref())
{
// the delay has been removed
*delay_key = None;
// handle a stream timeout for various states
if let Some((substream_state, delay_key, _)) = self.inbound_substreams.get_mut(rpc_id) {
// the delay has been removed
*delay_key = None;
let outbound_queue = self
.queued_outbound_items
.entry(*rpc_id)
.or_insert_with(Vec::new);
substream_state.close(outbound_queue);
let outbound_queue = self
.queued_outbound_items
.entry(stream_id.into_inner())
.or_insert_with(Vec::new);
substream_state.close(outbound_queue);
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e));
// drops the peer if we cannot read the delay queue
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
"Could not poll inbound stream timer",
)));
}
Poll::Pending => break,
}
}
// purge expired outbound substreams
if let Poll::Ready(Some(d)) = self.outbound_substreams_delay.poll() {
let stream_id = d.map_err(|e| {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
ProtocolsHandlerUpgrErr::Timer
})?
{
if let Some((_id, _stream, protocol)) =
self.outbound_substreams.remove(stream_id.get_ref())
{
// notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(*stream_id.get_ref(), protocol, RPCError::StreamTimeout),
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref());
loop {
match self.outbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(stream_id))) => {
if let Some((_id, _stream, protocol)) =
self.outbound_substreams.remove(stream_id.get_ref())
{
// notify the user
return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
*stream_id.get_ref(),
protocol,
RPCError::StreamTimeout,
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref());
}
}
Poll::Ready(Some(Err(e))) => {
warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e));
return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError(
"Could not poll outbound stream timer",
)));
}
Poll::Pending => break,
}
}
@@ -563,20 +624,93 @@ where
) {
InboundSubstreamState::ResponsePendingSend {
mut substream,
message,
closing,
} => {
match substream.poll() {
Poll::Ready(Ok(raw_substream)) => {
// completed the send
// close the stream if required
match Sink::poll_ready(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
// stream is ready to send data
match Sink::start_send(Pin::new(&mut substream), message) {
Ok(()) => {
// await flush
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
}
}
Err(e) => {
// error with sending in the codec
error!(self.log, "Error sending RPC message"; "message" => message.to_string());
// keep connection with the peer and return the
// stream to awaiting response if this message
// wasn't closing the stream
// TODO: Duplicate code
if closing {
entry.get_mut().0 =
InboundSubstreamState::Closing(substream)
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
&mut new_items_to_send,
);
}
}
}
}
Poll::Ready(Err(e)) => {
error!(self.log, "Outbound substream error while sending RPC message: {:?}", e);
return Poll::Ready(ProtocolsHandlerEvent::Close(e));
}
Poll::Pending => {
// the stream is not yet ready, continue waiting
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
}
}
InboundSubstreamState::ResponsePendingFlush {
mut substream,
closing,
} => {
match Sink::poll_flush(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
// finished flushing
// TODO: Duplicate code
if closing {
entry.get_mut().0 =
InboundSubstreamState::Closing(raw_substream)
InboundSubstreamState::Closing(substream)
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
raw_substream,
substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
&mut new_items_to_send,
);
}
}
Poll::Ready(Err(e)) => {
// error during flush
error!(self.log, "Error sending flushing RPC message");
// close the stream if required
// TODO: Duplicate code
if closing {
entry.get_mut().0 =
InboundSubstreamState::Closing(substream)
} else {
// check for queued chunks and update the stream
entry.get_mut().0 = apply_queued_responses(
substream,
&mut self
.queued_outbound_items
.get_mut(&request_id),
@@ -586,22 +720,12 @@ where
}
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::ResponsePendingSend {
InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
};
}
Err(e) => {
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
let protocol = entry.get().2;
entry.remove_entry();
return Ok(Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(0, protocol, e),
)));
}
};
}
}
InboundSubstreamState::ResponseIdle(substream) => {
entry.get_mut().0 = apply_queued_responses(
@@ -611,10 +735,8 @@ where
);
}
InboundSubstreamState::Closing(mut substream) => {
// TODO: check if this is supposed to be a stream
match substream.close() {
Poll::Ready(_) => {
//trace!(self.log, "Inbound stream dropped");
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(Ok(())) => {
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
@@ -629,6 +751,24 @@ where
);
}
} // drop the stream
Poll::Ready(Err(e)) => {
error!(self.log, "Error closing inbound stream"; "error" => e.to_string());
// drop the stream anyway
// TODO: Duplicate code
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
self.queued_outbound_items.remove(&request_id);
entry.remove();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
}
Poll::Pending => {
entry.get_mut().0 =
InboundSubstreamState::Closing(substream);
@@ -639,7 +779,7 @@ where
crit!(self.log, "Poisoned outbound substream");
unreachable!("Coding Error: Inbound Substream is poisoned");
}
};
}
}
Entry::Vacant(_) => unreachable!(),
}
@@ -657,7 +797,7 @@ where
OutboundSubstreamState::RequestPendingResponse {
mut substream,
request,
} => match substream.poll() {
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if request.multiple_responses() && !response.is_error() {
entry.get_mut().0 =
@@ -676,9 +816,9 @@ where
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(request_id, response),
)));
));
}
Poll::Ready(None) => {
// stream closed
@@ -692,22 +832,22 @@ where
// notify the application error
if request.multiple_responses() {
// return an end of stream result
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCCodedResponse::StreamTermination(
request.stream_termination(),
),
),
)));
));
} // else we return an error, stream should not have closed early.
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
request.protocol(),
RPCError::IncompleteStream,
),
)));
));
}
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse {
@@ -721,31 +861,33 @@ where
self.outbound_substreams_delay.remove(delay_key);
let protocol = entry.get().2;
entry.remove_entry();
return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, protocol, e),
)));
));
}
},
OutboundSubstreamState::Closing(mut substream) => match substream.close() {
// TODO: check if this is supposed to be a stream
Poll::Ready(_) => {
//trace!(self.log, "Outbound stream dropped");
// drop the stream
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
OutboundSubstreamState::Closing(mut substream) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
// TODO: check if this is supposed to be a stream
Poll::Ready(_) => {
// drop the stream - including if there is an error
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive =
KeepAlive::Until(Instant::now() + self.inactive_timeout);
if self.outbound_substreams.is_empty()
&& self.inbound_substreams.is_empty()
{
self.keep_alive = KeepAlive::Until(
Instant::now() + self.inactive_timeout,
);
}
}
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
}
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
},
}
OutboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned outbound substream");
unreachable!("Coding Error: Outbound substream is poisoned")
@@ -761,12 +903,10 @@ where
self.dial_negotiated += 1;
let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
return Poll::Ready(Ok(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: (id, req),
},
));
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: (id, req),
});
}
Poll::Pending
}
@@ -774,7 +914,7 @@ where
// Check for new items to send to the peer and update the underlying stream
fn apply_queued_responses<TSpec: EthSpec>(
raw_substream: InboundFramed<NegotiatedSubstream, TSpec>,
substream: InboundFramed<NegotiatedSubstream, TSpec>,
queued_outbound_items: &mut Option<&mut Vec<RPCCodedResponse<TSpec>>>,
new_items_to_send: &mut bool,
) -> InboundSubstreamState<TSpec> {
@@ -785,17 +925,18 @@ fn apply_queued_responses<TSpec: EthSpec>(
match queue.remove(0) {
RPCCodedResponse::StreamTermination(_) => {
// close the stream if this is a stream termination
InboundSubstreamState::Closing(raw_substream)
InboundSubstreamState::Closing(substream)
}
chunk => InboundSubstreamState::ResponsePendingSend {
substream: raw_substream.send(chunk),
substream: substream,
message: chunk,
closing: false,
},
}
}
_ => {
// no items queued set to idle
InboundSubstreamState::ResponseIdle(raw_substream)
InboundSubstreamState::ResponseIdle(substream)
}
}
}

View File

@@ -20,7 +20,10 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio_io_timeout::TimeoutStream;
use tokio_util::codec::Framed;
use tokio_util::{
codec::Framed,
compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
};
use types::EthSpec;
/// The maximum bytes that can be sent across the RPC.
@@ -171,7 +174,7 @@ impl ProtocolName for ProtocolId {
pub type InboundOutput<TSocket, TSpec> = (RPCRequest<TSpec>, InboundFramed<TSocket, TSpec>);
pub type InboundFramed<TSocket, TSpec> =
Framed<TimeoutStream<TokioNegotiatedStream<TSocket>>, InboundCodec<TSpec>>;
Framed<TimeoutStream<Compat<TSocket>>, InboundCodec<TSpec>>;
type FnAndThen<TSocket, TSpec> = fn(
(
Option<Result<RPCRequest<TSpec>, RPCError>>,
@@ -191,7 +194,8 @@ where
fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
let protocol_name = protocol.message_name;
let socket = TokioNegotiatedStream(socket);
// convert the socket to tokio compatible socket
let socket = socket.compat();
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
let ssz_snappy_codec =
@@ -220,9 +224,7 @@ where
.and_then({
|(req, stream)| match req {
Some(Ok(request)) => future::ok((request, stream)),
Some(Err(_)) => | None => {
future::err(RPCError::IncompleteStream)
}
Some(Err(_)) | None => future::err(RPCError::IncompleteStream),
}
} as FnAndThen<TSocket, TSpec>),
),
@@ -346,8 +348,7 @@ impl<TSpec: EthSpec> RPCRequest<TSpec> {
/* Outbound upgrades */
pub type OutboundFramed<TSocket, TSpec> =
Framed<TokioNegotiatedStream<TSocket>, OutboundCodec<TSpec>>;
pub type OutboundFramed<TSocket, TSpec> = Framed<Compat<TSocket>, OutboundCodec<TSpec>>;
impl<TSocket, TSpec> OutboundUpgrade<TSocket> for RPCRequest<TSpec>
where
@@ -359,7 +360,8 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future {
let socket = TokioNegotiatedStream(socket);
// convert to a tokio compatible socket
let socket = socket.comapt();
let codec = match protocol.encoding {
Encoding::SSZSnappy => {
let ssz_snappy_codec =