Delayed RPC Send Using Tokens (#5923)

closes https://github.com/sigp/lighthouse/issues/5785


The diagram below shows how the receiver (responder) behaves before and after this PR. The sections below detail the changes.

```mermaid
flowchart TD

subgraph "*** After ***"
Start2([START]) --> AA[Receive request]
AA --> COND1{Is there already an active request <br> with the same protocol?}
COND1 --> |Yes| CC[Send error response]
CC --> End2([END])
COND1 --> |No| DD[Process request]
DD --> EE{Rate limit reached?}
EE --> |Yes| FF[Wait until tokens are regenerated]
FF --> EE
EE --> |No| GG[Send response]
GG --> End2
end

subgraph "*** Before ***"
Start([START]) --> A[Receive request]
A --> B{Rate limit reached <br> or <br> request is too large?}
B -->|Yes| C[Send error response]
C --> End([END])
B -->|No| E[Process request]
E --> F[Send response]
F --> End
end
```

### `Is there already an active request with the same protocol?`

This check is not performed in `Before`. It is taken from the consensus-specs PR below, which proposes updates to rate limiting and the response timeout.
https://github.com/ethereum/consensus-specs/pull/3767/files
> The requester MUST NOT make more than two concurrent requests with the same ID.

The spec PR addresses the requester side. In this PR, I introduced the `ActiveRequestsLimiter` on the `responder` side to prevent more than two requests on the same protocol from running simultaneously per peer. If the limiter disallows a request, the responder sends a rate-limited error and penalizes the requester.
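As a rough sketch of the idea (illustrative names and simplified types, not Lighthouse's actual `ActiveRequestsLimiter` API), the responder keeps a per-peer, per-protocol count of in-flight requests and rejects a request once the cap is reached:

```rust
use std::collections::HashMap;

// Mirrors the constant added in this PR.
const MAX_CONCURRENT_REQUESTS: usize = 2;

/// Hypothetical sketch: counts active inbound requests per (peer, protocol).
#[derive(Default)]
struct ActiveRequestsLimiter {
    active: HashMap<(u64, &'static str), usize>,
}

impl ActiveRequestsLimiter {
    /// Records the request and returns true if the peer is under the limit.
    fn allows(&mut self, peer_id: u64, protocol: &'static str) -> bool {
        let count = self.active.entry((peer_id, protocol)).or_insert(0);
        if *count >= MAX_CONCURRENT_REQUESTS {
            // Caller sends a RateLimited error response and penalizes the peer.
            return false;
        }
        *count += 1;
        true
    }

    /// Called when a response (or stream termination) completes the request.
    fn request_completed(&mut self, peer_id: u64, protocol: &'static str) {
        if let Some(count) = self.active.get_mut(&(peer_id, protocol)) {
            *count = count.saturating_sub(1);
        }
    }
}

fn main() {
    let mut limiter = ActiveRequestsLimiter::default();
    let peer = 1u64;
    assert!(limiter.allows(peer, "blocks_by_range"));
    assert!(limiter.allows(peer, "blocks_by_range"));
    // A third concurrent request on the same protocol is rejected.
    assert!(!limiter.allows(peer, "blocks_by_range"));
    // A different protocol has its own budget.
    assert!(limiter.allows(peer, "ping"));
    // Completing one request frees a slot.
    limiter.request_completed(peer, "blocks_by_range");
    assert!(limiter.allows(peer, "blocks_by_range"));
}
```

In the actual change the count is derived by filtering `active_inbound_requests` rather than kept as a separate counter, and streamed protocols stay "active" until their end-of-stream is sent.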



### `Rate limit reached?` and `Wait until tokens are regenerated`

UPDATE: I moved the limiter logic to the behaviour side. https://github.com/sigp/lighthouse/pull/5923#issuecomment-2379535927

~~The rate limiter is shared between the behaviour and the handler (`Arc<Mutex<RateLimiter>>`). The handler checks the rate limit and queues the response if the limit is reached. The behaviour handles pruning.~~

~~I considered not sharing the rate limiter between the behaviour and the handler, and performing all of these either within the behaviour or handler. However, I decided against this for the following reasons:~~

- ~~Regarding performing everything within the behaviour: The behaviour is unable to recognize the response protocol when `RPC::send_response()` is called, especially when the response is `RPCCodedResponse::Error`. Therefore, the behaviour can't rate limit responses based on the response protocol.~~
- ~~Regarding performing everything within the handler: When multiple connections are established with a peer, there could be multiple handlers interacting with that peer. Thus, we cannot enforce rate limiting per peer solely within the handler. (Any ideas? 🤔 )~~
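The "wait until tokens are regenerated" behaviour can be sketched as a token bucket that, when empty, reports how long to wait instead of producing an error, so the response can be queued and retried (illustrative only; in this PR the real logic lives in `ResponseLimiter` and is driven from the behaviour's `poll`):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical token bucket: `refill_per_sec` tokens regenerate per second,
/// up to `capacity`.
struct TokenBucket {
    tokens: f64,
    capacity: f64,
    refill_per_sec: f64,
    last_refill: Instant,
}

impl TokenBucket {
    fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Self { tokens: capacity, capacity, refill_per_sec, last_refill: Instant::now() }
    }

    fn refill(&mut self) {
        let elapsed = self.last_refill.elapsed().as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity);
        self.last_refill = Instant::now();
    }

    /// Try to spend `cost` tokens; on failure, return how long until enough
    /// tokens have regenerated.
    fn try_acquire(&mut self, cost: f64) -> Result<(), Duration> {
        self.refill();
        if self.tokens >= cost {
            self.tokens -= cost;
            Ok(())
        } else {
            let deficit = cost - self.tokens;
            Err(Duration::from_secs_f64(deficit / self.refill_per_sec))
        }
    }
}

fn main() {
    let mut bucket = TokenBucket::new(2.0, 1.0); // 2 tokens, 1 token/sec
    let mut queued: VecDeque<&str> = VecDeque::new();
    for response in ["r1", "r2", "r3"] {
        match bucket.try_acquire(1.0) {
            Ok(()) => println!("send {response}"),
            // Instead of replying with a RateLimited error (the `Before`
            // behaviour), the responder queues the response and sends it
            // once tokens regenerate.
            Err(wait) => {
                println!("queue {response}, retry in ~{wait:?}");
                queued.push_back(response);
            }
        }
    }
    assert_eq!(queued.len(), 1);
}
```

The key difference from `Before` is the `Err(wait)` branch: the wait time becomes a delay before sending, not a reason to error out.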
Commit `1324d3d3c4` (parent `402a81cdd7`) by Akihito Nakano, 2025-04-24 12:46:16 +09:00, committed via GitHub. 9 changed files with 976 additions and 163 deletions.

```diff
@@ -4,7 +4,6 @@
 //! direct peer-to-peer communication primarily for sending/receiving chain information for
 //! syncing.
 
-use futures::future::FutureExt;
 use handler::RPCHandler;
 use libp2p::core::transport::PortUse;
 use libp2p::swarm::{
@@ -13,13 +12,12 @@ use libp2p::swarm::{
 };
 use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
 use libp2p::PeerId;
-use logging::crit;
-use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
+use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Duration;
-use tracing::{debug, instrument, trace};
+use tracing::{debug, error, instrument, trace};
 use types::{EthSpec, ForkContext};
 
 pub(crate) use handler::{HandlerErr, HandlerEvent};
@@ -28,6 +26,11 @@ pub(crate) use methods::{
 };
 pub use protocol::RequestType;
+use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
+use self::protocol::RPCProtocol;
+use self::self_limiter::SelfRateLimiter;
+use crate::rpc::rate_limiter::RateLimiterItem;
+use crate::rpc::response_limiter::ResponseLimiter;
 pub use handler::SubstreamId;
 pub use methods::{
     BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
@@ -35,10 +38,6 @@ pub use methods::{
 };
 pub use protocol::{Protocol, RPCError};
 
-use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
-use self::protocol::RPCProtocol;
-use self::self_limiter::SelfRateLimiter;
-
 pub(crate) mod codec;
 pub mod config;
 mod handler;
@@ -46,8 +45,12 @@ pub mod methods;
 mod outbound;
 mod protocol;
 mod rate_limiter;
+mod response_limiter;
 mod self_limiter;
 
+// Maximum number of concurrent requests per protocol ID that a client may issue.
+const MAX_CONCURRENT_REQUESTS: usize = 2;
+
 /// Composite trait for a request id.
 pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
 impl<T> ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {}
@@ -144,10 +147,12 @@ pub struct NetworkParams {
 /// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
 /// logic.
 pub struct RPC<Id: ReqId, E: EthSpec> {
-    /// Rate limiter
-    limiter: Option<RateLimiter>,
+    /// Rate limiter for our responses.
+    response_limiter: Option<ResponseLimiter<E>>,
     /// Rate limiter for our own requests.
-    self_limiter: Option<SelfRateLimiter<Id, E>>,
+    outbound_request_limiter: SelfRateLimiter<Id, E>,
+    /// Active inbound requests that are awaiting a response.
+    active_inbound_requests: HashMap<InboundRequestId, (PeerId, RequestType<E>)>,
     /// Queue of events to be processed.
     events: Vec<BehaviourAction<Id, E>>,
     fork_context: Arc<ForkContext>,
@@ -173,20 +178,20 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
         network_params: NetworkParams,
         seq_number: u64,
     ) -> Self {
-        let inbound_limiter = inbound_rate_limiter_config.map(|config| {
-            debug!(?config, "Using inbound rate limiting params");
-            RateLimiter::new_with_config(config.0, fork_context.clone())
+        let response_limiter = inbound_rate_limiter_config.map(|config| {
+            debug!(?config, "Using response rate limiting params");
+            ResponseLimiter::new(config, fork_context.clone())
                 .expect("Inbound limiter configuration parameters are valid")
         });
 
-        let self_limiter = outbound_rate_limiter_config.map(|config| {
-            SelfRateLimiter::new(config, fork_context.clone())
-                .expect("Configuration parameters are valid")
-        });
+        let outbound_request_limiter: SelfRateLimiter<Id, E> =
+            SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
+                .expect("Outbound limiter configuration parameters are valid");
 
         RPC {
-            limiter: inbound_limiter,
-            self_limiter,
+            response_limiter,
+            outbound_request_limiter,
+            active_inbound_requests: HashMap::new(),
             events: Vec::new(),
             fork_context,
             enable_light_client_server,
@@ -210,6 +215,44 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
         request_id: InboundRequestId,
         response: RpcResponse<E>,
     ) {
+        let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
+        else {
+            error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
+            return;
+        };
+
+        // Add the request back to active requests if the response is `Success` and requires stream
+        // termination.
+        if request_type.protocol().terminator().is_some()
+            && matches!(response, RpcResponse::Success(_))
+        {
+            self.active_inbound_requests
+                .insert(request_id, (peer_id, request_type.clone()));
+        }
+
+        self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
+    }
+
+    fn send_response_inner(
+        &mut self,
+        peer_id: PeerId,
+        protocol: Protocol,
+        request_id: InboundRequestId,
+        response: RpcResponse<E>,
+    ) {
+        if let Some(response_limiter) = self.response_limiter.as_mut() {
+            if !response_limiter.allows(
+                peer_id,
+                protocol,
+                request_id.connection_id,
+                request_id.substream_id,
+                response.clone(),
+            ) {
+                // Response is logged and queued internally in the response limiter.
+                return;
+            }
+        }
+
         self.events.push(ToSwarm::NotifyHandler {
             peer_id,
             handler: NotifyHandler::One(request_id.connection_id),
@@ -227,23 +270,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
         skip_all
     )]
     pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
-        let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
-            match self_limiter.allows(peer_id, request_id, req) {
-                Ok(event) => event,
-                Err(_e) => {
-                    // Request is logged and queued internally in the self rate limiter.
-                    return;
-                }
-            }
-        } else {
-            RPCSend::Request(request_id, req)
-        };
-
-        self.events.push(BehaviourAction::NotifyHandler {
-            peer_id,
-            handler: NotifyHandler::Any,
-            event,
-        });
+        match self
+            .outbound_request_limiter
+            .allows(peer_id, request_id, req)
+        {
+            Ok(event) => self.events.push(BehaviourAction::NotifyHandler {
+                peer_id,
+                handler: NotifyHandler::Any,
+                event,
+            }),
+            Err(_e) => {
+                // Request is logged and queued internally in the self rate limiter.
+            }
+        }
     }
 
     /// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
@@ -373,20 +412,27 @@
         if remaining_established > 0 {
             return;
         }
 
         // Get a list of pending requests from the self rate limiter
-        if let Some(limiter) = self.self_limiter.as_mut() {
-            for (id, proto) in limiter.peer_disconnected(peer_id) {
-                let error_msg = ToSwarm::GenerateEvent(RPCMessage {
-                    peer_id,
-                    connection_id,
-                    message: Err(HandlerErr::Outbound {
-                        id,
-                        proto,
-                        error: RPCError::Disconnected,
-                    }),
-                });
-                self.events.push(error_msg);
-            }
+        for (id, proto) in self.outbound_request_limiter.peer_disconnected(peer_id) {
+            let error_msg = ToSwarm::GenerateEvent(RPCMessage {
+                peer_id,
+                connection_id,
+                message: Err(HandlerErr::Outbound {
+                    id,
+                    proto,
+                    error: RPCError::Disconnected,
+                }),
+            });
+            self.events.push(error_msg);
         }
+
+        self.active_inbound_requests.retain(
+            |_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
+        );
+
+        if let Some(limiter) = self.response_limiter.as_mut() {
+            limiter.peer_disconnected(peer_id);
+        }
 
         // Replace the pending Requests to the disconnected peer
@@ -420,57 +466,39 @@
     ) {
         match event {
            HandlerEvent::Ok(RPCReceived::Request(request_id, request_type)) => {
-                if let Some(limiter) = self.limiter.as_mut() {
-                    // check if the request is conformant to the quota
-                    match limiter.allows(&peer_id, &request_type) {
-                        Err(RateLimitedErr::TooLarge) => {
-                            // we set the batch sizes, so this is a coding/config err for most protocols
-                            let protocol = request_type.versioned_protocol().protocol();
-                            if matches!(
-                                protocol,
-                                Protocol::BlocksByRange
-                                    | Protocol::BlobsByRange
-                                    | Protocol::DataColumnsByRange
-                                    | Protocol::BlocksByRoot
-                                    | Protocol::BlobsByRoot
-                                    | Protocol::DataColumnsByRoot
-                            ) {
-                                debug!(request = %request_type, %protocol, "Request too large to process");
-                            } else {
-                                // Other protocols shouldn't be sending large messages, we should flag the peer kind
-                                crit!(%protocol, "Request size too large to ever be processed");
-                            }
-                            // send an error code to the peer.
-                            // the handler upon receiving the error code will send it back to the behaviour
-                            self.send_response(
-                                peer_id,
-                                request_id,
-                                RpcResponse::Error(
-                                    RpcErrorResponse::RateLimited,
-                                    "Rate limited. Request too large".into(),
-                                ),
-                            );
-                            return;
-                        }
-                        Err(RateLimitedErr::TooSoon(wait_time)) => {
-                            debug!(request = %request_type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit");
-                            // send an error code to the peer.
-                            // the handler upon receiving the error code will send it back to the behaviour
-                            self.send_response(
-                                peer_id,
-                                request_id,
-                                RpcResponse::Error(
-                                    RpcErrorResponse::RateLimited,
-                                    format!("Wait {:?}", wait_time).into(),
-                                ),
-                            );
-                            return;
-                        }
-                        // No rate limiting, continue.
-                        Ok(()) => {}
-                    }
-                }
+                let is_concurrent_request_limit_exceeded = self
+                    .active_inbound_requests
+                    .iter()
+                    .filter(
+                        |(_inbound_request_id, (request_peer_id, active_request_type))| {
+                            *request_peer_id == peer_id
+                                && active_request_type.protocol() == request_type.protocol()
+                        },
+                    )
+                    .count()
+                    >= MAX_CONCURRENT_REQUESTS;
+
+                // Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer.
+                if is_concurrent_request_limit_exceeded {
+                    // There is already an active request with the same protocol. Send an error code to the peer.
+                    debug!(request = %request_type, protocol = %request_type.protocol(), %peer_id, "There is an active request with the same protocol");
+                    self.send_response_inner(
+                        peer_id,
+                        request_type.protocol(),
+                        request_id,
+                        RpcResponse::Error(
+                            RpcErrorResponse::RateLimited,
+                            format!("Rate limited. There are already {MAX_CONCURRENT_REQUESTS} active requests with the same protocol")
+                                .into(),
+                        ),
+                    );
+                    return;
+                }
+
+                // Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests.
+                self.active_inbound_requests
+                    .insert(request_id, (peer_id, request_type.clone()));
+
                 // If we received a Ping, we queue a Pong response.
                 if let RequestType::Ping(_) = request_type {
                     trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
@@ -489,14 +517,38 @@ where
                     message: Ok(RPCReceived::Request(request_id, request_type)),
                 }));
             }
-            HandlerEvent::Ok(rpc) => {
+            HandlerEvent::Ok(RPCReceived::Response(id, response)) => {
+                if response.protocol().terminator().is_none() {
+                    // Inform the limiter that a response has been received.
+                    self.outbound_request_limiter
+                        .request_completed(&peer_id, response.protocol());
+                }
                 self.events.push(ToSwarm::GenerateEvent(RPCMessage {
                     peer_id,
                     connection_id,
-                    message: Ok(rpc),
+                    message: Ok(RPCReceived::Response(id, response)),
                 }));
             }
+            HandlerEvent::Ok(RPCReceived::EndOfStream(id, response_termination)) => {
+                // Inform the limiter that a response has been received.
+                self.outbound_request_limiter
+                    .request_completed(&peer_id, response_termination.as_protocol());
+                self.events.push(ToSwarm::GenerateEvent(RPCMessage {
+                    peer_id,
+                    connection_id,
+                    message: Ok(RPCReceived::EndOfStream(id, response_termination)),
+                }));
+            }
             HandlerEvent::Err(err) => {
+                // Inform the limiter that the request has ended with an error.
+                let protocol = match err {
+                    HandlerErr::Inbound { proto, .. } | HandlerErr::Outbound { proto, .. } => proto,
+                };
+                self.outbound_request_limiter
+                    .request_completed(&peer_id, protocol);
                 self.events.push(ToSwarm::GenerateEvent(RPCMessage {
                     peer_id,
                     connection_id,
@@ -514,15 +566,20 @@ where
     }
 
     fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
-        // let the rate limiter prune.
-        if let Some(limiter) = self.limiter.as_mut() {
-            let _ = limiter.poll_unpin(cx);
+        if let Some(response_limiter) = self.response_limiter.as_mut() {
+            if let Poll::Ready(responses) = response_limiter.poll_ready(cx) {
+                for response in responses {
+                    self.events.push(ToSwarm::NotifyHandler {
+                        peer_id: response.peer_id,
+                        handler: NotifyHandler::One(response.connection_id),
+                        event: RPCSend::Response(response.substream_id, response.response),
+                    });
+                }
+            }
         }
 
-        if let Some(self_limiter) = self.self_limiter.as_mut() {
-            if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
-                self.events.push(event)
-            }
+        if let Poll::Ready(event) = self.outbound_request_limiter.poll_ready(cx) {
+            self.events.push(event)
         }
 
         if !self.events.is_empty() {
```