diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index 75d49e9cb5..7a746a63e1 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -1,11 +1,11 @@ +use super::{rate_limiter::Quota, Protocol}; +use std::num::NonZeroU64; use std::{ fmt::{Debug, Display}, str::FromStr, time::Duration, }; -use super::{rate_limiter::Quota, Protocol}; - use serde::{Deserialize, Serialize}; /// Auxiliary struct to aid on configuration parsing. @@ -100,24 +100,30 @@ pub struct RateLimiterConfig { } impl RateLimiterConfig { - pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(2, 10); - pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(2, 5); - pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(5, 15); + pub const DEFAULT_PING_QUOTA: Quota = Quota::n_every(NonZeroU64::new(2).unwrap(), 10); + pub const DEFAULT_META_DATA_QUOTA: Quota = Quota::n_every(NonZeroU64::new(2).unwrap(), 5); + pub const DEFAULT_STATUS_QUOTA: Quota = Quota::n_every(NonZeroU64::new(5).unwrap(), 15); pub const DEFAULT_GOODBYE_QUOTA: Quota = Quota::one_every(10); // The number is chosen to balance between upload bandwidth required to serve // blocks and a decent syncing rate for honest nodes. Malicious nodes would need to // spread out their requests over the time window to max out bandwidth on the server. - pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = Quota::n_every(128, 10); - pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); + pub const DEFAULT_BLOCKS_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); // `DEFAULT_BLOCKS_BY_RANGE_QUOTA` * (target + 1) to account for high usage - pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(896, 10); - pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(896, 10); + pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(896).unwrap(), 10); + pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(896).unwrap(), 10); // 320 blocks worth of columns for regular node, or 40 blocks for supernode. // Range sync load balances when requesting blocks, and each batch is 32 blocks. - pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota = Quota::n_every(5120, 10); + pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(5120).unwrap(), 10); // 512 columns per request from spec. This should be plenty as peers are unlikely to send all // sampling requests to a single peer. - pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(512, 10); + pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(512).unwrap(), 10); pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10); @@ -275,7 +281,7 @@ mod tests { protocol: Protocol::Goodbye, quota: Quota { replenish_all_every: Duration::from_secs(10), - max_tokens: 8, + max_tokens: NonZeroU64::new(8).unwrap(), }, }; assert_eq!(quota.to_string().parse(), Ok(quota)) diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index f666c30d52..6e66999612 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -1,3 +1,5 @@ +#![deny(clippy::arithmetic_side_effects)] + use super::config::RateLimiterConfig; use crate::rpc::Protocol; use fnv::FnvHashMap; @@ -5,6 +7,7 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; use std::future::Future; use std::hash::Hash; +use std::num::NonZeroU64; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -55,7 +58,7 @@ pub struct Quota { pub(super) replenish_all_every: Duration, /// Token limit. This translates on how large can an instantaneous batch of /// tokens be. - pub(super) max_tokens: u64, + pub(super) max_tokens: NonZeroU64, } impl Quota { @@ -63,12 +66,12 @@ impl Quota { pub const fn one_every(seconds: u64) -> Self { Quota { replenish_all_every: Duration::from_secs(seconds), - max_tokens: 1, + max_tokens: NonZeroU64::new(1).unwrap(), } } /// Allow `n` tokens to be use used every `seconds`. - pub const fn n_every(n: u64, seconds: u64) -> Self { + pub const fn n_every(n: NonZeroU64, seconds: u64) -> Self { Quota { replenish_all_every: Duration::from_secs(seconds), max_tokens: n, @@ -236,7 +239,9 @@ impl RPCRateLimiterBuilder { // check for peers to prune every 30 seconds, starting in 30 seconds let prune_every = tokio::time::Duration::from_secs(30); - let prune_start = tokio::time::Instant::now() + prune_every; + let prune_start = tokio::time::Instant::now() + .checked_add(prune_every) + .ok_or("prune time overflow")?; let prune_interval = tokio::time::interval_at(prune_start, prune_every); Ok(RPCRateLimiter { prune_interval, @@ -412,14 +417,13 @@ pub struct Limiter { impl Limiter { pub fn from_quota(quota: Quota) -> Result { - if quota.max_tokens == 0 { - return Err("Max number of tokens should be positive"); - } let tau = quota.replenish_all_every.as_nanos(); if tau == 0 { return Err("Replenish time must be positive"); } - let t = (tau / quota.max_tokens as u128) + let t = tau + .checked_div(quota.max_tokens.get() as u128) + .expect("Division by zero never occurs, since Quota::max_token is of type NonZeroU64.") .try_into() .map_err(|_| "total replenish time is too long")?; let tau = tau @@ -442,7 +446,7 @@ impl Limiter { let tau = self.tau; let t = self.t; // how long does it take to replenish these tokens - let additional_time = t * tokens; + let additional_time = t.saturating_mul(tokens); if additional_time > tau { // the time required to process this amount of tokens is longer than the time that // makes the bucket full. So, this batch can _never_ be processed @@ -455,16 +459,16 @@ impl Limiter { .entry(key.clone()) .or_insert(time_since_start); // check how soon could the request be made - let earliest_time = (*tat + additional_time).saturating_sub(tau); + let earliest_time = (*tat).saturating_add(additional_time).saturating_sub(tau); // earliest_time is in the future if time_since_start < earliest_time { Err(RateLimitedErr::TooSoon(Duration::from_nanos( /* time they need to wait, i.e. how soon were they */ - earliest_time - time_since_start, + earliest_time.saturating_sub(time_since_start), ))) } else { // calculate the new TAT - *tat = time_since_start.max(*tat) + additional_time; + *tat = time_since_start.max(*tat).saturating_add(additional_time); Ok(()) } } @@ -479,14 +483,15 @@ impl Limiter { #[cfg(test)] mod tests { - use crate::rpc::rate_limiter::{Limiter, Quota}; + use crate::rpc::rate_limiter::{Limiter, Quota, RateLimitedErr}; + use std::num::NonZeroU64; use std::time::Duration; #[test] fn it_works_a() { let mut limiter = Limiter::from_quota(Quota { replenish_all_every: Duration::from_secs(2), - max_tokens: 4, + max_tokens: NonZeroU64::new(4).unwrap(), }) .unwrap(); let key = 10; @@ -523,7 +528,7 @@ mod tests { fn it_works_b() { let mut limiter = Limiter::from_quota(Quota { replenish_all_every: Duration::from_secs(2), - max_tokens: 4, + max_tokens: NonZeroU64::new(4).unwrap(), }) .unwrap(); let key = 10; @@ -547,4 +552,22 @@ mod tests { .allows(Duration::from_secs_f32(0.4), &key, 1) .is_err()); } + + #[test] + fn large_tokens() { + // These have been adjusted so that an overflow occurs when calculating `additional_time` in + // `Limiter::allows`. If we don't handle overflow properly, `Limiter::allows` returns `Ok` + // in this case. + let replenish_all_every = 2; + let tokens = u64::MAX / 2 + 1; + + let mut limiter = Limiter::from_quota(Quota { + replenish_all_every: Duration::from_nanos(replenish_all_every), + max_tokens: NonZeroU64::new(1).unwrap(), + }) + .unwrap(); + + let result = limiter.allows(Duration::from_secs_f32(0.0), &10, tokens); + assert!(matches!(result, Err(RateLimitedErr::TooLarge))); + } } diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index e5b685676f..f26dc4c7a8 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -316,6 +316,7 @@ mod tests { use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId}; use libp2p::PeerId; use logging::create_test_tracing_subscriber; + use std::num::NonZeroU64; use std::time::Duration; use types::{EthSpec, ForkContext, Hash256, MainnetEthSpec, Slot}; @@ -324,7 +325,7 @@ mod tests { async fn test_next_peer_request_ready() { create_test_tracing_subscriber(); let config = OutboundRateLimiterConfig(RateLimiterConfig { - ping_quota: Quota::n_every(1, 2), + ping_quota: Quota::n_every(NonZeroU64::new(1).unwrap(), 2), ..Default::default() }); let fork_context = std::sync::Arc::new(ForkContext::new::(