Merge branch 'unstable' of https://github.com/sigp/lighthouse into unstable

This commit is contained in:
Eitan Seri-Levi
2026-06-22 12:55:45 +03:00
3 changed files with 59 additions and 77 deletions

View File

@@ -46,7 +46,7 @@ mod response_limiter;
mod self_limiter; mod self_limiter;
// Maximum number of concurrent requests per protocol ID that a client may issue. // Maximum number of concurrent requests per protocol ID that a client may issue.
const MAX_CONCURRENT_REQUESTS: usize = 2; pub const MAX_CONCURRENT_REQUESTS: usize = 2;
/// Composite trait for a request id. /// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}

View File

@@ -27,7 +27,9 @@ use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{ use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest,
}; };
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; use lighthouse_network::rpc::{
BlocksByRangeRequest, GoodbyeReason, MAX_CONCURRENT_REQUESTS, RPCError, RequestType,
};
pub use lighthouse_network::service::api_types::RangeRequestId; pub use lighthouse_network::service::api_types::RangeRequestId;
use lighthouse_network::service::api_types::{ use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
@@ -40,8 +42,8 @@ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSourc
use parking_lot::RwLock; use parking_lot::RwLock;
pub use requests::LookupVerifyError; pub use requests::LookupVerifyError;
use requests::{ use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, ActiveRequestItems, ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems,
DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems,
}; };
#[cfg(test)] #[cfg(test)]
@@ -100,6 +102,30 @@ pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
pub type CustodyByRootResult<T> = pub type CustodyByRootResult<T> =
Result<DownloadResult<DataColumnSidecarList<T>>, RpcResponseError>; Result<DownloadResult<DataColumnSidecarList<T>>, RpcResponseError>;
/// Number of active requests per peer for a single protocol, computed once at construction.
/// Peer selection consults it to stay within `MAX_CONCURRENT_REQUESTS` concurrent requests
/// per protocol ID.
struct ActiveRequestsPerPeer {
    count_by_peer: HashMap<PeerId, usize>,
}

impl ActiveRequestsPerPeer {
    /// Builds the per-peer tally by counting every peer yielded by
    /// `requests.iter_request_peers()`.
    fn new<K, T>(requests: &ActiveRequests<K, T>) -> Self
    where
        K: Copy + Eq + std::hash::Hash + std::fmt::Display,
        T: ActiveRequestItems,
    {
        let count_by_peer = requests.iter_request_peers().fold(
            HashMap::<PeerId, usize>::new(),
            |mut tally, peer_id| {
                *tally.entry(peer_id).or_default() += 1;
                tally
            },
        );
        Self { count_by_peer }
    }

