From a43381e3d58d8d6492669f615db8824617c89e10 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Mon, 4 May 2020 17:35:41 +1000 Subject: [PATCH] RPC handler to stable futures --- Cargo.lock | 1 + beacon_node/eth2-libp2p/Cargo.toml | 2 +- beacon_node/eth2-libp2p/src/rpc/handler.rs | 373 ++++++++++++++------ beacon_node/eth2-libp2p/src/rpc/protocol.rs | 20 +- 4 files changed, 270 insertions(+), 126 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d239912ca3..16469bb6d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5276,6 +5276,7 @@ checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" dependencies = [ "bytes 0.5.4", "futures-core", + "futures-io", "futures-sink", "log 0.4.8", "pin-project-lite", diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index 652ec4c475..b3c84e34c6 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -31,7 +31,7 @@ base64 = "0.12.0" snap = "1.0.0" void = "1.0.2" tokio-io-timeout = "0.4.0" -tokio-util = { version = "0.3.1", features = ["codec"] } +tokio-util = { version = "0.3.1", features = ["codec", "compat"] } libp2p = "0.18.1" discv5 = "0.1.0-alpha.1" tiny-keccak = "2.0.2" diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 14a23a7a13..e0830da547 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -25,7 +25,6 @@ use std::{ use tokio::time::{delay_queue, DelayQueue}; use types::EthSpec; -//TODO: Implement close() on the substream types to improve the poll code. //TODO: Implement check_timeout() on the substream types /// The time (in seconds) before a substream that is awaiting a response from the user times out. 
@@ -65,7 +64,7 @@ where inbound_substreams: FnvHashMap< InboundRequestId, ( - InboundSubstreamState, + InboundSubstreamState, Option, Protocol, ), @@ -76,14 +75,8 @@ where /// Map of outbound substreams that need to be driven to completion. The `RequestId` is /// maintained by the application sending the request. - outbound_substreams: FnvHashMap< - OutboundRequestId, - ( - OutboundSubstreamState, - delay_queue::Key, - Protocol, - ), - >, + outbound_substreams: + FnvHashMap, delay_queue::Key, Protocol)>, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. outbound_substreams_delay: DelayQueue, @@ -115,8 +108,19 @@ pub enum InboundSubstreamState where TSpec: EthSpec, { - /// A response has been sent, pending writing and flush. + /// A response has been sent, pending writing. ResponsePendingSend { + /// The substream used to send the response + substream: InboundFramed, + /// The message that is attempting to be sent. + message: RPCCodedResponse, + /// Whether a stream termination is requested. If true the stream will be closed after + /// this send. Otherwise it will transition to an idle state until a stream termination is + /// requested or a timeout is reached. + closing: bool, + }, + /// A response has been sent, pending flush. + ResponsePendingFlush { /// The substream used to send the response substream: InboundFramed, /// Whether a stream termination is requested. 
If true the stream will be closed after @@ -169,18 +173,37 @@ where RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange); match std::mem::replace(self, InboundSubstreamState::Poisoned) { - InboundSubstreamState::ResponsePendingSend { substream, closing } => { + // if we are busy awaiting a send/flush add the termination to the queue + InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + } => { if !closing { outbound_queue.push(error); outbound_queue.push(stream_termination); } // if the stream is closing after the send, allow it to finish - *self = InboundSubstreamState::ResponsePendingSend { substream, closing } + *self = InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + } + } + // if we are busy awaiting a send/flush add the termination to the queue + InboundSubstreamState::ResponsePendingFlush { substream, closing } => { + if !closing { + outbound_queue.push(error); + outbound_queue.push(stream_termination); + } + // if the stream is closing after the send, allow it to finish + *self = InboundSubstreamState::ResponsePendingFlush { substream, closing } } InboundSubstreamState::ResponseIdle(substream) => { *self = InboundSubstreamState::ResponsePendingSend { - substream: substream.send(error), + substream: substream, + message: error, closing: true, }; } @@ -259,7 +282,7 @@ where { type InEvent = RPCEvent; type OutEvent = RPCEvent; - type Error = ProtocolsHandlerUpgrErr; + type Error = RPCError; type InboundProtocol = RPCProtocol; type OutboundProtocol = RPCRequest; type OutboundOpenInfo = (RequestId, RPCRequest); // Keep track of the id and the request @@ -387,15 +410,18 @@ where // if it's a single rpc request or an error, close the stream after *substream_state = InboundSubstreamState::ResponsePendingSend { - substream: substream.send(response), + substream: substream, + message: response, closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more 
responses }; } } } - InboundSubstreamState::ResponsePendingSend { substream, closing } - if res_is_multiple => - { + InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + } if res_is_multiple => { // the stream is in use, add the request to a pending queue self.queued_outbound_items .entry(rpc_id) @@ -404,6 +430,22 @@ where // return the state *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + }; + } + InboundSubstreamState::ResponsePendingFlush { substream, closing } + if res_is_multiple => + { + // the stream is in use, add the request to a pending queue + self.queued_outbound_items + .entry(rpc_id) + .or_insert_with(Vec::new) + .push(response); + + // return the state + *substream_state = InboundSubstreamState::ResponsePendingFlush { substream, closing, }; @@ -412,9 +454,14 @@ where *substream_state = InboundSubstreamState::Closing(substream); debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response)); } - InboundSubstreamState::ResponsePendingSend { substream, .. } => { + InboundSubstreamState::ResponsePendingSend { + substream, + message, + .. 
+ } => { *substream_state = InboundSubstreamState::ResponsePendingSend { substream, + message, closing: true, }; error!(self.log, "Attempted sending multiple responses to a single response request"); @@ -494,57 +541,71 @@ where > { if !self.pending_error.is_empty() { let (id, protocol, err) = self.pending_error.remove(0); - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(id, protocol, err), + return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( + id, protocol, err, ))); } // return any events that need to be reported if !self.events_out.is_empty() { - return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom(self.events_out.remove(0)))); + return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))); } else { self.events_out.shrink_to_fit(); } // purge expired inbound substreams and send an error - // TODO: check if this pattern is equivalent to - // while let Async::Ready() = stream.poll().map_err(..) - while let Poll::Ready(Some(d)) = self.inbound_substreams_delay.poll() { - let stream_id = d.map_err(|e| { - warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e)); - ProtocolsHandlerUpgrErr::Timer - })?; - let rpc_id = stream_id.get_ref(); + loop { + match self.inbound_substreams_delay.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream_id))) => { + // handle a stream timeout for various states + if let Some((substream_state, delay_key, _)) = + self.inbound_substreams.get_mut(stream_id.get_ref()) + { + // the delay has been removed + *delay_key = None; - // handle a stream timeout for various states - if let Some((substream_state, delay_key, _)) = self.inbound_substreams.get_mut(rpc_id) { - // the delay has been removed - *delay_key = None; - - let outbound_queue = self - .queued_outbound_items - .entry(*rpc_id) - .or_insert_with(Vec::new); - substream_state.close(outbound_queue); + let outbound_queue = self + .queued_outbound_items + .entry(stream_id.into_inner()) + .or_insert_with(Vec::new); + 
substream_state.close(outbound_queue); + } + } + Poll::Ready(Some(Err(e))) => { + warn!(self.log, "Inbound substream poll failed"; "error" => format!("{:?}", e)); + // drops the peer if we cannot read the delay queue + return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError( + "Could not poll inbound stream timer", + ))); + } + Poll::Pending => break, } } // purge expired outbound substreams - if let Poll::Ready(Some(d)) = self.outbound_substreams_delay.poll() { - let stream_id = d.map_err(|e| { - warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e)); - ProtocolsHandlerUpgrErr::Timer - })? - { - if let Some((_id, _stream, protocol)) = - self.outbound_substreams.remove(stream_id.get_ref()) - { - // notify the user - return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(*stream_id.get_ref(), protocol, RPCError::StreamTimeout), - ))); - } else { - crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref()); + loop { + match self.outbound_substreams_delay.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(stream_id))) => { + if let Some((_id, _stream, protocol)) = + self.outbound_substreams.remove(stream_id.get_ref()) + { + // notify the user + return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( + *stream_id.get_ref(), + protocol, + RPCError::StreamTimeout, + ))); + } else { + crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref()); + } + } + Poll::Ready(Some(Err(e))) => { + warn!(self.log, "Outbound substream poll failed"; "error" => format!("{:?}", e)); + return Poll::Ready(ProtocolsHandlerEvent::Close(RPCError::InternalError( + "Could not poll outbound stream timer", + ))); + } + Poll::Pending => break, } } @@ -563,20 +624,93 @@ where ) { InboundSubstreamState::ResponsePendingSend { mut substream, + message, closing, } => { - match substream.poll() { - Poll::Ready(Ok(raw_substream)) => { - // completed the send - - // close the 
stream if required + match Sink::poll_ready(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { + // stream is ready to send data + match Sink::start_send(Pin::new(&mut substream), message) { + Ok(()) => { + // await flush + entry.get_mut().0 = + InboundSubstreamState::ResponsePendingFlush { + substream, + closing, + } + } + Err(e) => { + // error with sending in the codec + error!(self.log, "Error sending RPC message"; "message" => message.to_string()); + // keep connection with the peer and return the + // stream to awaiting response if this message + // wasn't closing the stream + // TODO: Duplicate code + if closing { + entry.get_mut().0 = + InboundSubstreamState::Closing(substream) + } else { + // check for queued chunks and update the stream + entry.get_mut().0 = apply_queued_responses( + substream, + &mut self + .queued_outbound_items + .get_mut(&request_id), + &mut new_items_to_send, + ); + } + } + } + } + Poll::Ready(Err(e)) => { + error!(self.log, "Outbound substream error while sending RPC message: {:?}", e); + return Poll::Ready(ProtocolsHandlerEvent::Close(e)); + } + Poll::Pending => { + // the stream is not yet ready, continue waiting + entry.get_mut().0 = + InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + }; + } + } + } + InboundSubstreamState::ResponsePendingFlush { + mut substream, + closing, + } => { + match Sink::poll_flush(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { + // finished flushing + // TODO: Duplicate code if closing { entry.get_mut().0 = - InboundSubstreamState::Closing(raw_substream) + InboundSubstreamState::Closing(substream) } else { // check for queued chunks and update the stream entry.get_mut().0 = apply_queued_responses( - raw_substream, + substream, + &mut self + .queued_outbound_items + .get_mut(&request_id), + &mut new_items_to_send, + ); + } + } + Poll::Ready(Err(e)) => { + // error during flush + error!(self.log, "Error sending flushing RPC message"); + // close the 
stream if required + // TODO: Duplicate code + if closing { + entry.get_mut().0 = + InboundSubstreamState::Closing(substream) + } else { + // check for queued chunks and update the stream + entry.get_mut().0 = apply_queued_responses( + substream, &mut self .queued_outbound_items .get_mut(&request_id), @@ -586,22 +720,12 @@ where } Poll::Pending => { entry.get_mut().0 = - InboundSubstreamState::ResponsePendingSend { + InboundSubstreamState::ResponsePendingFlush { substream, closing, }; } - Err(e) => { - if let Some(delay_key) = &entry.get().1 { - self.inbound_substreams_delay.remove(delay_key); - } - let protocol = entry.get().2; - entry.remove_entry(); - return Ok(Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(0, protocol, e), - ))); - } - }; + } } InboundSubstreamState::ResponseIdle(substream) => { entry.get_mut().0 = apply_queued_responses( @@ -611,10 +735,8 @@ where ); } InboundSubstreamState::Closing(mut substream) => { - // TODO: check if this is supposed to be a stream - match substream.close() { - Poll::Ready(_) => { - //trace!(self.log, "Inbound stream dropped"); + match Sink::poll_close(Pin::new(&mut substream), cx) { + Poll::Ready(Ok(())) => { if let Some(delay_key) = &entry.get().1 { self.inbound_substreams_delay.remove(delay_key); } @@ -629,6 +751,24 @@ where ); } } // drop the stream + Poll::Ready(Err(e)) => { + error!(self.log, "Error closing inbound stream"; "error" => e.to_string()); + // drop the stream anyway + // TODO: Duplicate code + if let Some(delay_key) = &entry.get().1 { + self.inbound_substreams_delay.remove(delay_key); + } + self.queued_outbound_items.remove(&request_id); + entry.remove(); + + if self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty() + { + self.keep_alive = KeepAlive::Until( + Instant::now() + self.inactive_timeout, + ); + } + } Poll::Pending => { entry.get_mut().0 = InboundSubstreamState::Closing(substream); @@ -639,7 +779,7 @@ where crit!(self.log, "Poisoned outbound substream"); 
unreachable!("Coding Error: Inbound Substream is poisoned"); } - }; + } } Entry::Vacant(_) => unreachable!(), } @@ -657,7 +797,7 @@ where OutboundSubstreamState::RequestPendingResponse { mut substream, request, - } => match substream.poll() { + } => match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(response))) => { if request.multiple_responses() && !response.is_error() { entry.get_mut().0 = @@ -676,9 +816,9 @@ where entry.get_mut().0 = OutboundSubstreamState::Closing(substream); } - return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom( + return Poll::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Response(request_id, response), - ))); + )); } Poll::Ready(None) => { // stream closed @@ -692,22 +832,22 @@ where // notify the application error if request.multiple_responses() { // return an end of stream result - return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom( + return Poll::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Response( request_id, RPCCodedResponse::StreamTermination( request.stream_termination(), ), ), - ))); + )); } // else we return an error, stream should not have closed early. 
- return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom( + return Poll::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error( request_id, request.protocol(), RPCError::IncompleteStream, ), - ))); + )); } Poll::Pending => { entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse { @@ -721,31 +861,33 @@ where self.outbound_substreams_delay.remove(delay_key); let protocol = entry.get().2; entry.remove_entry(); - return Poll::Ready(Ok(ProtocolsHandlerEvent::Custom( + return Poll::Ready(ProtocolsHandlerEvent::Custom( RPCEvent::Error(request_id, protocol, e), - ))); + )); } }, - OutboundSubstreamState::Closing(mut substream) => match substream.close() { - // TODO: check if this is supposed to be a stream - Poll::Ready(_) => { - //trace!(self.log, "Outbound stream dropped"); - // drop the stream - let delay_key = &entry.get().1; - self.outbound_substreams_delay.remove(delay_key); - entry.remove_entry(); + OutboundSubstreamState::Closing(mut substream) => { + match Sink::poll_close(Pin::new(&mut substream), cx) { + // TODO: check if this is supposed to be a stream + Poll::Ready(_) => { + // drop the stream - including if there is an error + let delay_key = &entry.get().1; + self.outbound_substreams_delay.remove(delay_key); + entry.remove_entry(); - if self.outbound_substreams.is_empty() - && self.inbound_substreams.is_empty() - { - self.keep_alive = - KeepAlive::Until(Instant::now() + self.inactive_timeout); + if self.outbound_substreams.is_empty() + && self.inbound_substreams.is_empty() + { + self.keep_alive = KeepAlive::Until( + Instant::now() + self.inactive_timeout, + ); + } + } + Poll::Pending => { + entry.get_mut().0 = OutboundSubstreamState::Closing(substream); } } - Poll::Pending => { - entry.get_mut().0 = OutboundSubstreamState::Closing(substream); - } - }, + } OutboundSubstreamState::Poisoned => { crit!(self.log, "Poisoned outbound substream"); unreachable!("Coding Error: Outbound substream is poisoned") @@ -761,12 +903,10 @@ where self.dial_negotiated 
+= 1; let (id, req) = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); - return Poll::Ready(Ok( - ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone()), - info: (id, req), - }, - )); + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: (id, req), + }); } Poll::Pending } @@ -774,7 +914,7 @@ where // Check for new items to send to the peer and update the underlying stream fn apply_queued_responses( - raw_substream: InboundFramed, + substream: InboundFramed, queued_outbound_items: &mut Option<&mut Vec>>, new_items_to_send: &mut bool, ) -> InboundSubstreamState { @@ -785,17 +925,18 @@ fn apply_queued_responses( match queue.remove(0) { RPCCodedResponse::StreamTermination(_) => { // close the stream if this is a stream termination - InboundSubstreamState::Closing(raw_substream) + InboundSubstreamState::Closing(substream) } chunk => InboundSubstreamState::ResponsePendingSend { - substream: raw_substream.send(chunk), + substream: substream, + message: chunk, closing: false, }, } } _ => { // no items queued set to idle - InboundSubstreamState::ResponseIdle(raw_substream) + InboundSubstreamState::ResponseIdle(substream) } } } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 9eda5d2c15..b74af76ec5 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -20,7 +20,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use tokio_io_timeout::TimeoutStream; -use tokio_util::codec::Framed; +use tokio_util::{ + codec::Framed, + compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}, +}; use types::EthSpec; /// The maximum bytes that can be sent across the RPC. 
@@ -171,7 +174,7 @@ impl ProtocolName for ProtocolId { pub type InboundOutput = (RPCRequest, InboundFramed); pub type InboundFramed = - Framed>, InboundCodec>; + Framed>, InboundCodec>; type FnAndThen = fn( ( Option, RPCError>>, @@ -191,7 +194,8 @@ where fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { let protocol_name = protocol.message_name; - let socket = TokioNegotiatedStream(socket); + // convert the socket to tokio compatible socket + let socket = socket.compat(); let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec = @@ -220,9 +224,7 @@ where .and_then({ |(req, stream)| match req { Some(Ok(request)) => future::ok((request, stream)), - Some(Err(_)) => | None => { - future::err(RPCError::IncompleteStream) - } + Some(Err(_)) | None => future::err(RPCError::IncompleteStream), } } as FnAndThen), ), @@ -346,8 +348,7 @@ impl RPCRequest { /* Outbound upgrades */ -pub type OutboundFramed = - Framed, OutboundCodec>; +pub type OutboundFramed = Framed, OutboundCodec>; impl OutboundUpgrade for RPCRequest where @@ -359,7 +360,8 @@ where type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: TSocket, protocol: Self::Info) -> Self::Future { - let socket = TokioNegotiatedStream(socket); + // convert to a tokio compatible socket + let socket = socket.compat(); let codec = match protocol.encoding { Encoding::SSZSnappy => { let ssz_snappy_codec =