Delayed RPC Send Using Tokens (#5923)

Closes https://github.com/sigp/lighthouse/issues/5785


The diagram below shows the differences in how the receiver (responder) behaves before and after this PR. The following sections detail the changes.

```mermaid
flowchart TD

subgraph "*** After ***"
Start2([START]) --> AA[Receive request]
AA --> COND1{Is there already an active request <br> with the same protocol?}
COND1 --> |Yes| CC[Send error response]
CC --> End2([END])
%% COND1 --> |No| COND2{Request is too large?}
%% COND2 --> |Yes| CC
COND1 --> |No| DD[Process request]
DD --> EE{Rate limit reached?}
EE --> |Yes| FF[Wait until tokens are regenerated]
FF --> EE
EE --> |No| GG[Send response]
GG --> End2
end

subgraph "*** Before ***"
Start([START]) --> A[Receive request]
A --> B{Rate limit reached <br> or <br> request is too large?}
B -->|Yes| C[Send error response]
C --> End([END])
B -->|No| E[Process request]
E --> F[Send response]
F --> End
end
```

### `Is there already an active request with the same protocol?`

This check is not performed in `Before`. It comes from a PR to the consensus-specs that proposes updates regarding rate limiting and response timeouts:
https://github.com/ethereum/consensus-specs/pull/3767/files
> The requester MUST NOT make more than two concurrent requests with the same ID.

The spec PR addresses the requester side. In this PR, I introduced the `ActiveRequestsLimiter` on the `responder` side to prevent more than two requests per protocol from running simultaneously for a given peer. If the limiter disallows a request, the responder sends a rate-limited error response and penalizes the requester.
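The idea behind the responder-side check can be sketched as a per-peer, per-protocol counter. This is a minimal standalone sketch, not Lighthouse's actual types: `PeerId` and `Protocol` are stand-ins, and the real implementation tracks active requests in the behaviour's `active_inbound_requests` map rather than in a dedicated struct like this.

```rust
use std::collections::HashMap;

/// Maximum number of concurrent requests per protocol that a peer may issue.
const MAX_CONCURRENT_REQUESTS: usize = 2;

// Hypothetical simplified stand-ins for libp2p's PeerId and Lighthouse's Protocol.
type PeerId = u64;
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum Protocol {
    BlocksByRange,
    Ping,
}

#[derive(Default)]
struct ActiveRequestsLimiter {
    /// Count of in-flight requests, keyed by (peer, protocol).
    active: HashMap<(PeerId, Protocol), usize>,
}

impl ActiveRequestsLimiter {
    /// Returns true and records the request if the peer is under the limit.
    /// Returns false if the limit is reached; the caller then sends a
    /// rate-limited error response and penalizes the requester.
    fn allows(&mut self, peer: PeerId, protocol: Protocol) -> bool {
        let count = self.active.entry((peer, protocol)).or_insert(0);
        if *count >= MAX_CONCURRENT_REQUESTS {
            return false;
        }
        *count += 1;
        true
    }

    /// Called once a request finishes (response sent or stream terminated).
    fn request_completed(&mut self, peer: PeerId, protocol: Protocol) {
        if let Some(count) = self.active.get_mut(&(peer, protocol)) {
            *count -= 1;
            if *count == 0 {
                self.active.remove(&(peer, protocol));
            }
        }
    }
}

fn main() {
    let mut limiter = ActiveRequestsLimiter::default();
    let peer = 1;
    assert!(limiter.allows(peer, Protocol::BlocksByRange));
    assert!(limiter.allows(peer, Protocol::BlocksByRange));
    // A third concurrent request on the same protocol is rejected.
    assert!(!limiter.allows(peer, Protocol::BlocksByRange));
    // A different protocol is tracked independently.
    assert!(limiter.allows(peer, Protocol::Ping));
    // Once a request completes, a new one is allowed again.
    limiter.request_completed(peer, Protocol::BlocksByRange);
    assert!(limiter.allows(peer, Protocol::BlocksByRange));
}
```

Note that the limit applies per (peer, protocol) pair, so a peer saturating `BlocksByRange` can still issue `Ping` requests.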



### `Rate limit reached?` and `Wait until tokens are regenerated`

UPDATE: I moved the limiter logic to the behaviour side. https://github.com/sigp/lighthouse/pull/5923#issuecomment-2379535927

~~The rate limiter is shared between the behaviour and the handler (`Arc<Mutex<RateLimiter>>`). The handler checks the rate limit and queues the response if the limit is reached. The behaviour handles pruning.~~

~~I considered not sharing the rate limiter between the behaviour and the handler, and performing all of these either within the behaviour or handler. However, I decided against this for the following reasons:~~

- ~~Regarding performing everything within the behaviour: The behaviour is unable to recognize the response protocol when `RPC::send_response()` is called, especially when the response is `RPCCodedResponse::Error`. Therefore, the behaviour can't rate limit responses based on the response protocol.~~
- ~~Regarding performing everything within the handler: When multiple connections are established with a peer, there could be multiple handlers interacting with that peer. Thus, we cannot enforce rate limiting per peer solely within the handler. (Any ideas? 🤔 )~~
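The "wait until tokens are regenerated" step in the diagram is classic token-bucket behaviour: instead of replying with an error when the bucket is empty, the response is queued and released once enough tokens have refilled. This is a minimal standalone sketch of that idea; the names (`TokenBucket`, `try_consume`) are illustrative and not Lighthouse's actual API, which uses `RPCRateLimiter` together with a `DelayQueue` in `ResponseLimiter`.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// A minimal token bucket: `capacity` tokens, refilled continuously at
/// `refill_per_sec` tokens per second.
struct TokenBucket {
    capacity: u64,
    tokens: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: u64, refill_per_sec: f64) -> Self {
        Self {
            capacity,
            tokens: capacity as f64,
            refill_per_sec,
            last_refill: Instant::now(),
        }
    }

    /// Add tokens for the time elapsed since the last refill, capped at capacity.
    fn refill(&mut self) {
        let elapsed = self.last_refill.elapsed().as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity as f64);
        self.last_refill = Instant::now();
    }

    /// Try to consume one token. On failure, return roughly how long until
    /// one token regenerates, so the caller can schedule a retry.
    fn try_consume(&mut self) -> Result<(), Duration> {
        self.refill();
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            let wait = (1.0 - self.tokens) / self.refill_per_sec;
            Err(Duration::from_secs_f64(wait))
        }
    }
}

fn main() {
    // 2 tokens of capacity, regenerating 1 token per second.
    let mut bucket = TokenBucket::new(2, 1.0);
    let mut delayed: VecDeque<&str> = VecDeque::new();
    for resp in ["chunk-1", "chunk-2", "chunk-3"] {
        match bucket.try_consume() {
            Ok(()) => println!("send {resp}"),
            Err(wait) => {
                // Instead of sending an error, queue the response until
                // tokens regenerate (the `After` path in the diagram).
                println!("delay {resp} for ~{}ms", wait.as_millis());
                delayed.push_back(resp);
            }
        }
    }
    // The first two chunks consume the bucket; the third is delayed.
    assert_eq!(delayed.len(), 1);
}
```

In the real implementation the retry is driven by polling a `DelayQueue` keyed on (peer, protocol), and each response chunk consumes one token.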
Commit 1324d3d3c4 (parent 402a81cdd7) by Akihito Nakano, committed via GitHub on 2025-04-24 12:46:16 +09:00. 9 changed files with 976 additions and 163 deletions.


@@ -206,6 +206,20 @@ pub static REPORT_PEER_MSGS: LazyLock<Result<IntCounterVec>> = LazyLock::new(||
)
});
pub static OUTBOUND_REQUEST_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram(
"outbound_request_idling_seconds",
"The time our own request remained idle in the self-limiter",
)
});
pub static RESPONSE_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram(
"response_idling_seconds",
"The time our response remained idle in the response limiter",
)
});
pub fn scrape_discovery_metrics() {
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());


@@ -141,7 +141,7 @@ where
/// Waker, to be sure the handler gets polled when needed.
waker: Option<std::task::Waker>,
/// Timeout that will me used for inbound and outbound responses.
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,
}
@@ -314,6 +314,7 @@ where
}
return;
};
// If the response we are sending is an error, report back for handling
if let RpcResponse::Error(ref code, ref reason) = response {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
@@ -331,6 +332,7 @@ where
"Response not sent. Deactivated handler");
return;
}
inbound_info.pending_items.push_back(response);
}
}


@@ -606,6 +606,20 @@ pub enum ResponseTermination {
LightClientUpdatesByRange,
}
impl ResponseTermination {
pub fn as_protocol(&self) -> Protocol {
match self {
ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot,
ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange,
ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange,
}
}
}
/// The structured response containing a result/code indicating success or failure
/// and the contents of the response
#[derive(Debug, Clone)]


@@ -4,7 +4,6 @@
//! direct peer-to-peer communication primarily for sending/receiving chain information for
//! syncing.
use futures::future::FutureExt;
use handler::RPCHandler;
use libp2p::core::transport::PortUse;
use libp2p::swarm::{
@@ -13,13 +12,12 @@ use libp2p::swarm::{
};
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use logging::crit;
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::{debug, instrument, trace};
use tracing::{debug, error, instrument, trace};
use types::{EthSpec, ForkContext};
pub(crate) use handler::{HandlerErr, HandlerEvent};
@@ -28,6 +26,11 @@ pub(crate) use methods::{
};
pub use protocol::RequestType;
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
use crate::rpc::rate_limiter::RateLimiterItem;
use crate::rpc::response_limiter::ResponseLimiter;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
@@ -35,10 +38,6 @@ pub use methods::{
};
pub use protocol::{Protocol, RPCError};
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
pub(crate) mod codec;
pub mod config;
mod handler;
@@ -46,8 +45,12 @@ pub mod methods;
mod outbound;
mod protocol;
mod rate_limiter;
mod response_limiter;
mod self_limiter;
// Maximum number of concurrent requests per protocol ID that a client may issue.
const MAX_CONCURRENT_REQUESTS: usize = 2;
/// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
impl<T> ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {}
@@ -144,10 +147,12 @@ pub struct NetworkParams {
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC<Id: ReqId, E: EthSpec> {
/// Rate limiter
limiter: Option<RateLimiter>,
/// Rate limiter for our responses.
response_limiter: Option<ResponseLimiter<E>>,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, E>>,
outbound_request_limiter: SelfRateLimiter<Id, E>,
/// Active inbound requests that are awaiting a response.
active_inbound_requests: HashMap<InboundRequestId, (PeerId, RequestType<E>)>,
/// Queue of events to be processed.
events: Vec<BehaviourAction<Id, E>>,
fork_context: Arc<ForkContext>,
@@ -173,20 +178,20 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
network_params: NetworkParams,
seq_number: u64,
) -> Self {
let inbound_limiter = inbound_rate_limiter_config.map(|config| {
debug!(?config, "Using inbound rate limiting params");
RateLimiter::new_with_config(config.0, fork_context.clone())
let response_limiter = inbound_rate_limiter_config.map(|config| {
debug!(?config, "Using response rate limiting params");
ResponseLimiter::new(config, fork_context.clone())
.expect("Inbound limiter configuration parameters are valid")
});
let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, fork_context.clone())
.expect("Configuration parameters are valid")
});
let outbound_request_limiter: SelfRateLimiter<Id, E> =
SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
.expect("Outbound limiter configuration parameters are valid");
RPC {
limiter: inbound_limiter,
self_limiter,
response_limiter,
outbound_request_limiter,
active_inbound_requests: HashMap::new(),
events: Vec::new(),
fork_context,
enable_light_client_server,
@@ -210,6 +215,44 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
request_id: InboundRequestId,
response: RpcResponse<E>,
) {
let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
else {
error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
return;
};
// Add the request back to active requests if the response is `Success` and requires stream
// termination.
if request_type.protocol().terminator().is_some()
&& matches!(response, RpcResponse::Success(_))
{
self.active_inbound_requests
.insert(request_id, (peer_id, request_type.clone()));
}
self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
}
fn send_response_inner(
&mut self,
peer_id: PeerId,
protocol: Protocol,
request_id: InboundRequestId,
response: RpcResponse<E>,
) {
if let Some(response_limiter) = self.response_limiter.as_mut() {
if !response_limiter.allows(
peer_id,
protocol,
request_id.connection_id,
request_id.substream_id,
response.clone(),
) {
// Response is logged and queued internally in the response limiter.
return;
}
}
self.events.push(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(request_id.connection_id),
@@ -227,23 +270,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
skip_all
)]
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
match self_limiter.allows(peer_id, request_id, req) {
Ok(event) => event,
Err(_e) => {
// Request is logged and queued internally in the self rate limiter.
return;
}
match self
.outbound_request_limiter
.allows(peer_id, request_id, req)
{
Ok(event) => self.events.push(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event,
}),
Err(_e) => {
// Request is logged and queued internally in the self rate limiter.
}
} else {
RPCSend::Request(request_id, req)
};
self.events.push(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event,
});
}
}
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
@@ -373,20 +412,27 @@ where
if remaining_established > 0 {
return;
}
// Get a list of pending requests from the self rate limiter
if let Some(limiter) = self.self_limiter.as_mut() {
for (id, proto) in limiter.peer_disconnected(peer_id) {
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::Disconnected,
}),
});
self.events.push(error_msg);
}
for (id, proto) in self.outbound_request_limiter.peer_disconnected(peer_id) {
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::Disconnected,
}),
});
self.events.push(error_msg);
}
self.active_inbound_requests.retain(
|_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
);
if let Some(limiter) = self.response_limiter.as_mut() {
limiter.peer_disconnected(peer_id);
}
// Replace the pending Requests to the disconnected peer
@@ -420,57 +466,39 @@ where
) {
match event {
HandlerEvent::Ok(RPCReceived::Request(request_id, request_type)) => {
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, &request_type) {
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = request_type.versioned_protocol().protocol();
if matches!(
protocol,
Protocol::BlocksByRange
| Protocol::BlobsByRange
| Protocol::DataColumnsByRange
| Protocol::BlocksByRoot
| Protocol::BlobsByRoot
| Protocol::DataColumnsByRoot
) {
debug!(request = %request_type, %protocol, "Request too large to process");
} else {
// Other protocols shouldn't be sending large messages, we should flag the peer kind
crit!(%protocol, "Request size too large to ever be processed");
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
request_id,
RpcResponse::Error(
RpcErrorResponse::RateLimited,
"Rate limited. Request too large".into(),
),
);
return;
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(request = %request_type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit");
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
request_id,
RpcResponse::Error(
RpcErrorResponse::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
return;
}
// No rate limiting, continue.
Ok(()) => {}
}
let is_concurrent_request_limit_exceeded = self
.active_inbound_requests
.iter()
.filter(
|(_inbound_request_id, (request_peer_id, active_request_type))| {
*request_peer_id == peer_id
&& active_request_type.protocol() == request_type.protocol()
},
)
.count()
>= MAX_CONCURRENT_REQUESTS;
// Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer.
if is_concurrent_request_limit_exceeded {
// There is already an active request with the same protocol. Send an error code to the peer.
debug!(request = %request_type, protocol = %request_type.protocol(), %peer_id, "There is an active request with the same protocol");
self.send_response_inner(
peer_id,
request_type.protocol(),
request_id,
RpcResponse::Error(
RpcErrorResponse::RateLimited,
format!("Rate limited. There are already {MAX_CONCURRENT_REQUESTS} active requests with the same protocol")
.into(),
),
);
return;
}
// Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests.
self.active_inbound_requests
.insert(request_id, (peer_id, request_type.clone()));
// If we received a Ping, we queue a Pong response.
if let RequestType::Ping(_) = request_type {
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
@@ -489,14 +517,38 @@ where
message: Ok(RPCReceived::Request(request_id, request_type)),
}));
}
HandlerEvent::Ok(rpc) => {
HandlerEvent::Ok(RPCReceived::Response(id, response)) => {
if response.protocol().terminator().is_none() {
// Inform the limiter that a response has been received.
self.outbound_request_limiter
.request_completed(&peer_id, response.protocol());
}
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Ok(rpc),
message: Ok(RPCReceived::Response(id, response)),
}));
}
HandlerEvent::Ok(RPCReceived::EndOfStream(id, response_termination)) => {
// Inform the limiter that a response has been received.
self.outbound_request_limiter
.request_completed(&peer_id, response_termination.as_protocol());
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Ok(RPCReceived::EndOfStream(id, response_termination)),
}));
}
HandlerEvent::Err(err) => {
// Inform the limiter that the request has ended with an error.
let protocol = match err {
HandlerErr::Inbound { proto, .. } | HandlerErr::Outbound { proto, .. } => proto,
};
self.outbound_request_limiter
.request_completed(&peer_id, protocol);
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
@@ -514,15 +566,20 @@ where
}
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// let the rate limiter prune.
if let Some(limiter) = self.limiter.as_mut() {
let _ = limiter.poll_unpin(cx);
if let Some(response_limiter) = self.response_limiter.as_mut() {
if let Poll::Ready(responses) = response_limiter.poll_ready(cx) {
for response in responses {
self.events.push(ToSwarm::NotifyHandler {
peer_id: response.peer_id,
handler: NotifyHandler::One(response.connection_id),
event: RPCSend::Response(response.substream_id, response.response),
});
}
}
}
if let Some(self_limiter) = self.self_limiter.as_mut() {
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
self.events.push(event)
}
if let Poll::Ready(event) = self.outbound_request_limiter.poll_ready(cx) {
self.events.push(event)
}
if !self.events.is_empty() {


@@ -149,7 +149,7 @@ pub struct RPCRateLimiterBuilder {
lcbootstrap_quota: Option<Quota>,
/// Quota for the LightClientOptimisticUpdate protocol.
lc_optimistic_update_quota: Option<Quota>,
/// Quota for the LightClientOptimisticUpdate protocol.
/// Quota for the LightClientFinalityUpdate protocol.
lc_finality_update_quota: Option<Quota>,
/// Quota for the LightClientUpdatesByRange protocol.
lc_updates_by_range_quota: Option<Quota>,
@@ -275,6 +275,17 @@ impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
}
}
impl<E: EthSpec> RateLimiterItem for (super::RpcResponse<E>, Protocol) {
fn protocol(&self) -> Protocol {
self.1
}
fn max_responses(&self, _current_fork: ForkName, _spec: &ChainSpec) -> u64 {
// A response chunk consumes one token of the rate limiter.
1
}
}
impl RPCRateLimiter {
pub fn new_with_config(
config: RateLimiterConfig,


@@ -0,0 +1,177 @@
use crate::rpc::config::InboundRateLimiterConfig;
use crate::rpc::rate_limiter::{RPCRateLimiter, RateLimitedErr};
use crate::rpc::self_limiter::timestamp_now;
use crate::rpc::{Protocol, RpcResponse, SubstreamId};
use crate::PeerId;
use futures::FutureExt;
use libp2p::swarm::ConnectionId;
use logging::crit;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio_util::time::DelayQueue;
use tracing::debug;
use types::{EthSpec, ForkContext};
/// A response that was rate limited or waiting on rate limited responses for the same peer and
/// protocol.
#[derive(Clone)]
pub(super) struct QueuedResponse<E: EthSpec> {
pub peer_id: PeerId,
pub connection_id: ConnectionId,
pub substream_id: SubstreamId,
pub response: RpcResponse<E>,
pub protocol: Protocol,
pub queued_at: Duration,
}
pub(super) struct ResponseLimiter<E: EthSpec> {
/// Rate limiter for our responses.
limiter: RPCRateLimiter,
/// Responses queued for sending. These responses are stored when the response limiter rejects them.
delayed_responses: HashMap<(PeerId, Protocol), VecDeque<QueuedResponse<E>>>,
/// The delay required to allow a peer's outbound response per protocol.
next_response: DelayQueue<(PeerId, Protocol)>,
}
impl<E: EthSpec> ResponseLimiter<E> {
/// Creates a new [`ResponseLimiter`] based on configuration values.
pub fn new(
config: InboundRateLimiterConfig,
fork_context: Arc<ForkContext>,
) -> Result<Self, &'static str> {
Ok(ResponseLimiter {
limiter: RPCRateLimiter::new_with_config(config.0, fork_context)?,
delayed_responses: HashMap::new(),
next_response: DelayQueue::new(),
})
}
/// Checks if the rate limiter allows the response. When not allowed, the response is delayed
/// until it can be sent.
pub fn allows(
&mut self,
peer_id: PeerId,
protocol: Protocol,
connection_id: ConnectionId,
substream_id: SubstreamId,
response: RpcResponse<E>,
) -> bool {
// First check that there are not already other responses waiting to be sent.
if let Some(queue) = self.delayed_responses.get_mut(&(peer_id, protocol)) {
debug!(%peer_id, %protocol, "Response rate limiting since there are already other responses waiting to be sent");
queue.push_back(QueuedResponse {
peer_id,
connection_id,
substream_id,
response,
protocol,
queued_at: timestamp_now(),
});
return false;
}
if let Err(wait_time) =
Self::try_limiter(&mut self.limiter, peer_id, response.clone(), protocol)
{
self.delayed_responses
.entry((peer_id, protocol))
.or_default()
.push_back(QueuedResponse {
peer_id,
connection_id,
substream_id,
response,
protocol,
queued_at: timestamp_now(),
});
self.next_response.insert((peer_id, protocol), wait_time);
return false;
}
true
}
/// Checks if the limiter allows the response. If the response should be delayed, the duration
/// to wait is returned.
fn try_limiter(
limiter: &mut RPCRateLimiter,
peer_id: PeerId,
response: RpcResponse<E>,
protocol: Protocol,
) -> Result<(), Duration> {
match limiter.allows(&peer_id, &(response.clone(), protocol)) {
Ok(()) => Ok(()),
Err(e) => match e {
RateLimitedErr::TooLarge => {
// This should never happen with default parameters. Let's just send the response.
// Log a crit since this is a config issue.
crit!(
%protocol,
"Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters."
);
Ok(())
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(%peer_id, %protocol, wait_time_ms = wait_time.as_millis(), "Response rate limiting");
Err(wait_time)
}
},
}
}
/// Informs the limiter that a peer has disconnected. This removes any pending responses.
pub fn peer_disconnected(&mut self, peer_id: PeerId) {
self.delayed_responses
.retain(|(map_peer_id, _protocol), _queue| map_peer_id != &peer_id);
}
/// When a peer and protocol are allowed to send a next response, this function checks the
/// queued responses and attempts marking as ready as many as the limiter allows.
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Vec<QueuedResponse<E>>> {
let mut responses = vec![];
while let Poll::Ready(Some(expired)) = self.next_response.poll_expired(cx) {
let (peer_id, protocol) = expired.into_inner();
if let Entry::Occupied(mut entry) = self.delayed_responses.entry((peer_id, protocol)) {
let queue = entry.get_mut();
// Take delayed responses from the queue, as long as the limiter allows it.
while let Some(response) = queue.pop_front() {
match Self::try_limiter(
&mut self.limiter,
response.peer_id,
response.response.clone(),
response.protocol,
) {
Ok(()) => {
metrics::observe_duration(
&crate::metrics::RESPONSE_IDLING,
timestamp_now().saturating_sub(response.queued_at),
);
responses.push(response)
}
Err(wait_time) => {
// The response was taken from the queue, but the limiter didn't allow it.
queue.push_front(response);
self.next_response.insert((peer_id, protocol), wait_time);
break;
}
}
}
if queue.is_empty() {
entry.remove();
}
}
}
// Prune the rate limiter.
let _ = self.limiter.poll_unpin(cx);
if !responses.is_empty() {
return Poll::Ready(responses);
}
Poll::Pending
}
}


@@ -1,3 +1,10 @@
use super::{
config::OutboundRateLimiterConfig,
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
BehaviourAction, Protocol, RPCSend, ReqId, RequestType, MAX_CONCURRENT_REQUESTS,
};
use crate::rpc::rate_limiter::RateLimiterItem;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
sync::Arc,
@@ -13,30 +20,31 @@ use tokio_util::time::DelayQueue;
use tracing::debug;
use types::{EthSpec, ForkContext};
use super::{
config::OutboundRateLimiterConfig,
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
BehaviourAction, Protocol, RPCSend, ReqId, RequestType,
};
/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest<Id: ReqId, E: EthSpec> {
req: RequestType<E>,
request_id: Id,
queued_at: Duration,
}
/// The number of milliseconds requests delayed due to the concurrent request limit stay in the queue.
const WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS: u64 = 100;
#[allow(clippy::type_complexity)]
pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
/// Requests queued for sending per peer. This requests are stored when the self rate
/// Active requests that are awaiting a response.
active_requests: HashMap<PeerId, HashMap<Protocol, usize>>,
/// Requests queued for sending per peer. These requests are stored when the self rate
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
/// are stored in the same way.
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
/// The delay required to allow a peer's outbound request per protocol.
next_peer_request: DelayQueue<(PeerId, Protocol)>,
/// Rate limiter for our own requests.
limiter: RateLimiter,
rate_limiter: Option<RateLimiter>,
/// Requests that are ready to be sent.
ready_requests: SmallVec<[(PeerId, RPCSend<Id, E>); 3]>,
ready_requests: SmallVec<[(PeerId, RPCSend<Id, E>, Duration); 3]>,
}
/// Error returned when the rate limiter does not accept a request.
@@ -49,18 +57,23 @@ pub enum Error {
}
impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Creates a new [`SelfRateLimiter`] based on configration values.
/// Creates a new [`SelfRateLimiter`] based on configuration values.
pub fn new(
config: OutboundRateLimiterConfig,
config: Option<OutboundRateLimiterConfig>,
fork_context: Arc<ForkContext>,
) -> Result<Self, &'static str> {
debug!(?config, "Using self rate limiting params");
let limiter = RateLimiter::new_with_config(config.0, fork_context)?;
let rate_limiter = if let Some(c) = config {
Some(RateLimiter::new_with_config(c.0, fork_context)?)
} else {
None
};
Ok(SelfRateLimiter {
active_requests: Default::default(),
delayed_requests: Default::default(),
next_peer_request: Default::default(),
limiter,
rate_limiter,
ready_requests: Default::default(),
})
}
@@ -77,11 +90,21 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
queued_requests.push_back(QueuedRequest { req, request_id });
debug!(%peer_id, protocol = %req.protocol(), "Self rate limiting since there are already other requests waiting to be sent");
queued_requests.push_back(QueuedRequest {
req,
request_id,
queued_at: timestamp_now(),
});
return Err(Error::PendingRequests);
}
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
match Self::try_send_request(
&mut self.active_requests,
&mut self.rate_limiter,
peer_id,
request_id,
req,
) {
Err((rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
@@ -99,33 +122,71 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the
/// request, the [`ToSwarm`] that should be emitted is returned. If the request
/// should be delayed, it's returned with the duration to wait.
#[allow(clippy::result_large_err)]
fn try_send_request(
limiter: &mut RateLimiter,
active_requests: &mut HashMap<PeerId, HashMap<Protocol, usize>>,
rate_limiter: &mut Option<RateLimiter>,
peer_id: PeerId,
request_id: Id,
req: RequestType<E>,
) -> Result<RPCSend<Id, E>, (QueuedRequest<Id, E>, Duration)> {
match limiter.allows(&peer_id, &req) {
Ok(()) => Ok(RPCSend::Request(request_id, req)),
Err(e) => {
let protocol = req.versioned_protocol();
match e {
RateLimitedErr::TooLarge => {
// this should never happen with default parameters. Let's just send the request.
// Log a crit since this is a config issue.
crit!(
protocol = %req.versioned_protocol().protocol(),
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters."
);
Ok(RPCSend::Request(request_id, req))
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
Err((QueuedRequest { req, request_id }, wait_time))
if let Some(active_request) = active_requests.get(&peer_id) {
if let Some(count) = active_request.get(&req.protocol()) {
if *count >= MAX_CONCURRENT_REQUESTS {
debug!(
%peer_id,
protocol = %req.protocol(),
"Self rate limiting due to the number of concurrent requests"
);
return Err((
QueuedRequest {
req,
request_id,
queued_at: timestamp_now(),
},
Duration::from_millis(WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS),
));
}
}
}
if let Some(limiter) = rate_limiter.as_mut() {
match limiter.allows(&peer_id, &req) {
Ok(()) => {}
Err(e) => {
let protocol = req.versioned_protocol();
match e {
RateLimitedErr::TooLarge => {
// this should never happen with default parameters. Let's just send the request.
// Log a crit since this is a config issue.
crit!(
protocol = %req.versioned_protocol().protocol(),
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.",
);
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
return Err((
QueuedRequest {
req,
request_id,
queued_at: timestamp_now(),
},
wait_time,
));
}
}
}
}
}
*active_requests
.entry(peer_id)
.or_default()
.entry(req.protocol())
.or_default() += 1;
Ok(RPCSend::Request(request_id, req))
}
/// When a peer and protocol are allowed to send a next request, this function checks the
@@ -133,16 +194,32 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) {
if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) {
let queued_requests = entry.get_mut();
while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() {
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
Err((rate_limited_req, wait_time)) => {
while let Some(QueuedRequest {
req,
request_id,
queued_at,
}) = queued_requests.pop_front()
{
match Self::try_send_request(
&mut self.active_requests,
&mut self.rate_limiter,
peer_id,
request_id,
req.clone(),
) {
Err((_rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
queued_requests.push_front(rate_limited_req);
// Don't push `rate_limited_req` here to prevent `queued_at` from being updated.
queued_requests.push_front(QueuedRequest {
req,
request_id,
queued_at,
});
// If one fails just wait for the next window that allows sending requests.
return;
}
Ok(event) => self.ready_requests.push((peer_id, event)),
Ok(event) => self.ready_requests.push((peer_id, event, queued_at)),
}
}
if queued_requests.is_empty() {
@@ -156,6 +233,8 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
/// returns their IDs.
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
self.active_requests.remove(&peer_id);
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity
let mut failed_requests = Vec::new();
@@ -177,19 +256,39 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
failed_requests
}
/// Informs the limiter that a response has been received.
pub fn request_completed(&mut self, peer_id: &PeerId, protocol: Protocol) {
if let Some(active_requests) = self.active_requests.get_mut(peer_id) {
if let Entry::Occupied(mut entry) = active_requests.entry(protocol) {
if *entry.get() > 1 {
*entry.get_mut() -= 1;
} else {
entry.remove();
}
}
}
}
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, E>> {
// First check the requests that were self rate limited, since those might add events to
// the queue. Also do this this before rate limiter prunning to avoid removing and
// the queue. Also do this before rate limiter pruning to avoid removing and
// immediately adding rate limiting keys.
if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) {
let (peer_id, protocol) = expired.into_inner();
self.next_peer_request_ready(peer_id, protocol);
}
// Prune the rate limiter.
if let Some(limiter) = self.rate_limiter.as_mut() {
let _ = limiter.poll_unpin(cx);
}
// Finally return any queued events.
if let Some((peer_id, event, queued_at)) = self.ready_requests.pop() {
metrics::observe_duration(
&crate::metrics::OUTBOUND_REQUEST_IDLING,
timestamp_now().saturating_sub(queued_at),
);
return Poll::Ready(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
@@ -201,12 +300,19 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
}
}
/// Returns the duration since the unix epoch.
pub fn timestamp_now() -> Duration {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
}
#[cfg(test)]
mod tests {
use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig};
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{Ping, Protocol, RPCSend, RequestType};
use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use logging::create_test_tracing_subscriber;
@@ -227,7 +333,7 @@ mod tests {
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
SelfRateLimiter::new(Some(config), fork_context).unwrap();
let peer_id = PeerId::random();
let lookup_id = 0;
@@ -290,4 +396,149 @@ mod tests {
assert_eq!(limiter.ready_requests.len(), 1);
}
}
/// Test that `next_peer_request_ready` correctly maintains the queue when using the self-limiter without rate limiting.
#[tokio::test]
async fn test_next_peer_request_ready_concurrent_requests() {
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
Slot::new(0),
Hash256::ZERO,
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
SelfRateLimiter::new(None, fork_context).unwrap();
let peer_id = PeerId::random();
for i in 1..=5u32 {
let result = limiter.allows(
peer_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id: i,
req_id: i,
},
}),
RequestType::Ping(Ping { data: i as u64 }),
);
// Check that the limiter allows the first two requests.
if i <= 2 {
assert!(result.is_ok());
} else {
assert!(result.is_err());
}
}
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(3, queue.len());
// The delayed requests remain even after the next_peer_request_ready call because the responses have not been received.
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(3, queue.len());
limiter.request_completed(&peer_id, Protocol::Ping);
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(2, queue.len());
limiter.request_completed(&peer_id, Protocol::Ping);
limiter.request_completed(&peer_id, Protocol::Ping);
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
let queue = limiter.delayed_requests.get(&(peer_id, Protocol::Ping));
assert!(queue.is_none());
// Check that the three delayed requests have moved to ready_requests.
let mut it = limiter.ready_requests.iter();
for i in 3..=5u32 {
let (_peer_id, RPCSend::Request(request_id, _), _) = it.next().unwrap() else {
unreachable!()
};
assert!(matches!(
request_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
}) if *req_id == i
));
}
}
#[tokio::test]
async fn test_peer_disconnected() {
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
Slot::new(0),
Hash256::ZERO,
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
SelfRateLimiter::new(None, fork_context).unwrap();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
for peer in [peer1, peer2] {
for i in 1..=5u32 {
let result = limiter.allows(
peer,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id: i,
req_id: i,
},
}),
RequestType::Ping(Ping { data: i as u64 }),
);
// Check that the limiter allows the first two requests.
if i <= 2 {
assert!(result.is_ok());
} else {
assert!(result.is_err());
}
}
}
assert!(limiter.active_requests.contains_key(&peer1));
assert!(limiter
.delayed_requests
.contains_key(&(peer1, Protocol::Ping)));
assert!(limiter.active_requests.contains_key(&peer2));
assert!(limiter
.delayed_requests
.contains_key(&(peer2, Protocol::Ping)));
// Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
let mut failed_requests = limiter.peer_disconnected(peer1);
for i in 3..=5u32 {
let (request_id, _) = failed_requests.remove(0);
assert!(matches!(
request_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
}) if req_id == i
));
}
// Check that peer1's active and delayed requests have been removed.
assert!(!limiter.active_requests.contains_key(&peer1));
assert!(!limiter
.delayed_requests
.contains_key(&(peer1, Protocol::Ping)));
assert!(limiter.active_requests.contains_key(&peer2));
assert!(limiter
.delayed_requests
.contains_key(&(peer2, Protocol::Ping)));
}
}
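The `request_completed` bookkeeping above can be modelled in isolation: at most two requests per `(peer, protocol)` pair run concurrently, and completion decrements the counter, removing the entry at zero. A minimal sketch of that pattern (the `ActiveRequests`/`try_start` names and the `u64` peer ids are illustrative, not Lighthouse's API):

```rust
use std::collections::{hash_map::Entry, HashMap};

/// Illustrative cap of two concurrent requests per (peer, protocol) pair.
const MAX_CONCURRENT: usize = 2;

#[derive(Default)]
struct ActiveRequests {
    counts: HashMap<(u64, &'static str), usize>,
}

impl ActiveRequests {
    /// Start a request if the (peer, protocol) pair is under the cap.
    fn try_start(&mut self, peer: u64, protocol: &'static str) -> bool {
        let count = self.counts.entry((peer, protocol)).or_insert(0);
        if *count < MAX_CONCURRENT {
            *count += 1;
            true
        } else {
            false
        }
    }

    /// Mirror of `request_completed`: decrement the counter, removing the
    /// entry at zero so the map never accumulates empty slots.
    fn complete(&mut self, peer: u64, protocol: &'static str) {
        if let Entry::Occupied(mut entry) = self.counts.entry((peer, protocol)) {
            if *entry.get() > 1 {
                *entry.get_mut() -= 1;
            } else {
                entry.remove();
            }
        }
    }
}
```

The `Entry::Occupied` dance matches the diff above: it avoids a separate `get_mut` followed by `remove`, which would require two borrows of the map.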


@@ -16,6 +16,7 @@ use types::{
type E = MinimalEthSpec;
use lighthouse_network::rpc::config::InboundRateLimiterConfig;
use tempfile::Builder as TempBuilder;
/// Returns a dummy fork context
@@ -77,7 +78,11 @@ pub fn build_tracing_subscriber(level: &str, enabled: bool) {
}
}
pub fn build_config(
mut boot_nodes: Vec<Enr>,
disable_peer_scoring: bool,
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
) -> Arc<NetworkConfig> {
let mut config = NetworkConfig::default();
// Find unused ports by using the 0 port.
@@ -93,6 +98,8 @@ pub fn build_config(mut boot_nodes: Vec<Enr>) -> Arc<NetworkConfig> {
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
config.boot_nodes_enr.append(&mut boot_nodes);
config.network_dir = path.into_path();
config.disable_peer_scoring = disable_peer_scoring;
config.inbound_rate_limiter_config = inbound_rate_limiter;
Arc::new(config)
}
@@ -102,8 +109,10 @@ pub async fn build_libp2p_instance(
fork_name: ForkName,
chain_spec: Arc<ChainSpec>,
service_name: String,
disable_peer_scoring: bool,
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
) -> Libp2pInstance {
let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter);
// launch libp2p service
let (signal, exit) = async_channel::bounded(1);
@@ -144,6 +153,8 @@ pub async fn build_node_pair(
fork_name: ForkName,
spec: Arc<ChainSpec>,
protocol: Protocol,
disable_peer_scoring: bool,
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
) -> (Libp2pInstance, Libp2pInstance) {
let mut sender = build_libp2p_instance(
rt.clone(),
@@ -151,10 +162,20 @@ pub async fn build_node_pair(
fork_name,
spec.clone(),
"sender".to_string(),
disable_peer_scoring,
inbound_rate_limiter.clone(),
)
.await;
let mut receiver = build_libp2p_instance(
rt,
vec![],
fork_name,
spec.clone(),
"receiver".to_string(),
disable_peer_scoring,
inbound_rate_limiter,
)
.await;
// let the two nodes set up listeners
let sender_fut = async {
@@ -235,6 +256,8 @@ pub async fn build_linear(
fork_name,
spec.clone(),
"linear".to_string(),
false,
None,
)
.await,
);
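The `inbound_rate_limiter` configuration threaded through the harness above drives a token bucket: tokens regenerate continuously at `max_tokens / period`, and a request that finds the bucket empty must wait until enough tokens accrue. A toy model of that regeneration math (illustrative only; the real implementation is `lighthouse_network`'s `RateLimiter`):

```rust
use std::time::{Duration, Instant};

/// Toy token bucket matching a "1 token every 3 seconds" quota.
struct TokenBucket {
    max_tokens: f64,
    tokens: f64,
    refill_per_sec: f64,
    last: Instant,
}

impl TokenBucket {
    fn new(max_tokens: f64, period: Duration, now: Instant) -> Self {
        Self {
            max_tokens,
            tokens: max_tokens,
            refill_per_sec: max_tokens / period.as_secs_f64(),
            last: now,
        }
    }

    /// Take one token if available; otherwise report how long to wait.
    fn try_take(&mut self, now: Instant) -> Result<(), Duration> {
        // Regenerate tokens for the elapsed time, capped at the bucket size.
        let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.max_tokens);
        self.last = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            let needed = 1.0 - self.tokens;
            Err(Duration::from_secs_f64(needed / self.refill_per_sec))
        }
    }
}
```

With a 1/3 quota, the first take succeeds immediately and the second reports a wait of roughly three seconds, which is exactly the delay the `test_delayed_rpc_response` assertions below measure end to end.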


@@ -9,10 +9,10 @@ use lighthouse_network::{NetworkEvent, ReportSource, Response};
use ssz::Encode;
use ssz_types::VariableList;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::runtime::Runtime;
use tokio::time::sleep;
use tracing::{debug, error, warn};
use types::{
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec,
@@ -64,8 +64,15 @@ fn test_tcp_status_rpc() {
rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
ForkName::Base,
spec,
Protocol::Tcp,
false,
None,
)
.await;
// Dummy STATUS RPC message
let rpc_request = RequestType::Status(StatusMessage {
@@ -168,6 +175,8 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
ForkName::Bellatrix,
spec.clone(),
Protocol::Tcp,
false,
None,
)
.await;
@@ -311,6 +320,8 @@ fn test_blobs_by_range_chunked_rpc() {
ForkName::Deneb,
spec.clone(),
Protocol::Tcp,
false,
None,
)
.await;
@@ -430,6 +441,8 @@ fn test_tcp_blocks_by_range_over_limit() {
ForkName::Bellatrix,
spec.clone(),
Protocol::Tcp,
false,
None,
)
.await;
@@ -533,6 +546,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
ForkName::Base,
spec.clone(),
Protocol::Tcp,
false,
None,
)
.await;
@@ -665,6 +680,8 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
ForkName::Base,
spec.clone(),
Protocol::Tcp,
false,
None,
)
.await;
@@ -785,6 +802,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
ForkName::Bellatrix,
spec.clone(),
Protocol::Tcp,
false,
None,
)
.await;
@@ -929,6 +948,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
ForkName::Base,
spec.clone(),
Protocol::Tcp,
false,
None,
)
.await;
@@ -1065,8 +1086,15 @@ fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) {
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
ForkName::Base,
spec,
protocol,
false,
None,
)
.await;
// build the sender future
let sender_future = async {
@@ -1127,3 +1155,239 @@ fn quic_test_goodbye_rpc() {
let enabled_logging = false;
goodbye_test(log_level, enabled_logging, Protocol::Quic);
}
// Test that the receiver delays the responses during response rate-limiting.
#[test]
fn test_delayed_rpc_response() {
let rt = Arc::new(Runtime::new().unwrap());
let spec = Arc::new(E::default_spec());
// Allow 1 token to be used every 3 seconds.
const QUOTA_SEC: u64 = 3;
rt.block_on(async {
// get sender/receiver
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
ForkName::Base,
spec,
Protocol::Tcp,
false,
// Configure a quota for STATUS responses of 1 token every 3 seconds.
Some(format!("status:1/{QUOTA_SEC}").parse().unwrap()),
)
.await;
// Dummy STATUS RPC request
let rpc_request = RequestType::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});
// Dummy STATUS RPC response
let rpc_response = Response::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});
// build the sender future
let sender_future = async {
let mut request_id = 1;
let mut request_sent_at = Instant::now();
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
debug!(%request_id, "Sending RPC request");
sender
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
request_sent_at = Instant::now();
}
NetworkEvent::ResponseReceived {
peer_id,
app_request_id: _,
response,
} => {
debug!(%request_id, "Sender received");
assert_eq!(response, rpc_response);
match request_id {
1 => {
// The first response is returned instantly.
assert!(request_sent_at.elapsed() < Duration::from_millis(100));
}
2..=5 => {
// The second and subsequent responses are delayed due to the response rate-limiter on the receiver side.
// Adding a slight margin to the elapsed time check to account for potential timing issues caused by system
// scheduling or execution delays during testing.
assert!(
request_sent_at.elapsed()
> (Duration::from_secs(QUOTA_SEC)
- Duration::from_millis(100))
);
if request_id == 5 {
// End the test
return;
}
}
_ => unreachable!(),
}
request_id += 1;
debug!(%request_id, "Sending RPC request");
sender
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
request_sent_at = Instant::now();
}
NetworkEvent::RPCFailed {
app_request_id: _,
peer_id: _,
error,
} => {
error!(?error, "RPC Failed");
panic!("RPC failed");
}
_ => {}
}
}
};
// build the receiver future
let receiver_future = async {
loop {
if let NetworkEvent::RequestReceived {
peer_id,
inbound_request_id,
request_type,
} = receiver.next_event().await
{
assert_eq!(request_type, rpc_request);
debug!("Receiver received request");
receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}
// Test that a rate-limited error doesn't occur even if the sender attempts to send many requests at
// once, thanks to the self-limiter on the sender side.
#[test]
fn test_active_requests() {
let rt = Arc::new(Runtime::new().unwrap());
let spec = Arc::new(E::default_spec());
rt.block_on(async {
// Get sender/receiver.
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
ForkName::Base,
spec,
Protocol::Tcp,
false,
None,
)
.await;
// Dummy STATUS RPC request.
let rpc_request = RequestType::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
});
// Dummy STATUS RPC response.
let rpc_response = Response::Status(StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::zero(),
finalized_epoch: Epoch::new(1),
head_root: Hash256::zero(),
head_slot: Slot::new(1),
});
// Number of requests.
const REQUESTS: u8 = 10;
// Build the sender future.
let sender_future = async {
let mut response_received = 0;
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
debug!("Sending RPC request");
// Send requests in quick succession to intentionally trigger request queueing in the self-limiter.
for _ in 0..REQUESTS {
sender
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
.unwrap();
}
}
NetworkEvent::ResponseReceived { response, .. } => {
debug!(?response, "Sender received response");
if matches!(response, Response::Status(_)) {
response_received += 1;
}
}
NetworkEvent::RPCFailed {
app_request_id: _,
peer_id: _,
error,
} => panic!("RPC failed: {:?}", error),
_ => {}
}
if response_received == REQUESTS {
return;
}
}
};
// Build the receiver future.
let receiver_future = async {
let mut received_requests = vec![];
loop {
tokio::select! {
event = receiver.next_event() => {
if let NetworkEvent::RequestReceived { peer_id, inbound_request_id, request_type } = event {
debug!(?request_type, "Receiver received request");
if matches!(request_type, RequestType::Status(_)) {
received_requests.push((peer_id, inbound_request_id));
}
}
}
// Introduce a delay in sending responses to trigger request queueing on the sender side.
_ = sleep(Duration::from_secs(3)) => {
for (peer_id, inbound_request_id) in received_requests.drain(..) {
receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
}
}
}
}
};
tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}
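The quota strings passed to the harness above (e.g. `format!("status:1/{QUOTA_SEC}")`) follow a `<protocol>:<tokens>/<seconds>` shape. A hypothetical standalone parser for one such quota (the real parsing is the `FromStr` impl in `lighthouse_network::rpc::config`; this sketch only approximates its shape):

```rust
/// Hypothetical parser for a single "<protocol>:<tokens>/<seconds>" quota,
/// e.g. "status:1/3" for 1 token every 3 seconds. Not Lighthouse's parser.
fn parse_quota(s: &str) -> Option<(String, u64, u64)> {
    let (protocol, quota) = s.split_once(':')?;
    let (tokens, seconds) = quota.split_once('/')?;
    Some((
        protocol.trim().to_string(),
        tokens.trim().parse().ok()?,
        seconds.trim().parse().ok()?,
    ))
}
```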