diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs index ffbad1364c..d31df4e3dd 100644 --- a/beacon_node/lighthouse_tracing/src/lib.rs +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -17,6 +17,8 @@ pub const SPAN_PROCESS_GOSSIP_BLOCK: &str = "process_gossip_block"; /// Sync methods root spans pub const SPAN_SYNCING_CHAIN: &str = "syncing_chain"; pub const SPAN_OUTGOING_RANGE_REQUEST: &str = "outgoing_range_request"; +pub const SPAN_SINGLE_BLOCK_LOOKUP: &str = "single_block_lookup"; +pub const SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST: &str = "outgoing_block_by_root_request"; pub const SPAN_OUTGOING_CUSTODY_REQUEST: &str = "outgoing_custody_request"; pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block"; pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs"; @@ -46,7 +48,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[ SPAN_PROCESS_GOSSIP_BLOB, SPAN_PROCESS_GOSSIP_BLOCK, SPAN_OUTGOING_RANGE_REQUEST, - SPAN_OUTGOING_CUSTODY_REQUEST, + SPAN_SINGLE_BLOCK_LOOKUP, SPAN_PROCESS_RPC_BLOCK, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_CUSTODY_COLUMNS, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e9f24697ac..b60c21972f 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -384,6 +384,7 @@ impl BlockLookups { // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), // signal here to hold processing downloaded data. let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let _guard = lookup.span.clone().entered(); // Add block components to the new request if let Some(block_component) = block_component { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 30947cf1f0..36509d2563 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -7,6 +7,7 @@ use crate::sync::network_context::{ use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; use derivative::Derivative; use lighthouse_network::service::api_types::Id; +use lighthouse_tracing::SPAN_SINGLE_BLOCK_LOOKUP; use parking_lot::RwLock; use std::collections::HashSet; use std::fmt::Debug; @@ -14,6 +15,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; +use tracing::{Span, debug_span}; use types::blob_sidecar::FixedBlobSidecarList; use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; @@ -70,6 +72,7 @@ pub struct SingleBlockLookup { block_root: Hash256, awaiting_parent: Option, created: Instant, + pub(crate) span: Span, } #[derive(Debug)] @@ -89,6 +92,12 @@ impl SingleBlockLookup { id: Id, awaiting_parent: Option, ) -> Self { + let lookup_span = debug_span!( + SPAN_SINGLE_BLOCK_LOOKUP, + block_root = %requested_block_root, + id = id, + ); + Self { id, block_request_state: BlockRequestState::new(requested_block_root), @@ -97,6 +106,7 @@ impl SingleBlockLookup { block_root: requested_block_root, awaiting_parent, created: Instant::now(), + span: lookup_span, } } @@ -192,6 +202,7 @@ impl SingleBlockLookup { &mut self, cx: &mut SyncNetworkContext, ) -> Result { + let _guard = self.span.clone().entered(); // TODO: Check what's necessary to download, specially for blobs self.continue_request::>(cx, 0)?; @@ -257,6 +268,7 @@ impl SingleBlockLookup { // that can make progress so it must be dropped. Consider the lookup completed. // This case can happen if we receive the components from gossip during a retry. if self.all_components_processed() { + self.span = Span::none(); Ok(LookupResult::Completed) } else { Ok(LookupResult::Pending) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 07462a01fe..17a4295700 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -29,7 +29,7 @@ use lighthouse_network::service::api_types::{ DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; -use lighthouse_tracing::SPAN_OUTGOING_RANGE_REQUEST; +use lighthouse_tracing::{SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, SPAN_OUTGOING_RANGE_REQUEST}; use parking_lot::RwLock; pub use requests::LookupVerifyError; use requests::{ @@ -886,6 +886,11 @@ impl SyncNetworkContext { "Sync RPC request sent" ); + let request_span = debug_span!( + parent: Span::current(), + SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, + %block_root, + ); self.blocks_by_root_requests.insert( id, peer_id, @@ -893,8 +898,7 @@ impl SyncNetworkContext { // block and the peer must have it. true, BlocksByRootRequestItems::new(request), - // Not implemented - Span::none(), + request_span, ); Ok(LookupRequestResult::RequestSent(id.req_id)) diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index d973e83cea..71e002cc42 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -7,19 +7,17 @@ use fnv::FnvHashMap; use lighthouse_network::PeerId; use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST; -use lru_cache::LRUTimeCache; use parking_lot::RwLock; -use rand::Rng; use std::collections::HashSet; +use std::hash::{BuildHasher, RandomState}; use std::time::{Duration, Instant}; use std::{collections::HashMap, marker::PhantomData, sync::Arc}; -use tracing::{Span, debug, debug_span, field, warn}; +use tracing::{Span, debug, debug_span, warn}; use types::{DataColumnSidecar, Hash256, data_column_sidecar::ColumnIndex}; use types::{DataColumnSidecarList, EthSpec}; use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext}; -const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5; const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30); pub struct ActiveCustodyRequest { @@ -30,9 +28,7 @@ pub struct ActiveCustodyRequest { /// Active requests for 1 or more columns each active_batch_columns_requests: FnvHashMap, - /// Peers that have recently failed to successfully respond to a columns by root request. - /// Having a LRUTimeCache allows this request to not have to track disconnecting peers. - failed_peers: LRUTimeCache, + peer_attempts: HashMap, /// Set of peers that claim to have imported this block and their custody columns lookup_peers: Arc>>, /// Span for tracing the lifetime of this request. @@ -71,7 +67,11 @@ impl ActiveCustodyRequest { column_indices: &[ColumnIndex], lookup_peers: Arc>>, ) -> Self { - let span = debug_span!(parent: None, SPAN_OUTGOING_CUSTODY_REQUEST, %block_root); + let span = debug_span!( + parent: Span::current(), + SPAN_OUTGOING_CUSTODY_REQUEST, + %block_root, + ); Self { block_root, custody_id, @@ -81,7 +81,7 @@ impl ActiveCustodyRequest { .map(|index| (*index, ColumnRequest::new())), ), active_batch_columns_requests: <_>::default(), - failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)), + peer_attempts: HashMap::new(), lookup_peers, span, _phantom: PhantomData, @@ -170,13 +170,6 @@ impl ActiveCustodyRequest { ?missing_column_indexes, "Custody column peer claims to not have some data" ); - - batch_request.span.record( - "missing_column_indexes", - field::debug(missing_column_indexes), - ); - - self.failed_peers.insert(peer_id); } } Err(err) => { @@ -195,13 +188,6 @@ impl ActiveCustodyRequest { .ok_or(Error::BadState("unknown column_index".to_owned()))? .on_download_error_and_mark_failure(req_id)?; } - - batch_request.span.record( - "missing_column_indexes", - field::debug(&batch_request.indices), - ); - - self.failed_peers.insert(peer_id); } }; @@ -238,52 +224,29 @@ impl ActiveCustodyRequest { let active_request_count_by_peer = cx.active_request_count_by_peer(); let mut columns_to_request_by_peer = HashMap::>::new(); let lookup_peers = self.lookup_peers.read(); + // Create deterministic hasher per request to ensure consistent peer ordering within + // this request (avoiding fragmentation) while varying selection across different requests + let random_state = RandomState::new(); - // Need to: - // - track how many active requests a peer has for load balancing - // - which peers have failures to attempt others - // - which peer returned what to have PeerGroup attributability - - for (column_index, request) in self.column_requests.iter_mut() { + for (column_index, request) in self.column_requests.iter() { if let Some(wait_duration) = request.is_awaiting_download() { + // Note: an empty response is considered a successful response, so we may end up + // retrying many more times than `MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS`. if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS { return Err(Error::TooManyFailures); } - // TODO(das): When is a fork and only a subset of your peers know about a block, we should - // only query the peers on that fork. Should this case be handled? How to handle it? - let custodial_peers = cx.get_custodial_peers(*column_index); + let peer_to_request = self.select_column_peer( + cx, + &active_request_count_by_peer, + &lookup_peers, + *column_index, + &random_state, + ); - // We draw from the total set of peers, but prioritize those peers who we have - // received an attestation / status / block message claiming to have imported the - // lookup. The frequency of those messages is low, so drawing only from lookup_peers - // could cause many lookups to take much longer or fail as they don't have enough - // custody peers on a given column - let mut priorized_peers = custodial_peers - .iter() - .map(|peer| { - ( - // Prioritize peers that claim to know have imported this block - if lookup_peers.contains(peer) { 0 } else { 1 }, - // De-prioritize peers that have failed to successfully respond to - // requests recently - self.failed_peers.contains(peer), - // Prefer peers with fewer requests to load balance across peers. - // We batch requests to the same peer, so count existence in the - // `columns_to_request_by_peer` as a single 1 request. - active_request_count_by_peer.get(peer).copied().unwrap_or(0) - + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0), - // Random factor to break ties, otherwise the PeerID breaks ties - rand::rng().random::(), - *peer, - ) - }) - .collect::>(); - priorized_peers.sort_unstable(); - - if let Some((_, _, _, _, peer_id)) = priorized_peers.first() { + if let Some(peer_id) = peer_to_request { columns_to_request_by_peer - .entry(*peer_id) + .entry(peer_id) .or_default() .push(*column_index); } else if wait_duration > MAX_STALE_NO_PEERS_DURATION { @@ -298,6 +261,23 @@ impl ActiveCustodyRequest { } } + let peer_requests = columns_to_request_by_peer.len(); + if peer_requests > 0 { + let columns_requested_count = columns_to_request_by_peer + .values() + .map(|v| v.len()) + .sum::(); + debug!( + lookup_peers = lookup_peers.len(), + "Requesting {} columns from {} peers", columns_requested_count, peer_requests, + ); + } else { + debug!( + lookup_peers = lookup_peers.len(), + "No column peers found for look up", + ); + } + for (peer_id, indices) in columns_to_request_by_peer.into_iter() { let request_result = cx .data_column_lookup_request( @@ -317,8 +297,14 @@ impl ActiveCustodyRequest { match request_result { LookupRequestResult::RequestSent(req_id) => { + *self.peer_attempts.entry(peer_id).or_insert(0) += 1; + let client = cx.network_globals().client(&peer_id).kind; - let batch_columns_req_span = debug_span!("batch_columns_req", %peer_id, %client, missing_column_indexes = tracing::field::Empty); + let batch_columns_req_span = debug_span!( + "batch_columns_req", + %peer_id, + %client, + ); let _guard = batch_columns_req_span.clone().entered(); for column_index in &indices { let column_request = self @@ -345,11 +331,54 @@ impl ActiveCustodyRequest { Ok(None) } + + fn select_column_peer( + &self, + cx: &mut SyncNetworkContext, + active_request_count_by_peer: &HashMap, + lookup_peers: &HashSet, + column_index: ColumnIndex, + random_state: &RandomState, + ) -> Option { + // We draw from the total set of peers, but prioritize those peers who we have + // received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take + // time to build up and we are likely to not find any column peers initially. + let custodial_peers = cx.get_custodial_peers(column_index); + let mut prioritized_peers = custodial_peers + .iter() + .filter(|peer| { + // Exclude peers that we have already made too many attempts to. + self.peer_attempts.get(peer).copied().unwrap_or(0) <= MAX_CUSTODY_PEER_ATTEMPTS + }) + .map(|peer| { + ( + // Prioritize peers that claim to know have imported this block + if lookup_peers.contains(peer) { 0 } else { 1 }, + // De-prioritize peers that we have already attempted to download from + self.peer_attempts.get(peer).copied().unwrap_or(0), + // Prefer peers with fewer requests to load balance across peers. + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // The hash ensures consistent peer ordering within this request + // to avoid fragmentation while varying selection across different requests. + random_state.hash_one(peer), + *peer, + ) + }) + .collect::>(); + prioritized_peers.sort_unstable(); + + prioritized_peers + .first() + .map(|(_, _, _, _, peer_id)| *peer_id) + } } /// TODO(das): this attempt count is nested into the existing lookup request count. const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3; +/// Max number of attempts to request custody columns from a single peer. +const MAX_CUSTODY_PEER_ATTEMPTS: usize = 3; + struct ColumnRequest { status: Status, download_failures: usize, diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 125a77fc1e..4061cb4fdb 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -1,13 +1,15 @@ //! Identifies each data column subnet by an integer identifier. use crate::ChainSpec; use crate::data_column_sidecar::ColumnIndex; +use derivative::Derivative; use safe_arith::{ArithError, SafeArith}; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; use std::ops::{Deref, DerefMut}; #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Clone, Copy, Derivative, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derivative(Debug = "transparent")] #[serde(transparent)] pub struct DataColumnSubnetId(#[serde(with = "serde_utils::quoted_u64")] u64);