merge conflicts

This commit is contained in:
Eitan Seri-Levi
2025-05-27 14:56:02 -07:00
358 changed files with 11552 additions and 6768 deletions

View File

@@ -14,7 +14,7 @@ use std::num::NonZeroU16;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use types::{ForkContext, ForkName};
use types::ForkContext;
pub const DEFAULT_IPV4_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED;
pub const DEFAULT_TCP_PORT: u16 = 9000u16;
@@ -22,18 +22,9 @@ pub const DEFAULT_DISC_PORT: u16 = 9000u16;
pub const DEFAULT_QUIC_PORT: u16 = 9001u16;
pub const DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD: usize = 1000usize;
/// The maximum size of gossip messages.
pub fn gossip_max_size(is_merge_enabled: bool, gossip_max_size: usize) -> usize {
if is_merge_enabled {
gossip_max_size
} else {
gossip_max_size / 10
}
}
pub struct GossipsubConfigParams {
pub message_domain_valid_snappy: [u8; 4],
pub gossip_max_size: usize,
pub gossipsub_max_transmit_size: usize,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -480,7 +471,6 @@ pub fn gossipsub_config(
}
}
let message_domain_valid_snappy = gossipsub_config_params.message_domain_valid_snappy;
let is_bellatrix_enabled = fork_context.fork_exists(ForkName::Bellatrix);
let gossip_message_id = move |message: &gossipsub::Message| {
gossipsub::MessageId::from(
&Sha256::digest(
@@ -499,10 +489,7 @@ pub fn gossipsub_config(
let duplicate_cache_time = Duration::from_secs(slots_per_epoch * seconds_per_slot * 2);
gossipsub::ConfigBuilder::default()
.max_transmit_size(gossip_max_size(
is_bellatrix_enabled,
gossipsub_config_params.gossip_max_size,
))
.max_transmit_size(gossipsub_config_params.gossipsub_max_transmit_size)
.heartbeat_interval(load.heartbeat_interval)
.mesh_n(load.mesh_n)
.mesh_n_low(load.mesh_n_low)

View File

@@ -12,7 +12,6 @@ pub mod peer_manager;
pub mod rpc;
pub mod types;
pub use config::gossip_max_size;
use libp2p::swarm::DialError;
pub use listen_addr::*;

View File

@@ -206,6 +206,20 @@ pub static REPORT_PEER_MSGS: LazyLock<Result<IntCounterVec>> = LazyLock::new(||
)
});
pub static OUTBOUND_REQUEST_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram(
"outbound_request_idling_seconds",
"The time our own request remained idle in the self-limiter",
)
});
pub static RESPONSE_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram(
"response_idling_seconds",
"The time our response remained idle in the response limiter",
)
});
pub fn scrape_discovery_metrics() {
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());

View File

