diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 6f338ebc8b..08e55e50c9 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -4,7 +4,7 @@ use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode}; use super::outbound::OutboundRequestContainer; use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol}; -use super::{RPCReceived, RPCSend, ReqId}; +use super::{RPCReceived, RPCResponse, RPCSend, ReqId}; use crate::rpc::outbound::{OutboundFramed, OutboundRequest}; use crate::rpc::protocol::InboundFramed; use fnv::FnvHashMap; @@ -14,7 +14,8 @@ use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, }; -use libp2p::swarm::Stream; +use libp2p::swarm::{ConnectionId, Stream}; +use libp2p::PeerId; use slog::{crit, debug, trace}; use smallvec::SmallVec; use std::{ @@ -88,6 +89,12 @@ pub struct RPCHandler where E: EthSpec, { + /// This `ConnectionId`. + id: ConnectionId, + + /// The matching `PeerId` of this connection. + peer_id: PeerId, + /// The upgrade for inbound substreams. listen_protocol: SubstreamProtocol, ()>, @@ -218,12 +225,16 @@ where E: EthSpec, { pub fn new( + id: ConnectionId, + peer_id: PeerId, listen_protocol: SubstreamProtocol, ()>, fork_context: Arc, log: &slog::Logger, resp_timeout: Duration, ) -> Self { RPCHandler { + id, + peer_id, listen_protocol, events_out: SmallVec::new(), dial_queue: SmallVec::new(), @@ -892,6 +903,15 @@ where self.shutdown(None); } + // If we received a Ping, we queue a Pong response. + if let InboundRequest::Ping(ping) = req { + trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %self.id, "peer_id" => %self.peer_id); + self.send_response( + self.current_inbound_substream_id, + RPCCodedResponse::Success(RPCResponse::Pong(ping)), + ); + } + self.events_out.push(HandlerEvent::Ok(RPCReceived::Request( self.current_inbound_substream_id, req, diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index a96b9d1b16..6e1ba9cd30 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -81,7 +81,7 @@ pub struct StatusMessage { } /// The PING request/response message. -#[derive(Encode, Decode, Clone, Debug, PartialEq)] +#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq)] pub struct Ping { /// The metadata sequence number. pub data: u64, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index cd591554a3..eae206e022 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -106,7 +106,7 @@ pub struct RPCMessage { /// Handler managing this message. pub conn_id: ConnectionId, /// The message that was sent. - pub event: HandlerEvent, + pub message: Result, HandlerErr>, } type BehaviourAction = ToSwarm, RPCSend>; @@ -245,6 +245,8 @@ where .log .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( + connection_id, + peer_id, protocol, self.fork_context.clone(), &log, @@ -278,6 +280,8 @@ where .new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string())); let handler = RPCHandler::new( + connection_id, + peer_id, protocol, self.fork_context.clone(), &log, @@ -311,7 +315,7 @@ where let error_msg = ToSwarm::GenerateEvent(RPCMessage { peer_id, conn_id: connection_id, - event: HandlerEvent::Err(HandlerErr::Outbound { + message: Err(HandlerErr::Outbound { id, proto, error: RPCError::Disconnected, @@ -332,7 +336,7 @@ where *event = ToSwarm::GenerateEvent(RPCMessage { peer_id, conn_id: connection_id, - event: HandlerEvent::Err(HandlerErr::Outbound { + message: Err(HandlerErr::Outbound { id: *request_id, proto: req.versioned_protocol().protocol(), error: RPCError::Disconnected, @@ -351,16 +355,16 @@ where event: ::ToBehaviour, ) { match event { - HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) => { + HandlerEvent::Ok(RPCReceived::Request(id, req)) => { if let Some(limiter) = self.limiter.as_mut() { // check if the request is conformant to the quota - match limiter.allows(&peer_id, req) { + match limiter.allows(&peer_id, &req) { Ok(()) => { // send the event to the user self.events.push(ToSwarm::GenerateEvent(RPCMessage { peer_id, conn_id, - event, + message: Ok(RPCReceived::Request(id, req)), })) } Err(RateLimitedErr::TooLarge) => { @@ -384,7 +388,7 @@ where // the handler upon receiving the error code will send it back to the behaviour self.send_response( peer_id, - (conn_id, *id), + (conn_id, id), RPCCodedResponse::Error( RPCResponseErrorCode::RateLimited, "Rate limited. Request too large".into(), @@ -398,7 +402,7 @@ where // the handler upon receiving the error code will send it back to the behaviour self.send_response( peer_id, - (conn_id, *id), + (conn_id, id), RPCCodedResponse::Error( RPCResponseErrorCode::RateLimited, format!("Wait {:?}", wait_time).into(), @@ -411,10 +415,24 @@ where self.events.push(ToSwarm::GenerateEvent(RPCMessage { peer_id, conn_id, - event, + message: Ok(RPCReceived::Request(id, req)), })) } } + HandlerEvent::Ok(rpc) => { + self.events.push(ToSwarm::GenerateEvent(RPCMessage { + peer_id, + conn_id, + message: Ok(rpc), + })); + } + HandlerEvent::Err(err) => { + self.events.push(ToSwarm::GenerateEvent(RPCMessage { + peer_id, + conn_id, + message: Err(err), + })); + } HandlerEvent::Close(_) => { // Handle the close event here. self.events.push(ToSwarm::CloseConnection { @@ -422,13 +440,6 @@ where connection: CloseConnection::All, }); } - _ => { - self.events.push(ToSwarm::GenerateEvent(RPCMessage { - peer_id, - conn_id, - event, - })); - } } } @@ -463,8 +474,8 @@ where serializer: &mut dyn slog::Serializer, ) -> slog::Result { serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?; - match &self.event { - HandlerEvent::Ok(received) => { + match &self.message { + Ok(received) => { let (msg_kind, protocol) = match received { RPCReceived::Request(_, req) => { ("request", req.versioned_protocol().protocol()) @@ -485,7 +496,7 @@ where serializer.emit_str("msg_kind", msg_kind)?; serializer.emit_arguments("protocol", &format_args!("{}", protocol))?; } - HandlerEvent::Err(error) => { + Err(error) => { let (msg_kind, protocol) = match &error { HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto), HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto), @@ -493,9 +504,6 @@ where serializer.emit_str("msg_kind", msg_kind)?; serializer.emit_arguments("protocol", &format_args!("{}", protocol))?; } - HandlerEvent::Close(err) => { - serializer.emit_arguments("handler_close", &format_args!("{}", err))?; - } }; slog::Result::Ok(()) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index d97b52f79f..a97157ff0a 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -10,7 +10,11 @@ use crate::peer_manager::{ }; use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS}; use crate::rpc::methods::MetadataRequest; -use crate::rpc::*; +use crate::rpc::{ + methods, BlocksByRangeRequest, GoodbyeReason, HandlerErr, InboundRequest, NetworkParams, + OutboundRequest, Protocol, RPCCodedResponse, RPCError, RPCMessage, RPCReceived, RPCResponse, + RPCResponseErrorCode, ResponseTermination, RPC, +}; use crate::service::behaviour::BehaviourEvent; pub use crate::service::behaviour::Gossipsub; use crate::types::{ @@ -1128,16 +1132,6 @@ impl Network { .send_request(peer_id, id, OutboundRequest::Ping(ping)); } - /// Sends a Pong response to the peer. - fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) { - let ping = crate::rpc::Ping { - data: *self.network_globals.local_metadata.read().seq_number(), - }; - trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => %peer_id); - let event = RPCCodedResponse::Success(RPCResponse::Pong(ping)); - self.eth2_rpc_mut().send_response(peer_id, id, event); - } - /// Sends a METADATA request to a peer. fn send_meta_data_request(&mut self, peer_id: PeerId) { let event = if self.fork_context.spec.is_peer_das_scheduled() { @@ -1406,10 +1400,7 @@ impl Network { let peer_id = event.peer_id; // Do not permit Inbound events from peers that are being disconnected, or RPC requests. - if !self.peer_manager().is_connected(&peer_id) - && (matches!(event.event, HandlerEvent::Err(HandlerErr::Inbound { .. })) - || matches!(event.event, HandlerEvent::Ok(RPCReceived::Request(..)))) - { + if !self.peer_manager().is_connected(&peer_id) { debug!( self.log, "Ignoring rpc message of disconnecting peer"; @@ -1420,8 +1411,8 @@ impl Network { let handler_id = event.conn_id; // The METADATA and PING RPC responses are handled within the behaviour and not propagated - match event.event { - HandlerEvent::Err(handler_err) => { + match event.message { + Err(handler_err) => { match handler_err { HandlerErr::Inbound { id: _, @@ -1456,15 +1447,13 @@ impl Network { } } } - HandlerEvent::Ok(RPCReceived::Request(id, request)) => { + Ok(RPCReceived::Request(id, request)) => { let peer_request_id = (handler_id, id); match request { /* Behaviour managed protocols: Ping and Metadata */ InboundRequest::Ping(ping) => { // inform the peer manager and send the response self.peer_manager_mut().ping_request(&peer_id, ping.data); - // send a ping response - self.pong(peer_request_id, peer_id); None } InboundRequest::MetaData(req) => { @@ -1587,7 +1576,7 @@ impl Network { } } } - HandlerEvent::Ok(RPCReceived::Response(id, resp)) => { + Ok(RPCReceived::Response(id, resp)) => { match resp { /* Behaviour managed protocols */ RPCResponse::Pong(ping) => { @@ -1640,7 +1629,7 @@ impl Network { ), } } - HandlerEvent::Ok(RPCReceived::EndOfStream(id, termination)) => { + Ok(RPCReceived::EndOfStream(id, termination)) => { let response = match termination { ResponseTermination::BlocksByRange => Response::BlocksByRange(None), ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), @@ -1651,10 +1640,6 @@ impl Network { }; self.build_response(id, peer_id, response) } - HandlerEvent::Close(_) => { - // NOTE: This is handled in the RPC behaviour. - None - } } }