From ba10c80633500590dc4b3e6fb7d92cc1ad957de7 Mon Sep 17 00:00:00 2001 From: divma Date: Thu, 23 Jul 2020 12:30:43 +0000 Subject: [PATCH] Refactor inbound substream logic with async (#1325) ## Issue Addressed #1112 The logic is slightly different but still valid with respect to error handling. - Inbound state is either Busy with a future that returns the substream (and info about the processing) - The state machine works as follows: - `Idle` with pending responses => `Busy` - `Busy` => finished ? if so and there are new pending responses then `Busy`, if not then `Idle` => not finished remains `Busy` - Add an `InboundInfo` for readability - Other stuff: - Close inbound substreams when all expected responses are sent - Remove the error variants from `RPCCodedResponse` and use the codes instead - Fix various spelling mistakes because I got sloppy last time Sorry for the delay Co-authored-by: Age Manning --- Cargo.lock | 8 +- beacon_node/eth2_libp2p/src/behaviour/mod.rs | 7 +- beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs | 4 +- .../eth2_libp2p/src/rpc/codec/ssz_snappy.rs | 4 +- beacon_node/eth2_libp2p/src/rpc/handler.rs | 665 ++++++------ beacon_node/eth2_libp2p/src/rpc/methods.rs | 58 +- beacon_node/eth2_libp2p/src/rpc/protocol.rs | 6 +- 7 files changed, 232 insertions(+), 520 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 12eb07ed6e..f450d3315e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -885,9 +885,9 @@ checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" [[package]] name = "cpuid-bool" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec6763c20301ab0dc67051d1b6f4cc9132ad9e6eddcb1f10c6c53ea6d6ae2183" +checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" [[package]] name = "crc32fast" @@ -5134,9 +5134,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.34" +version = "1.0.35" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "936cae2873c940d92e697597c5eee105fb570cd5689c695806f672883653349b" +checksum = "fb7f4c519df8c117855e19dd8cc851e89eb746fe7a73f0157e0d95fdec5369b0" dependencies = [ "proc-macro2", "quote", diff --git a/beacon_node/eth2_libp2p/src/behaviour/mod.rs b/beacon_node/eth2_libp2p/src/behaviour/mod.rs index d3f91ae4cf..e9bc77aa4d 100644 --- a/beacon_node/eth2_libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2_libp2p/src/behaviour/mod.rs @@ -256,11 +256,8 @@ impl Behaviour { error: RPCResponseErrorCode, reason: String, ) { - self.eth2_rpc.send_response( - peer_id, - id, - RPCCodedResponse::from_error_code(error, reason), - ) + self.eth2_rpc + .send_response(peer_id, id, RPCCodedResponse::Error(error, reason.into())) } /* Peer management functions */ diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs index 162541239b..2197048f1d 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz.rs @@ -56,9 +56,7 @@ impl Encoder> for SSZInboundCodec RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), }, - RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), - RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), - RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), + RPCCodedResponse::Error(_, err) => err.as_ssz_bytes(), RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } diff --git a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs index 276ff9d571..5e6163cadb 100644 --- a/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/eth2_libp2p/src/rpc/codec/ssz_snappy.rs @@ -64,9 +64,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => res.as_ssz_bytes(), 
}, - RPCCodedResponse::InvalidRequest(err) => err.as_ssz_bytes(), - RPCCodedResponse::ServerError(err) => err.as_ssz_bytes(), - RPCCodedResponse::Unknown(err) => err.as_ssz_bytes(), + RPCCodedResponse::Error(_, err) => err.as_ssz_bytes(), RPCCodedResponse::StreamTermination(_) => { unreachable!("Code error - attempting to encode a stream termination") } diff --git a/beacon_node/eth2_libp2p/src/rpc/handler.rs b/beacon_node/eth2_libp2p/src/rpc/handler.rs index 02641a373b..02e2cc4e52 100644 --- a/beacon_node/eth2_libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2_libp2p/src/rpc/handler.rs @@ -1,7 +1,7 @@ #![allow(clippy::type_complexity)] #![allow(clippy::cognitive_complexity)] -use super::methods::{RPCCodedResponse, RequestId, ResponseTermination}; +use super::methods::{RPCCodedResponse, RPCResponseErrorCode, RequestId, ResponseTermination}; use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; use super::{RPCReceived, RPCSend}; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; @@ -14,7 +14,7 @@ use libp2p::swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::swarm::NegotiatedSubstream; -use slog::{crit, debug, error, trace, warn}; +use slog::{crit, debug, warn}; use smallvec::SmallVec; use std::{ collections::hash_map::Entry, @@ -40,6 +40,16 @@ const SHUTDOWN_TIMEOUT_SECS: u8 = 15; #[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] pub struct SubstreamId(usize); +type InboundSubstream = InboundFramed; + +/// Output of the future handling the send of responses to a peer's request. +type InboundProcessingOutput = ( + InboundSubstream, /* substream */ + Vec, /* Errors sending messages if any */ + bool, /* whether to remove the stream afterwards */ + u64, /* Chunks remaining to be sent after this processing finishes */ +); + /// An error encountered by the handler. pub enum HandlerErr { /// An error occurred for this peer's request. 
This can occur during protocol negotiation, @@ -73,7 +83,7 @@ where /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol>, - /// Errors ocurring on outbound and inbound connections queued for reporting back. + /// Errors occurring on outbound and inbound connections queued for reporting back. pending_errors: Vec, /// Queue of events to produce in `poll()`. @@ -86,14 +96,7 @@ where dial_negotiated: u32, /// Current inbound substreams awaiting processing. - inbound_substreams: FnvHashMap< - SubstreamId, - ( - InboundSubstreamState, - Option, - Protocol, - ), - >, + inbound_substreams: FnvHashMap>, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. inbound_substreams_delay: DelayQueue, @@ -104,9 +107,6 @@ where /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. outbound_substreams_delay: DelayQueue, - /// Map of outbound items that are queued as the stream processes them. - queued_outbound_items: FnvHashMap>>, - /// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID. current_inbound_substream_id: SubstreamId, @@ -143,6 +143,20 @@ enum HandlerState { Deactivated, } +/// Contains the information the handler keeps on established inbound substreams. +struct InboundInfo { + /// State of the substream. + state: InboundState, + /// Responses queued for sending. + pending_items: Vec>, + /// Protocol of the original request we received from the peer. + protocol: Protocol, + /// Responses that the peer is still expecting from us. + remaining_chunks: u64, + /// Key to keep track of the substream's timeout via `self.inbound_substreams_delay`. + delay_key: Option, +} + /// Contains the information the handler keeps on established outbound substreams. struct OutboundInfo { /// State of the substream. @@ -154,45 +168,46 @@ struct OutboundInfo { /// Number of chunks to be seen from the peer's response. 
// TODO: removing the option could allow clossing the streams after the number of // expected responses is met for all protocols. - // TODO: the type of this is wrong - remaining_chunks: Option, - /// RequestId as given by the application that sent the request. + remaining_chunks: Option, + /// `RequestId` as given by the application that sent the request. req_id: RequestId, } -pub enum InboundSubstreamState -where - TSpec: EthSpec, -{ - /// A response has been sent, pending writing. - ResponsePendingSend { - /// The substream used to send the response - substream: Box>, - /// The message that is attempting to be sent. - message: RPCCodedResponse, - /// Whether a stream termination is requested. If true the stream will be closed after - /// this send. Otherwise it will transition to an idle state until a stream termination is - /// requested or a timeout is reached. - closing: bool, - }, - /// A response has been sent, pending flush. - ResponsePendingFlush { - /// The substream used to send the response - substream: Box>, - /// Whether a stream termination is requested. If true the stream will be closed after - /// this send. Otherwise it will transition to an idle state until a stream termination is - /// requested or a timeout is reached. - closing: bool, - }, - /// The response stream is idle and awaiting input from the application to send more chunked - /// responses. - ResponseIdle(Box>), - /// The substream is attempting to shutdown. - Closing(Box>), +/// State of an inbound substream connection. +enum InboundState { + /// The underlying substream is not being used. + Idle(InboundSubstream), + /// The underlying substream is processing responses. + Busy(Pin> + Send>>), /// Temporary state during processing Poisoned, } +impl InboundState { + /// Sends the given items over the underlying substream, if the state allows it, and returns the + /// final state. 
+ fn send_items( + self, + pending_items: &mut Vec>, + remaining_chunks: u64, + ) -> Self { + if let InboundState::Idle(substream) = self { + // only send on Idle + if !pending_items.is_empty() { + // take the items that we need to send + let to_send = std::mem::replace(pending_items, vec![]); + let fut = process_inbound_substream(substream, remaining_chunks, to_send).boxed(); + InboundState::Busy(Box::pin(fut)) + } else { + // nothing to do, keep waiting for responses + InboundState::Idle(substream) + } + } else { + self + } + } +} + /// State of an outbound substream. Either waiting for a response, or in the process of sending. pub enum OutboundSubstreamState { /// A request has been sent, and we are awaiting a response. This future is driven in the @@ -209,69 +224,6 @@ pub enum OutboundSubstreamState { Poisoned, } -impl InboundSubstreamState -where - TSpec: EthSpec, -{ - /// Moves the substream state to closing and informs the connected peer. The - /// `queued_outbound_items` must be given as a parameter to add stream termination messages to - /// the outbound queue. 
- pub fn close(&mut self, outbound_queue: &mut Vec>) { - // When terminating a stream, report the stream termination to the requesting user via - // an RPC error - let error = RPCCodedResponse::ServerError("Request timed out".into()); - - // The stream termination type is irrelevant, this will terminate the - // stream - let stream_termination = - RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange); - - match std::mem::replace(self, InboundSubstreamState::Poisoned) { - // if we are busy awaiting a send/flush add the termination to the queue - InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - } => { - if !closing { - outbound_queue.push(error); - outbound_queue.push(stream_termination); - } - // if the stream is closing after the send, allow it to finish - - *self = InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - } - } - // if we are busy awaiting a send/flush add the termination to the queue - InboundSubstreamState::ResponsePendingFlush { substream, closing } => { - if !closing { - outbound_queue.push(error); - outbound_queue.push(stream_termination); - } - // if the stream is closing after the send, allow it to finish - *self = InboundSubstreamState::ResponsePendingFlush { substream, closing } - } - InboundSubstreamState::ResponseIdle(substream) => { - *self = InboundSubstreamState::ResponsePendingSend { - substream, - message: error, - closing: true, - }; - } - InboundSubstreamState::Closing(substream) => { - // let the stream close - *self = InboundSubstreamState::Closing(substream); - } - InboundSubstreamState::Poisoned => { - unreachable!("Coding error: Timeout poisoned substream") - } - }; - } -} - impl RPCHandler where TSpec: EthSpec, @@ -283,7 +235,6 @@ where events_out: SmallVec::new(), dial_queue: SmallVec::new(), dial_negotiated: 0, - queued_outbound_items: FnvHashMap::default(), inbound_substreams: FnvHashMap::default(), outbound_substreams: FnvHashMap::default(), 
inbound_substreams_delay: DelayQueue::new(), @@ -360,32 +311,21 @@ where // NOTE: If the substream has closed due to inactivity, or the substream is in the // wrong state a response will fail silently. fn send_response(&mut self, inbound_id: SubstreamId, response: RPCCodedResponse) { - // Variables indicating if the response is an error response or a multi-part - // response - let res_is_error = response.is_error(); - let res_is_multiple = response.multiple_responses(); - // check if the stream matching the response still exists - let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id) { - Some((substream_state, _, protocol)) => (substream_state, protocol), - None => { - warn!(self.log, "Stream has expired. Response not sent"; - "response" => response.to_string(), "id" => inbound_id); - return; - } + let inbound_info = if let Some(info) = self.inbound_substreams.get_mut(&inbound_id) { + info + } else { + warn!(self.log, "Stream has expired. Response not sent"; + "response" => response.to_string(), "id" => inbound_id); + return; }; // If the response we are sending is an error, report back for handling match response { - RPCCodedResponse::InvalidRequest(ref reason) - | RPCCodedResponse::ServerError(ref reason) - | RPCCodedResponse::Unknown(ref reason) => { - let code = &response - .error_code() - .expect("Error response should map to an error code"); + RPCCodedResponse::Error(ref code, ref reason) => { let err = HandlerErr::Inbound { id: inbound_id, - proto: *protocol, + proto: inbound_info.protocol, error: RPCError::ErrorResponse(*code, reason.to_string()), }; self.pending_errors.push(err); @@ -399,85 +339,7 @@ where "response" => response.to_string(), "id" => inbound_id); return; } - - match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { - InboundSubstreamState::ResponseIdle(substream) => { - // close the stream if there is no response - if let RPCCodedResponse::StreamTermination(_) = response { - 
*substream_state = InboundSubstreamState::Closing(substream); - } else { - // send the response - // if it's a single rpc request or an error close the stream after. - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream, - message: response, - closing: !res_is_multiple | res_is_error, - } - } - } - InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - } if res_is_multiple => { - // the stream is in use, add the request to a pending queue if active - self.queued_outbound_items - .entry(inbound_id) - .or_insert_with(Vec::new) - .push(response); - - // return the state - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - }; - } - InboundSubstreamState::ResponsePendingFlush { substream, closing } - if res_is_multiple => - { - // the stream is in use, add the request to a pending queue - self.queued_outbound_items - .entry(inbound_id) - .or_insert_with(Vec::new) - .push(response); - - // return the state - *substream_state = - InboundSubstreamState::ResponsePendingFlush { substream, closing }; - } - InboundSubstreamState::Closing(substream) => { - *substream_state = InboundSubstreamState::Closing(substream); - debug!(self.log, "Response not sent. Stream is closing"; "response" => response.to_string()); - } - InboundSubstreamState::ResponsePendingSend { - substream, message, .. - } => { - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing: true, - }; - error!( - self.log, - "Attempted sending multiple responses to a single response request" - ); - } - InboundSubstreamState::ResponsePendingFlush { substream, .. 
} => { - *substream_state = InboundSubstreamState::ResponsePendingFlush { - substream, - closing: true, - }; - error!( - self.log, - "Attempted sending multiple responses to a single response request" - ); - } - InboundSubstreamState::Poisoned => { - crit!(self.log, "Poisoned inbound substream"); - unreachable!("Coding error: Poisoned substream"); - } - } + inbound_info.pending_items.push(response); } /// Updates the `KeepAlive` returned by `connection_keep_alive`. @@ -538,18 +400,25 @@ where } let (req, substream) = substream; + let expected_responses = req.expected_responses(); // store requests that expect responses - if req.expected_responses() > 0 { + if expected_responses > 0 { // Store the stream and tag the output. let delay_key = self.inbound_substreams_delay.insert( self.current_inbound_substream_id, Duration::from_secs(RESPONSE_TIMEOUT), ); - let awaiting_stream = InboundSubstreamState::ResponseIdle(Box::new(substream)); + let awaiting_stream = InboundState::Idle(substream); self.inbound_substreams.insert( self.current_inbound_substream_id, - (awaiting_stream, Some(delay_key), req.protocol()), + InboundInfo { + state: awaiting_stream, + pending_items: vec![], + delay_key: Some(delay_key), + protocol: req.protocol(), + remaining_chunks: expected_responses, + }, ); } @@ -709,13 +578,6 @@ where if delay.is_elapsed() { self.state = HandlerState::Deactivated; debug!(self.log, "Handler deactivated"); - // Drain queued responses - for (inbound_id, queued_responses) in self.queued_outbound_items.drain() { - for response in queued_responses { - debug!(self.log, "Response not sent. 
Deactivated handler"; - "response" => response.to_string(), "id" => inbound_id); - } - } } } @@ -724,23 +586,22 @@ where match self.inbound_substreams_delay.poll_next_unpin(cx) { Poll::Ready(Some(Ok(inbound_id))) => { // handle a stream timeout for various states - if let Some((substream_state, delay_key, protocol)) = - self.inbound_substreams.get_mut(inbound_id.get_ref()) - { + if let Some(info) = self.inbound_substreams.get_mut(inbound_id.get_ref()) { // the delay has been removed - *delay_key = None; - + info.delay_key = None; self.pending_errors.push(HandlerErr::Inbound { id: *inbound_id.get_ref(), - proto: *protocol, + proto: info.protocol, error: RPCError::StreamTimeout, }); - let outbound_queue = self - .queued_outbound_items - .entry(inbound_id.into_inner()) - .or_insert_with(Vec::new); - substream_state.close(outbound_queue); + if info.pending_items.last().map(|l| l.close_after()) == Some(false) { + // if the last chunk does not close the stream, append an error + info.pending_items.push(RPCCodedResponse::Error( + RPCResponseErrorCode::ServerError, + "Request timed out".into(), + )); + } } } Poll::Ready(Some(Err(e))) => { @@ -788,204 +649,80 @@ where let deactivated = matches!(self.state, HandlerState::Deactivated); // drive inbound streams that need to be processed - for request_id in self.inbound_substreams.keys().copied().collect::>() { - // Drain all queued items until all messages have been processed for this stream - // TODO Improve this code logic - let mut drive_stream_further = true; - while drive_stream_further { - drive_stream_further = false; - match self.inbound_substreams.entry(request_id) { - Entry::Occupied(mut entry) => { - match std::mem::replace( - &mut entry.get_mut().0, - InboundSubstreamState::Poisoned, - ) { - InboundSubstreamState::ResponsePendingSend { - mut substream, - message, - closing, - } => { - if deactivated { - if !closing { - // inform back to cancel this request's processing - self.pending_errors.push(HandlerErr::Inbound 
{ - id: request_id, - proto: entry.get().2, - error: RPCError::HandlerRejected, - }); - } - entry.get_mut().0 = InboundSubstreamState::Closing(substream); - drive_stream_further = true; - } else { - match Sink::poll_ready(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { - // stream is ready to send data - match Sink::start_send( - Pin::new(&mut substream), - message, - ) { - Ok(()) => { - // await flush - entry.get_mut().0 = - InboundSubstreamState::ResponsePendingFlush { - substream, - closing, - }; - drive_stream_further = true; - } - Err(e) => { - // error with sending in the codec - warn!(self.log, "Error sending RPC message"; "error" => e.to_string()); - // keep connection with the peer and return the - // stream to awaiting response if this message - // wasn't closing the stream - if closing { - entry.get_mut().0 = - InboundSubstreamState::Closing( - substream, - ); - drive_stream_further = true; - } else { - // check for queued chunks and update the stream - entry.get_mut().0 = apply_queued_responses( - *substream, - &mut self - .queued_outbound_items - .get_mut(&request_id), - &mut drive_stream_further, - ); - } - } - } - } - Poll::Ready(Err(e)) => { - error!( - self.log, - "Outbound substream error while sending RPC message: {:?}", - e - ); - entry.remove(); - self.update_keep_alive(); - return Poll::Ready(ProtocolsHandlerEvent::Close(e)); - } - Poll::Pending => { - // the stream is not yet ready, continue waiting - entry.get_mut().0 = - InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - }; - } - } - } + let mut substreams_to_remove = Vec::new(); // Closed substreams that need to be removed + for (id, info) in self.inbound_substreams.iter_mut() { + match std::mem::replace(&mut info.state, InboundState::Poisoned) { + state @ InboundState::Idle(..) 
if !deactivated => { + info.state = state.send_items(&mut info.pending_items, info.remaining_chunks); + } + InboundState::Idle(mut substream) => { + // handler is deactivated, close the stream and mark it for removal + match substream.close().poll_unpin(cx) { + // if we can't close right now, put the substream back and try again later + Poll::Pending => info.state = InboundState::Idle(substream), + Poll::Ready(res) => { + substreams_to_remove.push(id.clone()); + if let Some(ref delay_key) = info.delay_key { + self.inbound_substreams_delay.remove(delay_key); } - InboundSubstreamState::ResponsePendingFlush { - mut substream, - closing, - } => { - match Sink::poll_flush(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { - // finished flushing - // TODO: Duplicate code - if closing | deactivated { - if !closing { - // inform back to cancel this request's processing - self.pending_errors.push(HandlerErr::Inbound { - id: request_id, - proto: entry.get().2, - error: RPCError::HandlerRejected, - }); - } - entry.get_mut().0 = - InboundSubstreamState::Closing(substream); - drive_stream_further = true; - } else { - // check for queued chunks and update the stream - entry.get_mut().0 = apply_queued_responses( - *substream, - &mut self - .queued_outbound_items - .get_mut(&request_id), - &mut drive_stream_further, - ); - } - } - Poll::Ready(Err(e)) => { - // error during flush - trace!(self.log, "Error sending flushing RPC message"; "error" => e.to_string()); - // we drop the stream on error and inform the user, remove - // any pending requests - // TODO: Duplicate code - if let Some(delay_key) = &entry.get().1 { - self.inbound_substreams_delay.remove(delay_key); - } - self.queued_outbound_items.remove(&request_id); - entry.remove(); - - self.update_keep_alive(); - } - Poll::Pending => { - entry.get_mut().0 = - InboundSubstreamState::ResponsePendingFlush { - substream, - closing, - }; - } - } + if let Err(error) = res { + self.pending_errors.push(HandlerErr::Inbound { + 
id: id.clone(), + error, + proto: info.protocol, + }); } - InboundSubstreamState::ResponseIdle(substream) => { - if !deactivated { - entry.get_mut().0 = apply_queued_responses( - *substream, - &mut self.queued_outbound_items.get_mut(&request_id), - &mut drive_stream_further, - ); - } else { - entry.get_mut().0 = InboundSubstreamState::Closing(substream); - drive_stream_further = true; - } - } - InboundSubstreamState::Closing(mut substream) => { - match Sink::poll_close(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { - if let Some(delay_key) = &entry.get().1 { - self.inbound_substreams_delay.remove(delay_key); - } - self.queued_outbound_items.remove(&request_id); - entry.remove(); - - self.update_keep_alive(); - } // drop the stream - Poll::Ready(Err(e)) => { - error!(self.log, "Error closing inbound stream"; "error" => e.to_string()); - // drop the stream anyway - // TODO: Duplicate code - if let Some(delay_key) = &entry.get().1 { - self.inbound_substreams_delay.remove(delay_key); - } - self.queued_outbound_items.remove(&request_id); - entry.remove(); - - self.update_keep_alive(); - } - Poll::Pending => { - entry.get_mut().0 = - InboundSubstreamState::Closing(substream); - } - } - } - InboundSubstreamState::Poisoned => { - crit!(self.log, "Poisoned outbound substream"); - unreachable!("Coding Error: Inbound Substream is poisoned"); + if info.pending_items.last().map(|l| l.close_after()) == Some(false) { + // if the request was still active, report back to cancel it + self.pending_errors.push(HandlerErr::Inbound { + id: *id, + proto: info.protocol, + error: RPCError::HandlerRejected, + }); } } } - Entry::Vacant(_) => unreachable!(), } + InboundState::Busy(mut fut) => { + // first check if sending finished + let state = match fut.poll_unpin(cx) { + Poll::Ready((substream, errors, remove, new_remaining_chunks)) => { + info.remaining_chunks = new_remaining_chunks; + // report any error + for error in errors { + self.pending_errors.push(HandlerErr::Inbound { 
+ id: *id, + error, + proto: info.protocol, + }) + } + if remove { + substreams_to_remove.push(id.clone()); + if let Some(ref delay_key) = info.delay_key { + self.inbound_substreams_delay.remove(delay_key); + } + } + InboundState::Idle(substream) + } + Poll::Pending => InboundState::Busy(fut), + }; + info.state = if !deactivated { + // if the last batch finished, send more. + state.send_items(&mut info.pending_items, info.remaining_chunks) + } else { + state + }; + } + InboundState::Poisoned => unreachable!("Poisoned inbound substream"), } } + // remove closed substreams + for inbound_id in substreams_to_remove { + self.inbound_substreams.remove(&inbound_id); + } + + self.update_keep_alive(); // drive outbound streams that need to be processed for outbound_id in self.outbound_substreams.keys().copied().collect::>() { // get the state and mark it as poisoned @@ -1018,7 +755,7 @@ where request, } => match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(response))) => { - if request.expected_responses() > 1 && !response.is_error() { + if request.expected_responses() > 1 && !response.close_after() { let substream_entry = entry.get_mut(); let delay_key = &substream_entry.delay_key; // chunks left after this one @@ -1041,8 +778,8 @@ where .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT)); } } else { - // either this is a single response request or we received an - // error only expect a single response, close the stream + // either this is a single response request or this response closes the + // stream entry.get_mut().state = OutboundSubstreamState::Closing(substream); } @@ -1055,18 +792,11 @@ where Ok(RPCReceived::EndOfStream(id, t)) } RPCCodedResponse::Success(resp) => Ok(RPCReceived::Response(id, resp)), - RPCCodedResponse::InvalidRequest(ref r) - | RPCCodedResponse::ServerError(ref r) - | RPCCodedResponse::Unknown(ref r) => { - let code = response.error_code().expect( - "Response indicating and error should map to an error code", - ); - 
Err(HandlerErr::Outbound { - id, - proto, - error: RPCError::ErrorResponse(code, r.to_string()), - }) - } + RPCCodedResponse::Error(ref code, ref r) => Err(HandlerErr::Outbound { + id, + proto, + error: RPCError::ErrorResponse(*code, r.to_string()), + }), }; return Poll::Ready(ProtocolsHandlerEvent::Custom(received)); @@ -1172,35 +902,6 @@ where } } -// Check for new items to send to the peer and update the underlying stream -fn apply_queued_responses( - substream: InboundFramed, - queued_outbound_items: &mut Option<&mut Vec>>, - new_items_to_send: &mut bool, -) -> InboundSubstreamState { - match queued_outbound_items { - Some(ref mut queue) if !queue.is_empty() => { - *new_items_to_send = true; - // we have queued items - match queue.remove(0) { - RPCCodedResponse::StreamTermination(_) => { - // close the stream if this is a stream termination - InboundSubstreamState::Closing(Box::new(substream)) - } - chunk => InboundSubstreamState::ResponsePendingSend { - substream: Box::new(substream), - message: chunk, - closing: false, - }, - } - } - _ => { - // no items queued set to idle - InboundSubstreamState::ResponseIdle(Box::new(substream)) - } - } -} - impl slog::Value for SubstreamId { fn serialize( &self, @@ -1211,3 +912,43 @@ impl slog::Value for SubstreamId { slog::Value::serialize(&self.0, record, key, serializer) } } + +/// Sends the queued items to the peer. 
+async fn process_inbound_substream( + mut substream: InboundSubstream, + mut remaining_chunks: u64, + pending_items: Vec>, +) -> InboundProcessingOutput { + let mut errors = Vec::new(); + let mut substream_closed = false; + + for item in pending_items { + if !substream_closed { + if matches!(item, RPCCodedResponse::StreamTermination(_)) { + substream.close().await.unwrap_or_else(|e| errors.push(e)); + substream_closed = true; + } else { + remaining_chunks = remaining_chunks.saturating_sub(1); + // chunks that are not stream terminations get sent, and the stream is closed if + // the response is an error + let is_error = matches!(item, RPCCodedResponse::Error(..)); + + substream + .send(item) + .await + .unwrap_or_else(|e| errors.push(e)); + + if remaining_chunks == 0 || is_error { + substream.close().await.unwrap_or_else(|e| errors.push(e)); + substream_closed = true; + } + } + } else { + // we have more items after a closed substream, report those as errors + errors.push(RPCError::InternalError( + "Sending responses to closed inbound substream", + )); + } + } + (substream, errors, substream_closed, remaining_chunks) +} diff --git a/beacon_node/eth2_libp2p/src/rpc/methods.rs b/beacon_node/eth2_libp2p/src/rpc/methods.rs index b0ca48a3c7..c3cb358f91 100644 --- a/beacon_node/eth2_libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2_libp2p/src/rpc/methods.rs @@ -249,14 +249,7 @@ pub enum RPCCodedResponse { /// The response is a successful. Success(RPCResponse), - /// The response was invalid. - InvalidRequest(ErrorType), - - /// The response indicates a server error. - ServerError(ErrorType), - - /// There was an unknown response. - Unknown(ErrorType), + Error(RPCResponseErrorCode, ErrorType), /// Received a stream termination indicating which response is being terminated. 
StreamTermination(ResponseTermination), @@ -275,9 +268,7 @@ impl RPCCodedResponse { pub fn as_u8(&self) -> Option { match self { RPCCodedResponse::Success(_) => Some(0), - RPCCodedResponse::InvalidRequest(_) => Some(1), - RPCCodedResponse::ServerError(_) => Some(2), - RPCCodedResponse::Unknown(_) => Some(255), + RPCCodedResponse::Error(code, _) => Some(code.as_u8()), RPCCodedResponse::StreamTermination(_) => None, } } @@ -292,20 +283,12 @@ impl RPCCodedResponse { /// Builds an RPCCodedResponse from a response code and an ErrorMessage pub fn from_error(response_code: u8, err: String) -> Self { - match response_code { - 1 => RPCCodedResponse::InvalidRequest(err.into()), - 2 => RPCCodedResponse::ServerError(err.into()), - _ => RPCCodedResponse::Unknown(err.into()), - } - } - - /// Builds an RPCCodedResponse from a response code and an ErrorMessage - pub fn from_error_code(response_code: RPCResponseErrorCode, err: String) -> Self { - match response_code { - RPCResponseErrorCode::InvalidRequest => RPCCodedResponse::InvalidRequest(err.into()), - RPCResponseErrorCode::ServerError => RPCCodedResponse::ServerError(err.into()), - RPCResponseErrorCode::Unknown => RPCCodedResponse::Unknown(err.into()), - } + let code = match response_code { + 1 => RPCResponseErrorCode::InvalidRequest, + 2 => RPCResponseErrorCode::ServerError, + _ => RPCResponseErrorCode::Unknown, + }; + RPCCodedResponse::Error(code, err.into()) } /// Specifies which response allows for multiple chunks for the stream handler. @@ -318,30 +301,27 @@ impl RPCCodedResponse { RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, }, - RPCCodedResponse::InvalidRequest(_) => true, - RPCCodedResponse::ServerError(_) => true, - RPCCodedResponse::Unknown(_) => true, + RPCCodedResponse::Error(_, _) => true, // Stream terminations are part of responses that have chunks RPCCodedResponse::StreamTermination(_) => true, } } - /// Returns true if this response is an error. 
Used to terminate the stream after an error is - /// sent. - pub fn is_error(&self) -> bool { + /// Returns true if this response always terminates the stream. + pub fn close_after(&self) -> bool { match self { RPCCodedResponse::Success(_) => false, _ => true, } } +} - pub fn error_code(&self) -> Option { +impl RPCResponseErrorCode { + fn as_u8(&self) -> u8 { match self { - RPCCodedResponse::Success(_) => None, - RPCCodedResponse::StreamTermination(_) => None, - RPCCodedResponse::InvalidRequest(_) => Some(RPCResponseErrorCode::InvalidRequest), - RPCCodedResponse::ServerError(_) => Some(RPCResponseErrorCode::ServerError), - RPCCodedResponse::Unknown(_) => Some(RPCResponseErrorCode::Unknown), + RPCResponseErrorCode::InvalidRequest => 1, + RPCResponseErrorCode::ServerError => 2, + RPCResponseErrorCode::Unknown => 255, } } } @@ -383,9 +363,7 @@ impl std::fmt::Display for RPCCodedResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RPCCodedResponse::Success(res) => write!(f, "{}", res), - RPCCodedResponse::InvalidRequest(err) => write!(f, "Invalid Request: {:?}", err), - RPCCodedResponse::ServerError(err) => write!(f, "Server Error: {:?}", err), - RPCCodedResponse::Unknown(err) => write!(f, "Unknown Error: {:?}", err), + RPCCodedResponse::Error(code, err) => write!(f, "{}: {:?}", code, err), RPCCodedResponse::StreamTermination(_) => write!(f, "Stream Termination"), } } diff --git a/beacon_node/eth2_libp2p/src/rpc/protocol.rs b/beacon_node/eth2_libp2p/src/rpc/protocol.rs index 5fe5afe8e0..da284f9ef5 100644 --- a/beacon_node/eth2_libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2_libp2p/src/rpc/protocol.rs @@ -323,12 +323,12 @@ impl RPCRequest { /* These functions are used in the handler for stream management */ /// Number of responses expected for this request. 
- pub fn expected_responses(&self) -> usize { + pub fn expected_responses(&self) -> u64 { match self { RPCRequest::Status(_) => 1, RPCRequest::Goodbye(_) => 0, - RPCRequest::BlocksByRange(req) => req.count as usize, - RPCRequest::BlocksByRoot(req) => req.block_roots.len(), + RPCRequest::BlocksByRange(req) => req.count, + RPCRequest::BlocksByRoot(req) => req.block_roots.len() as u64, RPCRequest::Ping(_) => 1, RPCRequest::MetaData(_) => 1, }