Fix SelfRateLimiter breaks the sequence of delayed_requests. (#5903)

* Add test for next_peer_request_ready

* Use push_front to requeue
This commit is contained in:
Akihito Nakano
2024-06-18 13:49:45 +09:00
committed by GitHub
parent 474c1b4486
commit c2838ba2bd
3 changed files with 72 additions and 1 deletions

View File

@@ -60,6 +60,7 @@ tempfile = { workspace = true }
quickcheck = { workspace = true }
quickcheck_macros = { workspace = true }
async-channel = { workspace = true }
logging = { workspace = true }
[features]
libp2p-websocket = []

View File

@@ -147,7 +147,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
Err((rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
queued_requests.push_back(rate_limited_req);
queued_requests.push_front(rate_limited_req);
// If one fails just wait for the next window that allows sending requests.
return;
}
@@ -205,3 +205,72 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig};
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{OutboundRequest, Ping, Protocol};
use crate::service::api_types::RequestId;
use libp2p::PeerId;
use std::time::Duration;
use types::MainnetEthSpec;
/// Test that `next_peer_request_ready` correctly maintains the queue.
#[tokio::test]
async fn test_next_peer_request_ready() {
let log = logging::test_logger();
let config = OutboundRateLimiterConfig(RateLimiterConfig {
ping_quota: Quota::n_every(1, 2),
..Default::default()
});
let mut limiter: SelfRateLimiter<RequestId<u64>, MainnetEthSpec> =
SelfRateLimiter::new(config, log).unwrap();
let peer_id = PeerId::random();
for i in 1..=5 {
let _ = limiter.allows(
peer_id,
RequestId::Application(i),
OutboundRequest::Ping(Ping { data: i }),
);
}
{
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(4, queue.len());
// Check that requests in the queue are ordered in the sequence 2, 3, 4, 5.
let mut iter = queue.iter();
for i in 2..=5 {
assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i));
}
assert_eq!(limiter.ready_requests.len(), 0);
}
// Wait until the tokens have been regenerated, then run `next_peer_request_ready`.
tokio::time::sleep(Duration::from_secs(3)).await;
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
{
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(3, queue.len());
// Check that requests in the queue are ordered in the sequence 3, 4, 5.
let mut iter = queue.iter();
for i in 3..=5 {
assert_eq!(iter.next().unwrap().request_id, RequestId::Application(i));
}
assert_eq!(limiter.ready_requests.len(), 1);
}
}
}