Correct stream timeout delay queue handling (#803)

* Correct stream timeout delay queue handling

* Correct small typo
This commit is contained in:
Age Manning
2020-01-15 15:50:01 +11:00
committed by GitHub
parent c184a98170
commit 13e74e5352
2 changed files with 103 additions and 97 deletions

View File

@@ -57,7 +57,7 @@ where
/// Current inbound substreams awaiting processing.
inbound_substreams:
FnvHashMap<InboundRequestId, (InboundSubstreamState<TSubstream>, delay_queue::Key)>,
FnvHashMap<InboundRequestId, (InboundSubstreamState<TSubstream>, Option<delay_queue::Key>)>,
/// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout.
inbound_substreams_delay: DelayQueue<InboundRequestId>,
@@ -134,6 +134,52 @@ pub enum OutboundSubstreamState<TSubstream> {
Poisoned,
}
impl<TSubstream> InboundSubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
/// Moves the substream state to closing and informs the connected peer. The
/// `queued_outbound_items` must be given as a parameter to add stream termination messages to
/// the outbound queue.
pub fn close(&mut self, outbound_queue: &mut Vec<RPCErrorResponse>) {
// When terminating a stream, report the stream termination to the requesting user via
// an RPC error
let error = RPCErrorResponse::ServerError(ErrorMessage {
error_message: "Request timed out".as_bytes().to_vec(),
});
// The stream termination type is irrelevant, this will terminate the
// stream
let stream_termination =
RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange);
match std::mem::replace(self, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
if !closing {
outbound_queue.push(error);
outbound_queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*self = InboundSubstreamState::ResponsePendingSend { substream, closing }
}
InboundSubstreamState::ResponseIdle(substream) => {
*self = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(error),
closing: true,
};
}
InboundSubstreamState::Closing(substream) => {
// let the stream close
*self = InboundSubstreamState::Closing(substream);
}
InboundSubstreamState::Poisoned => {
unreachable!("Coding error: Timeout poisoned substream")
}
};
}
}
impl<TSubstream> RPCHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
@@ -233,7 +279,7 @@ where
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
(awaiting_stream, delay_key),
(awaiting_stream, Some(delay_key)),
);
self.events_out
@@ -442,47 +488,15 @@ where
let rpc_id = stream_id.get_ref();
// handle a stream timeout for various states
if let Some((substream_state, _)) = self.inbound_substreams.get_mut(rpc_id) {
// When terminating a stream, report the stream termination to the requesting user via
// an RPC error
let error = RPCErrorResponse::ServerError(ErrorMessage {
error_message: "Request timed out".as_bytes().to_vec(),
});
// The stream termination type is irrelevant, this will terminate the
// stream
let stream_termination =
RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange);
if let Some((substream_state, delay_key)) = self.inbound_substreams.get_mut(rpc_id) {
// the delay has been removed
*delay_key = None;
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
if !closing {
// if the stream is not closing, add an error to the stream queue and exit
let queue = self
.queued_outbound_items
.entry(*rpc_id)
.or_insert_with(Vec::new);
queue.push(error);
queue.push(stream_termination);
}
// if the stream is closing after the send, allow it to finish
*substream_state =
InboundSubstreamState::ResponsePendingSend { substream, closing }
}
InboundSubstreamState::ResponseIdle(substream) => {
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(error),
closing: true,
};
}
InboundSubstreamState::Closing(substream) => {
// let the stream close
*substream_state = InboundSubstreamState::Closing(substream);
}
InboundSubstreamState::Poisoned => {
unreachable!("Coding error: Timeout poisoned substream")
}
};
let outbound_queue = self
.queued_outbound_items
.entry(*rpc_id)
.or_insert_with(Vec::new);
substream_state.close(outbound_queue);
}
}
@@ -546,8 +560,9 @@ where
};
}
Err(e) => {
let delay_key = &entry.get().1;
self.inbound_substreams_delay.remove(delay_key);
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(0, e),
@@ -566,9 +581,10 @@ where
match substream.close() {
Ok(Async::Ready(())) | Err(_) => {
//trace!(self.log, "Inbound stream dropped");
let delay_key = &entry.get().1;
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
self.queued_outbound_items.remove(&request_id);
self.inbound_substreams_delay.remove(delay_key);
entry.remove();
} // drop the stream
Ok(Async::NotReady) => {