diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 7c43018af8..e9177586cb 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -46,7 +46,7 @@ mod response_limiter; mod self_limiter; // Maximum number of concurrent requests per protocol ID that a client may issue. -const MAX_CONCURRENT_REQUESTS: usize = 2; +pub const MAX_CONCURRENT_REQUESTS: usize = 2; /// Composite trait for a request id. pub trait ReqId: Send + 'static + std::fmt::Debug + Copy + Clone {} diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index d2ced9fd9d..0db172a21a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -27,7 +27,9 @@ use fnv::FnvHashMap; use lighthouse_network::rpc::methods::{ BlobsByRangeRequest, DataColumnsByRangeRequest, PayloadEnvelopesByRangeRequest, }; -use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; +use lighthouse_network::rpc::{ + BlocksByRangeRequest, GoodbyeReason, MAX_CONCURRENT_REQUESTS, RPCError, RequestType, +}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, @@ -40,8 +42,8 @@ use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSourc use parking_lot::RwLock; pub use requests::LookupVerifyError; use requests::{ - ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, - DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + ActiveRequestItems, ActiveRequests, BlobsByRangeRequestItems, BlocksByRangeRequestItems, + BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, PayloadEnvelopesByRangeRequestItems, PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] @@ -100,6 +102,30 @@ pub type RpcResponseResult = Result<(T, Duration), RpcResponseError>; pub type CustodyByRootResult = Result>, RpcResponseError>; +/// Per-peer count of active requests for a single protocol, to keep peer selection within +/// `MAX_CONCURRENT_REQUESTS` concurrent requests per protocol ID. +struct ActiveRequestsPerPeer { + count_by_peer: HashMap, +} + +impl ActiveRequestsPerPeer { + fn new(requests: &ActiveRequests) -> Self + where + K: Copy + Eq + std::hash::Hash + std::fmt::Display, + T: ActiveRequestItems, + { + let mut count_by_peer = HashMap::::new(); + for peer_id in requests.iter_request_peers() { + *count_by_peer.entry(peer_id).or_default() += 1; + } + Self { count_by_peer } + } + + fn at_concurrency_limit(&self, peer_id: &PeerId) -> bool { + self.count_by_peer.get(peer_id).copied().unwrap_or(0) >= MAX_CONCURRENT_REQUESTS + } +} + #[derive(Debug)] #[allow(private_interfaces)] pub enum RpcResponseError { @@ -440,47 +466,6 @@ impl SyncNetworkContext { } } - fn active_request_count_by_peer(&self) -> HashMap { - let Self { - network_send: _, - request_id: _, - blocks_by_root_requests, - payload_envelopes_by_root_requests, - data_columns_by_root_requests, - blocks_by_range_requests, - blobs_by_range_requests, - data_columns_by_range_requests, - payload_envelopes_by_range_requests, - // custody_by_root_requests is a meta request of data_columns_by_root_requests - custody_by_root_requests: _, - // components_by_range_requests is a meta request of various _by_range requests - components_by_range_requests: _, - custody_backfill_data_column_batch_requests: _, - execution_engine_state: _, - network_beacon_processor: _, - chain: _, - fork_context: _, - // Don't use a fallback match. We want to be sure that all requests are considered when - // adding new ones - } = self; - - let mut active_request_count_by_peer = HashMap::::new(); - - for peer_id in blocks_by_root_requests - .iter_request_peers() - .chain(payload_envelopes_by_root_requests.iter_request_peers()) - .chain(data_columns_by_root_requests.iter_request_peers()) - .chain(blocks_by_range_requests.iter_request_peers()) - .chain(blobs_by_range_requests.iter_request_peers()) - .chain(data_columns_by_range_requests.iter_request_peers()) - .chain(payload_envelopes_by_range_requests.iter_request_peers()) - { - *active_request_count_by_peer.entry(peer_id).or_default() += 1; - } - - active_request_count_by_peer - } - /// Retries only the specified failed columns by requesting them again. /// /// Note: This function doesn't retry the whole batch, but retries specific requests within @@ -507,8 +492,6 @@ impl SyncNetworkContext { return Err("request id not present".to_string()); }; - let active_request_count_by_peer = self.active_request_count_by_peer(); - debug!( ?failed_columns, ?id, @@ -518,12 +501,7 @@ impl SyncNetworkContext { // Attempt to find all required custody peers to request the failed columns from let columns_by_range_peers_to_request = self - .select_columns_by_range_peers_to_request( - failed_columns, - peers, - active_request_count_by_peer, - peers_to_deprioritize, - ) + .select_columns_by_range_peers_to_request(failed_columns, peers, peers_to_deprioritize) .map_err(|e| format!("{:?}", e))?; // Reuse the id for the request that received partially correct responses @@ -581,7 +559,7 @@ impl SyncNetworkContext { column_peers = column_peers.len() ); let _guard = range_request_span.clone().entered(); - let active_request_count_by_peer = self.active_request_count_by_peer(); + let blocks_by_range_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_range_requests); let Some(block_peer) = block_peers .iter() @@ -589,8 +567,8 @@ impl SyncNetworkContext { ( // If contains -> 1 (order after), not contains -> 0 (order first) peers_to_deprioritize.contains(peer), - // Prefer peers with less overall requests - active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Strictly de-prioritize peers already at the per-protocol concurrency limit + blocks_by_range_per_peer.at_concurrency_limit(peer), // Random factor to break ties, otherwise the PeerID breaks ties rand::random::(), peer, @@ -620,7 +598,6 @@ impl SyncNetworkContext { Some(self.select_columns_by_range_peers_to_request( &column_indexes, column_peers, - active_request_count_by_peer, peers_to_deprioritize, )?) } else { @@ -692,6 +669,9 @@ impl SyncNetworkContext { let payloads_req_id = if matches!(batch_type, ByRangeRequestType::BlocksAndEnvelopesAndColumns) { Some(self.send_payload_envelopes_by_range_request( + // Peer selection: for a given peer, the count of sent blocks_by_range requests + // equals the count of sent payloads_by_range requests. So we are under the + // concurrency limit for payloads_by_range requests block_peer, PayloadEnvelopesByRangeRequest { start_slot: *request.start_slot(), @@ -731,10 +711,11 @@ impl SyncNetworkContext { &self, custody_indexes: &HashSet, peers: &HashSet, - active_request_count_by_peer: HashMap, peers_to_deprioritize: &HashSet, ) -> Result>, RpcRequestSendError> { let mut columns_to_request_by_peer = HashMap::>::new(); + let data_columns_by_range_per_peer = + ActiveRequestsPerPeer::new(&self.data_columns_by_range_requests); for column_index in custody_indexes { // Strictly consider peers that are custodials of this column AND are part of this @@ -750,12 +731,10 @@ impl SyncNetworkContext { ( // If contains -> 1 (order after), not contains -> 0 (order first) peers_to_deprioritize.contains(peer), - // Prefer peers with less overall requests - // Also account for requests that are not yet issued tracked in peer_id_to_request_map - // We batch requests to the same peer, so count existance in the - // `columns_to_request_by_peer` as a single 1 request. - active_request_count_by_peer.get(peer).copied().unwrap_or(0) - + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0), + // Strictly de-prioritize peers already at the per-protocol concurrency limit + // Note: do not account for to-be-sent requests on + // `data_columns_by_range_by_peer` as we always send at most one request + data_columns_by_range_per_peer.at_concurrency_limit(peer), // Random factor to break ties, otherwise the PeerID breaks ties rand::random::(), peer, @@ -881,14 +860,14 @@ impl SyncNetworkContext { lookup_peers: Arc>>, block_root: Hash256, ) -> Result>>, RpcRequestSendError> { - let active_request_count_by_peer = self.active_request_count_by_peer(); + let blocks_by_root_per_peer = ActiveRequestsPerPeer::new(&self.blocks_by_root_requests); let Some(peer_id) = lookup_peers .read() .iter() .map(|peer| { ( - // Prefer peers with less overall requests - active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Strictly de-prioritize peers already at the per-protocol concurrency limit + blocks_by_root_per_peer.at_concurrency_limit(peer), // Random factor to break ties, otherwise the PeerID breaks ties rand::random::(), peer, @@ -1001,13 +980,15 @@ impl SyncNetworkContext { )); } - let active_request_count_by_peer = self.active_request_count_by_peer(); + let payload_envelopes_by_root_per_peer = + ActiveRequestsPerPeer::new(&self.payload_envelopes_by_root_requests); let Some(peer_id) = lookup_peers .read() .iter() .map(|peer| { ( - active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // Strictly de-prioritize peers already at the per-protocol concurrency limit + payload_envelopes_by_root_per_peer.at_concurrency_limit(peer), rand::random::(), peer, ) @@ -1757,7 +1738,6 @@ impl SyncNetworkContext { peers: &HashSet, peers_to_deprioritize: &HashSet, ) -> Result { - let active_request_count_by_peer = self.active_request_count_by_peer(); // Attempt to find all required custody peers before sending any request or creating an ID let columns_by_range_peers_to_request = { let column_indexes = self @@ -1770,7 +1750,6 @@ impl SyncNetworkContext { self.select_columns_by_range_peers_to_request( &column_indexes, peers, - active_request_count_by_peer, peers_to_deprioritize, )? }; diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index 3518eecf09..3dd3683c42 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -16,7 +16,9 @@ use tracing::{Span, debug, debug_span, warn}; use types::{DataColumnSidecar, Hash256, Slot, data::ColumnIndex}; use types::{DataColumnSidecarList, EthSpec}; -use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext}; +use super::{ + ActiveRequestsPerPeer, LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext, +}; const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30); @@ -237,7 +239,8 @@ impl ActiveCustodyRequest { ))); } - let active_request_count_by_peer = cx.active_request_count_by_peer(); + let data_columns_by_root_per_peer = + ActiveRequestsPerPeer::new(&cx.data_columns_by_root_requests); let mut columns_to_request_by_peer = HashMap::>::new(); let mut columns_without_peers = vec![]; let lookup_peers = self.lookup_peers.read(); @@ -255,7 +258,7 @@ impl ActiveCustodyRequest { let peer_to_request = self.select_column_peer( cx, - &active_request_count_by_peer, + &data_columns_by_root_per_peer, &lookup_peers, *column_index, &random_state, @@ -360,7 +363,7 @@ impl ActiveCustodyRequest { fn select_column_peer( &self, cx: &mut SyncNetworkContext, - active_request_count_by_peer: &HashMap, + data_columns_by_root_per_peer: &ActiveRequestsPerPeer, lookup_peers: &HashSet, column_index: ColumnIndex, random_state: &RandomState, @@ -377,12 +380,12 @@ impl ActiveCustodyRequest { }) .map(|peer| { ( + // Strictly de-prioritize peers already at the per-protocol concurrency limit + data_columns_by_root_per_peer.at_concurrency_limit(peer), // Prioritize peers that claim to know have imported this block if lookup_peers.contains(peer) { 0 } else { 1 }, // De-prioritize peers that we have already attempted to download from self.peer_attempts.get(peer).copied().unwrap_or(0), - // Prefer peers with fewer requests to load balance across peers. - active_request_count_by_peer.get(peer).copied().unwrap_or(0), // The hash ensures consistent peer ordering within this request // to avoid fragmentation while varying selection across different requests. random_state.hash_one(peer),