diff --git a/beacon_node/eth2-libp2p/src/behaviour/mod.rs b/beacon_node/eth2-libp2p/src/behaviour/mod.rs index 3316364cae..0e025d573c 100644 --- a/beacon_node/eth2-libp2p/src/behaviour/mod.rs +++ b/beacon_node/eth2-libp2p/src/behaviour/mod.rs @@ -26,7 +26,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use types::{EnrForkId, EthSpec, SubnetId}; +use types::{EnrForkId, EthSpec, SignedBeaconBlock, SubnetId}; mod handler; @@ -393,8 +393,36 @@ impl Behaviour { /* Eth2 RPC behaviour functions */ + /// Send a request to a peer over RPC. + pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) { + self.send_rpc(peer_id, RPCSend::Request(request_id, request.into())) + } + + /// Send a successful response to a peer over RPC. + pub fn send_successful_response( + &mut self, + peer_id: PeerId, + stream_id: SubstreamId, + response: Response, + ) { + self.send_rpc(peer_id, RPCSend::Response(stream_id, response.into())) + } + + /// Inform the peer that their request produced an error. + pub fn _send_error_reponse( + &mut self, + peer_id: PeerId, + stream_id: SubstreamId, + error: RPCResponseErrorCode, + reason: String, + ) { + self.send_rpc( + peer_id, + RPCSend::Response(stream_id, RPCCodedResponse::from_error_code(error, reason)), + ) + } /// Sends an RPC Request/Response via the RPC protocol. - pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCSend) { self.eth2_rpc.send_rpc(peer_id, rpc_event); } @@ -476,32 +504,38 @@ impl Behaviour { .expect("Local discovery must have bitfield"); } - /// Sends a PING/PONG request/response to a peer. - fn send_ping(&mut self, id: RequestId, peer_id: PeerId, is_request: bool) { - let ping = crate::rpc::methods::Ping { + /// Sends a Ping request to the peer. + fn ping(&mut self, id: RequestId, peer_id: PeerId) { + let ping = crate::rpc::Ping { data: self.meta_data.seq_number, }; + debug!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string()); + let event = RPCSend::Request(id, RPCRequest::Ping(ping)); - let event = if is_request { - debug!(self.log, "Sending Ping"; "request_id" => id, "peer_id" => peer_id.to_string()); - RPCEvent::Request(id, RPCRequest::Ping(ping)) - } else { - debug!(self.log, "Sending Pong"; "request_id" => id, "peer_id" => peer_id.to_string()); - RPCEvent::Response(id, RPCCodedResponse::Success(RPCResponse::Pong(ping))) + self.send_rpc(peer_id, event); + } + + /// Sends a Pong response to the peer. + fn pong(&mut self, id: SubstreamId, peer_id: PeerId) { + let ping = crate::rpc::Ping { + data: self.meta_data.seq_number, }; + debug!(self.log, "Sending Pong"; "request_id" => id, "peer_id" => peer_id.to_string()); + let event = RPCSend::Response(id, RPCCodedResponse::Success(RPCResponse::Pong(ping))); + self.send_rpc(peer_id, event); } /// Sends a METADATA request to a peer. fn send_meta_data_request(&mut self, peer_id: PeerId) { let metadata_request = - RPCEvent::Request(RequestId::from(0usize), RPCRequest::MetaData(PhantomData)); + RPCSend::Request(RequestId::Behaviour, RPCRequest::MetaData(PhantomData)); self.send_rpc(peer_id, metadata_request); } /// Sends a METADATA response to a peer. - fn send_meta_data_response(&mut self, id: RequestId, peer_id: PeerId) { - let metadata_response = RPCEvent::Response( + fn send_meta_data_response(&mut self, id: SubstreamId, peer_id: PeerId) { + let metadata_response = RPCSend::Response( id, RPCCodedResponse::Success(RPCResponse::MetaData(self.meta_data.clone())), ); @@ -587,45 +621,112 @@ impl Behaviour { } } + /// Queues the response to be sent upwards as long at it was requested outside the Behaviour. + fn propagate_response(&mut self, id: RequestId, peer_id: PeerId, response: Response) { + if !matches!(id, RequestId::Behaviour) { + self.events.push(BehaviourEvent::ResponseReceived { + peer_id, + id, + response, + }); + } + } + + /// Convenience function to propagate a request. + fn propagate_request(&mut self, id: SubstreamId, peer_id: PeerId, request: Request) { + self.events.push(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }); + } + fn on_rpc_event(&mut self, message: RPCMessage) { let peer_id = message.peer_id; - // The METADATA and PING RPC responses are handled within the behaviour and not - // propagated - // TODO: Improve the RPC types to better handle this logic discrepancy + // The METADATA and PING RPC responses are handled within the behaviour and not propagated match message.event { - RPCEvent::Request(id, RPCRequest::Ping(ping)) => { - // inform the peer manager and send the response - self.peer_manager.ping_request(&peer_id, ping.data); - // send a ping response - self.send_ping(id, peer_id, false); + Err(handler_err) => { + match handler_err { + HandlerErr::Inbound { + id: _, + proto, + error, + } => { + // Inform the peer manager of the error. + // An inbound error here means we sent an error to the peer, or the stream + // timed out. + self.peer_manager.handle_rpc_error(&peer_id, proto, &error); + } + HandlerErr::Outbound { id, proto, error } => { + // Inform the peer manager that a request we sent to the peer failed + self.peer_manager.handle_rpc_error(&peer_id, proto, &error); + // inform failures of requests comming outside the behaviour + if !matches!(id, RequestId::Behaviour) { + self.events + .push(BehaviourEvent::RPCFailed { peer_id, id, error }); + } + } + } } - RPCEvent::Request(id, RPCRequest::MetaData(_)) => { - // send the requested meta-data - self.send_meta_data_response(id, peer_id); + Ok(RPCReceived::Request(id, request)) => match request { + /* Behaviour managed protocols: Ping and Metadata */ + RPCRequest::Ping(ping) => { + // inform the peer manager and send the response + self.peer_manager.ping_request(&peer_id, ping.data); + // send a ping response + self.pong(id, peer_id); + } + RPCRequest::MetaData(_) => { + // send the requested meta-data + self.send_meta_data_response(id, peer_id); + // TODO: inform the peer manager? + } + /* Protocols propagated to the Network */ + RPCRequest::Status(msg) => { + // inform the peer manager that we have received a status from a peer + self.peer_manager.peer_statusd(&peer_id); + // propagate the STATUS message upwards + self.propagate_request(id, peer_id, Request::Status(msg)) + } + RPCRequest::BlocksByRange(req) => { + self.propagate_request(id, peer_id, Request::BlocksByRange(req)) + } + RPCRequest::BlocksByRoot(req) => { + self.propagate_request(id, peer_id, Request::BlocksByRoot(req)) + } + RPCRequest::Goodbye(reason) => { + // TODO: do not propagate + self.propagate_request(id, peer_id, Request::Goodbye(reason)); + } + }, + Ok(RPCReceived::Response(id, resp)) => { + match resp { + /* Behaviour managed protocols */ + RPCResponse::Pong(ping) => self.peer_manager.pong_response(&peer_id, ping.data), + RPCResponse::MetaData(meta_data) => { + self.peer_manager.meta_data_response(&peer_id, meta_data) + } + /* Network propagated protocols */ + RPCResponse::Status(msg) => { + // inform the peer manager that we have received a status from a peer + self.peer_manager.peer_statusd(&peer_id); + // propagate the STATUS message upwards + self.propagate_response(id, peer_id, Response::Status(msg)); + } + RPCResponse::BlocksByRange(resp) => { + self.propagate_response(id, peer_id, Response::BlocksByRange(Some(resp))) + } + RPCResponse::BlocksByRoot(resp) => { + self.propagate_response(id, peer_id, Response::BlocksByRoot(Some(resp))) + } + } } - RPCEvent::Response(_, RPCCodedResponse::Success(RPCResponse::Pong(ping))) => { - self.peer_manager.pong_response(&peer_id, ping.data); - } - RPCEvent::Response(_, RPCCodedResponse::Success(RPCResponse::MetaData(meta_data))) => { - self.peer_manager.meta_data_response(&peer_id, meta_data); - } - RPCEvent::Request(_, RPCRequest::Status(_)) - | RPCEvent::Response(_, RPCCodedResponse::Success(RPCResponse::Status(_))) => { - // inform the peer manager that we have received a status from a peer - self.peer_manager.peer_statusd(&peer_id); - // propagate the STATUS message upwards - self.events - .push(BehaviourEvent::RPC(peer_id, message.event)); - } - RPCEvent::Error(_, protocol, ref err) => { - self.peer_manager.handle_rpc_error(&peer_id, protocol, err); - self.events - .push(BehaviourEvent::RPC(peer_id, message.event)); - } - _ => { - // propagate all other RPC messages upwards - self.events - .push(BehaviourEvent::RPC(peer_id, message.event)) + Ok(RPCReceived::EndOfStream(id, termination)) => { + let response = match termination { + ResponseTermination::BlocksByRange => Response::BlocksByRange(None), + ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), + }; + self.propagate_response(id, peer_id, response); } } } @@ -648,7 +749,7 @@ impl Behaviour { } PeerManagerEvent::Ping(peer_id) => { // send a ping request to this peer - self.send_ping(RequestId::from(0usize), peer_id, true); + self.ping(RequestId::Behaviour, peer_id); } PeerManagerEvent::MetaData(peer_id) => { self.send_meta_data_request(peer_id); @@ -707,11 +808,96 @@ impl Behaviour { } } +/* Public API types */ + +/// The type of RPC requests the Behaviour informs it has received and allows for sending. +/// +// NOTE: This is an application-level wrapper over the lower network leve requests that can be +// sent. The main difference is the absense of the Ping and Metadata protocols, which don't +// leave the Behaviour. For all protocols managed by RPC see `RPCRequest`. +#[derive(Debug, Clone, PartialEq)] +pub enum Request { + /// A Status message. + Status(StatusMessage), + /// A Goobye message. + Goodbye(GoodbyeReason), + /// A blocks by range request. + BlocksByRange(BlocksByRangeRequest), + /// A request blocks root request. + BlocksByRoot(BlocksByRootRequest), +} + +impl std::convert::From for RPCRequest { + fn from(req: Request) -> RPCRequest { + match req { + Request::BlocksByRoot(r) => RPCRequest::BlocksByRoot(r), + Request::BlocksByRange(r) => RPCRequest::BlocksByRange(r), + Request::Goodbye(r) => RPCRequest::Goodbye(r), + Request::Status(s) => RPCRequest::Status(s), + } + } +} + +/// The type of RPC responses the Behaviour informs it has received, and allows for sending. +/// +// NOTE: This is an application-level wrapper over the lower network level responses that can be +// sent. The main difference is the absense of Pong and Metadata, which don't leave the +// Behaviour. For all protocol reponses managed by RPC see `RPCResponse` and +// `RPCCodedResponse`. +#[derive(Debug, Clone, PartialEq)] +pub enum Response { + /// A Status message. + Status(StatusMessage), + /// A response to a get BLOCKS_BY_RANGE request. A None response signals the end of the batch. + BlocksByRange(Option>>), + /// A response to a get BLOCKS_BY_ROOT request. + BlocksByRoot(Option>>), +} + +impl std::convert::From> for RPCCodedResponse { + fn from(resp: Response) -> RPCCodedResponse { + match resp { + Response::BlocksByRoot(r) => match r { + Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRoot(b)), + None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRoot), + }, + Response::BlocksByRange(r) => match r { + Some(b) => RPCCodedResponse::Success(RPCResponse::BlocksByRange(b)), + None => RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), + }, + Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)), + } + } +} + /// The types of events than can be obtained from polling the behaviour. #[derive(Debug)] pub enum BehaviourEvent { - /// A received RPC event and the peer that it was received from. - RPC(PeerId, RPCEvent), + /// An RPC Request that was sent failed. + RPCFailed { + /// The id of the failed request. + id: RequestId, + /// The peer to which this request was sent. + peer_id: PeerId, + /// The error that occurred. + error: RPCError, + }, + RequestReceived { + /// The peer that sent the request. + peer_id: PeerId, + /// Identifier of the request. All responses to this request must use this id. + id: SubstreamId, + /// Request the peer sent. + request: Request, + }, + ResponseReceived { + /// Peer that sent the response. + peer_id: PeerId, + /// Id of the request to which the peer is responding. + id: RequestId, + /// Response the peer sent. + response: Response, + }, PubsubMessage { /// The gossipsub message id. Used when propagating blocks after validation. id: MessageId, diff --git a/beacon_node/eth2-libp2p/src/lib.rs b/beacon_node/eth2-libp2p/src/lib.rs index 2c028eac6f..7e1f2cd2dd 100644 --- a/beacon_node/eth2-libp2p/src/lib.rs +++ b/beacon_node/eth2-libp2p/src/lib.rs @@ -15,12 +15,11 @@ mod service; pub mod types; pub use crate::types::{error, Enr, GossipTopic, NetworkGlobals, PubsubMessage}; -pub use behaviour::BehaviourEvent; +pub use behaviour::{BehaviourEvent, Request, Response}; pub use config::Config as NetworkConfig; pub use discovery::enr_ext::{CombinedKeyExt, EnrExt}; pub use libp2p::gossipsub::{MessageId, Topic, TopicHash}; pub use libp2p::{core::ConnectedPoint, PeerId, Swarm}; pub use libp2p::{multiaddr, Multiaddr}; pub use peer_manager::{client::Client, PeerDB, PeerInfo, PeerSyncStatus, SyncInfo}; -pub use rpc::RPCEvent; pub use service::{Libp2pEvent, Service, NETWORK_KEY_FILENAME}; diff --git a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs index 2f5d0e9d3f..375ab71dc3 100644 --- a/beacon_node/eth2-libp2p/src/peer_manager/mod.rs +++ b/beacon_node/eth2-libp2p/src/peer_manager/mod.rs @@ -279,7 +279,7 @@ impl PeerManager { // this could their fault or ours, so we tolerate this PeerAction::HighToleranceError } - RPCError::ErrorResponse(code) => match code { + RPCError::ErrorResponse(code, _) => match code { RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError, RPCResponseErrorCode::ServerError => PeerAction::MidToleranceError, RPCResponseErrorCode::InvalidRequest => PeerAction::LowToleranceError, diff --git a/beacon_node/eth2-libp2p/src/rpc/handler.rs b/beacon_node/eth2-libp2p/src/rpc/handler.rs index 9bb8500a34..e60ee28c19 100644 --- a/beacon_node/eth2-libp2p/src/rpc/handler.rs +++ b/beacon_node/eth2-libp2p/src/rpc/handler.rs @@ -3,7 +3,7 @@ use super::methods::{RPCCodedResponse, RequestId, ResponseTermination}; use super::protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; -use super::RPCEvent; +use super::{RPCReceived, RPCSend}; use crate::rpc::protocol::{InboundFramed, OutboundFramed}; use fnv::FnvHashMap; use futures::prelude::*; @@ -33,12 +33,34 @@ pub const RESPONSE_TIMEOUT: u64 = 10; /// The number of times to retry an outbound upgrade in the case of IO errors. const IO_ERROR_RETRIES: u8 = 3; -/// Inbound requests are given a sequential `RequestId` to keep track of. All inbound streams are -/// identified by their substream ID which is identical to the RPC Id. -type InboundRequestId = RequestId; -/// Outbound requests are associated with an id that is given by the application that sent the -/// request. -type OutboundRequestId = RequestId; +/// Identifier of inbound and outbound substreams from the handler's perspective. +#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] +pub struct SubstreamId(usize); + +/// An error encoutered by the handler. +pub enum HandlerErr { + /// An error ocurred for this peer's request. This can occurr during protocol negotiation, + /// message passing, or if the handler identifies that we are sending an error reponse to the peer. + Inbound { + /// Id of the peer's request for which an error occurred. + id: SubstreamId, + /// Information of the negotiated protocol. + proto: Protocol, + /// The error that ocurred. + error: RPCError, + }, + /// An error ocurred for this request. Such error can occurr during protocol negotiation, + /// message passing, or if we successfully received a response from the peer, but this response + /// indicates an error. + Outbound { + /// Application-given Id of the request for which an error occurred. + id: RequestId, + /// Information of the protocol. + proto: Protocol, + /// The error that ocurred. + error: RPCError, + }, +} /// Implementation of `ProtocolsHandler` for the RPC protocol. pub struct RPCHandler @@ -48,11 +70,11 @@ where /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol>, - /// If something bad happened and we should shut down the handler with an error. - pending_error: Vec<(RequestId, Protocol, RPCError)>, + /// Errors ocurring on outbound and inbound connections queued for reporting back. + pending_errors: Vec, /// Queue of events to produce in `poll()`. - events_out: SmallVec<[RPCEvent; 4]>, + events_out: SmallVec<[RPCReceived; 4]>, /// Queue of outbound substreams to open. dial_queue: SmallVec<[(RequestId, RPCRequest); 4]>, @@ -62,7 +84,7 @@ where /// Current inbound substreams awaiting processing. inbound_substreams: FnvHashMap< - InboundRequestId, + SubstreamId, ( InboundSubstreamState, Option, @@ -71,29 +93,22 @@ where >, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. - inbound_substreams_delay: DelayQueue, + inbound_substreams_delay: DelayQueue, - /// Map of outbound substreams that need to be driven to completion. The `RequestId` is - /// maintained by the application sending the request. - /// For Responses with multiple expected response chunks a counter is added to be able to terminate the stream when the expected number has been received - outbound_substreams: FnvHashMap< - OutboundRequestId, - ( - OutboundSubstreamState, - delay_queue::Key, - Protocol, - Option, - ), - >, + /// Map of outbound substreams that need to be driven to completion. + outbound_substreams: FnvHashMap>, /// Inbound substream `DelayQueue` which keeps track of when an inbound substream will timeout. - outbound_substreams_delay: DelayQueue, + outbound_substreams_delay: DelayQueue, /// Map of outbound items that are queued as the stream processes them. - queued_outbound_items: FnvHashMap>>, + queued_outbound_items: FnvHashMap>>, /// Sequential ID for waiting substreams. For inbound substreams, this is also the inbound request ID. - current_inbound_substream_id: RequestId, + current_inbound_substream_id: SubstreamId, + + /// Sequential ID for outbound substreams. + current_outbound_substream_id: SubstreamId, /// Maximum number of concurrent outbound substreams being opened. Value is never modified. max_dial_negotiated: u32, @@ -112,6 +127,23 @@ where log: slog::Logger, } +/// Contains the information the handler keeps on established outbound substreams. +struct OutboundInfo { + /// State of the substream. + state: OutboundSubstreamState, + /// Key to keep track of the substream's timeout via `self.outbound_substreams_delay`. + delay_key: delay_queue::Key, + /// Info over the protocol this substream is handling. + proto: Protocol, + /// Number of chunks to be seen from the peer's response. + // TODO: removing the option could allow clossing the streams after the number of + // expected responses is met for all protocols. + // TODO: the type of this is wrong + remaining_chunks: Option, + /// RequestId as given by the application that sent the request. + req_id: RequestId, +} + pub enum InboundSubstreamState where TSpec: EthSpec, @@ -208,7 +240,7 @@ where } InboundSubstreamState::ResponseIdle(substream) => { *self = InboundSubstreamState::ResponsePendingSend { - substream: substream, + substream, message: error, closing: true, }; @@ -235,7 +267,7 @@ where ) -> Self { RPCHandler { listen_protocol, - pending_error: Vec::new(), + pending_errors: Vec::new(), events_out: SmallVec::new(), dial_queue: SmallVec::new(), dial_negotiated: 0, @@ -244,7 +276,8 @@ where outbound_substreams: FnvHashMap::default(), inbound_substreams_delay: DelayQueue::new(), outbound_substreams_delay: DelayQueue::new(), - current_inbound_substream_id: 1, + current_inbound_substream_id: SubstreamId(0), + current_outbound_substream_id: SubstreamId(0), max_dial_negotiated: 8, keep_alive: KeepAlive::Yes, inactive_timeout, @@ -300,8 +333,8 @@ impl ProtocolsHandler for RPCHandler where TSpec: EthSpec, { - type InEvent = RPCEvent; - type OutEvent = RPCEvent; + type InEvent = RPCSend; + type OutEvent = Result, HandlerErr>; type Error = RPCError; type InboundProtocol = RPCProtocol; type OutboundProtocol = RPCRequest; @@ -316,9 +349,11 @@ where substream: >::Output, ) { let (req, substream) = substream; - // drop the stream and return a 0 id for goodbye "requests" - if let r @ RPCRequest::Goodbye(_) = req { - self.events_out.push(RPCEvent::Request(0, r)); + // drop the stream + if let RPCRequest::Goodbye(_) = req { + self.events_out + .push(RPCReceived::Request(self.current_inbound_substream_id, req)); + self.current_inbound_substream_id.0 += 1; return; } @@ -334,8 +369,8 @@ where ); self.events_out - .push(RPCEvent::Request(self.current_inbound_substream_id, req)); - self.current_inbound_substream_id += 1; + .push(RPCReceived::Request(self.current_inbound_substream_id, req)); + self.current_inbound_substream_id.0 += 1; } fn inject_fully_negotiated_outbound( @@ -346,43 +381,42 @@ where self.dial_negotiated -= 1; // add the stream to substreams if we expect a response, otherwise drop the stream. - let (mut id, request) = request_info; - if request.expect_response() { - // outbound requests can be sent from various aspects of lighthouse which don't - // track request ids. In the future these will be flagged as None, currently they - // are flagged as 0. These can overlap. In this case, we pick the highest request - // Id available - if id == 0 && self.outbound_substreams.get(&id).is_some() { - // have duplicate outbound request with no id. Pick one that will not collide - let mut new_id = std::usize::MAX; - while self.outbound_substreams.get(&new_id).is_some() { - // panic all outbound substreams are full - new_id -= 1; - } - trace!(self.log, "New outbound stream id created"; "id" => new_id); - id = RequestId::from(new_id); - } - + let (id, request) = request_info; + let expected_responses = request.expected_responses(); + if expected_responses > 0 { // new outbound request. Store the stream and tag the output. - let delay_key = self - .outbound_substreams_delay - .insert(id, Duration::from_secs(RESPONSE_TIMEOUT)); - let protocol = request.protocol(); - let response_chunk_count = match request { - RPCRequest::BlocksByRange(ref req) => Some(req.count), - RPCRequest::BlocksByRoot(ref req) => Some(req.block_roots.len() as u64), - _ => None, // Other requests do not have a known response chunk length, - }; + let delay_key = self.outbound_substreams_delay.insert( + self.current_outbound_substream_id, + Duration::from_secs(RESPONSE_TIMEOUT), + ); + let proto = request.protocol(); let awaiting_stream = OutboundSubstreamState::RequestPendingResponse { substream: out, - request: request, + request, }; - if let Some(_) = self.outbound_substreams.insert( - id, - (awaiting_stream, delay_key, protocol, response_chunk_count), - ) { - crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", id)); + let expected_responses = if expected_responses > 1 { + // Currently enforced only for multiple responses + Some(expected_responses) + } else { + None + }; + if self + .outbound_substreams + .insert( + self.current_outbound_substream_id, + OutboundInfo { + state: awaiting_stream, + delay_key, + proto, + remaining_chunks: expected_responses, + req_id: id, + }, + ) + .is_some() + { + crit!(self.log, "Duplicate outbound substream id"; "id" => format!("{:?}", self.current_outbound_substream_id)); } + self.current_outbound_substream_id.0 += 1; } self.update_keep_alive(); @@ -392,113 +426,124 @@ where // wrong state a response will fail silently. fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { - RPCEvent::Request(id, req) => self.send_request(id, req), - RPCEvent::Response(rpc_id, response) => { + RPCSend::Request(id, req) => self.send_request(id, req), + RPCSend::Response(inbound_id, response) => { // Variables indicating if the response is an error response or a multi-part // response let res_is_error = response.is_error(); let res_is_multiple = response.multiple_responses(); // check if the stream matching the response still exists - match self.inbound_substreams.get_mut(&rpc_id) { - Some((substream_state, _, protocol)) => { - match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { - InboundSubstreamState::ResponseIdle(substream) => { - // close the stream if there is no response - match response { - RPCCodedResponse::StreamTermination(_) => { - //trace!(self.log, "Stream termination sent. Ending the stream"); - *substream_state = - InboundSubstreamState::Closing(substream); - } - _ => { - if let Some(error_code) = response.error_code() { - self.pending_error.push(( - rpc_id, - *protocol, - RPCError::ErrorResponse(error_code), - )); - } - // send the response - // if it's a single rpc request or an error, close the stream after - *substream_state = - InboundSubstreamState::ResponsePendingSend { - substream: substream, - message: response, - closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses - }; - } - } - } - InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - } if res_is_multiple => { - // the stream is in use, add the request to a pending queue - self.queued_outbound_items - .entry(rpc_id) - .or_insert_with(Vec::new) - .push(response); + let (substream_state, protocol) = match self.inbound_substreams.get_mut(&inbound_id) + { + Some((substream_state, _, protocol)) => (substream_state, protocol), + None => { + warn!(self.log, "Stream has expired. Response not sent"; + "response" => response.to_string(), "id" => inbound_id); + return; + } + }; - // return the state - *substream_state = InboundSubstreamState::ResponsePendingSend { - substream, - message, - closing, - }; - } - InboundSubstreamState::ResponsePendingFlush { substream, closing } - if res_is_multiple => - { - // the stream is in use, add the request to a pending queue - self.queued_outbound_items - .entry(rpc_id) - .or_insert_with(Vec::new) - .push(response); + // If the response we are sending is an error, report back for handling + match response { + RPCCodedResponse::InvalidRequest(ref reason) + | RPCCodedResponse::ServerError(ref reason) + | RPCCodedResponse::Unknown(ref reason) => { + let code = &response + .error_code() + .expect("Error response should map to an error code"); + let err = HandlerErr::Inbound { + id: inbound_id, + proto: *protocol, + error: RPCError::ErrorResponse(*code, reason.clone()), + }; + self.pending_errors.push(err); + } + _ => {} // not an error, continue. + } - // return the state - *substream_state = InboundSubstreamState::ResponsePendingFlush { - substream, - closing, - }; - } - InboundSubstreamState::Closing(substream) => { + match std::mem::replace(substream_state, InboundSubstreamState::Poisoned) { + InboundSubstreamState::ResponseIdle(substream) => { + // close the stream if there is no response + match response { + RPCCodedResponse::StreamTermination(_) => { *substream_state = InboundSubstreamState::Closing(substream); - debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response)); } - InboundSubstreamState::ResponsePendingSend { - substream, - message, - .. - } => { + _ => { + // send the response + // if it's a single rpc request or an error, close the stream after *substream_state = InboundSubstreamState::ResponsePendingSend { substream, - message, - closing: true, + message: response, + closing: !res_is_multiple | res_is_error, // close if an error or we are not expecting more responses }; - error!(self.log, "Attempted sending multiple responses to a single response request"); - } - InboundSubstreamState::ResponsePendingFlush { substream, .. } => { - *substream_state = InboundSubstreamState::ResponsePendingFlush { - substream, - closing: true, - }; - error!(self.log, "Attempted sending multiple responses to a single response request"); - } - InboundSubstreamState::Poisoned => { - crit!(self.log, "Poisoned inbound substream"); - unreachable!("Coding error: Poisoned substream"); } } } - None => { - warn!(self.log, "Stream has expired. Response not sent"; "response" => response.to_string(), "id" => rpc_id); + InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + } if res_is_multiple => { + // the stream is in use, add the request to a pending queue + self.queued_outbound_items + .entry(inbound_id) + .or_insert_with(Vec::new) + .push(response); + + // return the state + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing, + }; } - }; + InboundSubstreamState::ResponsePendingFlush { substream, closing } + if res_is_multiple => + { + // the stream is in use, add the request to a pending queue + self.queued_outbound_items + .entry(inbound_id) + .or_insert_with(Vec::new) + .push(response); + + // return the state + *substream_state = + InboundSubstreamState::ResponsePendingFlush { substream, closing }; + } + InboundSubstreamState::Closing(substream) => { + *substream_state = InboundSubstreamState::Closing(substream); + debug!(self.log, "Response not sent. Stream is closing"; "response" => format!("{}",response)); + } + InboundSubstreamState::ResponsePendingSend { + substream, message, .. + } => { + *substream_state = InboundSubstreamState::ResponsePendingSend { + substream, + message, + closing: true, + }; + error!( + self.log, + "Attempted sending multiple responses to a single response request" + ); + } + InboundSubstreamState::ResponsePendingFlush { substream, .. } => { + *substream_state = InboundSubstreamState::ResponsePendingFlush { + substream, + closing: true, + }; + error!( + self.log, + "Attempted sending multiple responses to a single response request" + ); + } + InboundSubstreamState::Poisoned => { + crit!(self.log, "Poisoned inbound substream"); + unreachable!("Coding error: Poisoned substream"); + } + } } - // We do not send errors as responses - RPCEvent::Error(..) => {} } } @@ -520,7 +565,7 @@ where self.outbound_io_error_retries = 0; // map the error - let rpc_error = match error { + let error = match error { ProtocolsHandlerUpgrErr::Timer => RPCError::InternalError("Timer failed"), ProtocolsHandlerUpgrErr::Timeout => RPCError::NegotiationTimeout, ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => e, @@ -541,7 +586,11 @@ where } }, }; - self.pending_error.push((id, req.protocol(), rpc_error)); + self.pending_errors.push(HandlerErr::Outbound { + id, + proto: req.protocol(), + error, + }); } fn connection_keep_alive(&self) -> KeepAlive { @@ -559,16 +608,15 @@ where Self::Error, >, > { - if !self.pending_error.is_empty() { - let (id, protocol, err) = self.pending_error.remove(0); - return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( - id, protocol, err, - ))); + // report failures + if !self.pending_errors.is_empty() { + let err_info = self.pending_errors.remove(0); + return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(err_info))); } // return any events that need to be reported if !self.events_out.is_empty() { - return Poll::Ready(ProtocolsHandlerEvent::Custom(self.events_out.remove(0))); + return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok(self.events_out.remove(0)))); } else { self.events_out.shrink_to_fit(); } @@ -576,17 +624,23 @@ where // purge expired inbound substreams and send an error loop { match self.inbound_substreams_delay.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(stream_id))) => { + Poll::Ready(Some(Ok(inbound_id))) => { // handle a stream timeout for various states - if let Some((substream_state, delay_key, _)) = - self.inbound_substreams.get_mut(stream_id.get_ref()) + if let Some((substream_state, delay_key, protocol)) = + self.inbound_substreams.get_mut(inbound_id.get_ref()) { // the delay has been removed *delay_key = None; + self.pending_errors.push(HandlerErr::Inbound { + id: *inbound_id.get_ref(), + proto: *protocol, + error: RPCError::StreamTimeout, + }); + let outbound_queue = self .queued_outbound_items - .entry(stream_id.into_inner()) + .entry(inbound_id.into_inner()) .or_insert_with(Vec::new); substream_state.close(outbound_queue); } @@ -605,20 +659,21 @@ where // purge expired outbound substreams loop { match self.outbound_substreams_delay.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(stream_id))) => { - if let Some((_id, _stream, protocol, _)) = - self.outbound_substreams.remove(stream_id.get_ref()) + Poll::Ready(Some(Ok(outbound_id))) => { + if let Some(OutboundInfo { proto, req_id, .. }) = + self.outbound_substreams.remove(outbound_id.get_ref()) { self.update_keep_alive(); + let outbound_err = HandlerErr::Outbound { + id: req_id, + proto, + error: RPCError::StreamTimeout, + }; // notify the user - return Poll::Ready(ProtocolsHandlerEvent::Custom(RPCEvent::Error( - *stream_id.get_ref(), - protocol, - RPCError::StreamTimeout, - ))); + return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err))); } else { - crit!(self.log, "timed out substream not in the books"; "stream_id" => stream_id.get_ref()); + crit!(self.log, "timed out substream not in the books"; "stream_id" => outbound_id.get_ref()); } } Poll::Ready(Some(Err(e))) => { @@ -797,155 +852,161 @@ where } // drive outbound streams that need to be processed - for request_id in self.outbound_substreams.keys().copied().collect::>() { - match self.outbound_substreams.entry(request_id) { + for outbound_id in self.outbound_substreams.keys().copied().collect::>() { + // get the state and mark it as poisoned + let (mut entry, state) = match self.outbound_substreams.entry(outbound_id) { Entry::Occupied(mut entry) => { - match std::mem::replace( - &mut entry.get_mut().0, + let state = std::mem::replace( + &mut entry.get_mut().state, OutboundSubstreamState::Poisoned, - ) { - OutboundSubstreamState::RequestPendingResponse { - mut substream, - request, - } => match substream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(response))) => { - if request.multiple_responses() && !response.is_error() { - let substream_entry = entry.get_mut(); - let delay_key = &substream_entry.1; - // chunks left after this one - let remaining_chunks = substream_entry - .3 - .map(|count| count.saturating_sub(1)) - .unwrap_or_else(|| 0); - if remaining_chunks == 0 { - // this is the last expected message, close the stream as all expected chunks have been received - substream_entry.0 = - OutboundSubstreamState::Closing(substream); - } else { - // If the response chunk was expected update the remaining number of chunks expected and reset the Timeout - substream_entry.0 = - OutboundSubstreamState::RequestPendingResponse { - substream, - request, - }; - substream_entry.3 = Some(remaining_chunks); - self.outbound_substreams_delay.reset( - delay_key, - Duration::from_secs(RESPONSE_TIMEOUT), - ); - } - } else { - // either this is a single response request or we received an - // error - // only expect a single response, close the stream - entry.get_mut().0 = OutboundSubstreamState::Closing(substream); - } + ); + (entry, state) + } + Entry::Vacant(_) => unreachable!(), + }; - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Response(request_id, response), - )); + match state { + OutboundSubstreamState::RequestPendingResponse { + mut substream, + request, + } => match substream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(response))) => { + if request.expected_responses() > 1 && !response.is_error() { + let substream_entry = entry.get_mut(); + let delay_key = &substream_entry.delay_key; + // chunks left after this one + let remaining_chunks = substream_entry + .remaining_chunks + .map(|count| count.saturating_sub(1)) + .unwrap_or_else(|| 0); + if remaining_chunks == 0 { + // this is the last expected message, close the stream as all expected chunks have been received + substream_entry.state = OutboundSubstreamState::Closing(substream); + } else { + // If the response chunk was expected update the remaining number of chunks expected and reset the Timeout + substream_entry.state = + OutboundSubstreamState::RequestPendingResponse { + substream, + request, + }; + substream_entry.remaining_chunks = Some(remaining_chunks); + self.outbound_substreams_delay + .reset(delay_key, Duration::from_secs(RESPONSE_TIMEOUT)); } - Poll::Ready(None) => { - // stream closed - // if we expected multiple streams send a stream termination, - // else report the stream terminating only. - //trace!(self.log, "RPC Response - stream closed by remote"); - // drop the stream - let delay_key = &entry.get().1; - self.outbound_substreams_delay.remove(delay_key); - entry.remove_entry(); + } else { + // either this is a single response request or we received an + // error only expect a single response, close the stream + entry.get_mut().state = OutboundSubstreamState::Closing(substream); + } - self.update_keep_alive(); - // notify the application error - if request.multiple_responses() { - // return an end of stream result - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Response( - request_id, - RPCCodedResponse::StreamTermination( - request.stream_termination(), - ), - ), - )); - } // else we return an error, stream should not have closed early. - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error( - request_id, - request.protocol(), - RPCError::IncompleteStream, - ), - )); - } - Poll::Pending => { - entry.get_mut().0 = OutboundSubstreamState::RequestPendingResponse { - substream, - request, - } - } - Poll::Ready(Some(Err(e))) => { - // drop the stream - let delay_key = &entry.get().1; - self.outbound_substreams_delay.remove(delay_key); - let protocol = entry.get().2; - entry.remove_entry(); - self.update_keep_alive(); - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Error(request_id, protocol, e), - )); - } - }, - OutboundSubstreamState::Closing(mut substream) => { - match Sink::poll_close(Pin::new(&mut substream), cx) { - Poll::Ready(_) => { - // drop the stream and its corresponding timeout - let delay_key = &entry.get().1; - let protocol = entry.get().2; - self.outbound_substreams_delay.remove(delay_key); - entry.remove_entry(); - self.update_keep_alive(); + // Check what type of response we got and report it accordingly + let id = entry.get().req_id; + let proto = entry.get().proto; - // report the stream termination to the user - // - // Streams can be terminated here if a responder tries to - // continue sending responses beyond what we would expect. Here - // we simply terminate the stream and report a stream - // termination to the application - match protocol { - Protocol::BlocksByRange => { - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Response( - request_id, - RPCCodedResponse::StreamTermination( - ResponseTermination::BlocksByRange, - ), - ), - )); - } - Protocol::BlocksByRoot => { - return Poll::Ready(ProtocolsHandlerEvent::Custom( - RPCEvent::Response( - request_id, - RPCCodedResponse::StreamTermination( - ResponseTermination::BlocksByRoot, - ), - ), - )); - } - _ => {} // all other protocols are do not have multiple responses and we do not inform the user, we simply drop the stream. - } - } - Poll::Pending => { - entry.get_mut().0 = OutboundSubstreamState::Closing(substream); - } + let received = match response { + RPCCodedResponse::StreamTermination(t) => { + Ok(RPCReceived::EndOfStream(id, t)) + } + RPCCodedResponse::Success(resp) => Ok(RPCReceived::Response(id, resp)), + RPCCodedResponse::InvalidRequest(ref r) + | RPCCodedResponse::ServerError(ref r) + | RPCCodedResponse::Unknown(ref r) => { + let code = response.error_code().expect( + "Response indicating and error should map to an error code", + ); + Err(HandlerErr::Outbound { + id, + proto, + error: RPCError::ErrorResponse(code, r.clone()), + }) + } + }; + + return Poll::Ready(ProtocolsHandlerEvent::Custom(received)); + } + Poll::Ready(None) => { + // stream closed + // if we expected multiple streams send a stream termination, + // else report the stream terminating only. + //trace!(self.log, "RPC Response - stream closed by remote"); + // drop the stream + let delay_key = &entry.get().delay_key; + let request_id = *&entry.get().req_id; + self.outbound_substreams_delay.remove(delay_key); + entry.remove_entry(); + self.update_keep_alive(); + // notify the application error + if request.expected_responses() > 1 { + // return an end of stream result + return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok( + RPCReceived::EndOfStream(request_id, request.stream_termination()), + ))); + } + + // else we return an error, stream should not have closed early. + let outbound_err = HandlerErr::Outbound { + id: request_id, + proto: request.protocol(), + error: RPCError::IncompleteStream, + }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err))); + } + Poll::Pending => { + entry.get_mut().state = + OutboundSubstreamState::RequestPendingResponse { substream, request } + } + Poll::Ready(Some(Err(e))) => { + // drop the stream + let delay_key = &entry.get().delay_key; + self.outbound_substreams_delay.remove(delay_key); + let outbound_err = HandlerErr::Outbound { + id: entry.get().req_id, + proto: entry.get().proto, + error: e, + }; + entry.remove_entry(); + self.update_keep_alive(); + return Poll::Ready(ProtocolsHandlerEvent::Custom(Err(outbound_err))); + } + }, + OutboundSubstreamState::Closing(mut substream) => { + match Sink::poll_close(Pin::new(&mut substream), cx) { + Poll::Ready(_) => { + // drop the stream and its corresponding timeout + let delay_key = &entry.get().delay_key; + let protocol = entry.get().proto; + let request_id = entry.get().req_id; + self.outbound_substreams_delay.remove(delay_key); + entry.remove_entry(); + self.update_keep_alive(); + + // report the stream termination to the user + // + // Streams can be terminated here if a responder tries to + // continue sending responses beyond what we would expect. Here + // we simply terminate the stream and report a stream + // termination to the application + let termination = match protocol { + Protocol::BlocksByRange => Some(ResponseTermination::BlocksByRange), + Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), + _ => None, // all other protocols are do not have multiple responses and we do not inform the user, we simply drop the stream. + }; + + if let Some(termination) = termination { + return Poll::Ready(ProtocolsHandlerEvent::Custom(Ok( + RPCReceived::EndOfStream(request_id, termination), + ))); } } - OutboundSubstreamState::Poisoned => { - crit!(self.log, "Poisoned outbound substream"); - unreachable!("Coding Error: Outbound substream is poisoned") + Poll::Pending => { + entry.get_mut().state = OutboundSubstreamState::Closing(substream); } } } - Entry::Vacant(_) => unreachable!(), + OutboundSubstreamState::Poisoned => { + crit!(self.log, "Poisoned outbound substream"); + unreachable!("Coding Error: Outbound substream is poisoned") + } } } @@ -980,7 +1041,7 @@ fn apply_queued_responses( InboundSubstreamState::Closing(substream) } chunk => InboundSubstreamState::ResponsePendingSend { - substream: substream, + substream, message: chunk, closing: false, }, @@ -992,3 +1053,14 @@ fn apply_queued_responses( } } } + +impl slog::Value for SubstreamId { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + slog::Value::serialize(&self.0, record, key, serializer) + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/methods.rs b/beacon_node/eth2-libp2p/src/rpc/methods.rs index 8870ffc033..167198fa8d 100644 --- a/beacon_node/eth2-libp2p/src/rpc/methods.rs +++ b/beacon_node/eth2-libp2p/src/rpc/methods.rs @@ -9,7 +9,16 @@ use types::{Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; /* Requests */ -pub type RequestId = usize; +/// Identifier of a request. +/// +// NOTE: The handler stores the `RequestId` to inform back of responses and errors, but it's execution +// is independent of the contents on this type. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RequestId { + Router, + Sync(usize), + Behaviour, +} /// The STATUS request/response handshake message. #[derive(Encode, Decode, Clone, Debug, PartialEq)] @@ -194,7 +203,7 @@ pub enum RPCCodedResponse { } /// The code assigned to an erroneous `RPCResponse`. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum RPCResponseErrorCode { InvalidRequest, ServerError, @@ -230,6 +239,15 @@ impl RPCCodedResponse { } } + /// Builds an RPCCodedResponse from a response code and an ErrorMessage + pub fn from_error_code(response_code: RPCResponseErrorCode, err: String) -> Self { + match response_code { + RPCResponseErrorCode::InvalidRequest => RPCCodedResponse::InvalidRequest(err), + RPCResponseErrorCode::ServerError => RPCCodedResponse::ServerError(err), + RPCResponseErrorCode::Unknown => RPCCodedResponse::Unknown(err), + } + } + /// Specifies which response allows for multiple chunks for the stream handler. pub fn multiple_responses(&self) -> bool { match self { @@ -333,3 +351,18 @@ impl std::fmt::Display for BlocksByRangeRequest { ) } } + +impl slog::Value for RequestId { + fn serialize( + &self, + record: &slog::Record, + key: slog::Key, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + match self { + RequestId::Behaviour => slog::Value::serialize("Behaviour", record, key, serializer), + RequestId::Router => slog::Value::serialize("Router", record, key, serializer), + RequestId::Sync(ref id) => slog::Value::serialize(id, record, key, serializer), + } + } +} diff --git a/beacon_node/eth2-libp2p/src/rpc/mod.rs b/beacon_node/eth2-libp2p/src/rpc/mod.rs index e276bf6b3a..1af2389d77 100644 --- a/beacon_node/eth2-libp2p/src/rpc/mod.rs +++ b/beacon_node/eth2-libp2p/src/rpc/mod.rs @@ -11,34 +11,69 @@ use libp2p::swarm::{ PollParameters, SubstreamProtocol, }; use libp2p::{Multiaddr, PeerId}; -pub use methods::{ - MetaData, RPCCodedResponse, RPCResponse, RPCResponseErrorCode, RequestId, ResponseTermination, - StatusMessage, -}; -pub use protocol::{Protocol, RPCError, RPCProtocol, RPCRequest}; use slog::{debug, o}; use std::marker::PhantomData; use std::task::{Context, Poll}; use std::time::Duration; use types::EthSpec; +pub(crate) use handler::HandlerErr; +pub(crate) use methods::{MetaData, Ping, RPCCodedResponse, RPCResponse}; +pub(crate) use protocol::{RPCProtocol, RPCRequest}; + +pub use handler::SubstreamId; +pub use methods::{ + BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RPCResponseErrorCode, RequestId, + ResponseTermination, StatusMessage, +}; +pub use protocol::{Protocol, RPCError}; + pub(crate) mod codec; mod handler; pub mod methods; mod protocol; -/// The return type used in the behaviour and the resultant event from the protocols handler. +/// RPC events sent from Lighthouse. #[derive(Debug, Clone)] -pub enum RPCEvent { - /// An inbound/outbound request for RPC protocol. The first parameter is a sequential - /// id which tracks an awaiting substream for the response. +pub enum RPCSend { + /// A request sent from Lighthouse. + /// + /// The `RequestId` is given by the application making the request. These + /// go over *outbound* connections. Request(RequestId, RPCRequest), - /// A response that is being sent or has been received from the RPC protocol. The first parameter returns - /// that which was sent with the corresponding request, the second is a single chunk of a - /// response. - Response(RequestId, RPCCodedResponse), - /// An Error occurred. - Error(RequestId, Protocol, RPCError), + /// A response sent from Lighthouse. + /// + /// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the + /// peer. The second parameter is a single chunk of a response. These go over *inbound* + /// connections. + Response(SubstreamId, RPCCodedResponse), +} + +/// RPC events received from outside Lighthouse. +#[derive(Debug, Clone)] +pub enum RPCReceived { + /// A request received from the outside. + /// + /// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the + /// *inbound* substream over which it is managed. + Request(SubstreamId, RPCRequest), + /// A response received from the outside. + /// + /// The `RequestId` corresponds to the application given ID of the original request sent to the + /// peer. The second parameter is a single chunk of a response. These go over *outbound* + /// connections. + Response(RequestId, RPCResponse), + /// Marks a request as completed + EndOfStream(RequestId, ResponseTermination), +} + +impl std::fmt::Display for RPCSend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RPCSend::Request(id, req) => write!(f, "RPC Request(id: {:?}, {})", id, req), + RPCSend::Response(id, res) => write!(f, "RPC Response(id: {:?}, {})", id, res), + } + } } /// Messages sent to the user from the RPC protocol. @@ -46,38 +81,14 @@ pub struct RPCMessage { /// The peer that sent the message. pub peer_id: PeerId, /// The message that was sent. - pub event: RPCEvent, -} - -impl RPCEvent { - pub fn id(&self) -> usize { - match *self { - RPCEvent::Request(id, _) => id, - RPCEvent::Response(id, _) => id, - RPCEvent::Error(id, _, _) => id, - } - } -} - -impl std::fmt::Display for RPCEvent { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - RPCEvent::Request(id, req) => write!(f, "RPC Request(id: {}, {})", id, req), - RPCEvent::Response(id, res) => write!(f, "RPC Response(id: {}, {})", id, res), - RPCEvent::Error(id, prot, err) => write!( - f, - "RPC Error(id: {}, protocol: {:?} error: {:?})", - id, prot, err - ), - } - } + pub event: as ProtocolsHandler>::OutEvent, } /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { - /// Queue of events to processed. - events: Vec, RPCMessage>>, + /// Queue of events to be processed. + events: Vec, RPCMessage>>, /// Slog logger for RPC behaviour. log: slog::Logger, } @@ -94,11 +105,11 @@ impl RPC { /// Submits an RPC request. /// /// The peer must be connected for this to succeed. - pub fn send_rpc(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { + pub fn send_rpc(&mut self, peer_id: PeerId, event: RPCSend) { self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::Any, - event: rpc_event, + event, }); } } @@ -129,8 +140,7 @@ where fn inject_connected(&mut self, peer_id: &PeerId) { // find the peer's meta-data debug!(self.log, "Requesting new peer's metadata"; "peer_id" => format!("{}",peer_id)); - let rpc_event = - RPCEvent::Request(RequestId::from(0usize), RPCRequest::MetaData(PhantomData)); + let rpc_event = RPCSend::Request(RequestId::Behaviour, RPCRequest::MetaData(PhantomData)); self.events.push(NetworkBehaviourAction::NotifyHandler { peer_id: peer_id.clone(), handler: NotifyHandler::Any, @@ -158,14 +168,14 @@ where fn inject_event( &mut self, - source: PeerId, + peer_id: PeerId, _: ConnectionId, event: ::OutEvent, ) { // send the event to the user self.events .push(NetworkBehaviourAction::GenerateEvent(RPCMessage { - peer_id: source, + peer_id, event, })); } diff --git a/beacon_node/eth2-libp2p/src/rpc/protocol.rs b/beacon_node/eth2-libp2p/src/rpc/protocol.rs index 808f695fa8..6f5beb7490 100644 --- a/beacon_node/eth2-libp2p/src/rpc/protocol.rs +++ b/beacon_node/eth2-libp2p/src/rpc/protocol.rs @@ -290,32 +290,19 @@ impl RPCRequest { /* These functions are used in the handler for stream management */ - /// This specifies whether a stream should remain open and await a response, given a request. - /// A GOODBYE request has no response. - pub fn expect_response(&self) -> bool { + /// Number of responses expected for this request. + pub fn expected_responses(&self) -> usize { match self { - RPCRequest::Status(_) => true, - RPCRequest::Goodbye(_) => false, - RPCRequest::BlocksByRange(_) => true, - RPCRequest::BlocksByRoot(_) => true, - RPCRequest::Ping(_) => true, - RPCRequest::MetaData(_) => true, - } - } - - /// Returns which methods expect multiple responses from the stream. If this is false and - /// the stream terminates, an error is given. - pub fn multiple_responses(&self) -> bool { - match self { - RPCRequest::Status(_) => false, - RPCRequest::Goodbye(_) => false, - RPCRequest::BlocksByRange(_) => true, - RPCRequest::BlocksByRoot(_) => true, - RPCRequest::Ping(_) => false, - RPCRequest::MetaData(_) => false, + RPCRequest::Status(_) => 1, + RPCRequest::Goodbye(_) => 0, + RPCRequest::BlocksByRange(req) => req.count as usize, + RPCRequest::BlocksByRoot(req) => req.block_roots.len(), + RPCRequest::Ping(_) => 1, + RPCRequest::MetaData(_) => 1, } } + /// Gives the corresponding `Protocol` to this request. pub fn protocol(&self) -> Protocol { match self { RPCRequest::Status(_) => Protocol::Status, @@ -390,7 +377,7 @@ pub enum RPCError { /// IO Error. IoError(String), /// The peer returned a valid response but the response indicated an error. - ErrorResponse(RPCResponseErrorCode), + ErrorResponse(RPCResponseErrorCode, String), /// Timed out waiting for a response. StreamTimeout, /// Peer does not support the protocol. @@ -430,7 +417,11 @@ impl std::fmt::Display for RPCError { RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), RPCError::InvalidData => write!(f, "Peer sent unexpected data"), RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), - RPCError::ErrorResponse(ref code) => write!(f, "RPC response was an error: {}", code), + RPCError::ErrorResponse(ref code, ref reason) => write!( + f, + "RPC response was an error: {} with reason: {}", + code, reason + ), RPCError::StreamTimeout => write!(f, "Stream Timeout"), RPCError::UnsupportedProtocol => write!(f, "Peer does not support the protocol"), RPCError::IncompleteStream => write!(f, "Stream ended unexpectedly"), @@ -451,7 +442,7 @@ impl std::error::Error for RPCError { RPCError::IncompleteStream => None, RPCError::InvalidData => None, RPCError::InternalError(_) => None, - RPCError::ErrorResponse(_) => None, + RPCError::ErrorResponse(_, _) => None, RPCError::NegotiationTimeout => None, } } diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 4542855e82..e527ef7b63 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -1,6 +1,7 @@ -use crate::behaviour::{Behaviour, BehaviourEvent}; +use crate::behaviour::{Behaviour, BehaviourEvent, Request, Response}; use crate::discovery::enr; use crate::multiaddr::Protocol; +use crate::rpc::{RPCResponseErrorCode, RequestId, SubstreamId}; use crate::types::{error, GossipKind}; use crate::EnrExt; use crate::{NetworkConfig, NetworkGlobals}; @@ -229,126 +230,154 @@ impl Service { self.peer_ban_timeout.insert(peer_id, timeout); } + /// Sends a request to a peer, with a given Id. + pub fn send_request(&mut self, peer_id: PeerId, request_id: RequestId, request: Request) { + self.swarm.send_request(peer_id, request_id, request); + } + + /// Informs the peer that their request failed. + pub fn respond_with_error( + &mut self, + peer_id: PeerId, + stream_id: SubstreamId, + error: RPCResponseErrorCode, + reason: String, + ) { + self.swarm + ._send_error_reponse(peer_id, stream_id, error, reason); + } + + /// Sends a response to a peer's request. + pub fn send_response( + &mut self, + peer_id: PeerId, + stream_id: SubstreamId, + response: Response, + ) { + self.swarm + .send_successful_response(peer_id, stream_id, response); + } + pub async fn next_event(&mut self) -> Libp2pEvent { loop { tokio::select! { - event = self.swarm.next_event() => { - match event { - SwarmEvent::Behaviour(behaviour) => { - return Libp2pEvent::Behaviour(behaviour) - } - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - } => { - debug!(self.log, "Connection established"; "peer_id"=> peer_id.to_string(), "connections" => num_established.get()); - // if this is the first connection inform the network layer a new connection - // has been established and update the db - if num_established.get() == 1 { - // update the peerdb - match endpoint { - ConnectedPoint::Listener { .. } => { - self.swarm.peer_manager().connect_ingoing(&peer_id); + event = self.swarm.next_event() => { + match event { + SwarmEvent::Behaviour(behaviour) => { + return Libp2pEvent::Behaviour(behaviour) + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + } => { + debug!(self.log, "Connection established"; "peer_id" => peer_id.to_string(), "connections" => num_established.get()); + // if this is the first connection inform the network layer a new connection + // has been established and update the db + if num_established.get() == 1 { + // update the peerdb + match endpoint { + ConnectedPoint::Listener { .. } => { + self.swarm.peer_manager().connect_ingoing(&peer_id); + } + ConnectedPoint::Dialer { .. } => self + .network_globals + .peers + .write() + .connect_outgoing(&peer_id), } - ConnectedPoint::Dialer { .. } => self - .network_globals - .peers - .write() - .connect_outgoing(&peer_id), + return Libp2pEvent::PeerConnected { peer_id, endpoint }; } - return Libp2pEvent::PeerConnected { peer_id, endpoint }; } - } - SwarmEvent::ConnectionClosed { - peer_id, - cause, - endpoint, - num_established, - } => { - debug!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => cause.to_string(), "connections" => num_established); - if num_established == 0 { - // update the peer_db - self.swarm.peer_manager().notify_disconnect(&peer_id); - // the peer has disconnected - return Libp2pEvent::PeerDisconnected { - peer_id, - endpoint, - }; + SwarmEvent::ConnectionClosed { + peer_id, + cause, + endpoint, + num_established, + } => { + debug!(self.log, "Connection closed"; "peer_id"=> peer_id.to_string(), "cause" => cause.to_string(), "connections" => num_established); + if num_established == 0 { + // update the peer_db + self.swarm.peer_manager().notify_disconnect(&peer_id); + // the peer has disconnected + return Libp2pEvent::PeerDisconnected { + peer_id, + endpoint, + }; + } + } + SwarmEvent::NewListenAddr(multiaddr) => { + return Libp2pEvent::NewListenAddr(multiaddr) } - } - SwarmEvent::NewListenAddr(multiaddr) => { - return Libp2pEvent::NewListenAddr(multiaddr) - } - SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - } => { - debug!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string()) - } - SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - } => { - debug!(self.log, "Failed incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string(), "error" => error.to_string()) - } - SwarmEvent::BannedPeer { - peer_id, - endpoint: _, - } => { - debug!(self.log, "Attempted to dial a banned peer"; "peer_id" => peer_id.to_string()) - } - SwarmEvent::UnreachableAddr { - peer_id, - address, - error, - attempts_remaining, - } => { - debug!(self.log, "Failed to dial address"; "peer_id" => peer_id.to_string(), "address" => address.to_string(), "error" => error.to_string(), "attempts_remaining" => attempts_remaining); - self.swarm.peer_manager().notify_disconnect(&peer_id); - } - SwarmEvent::UnknownPeerUnreachableAddr { address, error } => { - debug!(self.log, "Peer not known at dialed address"; "address" => address.to_string(), "error" => error.to_string()); - } - SwarmEvent::ExpiredListenAddr(multiaddr) => { - debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string()) - } - SwarmEvent::ListenerClosed { addresses, reason } => { - debug!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason)) - } - SwarmEvent::ListenerError { error } => { - debug!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string())) - } - SwarmEvent::Dialing(peer_id) => { - debug!(self.log, "Dialing peer"; "peer" => peer_id.to_string()); - self.swarm.peer_manager().dialing_peer(&peer_id); + SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + } => { + debug!(self.log, "Incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string()) + } + SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error, + } => { + debug!(self.log, "Failed incoming connection"; "our_addr" => local_addr.to_string(), "from" => send_back_addr.to_string(), "error" => error.to_string()) + } + SwarmEvent::BannedPeer { + peer_id, + endpoint: _, + } => { + debug!(self.log, "Attempted to dial a banned peer"; "peer_id" => peer_id.to_string()) + } + SwarmEvent::UnreachableAddr { + peer_id, + address, + error, + attempts_remaining, + } => { + debug!(self.log, "Failed to dial address"; "peer_id" => peer_id.to_string(), "address" => address.to_string(), "error" => error.to_string(), "attempts_remaining" => attempts_remaining); + self.swarm.peer_manager().notify_disconnect(&peer_id); + } + SwarmEvent::UnknownPeerUnreachableAddr { address, error } => { + debug!(self.log, "Peer not known at dialed address"; "address" => address.to_string(), "error" => error.to_string()); + } + SwarmEvent::ExpiredListenAddr(multiaddr) => { + debug!(self.log, "Listen address expired"; "multiaddr" => multiaddr.to_string()) + } + SwarmEvent::ListenerClosed { addresses, reason } => { + debug!(self.log, "Listener closed"; "addresses" => format!("{:?}", addresses), "reason" => format!("{:?}", reason)) + } + SwarmEvent::ListenerError { error } => { + debug!(self.log, "Listener error"; "error" => format!("{:?}", error.to_string())) + } + SwarmEvent::Dialing(peer_id) => { + debug!(self.log, "Dialing peer"; "peer" => peer_id.to_string()); + self.swarm.peer_manager().dialing_peer(&peer_id); + } } } - } - Some(Ok(peer_to_ban)) = self.peers_to_ban.next() => { - let peer_id = peer_to_ban.into_inner(); - Swarm::ban_peer_id(&mut self.swarm, peer_id.clone()); - // TODO: Correctly notify protocols of the disconnect - // TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629 - self.swarm.inject_disconnected(&peer_id); - // inform the behaviour that the peer has been banned - self.swarm.peer_banned(peer_id); - } - Some(Ok(peer_to_unban)) = self.peer_ban_timeout.next() => { - debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_to_unban)); - let unban_peer = peer_to_unban.into_inner(); - self.swarm.peer_unbanned(&unban_peer); - Swarm::unban_peer_id(&mut self.swarm, unban_peer); - } + Some(Ok(peer_to_ban)) = self.peers_to_ban.next() => { + let peer_id = peer_to_ban.into_inner(); + Swarm::ban_peer_id(&mut self.swarm, peer_id.clone()); + // TODO: Correctly notify protocols of the disconnect + // TODO: Also remove peer from the DHT: https://github.com/sigp/lighthouse/issues/629 + self.swarm.inject_disconnected(&peer_id); + // inform the behaviour that the peer has been banned + self.swarm.peer_banned(peer_id); + } + Some(Ok(peer_to_unban)) = self.peer_ban_timeout.next() => { + debug!(self.log, "Peer has been unbanned"; "peer" => format!("{:?}", peer_to_unban)); + let unban_peer = peer_to_unban.into_inner(); + self.swarm.peer_unbanned(&unban_peer); + Swarm::unban_peer_id(&mut self.swarm, unban_peer); + } } } } } -/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise/secio as the encryption layer, and -/// mplex or yamux as the multiplexing layer. +/// The implementation supports TCP/IP, WebSockets over TCP/IP, noise/secio as the encryption +/// layer, and mplex or yamux as the multiplexing layer. fn build_transport( local_private_key: Keypair, ) -> Result, Error> { diff --git a/beacon_node/eth2-libp2p/tests/rpc_tests.rs b/beacon_node/eth2-libp2p/tests/rpc_tests.rs index 847be08842..7cd5b059f0 100644 --- a/beacon_node/eth2-libp2p/tests/rpc_tests.rs +++ b/beacon_node/eth2-libp2p/tests/rpc_tests.rs @@ -1,7 +1,6 @@ #![cfg(test)] use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::*; -use eth2_libp2p::{BehaviourEvent, Libp2pEvent, RPCEvent}; +use eth2_libp2p::{BehaviourEvent, Libp2pEvent, Request, Response}; use slog::{debug, warn, Level}; use std::time::Duration; use tokio::time::delay_for; @@ -26,7 +25,7 @@ async fn test_status_rpc() { let (mut sender, mut receiver) = common::build_node_pair(&log).await; // Dummy STATUS RPC message - let rpc_request = RPCRequest::Status(StatusMessage { + let rpc_request = Request::Status(StatusMessage { fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), finalized_epoch: Epoch::new(1), @@ -35,7 +34,7 @@ async fn test_status_rpc() { }); // Dummy STATUS RPC message - let rpc_response = RPCResponse::Status(StatusMessage { + let rpc_response = Response::Status(StatusMessage { fork_digest: [0; 4], finalized_root: Hash256::from_low_u64_be(0), finalized_epoch: Epoch::new(1), @@ -52,26 +51,19 @@ async fn test_status_rpc() { debug!(log, "Sending RPC"); sender .swarm - .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone())); + .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); } - Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event { + Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { + peer_id: _, + id: RequestId::Sync(10), + response, + }) => { // Should receive the RPC response - RPCEvent::Response(id, response @ RPCCodedResponse::Success(_)) => { - if id == 10 { - debug!(log, "Sender Received"); - let response = { - match response { - RPCCodedResponse::Success(r) => r, - _ => unreachable!(), - } - }; - assert_eq!(response, rpc_response.clone()); - debug!(log, "Sender Completed"); - return; - } - } - _ => {} // Ignore other RPC messages - }, + debug!(log, "Sender Received"); + assert_eq!(response, rpc_response.clone()); + debug!(log, "Sender Completed"); + return; + } _ => {} } } @@ -81,23 +73,17 @@ async fn test_status_rpc() { let receiver_future = async { loop { match receiver.next_event().await { - Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)) => { - match event { - // Should receive sent RPC request - RPCEvent::Request(id, request) => { - if request == rpc_request { - // send the response - debug!(log, "Receiver Received"); - receiver.swarm.send_rpc( - peer_id, - RPCEvent::Response( - id, - RPCCodedResponse::Success(rpc_response.clone()), - ), - ); - } - } - _ => {} // Ignore other RPC requests + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }) => { + if request == rpc_request { + // send the response + debug!(log, "Receiver Received"); + receiver + .swarm + .send_successful_response(peer_id, id, rpc_response.clone()); } } _ => {} // Ignore other events @@ -129,7 +115,7 @@ async fn test_blocks_by_range_chunked_rpc() { let (mut sender, mut receiver) = common::build_node_pair(&log).await; // BlocksByRange Request - let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: messages_to_send, step: 0, @@ -142,7 +128,7 @@ async fn test_blocks_by_range_chunked_rpc() { message: empty_block, signature: Signature::empty_signature(), }; - let rpc_response = RPCResponse::BlocksByRange(Box::new(empty_signed)); + let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed))); // keep count of the number of messages received let mut messages_received = 0; @@ -155,31 +141,29 @@ async fn test_blocks_by_range_chunked_rpc() { debug!(log, "Sending RPC"); sender .swarm - .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone())); + .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); } - Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event { - // Should receive the RPC response - RPCEvent::Response(id, response) => { - if id == 10 { - warn!(log, "Sender received a response"); - match response { - RPCCodedResponse::Success(res) => { - assert_eq!(res, rpc_response.clone()); - messages_received += 1; - warn!(log, "Chunk received"); - } - RPCCodedResponse::StreamTermination(_) => { - // should be exactly 10 messages before terminating - assert_eq!(messages_received, messages_to_send); - // end the test - return; - } - _ => panic!("Invalid RPC received"), - } + Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { + peer_id: _, + id: RequestId::Sync(10), + response, + }) => { + warn!(log, "Sender received a response"); + match response { + Response::BlocksByRange(Some(_)) => { + assert_eq!(response, rpc_response.clone()); + messages_received += 1; + warn!(log, "Chunk received"); } + Response::BlocksByRange(None) => { + // should be exactly 10 messages before terminating + assert_eq!(messages_received, messages_to_send); + // end the test + return; + } + _ => panic!("Invalid RPC received"), } - _ => {} // Ignore other RPC messages - }, + } _ => {} // Ignore other behaviour events } } @@ -189,36 +173,27 @@ async fn test_blocks_by_range_chunked_rpc() { let receiver_future = async { loop { match receiver.next_event().await { - Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)) => { - match event { - // Should receive sent RPC request - RPCEvent::Request(id, request) => { - if request == rpc_request { - // send the response - warn!(log, "Receiver got request"); - - for _ in 1..=messages_to_send { - receiver.swarm.send_rpc( - peer_id.clone(), - RPCEvent::Response( - id, - RPCCodedResponse::Success(rpc_response.clone()), - ), - ); - } - // send the stream termination - receiver.swarm.send_rpc( - peer_id, - RPCEvent::Response( - id, - RPCCodedResponse::StreamTermination( - ResponseTermination::BlocksByRange, - ), - ), - ); - } + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }) => { + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); + for _ in 1..=messages_to_send { + receiver.swarm.send_successful_response( + peer_id.clone(), + id, + rpc_response.clone(), + ); } - _ => {} // Ignore other events + // send the stream termination + receiver.swarm.send_successful_response( + peer_id, + id, + Response::BlocksByRange(None), + ); } } _ => {} // Ignore other events @@ -251,7 +226,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { let (mut sender, mut receiver) = common::build_node_pair(&log).await; // BlocksByRange Request - let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: messages_to_send, step: 0, @@ -264,7 +239,7 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { message: empty_block, signature: Signature::empty_signature(), }; - let rpc_response = RPCResponse::BlocksByRange(Box::new(empty_signed)); + let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed))); // keep count of the number of messages received let mut messages_received: u64 = 0; @@ -277,28 +252,29 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { debug!(log, "Sending RPC"); sender .swarm - .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone())); + .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); } - Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event { - // Should receive the RPC response - RPCEvent::Response(id, response) => { - if id == 10 { - debug!(log, "Sender received a response"); - match response { - RPCCodedResponse::Success(res) => { - assert_eq!(res, rpc_response.clone()); - messages_received += 1; - } - RPCCodedResponse::StreamTermination(_) => { - // should be exactly 10 messages, as requested - assert_eq!(messages_received, messages_to_send); - } - _ => panic!("Invalid RPC received"), - } + Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { + peer_id: _, + id: RequestId::Sync(10), + response, + }) => + // Should receive the RPC response + { + debug!(log, "Sender received a response"); + match response { + Response::BlocksByRange(Some(_)) => { + assert_eq!(response, rpc_response.clone()); + messages_received += 1; } + Response::BlocksByRange(None) => { + // should be exactly 10 messages, as requested + assert_eq!(messages_received, messages_to_send); + } + _ => panic!("Invalid RPC received"), } - _ => {} // Ignore other RPC messages - }, + } + _ => {} // Ignore other behaviour events } } @@ -320,21 +296,17 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { .await { futures::future::Either::Left(( - Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)), + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }), _, )) => { - match event { - // Should receive sent RPC request - RPCEvent::Request(id, request) => { - if request == rpc_request { - // send the response - warn!(log, "Receiver got request"); - message_info = Some((peer_id, id)); - } else { - continue; - } - } - _ => continue, // Ignore other events, don't send messages until ready + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); + message_info = Some((peer_id, id)); } } futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required @@ -344,12 +316,11 @@ async fn test_blocks_by_range_chunked_rpc_terminates_correctly() { // if we need to send messages send them here. This will happen after a delay if message_info.is_some() { messages_sent += 1; - receiver.swarm.send_rpc( - message_info.as_ref().unwrap().0.clone(), - RPCEvent::Response( - message_info.as_ref().unwrap().1.clone(), - RPCCodedResponse::Success(rpc_response.clone()), - ), + let (peer_id, stream_id) = message_info.as_ref().unwrap(); + receiver.swarm.send_successful_response( + peer_id.clone(), + stream_id.clone(), + rpc_response.clone(), ); debug!(log, "Sending message {}", messages_sent); if messages_sent == messages_to_send + extra_messages_to_send { @@ -382,7 +353,7 @@ async fn test_blocks_by_range_single_empty_rpc() { let (mut sender, mut receiver) = common::build_node_pair(&log).await; // BlocksByRange Request - let rpc_request = RPCRequest::BlocksByRange(BlocksByRangeRequest { + let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { start_slot: 0, count: 10, step: 0, @@ -395,7 +366,7 @@ async fn test_blocks_by_range_single_empty_rpc() { message: empty_block, signature: Signature::empty_signature(), }; - let rpc_response = RPCResponse::BlocksByRange(Box::new(empty_signed)); + let rpc_response = Response::BlocksByRange(Some(Box::new(empty_signed))); let messages_to_send = 1; @@ -410,30 +381,25 @@ async fn test_blocks_by_range_single_empty_rpc() { debug!(log, "Sending RPC"); sender .swarm - .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone())); + .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); } - Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event { - // Should receive the RPC response - RPCEvent::Response(id, response) => { - if id == 10 { - warn!(log, "Sender received a response"); - match response { - RPCCodedResponse::Success(res) => { - assert_eq!(res, rpc_response.clone()); - messages_received += 1; - warn!(log, "Chunk received"); - } - RPCCodedResponse::StreamTermination(_) => { - // should be exactly 10 messages before terminating - assert_eq!(messages_received, messages_to_send); - // end the test - return; - } - _ => panic!("Invalid RPC received"), - } - } + Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { + peer_id: _, + id: RequestId::Sync(10), + response, + }) => match response { + Response::BlocksByRange(Some(_)) => { + assert_eq!(response, rpc_response.clone()); + messages_received += 1; + warn!(log, "Chunk received"); } - _ => {} // Ignore other RPC messages + Response::BlocksByRange(None) => { + // should be exactly 10 messages before terminating + assert_eq!(messages_received, messages_to_send); + // end the test + return; + } + _ => panic!("Invalid RPC received"), }, _ => {} // Ignore other behaviour events } @@ -444,36 +410,28 @@ async fn test_blocks_by_range_single_empty_rpc() { let receiver_future = async { loop { match receiver.next_event().await { - Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)) => { - match event { - // Should receive sent RPC request - RPCEvent::Request(id, request) => { - if request == rpc_request { - // send the response - warn!(log, "Receiver got request"); + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }) => { + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); - for _ in 1..=messages_to_send { - receiver.swarm.send_rpc( - peer_id.clone(), - RPCEvent::Response( - id, - RPCCodedResponse::Success(rpc_response.clone()), - ), - ); - } - // send the stream termination - receiver.swarm.send_rpc( - peer_id, - RPCEvent::Response( - id, - RPCCodedResponse::StreamTermination( - ResponseTermination::BlocksByRange, - ), - ), - ); - } + for _ in 1..=messages_to_send { + receiver.swarm.send_successful_response( + peer_id.clone(), + id, + rpc_response.clone(), + ); } - _ => {} // Ignore other events + // send the stream termination + receiver.swarm.send_successful_response( + peer_id, + id, + Response::BlocksByRange(None), + ); } } _ => {} // Ignore other events @@ -508,7 +466,7 @@ async fn test_blocks_by_root_chunked_rpc() { let (mut sender, mut receiver) = common::build_node_pair(&log).await; // BlocksByRoot Request - let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest { + let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { block_roots: vec![ Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), @@ -522,7 +480,7 @@ async fn test_blocks_by_root_chunked_rpc() { message: full_block, signature: Signature::empty_signature(), }; - let rpc_response = RPCResponse::BlocksByRoot(Box::new(signed_full_block)); + let rpc_response = Response::BlocksByRoot(Some(Box::new(signed_full_block))); // keep count of the number of messages received let mut messages_received = 0; @@ -535,28 +493,23 @@ async fn test_blocks_by_root_chunked_rpc() { debug!(log, "Sending RPC"); sender .swarm - .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone())); + .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); } - Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event { - // Should receive the RPC response - RPCEvent::Response(id, response) => { - if id == 10 { - debug!(log, "Sender received a response"); - match response { - RPCCodedResponse::Success(res) => { - assert_eq!(res, rpc_response.clone()); - messages_received += 1; - debug!(log, "Chunk received"); - } - RPCCodedResponse::StreamTermination(_) => { - // should be exactly messages_to_send - assert_eq!(messages_received, messages_to_send); - // end the test - return; - } - _ => {} // Ignore other RPC messages - } - } + Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { + peer_id: _, + id: RequestId::Sync(10), + response, + }) => match response { + Response::BlocksByRoot(Some(_)) => { + assert_eq!(response, rpc_response.clone()); + messages_received += 1; + debug!(log, "Chunk received"); + } + Response::BlocksByRoot(None) => { + // should be exactly messages_to_send + assert_eq!(messages_received, messages_to_send); + // end the test + return; } _ => {} // Ignore other RPC messages }, @@ -569,38 +522,30 @@ async fn test_blocks_by_root_chunked_rpc() { let receiver_future = async { loop { match receiver.next_event().await { - Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)) => { - match event { - // Should receive sent RPC request - RPCEvent::Request(id, request) => { - if request == rpc_request { - // send the response - debug!(log, "Receiver got request"); + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }) => { + if request == rpc_request { + // send the response + debug!(log, "Receiver got request"); - for _ in 1..=messages_to_send { - receiver.swarm.send_rpc( - peer_id.clone(), - RPCEvent::Response( - id, - RPCCodedResponse::Success(rpc_response.clone()), - ), - ); - debug!(log, "Sending message"); - } - // send the stream termination - receiver.swarm.send_rpc( - peer_id, - RPCEvent::Response( - id, - RPCCodedResponse::StreamTermination( - ResponseTermination::BlocksByRange, - ), - ), - ); - debug!(log, "Send stream term"); - } + for _ in 1..=messages_to_send { + receiver.swarm.send_successful_response( + peer_id.clone(), + id, + rpc_response.clone(), + ); + debug!(log, "Sending message"); } - _ => {} // Ignore other events + // send the stream termination + receiver.swarm.send_successful_response( + peer_id, + id, + Response::BlocksByRange(None), + ); + debug!(log, "Send stream term"); } } _ => {} // Ignore other events @@ -633,7 +578,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { let (mut sender, mut receiver) = common::build_node_pair(&log).await; // BlocksByRoot Request - let rpc_request = RPCRequest::BlocksByRoot(BlocksByRootRequest { + let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { block_roots: vec![ Hash256::from_low_u64_be(0), Hash256::from_low_u64_be(0), @@ -654,7 +599,7 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { message: full_block, signature: Signature::empty_signature(), }; - let rpc_response = RPCResponse::BlocksByRoot(Box::new(signed_full_block)); + let rpc_response = Response::BlocksByRoot(Some(Box::new(signed_full_block))); // keep count of the number of messages received let mut messages_received = 0; @@ -667,31 +612,29 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { debug!(log, "Sending RPC"); sender .swarm - .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone())); + .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); } - Libp2pEvent::Behaviour(BehaviourEvent::RPC(_, event)) => match event { - // Should receive the RPC response - RPCEvent::Response(id, response) => { - if id == 10 { - debug!(log, "Sender received a response"); - match response { - RPCCodedResponse::Success(res) => { - assert_eq!(res, rpc_response.clone()); - messages_received += 1; - debug!(log, "Chunk received"); - } - RPCCodedResponse::StreamTermination(_) => { - // should be exactly messages_to_send - assert_eq!(messages_received, messages_to_send); - // end the test - return; - } - _ => {} // Ignore other RPC messages - } + Libp2pEvent::Behaviour(BehaviourEvent::ResponseReceived { + peer_id: _, + id: RequestId::Sync(10), + response, + }) => { + debug!(log, "Sender received a response"); + match response { + Response::BlocksByRoot(Some(_)) => { + assert_eq!(response, rpc_response.clone()); + messages_received += 1; + debug!(log, "Chunk received"); } + Response::BlocksByRoot(None) => { + // should be exactly messages_to_send + assert_eq!(messages_received, messages_to_send); + // end the test + return; + } + _ => {} // Ignore other RPC messages } - _ => {} // Ignore other RPC messages - }, + } _ => {} // Ignore other behaviour events } } @@ -713,21 +656,17 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { .await { futures::future::Either::Left(( - Libp2pEvent::Behaviour(BehaviourEvent::RPC(peer_id, event)), + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id, + id, + request, + }), _, )) => { - match event { - // Should receive sent RPC request - RPCEvent::Request(id, request) => { - if request == rpc_request { - // send the response - warn!(log, "Receiver got request"); - message_info = Some((peer_id, id)); - } else { - continue; - } - } - _ => continue, // Ignore other events, don't send messages until ready + if request == rpc_request { + // send the response + warn!(log, "Receiver got request"); + message_info = Some((peer_id, id)); } } futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required @@ -737,12 +676,11 @@ async fn test_blocks_by_root_chunked_rpc_terminates_correctly() { // if we need to send messages send them here. This will happen after a delay if message_info.is_some() { messages_sent += 1; - receiver.swarm.send_rpc( - message_info.as_ref().unwrap().0.clone(), - RPCEvent::Response( - message_info.as_ref().unwrap().1.clone(), - RPCCodedResponse::Success(rpc_response.clone()), - ), + let (peer_id, stream_id) = message_info.as_ref().unwrap(); + receiver.swarm.send_successful_response( + peer_id.clone(), + stream_id.clone(), + rpc_response.clone(), ); debug!(log, "Sending message {}", messages_sent); if messages_sent == messages_to_send + extra_messages_to_send { @@ -775,7 +713,7 @@ async fn test_goodbye_rpc() { let (mut sender, mut receiver) = common::build_node_pair(&log).await; // Goodbye Request - let rpc_request = RPCRequest::Goodbye(GoodbyeReason::ClientShutdown); + let rpc_request = Request::Goodbye(GoodbyeReason::ClientShutdown); // build the sender future let sender_future = async { @@ -786,7 +724,7 @@ async fn test_goodbye_rpc() { debug!(log, "Sending RPC"); sender .swarm - .send_rpc(peer_id, RPCEvent::Request(10, rpc_request.clone())); + .send_request(peer_id, RequestId::Sync(10), rpc_request.clone()); } _ => {} // Ignore other RPC messages } @@ -797,18 +735,14 @@ async fn test_goodbye_rpc() { let receiver_future = async { loop { match receiver.next_event().await { - Libp2pEvent::Behaviour(BehaviourEvent::RPC(_peer_id, event)) => { - match event { - // Should receive sent RPC request - RPCEvent::Request(id, request) => { - if request == rpc_request { - assert_eq!(id, 0); - assert_eq!(rpc_request.clone(), request); // receives the goodbye. Nothing left to do - return; - } - } - _ => {} // Ignore other events - } + Libp2pEvent::Behaviour(BehaviourEvent::RequestReceived { + peer_id: _, + id: _, + request, + }) => { + // Should receive sent RPC request + assert_eq!(rpc_request.clone(), request); // receives the goodbye. Nothing left to do + return; } _ => {} // Ignore other events } diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 44e1014d79..025914d6bd 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -10,8 +10,8 @@ use crate::error; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockError}; use eth2_libp2p::{ - rpc::{RPCCodedResponse, RPCRequest, RPCResponse, RequestId, ResponseTermination}, - MessageId, NetworkGlobals, PeerId, PubsubMessage, RPCEvent, + rpc::{RPCError, RequestId, SubstreamId}, + MessageId, NetworkGlobals, PeerId, PubsubMessage, Request, Response, }; use futures::prelude::*; use processor::Processor; @@ -43,8 +43,24 @@ pub enum RouterMessage { PeerDialed(PeerId), /// Peer has disconnected, PeerDisconnected(PeerId), - /// An RPC response/request has been received. - RPC(PeerId, RPCEvent), + /// An RPC request has been received. + RPCRequestReceived { + peer_id: PeerId, + stream_id: SubstreamId, + request: Request, + }, + /// An RPC response has been received. + RPCResponseReceived { + peer_id: PeerId, + request_id: RequestId, + response: Response, + }, + /// An RPC request failed + RPCFailed { + peer_id: PeerId, + request_id: RequestId, + error: RPCError, + }, /// A gossip message has been received. The fields are: message id, the peer that sent us this /// message and the message itself. PubsubMessage(MessageId, PeerId, PubsubMessage), @@ -109,11 +125,32 @@ impl Router { RouterMessage::PeerDisconnected(peer_id) => { self.processor.on_disconnect(peer_id); } - // An RPC message request/response has been received - RouterMessage::RPC(peer_id, rpc_event) => { - self.handle_rpc_message(peer_id, rpc_event); + RouterMessage::RPCRequestReceived { + peer_id, + stream_id, + request, + } => { + self.handle_rpc_request(peer_id, stream_id, request); + } + RouterMessage::RPCResponseReceived { + peer_id, + request_id, + response, + } => { + self.handle_rpc_response(peer_id, request_id, response); + } + RouterMessage::RPCFailed { + peer_id, + request_id, + error, + } => { + warn!(self.log, "RPC Error"; + "peer_id" => peer_id.to_string(), + "request_id" => request_id, + "error" => error.to_string(), + "client" => self.network_globals.client(&peer_id).to_string()); + self.processor.on_rpc_error(peer_id, request_id); } - // An RPC message request/response has been received RouterMessage::PubsubMessage(id, peer_id, gossip) => { self.handle_gossip(id, peer_id, gossip); } @@ -122,32 +159,14 @@ impl Router { /* RPC - Related functionality */ - /// Handle RPC messages - fn handle_rpc_message(&mut self, peer_id: PeerId, rpc_message: RPCEvent) { - match rpc_message { - RPCEvent::Request(id, req) => self.handle_rpc_request(peer_id, id, req), - RPCEvent::Response(id, resp) => self.handle_rpc_response(peer_id, id, resp), - RPCEvent::Error(id, _protocol, error) => { - warn!(self.log, "RPC Error"; "peer_id" => peer_id.to_string(), "request_id" => id, "error" => error.to_string(), - "client" => self.network_globals.client(&peer_id).to_string()); - self.processor.on_rpc_error(peer_id, id); - } - } - } - /// A new RPC request has been received from the network. - fn handle_rpc_request( - &mut self, - peer_id: PeerId, - request_id: RequestId, - request: RPCRequest, - ) { + fn handle_rpc_request(&mut self, peer_id: PeerId, stream_id: SubstreamId, request: Request) { match request { - RPCRequest::Status(status_message) => { + Request::Status(status_message) => { self.processor - .on_status_request(peer_id, request_id, status_message) + .on_status_request(peer_id, stream_id, status_message) } - RPCRequest::Goodbye(goodbye_reason) => { + Request::Goodbye(goodbye_reason) => { debug!( self.log, "Peer sent Goodbye"; "peer_id" => peer_id.to_string(), @@ -156,14 +175,12 @@ impl Router { ); self.processor.on_disconnect(peer_id); } - RPCRequest::BlocksByRange(request) => self + Request::BlocksByRange(request) => self .processor - .on_blocks_by_range_request(peer_id, request_id, request), - RPCRequest::BlocksByRoot(request) => self + .on_blocks_by_range_request(peer_id, stream_id, request), + Request::BlocksByRoot(request) => self .processor - .on_blocks_by_root_request(peer_id, request_id, request), - RPCRequest::Ping(_) => unreachable!("Ping MUST be handled in the behaviour"), - RPCRequest::MetaData(_) => unreachable!("MetaData MUST be handled in the behaviour"), + .on_blocks_by_root_request(peer_id, stream_id, request), } } @@ -173,71 +190,20 @@ impl Router { &mut self, peer_id: PeerId, request_id: RequestId, - error_response: RPCCodedResponse, + response: Response, ) { // an error could have occurred. - match error_response { - RPCCodedResponse::InvalidRequest(error) => { - warn!(self.log, "RPC Invalid Request"; - "peer_id" => peer_id.to_string(), - "request_id" => request_id, - "error" => error.to_string(), - "client" => self.network_globals.client(&peer_id).to_string()); - self.processor.on_rpc_error(peer_id, request_id); + match response { + Response::Status(status_message) => { + self.processor.on_status_response(peer_id, status_message); } - RPCCodedResponse::ServerError(error) => { - warn!(self.log, "RPC Server Error" ; - "peer_id" => peer_id.to_string(), - "request_id" => request_id, - "error" => error.to_string(), - "client" => self.network_globals.client(&peer_id).to_string()); - self.processor.on_rpc_error(peer_id, request_id); + Response::BlocksByRange(beacon_block) => { + self.processor + .on_blocks_by_range_response(peer_id, request_id, beacon_block); } - RPCCodedResponse::Unknown(error) => { - warn!(self.log, "RPC Unknown Error"; - "peer_id" => peer_id.to_string(), - "request_id" => request_id, - "error" => error.to_string(), - "client" => self.network_globals.client(&peer_id).to_string()); - self.processor.on_rpc_error(peer_id, request_id); - } - RPCCodedResponse::Success(response) => match response { - RPCResponse::Status(status_message) => { - self.processor.on_status_response(peer_id, status_message); - } - RPCResponse::BlocksByRange(beacon_block) => { - self.processor.on_blocks_by_range_response( - peer_id, - request_id, - Some(beacon_block), - ); - } - RPCResponse::BlocksByRoot(beacon_block) => { - self.processor.on_blocks_by_root_response( - peer_id, - request_id, - Some(beacon_block), - ); - } - RPCResponse::Pong(_) => { - unreachable!("Ping must be handled in the behaviour"); - } - RPCResponse::MetaData(_) => { - unreachable!("Meta data must be handled in the behaviour"); - } - }, - RPCCodedResponse::StreamTermination(response_type) => { - // have received a stream termination, notify the processing functions - match response_type { - ResponseTermination::BlocksByRange => { - self.processor - .on_blocks_by_range_response(peer_id, request_id, None); - } - ResponseTermination::BlocksByRoot => { - self.processor - .on_blocks_by_root_response(peer_id, request_id, None); - } - } + Response::BlocksByRoot(beacon_block) => { + self.processor + .on_blocks_by_root_response(peer_id, request_id, beacon_block); } } } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 3ed9358781..1fd6927c55 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -7,9 +7,8 @@ use beacon_chain::{ }, BeaconChain, BeaconChainTypes, BlockError, BlockProcessingOutcome, GossipVerifiedBlock, }; -use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCCodedResponse, RPCEvent, RPCRequest, RPCResponse, RequestId}; -use eth2_libp2p::{NetworkGlobals, PeerId}; +use eth2_libp2p::rpc::*; +use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response}; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; @@ -86,7 +85,10 @@ impl Processor { /// An error occurred during an RPC request. The state is maintained by the sync manager, so /// this function notifies the sync manager of the error. pub fn on_rpc_error(&mut self, peer_id: PeerId, request_id: RequestId) { - self.send_to_sync(SyncMessage::RPCError(peer_id, request_id)); + // Check if the failed RPC belongs to sync + if let RequestId::Sync(id) = request_id { + self.send_to_sync(SyncMessage::RPCError(peer_id, id)); + } } /// Sends a `Status` message to the peer. @@ -106,7 +108,7 @@ impl Processor { "head_slot" => format!("{}", status_message.head_slot), ); self.network - .send_rpc_request(peer_id, RPCRequest::Status(status_message)); + .send_processor_request(peer_id, Request::Status(status_message)); } } @@ -116,7 +118,7 @@ impl Processor { pub fn on_status_request( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: SubstreamId, status: StatusMessage, ) { debug!( @@ -133,10 +135,10 @@ impl Processor { // ignore status responses if we are shutting down if let Some(status_message) = status_message(&self.chain) { // Say status back. - self.network.send_rpc_response( + self.network.send_response( peer_id.clone(), + Response::Status(status_message), request_id, - RPCResponse::Status(status_message), ); } @@ -281,16 +283,16 @@ impl Processor { pub fn on_blocks_by_root_request( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: SubstreamId, request: BlocksByRootRequest, ) { let mut send_block_count = 0; for root in request.block_roots.iter() { if let Ok(Some(block)) = self.chain.store.get_block(root) { - self.network.send_rpc_response( + self.network.send_response( peer_id.clone(), + Response::BlocksByRoot(Some(Box::new(block))), request_id, - RPCResponse::BlocksByRoot(Box::new(block)), ); send_block_count += 1; } else { @@ -311,18 +313,15 @@ impl Processor { ); // send stream termination - self.network.send_rpc_error_response( - peer_id, - request_id, - RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRoot), - ); + self.network + .send_response(peer_id, Response::BlocksByRoot(None), request_id); } /// Handle a `BlocksByRange` request from the peer. pub fn on_blocks_by_range_request( &mut self, peer_id: PeerId, - request_id: RequestId, + request_id: SubstreamId, req: BlocksByRangeRequest, ) { debug!( @@ -388,10 +387,10 @@ impl Processor { && block.slot() < req.start_slot + req.count * req.step { blocks_sent += 1; - self.network.send_rpc_response( + self.network.send_response( peer_id.clone(), + Response::BlocksByRange(Some(Box::new(block))), request_id, - RPCResponse::BlocksByRange(Box::new(block)), ); } } else { @@ -425,11 +424,8 @@ impl Processor { } // send the stream terminator - self.network.send_rpc_error_response( - peer_id, - request_id, - RPCCodedResponse::StreamTermination(ResponseTermination::BlocksByRange), - ); + self.network + .send_response(peer_id, Response::BlocksByRange(None), request_id); } /// Handle a `BlocksByRange` response from the peer. @@ -446,11 +442,18 @@ impl Processor { "peer" => format!("{:?}", peer_id), ); - self.send_to_sync(SyncMessage::BlocksByRangeResponse { - peer_id, - request_id, - beacon_block, - }); + if let RequestId::Sync(id) = request_id { + self.send_to_sync(SyncMessage::BlocksByRangeResponse { + peer_id, + request_id: id, + beacon_block, + }); + } else { + debug!( + self.log, + "All blocks by range responses should belong to sync" + ); + } } /// Handle a `BlocksByRoot` response from the peer. @@ -466,11 +469,18 @@ impl Processor { "peer" => format!("{:?}", peer_id), ); - self.send_to_sync(SyncMessage::BlocksByRootResponse { - peer_id, - request_id, - beacon_block, - }); + if let RequestId::Sync(id) = request_id { + self.send_to_sync(SyncMessage::BlocksByRootResponse { + peer_id, + request_id: id, + beacon_block, + }); + } else { + debug!( + self.log, + "All Blocks by Root responses should belong to sync" + ) + } } /// Template function to be called on a block to determine if the block should be propagated @@ -902,8 +912,6 @@ pub(crate) fn status_message( /// Wraps a Network Channel to employ various RPC related network functionality for the /// processor. -/// The Processor doesn't manage it's own request Id's and can therefore only send -/// responses or requests with 0 request Ids. pub struct HandlerNetworkContext { /// The network channel to relay messages to the Network service. network_send: mpsc::UnboundedSender>, @@ -916,6 +924,12 @@ impl HandlerNetworkContext { Self { network_send, log } } + fn inform_network(&mut self, msg: NetworkMessage) { + self.network_send + .send(msg) + .unwrap_or_else(|_| warn!(self.log, "Could not send message to the network service")) + } + pub fn disconnect(&mut self, peer_id: PeerId, reason: GoodbyeReason) { warn!( &self.log, @@ -923,55 +937,42 @@ impl HandlerNetworkContext { "reason" => format!("{:?}", reason), "peer_id" => format!("{:?}", peer_id), ); - self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason)); - self.network_send - .send(NetworkMessage::Disconnect { peer_id }) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send a Disconnect to the network service" - ) - }); + self.send_processor_request(peer_id.clone(), Request::Goodbye(reason)); + self.inform_network(NetworkMessage::Disconnect { peer_id }); } - pub fn send_rpc_request(&mut self, peer_id: PeerId, rpc_request: RPCRequest) { - // the message handler cannot send requests with ids. Id's are managed by the sync - // manager. - let request_id = 0; - self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request)); - } - - /// Convenience function to wrap successful RPC Responses. - pub fn send_rpc_response( - &mut self, - peer_id: PeerId, - request_id: RequestId, - rpc_response: RPCResponse, - ) { - self.send_rpc_event( + pub fn send_processor_request(&mut self, peer_id: PeerId, request: Request) { + self.inform_network(NetworkMessage::SendRequest { peer_id, - RPCEvent::Response(request_id, RPCCodedResponse::Success(rpc_response)), - ); + request_id: RequestId::Router, + request, + }) } - /// Send an RPCCodedResponse. This handles errors and stream terminations. - pub fn send_rpc_error_response( + pub fn send_response( &mut self, peer_id: PeerId, - request_id: RequestId, - rpc_error_response: RPCCodedResponse, + response: Response, + stream_id: SubstreamId, ) { - self.send_rpc_event(peer_id, RPCEvent::Response(request_id, rpc_error_response)); + self.inform_network(NetworkMessage::SendResponse { + peer_id, + stream_id, + response, + }) } - - fn send_rpc_event(&mut self, peer_id: PeerId, rpc_event: RPCEvent) { - self.network_send - .send(NetworkMessage::RPC(peer_id, rpc_event)) - .unwrap_or_else(|_| { - warn!( - self.log, - "Could not send RPC message to the network service" - ) - }); + pub fn _send_error_response( + &mut self, + peer_id: PeerId, + substream_id: SubstreamId, + error: RPCResponseErrorCode, + reason: String, + ) { + self.inform_network(NetworkMessage::SendError { + peer_id, + error, + substream_id, + reason, + }) } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 856198ae6b..fad08ea901 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -7,8 +7,11 @@ use crate::{ use crate::{error, metrics}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use eth2_libp2p::Service as LibP2PService; -use eth2_libp2p::{rpc::RPCRequest, BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId}; -use eth2_libp2p::{Libp2pEvent, PubsubMessage, RPCEvent}; +use eth2_libp2p::{ + rpc::{RPCResponseErrorCode, RequestId, SubstreamId}, + Libp2pEvent, PubsubMessage, Request, Response, +}; +use eth2_libp2p::{BehaviourEvent, Enr, MessageId, NetworkGlobals, PeerId}; use futures::prelude::*; use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace}; @@ -123,6 +126,9 @@ fn spawn_service( // spawn on the current executor executor.spawn_without_exit(async move { + // TODO: there is something with this code that prevents cargo fmt from doing anything at + // all. Ok, it is worse, the compiler doesn't show errors over this code beyond ast + // checking loop { // build the futures to check simultaneously tokio::select! { @@ -150,13 +156,18 @@ fn spawn_service( info!(service.log, "Network service shutdown"); return; - } - // handle a message sent to the network - Some(message) = service.network_recv.recv() => { - match message { - NetworkMessage::RPC(peer_id, rpc_event) => { - trace!(service.log, "Sending RPC"; "rpc" => format!("{}", rpc_event)); - service.libp2p.swarm.send_rpc(peer_id, rpc_event); + } + // handle a message sent to the network + Some(message) = service.network_recv.recv() => { + match message { + NetworkMessage::SendRequest{ peer_id, request, request_id } => { + service.libp2p.send_request(peer_id, request_id, request); + } + NetworkMessage::SendResponse{ peer_id, response, stream_id } => { + service.libp2p.send_response(peer_id, stream_id, response); + } + NetworkMessage::SendError{ peer_id, error, substream_id, reason } => { + service.libp2p.respond_with_error(peer_id, substream_id, error, reason); } NetworkMessage::Propagate { propagation_source, @@ -177,8 +188,8 @@ fn spawn_service( info!(service.log, "Random filter did not propagate message"); } else { trace!(service.log, "Propagating gossipsub message"; - "propagation_peer" => format!("{:?}", propagation_source), - "message_id" => message_id.to_string(), + "propagation_peer" => format!("{:?}", propagation_source), + "message_id" => message_id.to_string(), ); service .libp2p @@ -229,124 +240,143 @@ fn spawn_service( .attestation_service .validator_subscriptions(subscriptions); } - } - } - // process any attestation service events - Some(attestation_service_message) = service.attestation_service.next() => { - match attestation_service_message { - // TODO: Implement - AttServiceMessage::Subscribe(subnet_id) => { - service.libp2p.swarm.subscribe_to_subnet(subnet_id); - } - AttServiceMessage::Unsubscribe(subnet_id) => { - service.libp2p.swarm.subscribe_to_subnet(subnet_id); - } - AttServiceMessage::EnrAdd(subnet_id) => { - service.libp2p.swarm.update_enr_subnet(subnet_id, true); - } - AttServiceMessage::EnrRemove(subnet_id) => { - service.libp2p.swarm.update_enr_subnet(subnet_id, false); - } - AttServiceMessage::DiscoverPeers(subnet_id) => { - service.libp2p.swarm.peers_request(subnet_id); } } - } - libp2p_event = service.libp2p.next_event() => { - // poll the swarm - match libp2p_event { - Libp2pEvent::Behaviour(event) => match event { - BehaviourEvent::RPC(peer_id, rpc_event) => { - // if we received a Goodbye message, drop and ban the peer - if let RPCEvent::Request(_, RPCRequest::Goodbye(_)) = rpc_event { - //peers_to_ban.push(peer_id.clone()); - service.libp2p.disconnect_and_ban_peer( - peer_id.clone(), - std::time::Duration::from_secs(BAN_PEER_TIMEOUT), - ); - }; - let _ = service - .router_send - .send(RouterMessage::RPC(peer_id, rpc_event)) - .map_err(|_| { - debug!(service.log, "Failed to send RPC to router"); - }); + // process any attestation service events + Some(attestation_service_message) = service.attestation_service.next() => { + match attestation_service_message { + // TODO: Implement + AttServiceMessage::Subscribe(subnet_id) => { + service.libp2p.swarm.subscribe_to_subnet(subnet_id); } - BehaviourEvent::StatusPeer(peer_id) => { - let _ = service - .router_send - .send(RouterMessage::StatusPeer(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send re-status peer to router"); - }); + AttServiceMessage::Unsubscribe(subnet_id) => { + service.libp2p.swarm.subscribe_to_subnet(subnet_id); } - BehaviourEvent::PubsubMessage { - id, - source, - message, - .. - } => { - // Update prometheus metrics. - expose_receive_metrics(&message); - match message { - // attestation information gets processed in the attestation service - PubsubMessage::Attestation(ref subnet_and_attestation) => { - let subnet = &subnet_and_attestation.0; - let attestation = &subnet_and_attestation.1; - // checks if we have an aggregator for the slot. If so, we process - // the attestation - if service.attestation_service.should_process_attestation( - &id, - &source, - subnet, - attestation, - ) { + AttServiceMessage::EnrAdd(subnet_id) => { + service.libp2p.swarm.update_enr_subnet(subnet_id, true); + } + AttServiceMessage::EnrRemove(subnet_id) => { + service.libp2p.swarm.update_enr_subnet(subnet_id, false); + } + AttServiceMessage::DiscoverPeers(subnet_id) => { + service.libp2p.swarm.peers_request(subnet_id); + } + } + } + libp2p_event = service.libp2p.next_event() => { + // poll the swarm + match libp2p_event { + Libp2pEvent::Behaviour(event) => match event { + BehaviourEvent::RequestReceived{peer_id, id, request} => { + if let Request::Goodbye(_) = request { + // if we received a Goodbye message, drop and ban the peer + //peers_to_ban.push(peer_id.clone()); + // TODO: remove this: https://github.com/sigp/lighthouse/issues/1240 + service.libp2p.disconnect_and_ban_peer( + peer_id.clone(), + std::time::Duration::from_secs(BAN_PEER_TIMEOUT), + ); + }; + let _ = service + .router_send + .send(RouterMessage::RPCRequestReceived{peer_id, stream_id:id, request}) + .map_err(|_| { + debug!(service.log, "Failed to send RPC to router"); + }); + } + BehaviourEvent::ResponseReceived{peer_id, id, response} => { + let _ = service + .router_send + .send(RouterMessage::RPCResponseReceived{ peer_id, request_id:id, response }) + .map_err(|_| { + debug!(service.log, "Failed to send RPC to router"); + }); + + } + BehaviourEvent::RPCFailed{id, peer_id, error} => { + let _ = service + .router_send + .send(RouterMessage::RPCFailed{ peer_id, request_id:id, error }) + .map_err(|_| { + debug!(service.log, "Failed to send RPC to router"); + }); + + } + BehaviourEvent::StatusPeer(peer_id) => { + let _ = service + .router_send + .send(RouterMessage::StatusPeer(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send re-status peer to router"); + }); + } + BehaviourEvent::PubsubMessage { + id, + source, + message, + .. + } => { + // Update prometheus metrics. + expose_receive_metrics(&message); + match message { + // attestation information gets processed in the attestation service + PubsubMessage::Attestation(ref subnet_and_attestation) => { + let subnet = &subnet_and_attestation.0; + let attestation = &subnet_and_attestation.1; + // checks if we have an aggregator for the slot. If so, we process + // the attestation + if service.attestation_service.should_process_attestation( + &id, + &source, + subnet, + attestation, + ) { + let _ = service + .router_send + .send(RouterMessage::PubsubMessage(id, source, message)) + .map_err(|_| { + debug!(service.log, "Failed to send pubsub message to router"); + }); + } else { + metrics::inc_counter(&metrics::GOSSIP_UNAGGREGATED_ATTESTATIONS_IGNORED) + } + } + _ => { + // all else is sent to the router let _ = service .router_send .send(RouterMessage::PubsubMessage(id, source, message)) .map_err(|_| { debug!(service.log, "Failed to send pubsub message to router"); }); - } else { - metrics::inc_counter(&metrics::GOSSIP_UNAGGREGATED_ATTESTATIONS_IGNORED) } } - _ => { - // all else is sent to the router - let _ = service - .router_send - .send(RouterMessage::PubsubMessage(id, source, message)) - .map_err(|_| { - debug!(service.log, "Failed to send pubsub message to router"); - }); - } + } + BehaviourEvent::PeerSubscribed(_, _) => {}, + } + Libp2pEvent::NewListenAddr(multiaddr) => { + service.network_globals.listen_multiaddrs.write().push(multiaddr); + } + Libp2pEvent::PeerConnected{ peer_id, endpoint,} => { + debug!(service.log, "Peer Connected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint)); + if let eth2_libp2p::ConnectedPoint::Dialer { .. } = endpoint { + let _ = service + .router_send + .send(RouterMessage::PeerDialed(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send peer dialed to router"); }); } } - BehaviourEvent::PeerSubscribed(_, _) => {}, + Libp2pEvent::PeerDisconnected{ peer_id, endpoint,} => { + debug!(service.log, "Peer Disconnected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint)); + let _ = service + .router_send + .send(RouterMessage::PeerDisconnected(peer_id)) + .map_err(|_| { + debug!(service.log, "Failed to send peer disconnect to router"); + }); + } } - Libp2pEvent::NewListenAddr(multiaddr) => { - service.network_globals.listen_multiaddrs.write().push(multiaddr); - } - Libp2pEvent::PeerConnected{ peer_id, endpoint,} => { - debug!(service.log, "Peer Connected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint)); - if let eth2_libp2p::ConnectedPoint::Dialer { .. } = endpoint { - let _ = service - .router_send - .send(RouterMessage::PeerDialed(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send peer dialed to router"); }); - } - } - Libp2pEvent::PeerDisconnected{ peer_id, endpoint,} => { - debug!(service.log, "Peer Disconnected"; "peer_id" => peer_id.to_string(), "endpoint" => format!("{:?}", endpoint)); - let _ = service - .router_send - .send(RouterMessage::PeerDisconnected(peer_id)) - .map_err(|_| { - debug!(service.log, "Failed to send peer disconnect to router"); - }); - } - } } } @@ -384,8 +414,27 @@ pub enum NetworkMessage { Subscribe { subscriptions: Vec, }, - /// Send an RPC message to the libp2p service. - RPC(PeerId, RPCEvent), + /// Send an RPC request to the libp2p service. + SendRequest { + peer_id: PeerId, + request: Request, + request_id: RequestId, + }, + /// Send a successful Response to the libp2p service. + SendResponse { + peer_id: PeerId, + response: Response, + stream_id: SubstreamId, + }, + /// Respond to a peer's request with an error. + SendError { + // TODO: note that this is never used, we just say goodbye without nicely clossing the + // stream assigned to the request + peer_id: PeerId, + error: RPCResponseErrorCode, + reason: String, + substream_id: SubstreamId, + }, /// Publish a list of messages to the gossipsub protocol. Publish { messages: Vec> }, /// Propagate a received gossipsub message. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 99abc9495d..e5c035b7f0 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -37,9 +37,10 @@ use super::block_processor::{spawn_block_processor, BatchProcessResult, ProcessI use super::network_context::SyncNetworkContext; use super::peer_sync_info::{PeerSyncInfo, PeerSyncType}; use super::range_sync::{BatchId, ChainId, RangeSync}; +use super::RequestId; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessingOutcome}; -use eth2_libp2p::rpc::{methods::*, RequestId}; +use eth2_libp2p::rpc::BlocksByRootRequest; use eth2_libp2p::types::NetworkGlobals; use eth2_libp2p::PeerId; use fnv::FnvHashMap; diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 2e68dc6e81..2c0fcabb28 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -9,3 +9,6 @@ mod range_sync; pub use manager::SyncMessage; pub use peer_sync_info::PeerSyncInfo; + +/// Type of id of rpc requests sent by sync +pub type RequestId = usize; diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index a5813ff967..039c5db6d1 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -4,9 +4,8 @@ use crate::router::processor::status_message; use crate::service::NetworkMessage; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::{RPCEvent, RPCRequest, RequestId}; -use eth2_libp2p::{Client, NetworkGlobals, PeerId}; +use eth2_libp2p::rpc::{BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, RequestId}; +use eth2_libp2p::{Client, NetworkGlobals, PeerId, Request}; use slog::{debug, trace, warn}; use std::sync::Arc; use tokio::sync::mpsc; @@ -22,7 +21,7 @@ pub struct SyncNetworkContext { network_globals: Arc>, /// A sequential ID for all RPC requests. - request_id: RequestId, + request_id: usize, /// Logger for the `SyncNetworkContext`. log: slog::Logger, } @@ -68,7 +67,7 @@ impl SyncNetworkContext { "head_slot" => format!("{}", status_message.head_slot), ); - let _ = self.send_rpc_request(peer_id, RPCRequest::Status(status_message)); + let _ = self.send_rpc_request(peer_id, Request::Status(status_message)); } } @@ -76,7 +75,7 @@ impl SyncNetworkContext { &mut self, peer_id: PeerId, request: BlocksByRangeRequest, - ) -> Result { + ) -> Result { trace!( self.log, "Sending BlocksByRange Request"; @@ -84,14 +83,14 @@ impl SyncNetworkContext { "count" => request.count, "peer" => format!("{:?}", peer_id) ); - self.send_rpc_request(peer_id, RPCRequest::BlocksByRange(request)) + self.send_rpc_request(peer_id, Request::BlocksByRange(request)) } pub fn blocks_by_root_request( &mut self, peer_id: PeerId, request: BlocksByRootRequest, - ) -> Result { + ) -> Result { trace!( self.log, "Sending BlocksByRoot Request"; @@ -99,7 +98,7 @@ impl SyncNetworkContext { "count" => request.block_roots.len(), "peer" => format!("{:?}", peer_id) ); - self.send_rpc_request(peer_id, RPCRequest::BlocksByRoot(request)) + self.send_rpc_request(peer_id, Request::BlocksByRoot(request)) } pub fn downvote_peer(&mut self, peer_id: PeerId) { @@ -109,6 +108,10 @@ impl SyncNetworkContext { "peer" => format!("{:?}", peer_id) ); // TODO: Implement reputation + // TODO: what if we first close the channel sending a response + // RPCResponseErrorCode::InvalidRequest (or something) + // and then disconnect the peer? either request dc or let the behaviour have that logic + // itself self.disconnect(peer_id, GoodbyeReason::Fault); } @@ -121,7 +124,7 @@ impl SyncNetworkContext { ); // ignore the error if the channel send fails - let _ = self.send_rpc_request(peer_id.clone(), RPCRequest::Goodbye(reason)); + let _ = self.send_rpc_request(peer_id.clone(), Request::Goodbye(reason)); self.network_send .send(NetworkMessage::Disconnect { peer_id }) .unwrap_or_else(|_| { @@ -135,27 +138,22 @@ impl SyncNetworkContext { pub fn send_rpc_request( &mut self, peer_id: PeerId, - rpc_request: RPCRequest, - ) -> Result { + request: Request, + ) -> Result { let request_id = self.request_id; self.request_id += 1; - self.send_rpc_event(peer_id, RPCEvent::Request(request_id, rpc_request))?; + self.send_network_msg(NetworkMessage::SendRequest { + peer_id, + request_id: RequestId::Sync(request_id), + request, + })?; Ok(request_id) } - fn send_rpc_event( - &mut self, - peer_id: PeerId, - rpc_event: RPCEvent, - ) -> Result<(), &'static str> { - self.network_send - .send(NetworkMessage::RPC(peer_id, rpc_event)) - .map_err(|_| { - debug!( - self.log, - "Could not send RPC message to the network service" - ); - "Network channel send Failed" - }) + fn send_network_msg(&mut self, msg: NetworkMessage) -> Result<(), &'static str> { + self.network_send.send(msg).map_err(|_| { + debug!(self.log, "Could not send message to the network service"); + "Network channel send Failed" + }) } } diff --git a/beacon_node/network/src/sync/peer_sync_info.rs b/beacon_node/network/src/sync/peer_sync_info.rs index f03d1a1dfb..78d91282a5 100644 --- a/beacon_node/network/src/sync/peer_sync_info.rs +++ b/beacon_node/network/src/sync/peer_sync_info.rs @@ -1,7 +1,7 @@ use super::manager::SLOT_IMPORT_TOLERANCE; use crate::router::processor::status_message; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::rpc::methods::*; +use eth2_libp2p::rpc::*; use eth2_libp2p::SyncInfo; use std::ops::Sub; use std::sync::Arc; diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index bd8b604e3f..684c751260 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -1,6 +1,5 @@ use super::chain::EPOCHS_PER_BATCH; use eth2_libp2p::rpc::methods::*; -use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use fnv::FnvHashMap; use ssz::Encode; @@ -112,9 +111,9 @@ impl PartialOrd for Batch { /// This is used to optimise searches for idle peers (peers that have no outbound batch requests). pub struct PendingBatches { /// The current pending batches. - batches: FnvHashMap>, + batches: FnvHashMap>, /// A mapping of peers to the number of pending requests. - peer_requests: HashMap>, + peer_requests: HashMap>, } impl PendingBatches { @@ -125,7 +124,7 @@ impl PendingBatches { } } - pub fn insert(&mut self, request_id: RequestId, batch: Batch) -> Option> { + pub fn insert(&mut self, request_id: usize, batch: Batch) -> Option> { let peer_request = batch.current_peer.clone(); self.peer_requests .entry(peer_request) @@ -134,7 +133,7 @@ impl PendingBatches { self.batches.insert(request_id, batch) } - pub fn remove(&mut self, request_id: RequestId) -> Option> { + pub fn remove(&mut self, request_id: usize) -> Option> { if let Some(batch) = self.batches.remove(&request_id) { if let Entry::Occupied(mut entry) = self.peer_requests.entry(batch.current_peer.clone()) { @@ -157,7 +156,7 @@ impl PendingBatches { /// Adds a block to the batches if the request id exists. Returns None if there is no batch /// matching the request id. - pub fn add_block(&mut self, request_id: RequestId, block: SignedBeaconBlock) -> Option<()> { + pub fn add_block(&mut self, request_id: usize, block: SignedBeaconBlock) -> Option<()> { let batch = self.batches.get_mut(&request_id)?; batch.downloaded_blocks.push(block); Some(()) diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 3a98adcac5..05abf6ea32 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1,9 +1,8 @@ use super::batch::{Batch, BatchId, PendingBatches}; use crate::sync::block_processor::{spawn_block_processor, BatchProcessResult, ProcessId}; use crate::sync::network_context::SyncNetworkContext; -use crate::sync::SyncMessage; +use crate::sync::{RequestId, SyncMessage}; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::rpc::RequestId; use eth2_libp2p::PeerId; use rand::prelude::*; use slog::{crit, debug, warn}; diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index ca827082e1..f6a1d80e47 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -47,8 +47,8 @@ use crate::sync::block_processor::BatchProcessResult; use crate::sync::manager::SyncMessage; use crate::sync::network_context::SyncNetworkContext; use crate::sync::PeerSyncInfo; +use crate::sync::RequestId; use beacon_chain::{BeaconChain, BeaconChainTypes}; -use eth2_libp2p::rpc::RequestId; use eth2_libp2p::{NetworkGlobals, PeerId}; use slog::{debug, error, trace}; use std::collections::HashSet;