Sync peer attribution (#7733)

Which issue # does this PR address?

Closes #7604


  Improvements to range sync including:

1. Contain column requests only to peers that are part of the SyncingChain
2. Attribute the fault to the correct peer and downscore them if they don't return the data columns for the request
3. Improve sync performance by retrying only the failed columns from other peers instead of failing the entire batch
4. Uses the earliest_available_slot to make requests to peers that claim to have the epoch. Note: if no earliest_available_slot info is available, fallback to using previous logic i.e. assume peer has everything backfilled upto WS checkpoint/da boundary

Tested this on fusaka-devnet-2 with a full node and supernode and the recovering logic seems to works well.
Also tested this a little on mainnet.

Need to do more testing and possibly add some unit tests.
This commit is contained in:
Pawan Dhananjay
2025-07-11 17:02:30 -07:00
committed by GitHub
parent b43e0b446c
commit 90ff64381e
9 changed files with 437 additions and 99 deletions

View File

@@ -1,15 +1,17 @@
use beacon_chain::{
block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root,
};
use lighthouse_network::service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
use lighthouse_network::{
service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId,
},
PeerAction, PeerId,
};
use std::{collections::HashMap, sync::Arc};
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock,
};
pub struct RangeBlockComponentsRequest<E: EthSpec> {
/// Blocks we have received awaiting for their corresponding sidecar.
blocks_request: ByRangeRequest<BlocksByRangeRequestId, Vec<Arc<SignedBeaconBlock<E>>>>,
@@ -30,24 +32,38 @@ enum RangeBlockDataRequest<E: EthSpec> {
DataColumnsByRangeRequestId,
ByRangeRequest<DataColumnsByRangeRequestId, DataColumnSidecarList<E>>,
>,
/// The column indices corresponding to the request
column_peers: HashMap<DataColumnsByRangeRequestId, Vec<ColumnIndex>>,
expected_custody_columns: Vec<ColumnIndex>,
},
}
#[derive(Debug)]
pub struct CouplingError {
pub(crate) msg: String,
pub(crate) column_and_peer: Option<(Vec<(ColumnIndex, PeerId)>, PeerAction)>,
}
impl<E: EthSpec> RangeBlockComponentsRequest<E> {
#[allow(clippy::type_complexity)]
pub fn new(
blocks_req_id: BlocksByRangeRequestId,
blobs_req_id: Option<BlobsByRangeRequestId>,
data_columns: Option<(Vec<DataColumnsByRangeRequestId>, Vec<ColumnIndex>)>,
data_columns: Option<(
Vec<(DataColumnsByRangeRequestId, Vec<ColumnIndex>)>,
Vec<ColumnIndex>,
)>,
) -> Self {
let block_data_request = if let Some(blobs_req_id) = blobs_req_id {
RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id))
} else if let Some((requests, expected_custody_columns)) = data_columns {
let column_peers: HashMap<_, _> = requests.into_iter().collect();
RangeBlockDataRequest::DataColumns {
requests: requests
.into_iter()
.map(|id| (id, ByRangeRequest::Active(id)))
requests: column_peers
.keys()
.map(|id| (*id, ByRangeRequest::Active(*id)))
.collect(),
column_peers,
expected_custody_columns,
}
} else {
@@ -60,6 +76,28 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
/// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed
/// request for some columns.
pub fn reinsert_failed_column_requests(
&mut self,
failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec<u64>)>,
) -> Result<(), String> {
match &mut self.block_data_request {
RangeBlockDataRequest::DataColumns {
requests,
expected_custody_columns: _,
column_peers,
} => {
for (request, columns) in failed_column_requests.into_iter() {
requests.insert(request, ByRangeRequest::Active(request));
column_peers.insert(request, columns);
}
Ok(())
}
_ => Err("not a column request".to_string()),
}
}
pub fn add_blocks(
&mut self,
req_id: BlocksByRangeRequestId,
@@ -105,12 +143,15 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
}
}
pub fn responses(&self, spec: &ChainSpec) -> Option<Result<Vec<RpcBlock<E>>, String>> {
pub fn responses(
&mut self,
spec: &ChainSpec,
) -> Option<Result<Vec<RpcBlock<E>>, CouplingError>> {
let Some(blocks) = self.blocks_request.to_finished() else {
return None;
};
match &self.block_data_request {
match &mut self.block_data_request {
RangeBlockDataRequest::NoData => {
Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec))
}
@@ -127,8 +168,10 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
RangeBlockDataRequest::DataColumns {
requests,
expected_custody_columns,
column_peers,
} => {
let mut data_columns = vec![];
let mut column_to_peer_id: HashMap<u64, PeerId> = HashMap::new();
for req in requests.values() {
let Some(data) = req.to_finished() else {
return None;
@@ -136,12 +179,33 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
data_columns.extend(data.clone())
}
Some(Self::responses_with_custody_columns(
// Note: this assumes that only 1 peer is responsible for a column
// with a batch.
for (id, columns) in column_peers {
for column in columns {
column_to_peer_id.insert(*column, id.peer);
}
}
let resp = Self::responses_with_custody_columns(
blocks.to_vec(),
data_columns,
column_to_peer_id,
expected_custody_columns,
spec,
))
);
if let Err(err) = &resp {
if let Some((peers, _)) = &err.column_and_peer {
for (_, peer) in peers.iter() {
// find the req id associated with the peer and
// delete it from the entries
requests.retain(|&k, _| k.peer != *peer);
}
}
}
Some(resp)
}
}
}
@@ -150,7 +214,7 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
blobs: Vec<Arc<BlobSidecar<E>>>,
spec: &ChainSpec,
) -> Result<Vec<RpcBlock<E>>, String> {
) -> Result<Vec<RpcBlock<E>>, CouplingError> {
// There can't be more more blobs than blocks. i.e. sending any blob (empty
// included) for a skipped slot is not permitted.
let mut responses = Vec::with_capacity(blocks.len());
@@ -165,17 +229,26 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
.unwrap_or(false);
pair_next_blob
} {
blob_list.push(blob_iter.next().ok_or("Missing next blob".to_string())?);
blob_list.push(blob_iter.next().ok_or_else(|| CouplingError {
msg: "Missing next blob".to_string(),
column_and_peer: None,
})?);
}
let mut blobs_buffer = vec![None; max_blobs_per_block];
for blob in blob_list {
let blob_index = blob.index as usize;
let Some(blob_opt) = blobs_buffer.get_mut(blob_index) else {
return Err("Invalid blob index".to_string());
return Err(CouplingError {
msg: "Invalid blob index".to_string(),
column_and_peer: None,
});
};
if blob_opt.is_some() {
return Err("Repeat blob index".to_string());
return Err(CouplingError {
msg: "Repeat blob index".to_string(),
column_and_peer: None,
});
} else {
*blob_opt = Some(blob);
}
@@ -184,13 +257,24 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
blobs_buffer.into_iter().flatten().collect::<Vec<_>>(),
max_blobs_per_block,
)
.map_err(|_| "Blobs returned exceeds max length".to_string())?;
responses.push(RpcBlock::new(None, block, Some(blobs)).map_err(|e| format!("{e:?}"))?)
.map_err(|_| CouplingError {
msg: "Blobs returned exceeds max length".to_string(),
column_and_peer: None,
})?;
responses.push(
RpcBlock::new(None, block, Some(blobs)).map_err(|e| CouplingError {
msg: format!("{e:?}"),
column_and_peer: None,
})?,
)
}
// if accumulated sidecars is not empty, throw an error.
if blob_iter.next().is_some() {
return Err("Received sidecars that don't pair well".to_string());
return Err(CouplingError {
msg: "Received sidecars that don't pair well".to_string(),
column_and_peer: None,
});
}
Ok(responses)
@@ -199,9 +283,10 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
fn responses_with_custody_columns(
blocks: Vec<Arc<SignedBeaconBlock<E>>>,
data_columns: DataColumnSidecarList<E>,
column_to_peer: HashMap<u64, PeerId>,
expects_custody_columns: &[ColumnIndex],
spec: &ChainSpec,
) -> Result<Vec<RpcBlock<E>>, String> {
) -> Result<Vec<RpcBlock<E>>, CouplingError> {
// Group data columns by block_root and index
let mut data_columns_by_block =
HashMap::<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>::new();
@@ -215,9 +300,10 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
.insert(index, column)
.is_some()
{
return Err(format!(
"Repeated column block_root {block_root:?} index {index}"
));
return Err(CouplingError {
msg: format!("Repeated column block_root {block_root:?} index {index}"),
column_and_peer: None,
});
}
}
@@ -235,30 +321,61 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
// TODO(das): on the initial version of PeerDAS the beacon chain does not check
// rpc custody requirements and dropping this check can allow the block to have
// an inconsistent DB.
return Err(format!("No columns for block {block_root:?} with data"));
// For now, we always assume that the block peer is right.
// This is potentially dangerous as we can get isolated on a chain with a
// malicious block peer.
// TODO: fix this by checking the proposer signature before downloading columns.
let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect();
return Err(CouplingError {
msg: format!("No columns for block {block_root:?} with data"),
column_and_peer: Some((responsible_peers, PeerAction::LowToleranceError)),
});
};
let mut custody_columns = vec![];
let mut naughty_peers = vec![];
for index in expects_custody_columns {
let Some(data_column) = data_columns_by_index.remove(index) else {
return Err(format!("No column for block {block_root:?} index {index}"));
};
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
// this column is in the set of `expects_custody_columns` and with the expected
// block root, so for the expected epoch of this batch.
custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
if let Some(data_column) = data_columns_by_index.remove(index) {
// Safe to convert to `CustodyDataColumn`: we have asserted that the index of
// this column is in the set of `expects_custody_columns` and with the expected
// block root, so for the expected epoch of this batch.
custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column));
} else {
// Penalize the peer for claiming to have the columns but not returning
// them
let Some(responsible_peer) = column_to_peer.get(index) else {
return Err(CouplingError {
msg: format!("Internal error, no request made for column {}", index),
column_and_peer: None,
});
};
naughty_peers.push((*index, *responsible_peer));
}
}
if !naughty_peers.is_empty() {
return Err(CouplingError {
msg: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"),
column_and_peer: Some((naughty_peers, PeerAction::LowToleranceError)),
});
}
// Assert that there are no columns left
if !data_columns_by_index.is_empty() {
let remaining_indices = data_columns_by_index.keys().collect::<Vec<_>>();
return Err(format!(
"Not all columns consumed for block {block_root:?}: {remaining_indices:?}"
));
// log the error but don't return an error, we can still progress with extra columns.
tracing::error!(
?block_root,
?remaining_indices,
"Not all columns consumed for block"
);
}
RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, spec)
.map_err(|e| format!("{e:?}"))?
.map_err(|e| CouplingError {
msg: format!("{:?}", e),
column_and_peer: None,
})?
} else {
// Block has no data, expects zero columns
RpcBlock::new_without_blobs(Some(block_root), block)
@@ -268,7 +385,9 @@ impl<E: EthSpec> RangeBlockComponentsRequest<E> {
// Assert that there are no columns left for other blocks
if !data_columns_by_block.is_empty() {
let remaining_roots = data_columns_by_block.keys().collect::<Vec<_>>();
return Err(format!("Not all columns consumed: {remaining_roots:?}"));
// log the error but don't return an error, we can still progress with responses.
// this is most likely an internal error with overrequesting or a client bug.
tracing::error!(?remaining_roots, "Not all columns consumed for block");
}
Ok(rpc_blocks)
@@ -303,9 +422,12 @@ mod tests {
use beacon_chain::test_utils::{
generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, NumBlobs,
};
use lighthouse_network::service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, Id, RangeRequestId,
use lighthouse_network::{
service::api_types::{
BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, Id, RangeRequestId,
},
PeerId,
};
use rand::SeedableRng;
use std::sync::Arc;
@@ -342,10 +464,11 @@ mod tests {
DataColumnsByRangeRequestId {
id,
parent_request_id,
peer: PeerId::random(),
}
}
fn is_finished(info: &RangeBlockComponentsRequest<E>) -> bool {
fn is_finished(info: &mut RangeBlockComponentsRequest<E>) -> bool {
let spec = test_spec::<E>();
info.responses(&spec).is_some()
}
@@ -428,7 +551,7 @@ mod tests {
let columns_req_id = expects_custody_columns
.iter()
.enumerate()
.map(|(i, _)| columns_id(i as Id, components_id))
.map(|(i, column)| (columns_id(i as Id, components_id), vec![*column]))
.collect::<Vec<_>>();
let mut info = RangeBlockComponentsRequest::<E>::new(
blocks_req_id,
@@ -442,12 +565,13 @@ mod tests {
)
.unwrap();
// Assert response is not finished
assert!(!is_finished(&info));
assert!(!is_finished(&mut info));
// Send data columns
for (i, &column_index) in expects_custody_columns.iter().enumerate() {
let (req, _columns) = columns_req_id.get(i).unwrap();
info.add_custody_columns(
columns_req_id.get(i).copied().unwrap(),
*req,
blocks
.iter()
.flat_map(|b| b.1.iter().filter(|d| d.index == column_index).cloned())
@@ -457,7 +581,7 @@ mod tests {
if i < expects_custody_columns.len() - 1 {
assert!(
!is_finished(&info),
!is_finished(&mut info),
"requested should not be finished at loop {i}"
);
}
@@ -485,7 +609,7 @@ mod tests {
let columns_req_id = batched_column_requests
.iter()
.enumerate()
.map(|(i, _)| columns_id(i as Id, components_id))
.map(|(i, columns)| (columns_id(i as Id, components_id), columns.clone()))
.collect::<Vec<_>>();
let mut info = RangeBlockComponentsRequest::<E>::new(
@@ -513,12 +637,13 @@ mod tests {
)
.unwrap();
// Assert response is not finished
assert!(!is_finished(&info));
assert!(!is_finished(&mut info));
for (i, column_indices) in batched_column_requests.iter().enumerate() {
let (req, _columns) = columns_req_id.get(i).unwrap();
// Send the set of columns in the same batch request
info.add_custody_columns(
columns_req_id.get(i).copied().unwrap(),
*req,
blocks
.iter()
.flat_map(|b| {
@@ -532,7 +657,7 @@ mod tests {
if i < num_of_data_column_requests - 1 {
assert!(
!is_finished(&info),
!is_finished(&mut info),
"requested should not be finished at loop {i}"
);
}

View File

@@ -14,6 +14,7 @@ use crate::network_beacon_processor::TestBeaconChainType;
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
use crate::sync::block_lookups::SingleLookupId;
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState};
@@ -81,7 +82,7 @@ pub enum RpcResponseError {
RpcError(#[allow(dead_code)] RPCError),
VerifyError(LookupVerifyError),
CustodyRequestError(#[allow(dead_code)] CustodyRequestError),
BlockComponentCouplingError(#[allow(dead_code)] String),
BlockComponentCouplingError(CouplingError),
}
#[derive(Debug, PartialEq, Eq)]
@@ -441,6 +442,79 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
active_request_count_by_peer
}
/// Retries only the specified failed columns by requesting them again.
///
/// Note: This function doesn't retry the whole batch, but retries specific requests within
/// the batch.
pub fn retry_columns_by_range(
&mut self,
request_id: Id,
peers: &HashSet<PeerId>,
peers_to_deprioritize: &HashSet<PeerId>,
request: BlocksByRangeRequest,
failed_columns: &HashSet<ColumnIndex>,
) -> Result<(), String> {
let Some(requester) = self.components_by_range_requests.keys().find_map(|r| {
if r.id == request_id {
Some(r.requester)
} else {
None
}
}) else {
return Err("request id not present".to_string());
};
let active_request_count_by_peer = self.active_request_count_by_peer();
debug!(
?failed_columns,
"Retrying only failed column requests from other peers"
);
// Attempt to find all required custody peers to request the failed columns from
let columns_by_range_peers_to_request = self
.select_columns_by_range_peers_to_request(
failed_columns,
peers,
active_request_count_by_peer,
peers_to_deprioritize,
)
.map_err(|e| format!("{:?}", e))?;
// Reuse the id for the request that received partially correct responses
let id = ComponentsByRangeRequestId {
id: request_id,
requester,
};
let data_column_requests = columns_by_range_peers_to_request
.into_iter()
.map(|(peer_id, columns)| {
self.send_data_columns_by_range_request(
peer_id,
DataColumnsByRangeRequest {
start_slot: *request.start_slot(),
count: *request.count(),
columns,
},
id,
)
})
.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("{:?}", e))?;
// instead of creating a new `RangeBlockComponentsRequest`, we reinsert
// the new requests created for the failed requests
let Some(range_request) = self.components_by_range_requests.get_mut(&id) else {
return Err(
"retrying custody request for range request that does not exist".to_string(),
);
};
range_request.reinsert_failed_column_requests(data_column_requests)?;
Ok(())
}
/// A blocks by range request sent by the range sync algorithm
pub fn block_components_by_range_request(
&mut self,
@@ -619,20 +693,31 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let request = entry.get_mut();
match range_block_component {
RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| {
request
.add_blocks(req_id, blocks)
.map_err(RpcResponseError::BlockComponentCouplingError)
request.add_blocks(req_id, blocks).map_err(|e| {
RpcResponseError::BlockComponentCouplingError(CouplingError {
msg: e,
column_and_peer: None,
})
})
}),
RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| {
request
.add_blobs(req_id, blobs)
.map_err(RpcResponseError::BlockComponentCouplingError)
request.add_blobs(req_id, blobs).map_err(|e| {
RpcResponseError::BlockComponentCouplingError(CouplingError {
msg: e,
column_and_peer: None,
})
})
}),
RangeBlockComponent::CustodyColumns(req_id, resp) => {
resp.and_then(|(custody_columns, _)| {
request
.add_custody_columns(req_id, custody_columns)
.map_err(RpcResponseError::BlockComponentCouplingError)
.map_err(|e| {
RpcResponseError::BlockComponentCouplingError(CouplingError {
msg: e,
column_and_peer: None,
})
})
})
}
}
@@ -641,8 +726,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
return Some(Err(e));
}
if let Some(blocks_result) = entry.get().responses(&self.chain.spec) {
entry.remove();
if let Some(blocks_result) = entry.get_mut().responses(&self.chain.spec) {
if blocks_result.is_ok() {
// remove the entry only if it coupled successfully with
// no errors
entry.remove();
}
// If the request is finished, dequeue everything
Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError))
} else {
@@ -1075,10 +1164,12 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
peer_id: PeerId,
request: DataColumnsByRangeRequest,
parent_request_id: ComponentsByRangeRequestId,
) -> Result<DataColumnsByRangeRequestId, RpcRequestSendError> {
) -> Result<(DataColumnsByRangeRequestId, Vec<u64>), RpcRequestSendError> {
let requested_columns = request.columns.clone();
let id = DataColumnsByRangeRequestId {
id: self.next_id(),
parent_request_id,
peer: peer_id,
};
self.send_network_msg(NetworkMessage::SendRequest {
@@ -1106,7 +1197,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
false,
DataColumnsByRangeRequestItems::new(request),
);
Ok(id)
Ok((id, requested_columns))
}
pub fn is_execution_engine_online(&self) -> bool {

View File

@@ -89,6 +89,7 @@ pub enum BatchOperationOutcome {
Failed { blacklist: bool },
}
#[derive(Debug)]
pub enum BatchProcessingResult {
Success,
FaultyFailure,
@@ -364,7 +365,6 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}
}
#[must_use = "Batch may have failed"]
pub fn processing_completed(
&mut self,
procesing_result: BatchProcessingResult,

View File

@@ -2,6 +2,7 @@ use super::batch::{BatchInfo, BatchProcessingResult, BatchState};
use super::RangeSyncType;
use crate::metrics;
use crate::network_beacon_processor::ChainSegmentProcessId;
use crate::sync::block_sidecar_coupling::CouplingError;
use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError};
use crate::sync::{network_context::SyncNetworkContext, BatchOperationOutcome, BatchProcessResult};
use beacon_chain::block_verification_types::RpcBlock;
@@ -12,7 +13,7 @@ use logging::crit;
use std::collections::{btree_map::Entry, BTreeMap, HashSet};
use strum::IntoStaticStr;
use tracing::{debug, instrument, warn};
use types::{Epoch, EthSpec, Hash256, Slot};
use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot};
/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
@@ -826,11 +827,37 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
if let RpcResponseError::BlockComponentCouplingError(CouplingError {
column_and_peer,
msg,
}) = &err
{
debug!(?batch_id, msg, "Block components coupling error");
// Note: we don't fail the batch here because a `CouplingError` is
// recoverable by requesting from other honest peers.
if let Some((column_and_peer, action)) = column_and_peer {
let mut failed_columns = HashSet::new();
let mut failed_peers = HashSet::new();
for (column, peer) in column_and_peer {
failed_columns.insert(*column);
failed_peers.insert(*peer);
}
for peer in failed_peers.iter() {
network.report_peer(*peer, *action, "failed to return columns");
}
return self.retry_partial_batch(
network,
batch_id,
request_id,
failed_columns,
failed_peers,
);
}
}
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
debug!(
batch_epoch = %batch_id,
@@ -891,7 +918,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.network_globals()
.peers
.read()
.synced_peers()
.synced_peers_for_epoch(batch_id, &self.peers)
.cloned()
.collect::<HashSet<_>>();
@@ -951,6 +978,50 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
Ok(KeepChain)
}
/// Retries partial column requests within the batch by creating new requests for the failed columns.
#[instrument(parent = None, fields(chain = self.id , service = "range_sync"), skip_all)]
pub fn retry_partial_batch(
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
id: Id,
failed_columns: HashSet<ColumnIndex>,
mut failed_peers: HashSet<PeerId>,
) -> ProcessingResult {
if let Some(batch) = self.batches.get_mut(&batch_id) {
failed_peers.extend(&batch.failed_peers());
let req = batch.to_blocks_by_range_request().0;
let synced_peers = network
.network_globals()
.peers
.read()
.synced_peers()
.cloned()
.collect::<HashSet<_>>();
match network.retry_columns_by_range(
id,
&synced_peers,
&failed_peers,
req,
&failed_columns,
) {
Ok(_) => {
debug!(
?batch_id,
id, "Retried column requests from different peers"
);
return Ok(KeepChain);
}
Err(e) => {
debug!(?batch_id, id, e, "Failed to retry partial batch");
}
}
}
Ok(KeepChain)
}
/// Returns true if this chain is currently syncing.
pub fn is_syncing(&self) -> bool {
match self.state {
@@ -1031,9 +1102,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.network_globals()
.peers
.read()
.good_custody_subnet_peer(*subnet_id)
.good_range_sync_custody_subnet_peer(*subnet_id, &self.peers)
.count();
peer_count > 0
});
peers_on_all_custody_subnets

View File

@@ -77,7 +77,7 @@ impl TestRig {
/// Produce a head peer with an advanced head
fn add_head_peer_with_root(&mut self, head_root: Hash256) -> PeerId {
let local_info = self.local_info();
self.add_random_peer(SyncInfo {
self.add_supernode_peer(SyncInfo {
head_root,
head_slot: local_info.head_slot + 1 + Slot::new(SLOT_IMPORT_TOLERANCE as u64),
..local_info
@@ -93,7 +93,7 @@ impl TestRig {
fn add_finalized_peer_with_root(&mut self, finalized_root: Hash256) -> PeerId {
let local_info = self.local_info();
let finalized_epoch = local_info.finalized_epoch + 2;
self.add_random_peer(SyncInfo {
self.add_supernode_peer(SyncInfo {
finalized_epoch,
finalized_root,
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
@@ -132,13 +132,13 @@ impl TestRig {
}
}
fn add_random_peer_not_supernode(&mut self, remote_info: SyncInfo) -> PeerId {
fn add_fullnode_peer(&mut self, remote_info: SyncInfo) -> PeerId {
let peer_id = self.new_connected_peer();
self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info));
peer_id
}
fn add_random_peer(&mut self, remote_info: SyncInfo) -> PeerId {
fn add_supernode_peer(&mut self, remote_info: SyncInfo) -> PeerId {
// Create valid peer known to network globals
// TODO(fulu): Using supernode peers to ensure we have peer across all column
// subnets for syncing. Should add tests connecting to full node peers.
@@ -148,17 +148,13 @@ impl TestRig {
peer_id
}
fn add_random_peers(&mut self, remote_info: SyncInfo, count: usize) {
for _ in 0..count {
fn add_fullnode_peers(&mut self, remote_info: SyncInfo, peer_count: usize) {
for _ in 0..peer_count {
let peer = self.new_connected_peer();
self.add_peer(peer, remote_info.clone());
self.send_sync_message(SyncMessage::AddPeer(peer, remote_info.clone()));
}
}
fn add_peer(&mut self, peer: PeerId, remote_info: SyncInfo) {
self.send_sync_message(SyncMessage::AddPeer(peer, remote_info));
}
fn assert_state(&self, state: RangeSyncType) {
assert_eq!(
self.sync_manager
@@ -562,19 +558,14 @@ const EXTRA_SYNCED_EPOCHS: u64 = 2 + 1;
fn finalized_sync_enough_global_custody_peers_few_chain_peers() {
// Run for all forks
let mut r = TestRig::test_setup();
// This test creates enough global custody peers to satisfy column queries but only adds few
// peers to the chain
r.new_connected_peers_for_peerdas();
let advanced_epochs: u64 = 2;
let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
// Current priorization only sends batches to idle peers, so we need enough peers for each batch
// TODO: Test this with a single peer in the chain, it should still work
r.add_random_peers(
remote_info,
(advanced_epochs + EXTRA_SYNCED_EPOCHS) as usize,
);
// Generate enough peers and supernodes to cover all custody columns
let peer_count = 100;
r.add_fullnode_peers(remote_info.clone(), peer_count);
r.add_supernode_peer(remote_info);
r.assert_state(RangeSyncType::Finalized);
let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
@@ -592,9 +583,9 @@ fn finalized_sync_not_enough_custody_peers_on_start() {
let advanced_epochs: u64 = 2;
let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into());
// Unikely that the single peer we added has enough columns for us. Tests are determinstic and
// Unikely that the single peer we added has enough columns for us. Tests are deterministic and
// this error should never be hit
r.add_random_peer_not_supernode(remote_info.clone());
r.add_fullnode_peer(remote_info.clone());
r.assert_state(RangeSyncType::Finalized);
// Because we don't have enough peers on all columns we haven't sent any request.
@@ -603,14 +594,9 @@ fn finalized_sync_not_enough_custody_peers_on_start() {
r.expect_empty_network();
// Generate enough peers and supernodes to cover all custody columns
r.new_connected_peers_for_peerdas();
// Note: not necessary to add this peers to the chain, as we draw from the global pool
// We still need to add enough peers to trigger batch downloads with idle peers. Same issue as
// the test above.
r.add_random_peers(
remote_info,
(advanced_epochs + EXTRA_SYNCED_EPOCHS - 1) as usize,
);
let peer_count = 100;
r.add_fullnode_peers(remote_info.clone(), peer_count);
r.add_supernode_peer(remote_info);
let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS;
r.complete_and_process_range_sync_until(last_epoch, filter());