Better handling of RPC errors and RPC conn with the PeerManager (#1047)

This commit is contained in:
divma
2020-05-03 08:17:12 -05:00
committed by GitHub
parent b6c027b9ec
commit b4a1a2e483
16 changed files with 656 additions and 463 deletions

View File

@@ -1,14 +1,16 @@
#![allow(clippy::type_complexity)]
#![allow(clippy::cognitive_complexity)]
use super::methods::{ErrorMessage, RPCErrorResponse, RequestId, ResponseTermination};
use super::protocol::{RPCError, RPCProtocol, RPCRequest};
use super::methods::{ErrorMessage, RPCCodedResponse, RequestId, ResponseTermination};
use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest};
use super::RPCEvent;
use crate::rpc::protocol::{InboundFramed, OutboundFramed};
use core::marker::PhantomData;
use fnv::FnvHashMap;
use futures::prelude::*;
use libp2p::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use libp2p::core::upgrade::{
InboundUpgrade, NegotiationError, OutboundUpgrade, ProtocolError, UpgradeError,
};
use libp2p::swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
@@ -46,13 +48,13 @@ where
listen_protocol: SubstreamProtocol<RPCProtocol<TSpec>>,
/// Errors queued for reporting to the application as `RPCEvent::Error` events from `poll()`.
pending_error: Vec<(RequestId, ProtocolsHandlerUpgrErr<RPCError>)>,
pending_error: Vec<(RequestId, Protocol, RPCError)>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[RPCEvent<TSpec>; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[RPCEvent<TSpec>; 4]>,
dial_queue: SmallVec<[(RequestId, RPCRequest<TSpec>); 4]>,
/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
@@ -63,6 +65,7 @@ where
(
InboundSubstreamState<TSubstream, TSpec>,
Option<delay_queue::Key>,
Protocol,
),
>,
@@ -73,14 +76,18 @@ where
/// maintained by the application sending the request.
outbound_substreams: FnvHashMap<
OutboundRequestId,
(OutboundSubstreamState<TSubstream, TSpec>, delay_queue::Key),
(
OutboundSubstreamState<TSubstream, TSpec>,
delay_queue::Key,
Protocol,
),
>,
/// Outbound substream `DelayQueue` which keeps track of when an outbound substream will time out.
outbound_substreams_delay: DelayQueue<OutboundRequestId>,
/// Map of outbound items that are queued as the stream processes them.
queued_outbound_items: FnvHashMap<RequestId, Vec<RPCErrorResponse<TSpec>>>,
queued_outbound_items: FnvHashMap<RequestId, Vec<RPCCodedResponse<TSpec>>>,
/// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID.
current_inbound_substream_id: RequestId,
@@ -152,17 +159,17 @@ where
/// Moves the substream state to closing and informs the connected peer. The
/// `queued_outbound_items` must be given as a parameter to add stream termination messages to
/// the outbound queue.
pub fn close(&mut self, outbound_queue: &mut Vec<RPCErrorResponse<TSpec>>) {
pub fn close(&mut self, outbound_queue: &mut Vec<RPCCodedResponse<TSpec>>) {
// When terminating a stream, report the stream termination to the requesting user via
// an RPC error
let error = RPCErrorResponse::ServerError(ErrorMessage {
error_message: "Request timed out".as_bytes().to_vec(),
let error = RPCCodedResponse::ServerError(ErrorMessage {
error_message: b"Request timed out".to_vec(),
});
// The stream termination type is irrelevant, this will terminate the
// stream
let stream_termination =
RPCErrorResponse::StreamTermination(ResponseTermination::BlocksByRange);
RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange);
match std::mem::replace(self, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponsePendingSend { substream, closing } => {
@@ -244,10 +251,10 @@ where
}
/// Opens an outbound substream with a request.
pub fn send_request(&mut self, rpc_event: RPCEvent<TSpec>) {
pub fn send_request(&mut self, id: RequestId, req: RPCRequest<TSpec>) {
self.keep_alive = KeepAlive::Yes;
self.dial_queue.push(rpc_event);
self.dial_queue.push((id, req));
}
}
@@ -262,7 +269,7 @@ where
type Substream = TSubstream;
type InboundProtocol = RPCProtocol<TSpec>;
type OutboundProtocol = RPCRequest<TSpec>;
type OutboundOpenInfo = RPCEvent<TSpec>; // Keep track of the id and the request
type OutboundOpenInfo = (RequestId, RPCRequest<TSpec>); // Keep track of the id and the request
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.listen_protocol.clone()
@@ -292,7 +299,7 @@ where
let awaiting_stream = InboundSubstreamState::ResponseIdle(substream);
self.inbound_substreams.insert(
self.current_inbound_substream_id,
(awaiting_stream, Some(delay_key)),
(awaiting_stream, Some(delay_key), req.protocol()),
);
self.events_out
@@ -303,7 +310,7 @@ where
fn inject_fully_negotiated_outbound(
&mut self,
out: <RPCRequest<TSpec> as OutboundUpgrade<TSubstream>>::Output,
rpc_event: Self::OutboundOpenInfo,
request_info: Self::OutboundOpenInfo,
) {
self.dial_negotiated -= 1;
@@ -317,70 +324,80 @@ where
}
// add the stream to substreams if we expect a response, otherwise drop the stream.
match rpc_event {
RPCEvent::Request(mut id, request) if request.expect_response() => {
// outbound requests can be sent from various aspects of lighthouse which don't
// track request ids. In the future these will be flagged as None, currently they
// are flagged as 0. These can overlap. In this case, we pick the highest request
// Id available
if id == 0 && self.outbound_substreams.get(&id).is_some() {
// have duplicate outbound request with no id. Pick one that will not collide
let mut new_id = std::usize::MAX;
while self.outbound_substreams.get(&new_id).is_some() {
// search downwards for a free id; the decrement underflows (panicking in debug builds) if every id is already in use
new_id -= 1;
}
trace!(self.log, "New outbound stream id created"; "id" => new_id);
id = RequestId::from(new_id);
}
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
.insert(id, Duration::from_secs(RESPONSE_TIMEOUT));
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: out,
request,
};
if let Some(_) = self
.outbound_substreams
.insert(id, (awaiting_stream, delay_key))
{
crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
let (mut id, request) = request_info;
if request.expect_response() {
// outbound requests can be sent from various aspects of lighthouse which don't
// track request ids. In the future these will be flagged as None, currently they
// are flagged as 0. These can overlap. In this case, we pick the highest request
// Id available
if id == 0 && self.outbound_substreams.get(&id).is_some() {
// have duplicate outbound request with no id. Pick one that will not collide
let mut new_id = std::usize::MAX;
while self.outbound_substreams.get(&new_id).is_some() {
// search downwards for a free id; the decrement underflows (panicking in debug builds) if every id is already in use
new_id -= 1;
}
trace!(self.log, "New outbound stream id created"; "id" => new_id);
id = RequestId::from(new_id);
}
_ => { // a response is not expected, drop the stream for all other requests
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
.insert(id, Duration::from_secs(RESPONSE_TIMEOUT));
let protocol = request.protocol();
let awaiting_stream = OutboundSubstreamState::RequestPendingResponse {
substream: out,
request,
};
if let Some(_) = self
.outbound_substreams
.insert(id, (awaiting_stream, delay_key, protocol))
{
crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id));
}
}
}
// Note: If the substream has closed due to inactivity, or the substream is in the
// NOTE: If the substream has closed due to inactivity, or the substream is in the
// wrong state a response will fail silently.
fn inject_event(&mut self, rpc_event: Self::InEvent) {
match rpc_event {
RPCEvent::Request(_, _) => self.send_request(rpc_event),
RPCEvent::Request(id, req) => self.send_request(id, req),
RPCEvent::Response(rpc_id, response) => {
// check if the stream matching the response still exists
// variables indicating if the response is an error response or a multi-part
// Variables indicating if the response is an error response or a multi-part
// response
let res_is_error = response.is_error();
let res_is_multiple = response.multiple_responses();
// check if the stream matching the response still exists
match self.inbound_substreams.get_mut(&rpc_id) {
Some((substream_state, _)) => {
Some((substream_state, _, protocol)) => {
match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) {
InboundSubstreamState::ResponseIdle(substream) => {
// close the stream if there is no response
if let RPCErrorResponse::StreamTermination(_) = response {
//trace!(self.log, "Stream termination sent. Ending the stream");
*substream_state = InboundSubstreamState::Closing(substream);
} else {
// send the response
// if it's a single rpc request or an error, close the stream after
*substream_state = InboundSubstreamState::ResponsePendingSend {
substream: substream.send(response),
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
};
match response {
RPCCodedResponse::StreamTermination(_) => {
//trace!(self.log, "Stream termination sent. Ending the stream");
*substream_state =
InboundSubstreamState::Closing(substream);
}
_ => {
if let Some(error_code) = response.error_code() {
self.pending_error.push((
rpc_id,
*protocol,
RPCError::ErrorResponse(error_code),
));
}
// send the response
// if it's a single rpc request or an error, close the stream after
*substream_state =
InboundSubstreamState::ResponsePendingSend {
substream: substream.send(response),
closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses
};
}
}
}
InboundSubstreamState::ResponsePendingSend { substream, closing }
@@ -416,39 +433,55 @@ where
}
}
None => {
warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}",response));
warn!(self.log, "Stream has expired. Response not sent"; "response" => format!("{}", response));
}
};
}
// We do not send errors as responses
RPCEvent::Error(_, _) => {}
RPCEvent::Error(..) => {}
}
}
fn inject_dial_upgrade_error(
&mut self,
request: Self::OutboundOpenInfo,
request_info: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
>,
) {
let (id, req) = request_info;
if let ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(RPCError::IoError(_))) = error {
self.outbound_io_error_retries += 1;
if self.outbound_io_error_retries < IO_ERROR_RETRIES {
self.send_request(request);
self.send_request(id, req);
return;
}
}
self.outbound_io_error_retries = 0;
// add the error
let request_id = {
if let RPCEvent::Request(id, _) = request {
id
} else {
0
// map the error
let rpc_error = match error {
ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"),
ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout,
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e,
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
RPCError::UnsupportedProtocol
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(io_err) => RPCError::IoError(io_err),
ProtocolError::InvalidProtocol => {
RPCError::InternalError("Protocol was deemed invalid")
}
ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => {
// Peer is sending invalid data during the negotiation phase, not
// participating in the protocol
RPCError::InvalidData
}
},
};
self.pending_error.push((request_id, error));
self.pending_error.push((id, req.protocol(), rpc_error));
}
fn connection_keep_alive(&self) -> KeepAlive {
@@ -461,46 +494,11 @@ where
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>,
Self::Error,
> {
if let Some((request_id, err)) = self.pending_error.pop() {
// Returning an error here will result in dropping the peer.
match err {
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(
RPCError::InvalidProtocol(protocol_string),
)) => {
// Peer does not support the protocol.
// TODO: We currently will not drop the peer, for maximal compatibility with
// other clients testing their software. In the future, we will need to decide
// which protocols are a bare minimum to support before kicking the peer.
error!(self.log, "Peer doesn't support the RPC protocol"; "protocol" => protocol_string);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, RPCError::InvalidProtocol(protocol_string)),
)));
}
ProtocolsHandlerUpgrErr::Timeout | ProtocolsHandlerUpgrErr::Timer => {
// negotiation timeout, mark the request as failed
debug!(self.log, "Active substreams before timeout"; "len" => self.outbound_substreams.len());
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
RPCError::Custom("Protocol negotiation timeout".into()),
),
)));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)) => {
// IO/Decode/Custom Error, report to the application
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, err),
)));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(err)) => {
// Error during negotiation
debug!(self.log, "Upgrade Error"; "error" => format!("{}",err));
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, RPCError::Custom(format!("{}", err))),
)));
}
}
if !self.pending_error.is_empty() {
let (id, protocol, err) = self.pending_error.remove(0);
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(id, protocol, err),
)));
}
// return any events that need to be reported
@@ -522,7 +520,7 @@ where
let rpc_id = stream_id.get_ref();
// handle a stream timeout for various states
if let Some((substream_state, delay_key)) = self.inbound_substreams.get_mut(rpc_id) {
if let Some((substream_state, delay_key, _)) = self.inbound_substreams.get_mut(rpc_id) {
// the delay has been removed
*delay_key = None;
@@ -541,14 +539,16 @@ where
ProtocolsHandlerUpgrErr::Timer
})?
{
self.outbound_substreams.remove(stream_id.get_ref());
// notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
*stream_id.get_ref(),
RPCError::Custom("Stream timed out".into()),
),
)));
if let Some((_id, _stream, protocol)) =
self.outbound_substreams.remove(stream_id.get_ref())
{
// notify the user
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(*stream_id.get_ref(), protocol, RPCError::StreamTimeout),
)));
} else {
crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref());
}
}
// drive inbound streams that need to be processed
@@ -598,9 +598,10 @@ where
if let Some(delay_key) = &entry.get().1 {
self.inbound_substreams_delay.remove(delay_key);
}
let protocol = entry.get().2;
entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(0, e),
RPCEvent::Error(0, protocol, e),
)));
}
};
@@ -696,7 +697,7 @@ where
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Response(
request_id,
RPCErrorResponse::StreamTermination(
RPCCodedResponse::StreamTermination(
request.stream_termination(),
),
),
@@ -705,9 +706,8 @@ where
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(
request_id,
RPCError::Custom(
"Stream closed early. Empty response".into(),
),
request.protocol(),
RPCError::IncompleteStream,
),
)));
}
@@ -721,9 +721,10 @@ where
// drop the stream
let delay_key = &entry.get().1;
self.outbound_substreams_delay.remove(delay_key);
let protocol = entry.get().2;
entry.remove_entry();
return Ok(Async::Ready(ProtocolsHandlerEvent::Custom(
RPCEvent::Error(request_id, e),
RPCEvent::Error(request_id, protocol, e),
)));
}
},
@@ -759,16 +760,14 @@ where
// establish outbound substreams
if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1;
let rpc_event = self.dial_queue.remove(0);
let (id, req) = self.dial_queue.remove(0);
self.dial_queue.shrink_to_fit();
if let RPCEvent::Request(id, req) = rpc_event {
return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: RPCEvent::Request(id, req),
},
));
}
return Ok(Async::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(req.clone()),
info: (id, req),
},
));
}
Ok(Async::NotReady)
}
@@ -777,7 +776,7 @@ where
// Check for new items to send to the peer and update the underlying stream
fn apply_queued_responses<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>(
raw_substream: InboundFramed<TSubstream, TSpec>,
queued_outbound_items: &mut Option<&mut Vec<RPCErrorResponse<TSpec>>>,
queued_outbound_items: &mut Option<&mut Vec<RPCCodedResponse<TSpec>>>,
new_items_to_send: &mut bool,
) -> InboundSubstreamState<TSubstream, TSpec> {
match queued_outbound_items {
@@ -785,7 +784,7 @@ fn apply_queued_responses<TSubstream: AsyncRead + AsyncWrite, TSpec: EthSpec>(
*new_items_to_send = true;
// we have queued items
match queue.remove(0) {
RPCErrorResponse::StreamTermination(_) => {
RPCCodedResponse::StreamTermination(_) => {
// close the stream if this is a stream termination
InboundSubstreamState::Closing(raw_substream)
}