Fix stuck data column lookups by improving peer selection and retry logic (#8005)

Fixes the issue described in #7980 where Lighthouse repeatedly sends `DataColumnsByRoot` requests to the same peers that return empty responses, causing sync to get stuck.

The root cause was that empty responses were not counted as failures, leading to excessive retries against unresponsive peers.


- Added per-peer attempt tracking to limit retries to any single peer (`MAX_CUSTODY_PEER_ATTEMPTS = 3`).
- Replaced random peer selection with deterministic hashing within each lookup, to avoid splitting a lookup into many small requests and to improve request batching efficiency.
- Added a `single_block_lookup` root span to trace every lookup created, and added more debug logs:

<img width="1264" height="501" alt="image" src="https://github.com/user-attachments/assets/983629ba-b6d0-41cf-8e93-88a5b96c2f31" />


Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Jimmy Chen <jimmy@sigmaprime.io>
This commit is contained in:
Jimmy Chen
2025-09-09 16:18:05 +10:00
committed by GitHub
parent 8ec2640e04
commit ee734d1456
6 changed files with 117 additions and 67 deletions

View File

@@ -17,6 +17,8 @@ pub const SPAN_PROCESS_GOSSIP_BLOCK: &str = "process_gossip_block";
/// Sync methods root spans
pub const SPAN_SYNCING_CHAIN: &str = "syncing_chain";
pub const SPAN_OUTGOING_RANGE_REQUEST: &str = "outgoing_range_request";
pub const SPAN_SINGLE_BLOCK_LOOKUP: &str = "single_block_lookup";
pub const SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST: &str = "outgoing_block_by_root_request";
pub const SPAN_OUTGOING_CUSTODY_REQUEST: &str = "outgoing_custody_request";
pub const SPAN_PROCESS_RPC_BLOCK: &str = "process_rpc_block";
pub const SPAN_PROCESS_RPC_BLOBS: &str = "process_rpc_blobs";
@@ -46,7 +48,7 @@ pub const LH_BN_ROOT_SPAN_NAMES: &[&str] = &[
SPAN_PROCESS_GOSSIP_BLOB,
SPAN_PROCESS_GOSSIP_BLOCK,
SPAN_OUTGOING_RANGE_REQUEST,
SPAN_OUTGOING_CUSTODY_REQUEST,
SPAN_SINGLE_BLOCK_LOOKUP,
SPAN_PROCESS_RPC_BLOCK,
SPAN_PROCESS_RPC_BLOBS,
SPAN_PROCESS_RPC_CUSTODY_COLUMNS,

View File

@@ -384,6 +384,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent);
let _guard = lookup.span.clone().entered();
// Add block components to the new request
if let Some(block_component) = block_component {

View File

@@ -7,6 +7,7 @@ use crate::sync::network_context::{
use beacon_chain::{BeaconChainTypes, BlockProcessStatus};
use derivative::Derivative;
use lighthouse_network::service::api_types::Id;
use lighthouse_tracing::SPAN_SINGLE_BLOCK_LOOKUP;
use parking_lot::RwLock;
use std::collections::HashSet;
use std::fmt::Debug;
@@ -14,6 +15,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use store::Hash256;
use strum::IntoStaticStr;
use tracing::{Span, debug_span};
use types::blob_sidecar::FixedBlobSidecarList;
use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot};
@@ -70,6 +72,7 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
block_root: Hash256,
awaiting_parent: Option<Hash256>,
created: Instant,
pub(crate) span: Span,
}
#[derive(Debug)]
@@ -89,6 +92,12 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
id: Id,
awaiting_parent: Option<Hash256>,
) -> Self {
let lookup_span = debug_span!(
SPAN_SINGLE_BLOCK_LOOKUP,
block_root = %requested_block_root,
id = id,
);
Self {
id,
block_request_state: BlockRequestState::new(requested_block_root),
@@ -97,6 +106,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
block_root: requested_block_root,
awaiting_parent,
created: Instant::now(),
span: lookup_span,
}
}
@@ -192,6 +202,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
&mut self,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupRequestError> {
let _guard = self.span.clone().entered();
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx, 0)?;
@@ -257,6 +268,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// that can make progress so it must be dropped. Consider the lookup completed.
// This case can happen if we receive the components from gossip during a retry.
if self.all_components_processed() {
self.span = Span::none();
Ok(LookupResult::Completed)
} else {
Ok(LookupResult::Pending)

View File

@@ -29,7 +29,7 @@ use lighthouse_network::service::api_types::{
DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId,
};
use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource};
use lighthouse_tracing::SPAN_OUTGOING_RANGE_REQUEST;
use lighthouse_tracing::{SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, SPAN_OUTGOING_RANGE_REQUEST};
use parking_lot::RwLock;
pub use requests::LookupVerifyError;
use requests::{
@@ -886,6 +886,11 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
"Sync RPC request sent"
);
let request_span = debug_span!(
parent: Span::current(),
SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST,
%block_root,
);
self.blocks_by_root_requests.insert(
id,
peer_id,
@@ -893,8 +898,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
// block and the peer must have it.
true,
BlocksByRootRequestItems::new(request),
// Not implemented
Span::none(),
request_span,
);
Ok(LookupRequestResult::RequestSent(id.req_id))

