Improved RPC timeout handling (#770)

* Handle slow app-level responses for RPC

* Correct errors

* Terminate stream on error
This commit is contained in:
Age Manning
2020-01-08 14:18:06 +11:00
committed by GitHub
parent f36a5a15d6
commit 30f51df4cf
3 changed files with 133 additions and 71 deletions

View File

@@ -1,7 +1,7 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::cognitive_complexity)]
use super::methods::{RPCErrorResponse, RequestId};
use super::methods::{ErrorMessage, RPCErrorResponse, RequestId, ResponseTermination};
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
use super::RPCEvent;
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
@@ -28,7 +28,8 @@ pub const RESPONSE_TIMEOUT: u64 = 10;
/// The number of times to retry an outbound upgrade in the case of IO errors.
const IO_ERROR_RETRIES: u8 = 3;
/// Inbound requests are given a sequential `RequestId` to keep track of.
/// Inbound requests are given a sequential `RequestId` to keep track of. All inbound streams are
/// identified by their substream ID which is identical to the RPC Id.
type InboundRequestId = RequestId;
/// Outbound requests are associated with an id that is given by the application that sent the
/// request.
@@ -72,8 +73,8 @@ where
/// Map of outbound items that are queued as the stream processes them.
queued_outbound_items: FnvHashMap<RequestId, Vec<RPCErrorResponse>>,
/// Sequential Id for waiting substreams.
current_substream_id: RequestId,
/// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID.
current_inbound_substream_id: RequestId,
/// Maximum number of concurrent outbound substreams being opened. Value is never modified.
max_dial_negotiated: u32,
@@ -153,7 +154,7 @@ where
outbound_substreams: FnvHashMap::default(),
inbound_substreams_delay: DelayQueue::new(),
outbound_substreams_delay: DelayQueue::new(),
current_substream_id: 1,
current_inbound_substream_id: 1,
max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes,
inactive_timeout,
@@ -226,16 +227,18 @@ where
// New inbound request. Store the stream and tag the output.
let delay_key = self.inbound_substreams_delay.insert(
self.current_substream_id,
self.current_inbound_substream_id,
Duration::from_secs(RESPONSE_TIMEOUT),
);
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
self.inbound_substreams
.insert(self.current_substream_id, (awaiting_stream, delay_key));
self.inbound_substreams.insert(
self.current_inbound_substream_id,
(awaiting_stream, delay_key),
);
self.events_out
.push(RPCEvent::Request(self.current_substream_id, req));
self.current_substream_id += 1;
.push(RPCEvent::Request(self.current_inbound_substream_id, req));
self.current_inbound_substream_id += 1;
}
fn inject_fully_negotiated_outbound(
@@ -306,11 +309,10 @@ where
if res_is_multiple =>
{
// the stream is in use, add the request to a pending queue
(*self
.queued_outbound_items
self.queued_outbound_items
.entry(rpc_id)
.or_insert_with(Vec::new))
.push(response);
.or_insert_with(Vec::new)
.push(response);
// return the state
*substream_state = InboundSubstreamState::ResponsePendingSend {
@@ -431,13 +433,57 @@ where
self.events_out.shrink_to_fit();
}
// purge expired inbound substreams
// purge expired inbound substreams and send an error
while let Async::Ready(Some(stream_id)) = self
.inbound_substreams_delay
.poll()
.map_err(|_| ProtocolsHandlerUpgrErr::Timer)?
{
self.inbound_substreams.remove(stream_id.get_ref());
let rpc_id = stream_id.get_ref();
// handle a stream timeout for various states
if let Some((substream_state, _)) = self.inbound_substreams.get_mut(rpc_id) {
// When terminating a stream, report the stream termination to the requesting user via
// an RPC error
let error = RPCErrorResponse::ServerError(ErrorMessage {
error_message: "Request timed out".as_bytes().to_vec(),
});
// The stream termination type is irrelevant, this will terminate the
// stream
let stream_termination =
RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange);
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
if !closing {
// if the stream is not closing, add an error to the stream queue and exit
let queue = self
.queued_outbound_items
.entry(*rpc_id)
.or_insert_with(Vec::new);
queue.push(error);
queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*substream_state =
InboundSubstreamState::ResponsePendingSend { substream, closing }
}
InboundSubstreamState::ResponseIdle(substream) => {
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(error),
closing: true,
};
}
InboundSubstreamState::Closing(substream) => {
// let the stream close
*substream_state = InboundSubstreamState::Closing(substream);
}
InboundSubstreamState::Poisoned => {
unreachable!("Coding error: Timeout poisoned substream")
}
};
}
}
// purge expired outbound substreams
@@ -555,7 +601,7 @@ where
request,
} => match substream.poll() {
Ok(Async::Ready(Some(response))) => {
if request.multiple_responses() {
if request.multiple_responses() && !response.is_error() {
entry.get_mut().0 =
OutboundSubstreamState::RequestPendingResponse {
substream,
@@ -565,10 +611,13 @@ where
self.outbound_substreams_delay
.reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT));
} else {
// either this is a single response request or we received an
// error
trace!(self.log, "Closing single stream request");
// only expect a single response, close the stream
entry.get_mut().0 = OutboundSubstreamState::Closing(substream);
}
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(request_id, response),
)));

View File

@@ -564,6 +564,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
batch.retries += 1;
// TODO: Handle partially downloaded batches. Update this when building a new batch
// processor thread.
if batch.retries > MAX_BATCH_RETRIES {
// chain is unrecoverable, remove it
ProcessingResult::RemoveChain