Super tiny RPC refactor (#1187)

* wip: mwake the request id optional

* make the request_id optional

* cleanup

* address clippy lints inside rpc

* WIP: Separate sent RPC events from received ones

* WIP: Separate sent RPC events from received ones

* cleanup

* Separate request ids from substream ids

* Make RPC's message handling independent of RequestIds

* Change behaviour RPC events to be more outside-crate friendly

* Propage changes across the network + router + processor

* Propage changes across the network + router + processor

* fmt

* "tiny" refactor

* more tiny refactors

* fmt eth2-libp2p

* wip: propagating changes

* wip: propagating changes

* cleaning up

* more cleanup

* fmt

* tests HOT fix

Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
divma
2020-06-04 22:07:59 -05:00
committed by GitHub
parent 042e80570c
commit 0e37a16927
19 changed files with 1445 additions and 1175 deletions

View File

@@ -3,7 +3,7 @@
use super::methods::{RPCCodedResponse, RequestId, ResponseTermination};
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
use super::RPCEvent;
use super::{RPCReceived, RPCSend};
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use fnv::FnvHashMap;
use futures::prelude::*;
@@ -33,12 +33,34 @@ pub const RESPONSE_TIMEOUT: u64 = 10;
/// The number of times to retry an outbound upgrade in the case of IO errors.
const IO_ERROR_RETRIES: u8 = 3;
/// Inbound requests are given a sequential `RequestId` to keep track of. All inbound streams are
/// identified by their substream ID which is identical to the RPC Id.
type InboundRequestId = RequestId;
/// Outbound requests are associated with an id that is given by the application that sent the
/// request.
type OutboundRequestId = RequestId;
/// Identifier of inbound and outbound substreams from the handler's perspective.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct SubstreamId(usize);
/// An error encoutered by the handler.
pub enum HandlerErr {
/// An error ocurred for this peer's request. This can occurr during protocol negotiation,
/// message passing, or if the handler identifies that we are sending an error reponse to the peer.
Inbound {
/// Id of the peer's request for which an error occurred.
id: SubstreamId,
/// Information of the negotiated protocol.
proto: Protocol,
/// The error that ocurred.
error: RPCError,
},
/// An error ocurred for this request. Such error can occurr during protocol negotiation,
/// message passing, or if we successfully received a response from the peer, but this response
/// indicates an error.
Outbound {
/// Application-given Id of the request for which an error occurred.
id: RequestId,
/// Information of the protocol.
proto: Protocol,
/// The error that ocurred.
error: RPCError,
},
}
/// Implementation of `ProtocolsHandler` for the RPC protocol.
pub struct RPCHandler<TSpec>
@@ -48,11 +70,11 @@ where
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>>,
/// If something bad happened and we should shut down the handler with an error.
pending_error: Vec<(RequestId, Protocol, RPCError)>,
/// Errors ocurring on outbound and inbound connections queued for reporting back.
pending_errors: Vec<HandlerErr>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[RPCEvent<TSpec>; 4]>,
events_out: SmallVec<[RPCReceived<TSpec>; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(RequestId, RPCRequest<TSpec>); 4]>,
@@ -62,7 +84,7 @@ where
/// Current inbound substreams awaiting processing.
inbound_substreams: FnvHashMap<
InboundRequestId,
SubstreamId,
(
InboundSubstreamState<TSpec>,
Option<delay_queue::Key>,
@@ -71,29 +93,22 @@ where
>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
inbound_substreams_delay: DelayQueue<InboundRequestId>,
inbound_substreams_delay: DelayQueue<SubstreamId>,
/// Map of outbound substreams that need to be driven to completion. The `RequestId` is
/// maintained by the application sending the request.
/// For Responses with multiple expected response chunks a counter is added to be able to terminate the stream when the expected number has been received
outbound_substreams: FnvHashMap<
OutboundRequestId,
(
OutboundSubstreamState<TSpec>,
delay_queue::Key,
Protocol,
Option<u64>,
),
>,
/// Map of outbound substreams that need to be driven to completion.
outbound_substreams: FnvHashMap<SubstreamId, OutboundInfo<TSpec>>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
outbound_substreams_delay: DelayQueue<OutboundRequestId>,
outbound_substreams_delay: DelayQueue<SubstreamId>,
/// Map of outbound items that are queued as the stream processes them.
queued_outbound_items: FnvHashMap<RequestId, Vec<RPCCodedResponse<TSpec>>>,
queued_outbound_items: FnvHashMap<SubstreamId, Vec<RPCCodedResponse<TSpec>>>,
/// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID.
current_inbound_substream_id: RequestId,
current_inbound_substream_id: SubstreamId,
/// Sequential ID for outbound substreams.
current_outbound_substream_id: SubstreamId,
/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
max_dial_negotiated: u32,
@@ -112,6 +127,23 @@ where
log: slog::Logger,
}
/// Contains the information the handler keeps on established outbound substreams.
struct OutboundInfo<TSpec: EthSpec> {
/// State of the substream.
state: OutboundSubstreamState<TSpec>,
/// Key to keep track of the substream's timeout via `self.outbound_substreams_delay`.
delay_key: delay_queue::Key,
/// Info over the protocol this substream is handling.
proto: Protocol,
/// Number of chunks to be seen from the peer's response.
// TODO: removing the option could allow clossing the streams after the number of
// expected responses is met for all protocols.
// TODO: the type of this is wrong
remaining_chunks: Option<usize>,
/// RequestId as given by the application that sent the request.
req_id: RequestId,
}
pub enum InboundSubstreamState<TSpec>
where
TSpec: EthSpec,
@@ -208,7 +240,7 @@ where
}
InboundSubstreamState::ResponseIdle(substream) => {
*self = InboundSubstreamState::ResponsePendingSend {
substream: substream,
substream,
message: error,
closing: true,
};
@@ -235,7 +267,7 @@ where
) -> Self {
RPCHandler {
listen_protocol,
pending_error: Vec::new(),
pending_errors: Vec::new(),
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
dial_negotiated: 0,
@@ -244,7 +276,8 @@ where
outbound_substreams: FnvHashMap::default(),
inbound_substreams_delay: DelayQueue::new(),
outbound_substreams_delay: DelayQueue::new(),
current_inbound_substream_id: 1,
current_inbound_substream_id: SubstreamId(0),
current_outbound_substream_id: SubstreamId(0),
max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes,
inactive_timeout,
@@ -300,8 +333,8 @@ impl<TSpec> ProtocolsHandler for RPCHandler<TSpec>
where
TSpec: EthSpec,
{
type InEvent = RPCEvent<TSpec>;
type OutEvent = RPCEvent<TSpec>;
type InEvent = RPCSend<TSpec>;
type OutEvent = Result<RPCReceived<TSpec>, HandlerErr>;
type Error = RPCError;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>;
@@ -316,9 +349,11 @@ where
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
) {
let (req, substream) = substream;
// drop the stream and return a 0 id for goodbye "requests"
if let r @ RPCRequest::Goodbye(_) = req {
self.events_out.push(RPCEvent::Request(0, r));
// drop the stream
if let RPCRequest::Goodbye(_) = req {
self.events_out
.push(RPCReceived::Request(self.current_inbound_substream_id, req));
self.current_inbound_substream_id.0 += 1;
return;
}
@@ -334,8 +369,8 @@ where
);
self.events_out
.push(RPCEvent::Request(self.current_inbound_substream_id, req));
self.current_inbound_substream_id += 1;
.push(RPCReceived::Request(self.current_inbound_substream_id, req));
self.current_inbound_substream_id.0 += 1;
}
fn inject_fully_negotiated_outbound(
@@ -346,43 +381,42 @@ where
self.dial_negotiated -= 1;
// add the stream to substreams if we expect a response, otherwise drop the stream.
let (mut id, request) = request_info;
if request.expect_response() {
// outbound requests can be sent from various aspects of lighthouse which don't
// track request ids. In the future these will be flagged as None, currently they
// are flagged as 0. These can overlap. In this case, we pick the highest request
// Id available
if id == 0 && self.outbound_substreams.get(&id).is_some() {
// have duplicate outbound request with no id. Pick one that will not collide
let mut new_id = std::usize::MAX;
while self.outbound_substreams.get(&new_id).is_some() {
// panic all outbound substreams are full
new_id -= 1;
}
trace!(self.log, "New outbound stream id created"; "id" => new_id);
id = RequestId::from(new_id);
}
let (id, request) = request_info;
let expected_responses = request.expected_responses();
if expected_responses > 0 {
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
.insert(id, Duration::from_secs(RESPONSE_TIMEOUT));
let protocol = request.protocol();
let response_chunk_count = match request {
RPCRequest::BlocksByRange(ref req) => Some(req.count),
RPCRequest::BlocksByRoot(ref req) => Some(req.block_roots.len() as u64),
_ => None, // Other requests do not have a known response chunk length,
};
let delay_key = self.outbound_substreams_delay.insert(
self.current_outbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let proto = request.protocol();
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: out,
request: request,
request,
};
if let Some(_) = self.outbound_substreams.insert(
id,
(awaiting_stream, delay_key, protocol, response_chunk_count),
) {
crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
let expected_responses = if expected_responses > 1 {
// Currently enforced only for multiple responses
Some(expected_responses)
} else {
None
};
if self
.outbound_substreams
.insert(
self.current_outbound_substream_id,
OutboundInfo {
state: awaiting_stream,
delay_key,
proto,
remaining_chunks: expected_responses,
req_id: id,
},
)
.is_some()
{
crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", self.current_outbound_substream_id));
}
self.current_outbound_substream_id.0 += 1;
}
self.update_keep_alive();
@@ -392,113 +426,124 @@ where
// wrong state a response will fail silently.
fn inject_event(&mut self, rpc_event: Self::InEvent) {
match rpc_event {
RPCEvent::Request(id, req) => self.send_request(id, req),
RPCEvent::Response(rpc_id, response) => {
RPCSend::Request(id, req) => self.send_request(id, req),
RPCSend::Response(inbound_id, response) => {
// Variables indicating if the response is an error response or a multi-part
// response
let res_is_error = response.is_error();
let res_is_multiple = response.multiple_responses();
// check if the stream matching the response still exists
match self.inbound_substreams.get_mut(&rpc_id) {
Some((substream_state, _, protocol)) => {
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponseIdle(substream) => {
// close the stream if there is no response
match response {
RPCCodedResponse::StreamTermination(_) => {
//trace!(self.log, "Stream termination sent. Ending the stream");
*substream_state =
InboundSubstreamState::Closing(substream);
}
_ => {
if let Some(error_code) = response.error_code() {
self.pending_error.push((
rpc_id,
*protocol,
RPCError::ErrorResponse(error_code),
));
}
// send the response
// if it's a single rpc request or an error, close the stream after
*substream_state =
InboundSubstreamState::ResponsePendingSend {
substream: substream,
message: response,
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
};
}
}
}
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} if res_is_multiple => {
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(rpc_id)
.or_insert_with(Vec::new)
.push(response);
let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id)
{
Some((substream_state, _, protocol)) => (substream_state, protocol),
None => {
warn!(self.log, "Stream has expired. Response not sent";
"response" => response.to_string(), "id" => inbound_id);
return;
}
};
// return the state
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
InboundSubstreamState::ResponsePendingFlush { substream, closing }
if res_is_multiple =>
{
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(rpc_id)
.or_insert_with(Vec::new)
.push(response);
// If the response we are sending is an error, report back for handling
match response {
RPCCodedResponse::InvalidRequest(ref reason)
| RPCCodedResponse::ServerError(ref reason)
| RPCCodedResponse::Unknown(ref reason) => {
let code = &response
.error_code()
.expect("Error response should map to an error code");
let err = HandlerErr::Inbound {
id: inbound_id,
proto: *protocol,
error: RPCError::ErrorResponse(*code, reason.clone()),
};
self.pending_errors.push(err);
}
_ => {} // not an error, continue.
}
// return the state
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing,
};
}
InboundSubstreamState::Closing(substream) => {
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponseIdle(substream) => {
// close the stream if there is no response
match response {
RPCCodedResponse::StreamTermination(_) => {
*substream_state = InboundSubstreamState::Closing(substream);
debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response));
}
InboundSubstreamState::ResponsePendingSend {
substream,
message,
..
} => {
_ => {
// send the response
// if it's a single rpc request or an error, close the stream after
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing: true,
message: response,
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
};
error!(self.log, "Attempted sending multiple responses to a single response request");
}
InboundSubstreamState::ResponsePendingFlush { substream, .. } => {
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing: true,
};
error!(self.log, "Attempted sending multiple responses to a single response request");
}
InboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned inbound substream");
unreachable!("Coding error: Poisoned substream");
}
}
}
None => {
warn!(self.log, "Stream has expired. Response not sent"; "response" => response.to_string(), "id" => rpc_id);
InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
} if res_is_multiple => {
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(inbound_id)
.or_insert_with(Vec::new)
.push(response);
// return the state
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing,
};
}
};
InboundSubstreamState::ResponsePendingFlush { substream, closing }
if res_is_multiple =>
{
// the stream is in use, add the request to a pending queue
self.queued_outbound_items
.entry(inbound_id)
.or_insert_with(Vec::new)
.push(response);
// return the state
*substream_state =
InboundSubstreamState::ResponsePendingFlush { substream, closing };
}
InboundSubstreamState::Closing(substream) => {
*substream_state = InboundSubstreamState::Closing(substream);
debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response));
}
InboundSubstreamState::ResponsePendingSend {
substream, message, ..
} => {
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream,
message,
closing: true,
};
error!(
self.log,
"Attempted sending multiple responses to a single response request"
);
}
InboundSubstreamState::ResponsePendingFlush { substream, .. } => {
*substream_state = InboundSubstreamState::ResponsePendingFlush {
substream,
closing: true,
};
error!(
self.log,
"Attempted sending multiple responses to a single response request"
);
}
InboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned inbound substream");
unreachable!("Coding error: Poisoned substream");
}
}
}
// We do not send errors as responses
RPCEvent::Error(..) => {}
}
}
@@ -520,7 +565,7 @@ where
self.outbound_io_error_retries = 0;
// map the error
let rpc_error = match error {
let error = match error {
ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
@@ -541,7 +586,11 @@ where
}
},
};
self.pending_error.push((id, req.protocol(), rpc_error));
self.pending_errors.push(HandlerErr::Outbound {
id,
proto: req.protocol(),
error,
});
}
fn connection_keep_alive(&self) -> KeepAlive {
@@ -559,16 +608,15 @@ where
Self::Error,
>,
> {
if !self.pending_error.is_empty() {
let (id, protocol, err) = self.pending_error.remove(0);
return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
id, protocol, err,
)));
// report failures
if !self.pending_errors.is_empty() {
let err_info = self.pending_errors.remove(0);
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(err_info)));
}
// return any events that need to be reported
if !self.events_out.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)));
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(self.events_out.remove(0))));
} else {
self.events_out.shrink_to_fit();
}
@@ -576,17 +624,23 @@ where
// purge expired inbound substreams and send an error
loop {
match self.inbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(stream_id))) => {
Poll::Ready(Some(Ok(inbound_id))) => {
// handle a stream timeout for various states
if let Some((substream_state, delay_key, _)) =
self.inbound_substreams.get_mut(stream_id.get_ref())
if let Some((substream_state, delay_key, protocol)) =
self.inbound_substreams.get_mut(inbound_id.get_ref())
{
// the delay has been removed
*delay_key = None;
self.pending_errors.push(HandlerErr::Inbound {
id: *inbound_id.get_ref(),
proto: *protocol,
error: RPCError::StreamTimeout,
});
let outbound_queue = self
.queued_outbound_items
.entry(stream_id.into_inner())
.entry(inbound_id.into_inner())
.or_insert_with(Vec::new);
substream_state.close(outbound_queue);
}
@@ -605,20 +659,21 @@ where
// purge expired outbound substreams
loop {
match self.outbound_substreams_delay.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(stream_id))) => {
if let Some((_id, _stream, protocol, _)) =
self.outbound_substreams.remove(stream_id.get_ref())
Poll::Ready(Some(Ok(outbound_id))) => {
if let Some(OutboundInfo { proto, req_id, .. }) =
self.outbound_substreams.remove(outbound_id.get_ref())
{
self.update_keep_alive();
let outbound_err = HandlerErr::Outbound {
id: req_id,
proto,
error: RPCError::StreamTimeout,
};
// notify the user
return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error(
*stream_id.get_ref(),
protocol,
RPCError::StreamTimeout,
)));
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref());
crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref());
}
}
Poll::Ready(Some(Err(e))) => {
@@ -797,155 +852,161 @@ where
}
// drive outbound streams that need to be processed
for request_id in self.outbound_substreams.keys().copied().collect::<Vec<_>>() {
match self.outbound_substreams.entry(request_id) {
for outbound_id in self.outbound_substreams.keys().copied().collect::<Vec<_>>() {
// get the state and mark it as poisoned
let (mut entry, state) = match self.outbound_substreams.entry(outbound_id) {
Entry::Occupied(mut entry) => {
match std::mem::replace(
&mut entry.get_mut().0,
let state = std::mem::replace(
&mut entry.get_mut().state,
OutboundSubstreamState::Poisoned,
) {
OutboundSubstreamState::RequestPendingResponse {
mut substream,
request,
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if request.multiple_responses() && !response.is_error() {
let substream_entry = entry.get_mut();
let delay_key = &substream_entry.1;
// chunks left after this one
let remaining_chunks = substream_entry
.3
.map(|count| count.saturating_sub(1))
.unwrap_or_else(|| 0);
if remaining_chunks == 0 {
// this is the last expected message, close the stream as all expected chunks have been received
substream_entry.0 =
OutboundSubstreamState::Closing(substream);
} else {
// If the response chunk was expected update the remaining number of chunks expected and reset the Timeout
substream_entry.0 =
OutboundSubstreamState::RequestPendingResponse {
substream,
request,
};
substream_entry.3 = Some(remaining_chunks);
self.outbound_substreams_delay.reset(
delay_key,
Duration::from_secs(RESPONSE_TIMEOUT),
);
}
} else {
// either this is a single response request or we received an
// error
// only expect a single response, close the stream
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
);
(entry, state)
}
Entry::Vacant(_) => unreachable!(),
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(request_id, response),
));
match state {
OutboundSubstreamState::RequestPendingResponse {
mut substream,
request,
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if request.expected_responses() > 1 && !response.is_error() {
let substream_entry = entry.get_mut();
let delay_key = &substream_entry.delay_key;
// chunks left after this one
let remaining_chunks = substream_entry
.remaining_chunks
.map(|count| count.saturating_sub(1))
.unwrap_or_else(|| 0);
if remaining_chunks == 0 {
// this is the last expected message, close the stream as all expected chunks have been received
substream_entry.state = OutboundSubstreamState::Closing(substream);
} else {
// If the response chunk was expected update the remaining number of chunks expected and reset the Timeout
substream_entry.state =
OutboundSubstreamState::RequestPendingResponse {
substream,
request,
};
substream_entry.remaining_chunks = Some(remaining_chunks);
self.outbound_substreams_delay
.reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
}
Poll::Ready(None) => {
// stream closed
// if we expected multiple streams send a stream termination,
// else report the stream terminating only.
//trace!(self.log, "RPC Response - stream closed by remote");
// drop the stream
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
} else {
// either this is a single response request or we received an
// error only expect a single response, close the stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
}
self.update_keep_alive();
// notify the application error
if request.multiple_responses() {
// return an end of stream result
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCCodedResponse::StreamTermination(
request.stream_termination(),
),
),
));
} // else we return an error, stream should not have closed early.
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
request.protocol(),
RPCError::IncompleteStream,
),
));
}
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse {
substream,
request,
}
}
Poll::Ready(Some(Err(e))) => {
// drop the stream
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
let protocol = entry.get().2;
entry.remove_entry();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, protocol, e),
));
}
},
OutboundSubstreamState::Closing(mut substream) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(_) => {
// drop the stream and its corresponding timeout
let delay_key = &entry.get().1;
let protocol = entry.get().2;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
self.update_keep_alive();
// Check what type of response we got and report it accordingly
let id = entry.get().req_id;
let proto = entry.get().proto;
// report the stream termination to the user
//
// Streams can be terminated here if a responder tries to
// continue sending responses beyond what we would expect. Here
// we simply terminate the stream and report a stream
// termination to the application
match protocol {
Protocol::BlocksByRange => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCCodedResponse::StreamTermination(
ResponseTermination::BlocksByRange,
),
),
));
}
Protocol::BlocksByRoot => {
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCCodedResponse::StreamTermination(
ResponseTermination::BlocksByRoot,
),
),
));
}
_ => {} // all other protocols are do not have multiple responses and we do not inform the user, we simply drop the stream.
}
}
Poll::Pending => {
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
let received = match response {
RPCCodedResponse::StreamTermination(t) => {
Ok(RPCReceived::EndOfStream(id, t))
}
RPCCodedResponse::Success(resp) => Ok(RPCReceived::Response(id, resp)),
RPCCodedResponse::InvalidRequest(ref r)
| RPCCodedResponse::ServerError(ref r)
| RPCCodedResponse::Unknown(ref r) => {
let code = response.error_code().expect(
"Response indicating and error should map to an error code",
);
Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::ErrorResponse(code, r.clone()),
})
}
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(received));
}
Poll::Ready(None) => {
// stream closed
// if we expected multiple streams send a stream termination,
// else report the stream terminating only.
//trace!(self.log, "RPC Response - stream closed by remote");
// drop the stream
let delay_key = &entry.get().delay_key;
let request_id = *&entry.get().req_id;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
self.update_keep_alive();
// notify the application error
if request.expected_responses() > 1 {
// return an end of stream result
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
RPCReceived::EndOfStream(request_id, request.stream_termination()),
)));
}
// else we return an error, stream should not have closed early.
let outbound_err = HandlerErr::Outbound {
id: request_id,
proto: request.protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
}
Poll::Pending => {
entry.get_mut().state =
OutboundSubstreamState::RequestPendingResponse { substream, request }
}
Poll::Ready(Some(Err(e))) => {
// drop the stream
let delay_key = &entry.get().delay_key;
self.outbound_substreams_delay.remove(delay_key);
let outbound_err = HandlerErr::Outbound {
id: entry.get().req_id,
proto: entry.get().proto,
error: e,
};
entry.remove_entry();
self.update_keep_alive();
return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err)));
}
},
OutboundSubstreamState::Closing(mut substream) => {
match Sink::poll_close(Pin::new(&mut substream), cx) {
Poll::Ready(_) => {
// drop the stream and its corresponding timeout
let delay_key = &entry.get().delay_key;
let protocol = entry.get().proto;
let request_id = entry.get().req_id;
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
self.update_keep_alive();
// report the stream termination to the user
//
// Streams can be terminated here if a responder tries to
// continue sending responses beyond what we would expect. Here
// we simply terminate the stream and report a stream
// termination to the application
let termination = match protocol {
Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange),
Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot),
_ => None, // all other protocols are do not have multiple responses and we do not inform the user, we simply drop the stream.
};
if let Some(termination) = termination {
return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(
RPCReceived::EndOfStream(request_id, termination),
)));
}
}
OutboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned outbound substream");
unreachable!("Coding Error: Outbound substream is poisoned")
Poll::Pending => {
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
}
}
}
Entry::Vacant(_) => unreachable!(),
OutboundSubstreamState::Poisoned => {
crit!(self.log, "Poisoned outbound substream");
unreachable!("Coding Error: Outbound substream is poisoned")
}
}
}
@@ -980,7 +1041,7 @@ fn apply_queued_responses<TSpec: EthSpec>(
InboundSubstreamState::Closing(substream)
}
chunk => InboundSubstreamState::ResponsePendingSend {
substream: substream,
substream,
message: chunk,
closing: false,
},
@@ -992,3 +1053,14 @@ fn apply_queued_responses<TSpec: EthSpec>(
}
}
}
impl slog::Value for SubstreamId {
fn serialize(
&self,
record: &slog::Record,
key: slog::Key,
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
slog::Value::serialize(&self.0, record, key, serializer)
}
}