mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-30 03:14:25 +00:00
Improve rpc logic (#6400)
* update rpc imports to be explicit * avoid exposing HandlerEvent outside RPC it's unnecessary. * handle Pongs at RPC handler level
This commit is contained in:
@@ -4,7 +4,7 @@
|
|||||||
use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode};
|
use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode};
|
||||||
use super::outbound::OutboundRequestContainer;
|
use super::outbound::OutboundRequestContainer;
|
||||||
use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol};
|
use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol};
|
||||||
use super::{RPCReceived, RPCSend, ReqId};
|
use super::{RPCReceived, RPCResponse, RPCSend, ReqId};
|
||||||
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
|
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
|
||||||
use crate::rpc::protocol::InboundFramed;
|
use crate::rpc::protocol::InboundFramed;
|
||||||
use fnv::FnvHashMap;
|
use fnv::FnvHashMap;
|
||||||
@@ -14,7 +14,8 @@ use libp2p::swarm::handler::{
|
|||||||
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
|
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
|
||||||
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
|
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
|
||||||
};
|
};
|
||||||
use libp2p::swarm::Stream;
|
use libp2p::swarm::{ConnectionId, Stream};
|
||||||
|
use libp2p::PeerId;
|
||||||
use slog::{crit, debug, trace};
|
use slog::{crit, debug, trace};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::{
|
use std::{
|
||||||
@@ -88,6 +89,12 @@ pub struct RPCHandler<Id, E>
|
|||||||
where
|
where
|
||||||
E: EthSpec,
|
E: EthSpec,
|
||||||
{
|
{
|
||||||
|
/// This `ConnectionId`.
|
||||||
|
id: ConnectionId,
|
||||||
|
|
||||||
|
/// The matching `PeerId` of this connection.
|
||||||
|
peer_id: PeerId,
|
||||||
|
|
||||||
/// The upgrade for inbound substreams.
|
/// The upgrade for inbound substreams.
|
||||||
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
|
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
|
||||||
|
|
||||||
@@ -218,12 +225,16 @@ where
|
|||||||
E: EthSpec,
|
E: EthSpec,
|
||||||
{
|
{
|
||||||
pub fn new(
|
pub fn new(
|
||||||
|
id: ConnectionId,
|
||||||
|
peer_id: PeerId,
|
||||||
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
|
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
|
||||||
fork_context: Arc<ForkContext>,
|
fork_context: Arc<ForkContext>,
|
||||||
log: &slog::Logger,
|
log: &slog::Logger,
|
||||||
resp_timeout: Duration,
|
resp_timeout: Duration,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
RPCHandler {
|
RPCHandler {
|
||||||
|
id,
|
||||||
|
peer_id,
|
||||||
listen_protocol,
|
listen_protocol,
|
||||||
events_out: SmallVec::new(),
|
events_out: SmallVec::new(),
|
||||||
dial_queue: SmallVec::new(),
|
dial_queue: SmallVec::new(),
|
||||||
@@ -892,6 +903,15 @@ where
|
|||||||
self.shutdown(None);
|
self.shutdown(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we received a Ping, we queue a Pong response.
|
||||||
|
if let InboundRequest::Ping(ping) = req {
|
||||||
|
trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %self.id, "peer_id" => %self.peer_id);
|
||||||
|
self.send_response(
|
||||||
|
self.current_inbound_substream_id,
|
||||||
|
RPCCodedResponse::Success(RPCResponse::Pong(ping)),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
|
self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
|
||||||
self.current_inbound_substream_id,
|
self.current_inbound_substream_id,
|
||||||
req,
|
req,
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ pub struct StatusMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// The PING request/response message.
|
/// The PING request/response message.
|
||||||
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
|
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq)]
|
||||||
pub struct Ping {
|
pub struct Ping {
|
||||||
/// The metadata sequence number.
|
/// The metadata sequence number.
|
||||||
pub data: u64,
|
pub data: u64,
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ pub struct RPCMessage<Id, E: EthSpec> {
|
|||||||
/// Handler managing this message.
|
/// Handler managing this message.
|
||||||
pub conn_id: ConnectionId,
|
pub conn_id: ConnectionId,
|
||||||
/// The message that was sent.
|
/// The message that was sent.
|
||||||
pub event: HandlerEvent<Id, E>,
|
pub message: Result<RPCReceived<Id, E>, HandlerErr<Id>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type BehaviourAction<Id, E> = ToSwarm<RPCMessage<Id, E>, RPCSend<Id, E>>;
|
type BehaviourAction<Id, E> = ToSwarm<RPCMessage<Id, E>, RPCSend<Id, E>>;
|
||||||
@@ -245,6 +245,8 @@ where
|
|||||||
.log
|
.log
|
||||||
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
|
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
|
||||||
let handler = RPCHandler::new(
|
let handler = RPCHandler::new(
|
||||||
|
connection_id,
|
||||||
|
peer_id,
|
||||||
protocol,
|
protocol,
|
||||||
self.fork_context.clone(),
|
self.fork_context.clone(),
|
||||||
&log,
|
&log,
|
||||||
@@ -278,6 +280,8 @@ where
|
|||||||
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
|
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
|
||||||
|
|
||||||
let handler = RPCHandler::new(
|
let handler = RPCHandler::new(
|
||||||
|
connection_id,
|
||||||
|
peer_id,
|
||||||
protocol,
|
protocol,
|
||||||
self.fork_context.clone(),
|
self.fork_context.clone(),
|
||||||
&log,
|
&log,
|
||||||
@@ -311,7 +315,7 @@ where
|
|||||||
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
|
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
|
||||||
peer_id,
|
peer_id,
|
||||||
conn_id: connection_id,
|
conn_id: connection_id,
|
||||||
event: HandlerEvent::Err(HandlerErr::Outbound {
|
message: Err(HandlerErr::Outbound {
|
||||||
id,
|
id,
|
||||||
proto,
|
proto,
|
||||||
error: RPCError::Disconnected,
|
error: RPCError::Disconnected,
|
||||||
@@ -332,7 +336,7 @@ where
|
|||||||
*event = ToSwarm::GenerateEvent(RPCMessage {
|
*event = ToSwarm::GenerateEvent(RPCMessage {
|
||||||
peer_id,
|
peer_id,
|
||||||
conn_id: connection_id,
|
conn_id: connection_id,
|
||||||
event: HandlerEvent::Err(HandlerErr::Outbound {
|
message: Err(HandlerErr::Outbound {
|
||||||
id: *request_id,
|
id: *request_id,
|
||||||
proto: req.versioned_protocol().protocol(),
|
proto: req.versioned_protocol().protocol(),
|
||||||
error: RPCError::Disconnected,
|
error: RPCError::Disconnected,
|
||||||
@@ -351,16 +355,16 @@ where
|
|||||||
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
|
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
|
||||||
) {
|
) {
|
||||||
match event {
|
match event {
|
||||||
HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) => {
|
HandlerEvent::Ok(RPCReceived::Request(id, req)) => {
|
||||||
if let Some(limiter) = self.limiter.as_mut() {
|
if let Some(limiter) = self.limiter.as_mut() {
|
||||||
// check if the request is conformant to the quota
|
// check if the request is conformant to the quota
|
||||||
match limiter.allows(&peer_id, req) {
|
match limiter.allows(&peer_id, &req) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
// send the event to the user
|
// send the event to the user
|
||||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||||
peer_id,
|
peer_id,
|
||||||
conn_id,
|
conn_id,
|
||||||
event,
|
message: Ok(RPCReceived::Request(id, req)),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
Err(RateLimitedErr::TooLarge) => {
|
Err(RateLimitedErr::TooLarge) => {
|
||||||
@@ -384,7 +388,7 @@ where
|
|||||||
// the handler upon receiving the error code will send it back to the behaviour
|
// the handler upon receiving the error code will send it back to the behaviour
|
||||||
self.send_response(
|
self.send_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
(conn_id, *id),
|
(conn_id, id),
|
||||||
RPCCodedResponse::Error(
|
RPCCodedResponse::Error(
|
||||||
RPCResponseErrorCode::RateLimited,
|
RPCResponseErrorCode::RateLimited,
|
||||||
"Rate limited. Request too large".into(),
|
"Rate limited. Request too large".into(),
|
||||||
@@ -398,7 +402,7 @@ where
|
|||||||
// the handler upon receiving the error code will send it back to the behaviour
|
// the handler upon receiving the error code will send it back to the behaviour
|
||||||
self.send_response(
|
self.send_response(
|
||||||
peer_id,
|
peer_id,
|
||||||
(conn_id, *id),
|
(conn_id, id),
|
||||||
RPCCodedResponse::Error(
|
RPCCodedResponse::Error(
|
||||||
RPCResponseErrorCode::RateLimited,
|
RPCResponseErrorCode::RateLimited,
|
||||||
format!("Wait {:?}", wait_time).into(),
|
format!("Wait {:?}", wait_time).into(),
|
||||||
@@ -411,10 +415,24 @@ where
|
|||||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||||
peer_id,
|
peer_id,
|
||||||
conn_id,
|
conn_id,
|
||||||
event,
|
message: Ok(RPCReceived::Request(id, req)),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
HandlerEvent::Ok(rpc) => {
|
||||||
|
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||||
|
peer_id,
|
||||||
|
conn_id,
|
||||||
|
message: Ok(rpc),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
HandlerEvent::Err(err) => {
|
||||||
|
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||||
|
peer_id,
|
||||||
|
conn_id,
|
||||||
|
message: Err(err),
|
||||||
|
}));
|
||||||
|
}
|
||||||
HandlerEvent::Close(_) => {
|
HandlerEvent::Close(_) => {
|
||||||
// Handle the close event here.
|
// Handle the close event here.
|
||||||
self.events.push(ToSwarm::CloseConnection {
|
self.events.push(ToSwarm::CloseConnection {
|
||||||
@@ -422,13 +440,6 @@ where
|
|||||||
connection: CloseConnection::All,
|
connection: CloseConnection::All,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
_ => {
|
|
||||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
|
||||||
peer_id,
|
|
||||||
conn_id,
|
|
||||||
event,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -463,8 +474,8 @@ where
|
|||||||
serializer: &mut dyn slog::Serializer,
|
serializer: &mut dyn slog::Serializer,
|
||||||
) -> slog::Result {
|
) -> slog::Result {
|
||||||
serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?;
|
serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?;
|
||||||
match &self.event {
|
match &self.message {
|
||||||
HandlerEvent::Ok(received) => {
|
Ok(received) => {
|
||||||
let (msg_kind, protocol) = match received {
|
let (msg_kind, protocol) = match received {
|
||||||
RPCReceived::Request(_, req) => {
|
RPCReceived::Request(_, req) => {
|
||||||
("request", req.versioned_protocol().protocol())
|
("request", req.versioned_protocol().protocol())
|
||||||
@@ -485,7 +496,7 @@ where
|
|||||||
serializer.emit_str("msg_kind", msg_kind)?;
|
serializer.emit_str("msg_kind", msg_kind)?;
|
||||||
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
|
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
|
||||||
}
|
}
|
||||||
HandlerEvent::Err(error) => {
|
Err(error) => {
|
||||||
let (msg_kind, protocol) = match &error {
|
let (msg_kind, protocol) = match &error {
|
||||||
HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto),
|
HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto),
|
||||||
HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto),
|
HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto),
|
||||||
@@ -493,9 +504,6 @@ where
|
|||||||
serializer.emit_str("msg_kind", msg_kind)?;
|
serializer.emit_str("msg_kind", msg_kind)?;
|
||||||
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
|
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
|
||||||
}
|
}
|
||||||
HandlerEvent::Close(err) => {
|
|
||||||
serializer.emit_arguments("handler_close", &format_args!("{}", err))?;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
slog::Result::Ok(())
|
slog::Result::Ok(())
|
||||||
|
|||||||
@@ -10,7 +10,11 @@ use crate::peer_manager::{
|
|||||||
};
|
};
|
||||||
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
|
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
|
||||||
use crate::rpc::methods::MetadataRequest;
|
use crate::rpc::methods::MetadataRequest;
|
||||||
use crate::rpc::*;
|
use crate::rpc::{
|
||||||
|
methods, BlocksByRangeRequest, GoodbyeReason, HandlerErr, InboundRequest, NetworkParams,
|
||||||
|
OutboundRequest, Protocol, RPCCodedResponse, RPCError, RPCMessage, RPCReceived, RPCResponse,
|
||||||
|
RPCResponseErrorCode, ResponseTermination, RPC,
|
||||||
|
};
|
||||||
use crate::service::behaviour::BehaviourEvent;
|
use crate::service::behaviour::BehaviourEvent;
|
||||||
pub use crate::service::behaviour::Gossipsub;
|
pub use crate::service::behaviour::Gossipsub;
|
||||||
use crate::types::{
|
use crate::types::{
|
||||||
@@ -1128,16 +1132,6 @@ impl<E: EthSpec> Network<E> {
|
|||||||
.send_request(peer_id, id, OutboundRequest::Ping(ping));
|
.send_request(peer_id, id, OutboundRequest::Ping(ping));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sends a Pong response to the peer.
|
|
||||||
fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
|
|
||||||
let ping = crate::rpc::Ping {
|
|
||||||
data: *self.network_globals.local_metadata.read().seq_number(),
|
|
||||||
};
|
|
||||||
trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => %peer_id);
|
|
||||||
let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
|
|
||||||
self.eth2_rpc_mut().send_response(peer_id, id, event);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sends a METADATA request to a peer.
|
/// Sends a METADATA request to a peer.
|
||||||
fn send_meta_data_request(&mut self, peer_id: PeerId) {
|
fn send_meta_data_request(&mut self, peer_id: PeerId) {
|
||||||
let event = if self.fork_context.spec.is_peer_das_scheduled() {
|
let event = if self.fork_context.spec.is_peer_das_scheduled() {
|
||||||
@@ -1406,10 +1400,7 @@ impl<E: EthSpec> Network<E> {
|
|||||||
let peer_id = event.peer_id;
|
let peer_id = event.peer_id;
|
||||||
|
|
||||||
// Do not permit Inbound events from peers that are being disconnected, or RPC requests.
|
// Do not permit Inbound events from peers that are being disconnected, or RPC requests.
|
||||||
if !self.peer_manager().is_connected(&peer_id)
|
if !self.peer_manager().is_connected(&peer_id) {
|
||||||
&& (matches!(event.event, HandlerEvent::Err(HandlerErr::Inbound { .. }))
|
|
||||||
|| matches!(event.event, HandlerEvent::Ok(RPCReceived::Request(..))))
|
|
||||||
{
|
|
||||||
debug!(
|
debug!(
|
||||||
self.log,
|
self.log,
|
||||||
"Ignoring rpc message of disconnecting peer";
|
"Ignoring rpc message of disconnecting peer";
|
||||||
@@ -1420,8 +1411,8 @@ impl<E: EthSpec> Network<E> {
|
|||||||
|
|
||||||
let handler_id = event.conn_id;
|
let handler_id = event.conn_id;
|
||||||
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
|
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
|
||||||
match event.event {
|
match event.message {
|
||||||
HandlerEvent::Err(handler_err) => {
|
Err(handler_err) => {
|
||||||
match handler_err {
|
match handler_err {
|
||||||
HandlerErr::Inbound {
|
HandlerErr::Inbound {
|
||||||
id: _,
|
id: _,
|
||||||
@@ -1456,15 +1447,13 @@ impl<E: EthSpec> Network<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
HandlerEvent::Ok(RPCReceived::Request(id, request)) => {
|
Ok(RPCReceived::Request(id, request)) => {
|
||||||
let peer_request_id = (handler_id, id);
|
let peer_request_id = (handler_id, id);
|
||||||
match request {
|
match request {
|
||||||
/* Behaviour managed protocols: Ping and Metadata */
|
/* Behaviour managed protocols: Ping and Metadata */
|
||||||
InboundRequest::Ping(ping) => {
|
InboundRequest::Ping(ping) => {
|
||||||
// inform the peer manager and send the response
|
// inform the peer manager and send the response
|
||||||
self.peer_manager_mut().ping_request(&peer_id, ping.data);
|
self.peer_manager_mut().ping_request(&peer_id, ping.data);
|
||||||
// send a ping response
|
|
||||||
self.pong(peer_request_id, peer_id);
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
InboundRequest::MetaData(req) => {
|
InboundRequest::MetaData(req) => {
|
||||||
@@ -1587,7 +1576,7 @@ impl<E: EthSpec> Network<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
HandlerEvent::Ok(RPCReceived::Response(id, resp)) => {
|
Ok(RPCReceived::Response(id, resp)) => {
|
||||||
match resp {
|
match resp {
|
||||||
/* Behaviour managed protocols */
|
/* Behaviour managed protocols */
|
||||||
RPCResponse::Pong(ping) => {
|
RPCResponse::Pong(ping) => {
|
||||||
@@ -1640,7 +1629,7 @@ impl<E: EthSpec> Network<E> {
|
|||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
HandlerEvent::Ok(RPCReceived::EndOfStream(id, termination)) => {
|
Ok(RPCReceived::EndOfStream(id, termination)) => {
|
||||||
let response = match termination {
|
let response = match termination {
|
||||||
ResponseTermination::BlocksByRange => Response::BlocksByRange(None),
|
ResponseTermination::BlocksByRange => Response::BlocksByRange(None),
|
||||||
ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None),
|
ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None),
|
||||||
@@ -1651,10 +1640,6 @@ impl<E: EthSpec> Network<E> {
|
|||||||
};
|
};
|
||||||
self.build_response(id, peer_id, response)
|
self.build_response(id, peer_id, response)
|
||||||
}
|
}
|
||||||
HandlerEvent::Close(_) => {
|
|
||||||
// NOTE: This is handled in the RPC behaviour.
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user