Enable arithmetic lint in rate-limiter (#7025)

https://github.com/sigp/lighthouse/issues/6875


- Enabled the linter in the rate-limiter and fixed errors.
- Changed the type of `Quota::max_tokens` from `u64` to `NonZeroU64` because `max_tokens` cannot be zero.
- Added a test to ensure that a large value for `tokens`, which causes an overflow, is handled properly.
This commit is contained in:
Akihito Nakano
2025-05-28 00:43:22 +09:00
committed by GitHub
parent 7c89b970af
commit 8989ef8fb1
3 changed files with 58 additions and 28 deletions

View File

@@ -1,11 +1,11 @@
use super::{rate_limiter::Quota, Protocol};
use std::num::NonZeroU64;
use std::{
fmt::{Debug, Display},
str::FromStr,
time::Duration,
};
use super::{rate_limiter::Quota, Protocol};
use serde::{Deserialize, Serialize};
/// Auxiliary struct to aid on configuration parsing.
@@ -100,24 +100,30 @@ pub struct RateLimiterConfig {
}
impl RateLimiterConfig {
pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10);
pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5);
pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15);
pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(NonZeroU64::new(2).unwrap(), 10);
pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(NonZeroU64::new(2).unwrap(), 5);
pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(NonZeroU64::new(5).unwrap(), 15);
pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10);
// The number is chosen to balance between upload bandwidth required to serve
// blocks and a decent syncing rate for honest nodes. Malicious nodes would need to
// spread out their requests over the time window to max out bandwidth on the server.
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10);
pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
// `DEFAULT_BLOCKS_BY_RANGE_QUOTA` * (target + 1) to account for high usage
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(896, 10);
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(896, 10);
pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(896).unwrap(), 10);
pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(896).unwrap(), 10);
// 320 blocks worth of columns for regular node, or 40 blocks for supernode.
// Range sync load balances when requesting blocks, and each batch is 32 blocks.
pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota = Quota::n_every(5120, 10);
pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(5120).unwrap(), 10);
// 512 columns per request from spec. This should be plenty as peers are unlikely to send all
// sampling requests to a single peer.
pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(512, 10);
pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(512).unwrap(), 10);
pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10);
@@ -275,7 +281,7 @@ mod tests {
protocol: Protocol::Goodbye,
quota: Quota {
replenish_all_every: Duration::from_secs(10),
max_tokens: 8,
max_tokens: NonZeroU64::new(8).unwrap(),
},
};
assert_eq!(quota.to_string().parse(), Ok(quota))

View File

