From 80ba0b169bb0371287211a79abcf377e0378b474 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 11 Aug 2025 19:11:56 -0700 Subject: [PATCH] Backfill peer attribution (#7762) Partly addresses https://github.com/sigp/lighthouse/issues/7744 Implement similar peer sync attribution like in #7733 for backfill sync. --- .../src/peer_manager/peerdb.rs | 40 +- .../network/src/sync/backfill_sync/mod.rs | 135 +++++- .../src/sync/block_sidecar_coupling.rs | 437 +++++++++++++++--- .../network/src/sync/network_context.rs | 60 ++- .../src/sync/network_context/requests.rs | 5 + .../network/src/sync/range_sync/chain.rs | 77 +-- .../tests/checkpoint-sync-config-devnet.yaml | 10 +- 7 files changed, 620 insertions(+), 144 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 6559b24724..7dd4e6544d 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -253,15 +253,17 @@ impl PeerDB { /// /// If `earliest_available_slot` info is not available, then return peer anyway assuming it has the /// required data. + /// + /// If `allowed_peers` is `Some`, then filters for the epoch only for those peers. pub fn synced_peers_for_epoch<'a>( &'a self, epoch: Epoch, - allowed_peers: &'a HashSet, + allowed_peers: Option<&'a HashSet>, ) -> impl Iterator { self.peers .iter() .filter(move |(peer_id, info)| { - allowed_peers.contains(peer_id) + allowed_peers.is_none_or(|allowed| allowed.contains(peer_id)) && info.is_connected() && match info.sync_status() { SyncStatus::Synced { info } => { @@ -270,7 +272,9 @@ impl PeerDB { SyncStatus::Advanced { info } => { info.has_slot(epoch.end_slot(E::slots_per_epoch())) } - _ => false, + SyncStatus::IrrelevantPeer + | SyncStatus::Behind { .. 
} + | SyncStatus::Unknown => false, } }) .map(|(peer_id, _)| peer_id) @@ -320,22 +324,36 @@ impl PeerDB { } /// Returns an iterator of all peers that are supposed to be custodying - /// the given subnet id that also belong to `allowed_peers`. - pub fn good_range_sync_custody_subnet_peer<'a>( - &'a self, + /// the given subnet id. + pub fn good_range_sync_custody_subnet_peers( + &self, subnet: DataColumnSubnetId, - allowed_peers: &'a HashSet, - ) -> impl Iterator { + ) -> impl Iterator { self.peers .iter() - .filter(move |(peer_id, info)| { + .filter(move |(_, info)| { // The custody_subnets hashset can be populated via enr or metadata - let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet); - allowed_peers.contains(peer_id) && info.is_connected() && is_custody_subnet_peer + info.is_connected() && info.is_assigned_to_custody_subnet(&subnet) }) .map(|(peer_id, _)| peer_id) } + /// Returns `true` if the given peer is assigned to the given subnet. + /// else returns `false` + /// + /// Returns `false` if peer doesn't exist in peerdb. + pub fn is_good_range_sync_custody_subnet_peer( + &self, + subnet: DataColumnSubnetId, + peer: &PeerId, + ) -> bool { + if let Some(info) = self.peers.get(peer) { + info.is_connected() && info.is_assigned_to_custody_subnet(&subnet) + } else { + false + } + } + /// Gives the ids of all known disconnected peers. pub fn disconnected_peers(&self) -> impl Iterator { self.peers diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 0b67262cde..e7a57092dd 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -9,6 +9,7 @@ //! sync as failed, log an error and attempt to retry once a new peer joins the node. 
use crate::network_beacon_processor::ChainSegmentProcessId; +use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::manager::BatchProcessResult; use crate::sync::network_context::{ RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext, @@ -28,7 +29,7 @@ use std::collections::{ }; use std::sync::Arc; use tracing::{debug, error, info, warn}; -use types::{Epoch, EthSpec}; +use types::{ColumnIndex, Epoch, EthSpec}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -209,9 +210,11 @@ impl BackFillSync { .network_globals .peers .read() - .synced_peers() + .synced_peers_for_epoch(self.to_be_downloaded, None) .next() .is_some() + // backfill can't progress if we do not have peers in the required subnets post peerdas. + && self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) { // If there are peers to resume with, begin the resume. debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync"); @@ -305,6 +308,46 @@ impl BackFillSync { err: RpcResponseError, ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { + if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err { + match coupling_error { + CouplingError::DataColumnPeerFailure { + error, + faulty_peers, + action, + exceeded_retries, + } => { + debug!(?batch_id, error, "Block components coupling error"); + // Note: we don't fail the batch here because a `CouplingError` is + // recoverable by requesting from other honest peers. 
+ let mut failed_columns = HashSet::new(); + let mut failed_peers = HashSet::new(); + for (column, peer) in faulty_peers { + failed_columns.insert(*column); + failed_peers.insert(*peer); + } + for peer in failed_peers.iter() { + network.report_peer(*peer, *action, "failed to return columns"); + } + + // Only retry if peer failure **and** retries have been exceeded + if !*exceeded_retries { + return self.retry_partial_batch( + network, + batch_id, + request_id, + failed_columns, + failed_peers, + ); + } + } + CouplingError::BlobPeerFailure(msg) => { + tracing::debug!(?batch_id, msg, "Blob peer failure"); + } + CouplingError::InternalError(msg) => { + error!(?batch_id, msg, "Block components coupling internal error"); + } + } + } // A batch could be retried without the peer failing the request (disconnecting/ // sending an error /timeout) if the peer is removed from the chain for other // reasons. Check that this block belongs to the expected peer @@ -834,12 +877,16 @@ impl BackFillSync { network: &mut SyncNetworkContext, batch_id: BatchId, ) -> Result<(), BackFillError> { + if matches!(self.state(), BackFillState::Paused) { + return Err(BackFillError::Paused); + } if let Some(batch) = self.batches.get_mut(&batch_id) { + debug!(?batch_id, "Sending backfill batch"); let synced_peers = self .network_globals .peers .read() - .synced_peers() + .synced_peers_for_epoch(batch_id, None) .cloned() .collect::>(); @@ -898,6 +945,53 @@ impl BackFillSync { Ok(()) } + /// Retries partial column requests within the batch by creating new requests for the failed columns. 
+ pub fn retry_partial_batch( + &mut self, + network: &mut SyncNetworkContext, + batch_id: BatchId, + id: Id, + failed_columns: HashSet, + mut failed_peers: HashSet, + ) -> Result<(), BackFillError> { + if let Some(batch) = self.batches.get_mut(&batch_id) { + failed_peers.extend(&batch.failed_peers()); + let req = batch.to_blocks_by_range_request().0; + + let synced_peers = network + .network_globals() + .peers + .read() + .synced_peers_for_epoch(batch_id, None) + .cloned() + .collect::>(); + + match network.retry_columns_by_range( + id, + &synced_peers, + &failed_peers, + req, + &failed_columns, + ) { + Ok(_) => { + debug!( + ?batch_id, + id, "Retried column requests from different peers" + ); + return Ok(()); + } + Err(e) => { + debug!(?batch_id, id, e, "Failed to retry partial batch"); + } + } + } else { + return Err(BackFillError::InvalidSyncState( + "Batch should exist to be retried".to_string(), + )); + } + Ok(()) + } + /// When resuming a chain, this function searches for batches that need to be re-downloaded and /// transitions their state to redownload the batch. fn resume_batches(&mut self, network: &mut SyncNetworkContext) -> Result<(), BackFillError> { @@ -973,6 +1067,11 @@ impl BackFillSync { return None; } + if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) { + debug!("Waiting for peers to be available on custody column subnets"); + return None; + } + let batch_id = self.to_be_downloaded; // this batch could have been included already being an optimistic batch match self.batches.entry(batch_id) { @@ -1005,6 +1104,36 @@ impl BackFillSync { } } + /// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in + /// every sampling column subnet. + /// + /// Returns `true` if peerdas isn't enabled for the epoch. 
+ fn good_peers_on_sampling_subnets( + &self, + epoch: Epoch, + network: &SyncNetworkContext, + ) -> bool { + if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { + // Require peers on all sampling column subnets before sending batches + let peers_on_all_custody_subnets = network + .network_globals() + .sampling_subnets() + .iter() + .all(|subnet_id| { + let peer_count = network + .network_globals() + .peers + .read() + .good_range_sync_custody_subnet_peers(*subnet_id) + .count(); + peer_count > 0 + }); + peers_on_all_custody_subnets + } else { + true + } + } + /// Resets the start epoch based on the beacon chain. /// /// This errors if the beacon chain indicates that backfill sync has already completed or is diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 4653daa44a..8ccbc64a17 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -12,6 +12,20 @@ use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, }; + +use crate::sync::network_context::MAX_COLUMN_RETRIES; + +/// Accumulates and couples beacon blocks with their associated data (blobs or data columns) +/// from range sync network responses. +/// +/// This struct acts as temporary storage while multiple network responses arrive: +/// - Blocks themselves (always required) +/// - Blob sidecars (pre-Fulu fork) +/// - Data columns (Fulu fork and later) +/// +/// It accumulates responses until all expected components are received, then couples +/// them together and returns complete `RpcBlock`s ready for processing. Handles validation +/// and peer failure detection during the coupling process. pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. 
blocks_request: ByRangeRequest>>>, @@ -35,16 +49,30 @@ enum RangeBlockDataRequest { /// The column indices corresponding to the request column_peers: HashMap>, expected_custody_columns: Vec, + attempt: usize, }, } #[derive(Debug)] -pub struct CouplingError { - pub(crate) msg: String, - pub(crate) column_and_peer: Option<(Vec<(ColumnIndex, PeerId)>, PeerAction)>, +pub(crate) enum CouplingError { + InternalError(String), + /// The peer we requested the columns from was faulty/malicious + DataColumnPeerFailure { + error: String, + faulty_peers: Vec<(ColumnIndex, PeerId)>, + action: PeerAction, + exceeded_retries: bool, + }, + BlobPeerFailure(String), } impl RangeBlockComponentsRequest { + /// Creates a new range request for blocks and their associated data (blobs or data columns). + /// + /// # Arguments + /// * `blocks_req_id` - Request ID for the blocks + /// * `blobs_req_id` - Optional request ID for blobs (pre-Fulu fork) + /// * `data_columns` - Optional tuple of (request_id->column_indices pairs, expected_custody_columns) for Fulu fork #[allow(clippy::type_complexity)] pub fn new( blocks_req_id: BlocksByRangeRequestId, @@ -65,6 +93,7 @@ impl RangeBlockComponentsRequest { .collect(), column_peers, expected_custody_columns, + attempt: 0, } } else { RangeBlockDataRequest::NoData @@ -87,6 +116,7 @@ impl RangeBlockComponentsRequest { requests, expected_custody_columns: _, column_peers, + attempt: _, } => { for (request, columns) in failed_column_requests.into_iter() { requests.insert(request, ByRangeRequest::Active(request)); @@ -98,6 +128,9 @@ impl RangeBlockComponentsRequest { } } + /// Adds received blocks to the request. + /// + /// Returns an error if the request ID doesn't match the expected blocks request. pub fn add_blocks( &mut self, req_id: BlocksByRangeRequestId, @@ -106,6 +139,10 @@ impl RangeBlockComponentsRequest { self.blocks_request.finish(req_id, blocks) } + /// Adds received blobs to the request. 
+ /// + /// Returns an error if this request expects data columns instead of blobs, + /// or if the request ID doesn't match. pub fn add_blobs( &mut self, req_id: BlobsByRangeRequestId, @@ -120,6 +157,10 @@ impl RangeBlockComponentsRequest { } } + /// Adds received custody columns to the request. + /// + /// Returns an error if this request expects blobs instead of data columns, + /// or if the request ID is unknown. pub fn add_custody_columns( &mut self, req_id: DataColumnsByRangeRequestId, @@ -143,6 +184,11 @@ impl RangeBlockComponentsRequest { } } + /// Attempts to construct RPC blocks from all received components. + /// + /// Returns `None` if not all expected requests have completed. + /// Returns `Some(Ok(_))` with valid RPC blocks if all data is present and valid. + /// Returns `Some(Err(_))` if there are issues coupling blocks with their data. pub fn responses( &mut self, spec: &ChainSpec, @@ -151,7 +197,7 @@ impl RangeBlockComponentsRequest { return None; }; - match &mut self.block_data_request { + let resp = match &mut self.block_data_request { RangeBlockDataRequest::NoData => { Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec)) } @@ -169,6 +215,7 @@ impl RangeBlockComponentsRequest { requests, expected_custody_columns, column_peers, + attempt, } => { let mut data_columns = vec![]; let mut column_to_peer_id: HashMap = HashMap::new(); @@ -179,6 +226,10 @@ impl RangeBlockComponentsRequest { data_columns.extend(data.clone()) } + // An "attempt" is complete here after we have received a response for all the + // requests we made. i.e. `req.to_finished()` returns Some for all requests. + *attempt += 1; + // Note: this assumes that only 1 peer is responsible for a column // with a batch. 
for (id, columns) in column_peers { @@ -192,22 +243,31 @@ impl RangeBlockComponentsRequest { data_columns, column_to_peer_id, expected_custody_columns, + *attempt, spec, ); - if let Err(err) = &resp { - if let Some((peers, _)) = &err.column_and_peer { - for (_, peer) in peers.iter() { - // find the req id associated with the peer and - // delete it from the entries - requests.retain(|&k, _| k.peer != *peer); - } + if let Err(CouplingError::DataColumnPeerFailure { + error: _, + faulty_peers, + action: _, + exceeded_retries: _, + }) = &resp + { + for (_, peer) in faulty_peers.iter() { + // find the req id associated with the peer and + // delete it from the entries as we are going to make + // a separate attempt for those components. + requests.retain(|&k, _| k.peer != *peer); } } Some(resp) } - } + }; + + // Increment the attempt once this function returns the response or errors + resp } fn responses_with_blobs( @@ -229,9 +289,8 @@ impl RangeBlockComponentsRequest { .unwrap_or(false); pair_next_blob } { - blob_list.push(blob_iter.next().ok_or_else(|| CouplingError { - msg: "Missing next blob".to_string(), - column_and_peer: None, + blob_list.push(blob_iter.next().ok_or_else(|| { + CouplingError::BlobPeerFailure("Missing next blob".to_string()) })?); } @@ -239,16 +298,14 @@ impl RangeBlockComponentsRequest { for blob in blob_list { let blob_index = blob.index as usize; let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else { - return Err(CouplingError { - msg: "Invalid blob index".to_string(), - column_and_peer: None, - }); + return Err(CouplingError::BlobPeerFailure( + "Invalid blob index".to_string(), + )); }; if blob_opt.is_some() { - return Err(CouplingError { - msg: "Repeat blob index".to_string(), - column_and_peer: None, - }); + return Err(CouplingError::BlobPeerFailure( + "Repeat blob index".to_string(), + )); } else { *blob_opt = Some(blob); } @@ -257,24 +314,22 @@ impl RangeBlockComponentsRequest { blobs_buffer.into_iter().flatten().collect::>(), 
max_blobs_per_block, ) - .map_err(|_| CouplingError { - msg: "Blobs returned exceeds max length".to_string(), - column_and_peer: None, + .map_err(|_| { + CouplingError::BlobPeerFailure("Blobs returned exceeds max length".to_string()) })?; responses.push( - RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError { - msg: format!("{e:?}"), - column_and_peer: None, - })?, + RpcBlock::new(None, block, Some(blobs)) + .map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?, ) } - // if accumulated sidecars is not empty, throw an error. + // if accumulated sidecars is not empty, log an error but return the responses + // as we can still make progress. if blob_iter.next().is_some() { - return Err(CouplingError { - msg: "Received sidecars that don't pair well".to_string(), - column_and_peer: None, - }); + tracing::debug!( + remaining_blobs=?blob_iter.collect::>(), + "Received sidecars that don't pair well", + ); } Ok(responses) @@ -285,6 +340,7 @@ impl RangeBlockComponentsRequest { data_columns: DataColumnSidecarList, column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], + attempt: usize, spec: &ChainSpec, ) -> Result>, CouplingError> { // Group data columns by block_root and index @@ -300,10 +356,12 @@ impl RangeBlockComponentsRequest { .insert(index, column) .is_some() { - return Err(CouplingError { - msg: format!("Repeated column block_root {block_root:?} index {index}"), - column_and_peer: None, - }); + // `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers + // we request the data from. + // If there are duplicated indices, its likely a peer sending us the same index multiple times. + // However we can still proceed even if there are extra columns, just log an error. 
+ tracing::debug!(?block_root, ?index, "Repeated column for block_root"); + continue; } } @@ -311,52 +369,43 @@ impl RangeBlockComponentsRequest { // plus we have columns for our custody requirements let mut rpc_blocks = Vec::with_capacity(blocks.len()); + let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; for block in blocks { let block_root = get_block_root(&block); rpc_blocks.push(if block.num_expected_blobs() > 0 { let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) else { - // This PR ignores the fix from https://github.com/sigp/lighthouse/pull/5675 - // which allows blobs to not match blocks. - // TODO(das): on the initial version of PeerDAS the beacon chain does not check - // rpc custody requirements and dropping this check can allow the block to have - // an inconsistent DB. - - // For now, we always assume that the block peer is right. - // This is potentially dangerous as we can get isolated on a chain with a - // malicious block peer. - // TODO: fix this by checking the proposer signature before downloading columns. let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); - return Err(CouplingError { - msg: format!("No columns for block {block_root:?} with data"), - column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)), + return Err(CouplingError::DataColumnPeerFailure { + error: format!("No columns for block {block_root:?} with data"), + faulty_peers: responsible_peers, + action: PeerAction::LowToleranceError, + exceeded_retries, + }); }; let mut custody_columns = vec![]; let mut naughty_peers = vec![]; for index in expects_custody_columns { + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. 
if let Some(data_column) = data_columns_by_index.remove(index) { - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); } else { - // Penalize the peer for claiming to have the columns but not returning - // them let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError { - msg: format!("Internal error, no request made for column {}", index), - column_and_peer: None, - }); + return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); }; naughty_peers.push((*index, *responsible_peer)); } } if !naughty_peers.is_empty() { - return Err(CouplingError { - msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), - column_and_peer: Some((naughty_peers, PeerAction::LowToleranceError)), + return Err(CouplingError::DataColumnPeerFailure { + error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), + faulty_peers: naughty_peers, + action: PeerAction::LowToleranceError, + exceeded_retries }); } @@ -364,7 +413,7 @@ impl RangeBlockComponentsRequest { if !data_columns_by_index.is_empty() { let remaining_indices = data_columns_by_index.keys().collect::>(); // log the error but don't return an error, we can still progress with extra columns. - tracing::error!( + tracing::debug!( ?block_root, ?remaining_indices, "Not all columns consumed for block" @@ -372,10 +421,7 @@ impl RangeBlockComponentsRequest { } RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec) - .map_err(|e| CouplingError { - msg: format!("{:?}", e), - column_and_peer: None, - })? + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? 
} else { // Block has no data, expects zero columns RpcBlock::new_without_blobs(Some(block_root), block) @@ -387,7 +433,7 @@ impl RangeBlockComponentsRequest { let remaining_roots = data_columns_by_block.keys().collect::>(); // log the error but don't return an error, we can still progress with responses. // this is most likely an internal error with overrequesting or a client bug. - tracing::error!(?remaining_roots, "Not all columns consumed for block"); + tracing::debug!(?remaining_roots, "Not all columns consumed for block"); } Ok(rpc_blocks) @@ -419,6 +465,7 @@ impl ByRangeRequest { #[cfg(test)] mod tests { use super::RangeBlockComponentsRequest; + use crate::sync::network_context::MAX_COLUMN_RETRIES; use beacon_chain::test_utils::{ generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs, }; @@ -427,7 +474,7 @@ mod tests { BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, DataColumnsByRangeRequestId, Id, RangeRequestId, }, - PeerId, + PeerAction, PeerId, }; use rand::SeedableRng; use std::sync::Arc; @@ -666,4 +713,252 @@ mod tests { // All completed construct response info.responses(&spec).unwrap().unwrap(); } + + #[test] + fn missing_custody_columns_from_faulty_peers() { + // GIVEN: A request expecting custody columns from multiple peers + let spec = test_spec::(); + let expected_custody_columns = vec![1, 2, 3, 4]; + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..2) + .map(|_| { + generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ) + }) + .collect::>(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let columns_req_id = expected_custody_columns + .iter() + .enumerate() + .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .collect::>(); + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + Some((columns_req_id.clone(), 
expected_custody_columns.clone())), + ); + + // AND: All blocks are received successfully + info.add_blocks( + blocks_req_id, + blocks.iter().map(|b| b.0.clone().into()).collect(), + ) + .unwrap(); + + // AND: Only some custody columns are received (columns 1 and 2) + for (i, &column_index) in expected_custody_columns.iter().take(2).enumerate() { + let (req, _columns) = columns_req_id.get(i).unwrap(); + info.add_custody_columns( + *req, + blocks + .iter() + .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned()) + .collect(), + ) + .unwrap(); + } + + // AND: Remaining column requests are completed with empty data (simulating faulty peers) + for i in 2..4 { + let (req, _columns) = columns_req_id.get(i).unwrap(); + info.add_custody_columns(*req, vec![]).unwrap(); + } + + // WHEN: Attempting to construct RPC blocks + let result = info.responses(&spec).unwrap(); + + // THEN: Should fail with PeerFailure identifying the faulty peers + assert!(result.is_err()); + if let Err(super::CouplingError::DataColumnPeerFailure { + error, + faulty_peers, + action, + exceeded_retries, + }) = result + { + assert!(error.contains("Peers did not return column")); + assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing + assert_eq!(faulty_peers[0].0, 3); // column index 3 + assert_eq!(faulty_peers[1].0, 4); // column index 4 + assert!(matches!(action, PeerAction::LowToleranceError)); + assert!(!exceeded_retries); // First attempt, should be false + } else { + panic!("Expected PeerFailure error"); + } + } + + #[test] + fn retry_logic_after_peer_failures() { + // GIVEN: A request expecting custody columns where some peers initially fail + let spec = test_spec::(); + let expected_custody_columns = vec![1, 2]; + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..2) + .map(|_| { + generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ) + }) + .collect::>(); + + let components_id = components_id(); + let 
blocks_req_id = blocks_id(components_id); + let columns_req_id = expected_custody_columns + .iter() + .enumerate() + .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .collect::>(); + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + Some((columns_req_id.clone(), expected_custody_columns.clone())), + ); + + // AND: All blocks are received + info.add_blocks( + blocks_req_id, + blocks.iter().map(|b| b.0.clone().into()).collect(), + ) + .unwrap(); + + // AND: Only partial custody columns are received (column 1 but not 2) + let (req1, _) = columns_req_id.first().unwrap(); + info.add_custody_columns( + *req1, + blocks + .iter() + .flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned()) + .collect(), + ) + .unwrap(); + + // AND: The missing column request is completed with empty data (peer failure) + let (req2, _) = columns_req_id.get(1).unwrap(); + info.add_custody_columns(*req2, vec![]).unwrap(); + + // WHEN: First attempt to get responses fails + let result = info.responses(&spec).unwrap(); + assert!(result.is_err()); + + // AND: We retry with a new peer for the failed column + let new_columns_req_id = columns_id(10 as Id, components_id); + let failed_column_requests = vec![(new_columns_req_id, vec![2])]; + info.reinsert_failed_column_requests(failed_column_requests) + .unwrap(); + + // AND: The new peer provides the missing column data + info.add_custody_columns( + new_columns_req_id, + blocks + .iter() + .flat_map(|b| b.1.iter().filter(|d| d.index == 2).cloned()) + .collect(), + ) + .unwrap(); + + // WHEN: Attempting to get responses again + let result = info.responses(&spec).unwrap(); + + // THEN: Should succeed with complete RPC blocks + assert!(result.is_ok()); + let rpc_blocks = result.unwrap(); + assert_eq!(rpc_blocks.len(), 2); + } + + #[test] + fn max_retries_exceeded_behavior() { + // GIVEN: A request where peers consistently fail to provide required columns + let spec = test_spec::(); + let 
expected_custody_columns = vec![1, 2]; + let mut rng = XorShiftRng::from_seed([42; 16]); + let blocks = (0..1) + .map(|_| { + generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ) + }) + .collect::>(); + + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let columns_req_id = expected_custody_columns + .iter() + .enumerate() + .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column])) + .collect::>(); + let mut info = RangeBlockComponentsRequest::::new( + blocks_req_id, + None, + Some((columns_req_id.clone(), expected_custody_columns.clone())), + ); + + // AND: All blocks are received + info.add_blocks( + blocks_req_id, + blocks.iter().map(|b| b.0.clone().into()).collect(), + ) + .unwrap(); + + // AND: Only partial custody columns are provided (column 1 but not 2) + let (req1, _) = columns_req_id.first().unwrap(); + info.add_custody_columns( + *req1, + blocks + .iter() + .flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned()) + .collect(), + ) + .unwrap(); + + // AND: Column 2 request completes with empty data (persistent peer failure) + let (req2, _) = columns_req_id.get(1).unwrap(); + info.add_custody_columns(*req2, vec![]).unwrap(); + + // WHEN: Multiple retry attempts are made (up to max retries) + for _ in 0..MAX_COLUMN_RETRIES { + let result = info.responses(&spec).unwrap(); + assert!(result.is_err()); + + if let Err(super::CouplingError::DataColumnPeerFailure { + exceeded_retries, .. 
+ }) = &result + { + if *exceeded_retries { + break; + } + } + } + + // AND: One final attempt after exceeding max retries + let result = info.responses(&spec).unwrap(); + + // THEN: Should fail with exceeded_retries = true + assert!(result.is_err()); + if let Err(super::CouplingError::DataColumnPeerFailure { + error: _, + faulty_peers, + action, + exceeded_retries, + }) = result + { + assert_eq!(faulty_peers.len(), 1); // column 2 missing + assert_eq!(faulty_peers[0].0, 2); // column index 2 + assert!(matches!(action, PeerAction::LowToleranceError)); + assert!(exceeded_retries); // Should be true after max retries + } else { + panic!("Expected PeerFailure error with exceeded_retries=true"); + } + } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 5d321ad9e8..0a1c6fbd3a 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -55,6 +55,9 @@ use types::{ pub mod custody; mod requests; +/// Max retries for block components after which we fail the batch. +pub const MAX_COLUMN_RETRIES: usize = 3; + #[derive(Debug)] pub enum RpcEvent { StreamTermination, @@ -435,14 +438,14 @@ impl SyncNetworkContext { /// the batch. 
pub fn retry_columns_by_range( &mut self, - request_id: Id, + id: Id, peers: &HashSet, peers_to_deprioritize: &HashSet, request: BlocksByRangeRequest, failed_columns: &HashSet, ) -> Result<(), String> { let Some(requester) = self.components_by_range_requests.keys().find_map(|r| { - if r.id == request_id { + if r.id == id { Some(r.requester) } else { None @@ -455,6 +458,8 @@ impl SyncNetworkContext { debug!( ?failed_columns, + ?id, + ?requester, "Retrying only failed column requests from other peers" ); @@ -469,10 +474,7 @@ impl SyncNetworkContext { .map_err(|e| format!("{:?}", e))?; // Reuse the id for the request that received partially correct responses - let id = ComponentsByRangeRequestId { - id: request_id, - requester, - }; + let id = ComponentsByRangeRequestId { id, requester }; let data_column_requests = columns_by_range_peers_to_request .into_iter() @@ -683,18 +685,16 @@ impl SyncNetworkContext { match range_block_component { RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| { request.add_blocks(req_id, blocks).map_err(|e| { - RpcResponseError::BlockComponentCouplingError(CouplingError { - msg: e, - column_and_peer: None, - }) + RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError( + e, + )) }) }), RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| { request.add_blobs(req_id, blobs).map_err(|e| { - RpcResponseError::BlockComponentCouplingError(CouplingError { - msg: e, - column_and_peer: None, - }) + RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError( + e, + )) }) }), RangeBlockComponent::CustodyColumns(req_id, resp) => { @@ -702,10 +702,9 @@ impl SyncNetworkContext { request .add_custody_columns(req_id, custody_columns) .map_err(|e| { - RpcResponseError::BlockComponentCouplingError(CouplingError { - msg: e, - column_and_peer: None, - }) + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) }) }) } @@ -715,10 +714,27 @@ impl 
SyncNetworkContext { return Some(Err(e)); } - if let Some(blocks_result) = entry.get_mut().responses(&self.chain.spec) { - if blocks_result.is_ok() { - // remove the entry only if it coupled successfully with - // no errors + let range_req = entry.get_mut(); + if let Some(blocks_result) = range_req.responses(&self.chain.spec) { + if let Err(CouplingError::DataColumnPeerFailure { + action: _, + error, + faulty_peers: _, + exceeded_retries, + }) = &blocks_result + { + // Remove the entry if it's a peer failure **and** retry counter is exceeded + if *exceeded_retries { + debug!( + entry=?entry.key(), + msg = error, + "Request exceeded max retries, failing batch" + ); + entry.remove(); + }; + } else { + // also remove the entry only if it coupled successfully + // or if it isn't a column peer failure. entry.remove(); } // If the request is finished, dequeue everything diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 963b633ed6..f42595fb69 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -86,6 +86,11 @@ impl ActiveRequests { /// `add_item` may convert ReqResp success chunks into errors. This function handles the /// multiple errors / stream termination internally ensuring that a single `Some` is /// returned. + /// + /// ## Returns + /// - `Some` if the request has either completed or errored, and needs to be actioned by the + /// caller. + /// - `None` if no further action is currently needed. 
pub fn on_response(
         &mut self,
         id: K,
diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs
index 000d274a1b..90d1bf6621 100644
--- a/beacon_node/network/src/sync/range_sync/chain.rs
+++ b/beacon_node/network/src/sync/range_sync/chain.rs
@@ -817,32 +817,44 @@ impl SyncingChain {
     ) -> ProcessingResult {
         let batch_state = self.visualize_batch_state();
         if let Some(batch) = self.batches.get_mut(&batch_id) {
-            if let RpcResponseError::BlockComponentCouplingError(CouplingError {
-                column_and_peer,
-                msg,
-            }) = &err
-            {
-                debug!(?batch_id, msg, "Block components coupling error");
-                // Note: we don't fail the batch here because a `CouplingError` is
-                // recoverable by requesting from other honest peers.
-                if let Some((column_and_peer, action)) = column_and_peer {
-                    let mut failed_columns = HashSet::new();
-                    let mut failed_peers = HashSet::new();
-                    for (column, peer) in column_and_peer {
-                        failed_columns.insert(*column);
-                        failed_peers.insert(*peer);
+            if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
+                match coupling_error {
+                    CouplingError::DataColumnPeerFailure {
+                        error,
+                        faulty_peers,
+                        action,
+                        exceeded_retries,
+                    } => {
+                        debug!(?batch_id, error, "Block components coupling error");
+                        // Note: we don't fail the batch here because a `CouplingError` is
+                        // recoverable by requesting from other honest peers.
+                        let mut failed_columns = HashSet::new();
+                        let mut failed_peers = HashSet::new();
+                        for (column, peer) in faulty_peers {
+                            failed_columns.insert(*column);
+                            failed_peers.insert(*peer);
+                        }
+                        for peer in failed_peers.iter() {
+                            network.report_peer(*peer, *action, "failed to return columns");
+                        }
+                        // Retry the failed columns if the column requests haven't exceeded the
+                        // max retries. Otherwise, treat it as a failed batch below.
+ if !*exceeded_retries { + return self.retry_partial_batch( + network, + batch_id, + request_id, + failed_columns, + failed_peers, + ); + } } - for peer in failed_peers.iter() { - network.report_peer(*peer, *action, "failed to return columns"); + CouplingError::BlobPeerFailure(msg) => { + tracing::debug!(?batch_id, msg, "Blob peer failure"); + } + CouplingError::InternalError(msg) => { + tracing::error!(?batch_id, msg, "Block components coupling internal error"); } - - return self.retry_partial_batch( - network, - batch_id, - request_id, - failed_columns, - failed_peers, - ); } } // A batch could be retried without the peer failing the request (disconnecting/ @@ -900,14 +912,11 @@ impl SyncingChain { let (request, batch_type) = batch.to_blocks_by_range_request(); let failed_peers = batch.failed_peers(); - // TODO(das): we should request only from peers that are part of this SyncingChain. - // However, then we hit the NoPeer error frequently which causes the batch to fail and - // the SyncingChain to be dropped. We need to handle this case more gracefully. 
let synced_peers = network
             .network_globals()
             .peers
             .read()
-            .synced_peers_for_epoch(batch_id, &self.peers)
+            .synced_peers_for_epoch(batch_id, Some(&self.peers))
             .cloned()
             .collect::<HashSet<_>>();

@@ -984,7 +993,7 @@ impl SyncingChain {
             .network_globals()
             .peers
             .read()
-            .synced_peers()
+            .synced_peers_for_epoch(batch_id, Some(&self.peers))
             .cloned()
             .collect::<HashSet<_>>();

@@ -1084,11 +1093,13 @@ impl SyncingChain {
             .sampling_subnets()
             .iter()
             .all(|subnet_id| {
-                let peer_count = network
-                    .network_globals()
+                let peer_db = network.network_globals().peers.read();
+                let peer_count = self
                     .peers
-                    .read()
-                    .good_range_sync_custody_subnet_peer(*subnet_id, &self.peers)
+                    .iter()
+                    .filter(|peer| {
+                        peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer)
+                    })
                     .count();
                 peer_count > 0
             });
diff --git a/scripts/tests/checkpoint-sync-config-devnet.yaml b/scripts/tests/checkpoint-sync-config-devnet.yaml
index a5093631b4..de56e486cf 100644
--- a/scripts/tests/checkpoint-sync-config-devnet.yaml
+++ b/scripts/tests/checkpoint-sync-config-devnet.yaml
@@ -3,18 +3,20 @@ participants:
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-2
+    # There isn't a devnet-4 image
+    el_image: ethpandaops/geth:fusaka-devnet-3
     supernode: true
   - cl_type: lighthouse
     cl_image: lighthouse:local
     el_type: geth
-    el_image: ethpandaops/geth:fusaka-devnet-2
+    # There isn't a devnet-4 image
+    el_image: ethpandaops/geth:fusaka-devnet-3
     supernode: false
 checkpoint_sync_enabled: true
-checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-2.ethpandaops.io"
+checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-4.ethpandaops.io"
 global_log_level: debug
 network_params:
-  network: fusaka-devnet-2
+  network: fusaka-devnet-4