    /// Returns `true` when the tally for `peer_id` is at or above `MAX_CONCURRENT_REQUESTS`.
    /// A peer with no entry is treated as having zero active requests.
    fn at_concurrency_limit(&self, peer_id: &PeerId) -> bool {
        let active = self.count_by_peer.get(peer_id).map_or(0, |count| *count);
        active >= MAX_CONCURRENT_REQUESTS
    }
}
#[derive(Debug)] #[derive(Debug)]
#[allow(private_interfaces)] #[allow(private_interfaces)]
pub enum RpcResponseError { pub enum RpcResponseError {
@@ -440,47 +466,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
} }
} }
/// Counts, for every peer, how many entries it has across the by-root and by-range
/// request maps of this context.
fn active_request_count_by_peer(&self) -> HashMap<PeerId, usize> {
    // Destructure exhaustively, with no `..` fallback: a newly added field must be
    // explicitly counted or ignored here, so new request kinds cannot be forgotten.
    let Self {
        // Request maps that contribute to the count
        blocks_by_root_requests,
        payload_envelopes_by_root_requests,
        data_columns_by_root_requests,
        blocks_by_range_requests,
        blobs_by_range_requests,
        data_columns_by_range_requests,
        payload_envelopes_by_range_requests,
        // Meta request of data_columns_by_root_requests, so not counted on its own
        custody_by_root_requests: _,
        // Meta request of various _by_range requests, so not counted on its own
        components_by_range_requests: _,
        custody_backfill_data_column_batch_requests: _,
        // Fields that are not counted
        network_send: _,
        request_id: _,
        execution_engine_state: _,
        network_beacon_processor: _,
        chain: _,
        fork_context: _,
    } = self;

    let request_peers = blocks_by_root_requests
        .iter_request_peers()
        .chain(payload_envelopes_by_root_requests.iter_request_peers())
        .chain(data_columns_by_root_requests.iter_request_peers())
        .chain(blocks_by_range_requests.iter_request_peers())
        .chain(blobs_by_range_requests.iter_request_peers())
        .chain(data_columns_by_range_requests.iter_request_peers())
        .chain(payload_envelopes_by_range_requests.iter_request_peers());

    request_peers.fold(HashMap::<PeerId, usize>::new(), |mut counts, peer_id| {
        *counts.entry(peer_id).or_default() += 1;
        counts
    })
}
/// Retries only the specified failed columns by requesting them again. /// Retries only the specified failed columns by requesting them again.
/// ///
/// Note: This function doesn't retry the whole batch, but retries specific requests within /// Note: This function doesn't retry the whole batch, but retries specific requests within
@@ -507,8 +492,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Err("request id not present".to_string()); return Err("request id not present".to_string());
}; };
let active_request_count_by_peer = self.active_request_count_by_peer();
debug!( debug!(
?failed_columns, ?failed_columns,
?id, ?id,
@@ -518,12 +501,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Attempt to find all required custody peers to request the failed columns from // Attempt to find all required custody peers to request the failed columns from
let columns_by_range_peers_to_request = self let columns_by_range_peers_to_request = self
.select_columns_by_range_peers_to_request( .select_columns_by_range_peers_to_request(failed_columns, peers, peers_to_deprioritize)
failed_columns,
peers,
active_request_count_by_peer,
peers_to_deprioritize,
)
.map_err(|e| format!("{:?}", e))?; .map_err(|e| format!("{:?}", e))?;
// Reuse the id for the request that received partially correct responses // Reuse the id for the request that received partially correct responses
@@ -581,7 +559,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
column_peers = column_peers.len() column_peers = column_peers.len()
); );
let _guard = range_request_span.clone().entered(); let _guard = range_request_span.clone().entered();
let active_request_count_by_peer = self.active_request_count_by_peer(); let blocks_by_range_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_range_requests);
let Some(block_peer) = block_peers let Some(block_peer) = block_peers
.iter() .iter()
@@ -589,8 +567,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
( (
// If contains -> 1 (order after), not contains -> 0 (order first) // If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer), peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests // Strictly de-prioritize peers already at the per-protocol concurrency limit
active_request_count_by_peer.get(peer).copied().unwrap_or(0), blocks_by_range_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties // Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(), rand::random::<u32>(),
peer, peer,
@@ -620,7 +598,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Some(self.select_columns_by_range_peers_to_request( Some(self.select_columns_by_range_peers_to_request(
&column_indexes, &column_indexes,
column_peers, column_peers,
active_request_count_by_peer,
peers_to_deprioritize, peers_to_deprioritize,
)?) )?)
} else { } else {
@@ -692,6 +669,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let payloads_req_id = let payloads_req_id =
if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) {
Some(self.send_payload_envelopes_by_range_request( Some(self.send_payload_envelopes_by_range_request(
// Peer selection: for a given peer, the count of sent blocks_by_range requests
// equals the count of sent payloads_by_range requests. So we are under the
// concurrency limit for payloads_by_range requests
block_peer, block_peer,
PayloadEnvelopesByRangeRequest { PayloadEnvelopesByRangeRequest {
start_slot: *request.start_slot(), start_slot: *request.start_slot(),
@@ -731,10 +711,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&self, &self,
custody_indexes: &HashSet<ColumnIndex>, custody_indexes: &HashSet<ColumnIndex>,
peers: &HashSet<PeerId>, peers: &HashSet<PeerId>,
active_request_count_by_peer: HashMap<PeerId, usize>,
peers_to_deprioritize: &HashSet<PeerId>, peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<HashMap<PeerId, Vec<ColumnIndex>>, RpcRequestSendError> { ) -> Result<HashMap<PeerId, Vec<ColumnIndex>>, RpcRequestSendError> {
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new(); let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let data_columns_by_range_per_peer =
ActiveRequestsPerPeer::new(&self.data_columns_by_range_requests);
for column_index in custody_indexes { for column_index in custody_indexes {
// Strictly consider peers that are custodials of this column AND are part of this // Strictly consider peers that are custodials of this column AND are part of this
@@ -750,12 +731,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
( (
// If contains -> 1 (order after), not contains -> 0 (order first) // If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer), peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests // Strictly de-prioritize peers already at the per-protocol concurrency limit
// Also account for requests that are not yet issued tracked in peer_id_to_request_map // Note: do not account for to-be-sent requests on
// We batch requests to the same peer, so count existence in the // `data_columns_by_range_per_peer` as we always send at most one request
// `columns_to_request_by_peer` as a single 1 request. data_columns_by_range_per_peer.at_concurrency_limit(peer),
active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
// Random factor to break ties, otherwise the PeerID breaks ties // Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(), rand::random::<u32>(),
peer, peer,
@@ -881,14 +860,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
lookup_peers: Arc<RwLock<HashSet<PeerId>>>, lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256, block_root: Hash256,
) -> Result<LookupRequestResult<Arc<SignedBeaconBlock<T::EthSpec>>>, RpcRequestSendError> { ) -> Result<LookupRequestResult<Arc<SignedBeaconBlock<T::EthSpec>>>, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer(); let blocks_by_root_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_root_requests);
let Some(peer_id) = lookup_peers let Some(peer_id) = lookup_peers
.read() .read()
.iter() .iter()
.map(|peer| { .map(|peer| {
( (
// Prefer peers with less overall requests // Strictly de-prioritize peers already at the per-protocol concurrency limit
active_request_count_by_peer.get(peer).copied().unwrap_or(0), blocks_by_root_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties // Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(), rand::random::<u32>(),
peer, peer,
@@ -1001,13 +980,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
)); ));
} }
let active_request_count_by_peer = self.active_request_count_by_peer(); let payload_envelopes_by_root_per_peer =
ActiveRequestsPerPeer::new(&self.payload_envelopes_by_root_requests);
let Some(peer_id) = lookup_peers let Some(peer_id) = lookup_peers
.read() .read()
.iter() .iter()
.map(|peer| { .map(|peer| {
( (
active_request_count_by_peer.get(peer).copied().unwrap_or(0), // Strictly de-prioritize peers already at the per-protocol concurrency limit
payload_envelopes_by_root_per_peer.at_concurrency_limit(peer),
rand::random::<u32>(), rand::random::<u32>(),
peer, peer,
) )
@@ -1757,7 +1738,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peers: &HashSet<PeerId>, peers: &HashSet<PeerId>,
peers_to_deprioritize: &HashSet<PeerId>, peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<CustodyBackFillBatchRequestId, RpcRequestSendError> { ) -> Result<CustodyBackFillBatchRequestId, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
// Attempt to find all required custody peers before sending any request or creating an ID // Attempt to find all required custody peers before sending any request or creating an ID
let columns_by_range_peers_to_request = { let columns_by_range_peers_to_request = {
let column_indexes = self let column_indexes = self
@@ -1770,7 +1750,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.select_columns_by_range_peers_to_request( self.select_columns_by_range_peers_to_request(
&column_indexes, &column_indexes,
peers, peers,
active_request_count_by_peer,
peers_to_deprioritize, peers_to_deprioritize,
)? )?
}; };

View File

@@ -16,7 +16,9 @@ use tracing::{Span, debug, debug_span, warn};
use types::{DataColumnSidecar, Hash256, Slot, data::ColumnIndex}; use types::{DataColumnSidecar, Hash256, Slot, data::ColumnIndex};
use types::{DataColumnSidecarList, EthSpec}; use types::{DataColumnSidecarList, EthSpec};
use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext}; use super::{
ActiveRequestsPerPeer, LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext,
};
const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30); const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);
@@ -237,7 +239,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
))); )));
} }
let active_request_count_by_peer = cx.active_request_count_by_peer(); let data_columns_by_root_per_peer =
ActiveRequestsPerPeer::new(&cx.data_columns_by_root_requests);
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new(); let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let mut columns_without_peers = vec![]; let mut columns_without_peers = vec![];
let lookup_peers = self.lookup_peers.read(); let lookup_peers = self.lookup_peers.read();
@@ -255,7 +258,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
let peer_to_request = self.select_column_peer( let peer_to_request = self.select_column_peer(
cx, cx,
&active_request_count_by_peer, &data_columns_by_root_per_peer,
&lookup_peers, &lookup_peers,
*column_index, *column_index,
&random_state, &random_state,
@@ -360,7 +363,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
fn select_column_peer( fn select_column_peer(
&self, &self,
cx: &mut SyncNetworkContext<T>, cx: &mut SyncNetworkContext<T>,
active_request_count_by_peer: &HashMap<PeerId, usize>, data_columns_by_root_per_peer: &ActiveRequestsPerPeer,
lookup_peers: &HashSet<PeerId>, lookup_peers: &HashSet<PeerId>,
column_index: ColumnIndex, column_index: ColumnIndex,
random_state: &RandomState, random_state: &RandomState,
@@ -377,12 +380,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
}) })
.map(|peer| { .map(|peer| {
( (
// Strictly de-prioritize peers already at the per-protocol concurrency limit
data_columns_by_root_per_peer.at_concurrency_limit(peer),
// Prioritize peers that claim to have imported this block // Prioritize peers that claim to have imported this block
if lookup_peers.contains(peer) { 0 } else { 1 }, if lookup_peers.contains(peer) { 0 } else { 1 },
// De-prioritize peers that we have already attempted to download from // De-prioritize peers that we have already attempted to download from
self.peer_attempts.get(peer).copied().unwrap_or(0), self.peer_attempts.get(peer).copied().unwrap_or(0),
// Prefer peers with fewer requests to load balance across peers.
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// The hash ensures consistent peer ordering within this request // The hash ensures consistent peer ordering within this request
// to avoid fragmentation while varying selection across different requests. // to avoid fragmentation while varying selection across different requests.
random_state.hash_one(peer), random_state.hash_one(peer),