From bb0e28b8e3381d6763b35d1ccddec2ef2cf68ea3 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sat, 6 Jul 2019 23:43:44 +1000 Subject: [PATCH] Improved rpc protocols handler. WIP --- beacon_node/eth2-libp2p/src/rpc/handler.rs | 88 +++++++++++++++------ beacon_node/eth2-libp2p/src/rpc/methods.rs | 1 + beacon_node/eth2-libp2p/src/rpc/mod.rs | 4 +- beacon_node/eth2-libp2p/src/rpc/protocol.rs | 33 +------- 4 files changed, 70 insertions(+), 56 deletions(-) diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index fe5fba05f6..ff9f4ebdb3 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -1,4 +1,3 @@ - use libp2p::core::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol @@ -14,7 +13,7 @@ use wasm_timer::Instant; pub const RESPONSE_TIMEOUT: u64 = 9; /// Implementation of `ProtocolsHandler` for the RPC protocol. -pub struct RPCHandler +pub struct RPCHandler { /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, @@ -26,13 +25,13 @@ pub struct RPCHandler events_out: SmallVec<[TOutEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[TOutProto; 4]>, + dial_queue: SmallVec<[(usize,TOutProto); 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, /// Map of current substreams awaiting a response to an RPC request. - waiting_substreams: FnvHashMap + waiting_substreams: FnvHashMap /// Sequential Id for waiting substreams. current_substream_id: usize, @@ -47,9 +46,15 @@ pub struct RPCHandler inactive_timeout: Duration, } -struct WaitingStream { - stream: TSubstream, - timeout: Duration, +/// State of an outbound substream. Either waiting for a response, or in the process of sending. +pub enum SubstreamState { + /// An outbound substream is waiting a response from the user. + WaitingResponse { + stream: , + timeout: Duration, + } + /// A response has been sent and we are waiting for the stream to close. + ResponseSent(WriteOne) } impl @@ -96,9 +101,9 @@ impl /// Opens an outbound substream with `upgrade`. #[inline] - pub fn send_request(&mut self, upgrade: RPCRequest) { + pub fn send_request(&mut self, request_id, u64, upgrade: RPCRequest) { self.keep_alive = KeepAlive::Yes; - self.dial_queue.push(upgrade); + self.dial_queue.push((request_id, upgrade)); } } @@ -106,20 +111,20 @@ impl Default for RPCHandler { fn default() -> Self { - RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(10)) + RPCHandler::new(SubstreamProtocol::new(RPCProtocol), Duration::from_secs(30)) } } impl ProtocolsHandler for RPCHandler { - type InEvent = RPCRequest; + type InEvent = RPCEvent; type OutEvent = RPCEvent; type Error = ProtocolsHandlerUpgrErr; type Substream = TSubstream; type InboundProtocol = RPCProtocol; type OutboundProtocol = RPCRequest; - type OutboundOpenInfo = (); + type OutboundOpenInfo = u64; // request_id #[inline] fn listen_protocol(&self) -> SubstreamProtocol { @@ -131,10 +136,6 @@ impl ProtocolsHandler &mut self, out: RPCProtocol::Output, ) { - if !self.keep_alive.is_yes() { - self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); - } - let (stream, req) = out; // drop the stream and return a 0 id for goodbye "requests" if let req @ RPCRequest::Goodbye(_) = req { @@ -143,7 +144,7 @@ impl ProtocolsHandler } // New inbound request. Store the stream and tag the output. - let awaiting_stream = WaitingStream { stream, timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT) }; + let awaiting_stream = SubstreamState::WaitingResponse { stream, timeout: Instant::now() + Duration::from_secs(RESPONSE_TIMEOUT) }; self.waiting_substreams.insert(self.current_substream_id, awaiting_stream); self.events_out.push(RPCEvent::Request(self.current_substream_id, req)); @@ -154,20 +155,36 @@ impl ProtocolsHandler fn inject_fully_negotiated_outbound( &mut self, out: RPCResponse, - _: Self::OutboundOpenInfo, + request_id : Self::OutboundOpenInfo, ) { self.dial_negotiated -= 1; - if self.dial_negotiated == 0 && self.dial_queue.is_empty() { + if self.dial_negotiated == 0 && self.dial_queue.is_empty() && self.waiting_substreams.is_empty() { self.keep_alive = KeepAlive::Until(Instant::now() + self.inactive_timeout); } + else { + self.keep_alive = KeepAlive::Yes; + } - self.events_out.push(out.into()); + self.events_out.push(RPCEvent::Response(request_id, out)); } + // Note: If the substream has closed due to inactivity, or the substream is in the + // wrong state a response will fail silently. #[inline] - fn inject_event(&mut self, event: Self::InEvent) { - self.send_request(event); + fn inject_event(&mut self, rpc_event: Self::InEvent) { + match rpc_event { + RPCEvent::Request(rpc_id, req) => self.send_request(rpc_id, req), + RPCEvent::Response(rpc_id, res) => { + // check if the stream matching the response still exists + if let Some(mut waiting_stream) = self.waiting_substreams.get_mut(&rpc_id) { + // only send one response per stream. This must be in the waiting state. + if let SubstreamState::WaitingResponse {substream, .. } = waiting_stream { + waiting_stream = SubstreamState::PendingWrite(upgrade::write_one(substream, res)); + } + } + } + } } #[inline] @@ -198,6 +215,26 @@ impl ProtocolsHandler return Err(err); } + // prioritise sending responses for waiting substreams + self.waiting_substreams.retain(|_k, mut waiting_stream| { + match waiting_stream => { + SubstreamState::PendingWrite(write_one) => { + match write_one.poll() => { + Ok(Async::Ready(_socket)) => false, + Ok(Async::NotReady()) => true, + Err(_e) => { + //TODO: Add logging + // throw away streams that error + false + } + } + }, + SubstreamState::WaitingResponse { timeout, .. } => { + if Instant::now() > timeout { false} else { true } + } + } + }); + if !self.events_out.is_empty() { return Ok(Async::Ready(ProtocolsHandlerEvent::Custom( self.events_out.remove(0), @@ -206,20 +243,21 @@ impl ProtocolsHandler self.events_out.shrink_to_fit(); } + // establish outbound substreams if !self.dial_queue.is_empty() { if self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; + let (request_id, req) = self.dial_queue.remove(0); return Ok(Async::Ready( ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(self.dial_queue.remove(0)), - info: (), + protocol: SubstreamProtocol::new(req), + info: request_id, }, )); } } else { self.dial_queue.shrink_to_fit(); } - Ok(Async::NotReady) } } diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index dfc3121c13..90784f9298 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -7,6 +7,7 @@ use types::{Epoch, Hash256, Slot}; #[derive(Debug, Clone)] pub enum RPCResponse { Hello(HelloMessage), + Goodbye, // empty value - required for protocol handler BeaconBlockRoots(BeaconBlockRootsResponse), BeaconBlockHeaders(BeaconBlockHeadersResponse), BeaconBlockBodies(BeaconBlockBodiesResponse), diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index 8914e18d5b..222e1de8d7 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -1,6 +1,6 @@ /// The Ethereum 2.0 Wire Protocol /// -/// This protocol is a purpose built ethereum 2.0 libp2p protocol. It's role is to facilitate +/// This protocol is a purpose built Ethereum 2.0 libp2p protocol. It's role is to facilitate /// direct peer-to-peer communication primarily for sending/receiving chain information for /// syncing. /// @@ -67,7 +67,7 @@ impl NetworkBehaviour for Rpc where TSubstream: AsyncRead + AsyncWrite, { - type ProtocolsHandler = OneShotHandler; + type ProtocolsHandler = RPCHandler; type OutEvent = RPCMessage; fn new_handler(&mut self) -> Self::ProtocolsHandler { diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index fb4a2b6037..9b7e881840 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -10,6 +10,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; const MAX_RPC_SIZE: usize = 4_194_304; // 4M /// The protocol prefix the RPC protocol id. const PROTOCOL_PREFIX: &str = "/eth/serenity/rpc/"; +/// The number of seconds to wait for a response before the stream is terminated. +const RESPONSE_TIMEOUT: u64 = 10; /// Implementation of the `ConnectionUpgrade` for the rpc protocol. #[derive(Debug, Clone)] @@ -58,6 +60,7 @@ where upgrade::read_respond(socket, MAX_RPC_SIZE, (), |socket, packet, ()| { Ok((socket, decode_request(packet, protocol)?)) }) + .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) } } @@ -135,17 +138,6 @@ impl UpgradeInfo for RPCRequest { } } -// GOODBYE RPC has it's own upgrade as it doesn't expect a response -impl UpgradeInfo for Goodbye { - type Info = RawProtocolId; - type InfoIter = iter::Once; - - // add further protocols as we support more encodings/versions - fn protocol_info(&self) -> Self::InfoIter { - iter::once(ProtocolId::new("goodbye", 1, "ssz").into()) - } -} - /// Implements the encoding per supported protocol for RPCRequest. impl RPCRequest { pub fn supported_protocols(&self) -> Vec { @@ -221,24 +213,7 @@ where upgrade::request_response(socket, bytes, MAX_RPC_SIZE, protocol, |packet, protocol| { Ok(decode_response(packet, protocol)?) }) - } -} - -impl OutboundUpgrade for Goodbye -where - TSocket: AsyncWrite, -{ - type Output = (); - type Error = io::Error; - type Future = upgrade::WriteOne>; - - fn upgrade_outbound( - self, - socket: upgrade::Negotiated, - protocol: Self::Info, - ) -> Self::Future { - let bytes = self.as_ssz_bytes(); - upgrade::write_one(socket, bytes) + .timeout(Duration::from_secs(RESPONSE_TIMEOUT)) } }