Merge branch 'unstable' of https://github.com/sigp/lighthouse into deneb-free-blobs

This commit is contained in:
realbigsean
2023-06-02 11:57:15 -04:00
63 changed files with 1689 additions and 1193 deletions

View File

@@ -17,7 +17,6 @@ use slog::{crit, debug, o};
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use types::{EthSpec, ForkContext};
pub(crate) use handler::HandlerErr;
@@ -33,7 +32,7 @@ pub use methods::{
pub(crate) use outbound::OutboundRequest;
pub use protocol::{max_rpc_size, Protocol, RPCError};
use self::config::OutboundRateLimiterConfig;
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::self_limiter::SelfRateLimiter;
pub(crate) mod codec;
@@ -113,7 +112,7 @@ type BehaviourAction<Id, TSpec> =
/// logic.
pub struct RPC<Id: ReqId, TSpec: EthSpec> {
/// Rate limiter
limiter: RateLimiter,
limiter: Option<RateLimiter>,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, TSpec>>,
/// Queue of events to be processed.
@@ -128,38 +127,24 @@ impl<Id: ReqId, TSpec: EthSpec> RPC<Id, TSpec> {
pub fn new(
fork_context: Arc<ForkContext>,
enable_light_client_server: bool,
inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
outbound_rate_limiter_config: Option<OutboundRateLimiterConfig>,
log: slog::Logger,
) -> Self {
let log = log.new(o!("service" => "libp2p_rpc"));
let limiter = RateLimiter::builder()
.n_every(Protocol::MetaData, 2, Duration::from_secs(5))
.n_every(Protocol::Ping, 2, Duration::from_secs(10))
.n_every(Protocol::Status, 5, Duration::from_secs(15))
.one_every(Protocol::Goodbye, Duration::from_secs(10))
.one_every(Protocol::LightClientBootstrap, Duration::from_secs(10))
.n_every(
Protocol::BlocksByRange,
methods::MAX_REQUEST_BLOCKS,
Duration::from_secs(10),
)
.n_every(Protocol::BlocksByRoot, 128, Duration::from_secs(10))
.n_every(Protocol::BlobsByRoot, 128, Duration::from_secs(10))
.n_every(
Protocol::BlobsByRange,
MAX_REQUEST_BLOB_SIDECARS,
Duration::from_secs(10),
)
.build()
.expect("Configuration parameters are valid");
let inbound_limiter = inbound_rate_limiter_config.map(|config| {
debug!(log, "Using inbound rate limiting params"; "config" => ?config);
RateLimiter::new_with_config(config.0)
.expect("Inbound limiter configuration parameters are valid")
});
let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, log.clone()).expect("Configuration parameters are valid")
});
RPC {
limiter,
limiter: inbound_limiter,
self_limiter,
events: Vec::new(),
fork_context,
@@ -249,50 +234,60 @@ where
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
) {
if let Ok(RPCReceived::Request(ref id, ref req)) = event {
// check if the request is conformant to the quota
match self.limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, req) {
Ok(()) => {
// send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
crit!(self.log, "Request size too large to ever be processed"; "protocol" => %protocol);
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
),
);
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(self.log, "Request exceeds the rate limit";
"request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time.as_millis());
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
}
}
} else {
// No rate limiting, send the event to the user
self.events
.push(NetworkBehaviourAction::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
}))
}
} else {
self.events
@@ -310,7 +305,9 @@ where
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// let the rate limiter prune.
let _ = self.limiter.poll_unpin(cx);
if let Some(limiter) = self.limiter.as_mut() {
let _ = limiter.poll_unpin(cx);
}
if let Some(self_limiter) = self.self_limiter.as_mut() {
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {