mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 21:34:46 +00:00
Remove pending requests from ready_requests (#6625)
Co-Authored-By: ackintosh <sora.akatsuki@gmail.com> Co-Authored-By: João Oliveira <hello@jxs.pt>
This commit is contained in:
@@ -77,6 +77,14 @@ impl Quota {
|
||||
max_tokens: n,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub const fn n_every_millis(n: NonZeroU64, millis: u64) -> Self {
|
||||
Quota {
|
||||
replenish_all_every: Duration::from_millis(millis),
|
||||
max_tokens: n,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages rate limiting of requests per peer, with differentiated rates per protocol.
|
||||
|
||||
@@ -4,6 +4,10 @@ use super::{
|
||||
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
|
||||
};
|
||||
use crate::rpc::rate_limiter::RateLimiterItem;
|
||||
use futures::FutureExt;
|
||||
use libp2p::{PeerId, swarm::NotifyHandler};
|
||||
use logging::crit;
|
||||
use smallvec::SmallVec;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use std::{
|
||||
collections::{HashMap, VecDeque, hash_map::Entry},
|
||||
@@ -11,11 +15,6 @@ use std::{
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::FutureExt;
|
||||
use libp2p::{PeerId, swarm::NotifyHandler};
|
||||
use logging::crit;
|
||||
use smallvec::SmallVec;
|
||||
use tokio_util::time::DelayQueue;
|
||||
use tracing::debug;
|
||||
use types::{EthSpec, ForkContext};
|
||||
@@ -234,9 +233,29 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
|
||||
self.active_requests.remove(&peer_id);
|
||||
|
||||
let mut failed_requests = Vec::new();
|
||||
|
||||
self.ready_requests.retain(|(req_peer_id, rpc_send, _)| {
|
||||
if let RPCSend::Request(request_id, req) = rpc_send {
|
||||
if req_peer_id == &peer_id {
|
||||
failed_requests.push((*request_id, req.protocol()));
|
||||
// Remove the entry
|
||||
false
|
||||
} else {
|
||||
// Keep the entry
|
||||
true
|
||||
}
|
||||
} else {
|
||||
debug_assert!(
|
||||
false,
|
||||
"Coding error: unexpected RPCSend variant {rpc_send:?}."
|
||||
);
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
|
||||
// should never really be large. So we iterate for simplicity
|
||||
let mut failed_requests = Vec::new();
|
||||
self.delayed_requests
|
||||
.retain(|(map_peer_id, protocol), queue| {
|
||||
if map_peer_id == &peer_id {
|
||||
@@ -252,6 +271,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
failed_requests
|
||||
}
|
||||
|
||||
@@ -549,4 +569,72 @@ mod tests {
|
||||
.contains_key(&(peer2, Protocol::Ping))
|
||||
);
|
||||
}
|
||||
|
||||
/// Test that `peer_disconnected` returns the IDs of pending requests.
|
||||
#[tokio::test]
|
||||
async fn test_peer_disconnected_returns_failed_requests() {
|
||||
const REPLENISH_DURATION: u64 = 50;
|
||||
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
|
||||
Slot::new(0),
|
||||
Hash256::ZERO,
|
||||
&MainnetEthSpec::default_spec(),
|
||||
));
|
||||
let config = OutboundRateLimiterConfig(RateLimiterConfig {
|
||||
ping_quota: Quota::n_every_millis(NonZeroU64::new(1).unwrap(), REPLENISH_DURATION),
|
||||
..Default::default()
|
||||
});
|
||||
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
|
||||
SelfRateLimiter::new(Some(config), fork_context).unwrap();
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
for i in 1..=5u32 {
|
||||
let result = limiter.allows(
|
||||
peer_id,
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId {
|
||||
req_id: i,
|
||||
lookup_id: i,
|
||||
},
|
||||
}),
|
||||
RequestType::Ping(Ping { data: i as u64 }),
|
||||
);
|
||||
|
||||
// Check that the limiter allows the first request while other requests are added to the queue.
|
||||
if i == 1 {
|
||||
assert!(result.is_ok());
|
||||
} else {
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until the tokens have been regenerated, then run `next_peer_request_ready`.
|
||||
tokio::time::sleep(Duration::from_millis(REPLENISH_DURATION + 10)).await;
|
||||
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
|
||||
|
||||
// Check that one of the pending requests has moved to ready_requests.
|
||||
assert_eq!(
|
||||
limiter
|
||||
.delayed_requests
|
||||
.get(&(peer_id, Protocol::Ping))
|
||||
.unwrap()
|
||||
.len(),
|
||||
3
|
||||
);
|
||||
assert_eq!(limiter.ready_requests.len(), 1);
|
||||
|
||||
let mut failed_requests = limiter.peer_disconnected(peer_id);
|
||||
|
||||
// Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
|
||||
assert_eq!(failed_requests.len(), 4);
|
||||
for i in 2..=5u32 {
|
||||
let (request_id, protocol) = failed_requests.remove(0);
|
||||
assert!(matches!(
|
||||
request_id,
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId { req_id, .. },
|
||||
}) if req_id == i
|
||||
));
|
||||
assert_eq!(protocol, Protocol::Ping);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user