From 1324d3d3c4c20914545f6dadcb018a5b442a95a5 Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Thu, 24 Apr 2025 12:46:16 +0900 Subject: [PATCH] Delayed RPC Send Using Tokens (#5923) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit closes https://github.com/sigp/lighthouse/issues/5785 The diagram below shows the differences in how the receiver (responder) behaves before and after this PR. The following sentences will detail the changes. ```mermaid flowchart TD subgraph "*** After ***" Start2([START]) --> AA[Receive request] AA --> COND1{Is there already an active request
with the same protocol?} COND1 --> |Yes| CC[Send error response] CC --> End2([END]) %% COND1 --> |No| COND2{Request is too large?} %% COND2 --> |Yes| CC COND1 --> |No| DD[Process request] DD --> EE{Rate limit reached?} EE --> |Yes| FF[Wait until tokens are regenerated] FF --> EE EE --> |No| GG[Send response] GG --> End2 end subgraph "*** Before ***" Start([START]) --> A[Receive request] A --> B{Rate limit reached
or
request is too large?} B -->|Yes| C[Send error response] C --> End([END]) B -->|No| E[Process request] E --> F[Send response] F --> End end ``` ### `Is there already an active request with the same protocol?` This check is not performed in `Before`. This is taken from the PR in the consensus-spec, which proposes updates regarding rate limiting and response timeout. https://github.com/ethereum/consensus-specs/pull/3767/files > The requester MUST NOT make more than two concurrent requests with the same ID. The PR mentions the requester side. In this PR, I introduced the `ActiveRequestsLimiter` for the `responder` side to restrict more than two requests from running simultaneously on the same protocol per peer. If the limiter disallows a request, the responder sends a rate-limited error and penalizes the requester. ### `Rate limit reached?` and `Wait until tokens are regenerated` UPDATE: I moved the limiter logic to the behaviour side. https://github.com/sigp/lighthouse/pull/5923#issuecomment-2379535927 ~~The rate limiter is shared between the behaviour and the handler. (`Arc>>`) The handler checks the rate limit and queues the response if the limit is reached. The behaviour handles pruning.~~ ~~I considered not sharing the rate limiter between the behaviour and the handler, and performing all of these either within the behaviour or handler. However, I decided against this for the following reasons:~~ - ~~Regarding performing everything within the behaviour: The behaviour is unable to recognize the response protocol when `RPC::send_response()` is called, especially when the response is `RPCCodedResponse::Error`. Therefore, the behaviour can't rate limit responses based on the response protocol.~~ - ~~Regarding performing everything within the handler: When multiple connections are established with a peer, there could be multiple handlers interacting with that peer. Thus, we cannot enforce rate limiting per peer solely within the handler. (Any ideas? 
🤔 )~~ --- beacon_node/lighthouse_network/src/metrics.rs | 14 + .../lighthouse_network/src/rpc/handler.rs | 4 +- .../lighthouse_network/src/rpc/methods.rs | 14 + beacon_node/lighthouse_network/src/rpc/mod.rs | 271 ++++++++------ .../src/rpc/rate_limiter.rs | 13 +- .../src/rpc/response_limiter.rs | 177 +++++++++ .../src/rpc/self_limiter.rs | 339 +++++++++++++++--- .../lighthouse_network/tests/common.rs | 31 +- .../lighthouse_network/tests/rpc_tests.rs | 276 +++++++++++++- 9 files changed, 976 insertions(+), 163 deletions(-) create mode 100644 beacon_node/lighthouse_network/src/rpc/response_limiter.rs diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index b36cb8075d..da986f2884 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -206,6 +206,20 @@ pub static REPORT_PEER_MSGS: LazyLock> = LazyLock::new(|| ) }); +pub static OUTBOUND_REQUEST_IDLING: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "outbound_request_idling_seconds", + "The time our own request remained idle in the self-limiter", + ) +}); + +pub static RESPONSE_IDLING: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "response_idling_seconds", + "The time our response remained idle in the response limiter", + ) +}); + pub fn scrape_discovery_metrics() { let metrics = discv5::metrics::Metrics::from(discv5::Discv5::::raw_metrics()); diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index b86e2b3a6f..33c5521c3b 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -141,7 +141,7 @@ where /// Waker, to be sure the handler gets polled when needed. waker: Option, - /// Timeout that will me used for inbound and outbound responses. + /// Timeout that will be used for inbound and outbound responses. 
resp_timeout: Duration, } @@ -314,6 +314,7 @@ where } return; }; + // If the response we are sending is an error, report back for handling if let RpcResponse::Error(ref code, ref reason) = response { self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { @@ -331,6 +332,7 @@ where "Response not sent. Deactivated handler"); return; } + inbound_info.pending_items.push_back(response); } } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index b748ab11c0..e6939e36d8 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -606,6 +606,20 @@ pub enum ResponseTermination { LightClientUpdatesByRange, } +impl ResponseTermination { + pub fn as_protocol(&self) -> Protocol { + match self { + ResponseTermination::BlocksByRange => Protocol::BlocksByRange, + ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, + ResponseTermination::BlobsByRange => Protocol::BlobsByRange, + ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, + ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, + ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange, + ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange, + } + } +} + /// The structured response containing a result/code indicating success or failure /// and the contents of the response #[derive(Debug, Clone)] diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 0f23da7f38..8cb720132a 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -4,7 +4,6 @@ //! direct peer-to-peer communication primarily for sending/receiving chain information for //! syncing. 
-use futures::future::FutureExt; use handler::RPCHandler; use libp2p::core::transport::PortUse; use libp2p::swarm::{ @@ -13,13 +12,12 @@ use libp2p::swarm::{ }; use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent}; use libp2p::PeerId; -use logging::crit; -use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}; +use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; -use tracing::{debug, instrument, trace}; +use tracing::{debug, error, instrument, trace}; use types::{EthSpec, ForkContext}; pub(crate) use handler::{HandlerErr, HandlerEvent}; @@ -28,6 +26,11 @@ pub(crate) use methods::{ }; pub use protocol::RequestType; +use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; +use self::protocol::RPCProtocol; +use self::self_limiter::SelfRateLimiter; +use crate::rpc::rate_limiter::RateLimiterItem; +use crate::rpc::response_limiter::ResponseLimiter; pub use handler::SubstreamId; pub use methods::{ BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest, @@ -35,10 +38,6 @@ pub use methods::{ }; pub use protocol::{Protocol, RPCError}; -use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig}; -use self::protocol::RPCProtocol; -use self::self_limiter::SelfRateLimiter; - pub(crate) mod codec; pub mod config; mod handler; @@ -46,8 +45,12 @@ pub mod methods; mod outbound; mod protocol; mod rate_limiter; +mod response_limiter; mod self_limiter; +// Maximum number of concurrent requests per protocol ID that a client may issue. +const MAX_CONCURRENT_REQUESTS: usize = 2; + /// Composite trait for a request id. 
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} impl ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {} @@ -144,10 +147,12 @@ pub struct NetworkParams { /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level /// logic. pub struct RPC { - /// Rate limiter - limiter: Option, + /// Rate limiter for our responses. + response_limiter: Option>, /// Rate limiter for our own requests. - self_limiter: Option>, + outbound_request_limiter: SelfRateLimiter, + /// Active inbound requests that are awaiting a response. + active_inbound_requests: HashMap)>, /// Queue of events to be processed. events: Vec>, fork_context: Arc, @@ -173,20 +178,20 @@ impl RPC { network_params: NetworkParams, seq_number: u64, ) -> Self { - let inbound_limiter = inbound_rate_limiter_config.map(|config| { - debug!(?config, "Using inbound rate limiting params"); - RateLimiter::new_with_config(config.0, fork_context.clone()) + let response_limiter = inbound_rate_limiter_config.map(|config| { + debug!(?config, "Using response rate limiting params"); + ResponseLimiter::new(config, fork_context.clone()) .expect("Inbound limiter configuration parameters are valid") }); - let self_limiter = outbound_rate_limiter_config.map(|config| { - SelfRateLimiter::new(config, fork_context.clone()) - .expect("Configuration parameters are valid") - }); + let outbound_request_limiter: SelfRateLimiter = + SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone()) + .expect("Outbound limiter configuration parameters are valid"); RPC { - limiter: inbound_limiter, - self_limiter, + response_limiter, + outbound_request_limiter, + active_inbound_requests: HashMap::new(), events: Vec::new(), fork_context, enable_light_client_server, @@ -210,6 +215,44 @@ impl RPC { request_id: InboundRequestId, response: RpcResponse, ) { + let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id) + else { + error!(%peer_id, ?request_id, 
%response, "Request not found in active_inbound_requests. Response not sent"); + return; + }; + + // Add the request back to active requests if the response is `Success` and requires stream + // termination. + if request_type.protocol().terminator().is_some() + && matches!(response, RpcResponse::Success(_)) + { + self.active_inbound_requests + .insert(request_id, (peer_id, request_type.clone())); + } + + self.send_response_inner(peer_id, request_type.protocol(), request_id, response); + } + + fn send_response_inner( + &mut self, + peer_id: PeerId, + protocol: Protocol, + request_id: InboundRequestId, + response: RpcResponse, + ) { + if let Some(response_limiter) = self.response_limiter.as_mut() { + if !response_limiter.allows( + peer_id, + protocol, + request_id.connection_id, + request_id.substream_id, + response.clone(), + ) { + // Response is logged and queued internally in the response limiter. + return; + } + } + self.events.push(ToSwarm::NotifyHandler { peer_id, handler: NotifyHandler::One(request_id.connection_id), @@ -227,23 +270,19 @@ impl RPC { skip_all )] pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) { - let event = if let Some(self_limiter) = self.self_limiter.as_mut() { - match self_limiter.allows(peer_id, request_id, req) { - Ok(event) => event, - Err(_e) => { - // Request is logged and queued internally in the self rate limiter. - return; - } + match self + .outbound_request_limiter + .allows(peer_id, request_id, req) + { + Ok(event) => self.events.push(BehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event, + }), + Err(_e) => { + // Request is logged and queued internally in the self rate limiter. } - } else { - RPCSend::Request(request_id, req) - }; - - self.events.push(BehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event, - }); + } } /// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. 
This @@ -373,20 +412,27 @@ where if remaining_established > 0 { return; } + // Get a list of pending requests from the self rate limiter - if let Some(limiter) = self.self_limiter.as_mut() { - for (id, proto) in limiter.peer_disconnected(peer_id) { - let error_msg = ToSwarm::GenerateEvent(RPCMessage { - peer_id, - connection_id, - message: Err(HandlerErr::Outbound { - id, - proto, - error: RPCError::Disconnected, - }), - }); - self.events.push(error_msg); - } + for (id, proto) in self.outbound_request_limiter.peer_disconnected(peer_id) { + let error_msg = ToSwarm::GenerateEvent(RPCMessage { + peer_id, + connection_id, + message: Err(HandlerErr::Outbound { + id, + proto, + error: RPCError::Disconnected, + }), + }); + self.events.push(error_msg); + } + + self.active_inbound_requests.retain( + |_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id, + ); + + if let Some(limiter) = self.response_limiter.as_mut() { + limiter.peer_disconnected(peer_id); } // Replace the pending Requests to the disconnected peer @@ -420,57 +466,39 @@ where ) { match event { HandlerEvent::Ok(RPCReceived::Request(request_id, request_type)) => { - if let Some(limiter) = self.limiter.as_mut() { - // check if the request is conformant to the quota - match limiter.allows(&peer_id, &request_type) { - Err(RateLimitedErr::TooLarge) => { - // we set the batch sizes, so this is a coding/config err for most protocols - let protocol = request_type.versioned_protocol().protocol(); - if matches!( - protocol, - Protocol::BlocksByRange - | Protocol::BlobsByRange - | Protocol::DataColumnsByRange - | Protocol::BlocksByRoot - | Protocol::BlobsByRoot - | Protocol::DataColumnsByRoot - ) { - debug!(request = %request_type, %protocol, "Request too large to process"); - } else { - // Other protocols shouldn't be sending large messages, we should flag the peer kind - crit!(%protocol, "Request size too large to ever be processed"); - } - // send an error code to the peer. 
- // the handler upon receiving the error code will send it back to the behaviour - self.send_response( - peer_id, - request_id, - RpcResponse::Error( - RpcErrorResponse::RateLimited, - "Rate limited. Request too large".into(), - ), - ); - return; - } - Err(RateLimitedErr::TooSoon(wait_time)) => { - debug!(request = %request_type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit"); - // send an error code to the peer. - // the handler upon receiving the error code will send it back to the behaviour - self.send_response( - peer_id, - request_id, - RpcResponse::Error( - RpcErrorResponse::RateLimited, - format!("Wait {:?}", wait_time).into(), - ), - ); - return; - } - // No rate limiting, continue. - Ok(()) => {} - } + let is_concurrent_request_limit_exceeded = self + .active_inbound_requests + .iter() + .filter( + |(_inbound_request_id, (request_peer_id, active_request_type))| { + *request_peer_id == peer_id + && active_request_type.protocol() == request_type.protocol() + }, + ) + .count() + >= MAX_CONCURRENT_REQUESTS; + + // Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer. + if is_concurrent_request_limit_exceeded { + // There is already an active request with the same protocol. Send an error code to the peer. + debug!(request = %request_type, protocol = %request_type.protocol(), %peer_id, "There is an active request with the same protocol"); + self.send_response_inner( + peer_id, + request_type.protocol(), + request_id, + RpcResponse::Error( + RpcErrorResponse::RateLimited, + format!("Rate limited. There are already {MAX_CONCURRENT_REQUESTS} active requests with the same protocol") + .into(), + ), + ); + return; } + // Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests. 
+ self.active_inbound_requests + .insert(request_id, (peer_id, request_type.clone())); + // If we received a Ping, we queue a Pong response. if let RequestType::Ping(_) = request_type { trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong"); @@ -489,14 +517,38 @@ where message: Ok(RPCReceived::Request(request_id, request_type)), })); } - HandlerEvent::Ok(rpc) => { + HandlerEvent::Ok(RPCReceived::Response(id, response)) => { + if response.protocol().terminator().is_none() { + // Inform the limiter that a response has been received. + self.outbound_request_limiter + .request_completed(&peer_id, response.protocol()); + } + self.events.push(ToSwarm::GenerateEvent(RPCMessage { peer_id, connection_id, - message: Ok(rpc), + message: Ok(RPCReceived::Response(id, response)), + })); + } + HandlerEvent::Ok(RPCReceived::EndOfStream(id, response_termination)) => { + // Inform the limiter that a response has been received. + self.outbound_request_limiter + .request_completed(&peer_id, response_termination.as_protocol()); + + self.events.push(ToSwarm::GenerateEvent(RPCMessage { + peer_id, + connection_id, + message: Ok(RPCReceived::EndOfStream(id, response_termination)), })); } HandlerEvent::Err(err) => { + // Inform the limiter that the request has ended with an error. + let protocol = match err { + HandlerErr::Inbound { proto, .. } | HandlerErr::Outbound { proto, .. } => proto, + }; + self.outbound_request_limiter + .request_completed(&peer_id, protocol); + self.events.push(ToSwarm::GenerateEvent(RPCMessage { peer_id, connection_id, @@ -514,15 +566,20 @@ where } fn poll(&mut self, cx: &mut Context) -> Poll>> { - // let the rate limiter prune. 
- if let Some(limiter) = self.limiter.as_mut() { - let _ = limiter.poll_unpin(cx); + if let Some(response_limiter) = self.response_limiter.as_mut() { + if let Poll::Ready(responses) = response_limiter.poll_ready(cx) { + for response in responses { + self.events.push(ToSwarm::NotifyHandler { + peer_id: response.peer_id, + handler: NotifyHandler::One(response.connection_id), + event: RPCSend::Response(response.substream_id, response.response), + }); + } + } } - if let Some(self_limiter) = self.self_limiter.as_mut() { - if let Poll::Ready(event) = self_limiter.poll_ready(cx) { - self.events.push(event) - } + if let Poll::Ready(event) = self.outbound_request_limiter.poll_ready(cx) { + self.events.push(event) } if !self.events.is_empty() { diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index b9e82a5f1e..f666c30d52 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -149,7 +149,7 @@ pub struct RPCRateLimiterBuilder { lcbootstrap_quota: Option, /// Quota for the LightClientOptimisticUpdate protocol. lc_optimistic_update_quota: Option, - /// Quota for the LightClientOptimisticUpdate protocol. + /// Quota for the LightClientFinalityUpdate protocol. lc_finality_update_quota: Option, /// Quota for the LightClientUpdatesByRange protocol. lc_updates_by_range_quota: Option, @@ -275,6 +275,17 @@ impl RateLimiterItem for super::RequestType { } } +impl RateLimiterItem for (super::RpcResponse, Protocol) { + fn protocol(&self) -> Protocol { + self.1 + } + + fn max_responses(&self, _current_fork: ForkName, _spec: &ChainSpec) -> u64 { + // A response chunk consumes one token of the rate limiter. 
+ 1 + } +} + impl RPCRateLimiter { pub fn new_with_config( config: RateLimiterConfig, diff --git a/beacon_node/lighthouse_network/src/rpc/response_limiter.rs b/beacon_node/lighthouse_network/src/rpc/response_limiter.rs new file mode 100644 index 0000000000..c583baaadd --- /dev/null +++ b/beacon_node/lighthouse_network/src/rpc/response_limiter.rs @@ -0,0 +1,177 @@ +use crate::rpc::config::InboundRateLimiterConfig; +use crate::rpc::rate_limiter::{RPCRateLimiter, RateLimitedErr}; +use crate::rpc::self_limiter::timestamp_now; +use crate::rpc::{Protocol, RpcResponse, SubstreamId}; +use crate::PeerId; +use futures::FutureExt; +use libp2p::swarm::ConnectionId; +use logging::crit; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; +use tokio_util::time::DelayQueue; +use tracing::debug; +use types::{EthSpec, ForkContext}; + +/// A response that was rate limited or waiting on rate limited responses for the same peer and +/// protocol. +#[derive(Clone)] +pub(super) struct QueuedResponse { + pub peer_id: PeerId, + pub connection_id: ConnectionId, + pub substream_id: SubstreamId, + pub response: RpcResponse, + pub protocol: Protocol, + pub queued_at: Duration, +} + +pub(super) struct ResponseLimiter { + /// Rate limiter for our responses. + limiter: RPCRateLimiter, + /// Responses queued for sending. These responses are stored when the response limiter rejects them. + delayed_responses: HashMap<(PeerId, Protocol), VecDeque>>, + /// The delay required to allow a peer's outbound response per protocol. + next_response: DelayQueue<(PeerId, Protocol)>, +} + +impl ResponseLimiter { + /// Creates a new [`ResponseLimiter`] based on configuration values. 
+ pub fn new( + config: InboundRateLimiterConfig, + fork_context: Arc, + ) -> Result { + Ok(ResponseLimiter { + limiter: RPCRateLimiter::new_with_config(config.0, fork_context)?, + delayed_responses: HashMap::new(), + next_response: DelayQueue::new(), + }) + } + + /// Checks if the rate limiter allows the response. When not allowed, the response is delayed + /// until it can be sent. + pub fn allows( + &mut self, + peer_id: PeerId, + protocol: Protocol, + connection_id: ConnectionId, + substream_id: SubstreamId, + response: RpcResponse, + ) -> bool { + // First check that there are not already other responses waiting to be sent. + if let Some(queue) = self.delayed_responses.get_mut(&(peer_id, protocol)) { + debug!(%peer_id, %protocol, "Response rate limiting since there are already other responses waiting to be sent"); + queue.push_back(QueuedResponse { + peer_id, + connection_id, + substream_id, + response, + protocol, + queued_at: timestamp_now(), + }); + return false; + } + + if let Err(wait_time) = + Self::try_limiter(&mut self.limiter, peer_id, response.clone(), protocol) + { + self.delayed_responses + .entry((peer_id, protocol)) + .or_default() + .push_back(QueuedResponse { + peer_id, + connection_id, + substream_id, + response, + protocol, + queued_at: timestamp_now(), + }); + self.next_response.insert((peer_id, protocol), wait_time); + return false; + } + + true + } + + /// Checks if the limiter allows the response. If the response should be delayed, the duration + /// to wait is returned. + fn try_limiter( + limiter: &mut RPCRateLimiter, + peer_id: PeerId, + response: RpcResponse, + protocol: Protocol, + ) -> Result<(), Duration> { + match limiter.allows(&peer_id, &(response.clone(), protocol)) { + Ok(()) => Ok(()), + Err(e) => match e { + RateLimitedErr::TooLarge => { + // This should never happen with default parameters. Let's just send the response. + // Log a crit since this is a config issue. 
+ crit!( + %protocol, + "Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters." + ); + Ok(()) + } + RateLimitedErr::TooSoon(wait_time) => { + debug!(%peer_id, %protocol, wait_time_ms = wait_time.as_millis(), "Response rate limiting"); + Err(wait_time) + } + }, + } + } + + /// Informs the limiter that a peer has disconnected. This removes any pending responses. + pub fn peer_disconnected(&mut self, peer_id: PeerId) { + self.delayed_responses + .retain(|(map_peer_id, _protocol), _queue| map_peer_id != &peer_id); + } + + /// When a peer and protocol are allowed to send a next response, this function checks the + /// queued responses and attempts marking as ready as many as the limiter allows. + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>> { + let mut responses = vec![]; + while let Poll::Ready(Some(expired)) = self.next_response.poll_expired(cx) { + let (peer_id, protocol) = expired.into_inner(); + + if let Entry::Occupied(mut entry) = self.delayed_responses.entry((peer_id, protocol)) { + let queue = entry.get_mut(); + // Take delayed responses from the queue, as long as the limiter allows it. + while let Some(response) = queue.pop_front() { + match Self::try_limiter( + &mut self.limiter, + response.peer_id, + response.response.clone(), + response.protocol, + ) { + Ok(()) => { + metrics::observe_duration( + &crate::metrics::RESPONSE_IDLING, + timestamp_now().saturating_sub(response.queued_at), + ); + responses.push(response) + } + Err(wait_time) => { + // The response was taken from the queue, but the limiter didn't allow it. + queue.push_front(response); + self.next_response.insert((peer_id, protocol), wait_time); + break; + } + } + } + if queue.is_empty() { + entry.remove(); + } + } + } + + // Prune the rate limiter. 
+ let _ = self.limiter.poll_unpin(cx); + + if !responses.is_empty() { + return Poll::Ready(responses); + } + Poll::Pending + } +} diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index e4af977a6c..e5b685676f 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -1,3 +1,10 @@ +use super::{ + config::OutboundRateLimiterConfig, + rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, + BehaviourAction, Protocol, RPCSend, ReqId, RequestType, MAX_CONCURRENT_REQUESTS, +}; +use crate::rpc::rate_limiter::RateLimiterItem; +use std::time::{SystemTime, UNIX_EPOCH}; use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, sync::Arc, @@ -13,30 +20,31 @@ use tokio_util::time::DelayQueue; use tracing::debug; use types::{EthSpec, ForkContext}; -use super::{ - config::OutboundRateLimiterConfig, - rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr}, - BehaviourAction, Protocol, RPCSend, ReqId, RequestType, -}; - /// A request that was rate limited or waiting on rate limited requests for the same peer and /// protocol. struct QueuedRequest { req: RequestType, request_id: Id, + queued_at: Duration, } +/// The number of milliseconds requests delayed due to the concurrent request limit stay in the queue. +const WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS: u64 = 100; + +#[allow(clippy::type_complexity)] pub(crate) struct SelfRateLimiter { - /// Requests queued for sending per peer. This requests are stored when the self rate + /// Active requests that are awaiting a response. + active_requests: HashMap>, + /// Requests queued for sending per peer. These requests are stored when the self rate /// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore /// are stored in the same way. 
delayed_requests: HashMap<(PeerId, Protocol), VecDeque>>, /// The delay required to allow a peer's outbound request per protocol. next_peer_request: DelayQueue<(PeerId, Protocol)>, /// Rate limiter for our own requests. - limiter: RateLimiter, + rate_limiter: Option, /// Requests that are ready to be sent. - ready_requests: SmallVec<[(PeerId, RPCSend); 3]>, + ready_requests: SmallVec<[(PeerId, RPCSend, Duration); 3]>, } /// Error returned when the rate limiter does not accept a request. @@ -49,18 +57,23 @@ pub enum Error { } impl SelfRateLimiter { - /// Creates a new [`SelfRateLimiter`] based on configration values. + /// Creates a new [`SelfRateLimiter`] based on configuration values. pub fn new( - config: OutboundRateLimiterConfig, + config: Option, fork_context: Arc, ) -> Result { debug!(?config, "Using self rate limiting params"); - let limiter = RateLimiter::new_with_config(config.0, fork_context)?; + let rate_limiter = if let Some(c) = config { + Some(RateLimiter::new_with_config(c.0, fork_context)?) + } else { + None + }; Ok(SelfRateLimiter { + active_requests: Default::default(), delayed_requests: Default::default(), next_peer_request: Default::default(), - limiter, + rate_limiter, ready_requests: Default::default(), }) } @@ -77,11 +90,21 @@ impl SelfRateLimiter { let protocol = req.versioned_protocol().protocol(); // First check that there are not already other requests waiting to be sent. 
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) { - queued_requests.push_back(QueuedRequest { req, request_id }); - + debug!(%peer_id, protocol = %req.protocol(), "Self rate limiting since there are already other requests waiting to be sent"); + queued_requests.push_back(QueuedRequest { + req, + request_id, + queued_at: timestamp_now(), + }); return Err(Error::PendingRequests); } - match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) { + match Self::try_send_request( + &mut self.active_requests, + &mut self.rate_limiter, + peer_id, + request_id, + req, + ) { Err((rate_limited_req, wait_time)) => { let key = (peer_id, protocol); self.next_peer_request.insert(key, wait_time); @@ -99,33 +122,71 @@ impl SelfRateLimiter { /// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the /// request, the [`ToSwarm`] that should be emitted is returned. If the request /// should be delayed, it's returned with the duration to wait. + #[allow(clippy::result_large_err)] fn try_send_request( - limiter: &mut RateLimiter, + active_requests: &mut HashMap>, + rate_limiter: &mut Option, peer_id: PeerId, request_id: Id, req: RequestType, ) -> Result, (QueuedRequest, Duration)> { - match limiter.allows(&peer_id, &req) { - Ok(()) => Ok(RPCSend::Request(request_id, req)), - Err(e) => { - let protocol = req.versioned_protocol(); - match e { - RateLimitedErr::TooLarge => { - // this should never happen with default parameters. Let's just send the request. - // Log a crit since this is a config issue. - crit!( - protocol = %req.versioned_protocol().protocol(), - "Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters." 
- ); - Ok(RPCSend::Request(request_id, req)) - } - RateLimitedErr::TooSoon(wait_time) => { - debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting"); - Err((QueuedRequest { req, request_id }, wait_time)) + if let Some(active_request) = active_requests.get(&peer_id) { + if let Some(count) = active_request.get(&req.protocol()) { + if *count >= MAX_CONCURRENT_REQUESTS { + debug!( + %peer_id, + protocol = %req.protocol(), + "Self rate limiting due to the number of concurrent requests" + ); + return Err(( + QueuedRequest { + req, + request_id, + queued_at: timestamp_now(), + }, + Duration::from_millis(WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS), + )); + } + } + } + + if let Some(limiter) = rate_limiter.as_mut() { + match limiter.allows(&peer_id, &req) { + Ok(()) => {} + Err(e) => { + let protocol = req.versioned_protocol(); + match e { + RateLimitedErr::TooLarge => { + // this should never happen with default parameters. Let's just send the request. + // Log a crit since this is a config issue. + crit!( + protocol = %req.versioned_protocol().protocol(), + "Self rate limiting error for a batch that will never fit. Sending request anyway. 
Check configuration parameters.", + ); + } + RateLimitedErr::TooSoon(wait_time) => { + debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting"); + return Err(( + QueuedRequest { + req, + request_id, + queued_at: timestamp_now(), + }, + wait_time, + )); + } } } } } + + *active_requests + .entry(peer_id) + .or_default() + .entry(req.protocol()) + .or_default() += 1; + + Ok(RPCSend::Request(request_id, req)) } /// When a peer and protocol are allowed to send a next request, this function checks the @@ -133,16 +194,32 @@ impl SelfRateLimiter { fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) { if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) { let queued_requests = entry.get_mut(); - while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() { - match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) { - Err((rate_limited_req, wait_time)) => { + while let Some(QueuedRequest { + req, + request_id, + queued_at, + }) = queued_requests.pop_front() + { + match Self::try_send_request( + &mut self.active_requests, + &mut self.rate_limiter, + peer_id, + request_id, + req.clone(), + ) { + Err((_rate_limited_req, wait_time)) => { let key = (peer_id, protocol); self.next_peer_request.insert(key, wait_time); - queued_requests.push_front(rate_limited_req); + // Don't push `rate_limited_req` here to prevent `queued_at` from being updated. + queued_requests.push_front(QueuedRequest { + req, + request_id, + queued_at, + }); // If one fails just wait for the next window that allows sending requests. return; } - Ok(event) => self.ready_requests.push((peer_id, event)), + Ok(event) => self.ready_requests.push((peer_id, event, queued_at)), } } if queued_requests.is_empty() { @@ -156,6 +233,8 @@ impl SelfRateLimiter { /// Informs the limiter that a peer has disconnected. This removes any pending requests and /// returns their IDs. 
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> { + self.active_requests.remove(&peer_id); + // It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map // should never really be large. So we iterate for simplicity let mut failed_requests = Vec::new(); @@ -177,19 +256,39 @@ impl SelfRateLimiter { failed_requests } + /// Informs the limiter that a response has been received. + pub fn request_completed(&mut self, peer_id: &PeerId, protocol: Protocol) { + if let Some(active_requests) = self.active_requests.get_mut(peer_id) { + if let Entry::Occupied(mut entry) = active_requests.entry(protocol) { + if *entry.get() > 1 { + *entry.get_mut() -= 1; + } else { + entry.remove(); + } + } + } + } + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { // First check the requests that were self rate limited, since those might add events to - // the queue. Also do this this before rate limiter prunning to avoid removing and + // the queue. Also do this before rate limiter pruning to avoid removing and // immediately adding rate limiting keys. if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) { let (peer_id, protocol) = expired.into_inner(); self.next_peer_request_ready(peer_id, protocol); } + // Prune the rate limiter. - let _ = self.limiter.poll_unpin(cx); + if let Some(limiter) = self.rate_limiter.as_mut() { + let _ = limiter.poll_unpin(cx); + } // Finally return any queued events. - if let Some((peer_id, event)) = self.ready_requests.pop() { + if let Some((peer_id, event, queued_at)) = self.ready_requests.pop() { + metrics::observe_duration( + &crate::metrics::OUTBOUND_REQUEST_IDLING, + timestamp_now().saturating_sub(queued_at), + ); return Poll::Ready(BehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::Any, @@ -201,12 +300,19 @@ impl SelfRateLimiter { } } +/// Returns the duration since the unix epoch. 
+pub fn timestamp_now() -> Duration { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_else(|_| Duration::from_secs(0)) +} + #[cfg(test)] mod tests { use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig}; use crate::rpc::rate_limiter::Quota; use crate::rpc::self_limiter::SelfRateLimiter; - use crate::rpc::{Ping, Protocol, RequestType}; + use crate::rpc::{Ping, Protocol, RPCSend, RequestType}; use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId}; use libp2p::PeerId; use logging::create_test_tracing_subscriber; @@ -227,7 +333,7 @@ mod tests { &MainnetEthSpec::default_spec(), )); let mut limiter: SelfRateLimiter = - SelfRateLimiter::new(config, fork_context).unwrap(); + SelfRateLimiter::new(Some(config), fork_context).unwrap(); let peer_id = PeerId::random(); let lookup_id = 0; @@ -290,4 +396,149 @@ mod tests { assert_eq!(limiter.ready_requests.len(), 1); } } + + /// Test that `next_peer_request_ready` correctly maintains the queue when using the self-limiter without rate limiting. + #[tokio::test] + async fn test_next_peer_request_ready_concurrent_requests() { + let fork_context = std::sync::Arc::new(ForkContext::new::( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); + let mut limiter: SelfRateLimiter = + SelfRateLimiter::new(None, fork_context).unwrap(); + let peer_id = PeerId::random(); + + for i in 1..=5u32 { + let result = limiter.allows( + peer_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + lookup_id: i, + req_id: i, + }, + }), + RequestType::Ping(Ping { data: i as u64 }), + ); + + // Check that the limiter allows the first two requests. 
+ if i <= 2 { + assert!(result.is_ok()); + } else { + assert!(result.is_err()); + } + } + + let queue = limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap(); + assert_eq!(3, queue.len()); + + // The delayed requests remain even after the next_peer_request_ready call because the responses have not been received. + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + let queue = limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap(); + assert_eq!(3, queue.len()); + + limiter.request_completed(&peer_id, Protocol::Ping); + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + + let queue = limiter + .delayed_requests + .get(&(peer_id, Protocol::Ping)) + .unwrap(); + assert_eq!(2, queue.len()); + + limiter.request_completed(&peer_id, Protocol::Ping); + limiter.request_completed(&peer_id, Protocol::Ping); + limiter.next_peer_request_ready(peer_id, Protocol::Ping); + + let queue = limiter.delayed_requests.get(&(peer_id, Protocol::Ping)); + assert!(queue.is_none()); + + // Check that the three delayed requests have moved to ready_requests. + let mut it = limiter.ready_requests.iter(); + for i in 3..=5u32 { + let (_peer_id, RPCSend::Request(request_id, _), _) = it.next().unwrap() else { + unreachable!() + }; + + assert!(matches!( + request_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. 
}, + }) if *req_id == i + )); + } + } + + #[tokio::test] + async fn test_peer_disconnected() { + let fork_context = std::sync::Arc::new(ForkContext::new::( + Slot::new(0), + Hash256::ZERO, + &MainnetEthSpec::default_spec(), + )); + let mut limiter: SelfRateLimiter = + SelfRateLimiter::new(None, fork_context).unwrap(); + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + + for peer in [peer1, peer2] { + for i in 1..=5u32 { + let result = limiter.allows( + peer, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { + lookup_id: i, + req_id: i, + }, + }), + RequestType::Ping(Ping { data: i as u64 }), + ); + + // Check that the limiter allows the first two requests. + if i <= 2 { + assert!(result.is_ok()); + } else { + assert!(result.is_err()); + } + } + } + + assert!(limiter.active_requests.contains_key(&peer1)); + assert!(limiter + .delayed_requests + .contains_key(&(peer1, Protocol::Ping))); + assert!(limiter.active_requests.contains_key(&peer2)); + assert!(limiter + .delayed_requests + .contains_key(&(peer2, Protocol::Ping))); + + // Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly. + let mut failed_requests = limiter.peer_disconnected(peer1); + for i in 3..=5u32 { + let (request_id, _) = failed_requests.remove(0); + assert!(matches!( + request_id, + AppRequestId::Sync(SyncRequestId::SingleBlock { + id: SingleLookupReqId { req_id, .. }, + }) if req_id == i + )); + } + + // Check that peer1’s active and delayed requests have been removed. 
+ assert!(!limiter.active_requests.contains_key(&peer1)); + assert!(!limiter + .delayed_requests + .contains_key(&(peer1, Protocol::Ping))); + + assert!(limiter.active_requests.contains_key(&peer2)); + assert!(limiter + .delayed_requests + .contains_key(&(peer2, Protocol::Ping))); + } } diff --git a/beacon_node/lighthouse_network/tests/common.rs b/beacon_node/lighthouse_network/tests/common.rs index d686885ff7..d979ef9265 100644 --- a/beacon_node/lighthouse_network/tests/common.rs +++ b/beacon_node/lighthouse_network/tests/common.rs @@ -16,6 +16,7 @@ use types::{ type E = MinimalEthSpec; +use lighthouse_network::rpc::config::InboundRateLimiterConfig; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context @@ -77,7 +78,11 @@ pub fn build_tracing_subscriber(level: &str, enabled: bool) { } } -pub fn build_config(mut boot_nodes: Vec) -> Arc { +pub fn build_config( + mut boot_nodes: Vec, + disable_peer_scoring: bool, + inbound_rate_limiter: Option, +) -> Arc { let mut config = NetworkConfig::default(); // Find unused ports by using the 0 port. 
@@ -93,6 +98,8 @@ pub fn build_config(mut boot_nodes: Vec) -> Arc { config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None); config.boot_nodes_enr.append(&mut boot_nodes); config.network_dir = path.into_path(); + config.disable_peer_scoring = disable_peer_scoring; + config.inbound_rate_limiter_config = inbound_rate_limiter; Arc::new(config) } @@ -102,8 +109,10 @@ pub async fn build_libp2p_instance( fork_name: ForkName, chain_spec: Arc, service_name: String, + disable_peer_scoring: bool, + inbound_rate_limiter: Option, ) -> Libp2pInstance { - let config = build_config(boot_nodes); + let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter); // launch libp2p service let (signal, exit) = async_channel::bounded(1); @@ -144,6 +153,8 @@ pub async fn build_node_pair( fork_name: ForkName, spec: Arc, protocol: Protocol, + disable_peer_scoring: bool, + inbound_rate_limiter: Option, ) -> (Libp2pInstance, Libp2pInstance) { let mut sender = build_libp2p_instance( rt.clone(), @@ -151,10 +162,20 @@ pub async fn build_node_pair( fork_name, spec.clone(), "sender".to_string(), + disable_peer_scoring, + inbound_rate_limiter.clone(), + ) + .await; + let mut receiver = build_libp2p_instance( + rt, + vec![], + fork_name, + spec.clone(), + "receiver".to_string(), + disable_peer_scoring, + inbound_rate_limiter, ) .await; - let mut receiver = - build_libp2p_instance(rt, vec![], fork_name, spec.clone(), "receiver".to_string()).await; // let the two nodes set up listeners let sender_fut = async { @@ -235,6 +256,8 @@ pub async fn build_linear( fork_name, spec.clone(), "linear".to_string(), + false, + None, ) .await, ); diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 7a0eb4602b..9b43e8b581 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -9,10 +9,10 @@ use lighthouse_network::{NetworkEvent, ReportSource, Response}; 
use ssz::Encode; use ssz_types::VariableList; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::runtime::Runtime; use tokio::time::sleep; -use tracing::{debug, warn}; +use tracing::{debug, error, warn}; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec, EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec, @@ -64,8 +64,15 @@ fn test_tcp_status_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), ForkName::Base, spec, Protocol::Tcp).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + ForkName::Base, + spec, + Protocol::Tcp, + false, + None, + ) + .await; // Dummy STATUS RPC message let rpc_request = RequestType::Status(StatusMessage { @@ -168,6 +175,8 @@ fn test_tcp_blocks_by_range_chunked_rpc() { ForkName::Bellatrix, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -311,6 +320,8 @@ fn test_blobs_by_range_chunked_rpc() { ForkName::Deneb, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -430,6 +441,8 @@ fn test_tcp_blocks_by_range_over_limit() { ForkName::Bellatrix, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -533,6 +546,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() { ForkName::Base, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -665,6 +680,8 @@ fn test_tcp_blocks_by_range_single_empty_rpc() { ForkName::Base, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -785,6 +802,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() { ForkName::Bellatrix, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -929,6 +948,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { ForkName::Base, spec.clone(), Protocol::Tcp, + false, + None, ) .await; @@ -1065,8 +1086,15 @@ fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) { 
// get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = - common::build_node_pair(Arc::downgrade(&rt), ForkName::Base, spec, protocol).await; + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + ForkName::Base, + spec, + protocol, + false, + None, + ) + .await; // build the sender future let sender_future = async { @@ -1127,3 +1155,239 @@ fn quic_test_goodbye_rpc() { let enabled_logging = false; goodbye_test(log_level, enabled_logging, Protocol::Quic); } + +// Test that the receiver delays the responses during response rate-limiting. +#[test] +fn test_delayed_rpc_response() { + let rt = Arc::new(Runtime::new().unwrap()); + let spec = Arc::new(E::default_spec()); + + // Allow 1 token to be used every 3 seconds. + const QUOTA_SEC: u64 = 3; + + rt.block_on(async { + // get sender/receiver + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + ForkName::Base, + spec, + Protocol::Tcp, + false, + // Configure a quota for STATUS responses of 1 token every 3 seconds. 
+ Some(format!("status:1/{QUOTA_SEC}").parse().unwrap()), + ) + .await; + + // Dummy STATUS RPC message + let rpc_request = RequestType::Status(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + }); + + // Dummy STATUS RPC message + let rpc_response = Response::Status(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + }); + + // build the sender future + let sender_future = async { + let mut request_id = 1; + let mut request_sent_at = Instant::now(); + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + debug!(%request_id, "Sending RPC request"); + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); + request_sent_at = Instant::now(); + } + NetworkEvent::ResponseReceived { + peer_id, + app_request_id: _, + response, + } => { + debug!(%request_id, "Sender received"); + assert_eq!(response, rpc_response); + + match request_id { + 1 => { + // The first response is returned instantly. + assert!(request_sent_at.elapsed() < Duration::from_millis(100)); + } + 2..=5 => { + // The second and subsequent responses are delayed due to the response rate-limiter on the receiver side. + // Adding a slight margin to the elapsed time check to account for potential timing issues caused by system + // scheduling or execution delays during testing. 
+ assert!( + request_sent_at.elapsed() + > (Duration::from_secs(QUOTA_SEC) + - Duration::from_millis(100)) + ); + if request_id == 5 { + // End the test + return; + } + } + _ => unreachable!(), + } + + request_id += 1; + debug!(%request_id, "Sending RPC request"); + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); + request_sent_at = Instant::now(); + } + NetworkEvent::RPCFailed { + app_request_id: _, + peer_id: _, + error, + } => { + error!(?error, "RPC Failed"); + panic!("Rpc failed."); + } + _ => {} + } + } + }; + + // build the receiver future + let receiver_future = async { + loop { + if let NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + } = receiver.next_event().await + { + assert_eq!(request_type, rpc_request); + debug!("Receiver received request"); + receiver.send_response(peer_id, inbound_request_id, rpc_response.clone()); + } + } + }; + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }) +} + +// Test that a rate-limited error doesn't occur even if the sender attempts to send many requests at +// once, thanks to the self-limiter on the sender side. +#[test] +fn test_active_requests() { + let rt = Arc::new(Runtime::new().unwrap()); + let spec = Arc::new(E::default_spec()); + + rt.block_on(async { + // Get sender/receiver. + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + ForkName::Base, + spec, + Protocol::Tcp, + false, + None, + ) + .await; + + // Dummy STATUS RPC request. + let rpc_request = RequestType::Status(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::from_low_u64_be(0), + finalized_epoch: Epoch::new(1), + head_root: Hash256::from_low_u64_be(0), + head_slot: Slot::new(1), + }); + + // Dummy STATUS RPC response. 
+ let rpc_response = Response::Status(StatusMessage { + fork_digest: [0; 4], + finalized_root: Hash256::zero(), + finalized_epoch: Epoch::new(1), + head_root: Hash256::zero(), + head_slot: Slot::new(1), + }); + + // Number of requests. + const REQUESTS: u8 = 10; + + // Build the sender future. + let sender_future = async { + let mut response_received = 0; + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + debug!("Sending RPC request"); + // Send requests in quick succession to intentionally trigger request queueing in the self-limiter. + for _ in 0..REQUESTS { + sender + .send_request(peer_id, AppRequestId::Router, rpc_request.clone()) + .unwrap(); + } + } + NetworkEvent::ResponseReceived { response, .. } => { + debug!(?response, "Sender received response"); + if matches!(response, Response::Status(_)) { + response_received += 1; + } + } + NetworkEvent::RPCFailed { + app_request_id: _, + peer_id: _, + error, + } => panic!("RPC failed: {:?}", error), + _ => {} + } + + if response_received == REQUESTS { + return; + } + } + }; + + // Build the receiver future. + let receiver_future = async { + let mut received_requests = vec![]; + loop { + tokio::select! { + event = receiver.next_event() => { + if let NetworkEvent::RequestReceived { peer_id, inbound_request_id, request_type } = event { + debug!(?request_type, "Receiver received request"); + if matches!(request_type, RequestType::Status(_)) { + received_requests.push((peer_id, inbound_request_id)); + } + } + } + // Introduce a delay in sending responses to trigger request queueing on the sender side. + _ = sleep(Duration::from_secs(3)) => { + for (peer_id, inbound_request_id) in received_requests.drain(..) { + receiver.send_response(peer_id, inbound_request_id, rpc_response.clone()); + } + } + } + } + }; + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }) +}