mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
fix Rpc Ping sequence number (#6408)
* fix Rpc Ping sequence number * bubble up Outbound Err's and Responses even if the peer disconnected * send pings via Rpc from main network * add comment to connected check * Merge branch 'unstable' into fix-ping-seq-number
This commit is contained in:
@@ -4,7 +4,7 @@
|
||||
use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode};
|
||||
use super::outbound::OutboundRequestContainer;
|
||||
use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol};
|
||||
use super::{RPCReceived, RPCResponse, RPCSend, ReqId};
|
||||
use super::{RPCReceived, RPCSend, ReqId};
|
||||
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
|
||||
use crate::rpc::protocol::InboundFramed;
|
||||
use fnv::FnvHashMap;
|
||||
@@ -14,8 +14,7 @@ use libp2p::swarm::handler::{
|
||||
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
|
||||
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
|
||||
};
|
||||
use libp2p::swarm::{ConnectionId, Stream};
|
||||
use libp2p::PeerId;
|
||||
use libp2p::swarm::Stream;
|
||||
use slog::{crit, debug, trace};
|
||||
use smallvec::SmallVec;
|
||||
use std::{
|
||||
@@ -89,12 +88,6 @@ pub struct RPCHandler<Id, E>
|
||||
where
|
||||
E: EthSpec,
|
||||
{
|
||||
/// This `ConnectionId`.
|
||||
id: ConnectionId,
|
||||
|
||||
/// The matching `PeerId` of this connection.
|
||||
peer_id: PeerId,
|
||||
|
||||
/// The upgrade for inbound substreams.
|
||||
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
|
||||
|
||||
@@ -225,16 +218,12 @@ where
|
||||
E: EthSpec,
|
||||
{
|
||||
pub fn new(
|
||||
id: ConnectionId,
|
||||
peer_id: PeerId,
|
||||
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
|
||||
fork_context: Arc<ForkContext>,
|
||||
log: &slog::Logger,
|
||||
resp_timeout: Duration,
|
||||
) -> Self {
|
||||
RPCHandler {
|
||||
id,
|
||||
peer_id,
|
||||
listen_protocol,
|
||||
events_out: SmallVec::new(),
|
||||
dial_queue: SmallVec::new(),
|
||||
@@ -903,15 +892,6 @@ where
|
||||
self.shutdown(None);
|
||||
}
|
||||
|
||||
// If we received a Ping, we queue a Pong response.
|
||||
if let InboundRequest::Ping(ping) = req {
|
||||
trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %self.id, "peer_id" => %self.peer_id);
|
||||
self.send_response(
|
||||
self.current_inbound_substream_id,
|
||||
RPCCodedResponse::Success(RPCResponse::Pong(ping)),
|
||||
);
|
||||
}
|
||||
|
||||
self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
|
||||
self.current_inbound_substream_id,
|
||||
req,
|
||||
|
||||
@@ -14,7 +14,7 @@ use libp2p::swarm::{
|
||||
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
|
||||
use libp2p::PeerId;
|
||||
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
|
||||
use slog::{crit, debug, o};
|
||||
use slog::{crit, debug, o, trace};
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
@@ -132,6 +132,8 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
|
||||
log: slog::Logger,
|
||||
/// Networking constant values
|
||||
network_params: NetworkParams,
|
||||
/// A sequential counter indicating when data gets modified.
|
||||
seq_number: u64,
|
||||
}
|
||||
|
||||
impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
@@ -142,6 +144,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
|
||||
log: slog::Logger,
|
||||
network_params: NetworkParams,
|
||||
seq_number: u64,
|
||||
) -> Self {
|
||||
let log = log.new(o!("service" => "libp2p_rpc"));
|
||||
|
||||
@@ -163,6 +166,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
enable_light_client_server,
|
||||
log,
|
||||
network_params,
|
||||
seq_number,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,6 +218,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
event: RPCSend::Shutdown(id, reason),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn update_seq_number(&mut self, seq_number: u64) {
|
||||
self.seq_number = seq_number
|
||||
}
|
||||
|
||||
/// Send a Ping request to the destination `PeerId` via `ConnectionId`.
|
||||
pub fn ping(&mut self, peer_id: PeerId, id: Id) {
|
||||
let ping = Ping {
|
||||
data: self.seq_number,
|
||||
};
|
||||
trace!(self.log, "Sending Ping"; "peer_id" => %peer_id);
|
||||
self.send_request(peer_id, id, OutboundRequest::Ping(ping));
|
||||
}
|
||||
}
|
||||
|
||||
impl<Id, E> NetworkBehaviour for RPC<Id, E>
|
||||
@@ -245,8 +262,6 @@ where
|
||||
.log
|
||||
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
|
||||
let handler = RPCHandler::new(
|
||||
connection_id,
|
||||
peer_id,
|
||||
protocol,
|
||||
self.fork_context.clone(),
|
||||
&log,
|
||||
@@ -280,8 +295,6 @@ where
|
||||
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
|
||||
|
||||
let handler = RPCHandler::new(
|
||||
connection_id,
|
||||
peer_id,
|
||||
protocol,
|
||||
self.fork_context.clone(),
|
||||
&log,
|
||||
@@ -359,14 +372,6 @@ where
|
||||
if let Some(limiter) = self.limiter.as_mut() {
|
||||
// check if the request is conformant to the quota
|
||||
match limiter.allows(&peer_id, &req) {
|
||||
Ok(()) => {
|
||||
// send the event to the user
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id,
|
||||
message: Ok(RPCReceived::Request(id, req)),
|
||||
}))
|
||||
}
|
||||
Err(RateLimitedErr::TooLarge) => {
|
||||
// we set the batch sizes, so this is a coding/config err for most protocols
|
||||
let protocol = req.versioned_protocol().protocol();
|
||||
@@ -394,6 +399,7 @@ where
|
||||
"Rate limited. Request too large".into(),
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(RateLimitedErr::TooSoon(wait_time)) => {
|
||||
debug!(self.log, "Request exceeds the rate limit";
|
||||
@@ -408,16 +414,29 @@ where
|
||||
format!("Wait {:?}", wait_time).into(),
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
// No rate limiting, continue.
|
||||
Ok(_) => {}
|
||||
}
|
||||
} else {
|
||||
// No rate limiting, send the event to the user
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id,
|
||||
message: Ok(RPCReceived::Request(id, req)),
|
||||
}))
|
||||
}
|
||||
// If we received a Ping, we queue a Pong response.
|
||||
if let InboundRequest::Ping(_) = req {
|
||||
trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %conn_id, "peer_id" => %peer_id);
|
||||
self.send_response(
|
||||
peer_id,
|
||||
(conn_id, id),
|
||||
RPCCodedResponse::Success(RPCResponse::Pong(Ping {
|
||||
data: self.seq_number,
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
conn_id,
|
||||
message: Ok(RPCReceived::Request(id, req)),
|
||||
}));
|
||||
}
|
||||
HandlerEvent::Ok(rpc) => {
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
|
||||
@@ -159,38 +159,36 @@ impl<E: EthSpec> Network<E> {
|
||||
.collect();
|
||||
|
||||
// set up a collection of variables accessible outside of the network crate
|
||||
let network_globals = {
|
||||
// Create an ENR or load from disk if appropriate
|
||||
let enr = crate::discovery::enr::build_or_load_enr::<E>(
|
||||
local_keypair.clone(),
|
||||
&config,
|
||||
&ctx.enr_fork_id,
|
||||
&log,
|
||||
&ctx.chain_spec,
|
||||
)?;
|
||||
// Construct the metadata
|
||||
let custody_subnet_count = if ctx.chain_spec.is_peer_das_scheduled() {
|
||||
if config.subscribe_all_data_column_subnets {
|
||||
Some(ctx.chain_spec.data_column_sidecar_subnet_count)
|
||||
} else {
|
||||
Some(ctx.chain_spec.custody_requirement)
|
||||
}
|
||||
// Create an ENR or load from disk if appropriate
|
||||
let enr = crate::discovery::enr::build_or_load_enr::<E>(
|
||||
local_keypair.clone(),
|
||||
&config,
|
||||
&ctx.enr_fork_id,
|
||||
&log,
|
||||
&ctx.chain_spec,
|
||||
)?;
|
||||
|
||||
// Construct the metadata
|
||||
let custody_subnet_count = ctx.chain_spec.is_peer_das_scheduled().then(|| {
|
||||
if config.subscribe_all_data_column_subnets {
|
||||
ctx.chain_spec.data_column_sidecar_subnet_count
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let meta_data =
|
||||
utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log);
|
||||
let globals = NetworkGlobals::new(
|
||||
enr,
|
||||
meta_data,
|
||||
trusted_peers,
|
||||
config.disable_peer_scoring,
|
||||
&log,
|
||||
config.clone(),
|
||||
ctx.chain_spec.clone(),
|
||||
);
|
||||
Arc::new(globals)
|
||||
};
|
||||
ctx.chain_spec.custody_requirement
|
||||
}
|
||||
});
|
||||
let meta_data =
|
||||
utils::load_or_build_metadata(&config.network_dir, custody_subnet_count, &log);
|
||||
let seq_number = *meta_data.seq_number();
|
||||
let globals = NetworkGlobals::new(
|
||||
enr,
|
||||
meta_data,
|
||||
trusted_peers,
|
||||
config.disable_peer_scoring,
|
||||
&log,
|
||||
config.clone(),
|
||||
ctx.chain_spec.clone(),
|
||||
);
|
||||
let network_globals = Arc::new(globals);
|
||||
|
||||
// Grab our local ENR FORK ID
|
||||
let enr_fork_id = network_globals
|
||||
@@ -338,6 +336,7 @@ impl<E: EthSpec> Network<E> {
|
||||
config.outbound_rate_limiter_config.clone(),
|
||||
log.clone(),
|
||||
network_params,
|
||||
seq_number,
|
||||
);
|
||||
|
||||
let discovery = {
|
||||
@@ -1104,33 +1103,26 @@ impl<E: EthSpec> Network<E> {
|
||||
.sync_committee_bitfield::<E>()
|
||||
.expect("Local discovery must have sync committee bitfield");
|
||||
|
||||
{
|
||||
// write lock scope
|
||||
let mut meta_data = self.network_globals.local_metadata.write();
|
||||
// write lock scope
|
||||
let mut meta_data_w = self.network_globals.local_metadata.write();
|
||||
|
||||
*meta_data.seq_number_mut() += 1;
|
||||
*meta_data.attnets_mut() = local_attnets;
|
||||
if let Ok(syncnets) = meta_data.syncnets_mut() {
|
||||
*syncnets = local_syncnets;
|
||||
}
|
||||
*meta_data_w.seq_number_mut() += 1;
|
||||
*meta_data_w.attnets_mut() = local_attnets;
|
||||
if let Ok(syncnets) = meta_data_w.syncnets_mut() {
|
||||
*syncnets = local_syncnets;
|
||||
}
|
||||
let seq_number = *meta_data_w.seq_number();
|
||||
let meta_data = meta_data_w.clone();
|
||||
|
||||
drop(meta_data_w);
|
||||
self.eth2_rpc_mut().update_seq_number(seq_number);
|
||||
// Save the updated metadata to disk
|
||||
utils::save_metadata_to_disk(
|
||||
&self.network_dir,
|
||||
self.network_globals.local_metadata.read().clone(),
|
||||
&self.log,
|
||||
);
|
||||
utils::save_metadata_to_disk(&self.network_dir, meta_data, &self.log);
|
||||
}
|
||||
|
||||
/// Sends a Ping request to the peer.
|
||||
fn ping(&mut self, peer_id: PeerId) {
|
||||
let ping = crate::rpc::Ping {
|
||||
data: *self.network_globals.local_metadata.read().seq_number(),
|
||||
};
|
||||
trace!(self.log, "Sending Ping"; "peer_id" => %peer_id);
|
||||
let id = RequestId::Internal;
|
||||
self.eth2_rpc_mut()
|
||||
.send_request(peer_id, id, OutboundRequest::Ping(ping));
|
||||
self.eth2_rpc_mut().ping(peer_id, RequestId::Internal);
|
||||
}
|
||||
|
||||
/// Sends a METADATA request to a peer.
|
||||
@@ -1400,8 +1392,12 @@ impl<E: EthSpec> Network<E> {
|
||||
fn inject_rpc_event(&mut self, event: RPCMessage<RequestId, E>) -> Option<NetworkEvent<E>> {
|
||||
let peer_id = event.peer_id;
|
||||
|
||||
// Do not permit Inbound events from peers that are being disconnected, or RPC requests.
|
||||
if !self.peer_manager().is_connected(&peer_id) {
|
||||
// Do not permit Inbound events from peers that are being disconnected or RPC requests,
|
||||
// but allow `RpcFailed` and `HandlerErr::Outbound` to be bubble up to sync for state management.
|
||||
if !self.peer_manager().is_connected(&peer_id)
|
||||
&& (matches!(event.message, Err(HandlerErr::Inbound { .. }))
|
||||
|| matches!(event.message, Ok(RPCReceived::Request(..))))
|
||||
{
|
||||
debug!(
|
||||
self.log,
|
||||
"Ignoring rpc message of disconnecting peer";
|
||||
|
||||
Reference in New Issue
Block a user