Error from RPC send_response when request doesn't exist on the active inbound requests (#7663)

Lighthouse is currently logging a lot of errors in the `RPC` behaviour whenever a response is received for a request_id that no longer exists in active_inbound_requests. This is likely due to a data race or timing issue (e.g., the peer disconnecting before the response is handled).
This PR addresses that by removing the error logging from the RPC layer. Instead, RPC::send_response now simply returns an Err, shifting the responsibility to the main service. The main service can then determine whether the peer is still connected and only log an error if the peer remains connected.
Thanks @ackintosh for helping debug!
This commit is contained in:
João Oliveira
2025-07-09 15:26:51 +01:00
committed by GitHub
parent 8e55684b06
commit 8b5ccacac9
3 changed files with 81 additions and 66 deletions

View File

@@ -11,8 +11,7 @@ use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY
use crate::rpc::methods::MetadataRequest;
use crate::rpc::{
GoodbyeReason, HandlerErr, InboundRequestId, NetworkParams, Protocol, RPCError, RPCMessage,
RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse,
RpcSuccessResponse, RPC,
RPCReceived, RequestType, ResponseTermination, RpcResponse, RpcSuccessResponse, RPC,
};
use crate::types::{
all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
@@ -39,7 +38,7 @@ use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, instrument, trace, warn};
use tracing::{debug, error, info, instrument, trace, warn};
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
};
@@ -1146,35 +1145,22 @@ impl<E: EthSpec> Network<E> {
name = "libp2p",
skip_all
)]
pub fn send_response(
pub fn send_response<T: Into<RpcResponse<E>>>(
&mut self,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
response: Response<E>,
response: T,
) {
self.eth2_rpc_mut()
.send_response(peer_id, inbound_request_id, response.into())
}
/// Inform the peer that their request produced an error.
#[instrument(parent = None,
level = "trace",
fields(service = "libp2p"),
name = "libp2p",
skip_all
)]
pub fn send_error_response(
&mut self,
peer_id: PeerId,
inbound_request_id: InboundRequestId,
error: RpcErrorResponse,
reason: String,
) {
self.eth2_rpc_mut().send_response(
peer_id,
inbound_request_id,
RpcResponse::Error(error, reason.into()),
)
if let Err(response) = self
.eth2_rpc_mut()
.send_response(inbound_request_id, response.into())
{
if self.network_globals.peers.read().is_connected(&peer_id) {
error!(%peer_id, ?inbound_request_id, %response,
"Request not found in RPC active requests"
);
}
}
}
/* Peer management functions */
@@ -1460,19 +1446,6 @@ impl<E: EthSpec> Network<E> {
name = "libp2p",
skip_all
)]
fn send_meta_data_response(
&mut self,
_req: MetadataRequest<E>,
inbound_request_id: InboundRequestId,
peer_id: PeerId,
) {
let metadata = self.network_globals.local_metadata.read().clone();
// The encoder is responsible for sending the negotiated version of the metadata
let event = RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
self.eth2_rpc_mut()
.send_response(peer_id, inbound_request_id, event);
}
// RPC Propagation methods
/// Queues the response to be sent upwards as long as it was requested outside the Behaviour.
#[must_use = "return the response"]
@@ -1760,9 +1733,13 @@ impl<E: EthSpec> Network<E> {
self.peer_manager_mut().ping_request(&peer_id, ping.data);
None
}
RequestType::MetaData(req) => {
RequestType::MetaData(_req) => {
// send the requested meta-data
self.send_meta_data_response(req, inbound_request_id, peer_id);
let metadata = self.network_globals.local_metadata.read().clone();
// The encoder is responsible for sending the negotiated version of the metadata
let response =
RpcResponse::Success(RpcSuccessResponse::MetaData(Arc::new(metadata)));
self.send_response(peer_id, inbound_request_id, response);
None
}
RequestType::Goodbye(reason) => {