View File

@@ -7,19 +7,17 @@ use fnv::FnvHashMap;
use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::{CustodyId, DataColumnsByRootRequester};
use lighthouse_tracing::SPAN_OUTGOING_CUSTODY_REQUEST;
use lru_cache::LRUTimeCache;
use parking_lot::RwLock;
use rand::Rng;
use std::collections::HashSet;
use std::hash::{BuildHasher, RandomState};
use std::time::{Duration, Instant};
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
use tracing::{Span, debug, debug_span, field, warn};
use tracing::{Span, debug, debug_span, warn};
use types::{DataColumnSidecar, Hash256, data_column_sidecar::ColumnIndex};
use types::{DataColumnSidecarList, EthSpec};
use super::{LookupRequestResult, PeerGroup, RpcResponseResult, SyncNetworkContext};
const FAILED_PEERS_CACHE_EXPIRY_SECONDS: u64 = 5;
const MAX_STALE_NO_PEERS_DURATION: Duration = Duration::from_secs(30);
pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
@@ -30,9 +28,7 @@ pub struct ActiveCustodyRequest<T: BeaconChainTypes> {
/// Active requests for 1 or more columns each
active_batch_columns_requests:
FnvHashMap<DataColumnsByRootRequestId, ActiveBatchColumnsRequest>,
/// Peers that have recently failed to successfully respond to a columns by root request.
/// Having a LRUTimeCache allows this request to not have to track disconnecting peers.
failed_peers: LRUTimeCache<PeerId>,
peer_attempts: HashMap<PeerId, usize>,
/// Set of peers that claim to have imported this block and their custody columns
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
/// Span for tracing the lifetime of this request.
@@ -71,7 +67,11 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
column_indices: &[ColumnIndex],
lookup_peers: Arc<RwLock<HashSet<PeerId>>>,
) -> Self {
let span = debug_span!(parent: None, SPAN_OUTGOING_CUSTODY_REQUEST, %block_root);
let span = debug_span!(
parent: Span::current(),
SPAN_OUTGOING_CUSTODY_REQUEST,
%block_root,
);
Self {
block_root,
custody_id,
@@ -81,7 +81,7 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.map(|index| (*index, ColumnRequest::new())),
),
active_batch_columns_requests: <_>::default(),
failed_peers: LRUTimeCache::new(Duration::from_secs(FAILED_PEERS_CACHE_EXPIRY_SECONDS)),
peer_attempts: HashMap::new(),
lookup_peers,
span,
_phantom: PhantomData,
@@ -170,13 +170,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
?missing_column_indexes,
"Custody column peer claims to not have some data"
);
batch_request.span.record(
"missing_column_indexes",
field::debug(missing_column_indexes),
);
self.failed_peers.insert(peer_id);
}
}
Err(err) => {
@@ -195,13 +188,6 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
.ok_or(Error::BadState("unknown column_index".to_owned()))?
.on_download_error_and_mark_failure(req_id)?;
}
batch_request.span.record(
"missing_column_indexes",
field::debug(&batch_request.indices),
);
self.failed_peers.insert(peer_id);
}
};
@@ -238,52 +224,29 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
let active_request_count_by_peer = cx.active_request_count_by_peer();
let mut columns_to_request_by_peer = HashMap::<PeerId, Vec<ColumnIndex>>::new();
let lookup_peers = self.lookup_peers.read();
// Create deterministic hasher per request to ensure consistent peer ordering within
// this request (avoiding fragmentation) while varying selection across different requests
let random_state = RandomState::new();
// Need to:
// - track how many active requests a peer has for load balancing
// - which peers have failures to attempt others
// - which peer returned what to have PeerGroup attributability
for (column_index, request) in self.column_requests.iter_mut() {
for (column_index, request) in self.column_requests.iter() {
if let Some(wait_duration) = request.is_awaiting_download() {
// Note: an empty response is considered a successful response, so we may end up
// retrying many more times than `MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS`.
if request.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS {
return Err(Error::TooManyFailures);
}
// TODO(das): When is a fork and only a subset of your peers know about a block, we should
// only query the peers on that fork. Should this case be handled? How to handle it?
let custodial_peers = cx.get_custodial_peers(*column_index);
let peer_to_request = self.select_column_peer(
cx,
&active_request_count_by_peer,
&lookup_peers,
*column_index,
&random_state,
);
// We draw from the total set of peers, but prioritize those peers who we have
// received an attestation / status / block message claiming to have imported the
// lookup. The frequency of those messages is low, so drawing only from lookup_peers
// could cause many lookups to take much longer or fail as they don't have enough
// custody peers on a given column
let mut priorized_peers = custodial_peers
.iter()
.map(|peer| {
(
// Prioritize peers that claim to know have imported this block
if lookup_peers.contains(peer) { 0 } else { 1 },
// De-prioritize peers that have failed to successfully respond to
// requests recently
self.failed_peers.contains(peer),
// Prefer peers with fewer requests to load balance across peers.
// We batch requests to the same peer, so count existence in the
// `columns_to_request_by_peer` as a single 1 request.
active_request_count_by_peer.get(peer).copied().unwrap_or(0)
+ columns_to_request_by_peer.get(peer).map(|_| 1).unwrap_or(0),
// Random factor to break ties, otherwise the PeerID breaks ties
rand::rng().random::<u32>(),
*peer,
)
})
.collect::<Vec<_>>();
priorized_peers.sort_unstable();
if let Some((_, _, _, _, peer_id)) = priorized_peers.first() {
if let Some(peer_id) = peer_to_request {
columns_to_request_by_peer
.entry(*peer_id)
.entry(peer_id)
.or_default()
.push(*column_index);
} else if wait_duration > MAX_STALE_NO_PEERS_DURATION {
@@ -298,6 +261,23 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
}
}
let peer_requests = columns_to_request_by_peer.len();
if peer_requests > 0 {
let columns_requested_count = columns_to_request_by_peer
.values()
.map(|v| v.len())
.sum::<usize>();
debug!(
lookup_peers = lookup_peers.len(),
"Requesting {} columns from {} peers", columns_requested_count, peer_requests,
);
} else {
debug!(
lookup_peers = lookup_peers.len(),
"No column peers found for look up",
);
}
for (peer_id, indices) in columns_to_request_by_peer.into_iter() {
let request_result = cx
.data_column_lookup_request(
@@ -317,8 +297,14 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
match request_result {
LookupRequestResult::RequestSent(req_id) => {
*self.peer_attempts.entry(peer_id).or_insert(0) += 1;
let client = cx.network_globals().client(&peer_id).kind;
let batch_columns_req_span = debug_span!("batch_columns_req", %peer_id, %client, missing_column_indexes = tracing::field::Empty);
let batch_columns_req_span = debug_span!(
"batch_columns_req",
%peer_id,
%client,
);
let _guard = batch_columns_req_span.clone().entered();
for column_index in &indices {
let column_request = self
@@ -345,11 +331,54 @@ impl<T: BeaconChainTypes> ActiveCustodyRequest<T> {
Ok(None)
}
/// Selects the best peer to request `column_index` from, or `None` if no eligible
/// custodial peer remains.
///
/// Peers are ranked by, in order: whether they claim to have imported the block
/// (`lookup_peers`), fewest prior attempts, fewest active requests (load
/// balancing), then a per-request deterministic hash so ties break consistently
/// within this request (avoiding fragmentation) while varying across requests.
fn select_column_peer(
    &self,
    cx: &mut SyncNetworkContext<T>,
    active_request_count_by_peer: &HashMap<PeerId, usize>,
    lookup_peers: &HashSet<PeerId>,
    column_index: ColumnIndex,
    random_state: &RandomState,
) -> Option<PeerId> {
    // We draw from the total set of peers, but prioritize those peers who we have
    // received an attestation or a block from (`lookup_peers`), as the `lookup_peers`
    // may take time to build up and we are likely to not find any column peers initially.
    let custodial_peers = cx.get_custodial_peers(column_index);
    custodial_peers
        .iter()
        .filter(|peer| {
            // Exclude peers that have exhausted their attempt budget. Strictly
            // less-than so a peer is attempted at most `MAX_CUSTODY_PEER_ATTEMPTS`
            // times (`<=` would permit one extra attempt beyond the documented max).
            self.peer_attempts.get(peer).copied().unwrap_or(0) < MAX_CUSTODY_PEER_ATTEMPTS
        })
        .map(|peer| {
            (
                // Prioritize peers that claim to have imported this block
                if lookup_peers.contains(peer) { 0 } else { 1 },
                // De-prioritize peers that we have already attempted to download from
                self.peer_attempts.get(peer).copied().unwrap_or(0),
                // Prefer peers with fewer requests to load balance across peers.
                active_request_count_by_peer.get(peer).copied().unwrap_or(0),
                // The hash ensures consistent peer ordering within this request
                // to avoid fragmentation while varying selection across different requests.
                random_state.hash_one(peer),
                *peer,
            )
        })
        // `min` is O(n) and picks the same peer the previous sort-then-first did.
        .min()
        .map(|(_, _, _, _, peer_id)| peer_id)
}
}
/// Max number of download failures tolerated per custody column before the whole
/// custody request is failed with `Error::TooManyFailures`.
/// TODO(das): this attempt count is nested into the existing lookup request count.
const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3;
/// Max number of attempts to request custody columns from a single peer.
const MAX_CUSTODY_PEER_ATTEMPTS: usize = 3;
struct ColumnRequest<E: EthSpec> {
status: Status<E>,
download_failures: usize,

View File

@@ -1,13 +1,15 @@
//! Identifies each data column subnet by an integer identifier.
use crate::ChainSpec;
use crate::data_column_sidecar::ColumnIndex;
use derivative::Derivative;
use safe_arith::{ArithError, SafeArith};
use serde::{Deserialize, Serialize};
use std::fmt::{self, Display};
use std::ops::{Deref, DerefMut};
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Clone, Copy, Derivative, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derivative(Debug = "transparent")]
#[serde(transparent)]
pub struct DataColumnSubnetId(#[serde(with = "serde_utils::quoted_u64")] u64);