Error from RPC send_response when request doesn't exist on the active inbound requests (#7663)

Lighthouse is currently loggign a lot errors in the `RPC` behaviour whenever a response is received for a request_id that no longer exists in active_inbound_requests. This is likely due to a data race or timing issue (e.g., the peer disconnecting before the response is handled).
This PR addresses that by removing the error logging from the RPC layer. Instead, RPC::send_response now simply returns an Err, shifting the responsibility to the main service. The main service can then determine whether the peer is still connected and only log an error if the peer remains connected.
Thanks @ackintosh for helping debug!
This commit is contained in:
João Oliveira
2025-07-09 15:26:51 +01:00
committed by GitHub
parent 8e55684b06
commit 8b5ccacac9
3 changed files with 81 additions and 66 deletions

View File

@@ -17,7 +17,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::{debug, error, instrument, trace};
use tracing::{debug, instrument, trace};
use types::{EthSpec, ForkContext};
pub(crate) use handler::{HandlerErr, HandlerEvent};
@@ -98,6 +98,13 @@ pub struct InboundRequestId {
substream_id: SubstreamId,
}
// An Active inbound request received via Rpc.
struct ActiveInboundRequest<E: EthSpec> {
pub peer_id: PeerId,
pub request_type: RequestType<E>,
pub peer_disconnected: bool,
}
impl InboundRequestId {
/// Creates an _unchecked_ [`InboundRequestId`].
///
@@ -150,7 +157,7 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
/// Rate limiter for our own requests.
outbound_request_limiter: SelfRateLimiter<Id, E>,
/// Active inbound requests that are awaiting a response.
active_inbound_requests: HashMap<InboundRequestId, (PeerId, RequestType<E>)>,
active_inbound_requests: HashMap<InboundRequestId, ActiveInboundRequest<E>>,
/// Queue of events to be processed.
events: Vec<BehaviourAction<Id, E>>,
fork_context: Arc<ForkContext>,
@@ -199,8 +206,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
}
/// Sends an RPC response.
///
/// The peer must be connected for this to succeed.
/// Returns an `Err` if the request does exist in the active inbound requests list.
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p_rpc"),
@@ -209,14 +215,16 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
)]
pub fn send_response(
&mut self,
peer_id: PeerId,
request_id: InboundRequestId,
response: RpcResponse<E>,
) {
let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
) -> Result<(), RpcResponse<E>> {
let Some(ActiveInboundRequest {
peer_id,
request_type,
peer_disconnected,
}) = self.active_inbound_requests.remove(&request_id)
else {
error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
return;
return Err(response);
};
// Add the request back to active requests if the response is `Success` and requires stream
@@ -224,11 +232,24 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
if request_type.protocol().terminator().is_some()
&& matches!(response, RpcResponse::Success(_))
{
self.active_inbound_requests
.insert(request_id, (peer_id, request_type.clone()));
self.active_inbound_requests.insert(
request_id,
ActiveInboundRequest {
peer_id,
request_type: request_type.clone(),
peer_disconnected,
},
);
}
if peer_disconnected {
trace!(%peer_id, ?request_id, %response,
"Discarding response, peer is no longer connected");
return Ok(());
}
self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
Ok(())
}
fn send_response_inner(
@@ -425,9 +446,10 @@ where
self.events.push(error_msg);
}
self.active_inbound_requests.retain(
|_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
);
self.active_inbound_requests
.values_mut()
.filter(|request| request.peer_id == peer_id)
.for_each(|request| request.peer_disconnected = true);
if let Some(limiter) = self.response_limiter.as_mut() {
limiter.peer_disconnected(peer_id);
@@ -468,9 +490,17 @@ where
.active_inbound_requests
.iter()
.filter(
|(_inbound_request_id, (request_peer_id, active_request_type))| {
|(
_inbound_request_id,
ActiveInboundRequest {
peer_id: request_peer_id,
request_type: active_request_type,
peer_disconnected,
},
)| {
*request_peer_id == peer_id
&& active_request_type.protocol() == request_type.protocol()
&& !peer_disconnected
},
)
.count()
@@ -494,19 +524,25 @@ where
}
// Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests.
self.active_inbound_requests
.insert(request_id, (peer_id, request_type.clone()));
self.active_inbound_requests.insert(
request_id,
ActiveInboundRequest {
peer_id,
request_type: request_type.clone(),
peer_disconnected: false,
},
);
// If we received a Ping, we queue a Pong response.
if let RequestType::Ping(_) = request_type {
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
self.send_response(
peer_id,
request_id,
RpcResponse::Success(RpcSuccessResponse::Pong(Ping {
data: self.seq_number,
})),
);
)
.expect("Request to exist");
}
self.events.push(ToSwarm::GenerateEvent(RPCMessage {

View File

@@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY
use crate::rpc::methods::MetadataRequest;
use crate::rpc::{
GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage,
RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse,
RpcSuccessResponse, RPC,
RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC,
};
use crate::types::{
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
@@ -39,7 +38,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, instrument, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
};
@@ -1146,35 +1145,22 @@ impl<E: EthSpec> Network<E> {
name = "libp2p",
skip_all
)]
pub fn send_response(
pub fn send_response<T: Into<RpcResponse<E>>>(
&mut self,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
response: Response<E>,
response: T,
) {
self.eth2_rpc_mut()
.send_response(peer_id, inbound_request_id, response.into())
}
/// Inform the peer that their request produced an error.
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
pub fn send_error_response(
&mut self,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
error: RpcErrorResponse,
reason: String,
) {
self.eth2_rpc_mut().send_response(
peer_id,
inbound_request_id,
RpcResponse::Error(error, reason.into()),
)
if let Err(response) = self
.eth2_rpc_mut()
.send_response(inbound_request_id, response.into())
{
if self.network_globals.peers.read().is_connected(&peer_id) {
error!(%peer_id, ?inbound_request_id, %response,
"Request not found in RPC active requests"
);
}
}
}
/* Peer management functions */
@@ -1460,19 +1446,6 @@ impl<E: EthSpec> Network<E> {
name = "libp2p",
skip_all
)]
fn send_meta_data_response(
&mut self,
_req: MetadataRequest<E>,
inbound_request_id: InboundRequestId,
peer_id: PeerId,
) {
let metadata = self.network_globals.local_metadata.read().clone();
// The encoder is responsible for sending the negotiated version of the metadata
let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
self.eth2_rpc_mut()
.send_response(peer_id, inbound_request_id, event);
}
// RPC Propagation methods
/// Queues the response to be sent upwards as long at it was requested outside the Behaviour.
#[must_use = "return the response"]
@@ -1760,9 +1733,13 @@ impl<E: EthSpec> Network<E> {
self.peer_manager_mut().ping_request(&peer_id, ping.data);
None
}
RequestType::MetaData(req) => {
RequestType::MetaData(_req) => {
// send the requested meta-data
self.send_meta_data_response(req, inbound_request_id, peer_id);
let metadata = self.network_globals.local_metadata.read().clone();
// The encoder is responsible for sending the negotiated version of the metadata
let response =
RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
self.send_response(peer_id, inbound_request_id, response);
None
}
RequestType::Goodbye(reason) => {

View File

@@ -11,6 +11,7 @@ use futures::channel::mpsc::Sender;
use futures::future::OptionFuture;
use futures::prelude::*;
use lighthouse_network::rpc::methods::RpcResponse;
use lighthouse_network::rpc::InboundRequestId;
use lighthouse_network::rpc::RequestType;
use lighthouse_network::service::Network;
@@ -627,10 +628,11 @@ impl<T: BeaconChainTypes> NetworkService<T> {
error,
inbound_request_id,
reason,
} => {
self.libp2p
.send_error_response(peer_id, inbound_request_id, error, reason);
}
} => self.libp2p.send_response(
peer_id,
inbound_request_id,
RpcResponse::Error(error, reason.into()),
),
NetworkMessage::ValidationResult {
propagation_source,
message_id,