From ee734d145665e435f8ea0abfbead37ff61b38549 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 9 Sep 2025 16:18:05 +1000 Subject: [PATCH] Fix stuck data column lookups by improving peer selection and retry logic (#8005) Fixes the issue described in #7980 where Lighthouse repeatedly sends `DataColumnsByRoot` requests to the same peers that return empty responses, causing sync to get stuck. The root cause was we don't count empty responses as failures, leading to excessive retries to unresponsive peers. - Track per peer attempts to limit retry attempts per peer (`MAX_CUSTODY_PEER_ATTEMPTS = 3`) - Replaced random peer selection with hashing within each lookup to prevent splitting lookup into too many small requests and improve request batching efficiency. - Added `single_block_lookup` root span to track all lookups created and added more debug logs: image Co-Authored-By: Jimmy Chen Co-Authored-By: Jimmy Chen --- beacon_node/lighthouse_tracing/src/lib.rs | 4 +- .../network/src/sync/block_lookups/mod.rs | 1 + .../sync/block_lookups/single_block_lookup.rs | 12 ++ .../network/src/sync/network_context.rs | 10 +- .../src/sync/network_context/custody.rs | 153 +++++++++++------- consensus/types/src/data_column_subnet_id.rs | 4 +- 6 files changed, 117 insertions(+), 67 deletions(-) diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs index ffbad1364c..d31df4e3dd 100644 --- a/beacon_node/lighthouse_tracing/src/lib.rs +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -17,6 +17,8 @@ pub const SPAN_PROCESS_GOSSIP_BLOCK: &str = "process_gossip_block"; /// Sync methods root spans pub const SPAN_SYNCING_CHAIN: &str = "syncing_chain"; pub const SPAN_OUTGOING_RANGE_REQUEST: &str = "outgoing_range_request"; +pub const SPAN_SINGLE_BLOCK_LOOKUP: &str = "single_block_lookup"; +pub const SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST: &str = "outgoing_block_by_root_request"; pub const SPAN_OUTGOING_CUSTODY_REQUEST: &str = "outgoing_custody_request"; pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block"; pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs"; @@ -46,7 +48,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[ SPAN_PROCESS_GOSSIP_BLOB, SPAN_PROCESS_GOSSIP_BLOCK, SPAN_OUTGOING_RANGE_REQUEST, - SPAN_OUTGOING_CUSTODY_REQUEST, + SPAN_SINGLE_BLOCK_LOOKUP, SPAN_PROCESS_RPC_BLOCK, SPAN_PROCESS_RPC_BLOBS, SPAN_PROCESS_RPC_CUSTODY_COLUMNS, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e9f24697ac..b60c21972f 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -384,6 +384,7 @@ impl BlockLookups { // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), // signal here to hold processing downloaded data. let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let _guard = lookup.span.clone().entered(); // Add block components to the new request if let Some(block_component) = block_component { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 30947cf1f0..36509d2563 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -7,6 +7,7 @@ use crate::sync::network_context::{ use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; use derivative::Derivative; use lighthouse_network::service::api_types::Id; +use lighthouse_tracing::SPAN_SINGLE_BLOCK_LOOKUP; use parking_lot::RwLock; use std::collections::HashSet; use std::fmt::Debug; @@ -14,6 +15,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; +use tracing::{Span, debug_span}; use types::blob_sidecar::FixedBlobSidecarList; use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; @@ -70,6 +72,7 @@ pub struct SingleBlockLookup { block_root: Hash256, awaiting_parent: Option, created: Instant, + pub(crate) span: Span, } #[derive(Debug)] @@ -89,6 +92,12 @@ impl SingleBlockLookup { id: Id, awaiting_parent: Option, ) -> Self { + let lookup_span = debug_span!( + SPAN_SINGLE_BLOCK_LOOKUP, + block_root = %requested_block_root, + id = id, + ); + Self { id, block_request_state: BlockRequestState::new(requested_block_root), @@ -97,6 +106,7 @@ impl SingleBlockLookup { block_root: requested_block_root, awaiting_parent, created: Instant::now(), + span: lookup_span, } } @@ -192,6 +202,7 @@ impl SingleBlockLookup { &mut self, cx: &mut SyncNetworkContext, ) -> Result { + let _guard = self.span.clone().entered(); // TODO: Check what's necessary to download, specially for blobs self.continue_request::>(cx, 0)?; @@ -257,6 +268,7 @@ impl SingleBlockLookup { // that can make progress so it must be dropped. Consider the lookup completed. // This case can happen if we receive the components from gossip during a retry. if self.all_components_processed() { + self.span = Span::none(); Ok(LookupResult::Completed) } else { Ok(LookupResult::Pending) diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 07462a01fe..17a4295700 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -29,7 +29,7 @@ use lighthouse_network::service::api_types::{ DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; -use lighthouse_tracing::SPAN_OUTGOING_RANGE_REQUEST; +use lighthouse_tracing::{SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, SPAN_OUTGOING_RANGE_REQUEST}; use parking_lot::RwLock; pub use requests::LookupVerifyError; use requests::{ @@ -886,6 +886,11 @@ impl SyncNetworkContext { "Sync RPC request sent" ); + let request_span = debug_span!( + parent: Span::current(), + SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, + %block_root, + ); self.blocks_by_root_requests.insert( id, peer_id, @@ -893,8 +898,7 @@ impl SyncNetworkContext { // block and the peer must have it. true, BlocksByRootRequestItems::new(request), - // Not implemented - Span::none(), + request_span, ); Ok(LookupRequestResult::RequestSent(id.req_id)) diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index d973e83cea..71e002cc42 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -7,19 +7,17 @@ use fnv::FnvHashMap; use lighthouse_network::PeerId; use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester}; use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST; -use lru_cache::LRUTimeCache; use parking_lot::RwLock; -use rand::Rng; use std::collections::HashSet; +use std::hash::{BuildHasher, RandomState}; use std::time::{Duration, Instant}; use std::{collections::HashMap, marker::PhantomData, sync::Arc}; -use tracing::{Span, debug, debug_span, field, warn}; +use tracing::{Span, debug, debug_span, warn}; use types::{DataColumnSidecar, Hash256, data_column_sidecar::ColumnIndex}; use types::{DataColumnSidecarList, EthSpec}; use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext}; -const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5; const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30); pub struct ActiveCustodyRequest { @@ -30,9 +28,7 @@ pub struct ActiveCustodyRequest { /// Active requests for 1 or more columns each active_batch_columns_requests: FnvHashMap, - /// Peers that have recently failed to successfully respond to a columns by root request. - /// Having a LRUTimeCache allows this request to not have to track disconnecting peers. - failed_peers: LRUTimeCache, + peer_attempts: HashMap, /// Set of peers that claim to have imported this block and their custody columns lookup_peers: Arc>>, /// Span for tracing the lifetime of this request. @@ -71,7 +67,11 @@ impl ActiveCustodyRequest { column_indices: &[ColumnIndex], lookup_peers: Arc>>, ) -> Self { - let span = debug_span!(parent: None, SPAN_OUTGOING_CUSTODY_REQUEST, %block_root); + let span = debug_span!( + parent: Span::current(), + SPAN_OUTGOING_CUSTODY_REQUEST, + %block_root, + ); Self { block_root, custody_id, @@ -81,7 +81,7 @@ impl ActiveCustodyRequest { .map(|index| (*index, ColumnRequest::new())), ), active_batch_columns_requests: <_>::default(), - failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)), + peer_attempts: HashMap::new(), lookup_peers, span, _phantom: PhantomData, @@ -170,13 +170,6 @@ impl ActiveCustodyRequest { ?missing_column_indexes, "Custody column peer claims to not have some data" ); - - batch_request.span.record( - "missing_column_indexes", - field::debug(missing_column_indexes), - ); - - self.failed_peers.insert(peer_id); } } Err(err) => { @@ -195,13 +188,6 @@ impl ActiveCustodyRequest { .ok_or(Error::BadState("unknown column_index".to_owned()))? .on_download_error_and_mark_failure(req_id)?; } - - batch_request.span.record( - "missing_column_indexes", - field::debug(&batch_request.indices), - ); - - self.failed_peers.insert(peer_id); } }; @@ -238,52 +224,29 @@ impl ActiveCustodyRequest { let active_request_count_by_peer = cx.active_request_count_by_peer(); let mut columns_to_request_by_peer = HashMap::>::new(); let lookup_peers = self.lookup_peers.read(); + // Create deterministic hasher per request to ensure consistent peer ordering within + // this request (avoiding fragmentation) while varying selection across different requests + let random_state = RandomState::new(); - // Need to: - // - track how many active requests a peer has for load balancing - // - which peers have failures to attempt others - // - which peer returned what to have PeerGroup attributability - - for (column_index, request) in self.column_requests.iter_mut() { + for (column_index, request) in self.column_requests.iter() { if let Some(wait_duration) = request.is_awaiting_download() { + // Note: an empty response is considered a successful response, so we may end up + // retrying many more times than `MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS`. if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS { return Err(Error::TooManyFailures); } - // TODO(das): When is a fork and only a subset of your peers know about a block, we should - // only query the peers on that fork. Should this case be handled? How to handle it? - let custodial_peers = cx.get_custodial_peers(*column_index); + let peer_to_request = self.select_column_peer( + cx, + &active_request_count_by_peer, + &lookup_peers, + *column_index, + &random_state, + ); - // We draw from the total set of peers, but prioritize those peers who we have - // received an attestation / status / block message claiming to have imported the - // lookup. The frequency of those messages is low, so drawing only from lookup_peers - // could cause many lookups to take much longer or fail as they don't have enough - // custody peers on a given column - let mut priorized_peers = custodial_peers - .iter() - .map(|peer| { - ( - // Prioritize peers that claim to know have imported this block - if lookup_peers.contains(peer) { 0 } else { 1 }, - // De-prioritize peers that have failed to successfully respond to - // requests recently - self.failed_peers.contains(peer), - // Prefer peers with fewer requests to load balance across peers. - // We batch requests to the same peer, so count existence in the - // `columns_to_request_by_peer` as a single 1 request. - active_request_count_by_peer.get(peer).copied().unwrap_or(0) - + columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0), - // Random factor to break ties, otherwise the PeerID breaks ties - rand::rng().random::(), - *peer, - ) - }) - .collect::>(); - priorized_peers.sort_unstable(); - - if let Some((_, _, _, _, peer_id)) = priorized_peers.first() { + if let Some(peer_id) = peer_to_request { columns_to_request_by_peer - .entry(*peer_id) + .entry(peer_id) .or_default() .push(*column_index); } else if wait_duration > MAX_STALE_NO_PEERS_DURATION { @@ -298,6 +261,23 @@ impl ActiveCustodyRequest { } } + let peer_requests = columns_to_request_by_peer.len(); + if peer_requests > 0 { + let columns_requested_count = columns_to_request_by_peer + .values() + .map(|v| v.len()) + .sum::(); + debug!( + lookup_peers = lookup_peers.len(), + "Requesting {} columns from {} peers", columns_requested_count, peer_requests, + ); + } else { + debug!( + lookup_peers = lookup_peers.len(), + "No column peers found for look up", + ); + } + for (peer_id, indices) in columns_to_request_by_peer.into_iter() { let request_result = cx .data_column_lookup_request( @@ -317,8 +297,14 @@ impl ActiveCustodyRequest { match request_result { LookupRequestResult::RequestSent(req_id) => { + *self.peer_attempts.entry(peer_id).or_insert(0) += 1; + let client = cx.network_globals().client(&peer_id).kind; - let batch_columns_req_span = debug_span!("batch_columns_req", %peer_id, %client, missing_column_indexes = tracing::field::Empty); + let batch_columns_req_span = debug_span!( + "batch_columns_req", + %peer_id, + %client, + ); let _guard = batch_columns_req_span.clone().entered(); for column_index in &indices { let column_request = self @@ -345,11 +331,54 @@ impl ActiveCustodyRequest { Ok(None) } + + fn select_column_peer( + &self, + cx: &mut SyncNetworkContext, + active_request_count_by_peer: &HashMap, + lookup_peers: &HashSet, + column_index: ColumnIndex, + random_state: &RandomState, + ) -> Option { + // We draw from the total set of peers, but prioritize those peers who we have + // received an attestation or a block from (`lookup_peers`), as the `lookup_peers` may take + // time to build up and we are likely to not find any column peers initially. + let custodial_peers = cx.get_custodial_peers(column_index); + let mut prioritized_peers = custodial_peers + .iter() + .filter(|peer| { + // Exclude peers that we have already made too many attempts to. + self.peer_attempts.get(peer).copied().unwrap_or(0) <= MAX_CUSTODY_PEER_ATTEMPTS + }) + .map(|peer| { + ( + // Prioritize peers that claim to know have imported this block + if lookup_peers.contains(peer) { 0 } else { 1 }, + // De-prioritize peers that we have already attempted to download from + self.peer_attempts.get(peer).copied().unwrap_or(0), + // Prefer peers with fewer requests to load balance across peers. + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + // The hash ensures consistent peer ordering within this request + // to avoid fragmentation while varying selection across different requests. + random_state.hash_one(peer), + *peer, + ) + }) + .collect::>(); + prioritized_peers.sort_unstable(); + + prioritized_peers + .first() + .map(|(_, _, _, _, peer_id)| *peer_id) + } } /// TODO(das): this attempt count is nested into the existing lookup request count. const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3; +/// Max number of attempts to request custody columns from a single peer. +const MAX_CUSTODY_PEER_ATTEMPTS: usize = 3; + struct ColumnRequest { status: Status, download_failures: usize, diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 125a77fc1e..4061cb4fdb 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -1,13 +1,15 @@ //! Identifies each data column subnet by an integer identifier. use crate::ChainSpec; use crate::data_column_sidecar::ColumnIndex; +use derivative::Derivative; use safe_arith::{ArithError, SafeArith}; use serde::{Deserialize, Serialize}; use std::fmt::{self, Display}; use std::ops::{Deref, DerefMut}; #[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derive(Clone, Copy, Derivative, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[derivative(Debug = "transparent")] #[serde(transparent)] pub struct DataColumnSubnetId(#[serde(with = "serde_utils::quoted_u64")] u64);