@@ -712,8 +712,9 @@ impl<E: EthSpec> PeerManager<E> {
}
/// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) {
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) -> bool {
let mut invalid_meta_data = false;
let mut updated_cgc = false;
if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data() {
@@ -729,12 +730,16 @@ impl<E: EthSpec> PeerManager<E> {
debug!(%peer_id, new_seq_no = meta_data.seq_number(), "Obtained peer's metadata");
}
let known_custody_group_count = peer_info
.meta_data()
.and_then(|meta_data| meta_data.custody_group_count().copied().ok());
let custody_group_count_opt = meta_data.custody_group_count().copied().ok();
peer_info.set_meta_data(meta_data);
if self.network_globals.spec.is_peer_das_scheduled() {
// Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
// prioritize PeerDAS peers.
// Gracefully ignore metadata/v2 peers.
// We only send metadata v3 requests when PeerDAS is scheduled
if let Some(custody_group_count) = custody_group_count_opt {
match self.compute_peer_custody_groups(peer_id, custody_group_count) {
Ok(custody_groups) => {
@@ -755,6 +760,8 @@ impl<E: EthSpec> PeerManager<E> {
})
.collect();
peer_info.set_custody_subnets(custody_subnets);
updated_cgc = Some(custody_group_count) != known_custody_group_count;
}
Err(err) => {
debug!(
@@ -777,6 +784,8 @@ impl<E: EthSpec> PeerManager<E> {
if invalid_meta_data {
self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager)
}
updated_cgc
}
/// Updates the gossipsub scores for all known peers in gossipsub.
@@ -1487,6 +1496,15 @@ impl<E: EthSpec> PeerManager<E> {
pub fn remove_trusted_peer(&mut self, enr: Enr) {
self.trusted_peers.remove(&enr);
}
#[cfg(test)]
fn custody_subnet_count_for_peer(&self, peer_id: &PeerId) -> Option<usize> {
self.network_globals
.peers
.read()
.peer_info(peer_id)
.map(|peer_info| peer_info.custody_subnets_iter().count())
}
}
enum ConnectingType {
@@ -1507,8 +1525,9 @@ enum ConnectingType {
#[cfg(test)]
mod tests {
use super::*;
use crate::rpc::MetaDataV3;
use crate::NetworkConfig;
use types::MainnetEthSpec as E;
use types::{ChainSpec, ForkName, MainnetEthSpec as E};
async fn build_peer_manager(target_peer_count: usize) -> PeerManager<E> {
build_peer_manager_with_trusted_peers(vec![], target_peer_count).await
@@ -1517,6 +1536,15 @@ mod tests {
async fn build_peer_manager_with_trusted_peers(
trusted_peers: Vec<PeerId>,
target_peer_count: usize,
) -> PeerManager<E> {
let spec = Arc::new(E::default_spec());
build_peer_manager_with_opts(trusted_peers, target_peer_count, spec).await
}
async fn build_peer_manager_with_opts(
trusted_peers: Vec<PeerId>,
target_peer_count: usize,
spec: Arc<ChainSpec>,
) -> PeerManager<E> {
let config = config::Config {
target_peer_count,
@@ -1527,7 +1555,6 @@ mod tests {
target_peers: target_peer_count,
..Default::default()
});
let spec = Arc::new(E::default_spec());
let globals = NetworkGlobals::new_test_globals(trusted_peers, network_config, spec);
PeerManager::new(config, Arc::new(globals)).unwrap()
}
@@ -1878,6 +1905,44 @@ mod tests {
assert!(peers_should_have_removed.is_empty());
}
#[tokio::test]
/// Test a metadata response should update custody subnets
async fn test_peer_manager_update_custody_subnets() {
// PeerDAS is enabled from Fulu.
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
let mut peer_manager = build_peer_manager_with_opts(vec![], 1, spec).await;
let pubkey = Keypair::generate_secp256k1().public();
let peer_id = PeerId::from_public_key(&pubkey);
peer_manager.inject_connect_ingoing(
&peer_id,
Multiaddr::empty().with_p2p(peer_id).unwrap(),
None,
);
// A newly connected peer should have no custody subnets before metadata is received.
let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
assert_eq!(custody_subnet_count, Some(0));
// Metadata should update the custody subnets.
let peer_cgc = 4;
let meta_data = MetaData::V3(MetaDataV3 {
seq_number: 0,
attnets: Default::default(),
syncnets: Default::default(),
custody_group_count: peer_cgc,
});
let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data.clone());
assert!(cgc_updated);
let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
assert_eq!(custody_subnet_count, Some(peer_cgc as usize));
// Make another update and assert that CGC is not updated.
let cgc_updated = peer_manager.meta_data_response(&peer_id, meta_data);
assert!(!cgc_updated);
let custody_subnet_count = peer_manager.custody_subnet_count_for_peer(&peer_id);
assert_eq!(custody_subnet_count, Some(peer_cgc as usize));
}
#[tokio::test]
/// Test the pruning logic to remove grouped subnet peers
async fn test_peer_manager_prune_grouped_subnet_peers() {

View File

@@ -1,6 +1,8 @@
use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY;
use crate::discovery::{peer_id_to_node_id, CombinedKey};
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId};
use crate::{
metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId, SyncInfo,
};
use itertools::Itertools;
use logging::crit;
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
@@ -15,7 +17,7 @@ use std::{
use sync_status::SyncStatus;
use tracing::{debug, error, trace, warn};
use types::data_column_custody_group::compute_subnets_for_node;
use types::{ChainSpec, DataColumnSubnetId, EthSpec};
use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec, Hash256, Slot};
pub mod client;
pub mod peer_info;
@@ -735,6 +737,19 @@ impl<E: EthSpec> PeerDB<E> {
},
);
self.update_sync_status(
&peer_id,
SyncStatus::Synced {
// Fill in mock SyncInfo, only for the peer to return `is_synced() == true`.
info: SyncInfo {
head_slot: Slot::new(0),
head_root: Hash256::ZERO,
finalized_epoch: Epoch::new(0),
finalized_root: Hash256::ZERO,
},
},
);
if supernode {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let all_subnets = (0..spec.data_column_sidecar_subnet_count)

View File

@@ -16,12 +16,12 @@ use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::codec::{Decoder, Encoder};
use types::{
BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, ForkName, Hash256,
LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate,
LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair,
SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella,
SignedBeaconBlockDeneb, SignedBeaconBlockEip7805, SignedBeaconBlockElectra,
SignedBeaconBlockFulu,
BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnsByRootIdentifier, EthSpec, ForkContext,
ForkName, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock,
SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix,
SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockEip7805,
SignedBeaconBlockElectra, SignedBeaconBlockFulu,
};
use unsigned_varint::codec::Uvi;
@@ -600,10 +600,12 @@ fn handle_rpc_request<E: EthSpec>(
))),
SupportedProtocol::DataColumnsByRootV1 => Ok(Some(RequestType::DataColumnsByRoot(
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::from_ssz_bytes(
decoded_buffer,
spec.max_request_data_column_sidecars as usize,
)?,
data_column_ids:
<RuntimeVariableList<DataColumnsByRootIdentifier>>::from_ssz_bytes_with_nested(
decoded_buffer,
spec.max_request_blocks(current_fork),
spec.number_of_columns as usize,
)?,
},
))),
SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping {
@@ -949,8 +951,8 @@ mod tests {
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use types::{
blob_sidecar::BlobIdentifier, data_column_sidecar::Cell, BeaconBlock, BeaconBlockAltair,
BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnIdentifier, EmptyBlock,
Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnsByRootIdentifier,
EmptyBlock, Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
SignedBeaconBlockHeader, Slot,
};
@@ -1019,10 +1021,7 @@ mod tests {
}
/// Bellatrix block with length < max_rpc_size.
fn bellatrix_block_small(
fork_context: &ForkContext,
spec: &ChainSpec,
) -> SignedBeaconBlock<Spec> {
fn bellatrix_block_small(spec: &ChainSpec) -> SignedBeaconBlock<Spec> {
let mut block: BeaconBlockBellatrix<_, FullPayload<Spec>> =
BeaconBlockBellatrix::empty(&Spec::default_spec());
@@ -1032,17 +1031,14 @@ mod tests {
block.body.execution_payload.execution_payload.transactions = txs;
let block = BeaconBlock::Bellatrix(block);
assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context, spec.max_chunk_size as usize));
assert!(block.ssz_bytes_len() <= spec.max_payload_size as usize);
SignedBeaconBlock::from_block(block, Signature::empty())
}
/// Bellatrix block with length > MAX_RPC_SIZE.
/// The max limit for a Bellatrix block is in the order of ~16GiB which wouldn't fit in memory.
/// Hence, we generate a Bellatrix block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer.
fn bellatrix_block_large(
fork_context: &ForkContext,
spec: &ChainSpec,
) -> SignedBeaconBlock<Spec> {
fn bellatrix_block_large(spec: &ChainSpec) -> SignedBeaconBlock<Spec> {
let mut block: BeaconBlockBellatrix<_, FullPayload<Spec>> =
BeaconBlockBellatrix::empty(&Spec::default_spec());
@@ -1052,7 +1048,7 @@ mod tests {
block.body.execution_payload.execution_payload.transactions = txs;
let block = BeaconBlock::Bellatrix(block);
assert!(block.ssz_bytes_len() > max_rpc_size(fork_context, spec.max_chunk_size as usize));
assert!(block.ssz_bytes_len() > spec.max_payload_size as usize);
SignedBeaconBlock::from_block(block, Signature::empty())
}
@@ -1089,14 +1085,15 @@ mod tests {
}
}
fn dcbroot_request(spec: &ChainSpec) -> DataColumnsByRootRequest {
fn dcbroot_request(spec: &ChainSpec, fork_name: ForkName) -> DataColumnsByRootRequest {
let number_of_columns = spec.number_of_columns as usize;
DataColumnsByRootRequest {
data_column_ids: RuntimeVariableList::new(
vec![DataColumnIdentifier {
vec![DataColumnsByRootIdentifier {
block_root: Hash256::zero(),
index: 0,
columns: RuntimeVariableList::from_vec(vec![0, 1, 2], number_of_columns),
}],
spec.max_request_data_column_sidecars as usize,
spec.max_request_blocks(fork_name),
)
.unwrap(),
}
@@ -1160,7 +1157,7 @@ mod tests {
) -> Result<BytesMut, RPCError> {
let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
let max_packet_size = spec.max_payload_size as usize;
let mut buf = BytesMut::new();
let mut snappy_inbound_codec =
@@ -1207,7 +1204,7 @@ mod tests {
) -> Result<Option<RpcSuccessResponse<Spec>>, RPCError> {
let snappy_protocol_id = ProtocolId::new(protocol, Encoding::SSZSnappy);
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
let max_packet_size = spec.max_payload_size as usize;
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, max_packet_size, fork_context);
// decode message just as snappy message
@@ -1228,7 +1225,7 @@ mod tests {
/// Verifies that requests we send are encoded in a way that we would correctly decode too.
fn encode_then_decode_request(req: RequestType<Spec>, fork_name: ForkName, spec: &ChainSpec) {
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
let max_packet_size = spec.max_payload_size as usize;
let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy);
// Encode a request we send
let mut buf = BytesMut::new();
@@ -1643,10 +1640,8 @@ mod tests {
))))
);
let bellatrix_block_small =
bellatrix_block_small(&fork_context(ForkName::Bellatrix), &chain_spec);
let bellatrix_block_large =
bellatrix_block_large(&fork_context(ForkName::Bellatrix), &chain_spec);
let bellatrix_block_small = bellatrix_block_small(&chain_spec);
let bellatrix_block_large = bellatrix_block_large(&chain_spec);
assert_eq!(
encode_then_decode_response(
@@ -1967,7 +1962,6 @@ mod tests {
RequestType::MetaData(MetadataRequest::new_v1()),
RequestType::BlobsByRange(blbrange_request()),
RequestType::DataColumnsByRange(dcbrange_request()),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec)),
RequestType::MetaData(MetadataRequest::new_v2()),
];
for req in requests.iter() {
@@ -1983,6 +1977,7 @@ mod tests {
RequestType::BlobsByRoot(blbroot_request(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v1(fork_name)),
RequestType::BlocksByRoot(bbroot_request_v2(fork_name)),
RequestType::DataColumnsByRoot(dcbroot_request(&chain_spec, fork_name)),
]
};
for fork_name in ForkName::list_all() {
@@ -2146,7 +2141,7 @@ mod tests {
// Insert length-prefix
uvi_codec
.encode(chain_spec.max_chunk_size as usize + 1, &mut dst)
.encode(chain_spec.max_payload_size as usize + 1, &mut dst)
.unwrap();
// Insert snappy stream identifier
@@ -2184,7 +2179,7 @@ mod tests {
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
chain_spec.max_payload_size as usize,
fork_context,
);
@@ -2220,7 +2215,7 @@ mod tests {
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
snappy_protocol_id,
max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize),
chain_spec.max_payload_size as usize,
fork_context,
);
@@ -2249,7 +2244,7 @@ mod tests {
let chain_spec = Spec::default_spec();
let max_rpc_size = max_rpc_size(&fork_context, chain_spec.max_chunk_size as usize);
let max_rpc_size = chain_spec.max_payload_size as usize;
let limit = protocol_id.rpc_response_limits::<Spec>(&fork_context);
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(

View File

@@ -141,7 +141,7 @@ where
/// Waker, to be sure the handler gets polled when needed.
waker: Option<std::task::Waker>,
/// Timeout that will me used for inbound and outbound responses.
/// Timeout that will be used for inbound and outbound responses.
resp_timeout: Duration,
}
@@ -314,6 +314,7 @@ where
}
return;
};
// If the response we are sending is an error, report back for handling
if let RpcResponse::Error(ref code, ref reason) = response {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
@@ -331,6 +332,7 @@ where
"Response not sent. Deactivated handler");
return;
}
inbound_info.pending_items.push_back(response);
}
}

View File

@@ -6,7 +6,6 @@ use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{typenum::U256, VariableList};
use std::collections::BTreeMap;
use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Deref;
@@ -16,9 +15,10 @@ use superstruct::superstruct;
use types::blob_sidecar::BlobIdentifier;
use types::light_client_update::MAX_REQUEST_LIGHT_CLIENT_UPDATES;
use types::{
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate,
LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList, SignedBeaconBlock, Slot,
blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar,
DataColumnsByRootIdentifier, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, RuntimeVariableList,
SignedBeaconBlock, Slot,
};
use types::{ForkContext, ForkName};
@@ -479,31 +479,20 @@ impl BlobsByRootRequest {
#[derive(Clone, Debug, PartialEq)]
pub struct DataColumnsByRootRequest {
/// The list of beacon block roots and column indices being requested.
pub data_column_ids: RuntimeVariableList<DataColumnIdentifier>,
pub data_column_ids: RuntimeVariableList<DataColumnsByRootIdentifier>,
}
impl DataColumnsByRootRequest {
pub fn new(data_column_ids: Vec<DataColumnIdentifier>, spec: &ChainSpec) -> Self {
let data_column_ids = RuntimeVariableList::from_vec(
data_column_ids,
spec.max_request_data_column_sidecars as usize,
);
pub fn new(
data_column_ids: Vec<DataColumnsByRootIdentifier>,
max_request_blocks: usize,
) -> Self {
let data_column_ids = RuntimeVariableList::from_vec(data_column_ids, max_request_blocks);
Self { data_column_ids }
}
pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self {
Self::new(vec![DataColumnIdentifier { block_root, index }], spec)
}
pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec<ColumnIndex>)> {
let mut column_indexes_by_block = BTreeMap::<Hash256, Vec<ColumnIndex>>::new();
for request_id in self.data_column_ids.as_slice() {
column_indexes_by_block
.entry(request_id.block_root)
.or_default()
.push(request_id.index);
}
column_indexes_by_block.into_iter().collect()
pub fn max_requested(&self) -> usize {
self.data_column_ids.iter().map(|id| id.columns.len()).sum()
}
}
@@ -606,6 +595,20 @@ pub enum ResponseTermination {
LightClientUpdatesByRange,
}
impl ResponseTermination {
pub fn as_protocol(&self) -> Protocol {
match self {
ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot,
ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange,
ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange,
}
}
}
/// The structured response containing a result/code indicating success or failure
/// and the contents of the response
#[derive(Debug, Clone)]

View File

@@ -4,7 +4,6 @@
//! direct peer-to-peer communication primarily for sending/receiving chain information for
//! syncing.
use futures::future::FutureExt;
use handler::RPCHandler;
use libp2p::core::transport::PortUse;
use libp2p::swarm::{
@@ -13,13 +12,12 @@ use libp2p::swarm::{
};
use libp2p::swarm::{ConnectionClosed, FromSwarm, SubstreamProtocol, THandlerInEvent};
use libp2p::PeerId;
use logging::crit;
use rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr};
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::{debug, instrument, trace};
use tracing::{debug, error, instrument, trace};
use types::{EthSpec, ForkContext};
pub(crate) use handler::{HandlerErr, HandlerEvent};
@@ -28,16 +26,17 @@ pub(crate) use methods::{
};
pub use protocol::RequestType;
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
use crate::rpc::rate_limiter::RateLimiterItem;
use crate::rpc::response_limiter::ResponseLimiter;
pub use handler::SubstreamId;
pub use methods::{
BlocksByRangeRequest, BlocksByRootRequest, GoodbyeReason, LightClientBootstrapRequest,
ResponseTermination, RpcErrorResponse, StatusMessage,
};
pub use protocol::{max_rpc_size, Protocol, RPCError};
use self::config::{InboundRateLimiterConfig, OutboundRateLimiterConfig};
use self::protocol::RPCProtocol;
use self::self_limiter::SelfRateLimiter;
pub use protocol::{Protocol, RPCError};
pub(crate) mod codec;
pub mod config;
@@ -46,8 +45,12 @@ pub mod methods;
mod outbound;
mod protocol;
mod rate_limiter;
mod response_limiter;
mod self_limiter;
// Maximum number of concurrent requests per protocol ID that a client may issue.
const MAX_CONCURRENT_REQUESTS: usize = 2;
/// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}
impl<T> ReqId for T where T: Send + 'static + std::fmt::Debug + Copy + Clone {}
@@ -136,7 +139,7 @@ pub struct RPCMessage<Id, E: EthSpec> {
type BehaviourAction<Id, E> = ToSwarm<RPCMessage<Id, E>, RPCSend<Id, E>>;
pub struct NetworkParams {
pub max_chunk_size: usize,
pub max_payload_size: usize,
pub ttfb_timeout: Duration,
pub resp_timeout: Duration,
}
@@ -144,10 +147,12 @@ pub struct NetworkParams {
/// Implements the libp2p `NetworkBehaviour` trait and therefore manages network-level
/// logic.
pub struct RPC<Id: ReqId, E: EthSpec> {
/// Rate limiter
limiter: Option<RateLimiter>,
/// Rate limiter for our responses.
response_limiter: Option<ResponseLimiter<E>>,
/// Rate limiter for our own requests.
self_limiter: Option<SelfRateLimiter<Id, E>>,
outbound_request_limiter: SelfRateLimiter<Id, E>,
/// Active inbound requests that are awaiting a response.
active_inbound_requests: HashMap<InboundRequestId, (PeerId, RequestType<E>)>,
/// Queue of events to be processed.
events: Vec<BehaviourAction<Id, E>>,
fork_context: Arc<ForkContext>,
@@ -173,20 +178,20 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
network_params: NetworkParams,
seq_number: u64,
) -> Self {
let inbound_limiter = inbound_rate_limiter_config.map(|config| {
debug!(?config, "Using inbound rate limiting params");
RateLimiter::new_with_config(config.0, fork_context.clone())
let response_limiter = inbound_rate_limiter_config.map(|config| {
debug!(?config, "Using response rate limiting params");
ResponseLimiter::new(config, fork_context.clone())
.expect("Inbound limiter configuration parameters are valid")
});
let self_limiter = outbound_rate_limiter_config.map(|config| {
SelfRateLimiter::new(config, fork_context.clone())
.expect("Configuration parameters are valid")
});
let outbound_request_limiter: SelfRateLimiter<Id, E> =
SelfRateLimiter::new(outbound_rate_limiter_config, fork_context.clone())
.expect("Outbound limiter configuration parameters are valid");
RPC {
limiter: inbound_limiter,
self_limiter,
response_limiter,
outbound_request_limiter,
active_inbound_requests: HashMap::new(),
events: Vec::new(),
fork_context,
enable_light_client_server,
@@ -210,6 +215,44 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
request_id: InboundRequestId,
response: RpcResponse<E>,
) {
let Some((_peer_id, request_type)) = self.active_inbound_requests.remove(&request_id)
else {
error!(%peer_id, ?request_id, %response, "Request not found in active_inbound_requests. Response not sent");
return;
};
// Add the request back to active requests if the response is `Success` and requires stream
// termination.
if request_type.protocol().terminator().is_some()
&& matches!(response, RpcResponse::Success(_))
{
self.active_inbound_requests
.insert(request_id, (peer_id, request_type.clone()));
}
self.send_response_inner(peer_id, request_type.protocol(), request_id, response);
}
fn send_response_inner(
&mut self,
peer_id: PeerId,
protocol: Protocol,
request_id: InboundRequestId,
response: RpcResponse<E>,
) {
if let Some(response_limiter) = self.response_limiter.as_mut() {
if !response_limiter.allows(
peer_id,
protocol,
request_id.connection_id,
request_id.substream_id,
response.clone(),
) {
// Response is logged and queued internally in the response limiter.
return;
}
}
self.events.push(ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(request_id.connection_id),
@@ -227,23 +270,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
skip_all
)]
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
match self_limiter.allows(peer_id, request_id, req) {
Ok(event) => event,
Err(_e) => {
// Request is logged and queued internally in the self rate limiter.
return;
}
match self
.outbound_request_limiter
.allows(peer_id, request_id, req)
{
Ok(event) => self.events.push(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event,
}),
Err(_e) => {
// Request is logged and queued internally in the self rate limiter.
}
} else {
RPCSend::Request(request_id, req)
};
self.events.push(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
event,
});
}
}
/// Lighthouse wishes to disconnect from this peer by sending a Goodbye message. This
@@ -306,7 +345,7 @@ where
let protocol = SubstreamProtocol::new(
RPCProtocol {
fork_context: self.fork_context.clone(),
max_rpc_size: max_rpc_size(&self.fork_context, self.network_params.max_chunk_size),
max_rpc_size: self.fork_context.spec.max_payload_size as usize,
enable_light_client_server: self.enable_light_client_server,
phantom: PhantomData,
ttfb_timeout: self.network_params.ttfb_timeout,
@@ -336,7 +375,7 @@ where
let protocol = SubstreamProtocol::new(
RPCProtocol {
fork_context: self.fork_context.clone(),
max_rpc_size: max_rpc_size(&self.fork_context, self.network_params.max_chunk_size),
max_rpc_size: self.fork_context.spec.max_payload_size as usize,
enable_light_client_server: self.enable_light_client_server,
phantom: PhantomData,
ttfb_timeout: self.network_params.ttfb_timeout,
@@ -373,20 +412,27 @@ where
if remaining_established > 0 {
return;
}
// Get a list of pending requests from the self rate limiter
if let Some(limiter) = self.self_limiter.as_mut() {
for (id, proto) in limiter.peer_disconnected(peer_id) {
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::Disconnected,
}),
});
self.events.push(error_msg);
}
for (id, proto) in self.outbound_request_limiter.peer_disconnected(peer_id) {
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::Disconnected,
}),
});
self.events.push(error_msg);
}
self.active_inbound_requests.retain(
|_inbound_request_id, (request_peer_id, _request_type)| *request_peer_id != peer_id,
);
if let Some(limiter) = self.response_limiter.as_mut() {
limiter.peer_disconnected(peer_id);
}
// Replace the pending Requests to the disconnected peer
@@ -420,57 +466,39 @@ where
) {
match event {
HandlerEvent::Ok(RPCReceived::Request(request_id, request_type)) => {
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, &request_type) {
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = request_type.versioned_protocol().protocol();
if matches!(
protocol,
Protocol::BlocksByRange
| Protocol::BlobsByRange
| Protocol::DataColumnsByRange
| Protocol::BlocksByRoot
| Protocol::BlobsByRoot
| Protocol::DataColumnsByRoot
) {
debug!(request = %request_type, %protocol, "Request too large to process");
} else {
// Other protocols shouldn't be sending large messages, we should flag the peer kind
crit!(%protocol, "Request size too large to ever be processed");
}
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
request_id,
RpcResponse::Error(
RpcErrorResponse::RateLimited,
"Rate limited. Request too large".into(),
),
);
return;
}
Err(RateLimitedErr::TooSoon(wait_time)) => {
debug!(request = %request_type, %peer_id, wait_time_ms = wait_time.as_millis(), "Request exceeds the rate limit");
// send an error code to the peer.
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
request_id,
RpcResponse::Error(
RpcErrorResponse::RateLimited,
format!("Wait {:?}", wait_time).into(),
),
);
return;
}
// No rate limiting, continue.
Ok(()) => {}
}
let is_concurrent_request_limit_exceeded = self
.active_inbound_requests
.iter()
.filter(
|(_inbound_request_id, (request_peer_id, active_request_type))| {
*request_peer_id == peer_id
&& active_request_type.protocol() == request_type.protocol()
},
)
.count()
>= MAX_CONCURRENT_REQUESTS;
// Restricts more than MAX_CONCURRENT_REQUESTS inbound requests from running simultaneously on the same protocol per peer.
if is_concurrent_request_limit_exceeded {
// There is already an active request with the same protocol. Send an error code to the peer.
debug!(request = %request_type, protocol = %request_type.protocol(), %peer_id, "There is an active request with the same protocol");
self.send_response_inner(
peer_id,
request_type.protocol(),
request_id,
RpcResponse::Error(
RpcErrorResponse::RateLimited,
format!("Rate limited. There are already {MAX_CONCURRENT_REQUESTS} active requests with the same protocol")
.into(),
),
);
return;
}
// Requests that are below the limit on the number of simultaneous requests are added to the active inbound requests.
self.active_inbound_requests
.insert(request_id, (peer_id, request_type.clone()));
// If we received a Ping, we queue a Pong response.
if let RequestType::Ping(_) = request_type {
trace!(connection_id = %connection_id, %peer_id, "Received Ping, queueing Pong");
@@ -489,14 +517,38 @@ where
message: Ok(RPCReceived::Request(request_id, request_type)),
}));
}
HandlerEvent::Ok(rpc) => {
HandlerEvent::Ok(RPCReceived::Response(id, response)) => {
if response.protocol().terminator().is_none() {
// Inform the limiter that a response has been received.
self.outbound_request_limiter
.request_completed(&peer_id, response.protocol());
}
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Ok(rpc),
message: Ok(RPCReceived::Response(id, response)),
}));
}
HandlerEvent::Ok(RPCReceived::EndOfStream(id, response_termination)) => {
// Inform the limiter that a response has been received.
self.outbound_request_limiter
.request_completed(&peer_id, response_termination.as_protocol());
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
message: Ok(RPCReceived::EndOfStream(id, response_termination)),
}));
}
HandlerEvent::Err(err) => {
// Inform the limiter that the request has ended with an error.
let protocol = match err {
HandlerErr::Inbound { proto, .. } | HandlerErr::Outbound { proto, .. } => proto,
};
self.outbound_request_limiter
.request_completed(&peer_id, protocol);
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection_id,
@@ -514,15 +566,20 @@ where
}
fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
// let the rate limiter prune.
if let Some(limiter) = self.limiter.as_mut() {
let _ = limiter.poll_unpin(cx);
if let Some(response_limiter) = self.response_limiter.as_mut() {
if let Poll::Ready(responses) = response_limiter.poll_ready(cx) {
for response in responses {
self.events.push(ToSwarm::NotifyHandler {
peer_id: response.peer_id,
handler: NotifyHandler::One(response.connection_id),
event: RPCSend::Response(response.substream_id, response.response),
});
}
}
}
if let Some(self_limiter) = self.self_limiter.as_mut() {
if let Poll::Ready(event) = self_limiter.poll_ready(cx) {
self.events.push(event)
}
if let Poll::Ready(event) = self.outbound_request_limiter.poll_ready(cx) {
self.events.push(event)
}
if !self.events.is_empty() {

View File

@@ -57,7 +57,7 @@ pub static SIGNED_BEACON_BLOCK_ALTAIR_MAX: LazyLock<usize> = LazyLock::new(|| {
/// The `BeaconBlockBellatrix` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing.
/// We calculate the value from its fields instead of constructing the block and checking the length.
/// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network
/// with `max_chunk_size`.
/// with `max_payload_size`.
pub static SIGNED_BEACON_BLOCK_BELLATRIX_MAX: LazyLock<usize> =
LazyLock::new(|| // Size of a full altair block
*SIGNED_BEACON_BLOCK_ALTAIR_MAX
@@ -122,15 +122,6 @@ const PROTOCOL_PREFIX: &str = "/eth2/beacon_chain/req";
/// established before the stream is terminated.
const REQUEST_TIMEOUT: u64 = 15;
/// Returns the maximum bytes that can be sent across the RPC.
pub fn max_rpc_size(fork_context: &ForkContext, max_chunk_size: usize) -> usize {
if fork_context.current_fork().bellatrix_enabled() {
max_chunk_size
} else {
max_chunk_size / 10
}
}
/// Returns the rpc limits for beacon_block_by_range and beacon_block_by_root responses.
///
/// Note: This function should take care to return the min/max limits accounting for all
@@ -749,7 +740,7 @@ impl<E: EthSpec> RequestType<E> {
RequestType::BlocksByRoot(req) => req.block_roots().len() as u64,
RequestType::BlobsByRange(req) => req.max_blobs_requested(current_fork, spec),
RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64,
RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64,
RequestType::DataColumnsByRoot(req) => req.max_requested() as u64,
RequestType::DataColumnsByRange(req) => req.max_requested::<E>(),
RequestType::Ping(_) => 1,
RequestType::MetaData(_) => 1,

View File

@@ -149,7 +149,7 @@ pub struct RPCRateLimiterBuilder {
lcbootstrap_quota: Option<Quota>,
/// Quota for the LightClientOptimisticUpdate protocol.
lc_optimistic_update_quota: Option<Quota>,
/// Quota for the LightClientOptimisticUpdate protocol.
/// Quota for the LightClientFinalityUpdate protocol.
lc_finality_update_quota: Option<Quota>,
/// Quota for the LightClientUpdatesByRange protocol.
lc_updates_by_range_quota: Option<Quota>,
@@ -275,6 +275,17 @@ impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
}
}
impl<E: EthSpec> RateLimiterItem for (super::RpcResponse<E>, Protocol) {
fn protocol(&self) -> Protocol {
self.1
}
fn max_responses(&self, _current_fork: ForkName, _spec: &ChainSpec) -> u64 {
// A response chunk consumes one token of the rate limiter.
1
}
}
impl RPCRateLimiter {
pub fn new_with_config(
config: RateLimiterConfig,

View File

@@ -0,0 +1,177 @@
use crate::rpc::config::InboundRateLimiterConfig;
use crate::rpc::rate_limiter::{RPCRateLimiter, RateLimitedErr};
use crate::rpc::self_limiter::timestamp_now;
use crate::rpc::{Protocol, RpcResponse, SubstreamId};
use crate::PeerId;
use futures::FutureExt;
use libp2p::swarm::ConnectionId;
use logging::crit;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio_util::time::DelayQueue;
use tracing::debug;
use types::{EthSpec, ForkContext};
/// A response that was rate limited or waiting on rate limited responses for the same peer and
/// protocol.
#[derive(Clone)]
pub(super) struct QueuedResponse<E: EthSpec> {
pub peer_id: PeerId,
pub connection_id: ConnectionId,
pub substream_id: SubstreamId,
pub response: RpcResponse<E>,
pub protocol: Protocol,
pub queued_at: Duration,
}
pub(super) struct ResponseLimiter<E: EthSpec> {
/// Rate limiter for our responses.
limiter: RPCRateLimiter,
/// Responses queued for sending. These responses are stored when the response limiter rejects them.
delayed_responses: HashMap<(PeerId, Protocol), VecDeque<QueuedResponse<E>>>,
/// The delay required to allow a peer's outbound response per protocol.
next_response: DelayQueue<(PeerId, Protocol)>,
}
impl<E: EthSpec> ResponseLimiter<E> {
/// Creates a new [`ResponseLimiter`] based on configuration values.
pub fn new(
config: InboundRateLimiterConfig,
fork_context: Arc<ForkContext>,
) -> Result<Self, &'static str> {
Ok(ResponseLimiter {
limiter: RPCRateLimiter::new_with_config(config.0, fork_context)?,
delayed_responses: HashMap::new(),
next_response: DelayQueue::new(),
})
}
/// Checks if the rate limiter allows the response. When not allowed, the response is delayed
/// until it can be sent.
pub fn allows(
&mut self,
peer_id: PeerId,
protocol: Protocol,
connection_id: ConnectionId,
substream_id: SubstreamId,
response: RpcResponse<E>,
) -> bool {
// First check that there are not already other responses waiting to be sent.
if let Some(queue) = self.delayed_responses.get_mut(&(peer_id, protocol)) {
debug!(%peer_id, %protocol, "Response rate limiting since there are already other responses waiting to be sent");
queue.push_back(QueuedResponse {
peer_id,
connection_id,
substream_id,
response,
protocol,
queued_at: timestamp_now(),
});
return false;
}
if let Err(wait_time) =
Self::try_limiter(&mut self.limiter, peer_id, response.clone(), protocol)
{
self.delayed_responses
.entry((peer_id, protocol))
.or_default()
.push_back(QueuedResponse {
peer_id,
connection_id,
substream_id,
response,
protocol,
queued_at: timestamp_now(),
});
self.next_response.insert((peer_id, protocol), wait_time);
return false;
}
true
}
/// Checks if the limiter allows the response. If the response should be delayed, the duration
/// to wait is returned.
fn try_limiter(
limiter: &mut RPCRateLimiter,
peer_id: PeerId,
response: RpcResponse<E>,
protocol: Protocol,
) -> Result<(), Duration> {
match limiter.allows(&peer_id, &(response.clone(), protocol)) {
Ok(()) => Ok(()),
Err(e) => match e {
RateLimitedErr::TooLarge => {
// This should never happen with default parameters. Let's just send the response.
// Log a crit since this is a config issue.
crit!(
%protocol,
"Response rate limiting error for a batch that will never fit. Sending response anyway. Check configuration parameters."
);
Ok(())
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(%peer_id, %protocol, wait_time_ms = wait_time.as_millis(), "Response rate limiting");
Err(wait_time)
}
},
}
}
/// Informs the limiter that a peer has disconnected. This removes any pending responses.
pub fn peer_disconnected(&mut self, peer_id: PeerId) {
self.delayed_responses
.retain(|(map_peer_id, _protocol), _queue| map_peer_id != &peer_id);
}
/// When a peer and protocol are allowed to send a next response, this function checks the
/// queued responses and attempts marking as ready as many as the limiter allows.
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Vec<QueuedResponse<E>>> {
let mut responses = vec![];
while let Poll::Ready(Some(expired)) = self.next_response.poll_expired(cx) {
let (peer_id, protocol) = expired.into_inner();
if let Entry::Occupied(mut entry) = self.delayed_responses.entry((peer_id, protocol)) {
let queue = entry.get_mut();
// Take delayed responses from the queue, as long as the limiter allows it.
while let Some(response) = queue.pop_front() {
match Self::try_limiter(
&mut self.limiter,
response.peer_id,
response.response.clone(),
response.protocol,
) {
Ok(()) => {
metrics::observe_duration(
&crate::metrics::RESPONSE_IDLING,
timestamp_now().saturating_sub(response.queued_at),
);
responses.push(response)
}
Err(wait_time) => {
// The response was taken from the queue, but the limiter didn't allow it.
queue.push_front(response);
self.next_response.insert((peer_id, protocol), wait_time);
break;
}
}
}
if queue.is_empty() {
entry.remove();
}
}
}
// Prune the rate limiter.
let _ = self.limiter.poll_unpin(cx);
if !responses.is_empty() {
return Poll::Ready(responses);
}
Poll::Pending
}
}

View File

@@ -1,3 +1,10 @@
use super::{
config::OutboundRateLimiterConfig,
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
BehaviourAction, Protocol, RPCSend, ReqId, RequestType, MAX_CONCURRENT_REQUESTS,
};
use crate::rpc::rate_limiter::RateLimiterItem;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
sync::Arc,
@@ -13,30 +20,31 @@ use tokio_util::time::DelayQueue;
use tracing::debug;
use types::{EthSpec, ForkContext};
use super::{
config::OutboundRateLimiterConfig,
rate_limiter::{RPCRateLimiter as RateLimiter, RateLimitedErr},
BehaviourAction, Protocol, RPCSend, ReqId, RequestType,
};
/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest<Id: ReqId, E: EthSpec> {
req: RequestType<E>,
request_id: Id,
queued_at: Duration,
}
/// The number of milliseconds requests delayed due to the concurrent request limit stay in the queue.
const WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS: u64 = 100;
#[allow(clippy::type_complexity)]
pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
/// Requests queued for sending per peer. This requests are stored when the self rate
/// Active requests that are awaiting a response.
active_requests: HashMap<PeerId, HashMap<Protocol, usize>>,
/// Requests queued for sending per peer. These requests are stored when the self rate
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
/// are stored in the same way.
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
/// The delay required to allow a peer's outbound request per protocol.
next_peer_request: DelayQueue<(PeerId, Protocol)>,
/// Rate limiter for our own requests.
limiter: RateLimiter,
rate_limiter: Option<RateLimiter>,
/// Requests that are ready to be sent.
ready_requests: SmallVec<[(PeerId, RPCSend<Id, E>); 3]>,
ready_requests: SmallVec<[(PeerId, RPCSend<Id, E>, Duration); 3]>,
}
/// Error returned when the rate limiter does not accept a request.
@@ -49,18 +57,23 @@ pub enum Error {
}
impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Creates a new [`SelfRateLimiter`] based on configration values.
/// Creates a new [`SelfRateLimiter`] based on configuration values.
pub fn new(
config: OutboundRateLimiterConfig,
config: Option<OutboundRateLimiterConfig>,
fork_context: Arc<ForkContext>,
) -> Result<Self, &'static str> {
debug!(?config, "Using self rate limiting params");
let limiter = RateLimiter::new_with_config(config.0, fork_context)?;
let rate_limiter = if let Some(c) = config {
Some(RateLimiter::new_with_config(c.0, fork_context)?)
} else {
None
};
Ok(SelfRateLimiter {
active_requests: Default::default(),
delayed_requests: Default::default(),
next_peer_request: Default::default(),
limiter,
rate_limiter,
ready_requests: Default::default(),
})
}
@@ -77,11 +90,21 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
if let Some(queued_requests) = self.delayed_requests.get_mut(&(peer_id, protocol)) {
queued_requests.push_back(QueuedRequest { req, request_id });
debug!(%peer_id, protocol = %req.protocol(), "Self rate limiting since there are already other requests waiting to be sent");
queued_requests.push_back(QueuedRequest {
req,
request_id,
queued_at: timestamp_now(),
});
return Err(Error::PendingRequests);
}
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
match Self::try_send_request(
&mut self.active_requests,
&mut self.rate_limiter,
peer_id,
request_id,
req,
) {
Err((rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
@@ -99,33 +122,71 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Auxiliary function to deal with self rate limiting outcomes. If the rate limiter allows the
/// request, the [`ToSwarm`] that should be emitted is returned. If the request
/// should be delayed, it's returned with the duration to wait.
#[allow(clippy::result_large_err)]
fn try_send_request(
limiter: &mut RateLimiter,
active_requests: &mut HashMap<PeerId, HashMap<Protocol, usize>>,
rate_limiter: &mut Option<RateLimiter>,
peer_id: PeerId,
request_id: Id,
req: RequestType<E>,
) -> Result<RPCSend<Id, E>, (QueuedRequest<Id, E>, Duration)> {
match limiter.allows(&peer_id, &req) {
Ok(()) => Ok(RPCSend::Request(request_id, req)),
Err(e) => {
let protocol = req.versioned_protocol();
match e {
RateLimitedErr::TooLarge => {
// this should never happen with default parameters. Let's just send the request.
// Log a crit since this is a config issue.
crit!(
protocol = %req.versioned_protocol().protocol(),
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters."
);
Ok(RPCSend::Request(request_id, req))
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
Err((QueuedRequest { req, request_id }, wait_time))
if let Some(active_request) = active_requests.get(&peer_id) {
if let Some(count) = active_request.get(&req.protocol()) {
if *count >= MAX_CONCURRENT_REQUESTS {
debug!(
%peer_id,
protocol = %req.protocol(),
"Self rate limiting due to the number of concurrent requests"
);
return Err((
QueuedRequest {
req,
request_id,
queued_at: timestamp_now(),
},
Duration::from_millis(WAIT_TIME_DUE_TO_CONCURRENT_REQUESTS),
));
}
}
}
if let Some(limiter) = rate_limiter.as_mut() {
match limiter.allows(&peer_id, &req) {
Ok(()) => {}
Err(e) => {
let protocol = req.versioned_protocol();
match e {
RateLimitedErr::TooLarge => {
// this should never happen with default parameters. Let's just send the request.
// Log a crit since this is a config issue.
crit!(
protocol = %req.versioned_protocol().protocol(),
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters.",
);
}
RateLimitedErr::TooSoon(wait_time) => {
debug!(protocol = %protocol.protocol(), wait_time_ms = wait_time.as_millis(), %peer_id, "Self rate limiting");
return Err((
QueuedRequest {
req,
request_id,
queued_at: timestamp_now(),
},
wait_time,
));
}
}
}
}
}
*active_requests
.entry(peer_id)
.or_default()
.entry(req.protocol())
.or_default() += 1;
Ok(RPCSend::Request(request_id, req))
}
/// When a peer and protocol are allowed to send a next request, this function checks the
@@ -133,16 +194,32 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
fn next_peer_request_ready(&mut self, peer_id: PeerId, protocol: Protocol) {
if let Entry::Occupied(mut entry) = self.delayed_requests.entry((peer_id, protocol)) {
let queued_requests = entry.get_mut();
while let Some(QueuedRequest { req, request_id }) = queued_requests.pop_front() {
match Self::try_send_request(&mut self.limiter, peer_id, request_id, req) {
Err((rate_limited_req, wait_time)) => {
while let Some(QueuedRequest {
req,
request_id,
queued_at,
}) = queued_requests.pop_front()
{
match Self::try_send_request(
&mut self.active_requests,
&mut self.rate_limiter,
peer_id,
request_id,
req.clone(),
) {
Err((_rate_limited_req, wait_time)) => {
let key = (peer_id, protocol);
self.next_peer_request.insert(key, wait_time);
queued_requests.push_front(rate_limited_req);
// Don't push `rate_limited_req` here to prevent `queued_at` from being updated.
queued_requests.push_front(QueuedRequest {
req,
request_id,
queued_at,
});
// If one fails just wait for the next window that allows sending requests.
return;
}
Ok(event) => self.ready_requests.push((peer_id, event)),
Ok(event) => self.ready_requests.push((peer_id, event, queued_at)),
}
}
if queued_requests.is_empty() {
@@ -156,6 +233,8 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
/// Informs the limiter that a peer has disconnected. This removes any pending requests and
/// returns their IDs.
pub fn peer_disconnected(&mut self, peer_id: PeerId) -> Vec<(Id, Protocol)> {
self.active_requests.remove(&peer_id);
// It's not ideal to iterate this map, but the key is (PeerId, Protocol) and this map
// should never really be large. So we iterate for simplicity
let mut failed_requests = Vec::new();
@@ -177,19 +256,39 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
failed_requests
}
/// Informs the limiter that a response has been received.
pub fn request_completed(&mut self, peer_id: &PeerId, protocol: Protocol) {
if let Some(active_requests) = self.active_requests.get_mut(peer_id) {
if let Entry::Occupied(mut entry) = active_requests.entry(protocol) {
if *entry.get() > 1 {
*entry.get_mut() -= 1;
} else {
entry.remove();
}
}
}
}
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<BehaviourAction<Id, E>> {
// First check the requests that were self rate limited, since those might add events to
// the queue. Also do this this before rate limiter prunning to avoid removing and
// the queue. Also do this before rate limiter pruning to avoid removing and
// immediately adding rate limiting keys.
if let Poll::Ready(Some(expired)) = self.next_peer_request.poll_expired(cx) {
let (peer_id, protocol) = expired.into_inner();
self.next_peer_request_ready(peer_id, protocol);
}
// Prune the rate limiter.
let _ = self.limiter.poll_unpin(cx);
if let Some(limiter) = self.rate_limiter.as_mut() {
let _ = limiter.poll_unpin(cx);
}
// Finally return any queued events.
if let Some((peer_id, event)) = self.ready_requests.pop() {
if let Some((peer_id, event, queued_at)) = self.ready_requests.pop() {
metrics::observe_duration(
&crate::metrics::OUTBOUND_REQUEST_IDLING,
timestamp_now().saturating_sub(queued_at),
);
return Poll::Ready(BehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::Any,
@@ -201,12 +300,19 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
}
}
/// Returns the duration since the unix epoch.
pub fn timestamp_now() -> Duration {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|_| Duration::from_secs(0))
}
#[cfg(test)]
mod tests {
use crate::rpc::config::{OutboundRateLimiterConfig, RateLimiterConfig};
use crate::rpc::rate_limiter::Quota;
use crate::rpc::self_limiter::SelfRateLimiter;
use crate::rpc::{Ping, Protocol, RequestType};
use crate::rpc::{Ping, Protocol, RPCSend, RequestType};
use crate::service::api_types::{AppRequestId, SingleLookupReqId, SyncRequestId};
use libp2p::PeerId;
use logging::create_test_tracing_subscriber;
@@ -227,7 +333,7 @@ mod tests {
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
SelfRateLimiter::new(config, fork_context).unwrap();
SelfRateLimiter::new(Some(config), fork_context).unwrap();
let peer_id = PeerId::random();
let lookup_id = 0;
@@ -290,4 +396,149 @@ mod tests {
assert_eq!(limiter.ready_requests.len(), 1);
}
}
/// Test that `next_peer_request_ready` correctly maintains the queue when using the self-limiter without rate limiting.
#[tokio::test]
async fn test_next_peer_request_ready_concurrent_requests() {
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
Slot::new(0),
Hash256::ZERO,
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
SelfRateLimiter::new(None, fork_context).unwrap();
let peer_id = PeerId::random();
for i in 1..=5u32 {
let result = limiter.allows(
peer_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id: i,
req_id: i,
},
}),
RequestType::Ping(Ping { data: i as u64 }),
);
// Check that the limiter allows the first two requests.
if i <= 2 {
assert!(result.is_ok());
} else {
assert!(result.is_err());
}
}
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(3, queue.len());
// The delayed requests remain even after the next_peer_request_ready call because the responses have not been received.
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(3, queue.len());
limiter.request_completed(&peer_id, Protocol::Ping);
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
let queue = limiter
.delayed_requests
.get(&(peer_id, Protocol::Ping))
.unwrap();
assert_eq!(2, queue.len());
limiter.request_completed(&peer_id, Protocol::Ping);
limiter.request_completed(&peer_id, Protocol::Ping);
limiter.next_peer_request_ready(peer_id, Protocol::Ping);
let queue = limiter.delayed_requests.get(&(peer_id, Protocol::Ping));
assert!(queue.is_none());
// Check that the three delayed requests have moved to ready_requests.
let mut it = limiter.ready_requests.iter();
for i in 3..=5u32 {
let (_peer_id, RPCSend::Request(request_id, _), _) = it.next().unwrap() else {
unreachable!()
};
assert!(matches!(
request_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
}) if *req_id == i
));
}
}
#[tokio::test]
async fn test_peer_disconnected() {
let fork_context = std::sync::Arc::new(ForkContext::new::<MainnetEthSpec>(
Slot::new(0),
Hash256::ZERO,
&MainnetEthSpec::default_spec(),
));
let mut limiter: SelfRateLimiter<AppRequestId, MainnetEthSpec> =
SelfRateLimiter::new(None, fork_context).unwrap();
let peer1 = PeerId::random();
let peer2 = PeerId::random();
for peer in [peer1, peer2] {
for i in 1..=5u32 {
let result = limiter.allows(
peer,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId {
lookup_id: i,
req_id: i,
},
}),
RequestType::Ping(Ping { data: i as u64 }),
);
// Check that the limiter allows the first two requests.
if i <= 2 {
assert!(result.is_ok());
} else {
assert!(result.is_err());
}
}
}
assert!(limiter.active_requests.contains_key(&peer1));
assert!(limiter
.delayed_requests
.contains_key(&(peer1, Protocol::Ping)));
assert!(limiter.active_requests.contains_key(&peer2));
assert!(limiter
.delayed_requests
.contains_key(&(peer2, Protocol::Ping)));
// Check that the limiter returns the IDs of pending requests and that the IDs are ordered correctly.
let mut failed_requests = limiter.peer_disconnected(peer1);
for i in 3..=5u32 {
let (request_id, _) = failed_requests.remove(0);
assert!(matches!(
request_id,
AppRequestId::Sync(SyncRequestId::SingleBlock {
id: SingleLookupReqId { req_id, .. },
}) if req_id == i
));
}
// Check that peer1s active and delayed requests have been removed.
assert!(!limiter.active_requests.contains_key(&peer1));
assert!(!limiter
.delayed_requests
.contains_key(&(peer1, Protocol::Ping)));
assert!(limiter.active_requests.contains_key(&peer2));
assert!(limiter
.delayed_requests
.contains_key(&(peer2, Protocol::Ping)));
}
}

View File

@@ -103,6 +103,8 @@ pub enum NetworkEvent<E: EthSpec> {
StatusPeer(PeerId),
NewListenAddr(Multiaddr),
ZeroListeners,
/// A peer has an updated custody group count from MetaData.
PeerUpdatedCustodyGroupCount(PeerId),
}
pub type Gossipsub = gossipsub::Behaviour<SnappyTransform, SubscriptionFilter>;
@@ -223,7 +225,7 @@ impl<E: EthSpec> Network<E> {
let gossipsub_config_params = GossipsubConfigParams {
message_domain_valid_snappy: ctx.chain_spec.message_domain_valid_snappy,
gossip_max_size: ctx.chain_spec.gossip_max_size as usize,
gossipsub_max_transmit_size: ctx.chain_spec.max_message_size(),
};
let gs_config = gossipsub_config(
config.network_load,
@@ -334,7 +336,9 @@ impl<E: EthSpec> Network<E> {
)
});
let snappy_transform = SnappyTransform::new(gs_config.max_transmit_size());
let spec = &ctx.chain_spec;
let snappy_transform =
SnappyTransform::new(spec.max_payload_size as usize, spec.max_compressed_len());
let mut gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
MessageAuthenticity::Anonymous,
gs_config.clone(),
@@ -365,7 +369,7 @@ impl<E: EthSpec> Network<E> {
};
let network_params = NetworkParams {
max_chunk_size: ctx.chain_spec.max_chunk_size as usize,
max_payload_size: ctx.chain_spec.max_payload_size as usize,
ttfb_timeout: ctx.chain_spec.ttfb_timeout(),
resp_timeout: ctx.chain_spec.resp_timeout(),
};
@@ -1653,7 +1657,7 @@ impl<E: EthSpec> Network<E> {
return None;
}
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
// The PING RPC responses are handled within the behaviour and not propagated
match event.message {
Err(handler_err) => {
match handler_err {
@@ -1856,9 +1860,11 @@ impl<E: EthSpec> Network<E> {
None
}
RpcSuccessResponse::MetaData(meta_data) => {
self.peer_manager_mut()
let updated_cgc = self
.peer_manager_mut()
.meta_data_response(&peer_id, meta_data.as_ref().clone());
None
// Send event after calling into peer_manager so the PeerDB is updated.
updated_cgc.then(|| NetworkEvent::PeerUpdatedCustodyGroupCount(peer_id))
}
/* Network propagated protocols */
RpcSuccessResponse::Status(msg) => {

View File

@@ -206,6 +206,20 @@ impl<E: EthSpec> NetworkGlobals<E> {
.collect::<Vec<_>>()
}
/// Returns true if the peer is known and is a custodian of `column_index`
pub fn is_custody_peer_of(&self, column_index: ColumnIndex, peer_id: &PeerId) -> bool {
self.peers
.read()
.peer_info(peer_id)
.map(|info| {
info.is_assigned_to_custody_subnet(&DataColumnSubnetId::from_column_index(
column_index,
&self.spec,
))
})
.unwrap_or(false)
}
/// Returns the TopicConfig to compute the set of Gossip topics for a given fork
pub fn as_topic_config(&self) -> TopicConfig {
TopicConfig {

View File

@@ -55,13 +55,16 @@ pub enum PubsubMessage<E: EthSpec> {
// Implements the `DataTransform` trait of gossipsub to employ snappy compression
pub struct SnappyTransform {
/// Sets the maximum size we allow gossipsub messages to decompress to.
max_size_per_message: usize,
max_uncompressed_len: usize,
/// Sets the maximum size we allow for compressed gossipsub message data.
max_compressed_len: usize,
}
impl SnappyTransform {
pub fn new(max_size_per_message: usize) -> Self {
pub fn new(max_uncompressed_len: usize, max_compressed_len: usize) -> Self {
SnappyTransform {
max_size_per_message,
max_uncompressed_len,
max_compressed_len,
}
}
}
@@ -72,12 +75,19 @@ impl gossipsub::DataTransform for SnappyTransform {
&self,
raw_message: gossipsub::RawMessage,
) -> Result<gossipsub::Message, std::io::Error> {
// check the length of the raw bytes
let len = decompress_len(&raw_message.data)?;
if len > self.max_size_per_message {
// first check the size of the compressed payload
if raw_message.data.len() > self.max_compressed_len {
return Err(Error::new(
ErrorKind::InvalidData,
"ssz_snappy decoded data > GOSSIP_MAX_SIZE",
"ssz_snappy encoded data > max_compressed_len",
));
}
// check the length of the uncompressed bytes
let len = decompress_len(&raw_message.data)?;
if len > self.max_uncompressed_len {
return Err(Error::new(
ErrorKind::InvalidData,
"ssz_snappy decoded data > MAX_PAYLOAD_SIZE",
));
}
@@ -101,10 +111,10 @@ impl gossipsub::DataTransform for SnappyTransform {
) -> Result<Vec<u8>, std::io::Error> {
// Currently we are not employing topic-based compression. Everything is expected to be
// snappy compressed.
if data.len() > self.max_size_per_message {
if data.len() > self.max_uncompressed_len {
return Err(Error::new(
ErrorKind::InvalidData,
"ssz_snappy Encoded data > GOSSIP_MAX_SIZE",
"ssz_snappy Encoded data > MAX_PAYLOAD_SIZE",
));
}
let mut encoder = Encoder::new();