Refactor data column reconstruction and avoid blocking processing (#6403)

* Move reconstruction logic out of `overflow_lru_cache` to simplify the code, and avoid having to pass `DataColumnsToPublish` around and block other processing.

* Publish reconstructed cells before recomputing head. Remove duplicate functions.

* Merge branch 'unstable' into non-blocking-reconstruction

* Merge branch 'unstable' into non-blocking-reconstruction

# Conflicts:
#	beacon_node/beacon_chain/src/beacon_chain.rs
#	beacon_node/beacon_chain/src/data_availability_checker.rs
#	beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
#	beacon_node/network/src/network_beacon_processor/sync_methods.rs

* Spawn a blocking task for reconstruction.

* Merge branch 'unstable' into non-blocking-reconstruction

# Conflicts:
#	beacon_node/network/src/network_beacon_processor/mod.rs

* Fix fmt

* Merge branch 'unstable' into non-blocking-reconstruction

# Conflicts:
#	beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

* Fix race condition by making check and mutation atomic as suggested by Lion. Also added error handling to reconstruction failure.

* Add reconstruction reason metric and more debug logging to da checker.

* Add comment and logging.

* Rename `NotRequired` to `NotStarted`.

* Remove extra character added.
This commit is contained in:
Jimmy Chen
2024-10-17 15:56:25 +11:00
committed by GitHub
parent 772929fae2
commit ee7fca3ebd
9 changed files with 454 additions and 246 deletions

View File

@@ -6,23 +6,19 @@ use crate::block_verification_types::{
};
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
use crate::metrics;
use crate::BeaconChainTypes;
use kzg::Kzg;
use lru::LruCache;
use parking_lot::RwLock;
use slog::{debug, Logger};
use ssz_types::{FixedVector, VariableList};
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::blob_sidecar::BlobIdentifier;
use types::{
BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar,
DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec,
Hash256, SignedBeaconBlock,
};
pub type DataColumnsToPublish<E> = Option<DataColumnSidecarList<E>>;
/// This represents the components of a partially available block
///
/// The blobs are all gossip and kzg verified.
@@ -95,7 +91,7 @@ impl<E: EthSpec> PendingComponents<E> {
/// block.
///
/// This corresponds to the number of commitments that are present in a block.
pub fn num_expected_blobs(&self) -> Option<usize> {
pub fn block_kzg_commitments_count(&self) -> Option<usize> {
self.get_cached_block()
.as_ref()
.map(|b| b.get_commitments().len())
@@ -203,21 +199,61 @@ impl<E: EthSpec> PendingComponents<E> {
///
/// Returns `true` if both the block exists and the number of received blobs / custody columns
/// matches the number of expected blobs / custody columns.
pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool {
pub fn is_available(
&self,
block_import_requirement: &BlockImportRequirement,
log: &Logger,
) -> bool {
let block_kzg_commitments_count_opt = self.block_kzg_commitments_count();
match block_import_requirement {
BlockImportRequirement::AllBlobs => self
.num_expected_blobs()
.map_or(false, |num_expected_blobs| {
num_expected_blobs == self.num_received_blobs()
}),
BlockImportRequirement::AllBlobs => {
let received_blobs = self.num_received_blobs();
let expected_blobs_msg = block_kzg_commitments_count_opt
.as_ref()
.map(|num| num.to_string())
.unwrap_or("unknown".to_string());
debug!(log,
"Component(s) added to data availability checker";
"block_root" => ?self.block_root,
"received_block" => block_kzg_commitments_count_opt.is_some(),
"received_blobs" => received_blobs,
"expected_blobs" => expected_blobs_msg,
);
block_kzg_commitments_count_opt.map_or(false, |num_expected_blobs| {
num_expected_blobs == received_blobs
})
}
BlockImportRequirement::ColumnSampling(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();
// No data columns when there are 0 blobs
self.num_expected_blobs()
.map_or(false, |num_expected_blobs| {
num_expected_blobs == 0
|| *num_expected_columns == num_received_data_columns
})
let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| {
if blob_count > 0 {
*num_expected_columns
} else {
0
}
});
let expected_columns_msg = expected_columns_opt
.as_ref()
.map(|num| num.to_string())
.unwrap_or("unknown".to_string());
let num_received_columns = self.num_received_data_columns();
debug!(log,
"Component(s) added to data availability checker";
"block_root" => ?self.block_root,
"received_block" => block_kzg_commitments_count_opt.is_some(),
"received_columns" => num_received_columns,
"expected_columns" => expected_columns_msg,
);
expected_columns_opt.map_or(false, |num_expected_columns| {
num_expected_columns == num_received_columns
})
}
}
}
@@ -311,10 +347,6 @@ impl<E: EthSpec> PendingComponents<E> {
)))
}
/// Marks data column reconstruction as started for these pending components,
/// so reconstruction is not triggered again for the same block.
pub fn reconstruction_started(&mut self) {
self.reconstruction_started = true;
}
/// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob.
pub fn epoch(&self) -> Option<Epoch> {
self.executed_block
@@ -358,6 +390,15 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
spec: Arc<ChainSpec>,
}
// This enum is only used internally within the crate in the reconstruction function to improve
// readability, so it's OK to not box the variant value, and it shouldn't impact memory much with
// the current usage, as it's deconstructed immediately.
/// Outcome of the atomic "should we reconstruct data columns?" check.
#[allow(clippy::large_enum_variant)]
pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
/// Reconstruction should proceed; carries a clone of the `PendingComponents`
/// to be used as inputs to reconstruction.
Yes(PendingComponents<E>),
/// Reconstruction should not run; carries a static human-readable reason.
No(&'static str),
}
impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn new(
capacity: NonZeroUsize,
@@ -448,33 +489,12 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}
/// Returns `true` if data column reconstruction should be attempted.
///
/// Potentially trigger reconstruction if:
/// - Our custody requirement is all columns
/// - We have >= 50% of columns, but not all columns
fn should_reconstruct(
&self,
block_import_requirement: &BlockImportRequirement,
pending_components: &PendingComponents<T::EthSpec>,
) -> bool {
// Reconstruction only applies when sampling columns (PeerDAS); otherwise bail out.
let BlockImportRequirement::ColumnSampling(num_expected_columns) = block_import_requirement
else {
return false;
};
let num_of_columns = self.spec.number_of_columns;
// If we already hold every column there is nothing to reconstruct.
let has_missing_columns = pending_components.verified_data_columns.len() < num_of_columns;
// Only reconstruct once per block, only when we expect all columns (supernode),
// and only once we hold at least half of them (the minimum needed to reconstruct).
has_missing_columns
&& !pending_components.reconstruction_started
&& *num_expected_columns == num_of_columns
&& pending_components.verified_data_columns.len() >= num_of_columns / 2
}
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
block_root: Hash256,
epoch: Epoch,
kzg_verified_blobs: I,
log: &Logger,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut fixed_blobs = FixedVector::default();
@@ -496,7 +516,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pending_components.merge_blobs(fixed_blobs);
let block_import_requirement = self.block_import_requirement(epoch)?;
if pending_components.is_available(&block_import_requirement) {
if pending_components.is_available(&block_import_requirement, log) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
@@ -514,12 +534,11 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
>(
&self,
kzg: &Kzg,
block_root: Hash256,
epoch: Epoch,
kzg_verified_data_columns: I,
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
log: &Logger,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut write_lock = self.critical.write();
// Grab existing entry or create a new entry.
@@ -533,65 +552,67 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
let block_import_requirement = self.block_import_requirement(epoch)?;
// Potentially trigger reconstruction if:
// - Our custody requirement is all columns
// - We >= 50% of columns
let data_columns_to_publish =
if self.should_reconstruct(&block_import_requirement, &pending_components) {
pending_components.reconstruction_started();
let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME);
let existing_column_indices = pending_components
.verified_data_columns
.iter()
.map(|d| d.index())
.collect::<HashSet<_>>();
// Will only return an error if:
// - < 50% of columns
// - There are duplicates
let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns(
kzg,
pending_components.verified_data_columns.as_slice(),
&self.spec,
)
.map_err(AvailabilityCheckError::ReconstructColumnsError)?;
let data_columns_to_publish = all_data_columns
.iter()
.filter(|d| !existing_column_indices.contains(&d.index()))
.map(|d| d.clone_arc())
.collect::<Vec<_>>();
pending_components.verified_data_columns = all_data_columns;
metrics::stop_timer(timer);
metrics::inc_counter_by(
&metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS,
data_columns_to_publish.len() as u64,
);
Some(data_columns_to_publish)
} else {
None
};
if pending_components.is_available(&block_import_requirement) {
if pending_components.is_available(&block_import_requirement, log) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
pending_components
.make_available(block_import_requirement, &self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
.map(|availability| (availability, data_columns_to_publish))
pending_components.make_available(block_import_requirement, &self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
write_lock.put(block_root, pending_components);
Ok((
Availability::MissingComponents(block_root),
data_columns_to_publish,
))
Ok(Availability::MissingComponents(block_root))
}
}
/// Check whether data column reconstruction should be attempted.
///
/// Potentially trigger reconstruction if:
/// - Our custody requirement is all columns (supernode), and we haven't got all columns
/// - We have >= 50% of columns, but not all columns
/// - Reconstruction hasn't been started for the block
///
/// If reconstruction is required, returns `PendingComponents` which contains the
/// components to be used as inputs to reconstruction, otherwise returns a `reason`.
pub fn check_and_set_reconstruction_started(
    &self,
    block_root: &Hash256,
) -> ReconstructColumnsDecision<T::EthSpec> {
    let mut write_lock = self.critical.write();
    let Some(pending_components) = write_lock.get_mut(block_root) else {
        // Block may have been imported as it does not exist in availability cache.
        return ReconstructColumnsDecision::No("block already imported");
    };

    // If we're sampling all columns, it means we must be custodying all columns.
    let custody_column_count = self.sampling_column_count();
    let total_column_count = self.spec.number_of_columns;
    let received_column_count = pending_components.verified_data_columns.len();

    if pending_components.reconstruction_started {
        return ReconstructColumnsDecision::No("already started");
    }
    if custody_column_count != total_column_count {
        return ReconstructColumnsDecision::No("not required for full node");
    }
    // NOTE: use the local `total_column_count` (bound above from
    // `self.spec.number_of_columns`) consistently for all comparisons.
    if received_column_count == total_column_count {
        return ReconstructColumnsDecision::No("all columns received");
    }
    if received_column_count < total_column_count / 2 {
        return ReconstructColumnsDecision::No("not enough columns");
    }

    // Flip the flag while still holding the write lock, so the check and the
    // mutation are atomic and two concurrent callers cannot both start
    // reconstruction for the same block.
    pending_components.reconstruction_started = true;
    ReconstructColumnsDecision::Yes(pending_components.clone())
}
/// Handle a failed data column reconstruction attempt.
///
/// A reconstruction failure could mean some invalid data columns made it through to the
/// `DataAvailabilityChecker`. In this case, we remove all data columns in
/// `PendingComponents` and reset the reconstruction status, so that we can attempt to
/// retrieve columns from peers again.
pub fn handle_reconstruction_failure(&self, block_root: &Hash256) {
if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) {
pending_components_mut.verified_data_columns = vec![];
pending_components_mut.reconstruction_started = false;
}
}
@@ -600,6 +621,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn put_pending_executed_block(
&self,
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
log: &Logger,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut write_lock = self.critical.write();
let block_root = executed_block.import_data.block_root;
@@ -621,7 +643,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Check if we have all components and entire set is consistent.
let block_import_requirement = self.block_import_requirement(epoch)?;
if pending_components.is_available(&block_import_requirement) {
if pending_components.is_available(&block_import_requirement, log) {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
@@ -919,7 +941,7 @@ mod test {
);
assert!(cache.critical.read().is_empty(), "cache should be empty");
let availability = cache
.put_pending_executed_block(pending_block)
.put_pending_executed_block(pending_block, harness.logger())
.expect("should put block");
if blobs_expected == 0 {
assert!(
@@ -958,7 +980,7 @@ mod test {
for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
.put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger())
.expect("should put blob");
if blob_index == blobs_expected - 1 {
assert!(matches!(availability, Availability::Available(_)));
@@ -985,7 +1007,7 @@ mod test {
for gossip_blob in blobs {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
.put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger())
.expect("should put blob");
assert_eq!(
availability,
@@ -995,7 +1017,7 @@ mod test {
assert_eq!(cache.critical.read().len(), 1);
}
let availability = cache
.put_pending_executed_block(pending_block)
.put_pending_executed_block(pending_block, harness.logger())
.expect("should put block");
assert!(
matches!(availability, Availability::Available(_)),
@@ -1063,7 +1085,7 @@ mod test {
// put the block in the cache
let availability = cache
.put_pending_executed_block(pending_block)
.put_pending_executed_block(pending_block, harness.logger())
.expect("should put block");
// grab the diet block from the cache for later testing