@@ -1,3 +1,5 @@
#![deny(clippy::arithmetic_side_effects)]
use super::config::RateLimiterConfig;
use crate::rpc::Protocol;
use fnv::FnvHashMap;
@@ -5,6 +7,7 @@ use libp2p::PeerId;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::hash::Hash;
use std::num::NonZeroU64;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -55,7 +58,7 @@ pub struct Quota {
pub(super) replenish_all_every: Duration,
/// Token limit. This translates on how large can an instantaneous batch of
/// tokens be.
pub(super) max_tokens: u64,
pub(super) max_tokens: NonZeroU64,
}
impl Quota {
@@ -63,12 +66,12 @@ impl Quota {
pub const fn one_every(seconds: u64) -> Self {
Quota {
replenish_all_every: Duration::from_secs(seconds),
max_tokens: 1,
max_tokens: NonZeroU64::new(1).unwrap(),
}
}
/// Allow `n` tokens to be used every `seconds`.
pub const fn n_every(n: u64, seconds: u64) -> Self {
pub const fn n_every(n: NonZeroU64, seconds: u64) -> Self {
Quota {
replenish_all_every: Duration::from_secs(seconds),
max_tokens: n,
@@ -236,7 +239,9 @@ impl RPCRateLimiterBuilder {
// check for peers to prune every 30 seconds, starting in 30 seconds
let prune_every = tokio::time::Duration::from_secs(30);
let prune_start = tokio::time::Instant::now() + prune_every;
let prune_start = tokio::time::Instant::now()
.checked_add(prune_every)
.ok_or("prune time overflow")?;
let prune_interval = tokio::time::interval_at(prune_start, prune_every);
Ok(RPCRateLimiter {
prune_interval,
@@ -412,14 +417,13 @@ pub struct Limiter<Key: Hash + Eq + Clone> {
impl<Key: Hash + Eq + Clone> Limiter<Key> {
pub fn from_quota(quota: Quota) -> Result<Self, &'static str> {
if quota.max_tokens == 0 {
return Err("Max number of tokens should be positive");
}
let tau = quota.replenish_all_every.as_nanos();
if tau == 0 {
return Err("Replenish time must be positive");
}
let t = (tau / quota.max_tokens as u128)
let t = tau
.checked_div(quota.max_tokens.get() as u128)
.expect("Division by zero never occurs, since Quota::max_token is of type NonZeroU64.")
.try_into()
.map_err(|_| "total replenish time is too long")?;
let tau = tau
@@ -442,7 +446,7 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
let tau = self.tau;
let t = self.t;
// how long does it take to replenish these tokens
let additional_time = t * tokens;
let additional_time = t.saturating_mul(tokens);
if additional_time > tau {
// the time required to process this amount of tokens is longer than the time that
// makes the bucket full. So, this batch can _never_ be processed
@@ -455,16 +459,16 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
.entry(key.clone())
.or_insert(time_since_start);
// check how soon could the request be made
let earliest_time = (*tat + additional_time).saturating_sub(tau);
let earliest_time = (*tat).saturating_add(additional_time).saturating_sub(tau);
// earliest_time is in the future
if time_since_start < earliest_time {
Err(RateLimitedErr::TooSoon(Duration::from_nanos(
/* time they need to wait, i.e. how soon were they */
earliest_time - time_since_start,
earliest_time.saturating_sub(time_since_start),
)))
} else {
// calculate the new TAT
*tat = time_since_start.max(*tat) + additional_time;
*tat = time_since_start.max(*tat).saturating_add(additional_time);
Ok(())
}
}
@@ -479,14 +483,15 @@ impl<Key: Hash + Eq + Clone> Limiter<Key> {
#[cfg(test)]
mod tests {
use crate::rpc::rate_limiter::{Limiter, Quota};
use crate::rpc::rate_limiter::{Limiter, Quota, RateLimitedErr};
use std::num::NonZeroU64;
use std::time::Duration;
#[test]
fn it_works_a() {
let mut limiter = Limiter::from_quota(Quota {
replenish_all_every: Duration::from_secs(2),
max_tokens: 4,
max_tokens: NonZeroU64::new(4).unwrap(),
})
.unwrap();
let key = 10;
@@ -523,7 +528,7 @@ mod tests {
fn it_works_b() {
let mut limiter = Limiter::from_quota(Quota {
replenish_all_every: Duration::from_secs(2),
max_tokens: 4,
max_tokens: NonZeroU64::new(4).unwrap(),
})
.unwrap();
let key = 10;
@@ -547,4 +552,22 @@ mod tests {
.allows(Duration::from_secs_f32(0.4), &key, 1)
.is_err());
}
#[test]
fn large_tokens() {
// These have been adjusted so that an overflow occurs when calculating `additional_time` in
// `Limiter::allows`. If we don't handle overflow properly, `Limiter::allows` returns `Ok`
// in this case.
let replenish_all_every = 2;
let tokens = u64::MAX / 2 + 1;
let mut limiter = Limiter::from_quota(Quota {
replenish_all_every: Duration::from_nanos(replenish_all_every),
max_tokens: NonZeroU64::new(1).unwrap(),
})
.unwrap();
let result = limiter.allows(Duration::from_secs_f32(0.0), &10, tokens);
assert!(matches!(result, Err(RateLimitedErr::TooLarge)));
}
}

View File

@@ -316,6 +316,7 @@ mod tests {
use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use logging::create_test_tracing_subscriber;
use std::num::NonZeroU64;
use std::time::Duration;
use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot};
@@ -324,7 +325,7 @@ mod tests {
async fn test_next_peer_request_ready() {
create_test_tracing_subscriber();
let config = OutboundRateLimiterConfig(RateLimiterConfig {
ping_quota: Quota::n_every(1, 2),
ping_quota: Quota::n_every(NonZeroU64::new(1).unwrap(), 2),
..Default::default()
});
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(