From 889946c04b15798cb8314e284cbf866e76523632 Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Wed, 11 Feb 2026 08:14:28 +0900 Subject: [PATCH] Remove pending requests from ready_requests (#6625) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: ackintosh Co-Authored-By: João Oliveira --- .../src/rpc/rate_limiter.rs | 8 ++ .../src/rpc/self_limiter.rs | 100 ++++++++++++++++-- 2 files changed, 102 insertions(+), 6 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 8b364f506c..2407038bc3 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -77,6 +77,14 @@ impl Quota { max_tokens: n, } } + + #[cfg(test)] + pub const fn n_every_millis(n: NonZeroU64, millis: u64) -> Self { + Quota { + replenish_all_every: Duration::from_millis(millis), + max_tokens: n, + } + } } /// Manages rate limiting of requests per peer, with differentiated rates per protocol. 
diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index 90e2db9135..2a7ef955a1 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -4,6 +4,10 @@ use super::{ rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, }; use crate::rpc::rate_limiter::RateLimiterItem; +use futures::FutureExt; +use libp2p::{PeerId, swarm::NotifyHandler}; +use logging::crit; +use smallvec::SmallVec; use std::time::{SystemTime, UNIX_EPOCH}; use std::{ collections::{HashMap, VecDeque, hash_map::Entry}, @@ -11,11 +15,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; - -use futures::FutureExt; -use libp2p::{PeerId, swarm::NotifyHandler}; -use logging::crit; -use smallvec::SmallVec; use tokio_util::time::DelayQueue; use tracing::debug; use types::{EthSpec, ForkContext}; @@ -234,9 +233,29 @@ impl SelfRateLimiter { pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> { self.active_requests.remove(&peer_id); + let mut failed_requests = Vec::new(); + + self.ready_requests.retain(|(req_peer_id, rpc_send, _)| { + if let RPCSend::Request(request_id, req) = rpc_send { + if req_peer_id == &peer_id { + failed_requests.push((*request_id, req.protocol())); + // Remove the entry + false + } else { + // Keep the entry + true + } + } else { + debug_assert!( + false, + "Coding error: unexpected RPCSend variant {rpc_send:?}." + ); + false + } + }); + // It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map // should never really be large. 
So we iterate for simplicity - let mut failed_requests = Vec::new(); self.delayed_requests .retain(|(map_peer_id, protocol), queue| { if map_peer_id == &peer_id { @@ -252,6 +271,7 @@ impl SelfRateLimiter { true } }); + failed_requests } @@ -549,4 +569,72 @@ mod tests { .contains_key(&(peer2, Protocol::Ping)) ); } + + /// Test that `peer_disconnected` returns the IDs of pending requests. + #[tokio::test] + async fn test_peer_disconnected_returns_failed_requests() { + const REPLENISH_DURATION: u64 = 50; + let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); + let config = OutboundRateLimiterConfig(RateLimiterConfig { + ping_quota: Quota::n_every_millis(NonZeroU64::new(1).unwrap(), REPLENISH_DURATION), + ..Default::default() + }); + let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> = + SelfRateLimiter::new(Some(config), fork_context).unwrap(); + let peer_id = PeerId::random(); + + for i in 1..=5u32 { + let result = limiter.allows( + peer_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + req_id: i, + lookup_id: i, + }, + }), + RequestType::Ping(Ping { data: i as u64 }), + ); + + // Check that the limiter allows the first request while other requests are added to the queue. + if i == 1 { + assert!(result.is_ok()); + } else { + assert!(result.is_err()); + } + } + + // Wait until the tokens have been regenerated, then run `next_peer_request_ready`. + tokio::time::sleep(Duration::from_millis(REPLENISH_DURATION + 10)).await; + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + + // Check that one of the pending requests has moved to ready_requests. + assert_eq!( + limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap() + .len(), + 3 + ); + assert_eq!(limiter.ready_requests.len(), 1); + + let mut failed_requests = limiter.peer_disconnected(peer_id); + + // Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
+ assert_eq!(failed_requests.len(), 4); + for i in 2..=5u32 { + let (request_id, protocol) = failed_requests.remove(0); + assert!(matches!( + request_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + }) if req_id == i + )); + assert_eq!(protocol, Protocol::Ping); + } + } }