Backfill peer attribution (#7762)

Partly addresses https://github.com/sigp/lighthouse/issues/7744


  Implement peer sync attribution for backfill sync, similar to what #7733 did for forward range sync.
This commit is contained in:
Pawan Dhananjay
2025-08-11 19:11:56 -07:00
committed by GitHub
parent 122f16776f
commit 80ba0b169b
7 changed files with 620 additions and 144 deletions

View File

@@ -9,6 +9,7 @@
//! sync as failed, log an error and attempt to retry once a new peer joins the node.
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::manager::BatchProcessResult;
use crate::sync::network_context::{
RangeRequestId, RpcRequestSendError, RpcResponseError, SyncNetworkContext,
@@ -28,7 +29,7 @@ use std::collections::{
};
use std::sync::Arc;
use tracing::{debug, error, info, warn};
use types::{Epoch, EthSpec};
use types::{ColumnIndex, Epoch, EthSpec};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
@@ -209,9 +210,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.network_globals
.peers
.read()
.synced_peers()
.synced_peers_for_epoch(self.to_be_downloaded, None)
.next()
.is_some()
// backfill can't progress if we do not have peers in the required subnets post peerdas.
&& self.good_peers_on_sampling_subnets(self.to_be_downloaded, network)
{
// If there are peers to resume with, begin the resume.
debug!(start_epoch = ?self.current_start, awaiting_batches = self.batches.len(), processing_target = ?self.processing_target, "Resuming backfill sync");
@@ -305,6 +308,46 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
err: RpcResponseError,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
match coupling_error {
CouplingError::DataColumnPeerFailure {
error,
faulty_peers,
action,
exceeded_retries,
} => {
debug!(?batch_id, error, "Block components coupling error");
// Note: we don't fail the batch here because a `CouplingError` is
// recoverable by requesting from other honest peers.
let mut failed_columns = HashSet::new();
let mut failed_peers = HashSet::new();
for (column, peer) in faulty_peers {
failed_columns.insert(*column);
failed_peers.insert(*peer);
}
for peer in failed_peers.iter() {
network.report_peer(*peer, *action, "failed to return columns");
}
// Only retry if this is a peer failure **and** retries have not yet been exceeded
if !*exceeded_retries {
return self.retry_partial_batch(
network,
batch_id,
request_id,
failed_columns,
failed_peers,
);
}
}
CouplingError::BlobPeerFailure(msg) => {
tracing::debug!(?batch_id, msg, "Blob peer failure");
}
CouplingError::InternalError(msg) => {
error!(?batch_id, msg, "Block components coupling internal error");
}
}
}
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
@@ -834,12 +877,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
) -> Result<(), BackFillError> {
if matches!(self.state(), BackFillState::Paused) {
return Err(BackFillError::Paused);
}
if let Some(batch) = self.batches.get_mut(&batch_id) {
debug!(?batch_id, "Sending backfill batch");
let synced_peers = self
.network_globals
.peers
.read()
.synced_peers()
.synced_peers_for_epoch(batch_id, None)
.cloned()
.collect::<HashSet<_>>();
@@ -898,6 +945,53 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
Ok(())
}
/// Retries partial column requests within the batch by creating new requests for the failed columns.
pub fn retry_partial_batch(
    &mut self,
    network: &mut SyncNetworkContext<T>,
    batch_id: BatchId,
    id: Id,
    failed_columns: HashSet<ColumnIndex>,
    mut failed_peers: HashSet<PeerId>,
) -> Result<(), BackFillError> {
    // The batch must still be tracked; otherwise there is nothing to retry.
    let Some(batch) = self.batches.get_mut(&batch_id) else {
        return Err(BackFillError::InvalidSyncState(
            "Batch should exist to be retried".to_string(),
        ));
    };

    // Deprioritize peers that already failed this batch in addition to the
    // peers that just failed to return the requested columns.
    failed_peers.extend(&batch.failed_peers());
    let req = batch.to_blocks_by_range_request().0;

    // Candidate peers: anyone synced past this batch's epoch.
    let synced_peers = network
        .network_globals()
        .peers
        .read()
        .synced_peers_for_epoch(batch_id, None)
        .cloned()
        .collect::<HashSet<_>>();

    // Re-request only the failed columns from other peers. A failure to send
    // the retry is logged but not escalated here; the batch is not failed.
    if let Err(e) = network.retry_columns_by_range(id, &synced_peers, &failed_peers, req, &failed_columns)
    {
        debug!(?batch_id, id, e, "Failed to retry partial batch");
    } else {
        debug!(
            ?batch_id,
            id, "Retried column requests from different peers"
        );
    }
    Ok(())
}
/// When resuming a chain, this function searches for batches that need to be re-downloaded and
/// transitions their state to redownload the batch.
fn resume_batches(&mut self, network: &mut SyncNetworkContext<T>) -> Result<(), BackFillError> {
@@ -973,6 +1067,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return None;
}
if !self.good_peers_on_sampling_subnets(self.to_be_downloaded, network) {
debug!("Waiting for peers to be available on custody column subnets");
return None;
}
let batch_id = self.to_be_downloaded;
// this batch could have been included already being an optimistic batch
match self.batches.entry(batch_id) {
@@ -1005,6 +1104,36 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
/// Checks all sampling column subnets for peers. Returns `true` if there is at least one peer in
/// every sampling column subnet.
///
/// Returns `true` if peerdas isn't enabled for the epoch.
fn good_peers_on_sampling_subnets(
    &self,
    epoch: Epoch,
    network: &SyncNetworkContext<T>,
) -> bool {
    // Pre-peerdas there are no column sampling requirements to satisfy.
    if !network.chain.spec.is_peer_das_enabled_for_epoch(epoch) {
        return true;
    }
    // Require peers on all sampling column subnets before sending batches
    network
        .network_globals()
        .sampling_subnets()
        .iter()
        .all(|subnet_id| {
            let peer_count = network
                .network_globals()
                .peers
                .read()
                .good_range_sync_custody_subnet_peers(*subnet_id)
                .count();
            peer_count > 0
        })
}
/// Resets the start epoch based on the beacon chain.
///
/// This errors if the beacon chain indicates that backfill sync has already completed or is

View File

@@ -12,6 +12,20 @@ use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock,
};
use crate::sync::network_context::MAX_COLUMN_RETRIES;
/// Accumulates and couples beacon blocks with their associated data (blobs or data columns)
/// from range sync network responses.
///
/// This struct acts as temporary storage while multiple network responses arrive:
/// - Blocks themselves (always required)
/// - Blob sidecars (pre-Fulu fork)
/// - Data columns (Fulu fork and later)
///
/// It accumulates responses until all expected components are received, then couples
/// them together and returns complete `RpcBlock`s ready for processing. Handles validation
/// and peer failure detection during the coupling process.
pub struct RangeBlockComponentsRequest<E: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar.
blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
@@ -35,16 +49,30 @@ enum RangeBlockDataRequest<E: EthSpec> {
/// The column indices corresponding to the request
column_peers: HashMap<DataColumnsByRangeRequestId, Vec<ColumnIndex>>,
expected_custody_columns: Vec<ColumnIndex>,
attempt: usize,
},
}
#[derive(Debug)]
pub struct CouplingError {
pub(crate) msg: String,
pub(crate) column_and_peer: Option<(Vec<(ColumnIndex, PeerId)>, PeerAction)>,
pub(crate) enum CouplingError {
InternalError(String),
/// The peer we requested the columns from was faulty/malicious
DataColumnPeerFailure {
error: String,
faulty_peers: Vec<(ColumnIndex, PeerId)>,
action: PeerAction,
exceeded_retries: bool,
},
BlobPeerFailure(String),
}
impl<E: EthSpec> RangeBlockComponentsRequest<E> {
/// Creates a new range request for blocks and their associated data (blobs or data columns).
///
/// # Arguments
/// * `blocks_req_id` - Request ID for the blocks
/// * `blobs_req_id` - Optional request ID for blobs (pre-Fulu fork)
/// * `data_columns` - Optional tuple of (request_id->column_indices pairs, expected_custody_columns) for Fulu fork
#[allow(clippy::type_complexity)]
pub fn new(
blocks_req_id: BlocksByRangeRequestId,
@@ -65,6 +93,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
.collect(),
column_peers,
expected_custody_columns,
attempt: 0,
}
} else {
RangeBlockDataRequest::NoData
@@ -87,6 +116,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
requests,
expected_custody_columns: _,
column_peers,
attempt: _,
} => {
for (request, columns) in failed_column_requests.into_iter() {
requests.insert(request, ByRangeRequest::Active(request));
@@ -98,6 +128,9 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
/// Adds received blocks to the request.
///
/// Returns an error if the request ID doesn't match the expected blocks request.
pub fn add_blocks(
&mut self,
req_id: BlocksByRangeRequestId,
@@ -106,6 +139,10 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
self.blocks_request.finish(req_id, blocks)
}
/// Adds received blobs to the request.
///
/// Returns an error if this request expects data columns instead of blobs,
/// or if the request ID doesn't match.
pub fn add_blobs(
&mut self,
req_id: BlobsByRangeRequestId,
@@ -120,6 +157,10 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
/// Adds received custody columns to the request.
///
/// Returns an error if this request expects blobs instead of data columns,
/// or if the request ID is unknown.
pub fn add_custody_columns(
&mut self,
req_id: DataColumnsByRangeRequestId,
@@ -143,6 +184,11 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
/// Attempts to construct RPC blocks from all received components.
///
/// Returns `None` if not all expected requests have completed.
/// Returns `Some(Ok(_))` with valid RPC blocks if all data is present and valid.
/// Returns `Some(Err(_))` if there are issues coupling blocks with their data.
pub fn responses(
&mut self,
spec: &ChainSpec,
@@ -151,7 +197,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
return None;
};
match &mut self.block_data_request {
let resp = match &mut self.block_data_request {
RangeBlockDataRequest::NoData => {
Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec))
}
@@ -169,6 +215,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
requests,
expected_custody_columns,
column_peers,
attempt,
} => {
let mut data_columns = vec![];
let mut column_to_peer_id: HashMap<u64, PeerId> = HashMap::new();
@@ -179,6 +226,10 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
data_columns.extend(data.clone())
}
// An "attempt" is complete here after we have received a response for all the
// requests we made. i.e. `req.to_finished()` returns Some for all requests.
*attempt += 1;
// Note: this assumes that only 1 peer is responsible for a column
// with a batch.
for (id, columns) in column_peers {
@@ -192,22 +243,31 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
data_columns,
column_to_peer_id,
expected_custody_columns,
*attempt,
spec,
);
if let Err(err) = &resp {
if let Some((peers, _)) = &err.column_and_peer {
for (_, peer) in peers.iter() {
// find the req id associated with the peer and
// delete it from the entries
requests.retain(|&k, _| k.peer != *peer);
}
if let Err(CouplingError::DataColumnPeerFailure {
error: _,
faulty_peers,
action: _,
exceeded_retries: _,
}) = &resp
{
for (_, peer) in faulty_peers.iter() {
// find the req id associated with the peer and
// delete it from the entries as we are going to make
// a separate attempt for those components.
requests.retain(|&k, _| k.peer != *peer);
}
}
Some(resp)
}
}
};
// Increment the attempt once this function returns the response or errors
resp
}
fn responses_with_blobs(
@@ -229,9 +289,8 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
.unwrap_or(false);
pair_next_blob
} {
blob_list.push(blob_iter.next().ok_or_else(|| CouplingError {
msg: "Missing next blob".to_string(),
column_and_peer: None,
blob_list.push(blob_iter.next().ok_or_else(|| {
CouplingError::BlobPeerFailure("Missing next blob".to_string())
})?);
}
@@ -239,16 +298,14 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
for blob in blob_list {
let blob_index = blob.index as usize;
let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
return Err(CouplingError {
msg: "Invalid blob index".to_string(),
column_and_peer: None,
});
return Err(CouplingError::BlobPeerFailure(
"Invalid blob index".to_string(),
));
};
if blob_opt.is_some() {
return Err(CouplingError {
msg: "Repeat blob index".to_string(),
column_and_peer: None,
});
return Err(CouplingError::BlobPeerFailure(
"Repeat blob index".to_string(),
));
} else {
*blob_opt = Some(blob);
}
@@ -257,24 +314,22 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
blobs_buffer.into_iter().flatten().collect::<Vec<_>>(),
max_blobs_per_block,
)
.map_err(|_| CouplingError {
msg: "Blobs returned exceeds max length".to_string(),
column_and_peer: None,
.map_err(|_| {
CouplingError::BlobPeerFailure("Blobs returned exceeds max length".to_string())
})?;
responses.push(
RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError {
msg: format!("{e:?}"),
column_and_peer: None,
})?,
RpcBlock::new(None, block, Some(blobs))
.map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?,
)
}
// if accumulated sidecars is not empty, throw an error.
// if accumulated sidecars is not empty, log an error but return the responses
// as we can still make progress.
if blob_iter.next().is_some() {
return Err(CouplingError {
msg: "Received sidecars that don't pair well".to_string(),
column_and_peer: None,
});
tracing::debug!(
remaining_blobs=?blob_iter.collect::<Vec<_>>(),
"Received sidecars that don't pair well",
);
}
Ok(responses)
@@ -285,6 +340,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
data_columns: DataColumnSidecarList<E>,
column_to_peer: HashMap<u64, PeerId>,
expects_custody_columns: &[ColumnIndex],
attempt: usize,
spec: &ChainSpec,
) -> Result<Vec<RpcBlock<E>>, CouplingError> {
// Group data columns by block_root and index
@@ -300,10 +356,12 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
.insert(index, column)
.is_some()
{
return Err(CouplingError {
msg: format!("Repeated column block_root {block_root:?} index {index}"),
column_and_peer: None,
});
// `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers
// we request the data from.
// If there are duplicated indices, it's likely a peer sending us the same index multiple times.
// However we can still proceed even if there are extra columns, just log an error.
tracing::debug!(?block_root, ?index, "Repeated column for block_root");
continue;
}
}
@@ -311,52 +369,43 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
// plus we have columns for our custody requirements
let mut rpc_blocks = Vec::with_capacity(blocks.len());
let exceeded_retries = attempt >= MAX_COLUMN_RETRIES;
for block in blocks {
let block_root = get_block_root(&block);
rpc_blocks.push(if block.num_expected_blobs() > 0 {
let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root)
else {
// This PR ignores the fix from https://github.com/sigp/lighthouse/pull/5675
// which allows blobs to not match blocks.
// TODO(das): on the initial version of PeerDAS the beacon chain does not check
// rpc custody requirements and dropping this check can allow the block to have
// an inconsistent DB.
// For now, we always assume that the block peer is right.
// This is potentially dangerous as we can get isolated on a chain with a
// malicious block peer.
// TODO: fix this by checking the proposer signature before downloading columns.
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
return Err(CouplingError {
msg: format!("No columns for block {block_root:?} with data"),
column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)),
return Err(CouplingError::DataColumnPeerFailure {
error: format!("No columns for block {block_root:?} with data"),
faulty_peers: responsible_peers,
action: PeerAction::LowToleranceError,
exceeded_retries,
});
};
let mut custody_columns = vec![];
let mut naughty_peers = vec![];
for index in expects_custody_columns {
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
// this column is in the set of `expects_custody_columns` and with the expected
// block root, so for the expected epoch of this batch.
if let Some(data_column) = data_columns_by_index.remove(index) {
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
// this column is in the set of `expects_custody_columns` and with the expected
// block root, so for the expected epoch of this batch.
custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
} else {
// Penalize the peer for claiming to have the columns but not returning
// them
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError {
msg: format!("Internal error, no request made for column {}", index),
column_and_peer: None,
});
return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index)));
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError {
msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
column_and_peer: Some((naughty_peers, PeerAction::LowToleranceError)),
return Err(CouplingError::DataColumnPeerFailure {
error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
faulty_peers: naughty_peers,
action: PeerAction::LowToleranceError,
exceeded_retries
});
}
@@ -364,7 +413,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
if !data_columns_by_index.is_empty() {
let remaining_indices = data_columns_by_index.keys().collect::<Vec<_>>();
// log the error but don't return an error, we can still progress with extra columns.
tracing::error!(
tracing::debug!(
?block_root,
?remaining_indices,
"Not all columns consumed for block"
@@ -372,10 +421,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec)
.map_err(|e| CouplingError {
msg: format!("{:?}", e),
column_and_peer: None,
})?
.map_err(|e| CouplingError::InternalError(format!("{:?}", e)))?
} else {
// Block has no data, expects zero columns
RpcBlock::new_without_blobs(Some(block_root), block)
@@ -387,7 +433,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
// log the error but don't return an error, we can still progress with responses.
// this is most likely an internal error with overrequesting or a client bug.
tracing::error!(?remaining_roots, "Not all columns consumed for block");
tracing::debug!(?remaining_roots, "Not all columns consumed for block");
}
Ok(rpc_blocks)
@@ -419,6 +465,7 @@ impl<I: PartialEq + std::fmt::Display, T> ByRangeRequest<I, T> {
#[cfg(test)]
mod tests {
use super::RangeBlockComponentsRequest;
use crate::sync::network_context::MAX_COLUMN_RETRIES;
use beacon_chain::test_utils::{
generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs,
};
@@ -427,7 +474,7 @@ mod tests {
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, Id, RangeRequestId,
},
PeerId,
PeerAction, PeerId,
};
use rand::SeedableRng;
use std::sync::Arc;
@@ -666,4 +713,252 @@ mod tests {
// All completed construct response
info.responses(&spec).unwrap().unwrap();
}
/// Coupling must surface `CouplingError::DataColumnPeerFailure` identifying exactly
/// the peers whose custody-column requests completed without the requested data, so
/// the caller can penalize them and retry against other peers.
#[test]
fn missing_custody_columns_from_faulty_peers() {
    // GIVEN: A request expecting custody columns from multiple peers
    let spec = test_spec::<E>();
    let expected_custody_columns = vec![1, 2, 3, 4];
    // Fixed seed so block/column generation is deterministic across runs.
    let mut rng = XorShiftRng::from_seed([42; 16]);
    let blocks = (0..2)
        .map(|_| {
            generate_rand_block_and_data_columns::<E>(
                ForkName::Fulu,
                NumBlobs::Number(1),
                &mut rng,
                &spec,
            )
        })
        .collect::<Vec<_>>();
    let components_id = components_id();
    let blocks_req_id = blocks_id(components_id);
    // One columns-by-range request per expected column index, i.e. one peer per column.
    let columns_req_id = expected_custody_columns
        .iter()
        .enumerate()
        .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column]))
        .collect::<Vec<_>>();
    let mut info = RangeBlockComponentsRequest::<E>::new(
        blocks_req_id,
        None,
        Some((columns_req_id.clone(), expected_custody_columns.clone())),
    );
    // AND: All blocks are received successfully
    info.add_blocks(
        blocks_req_id,
        blocks.iter().map(|b| b.0.clone().into()).collect(),
    )
    .unwrap();
    // AND: Only some custody columns are received (columns 1 and 2)
    for (i, &column_index) in expected_custody_columns.iter().take(2).enumerate() {
        let (req, _columns) = columns_req_id.get(i).unwrap();
        info.add_custody_columns(
            *req,
            blocks
                .iter()
                .flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
                .collect(),
        )
        .unwrap();
    }
    // AND: Remaining column requests are completed with empty data (simulating faulty peers)
    for i in 2..4 {
        let (req, _columns) = columns_req_id.get(i).unwrap();
        info.add_custody_columns(*req, vec![]).unwrap();
    }
    // WHEN: Attempting to construct RPC blocks
    let result = info.responses(&spec).unwrap();
    // THEN: Should fail with PeerFailure identifying the faulty peers
    assert!(result.is_err());
    if let Err(super::CouplingError::DataColumnPeerFailure {
        error,
        faulty_peers,
        action,
        exceeded_retries,
    }) = result
    {
        assert!(error.contains("Peers did not return column"));
        // Coupling errors out on the first block with missing columns, so the
        // faulty-peer list covers that block's two missing columns.
        assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing
        assert_eq!(faulty_peers[0].0, 3); // column index 3
        assert_eq!(faulty_peers[1].0, 4); // column index 4
        assert!(matches!(action, PeerAction::LowToleranceError));
        assert!(!exceeded_retries); // First attempt, should be false
    } else {
        panic!("Expected PeerFailure error");
    }
}
/// After a `DataColumnPeerFailure`, reinserting a replacement request via
/// `reinsert_failed_column_requests` and feeding it the missing data must let
/// `responses` succeed on the next attempt.
#[test]
fn retry_logic_after_peer_failures() {
    // GIVEN: A request expecting custody columns where some peers initially fail
    let spec = test_spec::<E>();
    let expected_custody_columns = vec![1, 2];
    // Fixed seed so block/column generation is deterministic across runs.
    let mut rng = XorShiftRng::from_seed([42; 16]);
    let blocks = (0..2)
        .map(|_| {
            generate_rand_block_and_data_columns::<E>(
                ForkName::Fulu,
                NumBlobs::Number(1),
                &mut rng,
                &spec,
            )
        })
        .collect::<Vec<_>>();
    let components_id = components_id();
    let blocks_req_id = blocks_id(components_id);
    // One columns-by-range request per expected column index, i.e. one peer per column.
    let columns_req_id = expected_custody_columns
        .iter()
        .enumerate()
        .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column]))
        .collect::<Vec<_>>();
    let mut info = RangeBlockComponentsRequest::<E>::new(
        blocks_req_id,
        None,
        Some((columns_req_id.clone(), expected_custody_columns.clone())),
    );
    // AND: All blocks are received
    info.add_blocks(
        blocks_req_id,
        blocks.iter().map(|b| b.0.clone().into()).collect(),
    )
    .unwrap();
    // AND: Only partial custody columns are received (column 1 but not 2)
    let (req1, _) = columns_req_id.first().unwrap();
    info.add_custody_columns(
        *req1,
        blocks
            .iter()
            .flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned())
            .collect(),
    )
    .unwrap();
    // AND: The missing column request is completed with empty data (peer failure)
    let (req2, _) = columns_req_id.get(1).unwrap();
    info.add_custody_columns(*req2, vec![]).unwrap();
    // WHEN: First attempt to get responses fails
    let result = info.responses(&spec).unwrap();
    assert!(result.is_err());
    // AND: We retry with a new peer for the failed column
    // (fresh request id, distinct from the original four)
    let new_columns_req_id = columns_id(10 as Id, components_id);
    let failed_column_requests = vec![(new_columns_req_id, vec![2])];
    info.reinsert_failed_column_requests(failed_column_requests)
        .unwrap();
    // AND: The new peer provides the missing column data
    info.add_custody_columns(
        new_columns_req_id,
        blocks
            .iter()
            .flat_map(|b| b.1.iter().filter(|d| d.index == 2).cloned())
            .collect(),
    )
    .unwrap();
    // WHEN: Attempting to get responses again
    let result = info.responses(&spec).unwrap();
    // THEN: Should succeed with complete RPC blocks
    assert!(result.is_ok());
    let rpc_blocks = result.unwrap();
    assert_eq!(rpc_blocks.len(), 2);
}
/// When peers persistently fail to provide a required column, repeated calls to
/// `responses` must eventually report `exceeded_retries = true` once the attempt
/// counter passes `MAX_COLUMN_RETRIES`, signalling the caller to fail the batch.
#[test]
fn max_retries_exceeded_behavior() {
    // GIVEN: A request where peers consistently fail to provide required columns
    let spec = test_spec::<E>();
    let expected_custody_columns = vec![1, 2];
    // Fixed seed so block/column generation is deterministic across runs.
    let mut rng = XorShiftRng::from_seed([42; 16]);
    let blocks = (0..1)
        .map(|_| {
            generate_rand_block_and_data_columns::<E>(
                ForkName::Fulu,
                NumBlobs::Number(1),
                &mut rng,
                &spec,
            )
        })
        .collect::<Vec<_>>();
    let components_id = components_id();
    let blocks_req_id = blocks_id(components_id);
    // One columns-by-range request per expected column index, i.e. one peer per column.
    let columns_req_id = expected_custody_columns
        .iter()
        .enumerate()
        .map(|(i, column)| (columns_id(i as Id, components_id), vec![*column]))
        .collect::<Vec<_>>();
    let mut info = RangeBlockComponentsRequest::<E>::new(
        blocks_req_id,
        None,
        Some((columns_req_id.clone(), expected_custody_columns.clone())),
    );
    // AND: All blocks are received
    info.add_blocks(
        blocks_req_id,
        blocks.iter().map(|b| b.0.clone().into()).collect(),
    )
    .unwrap();
    // AND: Only partial custody columns are provided (column 1 but not 2)
    let (req1, _) = columns_req_id.first().unwrap();
    info.add_custody_columns(
        *req1,
        blocks
            .iter()
            .flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned())
            .collect(),
    )
    .unwrap();
    // AND: Column 2 request completes with empty data (persistent peer failure)
    let (req2, _) = columns_req_id.get(1).unwrap();
    info.add_custody_columns(*req2, vec![]).unwrap();
    // WHEN: Multiple retry attempts are made (up to max retries)
    // Each `responses` call on a complete set of requests counts as one attempt.
    for _ in 0..MAX_COLUMN_RETRIES {
        let result = info.responses(&spec).unwrap();
        assert!(result.is_err());
        if let Err(super::CouplingError::DataColumnPeerFailure {
            exceeded_retries, ..
        }) = &result
        {
            if *exceeded_retries {
                break;
            }
        }
    }
    // AND: One final attempt after exceeding max retries
    let result = info.responses(&spec).unwrap();
    // THEN: Should fail with exceeded_retries = true
    assert!(result.is_err());
    if let Err(super::CouplingError::DataColumnPeerFailure {
        error: _,
        faulty_peers,
        action,
        exceeded_retries,
    }) = result
    {
        assert_eq!(faulty_peers.len(), 1); // column 2 missing
        assert_eq!(faulty_peers[0].0, 2); // column index 2
        assert!(matches!(action, PeerAction::LowToleranceError));
        assert!(exceeded_retries); // Should be true after max retries
    } else {
        panic!("Expected PeerFailure error with exceeded_retries=true");
    }
}
}

