Add an option to disable inbound rate limiter (#4327)

## Issue Addressed

On deneb devnetv5, lighthouse keeps rate limiting peers which makes it harder to bootstrap new nodes as there are very few peers in the network. This PR adds an option to disable the inbound rate limiter for testnets.

Added an option to configure inbound rate limits as well.

Co-authored-by: Diva M <divma@protonmail.com>
This commit is contained in:
Pawan Dhananjay
2023-06-02 03:17:38 +00:00
parent 04386cfabb
commit d399961e6e
9 changed files with 189 additions and 118 deletions

View File

@@ -17,7 +17,6 @@ use slog::{crit, debug, o};
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use types::{EthSpec, ForkContext};
pub(crate) use handler::HandlerErr;
@@ -32,7 +31,7 @@ pub use methods::{
pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError};
use self::config::OutboundRateLimiterConfig;
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::self_limiter::SelfRateLimiter;
pub(crate) mod codec;
@@ -112,7 +111,7 @@ type BehaviourAction<Id, TSpec> =
/// logic.
pub struct RPC<Id: ReqId, TSpec: EthSpec> {
/// Rate limiter
limiter: RateLimiter,
limiter: Option<RateLimiter>,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, TSpec>>,
/// Queue of events to be processed.
@@ -127,32 +126,24 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
pub fn new(
fork_context: Arc<ForkContext>,
enable_light_client_server: bool,
inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
log: slog::Logger,
) -> Self {
let log = log.new(o!("service" => "libp2p_rpc"));
let limiter = RateLimiter::builder()
.n_every(Protocol::MetaData, 2, Duration::from_secs(5))
.n_every(Protocol::Ping, 2, Duration::from_secs(10))
.n_every(Protocol::Status, 5, Duration::from_secs(15))
.one_every(Protocol::Goodbye, Duration::from_secs(10))
.one_every(Protocol::LightClientBootstrap, Duration::from_secs(10))
.n_every(
Protocol::BlocksByRange,
methods::MAX_REQUEST_BLOCKS,
Duration::from_secs(10),
)
.n_every(Protocol::BlocksByRoot, 128, Duration::from_secs(10))
.build()
.expect("Configuration parameters are valid");
let inbound_limiter = inbound_rate_limiter_config.map(|config| {
debug!(log, "Using inbound rate limiting params"; "config" => ?config);
RateLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid")
});
let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
});
RPC {
limiter,
limiter: inbound_limiter,
self_limiter,
events: Vec::new(),
fork_context,
@@ -242,50 +233,60 @@ where
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
if let Ok(RPCReceived::Request(ref id, ref req)) = event {
// check if the request is conformant to the quota
match self.limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
"request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis());
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
}
}
} else {
// No rate limiting, send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
} else {
self.events
@@ -303,7 +304,9 @@ where
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// let the rate limiter prune.
let _ = self.limiter.poll_unpin(cx);
if let Some(limiter) = self.limiter.as_mut() {
let _ = limiter.poll_unpin(cx);
}
if let Some(self_limiter) = self.self_limiter.as_mut() {
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {