some progress around reconstruction

This commit is contained in:
Daniel Knopik
2026-04-28 17:06:45 +02:00
parent 4535753c9b
commit 4ef4c7ddd4
2 changed files with 36 additions and 20 deletions

View File

@@ -470,7 +470,9 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
pending_components: MappedRwLockReadGuard<'_, PendingComponents<T::EthSpec>>, pending_components: MappedRwLockReadGuard<'_, PendingComponents<T::EthSpec>>,
num_expected_columns: usize, num_expected_columns: usize,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> { ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
if let Some(available_envelope) = pending_components.make_available(num_expected_columns)? { if let Some(available_envelope) =
pending_components.make_available(block_root, num_expected_columns)?
{
// Explicitly drop read lock before acquiring write lock // Explicitly drop read lock before acquiring write lock
drop(pending_components); drop(pending_components);
if let Some(components) = self.availability_cache.write().get_mut(&block_root) { if let Some(components) = self.availability_cache.write().get_mut(&block_root) {
@@ -534,23 +536,17 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
return ReconstructColumnsDecision::No("block already imported"); return ReconstructColumnsDecision::No("block already imported");
}; };
let Some(epoch) = pending_components let epoch = pending_components.epoch();
.verified_data_columns
.first()
.map(|c| c.as_data_column().epoch())
else {
return ReconstructColumnsDecision::No("not enough columns");
};
let total_column_count = T::EthSpec::number_of_columns(); let total_column_count = T::EthSpec::number_of_columns();
let sampling_column_count = self let sampling_column_count = self
.custody_context .custody_context
.num_of_data_columns_to_sample(epoch, &self.spec); .num_of_data_columns_to_sample(epoch, &self.spec);
let received_column_count = pending_components.verified_data_columns.len();
if pending_components.reconstruction_started { if pending_components.reconstruction_started {
return ReconstructColumnsDecision::No("already started"); return ReconstructColumnsDecision::No("already started");
} }
let received_column_count = pending_components.num_completed_columns();
if received_column_count >= sampling_column_count { if received_column_count >= sampling_column_count {
return ReconstructColumnsDecision::No("all sampling columns received"); return ReconstructColumnsDecision::No("all sampling columns received");
} }
@@ -559,7 +555,7 @@ impl<T: BeaconChainTypes> PendingPayloadCache<T> {
} }
pending_components.reconstruction_started = true; pending_components.reconstruction_started = true;
ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()) ReconstructColumnsDecision::Yes(pending_components.get_cached_data_columns(block_root))
} }
/// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`.

View File

@@ -30,6 +30,16 @@ pub struct PendingComponents<E: EthSpec> {
} }
impl<E: EthSpec> PendingComponents<E> { impl<E: EthSpec> PendingComponents<E> {
/// Returns the completed custody columns
pub fn get_cached_data_columns(&self, block_root: Hash256) -> Vec<Arc<DataColumnSidecar<E>>> {
self.verified_data_columns
.iter()
.filter_map(|(col_idx, col)| {
col.try_to_sidecar(*col_idx, self.slot, block_root, self.num_blobs_expected)
})
.collect()
}
/// Returns the indices of cached custody columns /// Returns the indices of cached custody columns
pub fn get_cached_data_columns_indices(&self) -> Vec<ColumnIndex> { pub fn get_cached_data_columns_indices(&self) -> Vec<ColumnIndex> {
self.verified_data_columns self.verified_data_columns
@@ -81,6 +91,13 @@ impl<E: EthSpec> PendingComponents<E> {
self.num_blobs_expected self.num_blobs_expected
} }
pub fn num_completed_columns(&self) -> usize {
self.verified_data_columns
.iter()
.filter_map(|(_, col)| col.is_complete(self.num_blobs_expected).then_some(()))
.count()
}
/// Returns `Some` if the envelope and all required data columns have been received. /// Returns `Some` if the envelope and all required data columns have been received.
pub fn make_available( pub fn make_available(
&self, &self,
@@ -104,14 +121,7 @@ impl<E: EthSpec> PendingComponents<E> {
}); });
vec![] vec![]
} else { } else {
let data_columns: Vec<_> = self let num_completed_columns = self.num_completed_columns();
.verified_data_columns
.iter()
.filter_map(|(col_idx, col)| {
col.try_to_sidecar(*col_idx, self.slot, block_hash, self.num_blobs_expected)
})
.collect();
let num_completed_columns = data_columns.len();
match num_completed_columns.cmp(&num_expected_columns) { match num_completed_columns.cmp(&num_expected_columns) {
Ordering::Greater => { Ordering::Greater => {
// Should never happen // Should never happen
@@ -124,7 +134,17 @@ impl<E: EthSpec> PendingComponents<E> {
debug!("All data columns received, data is available"); debug!("All data columns received, data is available");
}); });
data_columns self.verified_data_columns
.iter()
.filter_map(|(col_idx, col)| {
col.try_to_sidecar(
*col_idx,
self.slot,
block_hash,
self.num_blobs_expected,
)
})
.collect()
} }
Ordering::Less => { Ordering::Less => {
// Not enough data columns received yet // Not enough data columns received yet
@@ -187,7 +207,7 @@ impl<E: EthSpec> PendingComponents<E> {
// the current usage, as it's deconstructed immediately. // the current usage, as it's deconstructed immediately.
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub(crate) enum ReconstructColumnsDecision<E: EthSpec> { pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
Yes(Vec<KzgVerifiedCustodyDataColumn<E>>), Yes(Vec<Arc<DataColumnSidecar<E>>>),
No(&'static str), No(&'static str),
} }