View File

@@ -55,6 +55,9 @@ use types::{
pub mod custody;
mod requests;
/// Max retries for block components after which we fail the batch.
pub const MAX_COLUMN_RETRIES: usize = 3;
#[derive(Debug)]
pub enum RpcEvent<T> {
StreamTermination,
@@ -435,14 +438,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
/// the batch.
pub fn retry_columns_by_range(
&mut self,
request_id: Id,
id: Id,
peers: &HashSet<PeerId>,
peers_to_deprioritize: &HashSet<PeerId>,
request: BlocksByRangeRequest,
failed_columns: &HashSet<ColumnIndex>,
) -> Result<(), String> {
let Some(requester) = self.components_by_range_requests.keys().find_map(|r| {
if r.id == request_id {
if r.id == id {
Some(r.requester)
} else {
None
@@ -455,6 +458,8 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
debug!(
?failed_columns,
?id,
?requester,
"Retrying only failed column requests from other peers"
);
@@ -469,10 +474,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
.map_err(|e| format!("{:?}", e))?;
// Reuse the id for the request that received partially correct responses
let id = ComponentsByRangeRequestId {
id: request_id,
requester,
};
let id = ComponentsByRangeRequestId { id, requester };
let data_column_requests = columns_by_range_peers_to_request
.into_iter()
@@ -683,18 +685,16 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
match range_block_component {
RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| {
request.add_blocks(req_id, blocks).map_err(|e| {
RpcResponseError::BlockComponentCouplingError(CouplingError {
msg: e,
column_and_peer: None,
})
RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError(
e,
))
})
}),
RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| {
request.add_blobs(req_id, blobs).map_err(|e| {
RpcResponseError::BlockComponentCouplingError(CouplingError {
msg: e,
column_and_peer: None,
})
RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError(
e,
))
})
}),
RangeBlockComponent::CustodyColumns(req_id, resp) => {
@@ -702,10 +702,9 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
request
.add_custody_columns(req_id, custody_columns)
.map_err(|e| {
RpcResponseError::BlockComponentCouplingError(CouplingError {
msg: e,
column_and_peer: None,
})
RpcResponseError::BlockComponentCouplingError(
CouplingError::InternalError(e),
)
})
})
}
@@ -715,10 +714,27 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Some(Err(e));
}
if let Some(blocks_result) = entry.get_mut().responses(&self.chain.spec) {
if blocks_result.is_ok() {
// remove the entry only if it coupled successfully with
// no errors
let range_req = entry.get_mut();
if let Some(blocks_result) = range_req.responses(&self.chain.spec) {
if let Err(CouplingError::DataColumnPeerFailure {
action: _,
error,
faulty_peers: _,
exceeded_retries,
}) = &blocks_result
{
// Remove the entry if it's a peer failure **and** retry counter is exceeded
if *exceeded_retries {
debug!(
entry=?entry.key(),
msg = error,
"Request exceeded max retries, failing batch"
);
entry.remove();
};
} else {
// also remove the entry only if it coupled successfully
// or if it isn't a column peer failure.
entry.remove();
}
// If the request is finished, dequeue everything

View File

@@ -86,6 +86,11 @@ impl<K: Eq + Hash, T: ActiveRequestItems> ActiveRequests<K, T> {
/// `add_item` may convert ReqResp success chunks into errors. This function handles the
/// multiple errors / stream termination internally ensuring that a single `Some<Result>` is
/// returned.
///
/// ## Returns
/// - `Some` if the request has either completed or errored, and needs to be actioned by the
/// caller.
/// - `None` if no further action is currently needed.
pub fn on_response(
&mut self,
id: K,

View File

@@ -817,32 +817,44 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
if let RpcResponseError::BlockComponentCouplingError(CouplingError {
column_and_peer,
msg,
}) = &err
{
debug!(?batch_id, msg, "Block components coupling error");
// Note: we don't fail the batch here because a `CouplingError` is
// recoverable by requesting from other honest peers.
if let Some((column_and_peer, action)) = column_and_peer {
let mut failed_columns = HashSet::new();
let mut failed_peers = HashSet::new();
for (column, peer) in column_and_peer {
failed_columns.insert(*column);
failed_peers.insert(*peer);
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
match coupling_error {
CouplingError::DataColumnPeerFailure {
error,
faulty_peers,
action,
exceeded_retries,
} => {
debug!(?batch_id, error, "Block components coupling error");
// Note: we don't fail the batch here because a `CouplingError` is
// recoverable by requesting from other honest peers.
let mut failed_columns = HashSet::new();
let mut failed_peers = HashSet::new();
for (column, peer) in faulty_peers {
failed_columns.insert(*column);
failed_peers.insert(*peer);
}
for peer in failed_peers.iter() {
network.report_peer(*peer, *action, "failed to return columns");
}
// Retry the failed columns if the column requests haven't exceeded the
// max retries. Otherwise, treat it as a failed batch below.
if !*exceeded_retries {
return self.retry_partial_batch(
network,
batch_id,
request_id,
failed_columns,
failed_peers,
);
}
}
for peer in failed_peers.iter() {
network.report_peer(*peer, *action, "failed to return columns");
CouplingError::BlobPeerFailure(msg) => {
tracing::debug!(?batch_id, msg, "Blob peer failure");
}
CouplingError::InternalError(msg) => {
tracing::error!(?batch_id, msg, "Block components coupling internal error");
}
return self.retry_partial_batch(
network,
batch_id,
request_id,
failed_columns,
failed_peers,
);
}
}
// A batch could be retried without the peer failing the request (disconnecting/
@@ -900,14 +912,11 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let (request, batch_type) = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_peers();
// TODO(das): we should request only from peers that are part of this SyncingChain.
// However, then we hit the NoPeer error frequently which causes the batch to fail and
// the SyncingChain to be dropped. We need to handle this case more gracefully.
let synced_peers = network
.network_globals()
.peers
.read()
.synced_peers_for_epoch(batch_id, &self.peers)
.synced_peers_for_epoch(batch_id, Some(&self.peers))
.cloned()
.collect::<HashSet<_>>();
@@ -984,7 +993,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.network_globals()
.peers
.read()
.synced_peers()
.synced_peers_for_epoch(batch_id, Some(&self.peers))
.cloned()
.collect::<HashSet<_>>();
@@ -1084,11 +1093,13 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.sampling_subnets()
.iter()
.all(|subnet_id| {
let peer_count = network
.network_globals()
let peer_db = network.network_globals().peers.read();
let peer_count = self
.peers
.read()
.good_range_sync_custody_subnet_peer(*subnet_id, &self.peers)
.iter()
.filter(|peer| {
peer_db.is_good_range_sync_custody_subnet_peer(*subnet_id, peer)
})
.count();
peer_count > 0
});