mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Delayed RPC Send Using Tokens (#5923)
closes https://github.com/sigp/lighthouse/issues/5785 The diagram below shows the differences in how the receiver (responder) behaves before and after this PR. The following sentences will detail the changes. ```mermaid flowchart TD subgraph "*** After ***" Start2([START]) --> AA[Receive request] AA --> COND1{Is there already an active request <br> with the same protocol?} COND1 --> |Yes| CC[Send error response] CC --> End2([END]) %% COND1 --> |No| COND2{Request is too large?} %% COND2 --> |Yes| CC COND1 --> |No| DD[Process request] DD --> EE{Rate limit reached?} EE --> |Yes| FF[Wait until tokens are regenerated] FF --> EE EE --> |No| GG[Send response] GG --> End2 end subgraph "*** Before ***" Start([START]) --> A[Receive request] A --> B{Rate limit reached <br> or <br> request is too large?} B -->|Yes| C[Send error response] C --> End([END]) B -->|No| E[Process request] E --> F[Send response] F --> End end ``` ### `Is there already an active request with the same protocol?` This check is not performed in `Before`. This is taken from the PR in the consensus-spec, which proposes updates regarding rate limiting and response timeout. https://github.com/ethereum/consensus-specs/pull/3767/files > The requester MUST NOT make more than two concurrent requests with the same ID. The PR mentions the requester side. In this PR, I introduced the `ActiveRequestsLimiter` for the `responder` side to restrict more than two requests from running simultaneously on the same protocol per peer. If the limiter disallows a request, the responder sends a rate-limited error and penalizes the requester. ### `Rate limit reached?` and `Wait until tokens are regenerated` UPDATE: I moved the limiter logic to the behaviour side. https://github.com/sigp/lighthouse/pull/5923#issuecomment-2379535927 ~~The rate limiter is shared between the behaviour and the handler. (`Arc<Mutex<RateLimiter>>>`) The handler checks the rate limit and queues the response if the limit is reached. The behaviour handles pruning.~~ ~~I considered not sharing the rate limiter between the behaviour and the handler, and performing all of these either within the behaviour or handler. However, I decided against this for the following reasons:~~ - ~~Regarding performing everything within the behaviour: The behaviour is unable to recognize the response protocol when `RPC::send_response()` is called, especially when the response is `RPCCodedResponse::Error`. Therefore, the behaviour can't rate limit responses based on the response protocol.~~ - ~~Regarding performing everything within the handler: When multiple connections are established with a peer, there could be multiple handlers interacting with that peer. Thus, we cannot enforce rate limiting per peer solely within the handler. (Any ideas? 🤔 )~~
This commit is contained in:
@@ -206,6 +206,20 @@ pub static REPORT_PEER_MSGS: LazyLock<Result<IntCounterVec>> = LazyLock::new(||
|
||||
)
|
||||
});
|
||||
|
||||
pub static OUTBOUND_REQUEST_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"outbound_request_idling_seconds",
|
||||
"The time our own request remained idle in the self-limiter",
|
||||
)
|
||||
});
|
||||
|
||||
pub static RESPONSE_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
"response_idling_seconds",
|
||||
"The time our response remained idle in the response limiter",
|
||||
)
|
||||
});
|
||||
|
||||
pub fn scrape_discovery_metrics() {
|
||||
let metrics =
|
||||
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
|
||||
|
||||
@@ -141,7 +141,7 @@ where
|
||||
/// Waker, to be sure the handler gets polled when needed.
|
||||
waker: Option<std::task::Waker>,
|
||||
|
||||
/// Timeout that will me used for inbound and outbound responses.
|
||||
/// Timeout that will be used for inbound and outbound responses.
|
||||
resp_timeout: Duration,
|
||||
}
|
||||
|
||||
@@ -314,6 +314,7 @@ where
|
||||
}
|
||||
return;
|
||||
};
|
||||
|
||||
// If the response we are sending is an error, report back for handling
|
||||
if let RpcResponse::Error(ref code, ref reason) = response {
|
||||
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
|
||||
@@ -331,6 +332,7 @@ where
|
||||
"Response not sent. Deactivated handler");
|
||||
return;
|
||||
}
|
||||
|
||||
inbound_info.pending_items.push_back(response);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -606,6 +606,20 @@ pub enum ResponseTermination {
|
||||
LightClientUpdatesByRange,
|
||||
}
|
||||
|
||||
impl ResponseTermination {
|
||||
pub fn as_protocol(&self) -> Protocol {
|
||||
match self {
|
||||
ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
|
||||
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
|
||||
ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
|
||||
ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
|
||||
ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot,
|
||||
ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange,
|
||||
ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The structured response containing a result/code indicating success or failure
|
||||
/// and the contents of the response
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
//! direct peer-to-peer communication primarily for sending/receiving chain information for
|
||||
//! syncing.
|
||||
|
||||
use futures::future::FutureExt;
|
||||
use handler::RPCHandler;
|
||||
use libp2p::core::transport::PortUse;
|
||||
use libp2p::swarm::{
|
||||
@@ -13,13 +12,12 @@ use libp2p::swarm::{
|
||||
};
|
||||
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
|
||||
use libp2p::PeerId;
|
||||
use logging::crit;
|
||||
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, instrument, trace};
|
||||
use tracing::{debug, error, instrument, trace};
|
||||
use types::{EthSpec, ForkContext};
|
||||
|
||||
pub(crate) use handler::{HandlerErr, HandlerEvent};
|
||||
@@ -28,6 +26,11 @@ pub(crate) use methods::{
|
||||
};
|
||||
pub use protocol::RequestType;
|
||||
|
||||
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
|
||||
use self::protocol::RPCProtocol;
|
||||
use self::self_limiter::SelfRateLimiter;
|
||||
use crate::rpc::rate_limiter::RateLimiterItem;
|
||||
use crate::rpc::response_limiter::ResponseLimiter;
|
||||
pub use handler::SubstreamId;
|
||||
pub use methods::{
|
||||
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
|
||||
@@ -35,10 +38,6 @@ pub use methods::{
|
||||
};
|
||||
pub use protocol::{Protocol, RPCError};
|
||||
|
||||
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
|
||||
use self::protocol::RPCProtocol;
|
||||
use self::self_limiter::SelfRateLimiter;
|
||||
|
||||
pub(crate) mod codec;
|
||||
pub mod config;
|
||||
mod handler;
|
||||
@@ -46,8 +45,12 @@ pub mod methods;
|
||||
mod outbound;
|
||||
mod protocol;
|
||||
mod rate_limiter;
|
||||
mod response_limiter;
|
||||
mod self_limiter;
|
||||
|
||||
// Maximum number of concurrent requests per protocol ID that a client may issue.
|
||||
const MAX_CONCURRENT_REQUESTS: usize = 2;
|
||||
|
||||
/// Composite trait for a request id.
|
||||
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
|
||||
impl<T> ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {}
|
||||
@@ -144,10 +147,12 @@ pub struct NetworkParams {
|
||||
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
|
||||
/// logic.
|
||||
pub struct RPC<Id: ReqId, E: EthSpec> {
|
||||
/// Rate limiter
|
||||
limiter: Option<RateLimiter>,
|
||||
/// Rate limiter for our responses.
|
||||
response_limiter: Option<ResponseLimiter<E>>,
|
||||
/// Rate limiter for our own requests.
|
||||
self_limiter: Option<SelfRateLimiter<Id, E>>,
|
||||
outbound_request_limiter: SelfRateLimiter<Id, E>,
|
||||
/// Active inbound requests that are awaiting a response.
|
||||
active_inbound_requests: HashMap<InboundRequestId, (PeerId, RequestType<E>)>,
|
||||
/// Queue of events to be processed.
|
||||
events: Vec<BehaviourAction<Id, E>>,
|
||||
fork_context: Arc<ForkContext>,
|
||||
@@ -173,20 +178,20 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
network_params: NetworkParams,
|
||||
seq_number: u64,
|
||||
) -> Self {
|
||||
let inbound_limiter = inbound_rate_limiter_config.map(|config| {
|
||||
debug!(?config, "Using inbound rate limiting params");
|
||||
RateLimiter::new_with_config(config.0, fork_context.clone())
|
||||
let response_limiter = inbound_rate_limiter_config.map(|config| {
|
||||
debug!(?config, "Using response rate limiting params");
|
||||
ResponseLimiter::new(config, fork_context.clone())
|
||||
.expect("Inbound limiter configuration parameters are valid")
|
||||
});
|
||||
|
||||
let self_limiter = outbound_rate_limiter_config.map(|config| {
|
||||
SelfRateLimiter::new(config, fork_context.clone())
|
||||
.expect("Configuration parameters are valid")
|
||||
});
|
||||
let outbound_request_limiter: SelfRateLimiter<Id, E> =
|
||||
SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
|
||||
.expect("Outbound limiter configuration parameters are valid");
|
||||
|
||||
RPC {
|
||||
limiter: inbound_limiter,
|
||||
self_limiter,
|
||||
response_limiter,
|
||||
outbound_request_limiter,
|
||||
active_inbound_requests: HashMap::new(),
|
||||
events: Vec::new(),
|
||||
fork_context,
|
||||
enable_light_client_server,
|
||||
@@ -210,6 +215,44 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
request_id: InboundRequestId,
|
||||
response: RpcResponse<E>,
|
||||
) {
|
||||
let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
|
||||
else {
|
||||
error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
|
||||
return;
|
||||
};
|
||||
|
||||
// Add the request back to active requests if the response is `Success` and requires stream
|
||||
// termination.
|
||||
if request_type.protocol().terminator().is_some()
|
||||
&& matches!(response, RpcResponse::Success(_))
|
||||
{
|
||||
self.active_inbound_requests
|
||||
.insert(request_id, (peer_id, request_type.clone()));
|
||||
}
|
||||
|
||||
self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
|
||||
}
|
||||
|
||||
fn send_response_inner(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
protocol: Protocol,
|
||||
request_id: InboundRequestId,
|
||||
response: RpcResponse<E>,
|
||||
) {
|
||||
if let Some(response_limiter) = self.response_limiter.as_mut() {
|
||||
if !response_limiter.allows(
|
||||
peer_id,
|
||||
protocol,
|
||||
request_id.connection_id,
|
||||
request_id.substream_id,
|
||||
response.clone(),
|
||||
) {
|
||||
// Response is logged and queued internally in the response limiter.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
self.events.push(ToSwarm::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::One(request_id.connection_id),
|
||||
@@ -227,23 +270,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
|
||||
skip_all
|
||||
)]
|
||||
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
|
||||
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
|
||||
match self_limiter.allows(peer_id, request_id, req) {
|
||||
Ok(event) => event,
|
||||
Err(_e) => {
|
||||
// Request is logged and queued internally in the self rate limiter.
|
||||
return;
|
||||
}
|
||||
match self
|
||||
.outbound_request_limiter
|
||||
.allows(peer_id, request_id, req)
|
||||
{
|
||||
Ok(event) => self.events.push(BehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event,
|
||||
}),
|
||||
Err(_e) => {
|
||||
// Request is logged and queued internally in the self rate limiter.
|
||||
}
|
||||
} else {
|
||||
RPCSend::Request(request_id, req)
|
||||
};
|
||||
|
||||
self.events.push(BehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
event,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
|
||||
@@ -373,20 +412,27 @@ where
|
||||
if remaining_established > 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get a list of pending requests from the self rate limiter
|
||||
if let Some(limiter) = self.self_limiter.as_mut() {
|
||||
for (id, proto) in limiter.peer_disconnected(peer_id) {
|
||||
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
connection_id,
|
||||
message: Err(HandlerErr::Outbound {
|
||||
id,
|
||||
proto,
|
||||
error: RPCError::Disconnected,
|
||||
}),
|
||||
});
|
||||
self.events.push(error_msg);
|
||||
}
|
||||
for (id, proto) in self.outbound_request_limiter.peer_disconnected(peer_id) {
|
||||
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
connection_id,
|
||||
message: Err(HandlerErr::Outbound {
|
||||
id,
|
||||
proto,
|
||||
error: RPCError::Disconnected,
|
||||
}),
|
||||
});
|
||||
self.events.push(error_msg);
|
||||
}
|
||||
|
||||
self.active_inbound_requests.retain(
|
||||
|_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
|
||||
);
|
||||
|
||||
if let Some(limiter) = self.response_limiter.as_mut() {
|
||||
limiter.peer_disconnected(peer_id);
|
||||
}
|
||||
|
||||
// Replace the pending Requests to the disconnected peer
|
||||
@@ -420,57 +466,39 @@ where
|
||||
) {
|
||||
match event {
|
||||
HandlerEvent::Ok(RPCReceived::Request(request_id, request_type)) => {
|
||||
if let Some(limiter) = self.limiter.as_mut() {
|
||||
// check if the request is conformant to the quota
|
||||
match limiter.allows(&peer_id, &request_type) {
|
||||
Err(RateLimitedErr::TooLarge) => {
|
||||
// we set the batch sizes, so this is a coding/config err for most protocols
|
||||
let protocol = request_type.versioned_protocol().protocol();
|
||||
if matches!(
|
||||
protocol,
|
||||
Protocol::BlocksByRange
|
||||
| Protocol::BlobsByRange
|
||||
| Protocol::DataColumnsByRange
|
||||
| Protocol::BlocksByRoot
|
||||
| Protocol::BlobsByRoot
|
||||
| Protocol::DataColumnsByRoot
|
||||
) {
|
||||
debug!(request = %request_type, %protocol, "Request too large to process");
|
||||
} else {
|
||||
// Other protocols shouldn't be sending large messages, we should flag the peer kind
|
||||
crit!(%protocol, "Request size too large to ever be processed");
|
||||
}
|
||||
// send an error code to the peer.
|
||||
// the handler upon receiving the error code will send it back to the behaviour
|
||||
self.send_response(
|
||||
peer_id,
|
||||
request_id,
|
||||
RpcResponse::Error(
|
||||
RpcErrorResponse::RateLimited,
|
||||
"Rate limited. Request too large".into(),
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
Err(RateLimitedErr::TooSoon(wait_time)) => {
|
||||
debug!(request = %request_type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit");
|
||||
// send an error code to the peer.
|
||||
// the handler upon receiving the error code will send it back to the behaviour
|
||||
self.send_response(
|
||||
peer_id,
|
||||
request_id,
|
||||
RpcResponse::Error(
|
||||
RpcErrorResponse::RateLimited,
|
||||
format!("Wait {:?}", wait_time).into(),
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
// No rate limiting, continue.
|
||||
Ok(()) => {}
|
||||
}
|
||||
let is_concurrent_request_limit_exceeded = self
|
||||
.active_inbound_requests
|
||||
.iter()
|
||||
.filter(
|
||||
|(_inbound_request_id, (request_peer_id, active_request_type))| {
|
||||
*request_peer_id == peer_id
|
||||
&& active_request_type.protocol() == request_type.protocol()
|
||||
},
|
||||
)
|
||||
.count()
|
||||
>= MAX_CONCURRENT_REQUESTS;
|
||||
|
||||
// Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer.
|
||||
if is_concurrent_request_limit_exceeded {
|
||||
// There is already an active request with the same protocol. Send an error code to the peer.
|
||||
debug!(request = %request_type, protocol = %request_type.protocol(), %peer_id, "There is an active request with the same protocol");
|
||||
self.send_response_inner(
|
||||
peer_id,
|
||||
request_type.protocol(),
|
||||
request_id,
|
||||
RpcResponse::Error(
|
||||
RpcErrorResponse::RateLimited,
|
||||
format!("Rate limited. There are already {MAX_CONCURRENT_REQUESTS} active requests with the same protocol")
|
||||
.into(),
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests.
|
||||
self.active_inbound_requests
|
||||
.insert(request_id, (peer_id, request_type.clone()));
|
||||
|
||||
// If we received a Ping, we queue a Pong response.
|
||||
if let RequestType::Ping(_) = request_type {
|
||||
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
|
||||
@@ -489,14 +517,38 @@ where
|
||||
message: Ok(RPCReceived::Request(request_id, request_type)),
|
||||
}));
|
||||
}
|
||||
HandlerEvent::Ok(rpc) => {
|
||||
HandlerEvent::Ok(RPCReceived::Response(id, response)) => {
|
||||
if response.protocol().terminator().is_none() {
|
||||
// Inform the limiter that a response has been received.
|
||||
self.outbound_request_limiter
|
||||
.request_completed(&peer_id, response.protocol());
|
||||
}
|
||||
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
connection_id,
|
||||
message: Ok(rpc),
|
||||
message: Ok(RPCReceived::Response(id, response)),
|
||||
}));
|
||||
}
|
||||
HandlerEvent::Ok(RPCReceived::EndOfStream(id, response_termination)) => {
|
||||
// Inform the limiter that a response has been received.
|
||||
self.outbound_request_limiter
|
||||
.request_completed(&peer_id, response_termination.as_protocol());
|
||||
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
connection_id,
|
||||
message: Ok(RPCReceived::EndOfStream(id, response_termination)),
|
||||
}));
|
||||
}
|
||||
HandlerEvent::Err(err) => {
|
||||
// Inform the limiter that the request has ended with an error.
|
||||
let protocol = match err {
|
||||
HandlerErr::Inbound { proto, .. } | HandlerErr::Outbound { proto, .. } => proto,
|
||||
};
|
||||
self.outbound_request_limiter
|
||||
.request_completed(&peer_id, protocol);
|
||||
|
||||
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
|
||||
peer_id,
|
||||
connection_id,
|
||||
@@ -514,15 +566,20 @@ where
|
||||
}
|
||||
|
||||
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
|
||||
// let the rate limiter prune.
|
||||
if let Some(limiter) = self.limiter.as_mut() {
|
||||
let _ = limiter.poll_unpin(cx);
|
||||
if let Some(response_limiter) = self.response_limiter.as_mut() {
|
||||
if let Poll::Ready(responses) = response_limiter.poll_ready(cx) {
|
||||
for response in responses {
|
||||
self.events.push(ToSwarm::NotifyHandler {
|
||||
peer_id: response.peer_id,
|
||||
handler: NotifyHandler::One(response.connection_id),
|
||||
event: RPCSend::Response(response.substream_id, response.response),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(self_limiter) = self.self_limiter.as_mut() {
|
||||
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
|
||||
self.events.push(event)
|
||||
}
|
||||
if let Poll::Ready(event) = self.outbound_request_limiter.poll_ready(cx) {
|
||||
self.events.push(event)
|
||||
}
|
||||
|
||||
if !self.events.is_empty() {
|
||||
|
||||
@@ -149,7 +149,7 @@ pub struct RPCRateLimiterBuilder {
|
||||
lcbootstrap_quota: Option<Quota>,
|
||||
/// Quota for the LightClientOptimisticUpdate protocol.
|
||||
lc_optimistic_update_quota: Option<Quota>,
|
||||
/// Quota for the LightClientOptimisticUpdate protocol.
|
||||
/// Quota for the LightClientFinalityUpdate protocol.
|
||||
lc_finality_update_quota: Option<Quota>,
|
||||
/// Quota for the LightClientUpdatesByRange protocol.
|
||||
lc_updates_by_range_quota: Option<Quota>,
|
||||
@@ -275,6 +275,17 @@ impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> RateLimiterItem for (super::RpcResponse<E>, Protocol) {
|
||||
fn protocol(&self) -> Protocol {
|
||||
self.1
|
||||
}
|
||||
|
||||
fn max_responses(&self, _current_fork: ForkName, _spec: &ChainSpec) -> u64 {
|
||||
// A response chunk consumes one token of the rate limiter.
|
||||
1
|
||||
}
|
||||
}
|
||||
|
||||
impl RPCRateLimiter {
|
||||
pub fn new_with_config(
|
||||
config: RateLimiterConfig,
|
||||
|
||||
177
beacon_node/lighthouse_network/src/rpc/response_limiter.rs
Normal file
177
beacon_node/lighthouse_network/src/rpc/response_limiter.rs
Normal file
@@ -0,0 +1,177 @@
|
||||
use crate::rpc::config::InboundRateLimiterConfig;
|
||||
use crate::rpc::rate_limiter::{RPCRateLimiter, RateLimitedErr};
|
||||
use crate::rpc::self_limiter::timestamp_now;
|
||||
use crate::rpc::{Protocol, RpcResponse, SubstreamId};
|
||||
use crate::PeerId;
|
||||
use futures::FutureExt;
|
||||
use libp2p::swarm::ConnectionId;
|
||||
use logging::crit;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use tokio_util::time::DelayQueue;
|
||||
use tracing::debug;
|
||||
use types::{EthSpec, ForkContext};
|
||||
|
||||
/// A response that was rate limited or waiting on rate limited responses for the same peer and
|
||||
/// protocol.
|
||||
#[derive(Clone)]
|
||||
pub(super) struct QueuedResponse<E: EthSpec> {
|
||||
pub peer_id: PeerId,
|
||||
pub connection_id: ConnectionId,
|
||||
pub substream_id: SubstreamId,
|
||||
pub response: RpcResponse<E>,
|
||||
pub protocol: Protocol,
|
||||
pub queued_at: Duration,
|
||||
}
|
||||
|
||||
pub(super) struct ResponseLimiter<E: EthSpec> {
|
||||
/// Rate limiter for our responses.
|
||||
limiter: RPCRateLimiter,
|
||||
/// Responses queued for sending. These responses are stored when the response limiter rejects them.
|
||||
delayed_responses: HashMap<(PeerId, Protocol), VecDeque<QueuedResponse<E>>>,
|
||||
/// The delay required to allow a peer's outbound response per protocol.
|
||||
next_response: DelayQueue<(PeerId, Protocol)>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ResponseLimiter<E> {
|
||||
/// Creates a new [`ResponseLimiter`] based on configuration values.
|
||||
pub fn new(
|
||||
config: InboundRateLimiterConfig,
|
||||
fork_context: Arc<ForkContext>,
|
||||
) -> Result<Self, &'static str> {
|
||||
Ok(ResponseLimiter {
|
||||
limiter: RPCRateLimiter::new_with_config(config.0, fork_context)?,
|
||||
delayed_responses: HashMap::new(),
|
||||
next_response: DelayQueue::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Checks if the rate limiter allows the response. When not allowed, the response is delayed
|
||||
/// until it can be sent.
|
||||
pub fn allows(
|
||||
&mut self,
|
||||
peer_id: PeerId,
|
||||
protocol: Protocol,
|
||||
connection_id: ConnectionId,
|
||||
substream_id: SubstreamId,
|
||||
response: RpcResponse<E>,
|
||||
) -> bool {
|
||||
// First check that there are not already other responses waiting to be sent.
|
||||
if let Some(queue) = self.delayed_responses.get_mut(&(peer_id, protocol)) {
|
||||
debug!(%peer_id, %protocol, "Response rate limiting since there are already other responses waiting to be sent");
|
||||
queue.push_back(QueuedResponse {
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
response,
|
||||
protocol,
|
||||
queued_at: timestamp_now(),
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Err(wait_time) =
|
||||
Self::try_limiter(&mut self.limiter, peer_id, response.clone(), protocol)
|
||||
{
|
||||
self.delayed_responses
|
||||
.entry((peer_id, protocol))
|
||||
.or_default()
|
||||
.push_back(QueuedResponse {
|
||||
peer_id,
|
||||
connection_id,
|
||||
substream_id,
|
||||
response,
|
||||
protocol,
|
||||
queued_at: timestamp_now(),
|
||||
});
|
||||
self.next_response.insert((peer_id, protocol), wait_time);
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Checks if the limiter allows the response. If the response should be delayed, the duration
|
||||
/// to wait is returned.
|
||||
fn try_limiter(
|
||||
limiter: &mut RPCRateLimiter,
|
||||
peer_id: PeerId,
|
||||
response: RpcResponse<E>,
|
||||
protocol: Protocol,
|
||||
) -> Result<(), Duration> {
|
||||
match limiter.allows(&peer_id, &(response.clone(), protocol)) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => match e {
|
||||
RateLimitedErr::TooLarge => {
|
||||
// This should never happen with default parameters. Let's just send the response.
|
||||
// Log a crit since this is a config issue.
|
||||
crit!(
|
||||
%protocol,
|
||||
"Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters."
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
RateLimitedErr::TooSoon(wait_time) => {
|
||||
debug!(%peer_id, %protocol, wait_time_ms = wait_time.as_millis(), "Response rate limiting");
|
||||
Err(wait_time)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Informs the limiter that a peer has disconnected. This removes any pending responses.
|
||||
pub fn peer_disconnected(&mut self, peer_id: PeerId) {
|
||||
self.delayed_responses
|
||||
.retain(|(map_peer_id, _protocol), _queue| map_peer_id != &peer_id);
|
||||
}
|
||||
|
||||
/// When a peer and protocol are allowed to send a next response, this function checks the
|
||||
/// queued responses and attempts marking as ready as many as the limiter allows.
|
||||
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Vec<QueuedResponse<E>>> {
|
||||
let mut responses = vec![];
|
||||
while let Poll::Ready(Some(expired)) = self.next_response.poll_expired(cx) {
|
||||
let (peer_id, protocol) = expired.into_inner();
|
||||
|
||||
if let Entry::Occupied(mut entry) = self.delayed_responses.entry((peer_id, protocol)) {
|
||||
let queue = entry.get_mut();
|
||||
// Take delayed responses from the queue, as long as the limiter allows it.
|
||||
while let Some(response) = queue.pop_front() {
|
||||
match Self::try_limiter(
|
||||
&mut self.limiter,
|
||||
response.peer_id,
|
||||
response.response.clone(),
|
||||
response.protocol,
|
||||
) {
|
||||
Ok(()) => {
|
||||
metrics::observe_duration(
|
||||
&crate::metrics::RESPONSE_IDLING,
|
||||
timestamp_now().saturating_sub(response.queued_at),
|
||||
);
|
||||
responses.push(response)
|
||||
}
|
||||
Err(wait_time) => {
|
||||
// The response was taken from the queue, but the limiter didn't allow it.
|
||||
queue.push_front(response);
|
||||
self.next_response.insert((peer_id, protocol), wait_time);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if queue.is_empty() {
|
||||
entry.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Prune the rate limiter.
|
||||
let _ = self.limiter.poll_unpin(cx);
|
||||
|
||||
if !responses.is_empty() {
|
||||
return Poll::Ready(responses);
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,10 @@
|
||||
use super::{
|
||||
config::OutboundRateLimiterConfig,
|
||||
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
|
||||
BehaviourAction, Protocol, RPCSend, ReqId, RequestType, MAX_CONCURRENT_REQUESTS,
|
||||
};
|
||||
use crate::rpc::rate_limiter::RateLimiterItem;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap, VecDeque},
|
||||
sync::Arc,
|
||||
@@ -13,30 +20,31 @@ use tokio_util::time::DelayQueue;
|
||||
use tracing::debug;
|
||||
use types::{EthSpec, ForkContext};
|
||||
|
||||
use super::{
|
||||
config::OutboundRateLimiterConfig,
|
||||
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
|
||||
BehaviourAction, Protocol, RPCSend, ReqId, RequestType,
|
||||
};
|
||||
|
||||
/// A request that was rate limited or waiting on rate limited requests for the same peer and
|
||||
/// protocol.
|
||||
struct QueuedRequest<Id: ReqId, E: EthSpec> {
|
||||
req: RequestType<E>,
|
||||
request_id: Id,
|
||||
queued_at: Duration,
|
||||
}
|
||||
|
||||
/// The number of milliseconds requests delayed due to the concurrent request limit stay in the queue.
|
||||
const WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS: u64 = 100;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
|
||||
/// Requests queued for sending per peer. This requests are stored when the self rate
|
||||
/// Active requests that are awaiting a response.
|
||||
active_requests: HashMap<PeerId, HashMap<Protocol, usize>>,
|
||||
/// Requests queued for sending per peer. These requests are stored when the self rate
|
||||
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
|
||||
/// are stored in the same way.
|
||||
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
|
||||
/// The delay required to allow a peer's outbound request per protocol.
|
||||
next_peer_request: DelayQueue<(PeerId, Protocol)>,
|
||||
/// Rate limiter for our own requests.
|
||||
limiter: RateLimiter,
|
||||
rate_limiter: Option<RateLimiter>,
|
||||
/// Requests that are ready to be sent.
|
||||
ready_requests: SmallVec<[(PeerId, RPCSend<Id, E>); 3]>,
|
||||
ready_requests: SmallVec<[(PeerId, RPCSend<Id, E>, Duration); 3]>,
|
||||
}
|
||||
|
||||
/// Error returned when the rate limiter does not accept a request.
|
||||
@@ -49,18 +57,23 @@ pub enum Error {
|
||||
}
|
||||
|
||||
impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
/// Creates a new [`SelfRateLimiter`] based on configration values.
|
||||
/// Creates a new [`SelfRateLimiter`] based on configuration values.
|
||||
pub fn new(
|
||||
config: OutboundRateLimiterConfig,
|
||||
config: Option<OutboundRateLimiterConfig>,
|
||||
fork_context: Arc<ForkContext>,
|
||||
) -> Result<Self, &'static str> {
|
||||
debug!(?config, "Using self rate limiting params");
|
||||
let limiter = RateLimiter::new_with_config(config.0, fork_context)?;
|
||||
let rate_limiter = if let Some(c) = config {
|
||||
Some(RateLimiter::new_with_config(c.0, fork_context)?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(SelfRateLimiter {
|
||||
active_requests: Default::default(),
|
||||
delayed_requests: Default::default(),
|
||||
next_peer_request: Default::default(),
|
||||
limiter,
|
||||
rate_limiter,
|
||||
ready_requests: Default::default(),
|
||||
})
|
||||
}
|
||||
@@ -77,11 +90,21 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
let protocol = req.versioned_protocol().protocol();
|
||||
// First check that there are not already other requests waiting to be sent.
|
||||
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
|
||||
queued_requests.push_back(QueuedRequest { req, request_id });
|
||||
|
||||
debug!(%peer_id, protocol = %req.protocol(), "Self rate limiting since there are already other requests waiting to be sent");
|
||||
queued_requests.push_back(QueuedRequest {
|
||||
req,
|
||||
request_id,
|
||||
queued_at: timestamp_now(),
|
||||
});
|
||||
return Err(Error::PendingRequests);
|
||||
}
|
||||
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
|
||||
match Self::try_send_request(
|
||||
&mut self.active_requests,
|
||||
&mut self.rate_limiter,
|
||||
peer_id,
|
||||
request_id,
|
||||
req,
|
||||
) {
|
||||
Err((rate_limited_req, wait_time)) => {
|
||||
let key = (peer_id, protocol);
|
||||
self.next_peer_request.insert(key, wait_time);
|
||||
@@ -99,33 +122,71 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
/// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the
|
||||
/// request, the [`ToSwarm`] that should be emitted is returned. If the request
|
||||
/// should be delayed, it's returned with the duration to wait.
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn try_send_request(
|
||||
limiter: &mut RateLimiter,
|
||||
active_requests: &mut HashMap<PeerId, HashMap<Protocol, usize>>,
|
||||
rate_limiter: &mut Option<RateLimiter>,
|
||||
peer_id: PeerId,
|
||||
request_id: Id,
|
||||
req: RequestType<E>,
|
||||
) -> Result<RPCSend<Id, E>, (QueuedRequest<Id, E>, Duration)> {
|
||||
match limiter.allows(&peer_id, &req) {
|
||||
Ok(()) => Ok(RPCSend::Request(request_id, req)),
|
||||
Err(e) => {
|
||||
let protocol = req.versioned_protocol();
|
||||
match e {
|
||||
RateLimitedErr::TooLarge => {
|
||||
// this should never happen with default parameters. Let's just send the request.
|
||||
// Log a crit since this is a config issue.
|
||||
crit!(
|
||||
protocol = %req.versioned_protocol().protocol(),
|
||||
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters."
|
||||
);
|
||||
Ok(RPCSend::Request(request_id, req))
|
||||
}
|
||||
RateLimitedErr::TooSoon(wait_time) => {
|
||||
debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
|
||||
Err((QueuedRequest { req, request_id }, wait_time))
|
||||
if let Some(active_request) = active_requests.get(&peer_id) {
|
||||
if let Some(count) = active_request.get(&req.protocol()) {
|
||||
if *count >= MAX_CONCURRENT_REQUESTS {
|
||||
debug!(
|
||||
%peer_id,
|
||||
protocol = %req.protocol(),
|
||||
"Self rate limiting due to the number of concurrent requests"
|
||||
);
|
||||
return Err((
|
||||
QueuedRequest {
|
||||
req,
|
||||
request_id,
|
||||
queued_at: timestamp_now(),
|
||||
},
|
||||
Duration::from_millis(WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(limiter) = rate_limiter.as_mut() {
|
||||
match limiter.allows(&peer_id, &req) {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
let protocol = req.versioned_protocol();
|
||||
match e {
|
||||
RateLimitedErr::TooLarge => {
|
||||
// this should never happen with default parameters. Let's just send the request.
|
||||
// Log a crit since this is a config issue.
|
||||
crit!(
|
||||
protocol = %req.versioned_protocol().protocol(),
|
||||
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.",
|
||||
);
|
||||
}
|
||||
RateLimitedErr::TooSoon(wait_time) => {
|
||||
debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
|
||||
return Err((
|
||||
QueuedRequest {
|
||||
req,
|
||||
request_id,
|
||||
queued_at: timestamp_now(),
|
||||
},
|
||||
wait_time,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*active_requests
|
||||
.entry(peer_id)
|
||||
.or_default()
|
||||
.entry(req.protocol())
|
||||
.or_default() += 1;
|
||||
|
||||
Ok(RPCSend::Request(request_id, req))
|
||||
}
|
||||
|
||||
/// When a peer and protocol are allowed to send a next request, this function checks the
|
||||
@@ -133,16 +194,32 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) {
|
||||
if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) {
|
||||
let queued_requests = entry.get_mut();
|
||||
while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() {
|
||||
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
|
||||
Err((rate_limited_req, wait_time)) => {
|
||||
while let Some(QueuedRequest {
|
||||
req,
|
||||
request_id,
|
||||
queued_at,
|
||||
}) = queued_requests.pop_front()
|
||||
{
|
||||
match Self::try_send_request(
|
||||
&mut self.active_requests,
|
||||
&mut self.rate_limiter,
|
||||
peer_id,
|
||||
request_id,
|
||||
req.clone(),
|
||||
) {
|
||||
Err((_rate_limited_req, wait_time)) => {
|
||||
let key = (peer_id, protocol);
|
||||
self.next_peer_request.insert(key, wait_time);
|
||||
queued_requests.push_front(rate_limited_req);
|
||||
// Don't push `rate_limited_req` here to prevent `queued_at` from being updated.
|
||||
queued_requests.push_front(QueuedRequest {
|
||||
req,
|
||||
request_id,
|
||||
queued_at,
|
||||
});
|
||||
// If one fails just wait for the next window that allows sending requests.
|
||||
return;
|
||||
}
|
||||
Ok(event) => self.ready_requests.push((peer_id, event)),
|
||||
Ok(event) => self.ready_requests.push((peer_id, event, queued_at)),
|
||||
}
|
||||
}
|
||||
if queued_requests.is_empty() {
|
||||
@@ -156,6 +233,8 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
|
||||
/// returns their IDs.
|
||||
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
|
||||
self.active_requests.remove(&peer_id);
|
||||
|
||||
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
|
||||
// should never really be large. So we iterate for simplicity
|
||||
let mut failed_requests = Vec::new();
|
||||
@@ -177,19 +256,39 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
failed_requests
|
||||
}
|
||||
|
||||
/// Informs the limiter that a response has been received.
|
||||
pub fn request_completed(&mut self, peer_id: &PeerId, protocol: Protocol) {
|
||||
if let Some(active_requests) = self.active_requests.get_mut(peer_id) {
|
||||
if let Entry::Occupied(mut entry) = active_requests.entry(protocol) {
|
||||
if *entry.get() > 1 {
|
||||
*entry.get_mut() -= 1;
|
||||
} else {
|
||||
entry.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, E>> {
|
||||
// First check the requests that were self rate limited, since those might add events to
|
||||
// the queue. Also do this this before rate limiter prunning to avoid removing and
|
||||
// the queue. Also do this before rate limiter pruning to avoid removing and
|
||||
// immediately adding rate limiting keys.
|
||||
if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) {
|
||||
let (peer_id, protocol) = expired.into_inner();
|
||||
self.next_peer_request_ready(peer_id, protocol);
|
||||
}
|
||||
|
||||
// Prune the rate limiter.
|
||||
let _ = self.limiter.poll_unpin(cx);
|
||||
if let Some(limiter) = self.rate_limiter.as_mut() {
|
||||
let _ = limiter.poll_unpin(cx);
|
||||
}
|
||||
|
||||
// Finally return any queued events.
|
||||
if let Some((peer_id, event)) = self.ready_requests.pop() {
|
||||
if let Some((peer_id, event, queued_at)) = self.ready_requests.pop() {
|
||||
metrics::observe_duration(
|
||||
&crate::metrics::OUTBOUND_REQUEST_IDLING,
|
||||
timestamp_now().saturating_sub(queued_at),
|
||||
);
|
||||
return Poll::Ready(BehaviourAction::NotifyHandler {
|
||||
peer_id,
|
||||
handler: NotifyHandler::Any,
|
||||
@@ -201,12 +300,19 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the duration since the unix epoch.
|
||||
pub fn timestamp_now() -> Duration {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_else(|_| Duration::from_secs(0))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig};
|
||||
use crate::rpc::rate_limiter::Quota;
|
||||
use crate::rpc::self_limiter::SelfRateLimiter;
|
||||
use crate::rpc::{Ping, Protocol, RequestType};
|
||||
use crate::rpc::{Ping, Protocol, RPCSend, RequestType};
|
||||
use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId};
|
||||
use libp2p::PeerId;
|
||||
use logging::create_test_tracing_subscriber;
|
||||
@@ -227,7 +333,7 @@ mod tests {
|
||||
&MainnetEthSpec::default_spec(),
|
||||
));
|
||||
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
|
||||
SelfRateLimiter::new(config, fork_context).unwrap();
|
||||
SelfRateLimiter::new(Some(config), fork_context).unwrap();
|
||||
let peer_id = PeerId::random();
|
||||
let lookup_id = 0;
|
||||
|
||||
@@ -290,4 +396,149 @@ mod tests {
|
||||
assert_eq!(limiter.ready_requests.len(), 1);
|
||||
}
|
||||
}
|
||||
|
||||
/// Test that `next_peer_request_ready` correctly maintains the queue when using the self-limiter without rate limiting.
|
||||
#[tokio::test]
|
||||
async fn test_next_peer_request_ready_concurrent_requests() {
|
||||
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
|
||||
Slot::new(0),
|
||||
Hash256::ZERO,
|
||||
&MainnetEthSpec::default_spec(),
|
||||
));
|
||||
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
|
||||
SelfRateLimiter::new(None, fork_context).unwrap();
|
||||
let peer_id = PeerId::random();
|
||||
|
||||
for i in 1..=5u32 {
|
||||
let result = limiter.allows(
|
||||
peer_id,
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId {
|
||||
lookup_id: i,
|
||||
req_id: i,
|
||||
},
|
||||
}),
|
||||
RequestType::Ping(Ping { data: i as u64 }),
|
||||
);
|
||||
|
||||
// Check that the limiter allows the first two requests.
|
||||
if i <= 2 {
|
||||
assert!(result.is_ok());
|
||||
} else {
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
let queue = limiter
|
||||
.delayed_requests
|
||||
.get(&(peer_id, Protocol::Ping))
|
||||
.unwrap();
|
||||
assert_eq!(3, queue.len());
|
||||
|
||||
// The delayed requests remain even after the next_peer_request_ready call because the responses have not been received.
|
||||
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
|
||||
let queue = limiter
|
||||
.delayed_requests
|
||||
.get(&(peer_id, Protocol::Ping))
|
||||
.unwrap();
|
||||
assert_eq!(3, queue.len());
|
||||
|
||||
limiter.request_completed(&peer_id, Protocol::Ping);
|
||||
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
|
||||
|
||||
let queue = limiter
|
||||
.delayed_requests
|
||||
.get(&(peer_id, Protocol::Ping))
|
||||
.unwrap();
|
||||
assert_eq!(2, queue.len());
|
||||
|
||||
limiter.request_completed(&peer_id, Protocol::Ping);
|
||||
limiter.request_completed(&peer_id, Protocol::Ping);
|
||||
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
|
||||
|
||||
let queue = limiter.delayed_requests.get(&(peer_id, Protocol::Ping));
|
||||
assert!(queue.is_none());
|
||||
|
||||
// Check that the three delayed requests have moved to ready_requests.
|
||||
let mut it = limiter.ready_requests.iter();
|
||||
for i in 3..=5u32 {
|
||||
let (_peer_id, RPCSend::Request(request_id, _), _) = it.next().unwrap() else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
assert!(matches!(
|
||||
request_id,
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId { req_id, .. },
|
||||
}) if *req_id == i
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_peer_disconnected() {
|
||||
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
|
||||
Slot::new(0),
|
||||
Hash256::ZERO,
|
||||
&MainnetEthSpec::default_spec(),
|
||||
));
|
||||
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
|
||||
SelfRateLimiter::new(None, fork_context).unwrap();
|
||||
let peer1 = PeerId::random();
|
||||
let peer2 = PeerId::random();
|
||||
|
||||
for peer in [peer1, peer2] {
|
||||
for i in 1..=5u32 {
|
||||
let result = limiter.allows(
|
||||
peer,
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId {
|
||||
lookup_id: i,
|
||||
req_id: i,
|
||||
},
|
||||
}),
|
||||
RequestType::Ping(Ping { data: i as u64 }),
|
||||
);
|
||||
|
||||
// Check that the limiter allows the first two requests.
|
||||
if i <= 2 {
|
||||
assert!(result.is_ok());
|
||||
} else {
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert!(limiter.active_requests.contains_key(&peer1));
|
||||
assert!(limiter
|
||||
.delayed_requests
|
||||
.contains_key(&(peer1, Protocol::Ping)));
|
||||
assert!(limiter.active_requests.contains_key(&peer2));
|
||||
assert!(limiter
|
||||
.delayed_requests
|
||||
.contains_key(&(peer2, Protocol::Ping)));
|
||||
|
||||
// Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
|
||||
let mut failed_requests = limiter.peer_disconnected(peer1);
|
||||
for i in 3..=5u32 {
|
||||
let (request_id, _) = failed_requests.remove(0);
|
||||
assert!(matches!(
|
||||
request_id,
|
||||
AppRequestId::Sync(SyncRequestId::SingleBlock {
|
||||
id: SingleLookupReqId { req_id, .. },
|
||||
}) if req_id == i
|
||||
));
|
||||
}
|
||||
|
||||
// Check that peer1’s active and delayed requests have been removed.
|
||||
assert!(!limiter.active_requests.contains_key(&peer1));
|
||||
assert!(!limiter
|
||||
.delayed_requests
|
||||
.contains_key(&(peer1, Protocol::Ping)));
|
||||
|
||||
assert!(limiter.active_requests.contains_key(&peer2));
|
||||
assert!(limiter
|
||||
.delayed_requests
|
||||
.contains_key(&(peer2, Protocol::Ping)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ use types::{
|
||||
|
||||
type E = MinimalEthSpec;
|
||||
|
||||
use lighthouse_network::rpc::config::InboundRateLimiterConfig;
|
||||
use tempfile::Builder as TempBuilder;
|
||||
|
||||
/// Returns a dummy fork context
|
||||
@@ -77,7 +78,11 @@ pub fn build_tracing_subscriber(level: &str, enabled: bool) {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_config(mut boot_nodes: Vec<Enr>) -> Arc<NetworkConfig> {
|
||||
pub fn build_config(
|
||||
mut boot_nodes: Vec<Enr>,
|
||||
disable_peer_scoring: bool,
|
||||
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
|
||||
) -> Arc<NetworkConfig> {
|
||||
let mut config = NetworkConfig::default();
|
||||
|
||||
// Find unused ports by using the 0 port.
|
||||
@@ -93,6 +98,8 @@ pub fn build_config(mut boot_nodes: Vec<Enr>) -> Arc<NetworkConfig> {
|
||||
config.enr_address = (Some(std::net::Ipv4Addr::LOCALHOST), None);
|
||||
config.boot_nodes_enr.append(&mut boot_nodes);
|
||||
config.network_dir = path.into_path();
|
||||
config.disable_peer_scoring = disable_peer_scoring;
|
||||
config.inbound_rate_limiter_config = inbound_rate_limiter;
|
||||
Arc::new(config)
|
||||
}
|
||||
|
||||
@@ -102,8 +109,10 @@ pub async fn build_libp2p_instance(
|
||||
fork_name: ForkName,
|
||||
chain_spec: Arc<ChainSpec>,
|
||||
service_name: String,
|
||||
disable_peer_scoring: bool,
|
||||
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
|
||||
) -> Libp2pInstance {
|
||||
let config = build_config(boot_nodes);
|
||||
let config = build_config(boot_nodes, disable_peer_scoring, inbound_rate_limiter);
|
||||
// launch libp2p service
|
||||
|
||||
let (signal, exit) = async_channel::bounded(1);
|
||||
@@ -144,6 +153,8 @@ pub async fn build_node_pair(
|
||||
fork_name: ForkName,
|
||||
spec: Arc<ChainSpec>,
|
||||
protocol: Protocol,
|
||||
disable_peer_scoring: bool,
|
||||
inbound_rate_limiter: Option<InboundRateLimiterConfig>,
|
||||
) -> (Libp2pInstance, Libp2pInstance) {
|
||||
let mut sender = build_libp2p_instance(
|
||||
rt.clone(),
|
||||
@@ -151,10 +162,20 @@ pub async fn build_node_pair(
|
||||
fork_name,
|
||||
spec.clone(),
|
||||
"sender".to_string(),
|
||||
disable_peer_scoring,
|
||||
inbound_rate_limiter.clone(),
|
||||
)
|
||||
.await;
|
||||
let mut receiver = build_libp2p_instance(
|
||||
rt,
|
||||
vec![],
|
||||
fork_name,
|
||||
spec.clone(),
|
||||
"receiver".to_string(),
|
||||
disable_peer_scoring,
|
||||
inbound_rate_limiter,
|
||||
)
|
||||
.await;
|
||||
let mut receiver =
|
||||
build_libp2p_instance(rt, vec![], fork_name, spec.clone(), "receiver".to_string()).await;
|
||||
|
||||
// let the two nodes set up listeners
|
||||
let sender_fut = async {
|
||||
@@ -235,6 +256,8 @@ pub async fn build_linear(
|
||||
fork_name,
|
||||
spec.clone(),
|
||||
"linear".to_string(),
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await,
|
||||
);
|
||||
|
||||
@@ -9,10 +9,10 @@ use lighthouse_network::{NetworkEvent, ReportSource, Response};
|
||||
use ssz::Encode;
|
||||
use ssz_types::VariableList;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::{debug, error, warn};
|
||||
use types::{
|
||||
BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockBellatrix, BlobSidecar, ChainSpec,
|
||||
EmptyBlock, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MinimalEthSpec,
|
||||
@@ -64,8 +64,15 @@ fn test_tcp_status_rpc() {
|
||||
|
||||
rt.block_on(async {
|
||||
// get sender/receiver
|
||||
let (mut sender, mut receiver) =
|
||||
common::build_node_pair(Arc::downgrade(&rt), ForkName::Base, spec, Protocol::Tcp).await;
|
||||
let (mut sender, mut receiver) = common::build_node_pair(
|
||||
Arc::downgrade(&rt),
|
||||
ForkName::Base,
|
||||
spec,
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_request = RequestType::Status(StatusMessage {
|
||||
@@ -168,6 +175,8 @@ fn test_tcp_blocks_by_range_chunked_rpc() {
|
||||
ForkName::Bellatrix,
|
||||
spec.clone(),
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -311,6 +320,8 @@ fn test_blobs_by_range_chunked_rpc() {
|
||||
ForkName::Deneb,
|
||||
spec.clone(),
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -430,6 +441,8 @@ fn test_tcp_blocks_by_range_over_limit() {
|
||||
ForkName::Bellatrix,
|
||||
spec.clone(),
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -533,6 +546,8 @@ fn test_tcp_blocks_by_range_chunked_rpc_terminates_correctly() {
|
||||
ForkName::Base,
|
||||
spec.clone(),
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -665,6 +680,8 @@ fn test_tcp_blocks_by_range_single_empty_rpc() {
|
||||
ForkName::Base,
|
||||
spec.clone(),
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -785,6 +802,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() {
|
||||
ForkName::Bellatrix,
|
||||
spec.clone(),
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -929,6 +948,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
|
||||
ForkName::Base,
|
||||
spec.clone(),
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -1065,8 +1086,15 @@ fn goodbye_test(log_level: &str, enable_logging: bool, protocol: Protocol) {
|
||||
|
||||
// get sender/receiver
|
||||
rt.block_on(async {
|
||||
let (mut sender, mut receiver) =
|
||||
common::build_node_pair(Arc::downgrade(&rt), ForkName::Base, spec, protocol).await;
|
||||
let (mut sender, mut receiver) = common::build_node_pair(
|
||||
Arc::downgrade(&rt),
|
||||
ForkName::Base,
|
||||
spec,
|
||||
protocol,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// build the sender future
|
||||
let sender_future = async {
|
||||
@@ -1127,3 +1155,239 @@ fn quic_test_goodbye_rpc() {
|
||||
let enabled_logging = false;
|
||||
goodbye_test(log_level, enabled_logging, Protocol::Quic);
|
||||
}
|
||||
|
||||
// Test that the receiver delays the responses during response rate-limiting.
|
||||
#[test]
|
||||
fn test_delayed_rpc_response() {
|
||||
let rt = Arc::new(Runtime::new().unwrap());
|
||||
let spec = Arc::new(E::default_spec());
|
||||
|
||||
// Allow 1 token to be use used every 3 seconds.
|
||||
const QUOTA_SEC: u64 = 3;
|
||||
|
||||
rt.block_on(async {
|
||||
// get sender/receiver
|
||||
let (mut sender, mut receiver) = common::build_node_pair(
|
||||
Arc::downgrade(&rt),
|
||||
ForkName::Base,
|
||||
spec,
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
// Configure a quota for STATUS responses of 1 token every 3 seconds.
|
||||
Some(format!("status:1/{QUOTA_SEC}").parse().unwrap()),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_request = RequestType::Status(StatusMessage {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::from_low_u64_be(0),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::from_low_u64_be(0),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
|
||||
// Dummy STATUS RPC message
|
||||
let rpc_response = Response::Status(StatusMessage {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::from_low_u64_be(0),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::from_low_u64_be(0),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
|
||||
// build the sender future
|
||||
let sender_future = async {
|
||||
let mut request_id = 1;
|
||||
let mut request_sent_at = Instant::now();
|
||||
loop {
|
||||
match sender.next_event().await {
|
||||
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
debug!(%request_id, "Sending RPC request");
|
||||
sender
|
||||
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
||||
.unwrap();
|
||||
request_sent_at = Instant::now();
|
||||
}
|
||||
NetworkEvent::ResponseReceived {
|
||||
peer_id,
|
||||
app_request_id: _,
|
||||
response,
|
||||
} => {
|
||||
debug!(%request_id, "Sender received");
|
||||
assert_eq!(response, rpc_response);
|
||||
|
||||
match request_id {
|
||||
1 => {
|
||||
// The first response is returned instantly.
|
||||
assert!(request_sent_at.elapsed() < Duration::from_millis(100));
|
||||
}
|
||||
2..=5 => {
|
||||
// The second and subsequent responses are delayed due to the response rate-limiter on the receiver side.
|
||||
// Adding a slight margin to the elapsed time check to account for potential timing issues caused by system
|
||||
// scheduling or execution delays during testing.
|
||||
assert!(
|
||||
request_sent_at.elapsed()
|
||||
> (Duration::from_secs(QUOTA_SEC)
|
||||
- Duration::from_millis(100))
|
||||
);
|
||||
if request_id == 5 {
|
||||
// End the test
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
request_id += 1;
|
||||
debug!(%request_id, "Sending RPC request");
|
||||
sender
|
||||
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
||||
.unwrap();
|
||||
request_sent_at = Instant::now();
|
||||
}
|
||||
NetworkEvent::RPCFailed {
|
||||
app_request_id: _,
|
||||
peer_id: _,
|
||||
error,
|
||||
} => {
|
||||
error!(?error, "RPC Failed");
|
||||
panic!("Rpc failed.");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// build the receiver future
|
||||
let receiver_future = async {
|
||||
loop {
|
||||
if let NetworkEvent::RequestReceived {
|
||||
peer_id,
|
||||
inbound_request_id,
|
||||
request_type,
|
||||
} = receiver.next_event().await
|
||||
{
|
||||
assert_eq!(request_type, rpc_request);
|
||||
debug!("Receiver received request");
|
||||
receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = sender_future => {}
|
||||
_ = receiver_future => {}
|
||||
_ = sleep(Duration::from_secs(30)) => {
|
||||
panic!("Future timed out");
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Test that a rate-limited error doesn't occur even if the sender attempts to send many requests at
|
||||
// once, thanks to the self-limiter on the sender side.
|
||||
#[test]
|
||||
fn test_active_requests() {
|
||||
let rt = Arc::new(Runtime::new().unwrap());
|
||||
let spec = Arc::new(E::default_spec());
|
||||
|
||||
rt.block_on(async {
|
||||
// Get sender/receiver.
|
||||
let (mut sender, mut receiver) = common::build_node_pair(
|
||||
Arc::downgrade(&rt),
|
||||
ForkName::Base,
|
||||
spec,
|
||||
Protocol::Tcp,
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Dummy STATUS RPC request.
|
||||
let rpc_request = RequestType::Status(StatusMessage {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::from_low_u64_be(0),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::from_low_u64_be(0),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
|
||||
// Dummy STATUS RPC response.
|
||||
let rpc_response = Response::Status(StatusMessage {
|
||||
fork_digest: [0; 4],
|
||||
finalized_root: Hash256::zero(),
|
||||
finalized_epoch: Epoch::new(1),
|
||||
head_root: Hash256::zero(),
|
||||
head_slot: Slot::new(1),
|
||||
});
|
||||
|
||||
// Number of requests.
|
||||
const REQUESTS: u8 = 10;
|
||||
|
||||
// Build the sender future.
|
||||
let sender_future = async {
|
||||
let mut response_received = 0;
|
||||
loop {
|
||||
match sender.next_event().await {
|
||||
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
|
||||
debug!("Sending RPC request");
|
||||
// Send requests in quick succession to intentionally trigger request queueing in the self-limiter.
|
||||
for _ in 0..REQUESTS {
|
||||
sender
|
||||
.send_request(peer_id, AppRequestId::Router, rpc_request.clone())
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
NetworkEvent::ResponseReceived { response, .. } => {
|
||||
debug!(?response, "Sender received response");
|
||||
if matches!(response, Response::Status(_)) {
|
||||
response_received += 1;
|
||||
}
|
||||
}
|
||||
NetworkEvent::RPCFailed {
|
||||
app_request_id: _,
|
||||
peer_id: _,
|
||||
error,
|
||||
} => panic!("RPC failed: {:?}", error),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
if response_received == REQUESTS {
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Build the receiver future.
|
||||
let receiver_future = async {
|
||||
let mut received_requests = vec![];
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = receiver.next_event() => {
|
||||
if let NetworkEvent::RequestReceived { peer_id, inbound_request_id, request_type } = event {
|
||||
debug!(?request_type, "Receiver received request");
|
||||
if matches!(request_type, RequestType::Status(_)) {
|
||||
received_requests.push((peer_id, inbound_request_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
// Introduce a delay in sending responses to trigger request queueing on the sender side.
|
||||
_ = sleep(Duration::from_secs(3)) => {
|
||||
for (peer_id, inbound_request_id) in received_requests.drain(..) {
|
||||
receiver.send_response(peer_id, inbound_request_id, rpc_response.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = sender_future => {}
|
||||
_ = receiver_future => {}
|
||||
_ = sleep(Duration::from_secs(30)) => {
|
||||
panic!("Future timed out");
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user