mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Error from RPC send_response when request doesn't exist on the active inbound requests (#7663)
Lighthouse is currently loggign a lot errors in the `RPC` behaviour whenever a response is received for a request_id that no longer exists in active_inbound_requests. This is likely due to a data race or timing issue (e.g., the peer disconnecting before the response is handled). This PR addresses that by removing the error logging from the RPC layer. Instead, RPC::send_response now simply returns an Err, shifting the responsibility to the main service. The main service can then determine whether the peer is still connected and only log an error if the peer remains connected. Thanks @ackintosh for helping debug!
This commit is contained in:
@@ -17,7 +17,7 @@ use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, error, instrument, trace};
|
||||
use tracing::{debug, instrument, trace};
|
||||
use types::{EthSpec, ForkContext};
|
||||
|
||||
pub(crate) use handler::{HandlerErr, HandlerEvent};
|
||||
@@ -98,6 +98,13 @@ pub struct InboundRequestId {
|
||||
substream_id: SubstreamId,
|
||||
}
|
||||
|
||||
// An Active inbound request received via Rpc.
|
||||
struct ActiveInboundRequest<E: EthSpec> {
|
||||
pub peer_id: PeerId,
|
||||
pub request_type: RequestType<E>,
|
||||
pub peer_disconnected: bool,
|
||||
}
|
||||
|
||||
impl InboundRequestId {
|
||||
/// Creates an _unchecked_ [`InboundRequestId`].
|
||||
///
|
||||
@@ -150,7 +157,7 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
|
||||
/// Rate limiter for our own requests.
|
||||
outbound_request_limiter: SelfRateLimiter<Id, E>,
|
||||
/// Active inbound requests that are awaiting a response.
|
||||
active_inbound_requests: HashMap<InboundRequestId, (PeerId, RequestType<E>)>,
|
||||
active_inbound_requests: HashMap<InboundRequestId, ActiveInboundRequest<E>>,
|
||||
/// Queue of events to be processed.
|
||||
events: Vec<BehaviourAction<Id, E>>,
|
||||
fork_context: Arc<ForkContext>,
|
||||
@@ -199,8 +206,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
}
|
||||
|
||||
/// Sends an RPC response.
|
||||
///
|
||||
/// The peer must be connected for this to succeed.
|
||||
/// Returns an `Err` if the request does exist in the active inbound requests list.
|
||||
#[instrument(parent = None,
|
||||
level = "trace",
|
||||
fields(service = "libp2p_rpc"),
|
||||
@@ -209,14 +215,16 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
)]
|
||||
pub fn send_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
request_id: InboundRequestId,
|
||||
response: RpcResponse<E>,
|
||||
) {
|
||||
let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
|
||||
) -> Result<(), RpcResponse<E>> {
|
||||
let Some(ActiveInboundRequest {
|
||||
peer_id,
|
||||
request_type,
|
||||
peer_disconnected,
|
||||
}) = self.active_inbound_requests.remove(&request_id)
|
||||
else {
|
||||
error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
|
||||
return;
|
||||
return Err(response);
|
||||
};
|
||||
|
||||
// Add the request back to active requests if the response is `Success` and requires stream
|
||||
@@ -224,11 +232,24 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
if request_type.protocol().terminator().is_some()
|
||||
&& matches!(response, RpcResponse::Success(_))
|
||||
{
|
||||
self.active_inbound_requests
|
||||
.insert(request_id, (peer_id, request_type.clone()));
|
||||
self.active_inbound_requests.insert(
|
||||
request_id,
|
||||
ActiveInboundRequest {
|
||||
peer_id,
|
||||
request_type: request_type.clone(),
|
||||
peer_disconnected,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
if peer_disconnected {
|
||||
trace!(%peer_id, ?request_id, %response,
|
||||
"Discarding response, peer is no longer connected");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_response_inner(
|
||||
@@ -425,9 +446,10 @@ where
|
||||
self.events.push(error_msg);
|
||||
}
|
||||
|
||||
self.active_inbound_requests.retain(
|
||||
|_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
|
||||
);
|
||||
self.active_inbound_requests
|
||||
.values_mut()
|
||||
.filter(|request| request.peer_id == peer_id)
|
||||
.for_each(|request| request.peer_disconnected = true);
|
||||
|
||||
if let Some(limiter) = self.response_limiter.as_mut() {
|
||||
limiter.peer_disconnected(peer_id);
|
||||
@@ -468,9 +490,17 @@ where
|
||||
.active_inbound_requests
|
||||
.iter()
|
||||
.filter(
|
||||
|(_inbound_request_id, (request_peer_id, active_request_type))| {
|
||||
|(
|
||||
_inbound_request_id,
|
||||
ActiveInboundRequest {
|
||||
peer_id: request_peer_id,
|
||||
request_type: active_request_type,
|
||||
peer_disconnected,
|
||||
},
|
||||
)| {
|
||||
*request_peer_id == peer_id
|
||||
&& active_request_type.protocol() == request_type.protocol()
|
||||
&& !peer_disconnected
|
||||
},
|
||||
)
|
||||
.count()
|
||||
@@ -494,19 +524,25 @@ where
|
||||
}
|
||||
|
||||
// Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests.
|
||||
self.active_inbound_requests
|
||||
.insert(request_id, (peer_id, request_type.clone()));
|
||||
self.active_inbound_requests.insert(
|
||||
request_id,
|
||||
ActiveInboundRequest {
|
||||
peer_id,
|
||||
request_type: request_type.clone(),
|
||||
peer_disconnected: false,
|
||||
},
|
||||
);
|
||||
|
||||
// If we received a Ping, we queue a Pong response.
|
||||
if let RequestType::Ping(_) = request_type {
|
||||
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
|
||||
self.send_response(
|
||||
peer_id,
|
||||
request_id,
|
||||
RpcResponse::Success(RpcSuccessResponse::Pong(Ping {
|
||||
data: self.seq_number,
|
||||
})),
|
||||
);
|
||||
)
|
||||
.expect("Request to exist");
|
||||
}
|
||||
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
|
||||
@@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY
|
||||
use crate::rpc::methods::MetadataRequest;
|
||||
use crate::rpc::{
|
||||
GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage,
|
||||
RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse,
|
||||
RpcSuccessResponse, RPC,
|
||||
RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC,
|
||||
};
|
||||
use crate::types::{
|
||||
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
|
||||
@@ -39,7 +38,7 @@ use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, info, instrument, trace, warn};
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
use types::{
|
||||
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
|
||||
};
|
||||
@@ -1146,35 +1145,22 @@ impl<E: EthSpec> Network<E> {
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
pub fn send_response(
|
||||
pub fn send_response<T: Into<RpcResponse<E>>>(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
response: Response<E>,
|
||||
response: T,
|
||||
) {
|
||||
self.eth2_rpc_mut()
|
||||
.send_response(peer_id, inbound_request_id, response.into())
|
||||
}
|
||||
|
||||
/// Inform the peer that their request produced an error.
|
||||
#[instrument(parent = None,
|
||||
level = "trace",
|
||||
fields(service = "libp2p"),
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
pub fn send_error_response(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
inbound_request_id: InboundRequestId,
|
||||
error: RpcErrorResponse,
|
||||
reason: String,
|
||||
) {
|
||||
self.eth2_rpc_mut().send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
RpcResponse::Error(error, reason.into()),
|
||||
)
|
||||
if let Err(response) = self
|
||||
.eth2_rpc_mut()
|
||||
.send_response(inbound_request_id, response.into())
|
||||
{
|
||||
if self.network_globals.peers.read().is_connected(&peer_id) {
|
||||
error!(%peer_id, ?inbound_request_id, %response,
|
||||
"Request not found in RPC active requests"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Peer management functions */
|
||||
@@ -1460,19 +1446,6 @@ impl<E: EthSpec> Network<E> {
|
||||
name = "libp2p",
|
||||
skip_all
|
||||
)]
|
||||
fn send_meta_data_response(
|
||||
&mut self,
|
||||
_req: MetadataRequest<E>,
|
||||
inbound_request_id: InboundRequestId,
|
||||
peer_id: PeerId,
|
||||
) {
|
||||
let metadata = self.network_globals.local_metadata.read().clone();
|
||||
// The encoder is responsible for sending the negotiated version of the metadata
|
||||
let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
|
||||
self.eth2_rpc_mut()
|
||||
.send_response(peer_id, inbound_request_id, event);
|
||||
}
|
||||
|
||||
// RPC Propagation methods
|
||||
/// Queues the response to be sent upwards as long at it was requested outside the Behaviour.
|
||||
#[must_use = "return the response"]
|
||||
@@ -1760,9 +1733,13 @@ impl<E: EthSpec> Network<E> {
|
||||
self.peer_manager_mut().ping_request(&peer_id, ping.data);
|
||||
None
|
||||
}
|
||||
RequestType::MetaData(req) => {
|
||||
RequestType::MetaData(_req) => {
|
||||
// send the requested meta-data
|
||||
self.send_meta_data_response(req, inbound_request_id, peer_id);
|
||||
let metadata = self.network_globals.local_metadata.read().clone();
|
||||
// The encoder is responsible for sending the negotiated version of the metadata
|
||||
let response =
|
||||
RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
|
||||
self.send_response(peer_id, inbound_request_id, response);
|
||||
None
|
||||
}
|
||||
RequestType::Goodbye(reason) => {
|
||||
|
||||
@@ -11,6 +11,7 @@ use futures::channel::mpsc::Sender;
|
||||
use futures::future::OptionFuture;
|
||||
use futures::prelude::*;
|
||||
|
||||
use lighthouse_network::rpc::methods::RpcResponse;
|
||||
use lighthouse_network::rpc::InboundRequestId;
|
||||
use lighthouse_network::rpc::RequestType;
|
||||
use lighthouse_network::service::Network;
|
||||
@@ -627,10 +628,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
|
||||
error,
|
||||
inbound_request_id,
|
||||
reason,
|
||||
} => {
|
||||
self.libp2p
|
||||
.send_error_response(peer_id, inbound_request_id, error, reason);
|
||||
}
|
||||
} => self.libp2p.send_response(
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
RpcResponse::Error(error, reason.into()),
|
||||
),
|
||||
NetworkMessage::ValidationResult {
|
||||
propagation_source,
|
||||
message_id,
|
||||
|
||||
Reference in New Issue
Block a user