Only publish reconstructed columns that we need to sample (#8269)

N/A


  When reconstructing, we were publishing all columns that we didn't already have in the DA cache. This wastes outbound bandwidth on a node that is supposed to sample fewer columns.
This PR changes the behaviour to publish only columns that we are supposed to sample in the topics that we are subscribed to.


Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
Pawan Dhananjay
2025-10-22 22:05:08 -07:00
committed by GitHub
parent d8c6c57029
commit c668cb7d9a
2 changed files with 22 additions and 25 deletions

View File

@@ -617,48 +617,45 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
));
};
let data_columns_to_publish = all_data_columns
.into_iter()
.filter(|d| !existing_column_indices.contains(&d.index()))
.collect::<Vec<_>>();
let Some(slot) = data_columns_to_publish
.first()
.map(|d| d.as_data_column().slot())
else {
let Some(slot) = all_data_columns.first().map(|d| d.as_data_column().slot()) else {
return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported(
"No new columns to import and publish",
));
};
let columns_to_sample = self
.custody_context()
.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec);
// We only need to import and publish columns that we need to sample
// and columns that we haven't already received
let data_columns_to_import_and_publish = all_data_columns
.into_iter()
.filter(|d| {
columns_to_sample.contains(&d.index())
&& !existing_column_indices.contains(&d.index())
})
.collect::<Vec<_>>();
metrics::stop_timer(timer);
metrics::inc_counter_by(
&metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS,
data_columns_to_publish.len() as u64,
data_columns_to_import_and_publish.len() as u64,
);
debug!(
count = data_columns_to_publish.len(),
count = data_columns_to_import_and_publish.len(),
?block_root,
%slot,
"Reconstructed columns"
);
let columns_to_sample = self
.custody_context()
.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec);
let data_columns_to_import: Vec<_> = data_columns_to_publish
.iter()
.filter(|column| columns_to_sample.contains(&column.index()))
.cloned()
.collect();
self.availability_cache
.put_kzg_verified_data_columns(*block_root, data_columns_to_import)
.put_kzg_verified_data_columns(*block_root, data_columns_to_import_and_publish.clone())
.map(|availability| {
DataColumnReconstructionResult::Success((
availability,
data_columns_to_publish
data_columns_to_import_and_publish
.into_iter()
.map(|d| d.clone_arc())
.collect::<Vec<_>>(),
@@ -1163,8 +1160,8 @@ mod test {
// Remaining 64 columns should be reconstructed
assert_eq!(
reconstructed_columns.len(),
64,
"should reconstruct the remaining 64 columns"
sampling_requirement - spec.number_of_custody_groups as usize / 2,
"should reconstruct the remaining 1 columns"
);
// Only the columns required for custody (65) should be imported into the cache

View File

@@ -1875,7 +1875,7 @@ pub static DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS: LazyLock<Result<IntCounter>>
LazyLock::new(|| {
try_create_int_counter(
"beacon_data_availability_reconstructed_columns_total",
"Total count of reconstructed columns",
"Total count of useful reconstructed columns",
)
});