Gate sync peer selection on per-protocol concurrent-request limit (#9456)

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Lion - dapplion
2026-06-21 18:34:29 +02:00
committed by GitHub
parent 10568b139b
commit b05badb5f2
3 changed files with 59 additions and 77 deletions

View File

@@ -46,7 +46,7 @@ mod response_limiter;
mod self_limiter;
// Maximum number of concurrent requests per protocol ID that a client may issue.
const MAX_CONCURRENT_REQUESTS: usize = 2;
pub const MAX_CONCURRENT_REQUESTS: usize = 2;
/// Composite trait for a request id.
pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {}

View File

@@ -27,7 +27,9 @@ use fnv::FnvHashMap;
use lighthouse_network::rpc::methods::{
BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest,
};
use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType};
use lighthouse_network::rpc::{
BlocksByRangeRequest, GoodbyeReason, MAX_CONCURRENT_REQUESTS, RPCError, RequestType,
};
pub use lighthouse_network::service::api_types::RangeRequestId;
use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
@@ -40,8 +42,8 @@ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSourc
use parking_lot::RwLock;
pub use requests::LookupVerifyError;
use requests::{
ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems,
DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
ActiveRequestItems, ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems,
BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems,
PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems,
};
#[cfg(test)]
@@ -100,6 +102,30 @@ pub type RpcResponseResult<T> = Result<(T, Duration), RpcResponseError>;
pub type CustodyByRootResult<T> =
Result<DownloadResult<DataColumnSidecarList<T>>, RpcResponseError>;
/// Per-peer count of active requests for a single protocol, to keep peer selection within
/// `MAX_CONCURRENT_REQUESTS` concurrent requests per protocol ID.
struct ActiveRequestsPerPeer {
count_by_peer: HashMap<PeerId, usize>,
}
impl ActiveRequestsPerPeer {
fn new<K, T>(requests: &ActiveRequests<K, T>) -> Self
where
K: Copy + Eq + std::hash::Hash + std::fmt::Display,
T: ActiveRequestItems,
{
let mut count_by_peer = HashMap::<PeerId, usize>::new();
for peer_id in requests.iter_request_peers() {
*count_by_peer.entry(peer_id).or_default() += 1;
}
Self { count_by_peer }
}
fn at_concurrency_limit(&self, peer_id: &PeerId) -> bool {
self.count_by_peer.get(peer_id).copied().unwrap_or(0) >= MAX_CONCURRENT_REQUESTS
}
}
#[derive(Debug)]
#[allow(private_interfaces)]
pub enum RpcResponseError {
@@ -440,47 +466,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
}
}
fn active_request_count_by_peer(&self) -> HashMap<PeerId, usize> {
let Self {
network_send: _,
request_id: _,
blocks_by_root_requests,
payload_envelopes_by_root_requests,
data_columns_by_root_requests,
blocks_by_range_requests,
blobs_by_range_requests,
data_columns_by_range_requests,
payload_envelopes_by_range_requests,
// custody_by_root_requests is a meta request of data_columns_by_root_requests
custody_by_root_requests: _,
// components_by_range_requests is a meta request of various _by_range requests
components_by_range_requests: _,
custody_backfill_data_column_batch_requests: _,
execution_engine_state: _,
network_beacon_processor: _,
chain: _,
fork_context: _,
// Don't use a fallback match. We want to be sure that all requests are considered when
// adding new ones
} = self;
let mut active_request_count_by_peer = HashMap::<PeerId, usize>::new();
for peer_id in blocks_by_root_requests
.iter_request_peers()
.chain(payload_envelopes_by_root_requests.iter_request_peers())
.chain(data_columns_by_root_requests.iter_request_peers())
.chain(blocks_by_range_requests.iter_request_peers())
.chain(blobs_by_range_requests.iter_request_peers())
.chain(data_columns_by_range_requests.iter_request_peers())
.chain(payload_envelopes_by_range_requests.iter_request_peers())
{
*active_request_count_by_peer.entry(peer_id).or_default() += 1;
}
active_request_count_by_peer
}
/// Retries only the specified failed columns by requesting them again.
///
/// Note: This function doesn't retry the whole batch, but retries specific requests within
@@ -507,8 +492,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Err("request id not present".to_string());
};
let active_request_count_by_peer = self.active_request_count_by_peer();
debug!(
?failed_columns,
?id,
@@ -518,12 +501,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// Attempt to find all required custody peers to request the failed columns from
let columns_by_range_peers_to_request = self
.select_columns_by_range_peers_to_request(
failed_columns,
peers,
active_request_count_by_peer,
peers_to_deprioritize,
)
.select_columns_by_range_peers_to_request(failed_columns, peers, peers_to_deprioritize)
.map_err(|e| format!("{:?}", e))?;
// Reuse the id for the request that received partially correct responses
@@ -581,7 +559,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
column_peers = column_peers.len()
);
let _guard = range_request_span.clone().entered();
let active_request_count_by_peer = self.active_request_count_by_peer();
let blocks_by_range_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_range_requests);
let Some(block_peer) = block_peers
.iter()
@@ -589,8 +567,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
(
// If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
blocks_by_range_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
@@ -620,7 +598,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
Some(self.select_columns_by_range_peers_to_request(
&column_indexes,
column_peers,
active_request_count_by_peer,
peers_to_deprioritize,
)?)
} else {
@@ -692,6 +669,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let payloads_req_id =
if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) {
Some(self.send_payload_envelopes_by_range_request(
// Peer selection: for a given peer, the count of sent blocks_by_range requests
// equals the count of sent payloads_by_range requests. So we are under the
// concurrency limit for payloads_by_range requests
block_peer,
PayloadEnvelopesByRangeRequest {
start_slot: *request.start_slot(),
@@ -731,10 +711,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
&self,
custody_indexes: &HashSet<ColumnIndex>,
peers: &HashSet<PeerId>,
active_request_count_by_peer: HashMap<PeerId, usize>,
peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<HashMap<PeerId, Vec<ColumnIndex>>, RpcRequestSendError> {
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let data_columns_by_range_per_peer =
ActiveRequestsPerPeer::new(&self.data_columns_by_range_requests);
for column_index in custody_indexes {
// Strictly consider peers that are custodials of this column AND are part of this
@@ -750,12 +731,10 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
(
// If contains -> 1 (order after), not contains -> 0 (order first)
peers_to_deprioritize.contains(peer),
// Prefer peers with less overall requests
// Also account for requests that are not yet issued tracked in peer_id_to_request_map
// We batch requests to the same peer, so count existance in the
// `columns_to_request_by_peer` as a single 1 request.
active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
// Note: do not account for to-be-sent requests on
// `data_columns_by_range_by_peer` as we always send at most one request
data_columns_by_range_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
@@ -881,14 +860,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
block_root: Hash256,
) -> Result<LookupRequestResult<Arc<SignedBeaconBlock<T::EthSpec>>>, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
let blocks_by_root_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_root_requests);
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
// Prefer peers with less overall requests
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
blocks_by_root_per_peer.at_concurrency_limit(peer),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::random::<u32>(),
peer,
@@ -1001,13 +980,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
));
}
let active_request_count_by_peer = self.active_request_count_by_peer();
let payload_envelopes_by_root_per_peer =
ActiveRequestsPerPeer::new(&self.payload_envelopes_by_root_requests);
let Some(peer_id) = lookup_peers
.read()
.iter()
.map(|peer| {
(
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// Strictly de-prioritize peers already at the per-protocol concurrency limit
payload_envelopes_by_root_per_peer.at_concurrency_limit(peer),
rand::random::<u32>(),
peer,
)
@@ -1757,7 +1738,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peers: &HashSet<PeerId>,
peers_to_deprioritize: &HashSet<PeerId>,
) -> Result<CustodyBackFillBatchRequestId, RpcRequestSendError> {
let active_request_count_by_peer = self.active_request_count_by_peer();
// Attempt to find all required custody peers before sending any request or creating an ID
let columns_by_range_peers_to_request = {
let column_indexes = self
@@ -1770,7 +1750,6 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
self.select_columns_by_range_peers_to_request(
&column_indexes,
peers,
active_request_count_by_peer,
peers_to_deprioritize,
)?
};

View File

@@ -16,7 +16,9 @@ use tracing::{Span, debug, debug_span, warn};
use types::{DataColumnSidecar, Hash256, Slot, data::ColumnIndex};
use types::{DataColumnSidecarList, EthSpec};
use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext};
use super::{
ActiveRequestsPerPeer, LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext,
};
const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);
@@ -237,7 +239,8 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
)));
}
let active_request_count_by_peer = cx.active_request_count_by_peer();
let data_columns_by_root_per_peer =
ActiveRequestsPerPeer::new(&cx.data_columns_by_root_requests);
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let mut columns_without_peers = vec![];
let lookup_peers = self.lookup_peers.read();
@@ -255,7 +258,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
let peer_to_request = self.select_column_peer(
cx,
&active_request_count_by_peer,
&data_columns_by_root_per_peer,
&lookup_peers,
*column_index,
&random_state,
@@ -360,7 +363,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
fn select_column_peer(
&self,
cx: &mut SyncNetworkContext<T>,
active_request_count_by_peer: &HashMap<PeerId, usize>,
data_columns_by_root_per_peer: &ActiveRequestsPerPeer,
lookup_peers: &HashSet<PeerId>,
column_index: ColumnIndex,
random_state: &RandomState,
@@ -377,12 +380,12 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
})
.map(|peer| {
(
// Strictly de-prioritize peers already at the per-protocol concurrency limit
data_columns_by_root_per_peer.at_concurrency_limit(peer),
// Prioritize peers that claim to know have imported this block
if lookup_peers.contains(peer) { 0 } else { 1 },
// De-prioritize peers that we have already attempted to download from
self.peer_attempts.get(peer).copied().unwrap_or(0),
// Prefer peers with fewer requests to load balance across peers.
active_request_count_by_peer.get(peer).copied().unwrap_or(0),
// The hash ensures consistent peer ordering within this request
// to avoid fragmentation while varying selection across different requests.
random_state.hash_one(peer),