Merge remote-tracking branch 'origin/stable' into unstable-merge-v8

This commit is contained in:
Michael Sproul
2025-11-04 16:08:34 +11:00
23 changed files with 505 additions and 96 deletions

View File

@@ -497,9 +497,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self: &Arc<Self>,
batch_id: CustodyBackfillBatchId,
data_columns: DataColumnSidecarList<T::EthSpec>,
expected_cgc: u64,
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = move || processor.process_historic_data_columns(batch_id, data_columns);
let process_fn =
move || processor.process_historic_data_columns(batch_id, data_columns, expected_cgc);
let work = Work::ChainSegmentBackfill(Box::new(process_fn));

View File

@@ -426,6 +426,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self,
batch_id: CustodyBackfillBatchId,
downloaded_columns: DataColumnSidecarList<T::EthSpec>,
expected_cgc: u64,
) {
let _guard = debug_span!(
SPAN_CUSTODY_BACKFILL_SYNC_IMPORT_COLUMNS,
@@ -435,10 +436,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.entered();
let sent_columns = downloaded_columns.len();
let result = match self
.chain
.import_historical_data_column_batch(batch_id.epoch, downloaded_columns)
{
let result = match self.chain.import_historical_data_column_batch(
batch_id.epoch,
downloaded_columns,
expected_cgc,
) {
Ok(imported_columns) => {
metrics::inc_counter_by(
&metrics::BEACON_PROCESSOR_CUSTODY_BACKFILL_COLUMN_IMPORT_SUCCESS_TOTAL,

View File

@@ -382,11 +382,9 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
return None;
};
let mut missing_columns = HashSet::new();
// Skip all batches (Epochs) that don't have missing columns.
for epoch in Epoch::range_inclusive_rev(self.to_be_downloaded, column_da_boundary) {
missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch);
let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(epoch);
if !missing_columns.is_empty() {
self.to_be_downloaded = epoch;
@@ -445,6 +443,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
self.include_next_batch()
}
Entry::Vacant(entry) => {
let missing_columns = self.beacon_chain.get_missing_columns_for_epoch(batch_id);
entry.insert(BatchInfo::new(
&batch_id,
CUSTODY_BACKFILL_EPOCHS_PER_BATCH,
@@ -504,6 +503,7 @@ impl<T: BeaconChainTypes> CustodyBackFillSync<T> {
run_id: self.run_id,
},
data_columns,
self.cgc,
) {
crit!(
msg = "process_batch",

View File

@@ -70,16 +70,17 @@ impl<T: BeaconChainTypes> RangeDataColumnBatchRequest<T> {
HashMap::new();
let mut column_to_peer_id: HashMap<u64, PeerId> = HashMap::new();
for column in self
.requests
.values()
.filter_map(|req| req.to_finished())
.flatten()
{
received_columns_for_slot
.entry(column.slot())
.or_default()
.push(column.clone());
for req in self.requests.values() {
let Some(columns) = req.to_finished() else {
return None;
};
for column in columns {
received_columns_for_slot
.entry(column.slot())
.or_default()
.push(column.clone());
}
}
// Note: this assumes that only 1 peer is responsible for a column