Block availability data enum (#6866)

PeerDAS has undergone multiple refactors + the blending with the get_blobs optimization has generated technical debt.

A function signature like this

f008b84079/beacon_node/beacon_chain/src/beacon_chain.rs (L7171-L7178)

Allows at least the following combinations of states:
- blobs: Some / None
- data_columns: Some / None
- data_column_recv: Some / None
- Block has data? Yes / No
- Block post-PeerDAS? Yes / No

In reality, we don't have that many possible states, only:

- `NoData`: pre-Deneb, post-Deneb pre-PeerDAS with 0 blobs, or post-PeerDAS with 0 blobs
- `Blobs(BlobSidecarList<E>)`: post-Deneb pre-PeerDAS with > 0 blobs
- `DataColumns(DataColumnSidecarList<E>)`: post-PeerDAS with > 0 blobs
- `DataColumnsRecv(oneshot::Receiver<DataColumnSidecarList<E>>)`: post-PeerDAS with > 0 blobs, but we obtained the columns via reconstruction

^ these are the variants of the new `AvailableBlockData` enum

So we go from 2^5 possible states to 4 well-defined ones. Downstream code benefits nicely from this clarity, and I think it makes the whole feature much more maintainable.

Currently `is_available` returns a bool, and then we construct the available block in `make_available`. In a way, the availability condition is duplicated in both functions. Instead, this PR constructs `AvailableBlockData` in `is_available`, so the availability conditions are written only once:

```rust
if let Some(block_data) = is_available(..) {
let available_block = make_available(block_data);
}
```
This commit is contained in:
Lion - dapplion
2025-02-24 01:47:09 -03:00
committed by GitHub
parent 60964fc7b5
commit 3fab6a2c0b
11 changed files with 432 additions and 361 deletions

View File

@@ -10,7 +10,7 @@ pub enum Error {
blob_commitment: KzgCommitment,
block_commitment: KzgCommitment,
},
Unexpected,
Unexpected(&'static str),
SszTypes(ssz_types::Error),
MissingBlobs,
MissingCustodyColumns,
@@ -40,7 +40,7 @@ impl Error {
| Error::MissingCustodyColumns
| Error::StoreError(_)
| Error::DecodeError(_)
| Error::Unexpected
| Error::Unexpected(_)
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::RebuildingStateCaches(_)

View File

@@ -1,4 +1,5 @@
use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache};
use super::AvailableBlockData;
use crate::beacon_chain::BeaconStore;
use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification_types::{
@@ -10,6 +11,7 @@ use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
use slog::{debug, Logger};
use std::cmp::Ordering;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::oneshot;
@@ -39,19 +41,6 @@ pub struct PendingComponents<E: EthSpec> {
}
impl<E: EthSpec> PendingComponents<E> {
/// Clones the `PendingComponents` without cloning `data_column_recv`, as a oneshot
/// `Receiver` is not cloneable.
/// This should only be used when the receiver is no longer needed.
pub fn clone_without_column_recv(&self) -> Self {
PendingComponents {
block_root: self.block_root,
verified_blobs: self.verified_blobs.clone(),
verified_data_columns: self.verified_data_columns.clone(),
executed_block: self.executed_block.clone(),
reconstruction_started: self.reconstruction_started,
// Deliberately dropped: the receiver cannot be cloned, so the copy holds `None`.
data_column_recv: None,
}
}
/// Returns an immutable reference to the cached block.
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
&self.executed_block
@@ -95,26 +84,6 @@ impl<E: EthSpec> PendingComponents<E> {
.unwrap_or(false)
}
/// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a
/// block.
///
/// This corresponds to the number of commitments that are present in a block.
pub fn block_kzg_commitments_count(&self) -> Option<usize> {
self.get_cached_block()
.as_ref()
.map(|b| b.get_commitments().len())
}
/// Returns the number of blobs that have been received and are stored in the cache.
pub fn num_received_blobs(&self) -> usize {
self.get_cached_blobs().iter().flatten().count()
}
/// Returns the number of data columns that have been received and are stored in the cache.
pub fn num_received_data_columns(&self) -> usize {
self.verified_data_columns.len()
}
/// Returns the indices of cached custody columns
pub fn get_cached_data_columns_indices(&self) -> Vec<ColumnIndex> {
self.verified_data_columns
@@ -189,51 +158,121 @@ impl<E: EthSpec> PendingComponents<E> {
self.merge_blobs(reinsert);
}
/// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are
/// available in the cache.
/// Returns Some if the block has received all its required data for import. The return value
/// must be persisted in the DB along with the block.
///
/// Returns `true` if both the block exists and the number of received blobs / custody columns
/// matches the number of expected blobs / custody columns.
pub fn is_available(&self, custody_column_count: usize, log: &Logger) -> bool {
let block_kzg_commitments_count_opt = self.block_kzg_commitments_count();
let expected_blobs_msg = block_kzg_commitments_count_opt
.as_ref()
.map(|num| num.to_string())
.unwrap_or("unknown".to_string());
/// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(
&mut self,
custody_column_count: usize,
spec: &Arc<ChainSpec>,
recover: R,
) -> Result<Option<AvailableExecutedBlock<E>>, AvailabilityCheckError>
where
R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>,
) -> Result<AvailabilityPendingExecutedBlock<E>, AvailabilityCheckError>,
{
let Some(block) = &self.executed_block else {
// Block not available yet
return Ok(None);
};
// No data columns when there are 0 blobs
let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| {
if blob_count > 0 {
custody_column_count
} else {
0
let num_expected_blobs = block.num_blobs_expected();
let blob_data = if num_expected_blobs == 0 {
Some(AvailableBlockData::NoData)
} else if spec.is_peer_das_enabled_for_epoch(block.epoch()) {
match self.verified_data_columns.len().cmp(&custody_column_count) {
Ordering::Greater => {
// Should never happen
return Err(AvailabilityCheckError::Unexpected("too many columns"));
}
Ordering::Equal => {
// Block is post-peerdas, and we got enough columns
let data_columns = self
.verified_data_columns
.iter()
.map(|d| d.clone().into_inner())
.collect::<Vec<_>>();
Some(AvailableBlockData::DataColumns(data_columns))
}
Ordering::Less => {
// The data_columns_recv is an infallible promise that we will receive all expected
// columns, so we consider the block available.
// We take the receiver as it can't be cloned, and make_available should never
// be called again once it returns `Some`.
self.data_column_recv
.take()
.map(AvailableBlockData::DataColumnsRecv)
}
}
});
let expected_columns_msg = expected_columns_opt
.as_ref()
.map(|num| num.to_string())
.unwrap_or("unknown".to_string());
} else {
// Before PeerDAS, blobs
let num_received_blobs = self.verified_blobs.iter().flatten().count();
match num_received_blobs.cmp(&num_expected_blobs) {
Ordering::Greater => {
// Should never happen
return Err(AvailabilityCheckError::Unexpected("too many blobs"));
}
Ordering::Equal => {
let max_blobs = spec.max_blobs_per_block(block.epoch()) as usize;
let blobs_vec = self
.verified_blobs
.iter()
.flatten()
.map(|blob| blob.clone().to_blob())
.collect::<Vec<_>>();
let blobs = RuntimeVariableList::new(blobs_vec, max_blobs)
.map_err(|_| AvailabilityCheckError::Unexpected("over max_blobs"))?;
Some(AvailableBlockData::Blobs(blobs))
}
Ordering::Less => {
// Not enough blobs received yet
None
}
}
};
let num_received_blobs = self.num_received_blobs();
let num_received_columns = self.num_received_data_columns();
// Block's data not available yet
let Some(blob_data) = blob_data else {
return Ok(None);
};
debug!(
log,
"Component(s) added to data availability checker";
"block_root" => ?self.block_root,
"received_blobs" => num_received_blobs,
"expected_blobs" => expected_blobs_msg,
"received_columns" => num_received_columns,
"expected_columns" => expected_columns_msg,
);
// Block is available, construct `AvailableExecutedBlock`
let all_blobs_received = block_kzg_commitments_count_opt
.is_some_and(|num_expected_blobs| num_expected_blobs == num_received_blobs);
let blobs_available_timestamp = match blob_data {
AvailableBlockData::NoData => None,
AvailableBlockData::Blobs(_) => self
.verified_blobs
.iter()
.flatten()
.map(|blob| blob.seen_timestamp())
.max(),
// TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850
AvailableBlockData::DataColumns(_) => None,
AvailableBlockData::DataColumnsRecv(_) => None,
};
let all_columns_received = expected_columns_opt
.is_some_and(|num_expected_columns| num_expected_columns == num_received_columns);
let AvailabilityPendingExecutedBlock {
block,
import_data,
payload_verification_outcome,
} = recover(block.clone())?;
all_blobs_received || all_columns_received
let available_block = AvailableBlock {
block_root: self.block_root,
block,
blob_data,
blobs_available_timestamp,
spec: spec.clone(),
};
Ok(Some(AvailableExecutedBlock::new(
available_block,
import_data,
payload_verification_outcome,
)))
}
/// Returns an empty `PendingComponents` object with the given block root.
@@ -248,87 +287,6 @@ impl<E: EthSpec> PendingComponents<E> {
}
}
/// Consumes the pending components and assembles an `AvailableBlock`, recovering the full
/// executed block via the `recover` closure.
/// This does not check whether a block *should* have blobs; those checks should have been
/// completed when producing the `AvailabilityPendingBlock`.
///
/// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(
self,
spec: &Arc<ChainSpec>,
recover: R,
) -> Result<Availability<E>, AvailabilityCheckError>
where
R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>,
) -> Result<AvailabilityPendingExecutedBlock<E>, AvailabilityCheckError>,
{
let Self {
block_root,
verified_blobs,
verified_data_columns,
executed_block,
data_column_recv,
..
} = self;
// Timestamp of the last blob to arrive, `None` if no blobs were received.
let blobs_available_timestamp = verified_blobs
.iter()
.flatten()
.map(|blob| blob.seen_timestamp())
.max();
// A missing block at this point is an internal invariant violation: availability
// should have been established before calling this function.
let Some(diet_executed_block) = executed_block else {
return Err(AvailabilityCheckError::Unexpected);
};
// Post-PeerDAS the block's data is represented as data columns; pre-PeerDAS as blobs.
let is_peer_das_enabled = spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch());
let (blobs, data_columns) = if is_peer_das_enabled {
let data_columns = verified_data_columns
.into_iter()
.map(|d| d.into_inner())
.collect::<Vec<_>>();
(None, Some(data_columns))
} else {
let num_blobs_expected = diet_executed_block.num_blobs_expected();
// Only the first `num_blobs_expected` slots may be `Some`; a `None` among them
// means a blob is missing, which is unexpected once availability is reached.
let Some(verified_blobs) = verified_blobs
.into_iter()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
let max_len = spec.max_blobs_per_block(diet_executed_block.as_block().epoch()) as usize;
(
Some(RuntimeVariableList::new(verified_blobs, max_len)?),
None,
)
};
// NOTE: potentially expensive — may rebuild the pending executed block's state from disk.
let executed_block = recover(diet_executed_block)?;
let AvailabilityPendingExecutedBlock {
block,
mut import_data,
payload_verification_outcome,
} = executed_block;
// Carry the column receiver through import (presumably consumed by the importer —
// see the usage at the call sites; TODO confirm).
import_data.data_column_recv = data_column_recv;
let available_block = AvailableBlock {
block_root,
block,
blobs,
data_columns,
blobs_available_timestamp,
spec: spec.clone(),
};
Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome),
)))
}
/// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob.
pub fn epoch(&self) -> Option<Epoch> {
self.executed_block
@@ -354,6 +312,41 @@ impl<E: EthSpec> PendingComponents<E> {
None
})
}
/// Returns a short human-readable summary of which components are cached for this block,
/// e.g. `block 1 blobs 2/4` or `block 1 data_columns 8/64 data_columns_recv 0`,
/// intended for debug logging.
pub fn status_str(
&self,
block_epoch: Epoch,
sampling_column_count: usize,
spec: &ChainSpec,
) -> String {
// 1 if the executed block itself has been received, 0 otherwise.
let block_count = if self.executed_block.is_some() { 1 } else { 0 };
if spec.is_peer_das_enabled_for_epoch(block_epoch) {
// 1 if a receiver for asynchronously computed columns is present.
let data_column_recv_count = if self.data_column_recv.is_some() {
1
} else {
0
};
format!(
"block {} data_columns {}/{} data_columns_recv {}",
block_count,
self.verified_data_columns.len(),
sampling_column_count,
data_column_recv_count,
)
} else {
// The expected blob count is unknown ("?") until the block itself arrives.
let num_expected_blobs = if let Some(block) = self.get_cached_block() {
&block.num_blobs_expected().to_string()
} else {
"?"
};
format!(
"block {} blobs {}/{}",
block_count,
self.verified_blobs.len(),
num_expected_blobs
)
}
}
}
/// This is the main struct for this module. Outside methods should
@@ -374,7 +367,7 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
// the current usage, as it's deconstructed immediately.
#[allow(clippy::large_enum_variant)]
pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
Yes(PendingComponents<E>),
Yes(Vec<KzgVerifiedCustodyDataColumn<E>>),
No(&'static str),
}
@@ -455,16 +448,10 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
/// Puts the KZG verified blobs into the availability cache as pending components.
///
/// The `data_column_recv` parameter is an optional `Receiver` for data columns that are
/// computed asynchronously. This method remains **used** after PeerDAS activation, because
/// blocks can be made available if the EL already has the blobs and returns them via the
/// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs).
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
block_root: Hash256,
kzg_verified_blobs: I,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
log: &Logger,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut kzg_verified_blobs = kzg_verified_blobs.into_iter().peekable();
@@ -474,7 +461,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.map(|verified_blob| verified_blob.as_blob().epoch())
else {
// Verified blobs list should be non-empty.
return Err(AvailabilityCheckError::Unexpected);
return Err(AvailabilityCheckError::Unexpected("empty blobs"));
};
let mut fixed_blobs =
@@ -499,21 +486,22 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);
if data_column_recv.is_some() {
// If `data_column_recv` is `Some`, it means we have all the blobs from engine, and have
// started computing data columns. We store the receiver in `PendingComponents` for
// later use when importing the block.
pending_components.data_column_recv = data_column_recv;
}
debug!(log, "Component added to data availability checker";
"component" => "blobs",
"block_root" => ?block_root,
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
);
if pending_components.is_available(self.sampling_column_count, log) {
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components.clone_without_column_recv());
write_lock.put(block_root, pending_components);
drop(write_lock);
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
@@ -535,7 +523,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.map(|verified_blob| verified_blob.as_data_column().epoch())
else {
// Verified data_columns list should be non-empty.
return Err(AvailabilityCheckError::Unexpected);
return Err(AvailabilityCheckError::Unexpected("empty columns"));
};
let mut write_lock = self.critical.write();
@@ -551,14 +539,72 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the data columns.
pending_components.merge_data_columns(kzg_verified_data_columns)?;
if pending_components.is_available(self.sampling_column_count, log) {
debug!(log, "Component added to data availability checker";
"component" => "data_columns",
"block_root" => ?block_root,
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
);
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components.clone_without_column_recv());
write_lock.put(block_root, pending_components);
drop(write_lock);
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
}
}
/// The `data_column_recv` parameter is a `Receiver` for data columns that are computed
/// asynchronously. This method is used if the EL already has the blobs and returns them via the
/// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs).
pub fn put_computed_data_columns_recv(
&self,
block_root: Hash256,
block_epoch: Epoch,
data_column_recv: oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>,
log: &Logger,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut write_lock = self.critical.write();
// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_entry(&block_root)
.map(|(_, v)| v)
.unwrap_or_else(|| {
PendingComponents::empty(
block_root,
self.spec.max_blobs_per_block(block_epoch) as usize,
)
});
// We have all the blobs from engine, and have started computing data columns. We store the
// receiver in `PendingComponents` for later use when importing the block.
// TODO(das): Error or log if we overwrite a prior receiver https://github.com/sigp/lighthouse/issues/6764
pending_components.data_column_recv = Some(data_column_recv);
debug!(log, "Component added to data availability checker";
"component" => "data_columns_recv",
"block_root" => ?block_root,
"status" => pending_components.status_str(block_epoch, self.sampling_column_count, &self.spec),
);
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components);
drop(write_lock);
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
@@ -603,7 +649,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
pending_components.reconstruction_started = true;
ReconstructColumnsDecision::Yes(pending_components.clone_without_column_recv())
ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone())
}
/// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`.
@@ -643,15 +689,23 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the block.
pending_components.merge_block(diet_executed_block);
debug!(log, "Component added to data availability checker";
"component" => "block",
"block_root" => ?block_root,
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
);
// Check if we have all components and entire set is consistent.
if pending_components.is_available(self.sampling_column_count, log) {
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components.clone_without_column_recv());
write_lock.put(block_root, pending_components);
drop(write_lock);
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
@@ -882,7 +936,6 @@ mod test {
parent_eth1_finalization_data,
confirmed_state_roots: vec![],
consensus_context,
data_column_recv: None,
};
let payload_verification_outcome = PayloadVerificationOutcome {
@@ -989,7 +1042,7 @@ mod test {
for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger())
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger())
.expect("should put blob");
if blob_index == blobs_expected - 1 {
assert!(matches!(availability, Availability::Available(_)));
@@ -1017,11 +1070,10 @@ mod test {
for gossip_blob in blobs {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger())
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger())
.expect("should put blob");
assert_eq!(
availability,
Availability::MissingComponents(root),
assert!(
matches!(availability, Availability::MissingComponents(_)),
"should be pending block"
);
assert_eq!(cache.critical.read().len(), 1);
@@ -1273,7 +1325,6 @@ mod pending_components_tests {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
},
payload_verification_outcome: PayloadVerificationOutcome {
payload_verification_status: PayloadVerificationStatus::Verified,

View File

@@ -136,7 +136,6 @@ impl<T: BeaconChainTypes> StateLRUCache<T> {
consensus_context: diet_executed_block
.consensus_context
.into_consensus_context(),
data_column_recv: None,
},
payload_verification_outcome: diet_executed_block.payload_verification_outcome,
})