Block availability data enum (#6866)

PeerDAS has undergone multiple refactors, and blending it with the get_blobs optimization has generated technical debt.

A function signature like this

f008b84079/beacon_node/beacon_chain/src/beacon_chain.rs (L7171-L7178)

Allows at least the following combinations of states:
- blobs: Some / None
- data_columns: Some / None
- data_column_recv: Some / None
- Block has data? Yes / No
- Block post-PeerDAS? Yes / No

In reality, we don't have that many possible states, only:

- `NoData`: pre-deneb, pre-PeerDAS with 0 blobs or post-PeerDAS with 0 blobs
- `Blobs(BlobSidecarList<E>)`: post-Deneb pre-PeerDAS with > 0 blobs
- `DataColumns(DataColumnSidecarList<E>)`: post-PeerDAS with > 0 blobs
- `DataColumnsRecv(oneshot::Receiver<DataColumnSidecarList<E>>)`: post-PeerDAS with > 0 blobs, but we obtained the columns via reconstruction

^ these are the variants of the new `AvailableBlockData` enum

So we go from 2^5 possible states to 4 well-defined ones. Downstream code benefits nicely from this clarity and I think it makes the whole feature much more maintainable.

Currently `is_available` returns a bool, and then we construct the available block in `make_available`. In a way the availability condition is duplicated in both functions. Instead, this PR constructs `AvailableBlockData` in `is_available` so the availability conditions are written only once:

```rust
if let Some(block_data) = is_available(..) {
let available_block = make_available(block_data);
}
```
This commit is contained in:
Lion - dapplion
2025-02-24 01:47:09 -03:00
committed by GitHub
parent 60964fc7b5
commit 3fab6a2c0b
11 changed files with 432 additions and 361 deletions

View File

@@ -21,8 +21,8 @@ use crate::block_verification_types::{
pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnReconstructionResult,
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
DataAvailabilityChecker, DataColumnReconstructionResult,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
@@ -3169,7 +3169,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
// consumers don't expect the blobs event to fire erratically.
if !self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
}
let r = self
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
@@ -3640,9 +3647,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;
let availability = self.data_availability_checker.put_engine_blobs(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
blobs,
data_column_recv,
)?;
self.process_availability(slot, availability, || Ok(()))
.await
@@ -3727,7 +3737,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;
// Record the time at which this block's blobs became available.
@@ -3755,7 +3764,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block,
parent_eth1_finalization_data,
consensus_context,
data_column_recv,
)
},
"payload_verification_handle",
@@ -3794,7 +3802,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
@@ -3892,7 +3899,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
if let Err(e) = self.early_attester_cache.add_head_block(
block_root,
signed_block.clone(),
&signed_block,
proto_block,
&state,
&self.spec,
@@ -3961,15 +3968,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let (_, signed_block, block_data) = signed_block.deconstruct();
match self.get_blobs_or_columns_store_op(
block_root,
signed_block.epoch(),
blobs,
data_columns,
data_column_recv,
) {
match self.get_blobs_or_columns_store_op(block_root, block_data) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
@@ -7218,29 +7219,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
fn get_blobs_or_columns_store_op(
pub(crate) fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: Option<BlobSidecarList<T::EthSpec>>,
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
block_data: AvailableBlockData<T::EthSpec>,
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let _custody_columns_count = self.data_availability_checker.get_sampling_column_count();
let custody_columns_available = data_columns
.as_ref()
.as_ref()
.is_some_and(|columns| columns.len() == custody_columns_count);
let data_columns_to_persist = if custody_columns_available {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
data_columns
} else if let Some(data_column_recv) = data_column_recv {
match block_data {
AvailableBlockData::NoData => Ok(None),
AvailableBlockData::Blobs(blobs) => {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
}
AvailableBlockData::DataColumns(data_columns) => {
debug!(
self.log, "Writing data columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
}
AvailableBlockData::DataColumnsRecv(data_column_recv) => {
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
@@ -7250,34 +7256,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let computed_data_columns = data_column_recv
.blocking_recv()
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
Some(computed_data_columns)
} else {
// No blobs in the block.
None
};
if let Some(data_columns) = data_columns_to_persist {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
}
}
} else if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
self.log, "Writing data columns to store";
"block_root" => %block_root,
"count" => blobs.len(),
"count" => computed_data_columns.len(),
);
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
// TODO(das): Store only this node's custody columns
Ok(Some(StoreOp::PutDataColumns(
block_root,
computed_data_columns,
)))
}
}
Ok(None)
}
}

View File

@@ -1707,7 +1707,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv: None,
},
payload_verification_handle,
})

View File

@@ -7,11 +7,10 @@ use derivative::Derivative;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::oneshot;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
/// A block that has been received over RPC. It has 2 internal variants:
@@ -265,7 +264,6 @@ impl<E: EthSpec> ExecutedBlock<E> {
/// A block that has completed all pre-deneb block processing checks including verification
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
#[derive(PartialEq)]
pub struct AvailableExecutedBlock<E: EthSpec> {
pub block: AvailableBlock<E>,
pub import_data: BlockImportData<E>,
@@ -338,8 +336,7 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
}
}
#[derive(Debug, Derivative)]
#[derivative(PartialEq)]
#[derive(Debug, PartialEq)]
pub struct BlockImportData<E: EthSpec> {
pub block_root: Hash256,
pub state: BeaconState<E>,
@@ -347,12 +344,6 @@ pub struct BlockImportData<E: EthSpec> {
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<E>,
#[derivative(PartialEq = "ignore")]
/// An optional receiver for `DataColumnSidecarList`.
///
/// This field is `Some` when data columns are being computed asynchronously.
/// The resulting `DataColumnSidecarList` will be sent through this receiver.
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
}
impl<E: EthSpec> BlockImportData<E> {
@@ -371,7 +362,6 @@ impl<E: EthSpec> BlockImportData<E> {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
}
}
}

View File

@@ -91,7 +91,6 @@ pub enum DataColumnReconstructionResult<E: EthSpec> {
///
/// Indicates if the block is fully `Available` or if we need blobs or blocks
/// to "complete" the requirements for an `AvailableBlock`.
#[derive(PartialEq)]
pub enum Availability<E: EthSpec> {
MissingComponents(Hash256),
Available(Box<AvailableExecutedBlock<E>>),
@@ -219,7 +218,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_err(AvailabilityCheckError::InvalidBlobs)?;
self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs, None, &self.log)
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
}
/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
@@ -253,23 +252,29 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_engine_blobs(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_columns_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let seen_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
let verified_blobs =
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp);
self.availability_cache.put_kzg_verified_blobs(
block_root,
verified_blobs,
data_column_recv,
&self.log,
)
// `data_columns_recv` is always Some if block_root is post-PeerDAS
if let Some(data_columns_recv) = data_columns_recv {
self.availability_cache.put_computed_data_columns_recv(
block_root,
block_epoch,
data_columns_recv,
&self.log,
)
} else {
let seen_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;
self.availability_cache.put_kzg_verified_blobs(
block_root,
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp),
&self.log,
)
}
}
/// Check if we've cached other blobs for this block. If it completes a set and we also
@@ -284,7 +289,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.put_kzg_verified_blobs(
gossip_blob.block_root(),
vec![gossip_blob.into_inner()],
None,
&self.log,
)
}
@@ -338,15 +342,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
) -> Result<MaybeAvailableBlock<T::EthSpec>, AvailabilityCheckError> {
let (block_root, block, blobs, data_columns) = block.deconstruct();
if self.blobs_required_for_block(&block) {
return if let Some(blob_list) = blobs.as_ref() {
return if let Some(blob_list) = blobs {
verify_kzg_for_blob_list(blob_list.iter(), &self.kzg)
.map_err(AvailabilityCheckError::InvalidBlobs)?;
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs,
blob_data: AvailableBlockData::Blobs(blob_list),
blobs_available_timestamp: None,
data_columns: None,
spec: self.spec.clone(),
}))
} else {
@@ -365,14 +368,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: None,
blobs_available_timestamp: None,
data_columns: Some(
blob_data: AvailableBlockData::DataColumns(
data_column_list
.into_iter()
.map(|d| d.clone_arc())
.collect(),
),
blobs_available_timestamp: None,
spec: self.spec.clone(),
}))
} else {
@@ -383,9 +385,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: None,
blob_data: AvailableBlockData::NoData,
blobs_available_timestamp: None,
data_columns: None,
spec: self.spec.clone(),
}))
}
@@ -437,27 +438,25 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let (block_root, block, blobs, data_columns) = block.deconstruct();
let maybe_available_block = if self.blobs_required_for_block(&block) {
if blobs.is_some() {
if let Some(blobs) = blobs {
MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs,
blob_data: AvailableBlockData::Blobs(blobs),
blobs_available_timestamp: None,
data_columns: None,
spec: self.spec.clone(),
})
} else {
MaybeAvailableBlock::AvailabilityPending { block_root, block }
}
} else if self.data_columns_required_for_block(&block) {
if data_columns.is_some() {
if let Some(data_columns) = data_columns {
MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: None,
data_columns: data_columns.map(|data_columns| {
data_columns.into_iter().map(|d| d.into_inner()).collect()
}),
blob_data: AvailableBlockData::DataColumns(
data_columns.into_iter().map(|d| d.into_inner()).collect(),
),
blobs_available_timestamp: None,
spec: self.spec.clone(),
})
@@ -468,8 +467,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
blobs: None,
data_columns: None,
blob_data: AvailableBlockData::NoData,
blobs_available_timestamp: None,
spec: self.spec.clone(),
})
@@ -545,11 +543,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: &Hash256,
) -> Result<DataColumnReconstructionResult<T::EthSpec>, AvailabilityCheckError> {
let pending_components = match self
let verified_data_columns = match self
.availability_cache
.check_and_set_reconstruction_started(block_root)
{
ReconstructColumnsDecision::Yes(pending_components) => pending_components,
ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns,
ReconstructColumnsDecision::No(reason) => {
return Ok(DataColumnReconstructionResult::NotStarted(reason));
}
@@ -560,7 +558,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns(
&self.kzg,
&pending_components.verified_data_columns,
&verified_data_columns,
&self.spec,
)
.map_err(|e| {
@@ -713,13 +711,25 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
}
}
#[derive(Debug)]
pub enum AvailableBlockData<E: EthSpec> {
/// Block is pre-Deneb or has zero blobs
NoData,
/// Block is post-Deneb, pre-PeerDAS and has more than zero blobs
Blobs(BlobSidecarList<E>),
/// Block is post-PeerDAS and has more than zero blobs
DataColumns(DataColumnSidecarList<E>),
/// Block is post-PeerDAS, has more than zero blobs and we recomputed the columns from the EL's
/// mempool blobs
DataColumnsRecv(oneshot::Receiver<DataColumnSidecarList<E>>),
}
/// A fully available block that is ready to be imported into fork choice.
#[derive(Clone, Debug, PartialEq)]
#[derive(Debug)]
pub struct AvailableBlock<E: EthSpec> {
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
blob_data: AvailableBlockData<E>,
/// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
blobs_available_timestamp: Option<Duration>,
pub spec: Arc<ChainSpec>,
@@ -729,15 +739,13 @@ impl<E: EthSpec> AvailableBlock<E> {
pub fn __new_for_testing(
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
data: AvailableBlockData<E>,
spec: Arc<ChainSpec>,
) -> Self {
Self {
block_root,
block,
blobs,
data_columns,
blob_data: data,
blobs_available_timestamp: None,
spec,
}
@@ -750,39 +758,56 @@ impl<E: EthSpec> AvailableBlock<E> {
self.block.clone()
}
pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
self.blobs.as_ref()
}
pub fn blobs_available_timestamp(&self) -> Option<Duration> {
self.blobs_available_timestamp
}
pub fn data_columns(&self) -> Option<&DataColumnSidecarList<E>> {
self.data_columns.as_ref()
pub fn data(&self) -> &AvailableBlockData<E> {
&self.blob_data
}
pub fn has_blobs(&self) -> bool {
match self.blob_data {
AvailableBlockData::NoData => false,
AvailableBlockData::Blobs(..) => true,
AvailableBlockData::DataColumns(_) => false,
AvailableBlockData::DataColumnsRecv(_) => false,
}
}
#[allow(clippy::type_complexity)]
pub fn deconstruct(
self,
) -> (
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
) {
pub fn deconstruct(self) -> (Hash256, Arc<SignedBeaconBlock<E>>, AvailableBlockData<E>) {
let AvailableBlock {
block_root,
block,
blobs,
data_columns,
blob_data,
..
} = self;
(block_root, block, blobs, data_columns)
(block_root, block, blob_data)
}
/// Only used for testing
pub fn __clone_without_recv(&self) -> Result<Self, String> {
Ok(Self {
block_root: self.block_root,
block: self.block.clone(),
blob_data: match &self.blob_data {
AvailableBlockData::NoData => AvailableBlockData::NoData,
AvailableBlockData::Blobs(blobs) => AvailableBlockData::Blobs(blobs.clone()),
AvailableBlockData::DataColumns(data_columns) => {
AvailableBlockData::DataColumns(data_columns.clone())
}
AvailableBlockData::DataColumnsRecv(_) => {
return Err("Can't clone DataColumnsRecv".to_owned())
}
},
blobs_available_timestamp: self.blobs_available_timestamp,
spec: self.spec.clone(),
})
}
}
#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum MaybeAvailableBlock<E: EthSpec> {
/// This variant is fully available.
/// i.e. for pre-deneb blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for

View File

@@ -10,7 +10,7 @@ pub enum Error {
blob_commitment: KzgCommitment,
block_commitment: KzgCommitment,
},
Unexpected,
Unexpected(&'static str),
SszTypes(ssz_types::Error),
MissingBlobs,
MissingCustodyColumns,
@@ -40,7 +40,7 @@ impl Error {
| Error::MissingCustodyColumns
| Error::StoreError(_)
| Error::DecodeError(_)
| Error::Unexpected
| Error::Unexpected(_)
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::RebuildingStateCaches(_)

View File

@@ -1,4 +1,5 @@
use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache};
use super::AvailableBlockData;
use crate::beacon_chain::BeaconStore;
use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification_types::{
@@ -10,6 +11,7 @@ use crate::BeaconChainTypes;
use lru::LruCache;
use parking_lot::RwLock;
use slog::{debug, Logger};
use std::cmp::Ordering;
use std::num::NonZeroUsize;
use std::sync::Arc;
use tokio::sync::oneshot;
@@ -39,19 +41,6 @@ pub struct PendingComponents<E: EthSpec> {
}
impl<E: EthSpec> PendingComponents<E> {
/// Clones the `PendingComponent` without cloning `data_column_recv`, as `Receiver` is not cloneable.
/// This should only be used when the receiver is no longer needed.
pub fn clone_without_column_recv(&self) -> Self {
PendingComponents {
block_root: self.block_root,
verified_blobs: self.verified_blobs.clone(),
verified_data_columns: self.verified_data_columns.clone(),
executed_block: self.executed_block.clone(),
reconstruction_started: self.reconstruction_started,
data_column_recv: None,
}
}
/// Returns an immutable reference to the cached block.
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
&self.executed_block
@@ -95,26 +84,6 @@ impl<E: EthSpec> PendingComponents<E> {
.unwrap_or(false)
}
/// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a
/// block.
///
/// This corresponds to the number of commitments that are present in a block.
pub fn block_kzg_commitments_count(&self) -> Option<usize> {
self.get_cached_block()
.as_ref()
.map(|b| b.get_commitments().len())
}
/// Returns the number of blobs that have been received and are stored in the cache.
pub fn num_received_blobs(&self) -> usize {
self.get_cached_blobs().iter().flatten().count()
}
/// Returns the number of data columns that have been received and are stored in the cache.
pub fn num_received_data_columns(&self) -> usize {
self.verified_data_columns.len()
}
/// Returns the indices of cached custody columns
pub fn get_cached_data_columns_indices(&self) -> Vec<ColumnIndex> {
self.verified_data_columns
@@ -189,51 +158,121 @@ impl<E: EthSpec> PendingComponents<E> {
self.merge_blobs(reinsert);
}
/// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are
/// available in the cache.
/// Returns Some if the block has received all its required data for import. The return value
/// must be persisted in the DB along with the block.
///
/// Returns `true` if both the block exists and the number of received blobs / custody columns
/// matches the number of expected blobs / custody columns.
pub fn is_available(&self, custody_column_count: usize, log: &Logger) -> bool {
let block_kzg_commitments_count_opt = self.block_kzg_commitments_count();
let expected_blobs_msg = block_kzg_commitments_count_opt
.as_ref()
.map(|num| num.to_string())
.unwrap_or("unknown".to_string());
/// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(
&mut self,
custody_column_count: usize,
spec: &Arc<ChainSpec>,
recover: R,
) -> Result<Option<AvailableExecutedBlock<E>>, AvailabilityCheckError>
where
R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>,
) -> Result<AvailabilityPendingExecutedBlock<E>, AvailabilityCheckError>,
{
let Some(block) = &self.executed_block else {
// Block not available yet
return Ok(None);
};
// No data columns when there are 0 blobs
let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| {
if blob_count > 0 {
custody_column_count
} else {
0
let num_expected_blobs = block.num_blobs_expected();
let blob_data = if num_expected_blobs == 0 {
Some(AvailableBlockData::NoData)
} else if spec.is_peer_das_enabled_for_epoch(block.epoch()) {
match self.verified_data_columns.len().cmp(&custody_column_count) {
Ordering::Greater => {
// Should never happen
return Err(AvailabilityCheckError::Unexpected("too many columns"));
}
Ordering::Equal => {
// Block is post-peerdas, and we got enough columns
let data_columns = self
.verified_data_columns
.iter()
.map(|d| d.clone().into_inner())
.collect::<Vec<_>>();
Some(AvailableBlockData::DataColumns(data_columns))
}
Ordering::Less => {
// The data_columns_recv is an infallible promise that we will receive all expected
// columns, so we consider the block available.
// We take the receiver as it can't be cloned, and make_available should never
// be called again once it returns `Some`.
self.data_column_recv
.take()
.map(AvailableBlockData::DataColumnsRecv)
}
}
});
let expected_columns_msg = expected_columns_opt
.as_ref()
.map(|num| num.to_string())
.unwrap_or("unknown".to_string());
} else {
// Before PeerDAS, blobs
let num_received_blobs = self.verified_blobs.iter().flatten().count();
match num_received_blobs.cmp(&num_expected_blobs) {
Ordering::Greater => {
// Should never happen
return Err(AvailabilityCheckError::Unexpected("too many blobs"));
}
Ordering::Equal => {
let max_blobs = spec.max_blobs_per_block(block.epoch()) as usize;
let blobs_vec = self
.verified_blobs
.iter()
.flatten()
.map(|blob| blob.clone().to_blob())
.collect::<Vec<_>>();
let blobs = RuntimeVariableList::new(blobs_vec, max_blobs)
.map_err(|_| AvailabilityCheckError::Unexpected("over max_blobs"))?;
Some(AvailableBlockData::Blobs(blobs))
}
Ordering::Less => {
// Not enough blobs received yet
None
}
}
};
let num_received_blobs = self.num_received_blobs();
let num_received_columns = self.num_received_data_columns();
// Block's data not available yet
let Some(blob_data) = blob_data else {
return Ok(None);
};
debug!(
log,
"Component(s) added to data availability checker";
"block_root" => ?self.block_root,
"received_blobs" => num_received_blobs,
"expected_blobs" => expected_blobs_msg,
"received_columns" => num_received_columns,
"expected_columns" => expected_columns_msg,
);
// Block is available, construct `AvailableExecutedBlock`
let all_blobs_received = block_kzg_commitments_count_opt
.is_some_and(|num_expected_blobs| num_expected_blobs == num_received_blobs);
let blobs_available_timestamp = match blob_data {
AvailableBlockData::NoData => None,
AvailableBlockData::Blobs(_) => self
.verified_blobs
.iter()
.flatten()
.map(|blob| blob.seen_timestamp())
.max(),
// TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850
AvailableBlockData::DataColumns(_) => None,
AvailableBlockData::DataColumnsRecv(_) => None,
};
let all_columns_received = expected_columns_opt
.is_some_and(|num_expected_columns| num_expected_columns == num_received_columns);
let AvailabilityPendingExecutedBlock {
block,
import_data,
payload_verification_outcome,
} = recover(block.clone())?;
all_blobs_received || all_columns_received
let available_block = AvailableBlock {
block_root: self.block_root,
block,
blob_data,
blobs_available_timestamp,
spec: spec.clone(),
};
Ok(Some(AvailableExecutedBlock::new(
available_block,
import_data,
payload_verification_outcome,
)))
}
/// Returns an empty `PendingComponents` object with the given block root.
@@ -248,87 +287,6 @@ impl<E: EthSpec> PendingComponents<E> {
}
}
/// Verifies an `SignedBeaconBlock` against a set of KZG verified blobs.
/// This does not check whether a block *should* have blobs, these checks should have been
/// completed when producing the `AvailabilityPendingBlock`.
///
/// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(
self,
spec: &Arc<ChainSpec>,
recover: R,
) -> Result<Availability<E>, AvailabilityCheckError>
where
R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>,
) -> Result<AvailabilityPendingExecutedBlock<E>, AvailabilityCheckError>,
{
let Self {
block_root,
verified_blobs,
verified_data_columns,
executed_block,
data_column_recv,
..
} = self;
let blobs_available_timestamp = verified_blobs
.iter()
.flatten()
.map(|blob| blob.seen_timestamp())
.max();
let Some(diet_executed_block) = executed_block else {
return Err(AvailabilityCheckError::Unexpected);
};
let is_peer_das_enabled = spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch());
let (blobs, data_columns) = if is_peer_das_enabled {
let data_columns = verified_data_columns
.into_iter()
.map(|d| d.into_inner())
.collect::<Vec<_>>();
(None, Some(data_columns))
} else {
let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
let max_len = spec.max_blobs_per_block(diet_executed_block.as_block().epoch()) as usize;
(
Some(RuntimeVariableList::new(verified_blobs, max_len)?),
None,
)
};
let executed_block = recover(diet_executed_block)?;
let AvailabilityPendingExecutedBlock {
block,
mut import_data,
payload_verification_outcome,
} = executed_block;
import_data.data_column_recv = data_column_recv;
let available_block = AvailableBlock {
block_root,
block,
blobs,
data_columns,
blobs_available_timestamp,
spec: spec.clone(),
};
Ok(Availability::Available(Box::new(
AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome),
)))
}
/// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob.
pub fn epoch(&self) -> Option<Epoch> {
self.executed_block
@@ -354,6 +312,41 @@ impl<E: EthSpec> PendingComponents<E> {
None
})
}
pub fn status_str(
&self,
block_epoch: Epoch,
sampling_column_count: usize,
spec: &ChainSpec,
) -> String {
let block_count = if self.executed_block.is_some() { 1 } else { 0 };
if spec.is_peer_das_enabled_for_epoch(block_epoch) {
let data_column_recv_count = if self.data_column_recv.is_some() {
1
} else {
0
};
format!(
"block {} data_columns {}/{} data_columns_recv {}",
block_count,
self.verified_data_columns.len(),
sampling_column_count,
data_column_recv_count,
)
} else {
let num_expected_blobs = if let Some(block) = self.get_cached_block() {
&block.num_blobs_expected().to_string()
} else {
"?"
};
format!(
"block {} blobs {}/{}",
block_count,
self.verified_blobs.len(),
num_expected_blobs
)
}
}
}
/// This is the main struct for this module. Outside methods should
@@ -374,7 +367,7 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
// the current usage, as it's deconstructed immediately.
#[allow(clippy::large_enum_variant)]
pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
Yes(PendingComponents<E>),
Yes(Vec<KzgVerifiedCustodyDataColumn<E>>),
No(&'static str),
}
@@ -455,16 +448,10 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
/// Puts the KZG verified blobs into the availability cache as pending components.
///
/// The `data_column_recv` parameter is an optional `Receiver` for data columns that are
/// computed asynchronously. This method remains **used** after PeerDAS activation, because
/// blocks can be made available if the EL already has the blobs and returns them via the
/// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs).
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
block_root: Hash256,
kzg_verified_blobs: I,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
log: &Logger,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut kzg_verified_blobs = kzg_verified_blobs.into_iter().peekable();
@@ -474,7 +461,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.map(|verified_blob| verified_blob.as_blob().epoch())
else {
// Verified blobs list should be non-empty.
return Err(AvailabilityCheckError::Unexpected);
return Err(AvailabilityCheckError::Unexpected("empty blobs"));
};
let mut fixed_blobs =
@@ -499,21 +486,22 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);
if data_column_recv.is_some() {
// If `data_column_recv` is `Some`, it means we have all the blobs from engine, and have
// started computing data columns. We store the receiver in `PendingComponents` for
// later use when importing the block.
pending_components.data_column_recv = data_column_recv;
}
debug!(log, "Component added to data availability checker";
"component" => "blobs",
"block_root" => ?block_root,
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
);
if pending_components.is_available(self.sampling_column_count, log) {
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components.clone_without_column_recv());
write_lock.put(block_root, pending_components);
drop(write_lock);
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
@@ -535,7 +523,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
.map(|verified_blob| verified_blob.as_data_column().epoch())
else {
// Verified data_columns list should be non-empty.
return Err(AvailabilityCheckError::Unexpected);
return Err(AvailabilityCheckError::Unexpected("empty columns"));
};
let mut write_lock = self.critical.write();
@@ -551,14 +539,72 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the data columns.
pending_components.merge_data_columns(kzg_verified_data_columns)?;
if pending_components.is_available(self.sampling_column_count, log) {
debug!(log, "Component added to data availability checker";
"component" => "data_columns",
"block_root" => ?block_root,
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
);
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components.clone_without_column_recv());
write_lock.put(block_root, pending_components);
drop(write_lock);
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
}
}
/// The `data_column_recv` parameter is a `Receiver` for data columns that are computed
/// asynchronously. This method is used if the EL already has the blobs and returns them via the
/// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs).
pub fn put_computed_data_columns_recv(
&self,
block_root: Hash256,
block_epoch: Epoch,
data_column_recv: oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>,
log: &Logger,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut write_lock = self.critical.write();
// Grab existing entry or create a new entry.
let mut pending_components = write_lock
.pop_entry(&block_root)
.map(|(_, v)| v)
.unwrap_or_else(|| {
PendingComponents::empty(
block_root,
self.spec.max_blobs_per_block(block_epoch) as usize,
)
});
// We have all the blobs from engine, and have started computing data columns. We store the
// receiver in `PendingComponents` for later use when importing the block.
// TODO(das): Error or log if we overwrite a prior receiver https://github.com/sigp/lighthouse/issues/6764
pending_components.data_column_recv = Some(data_column_recv);
debug!(log, "Component added to data availability checker";
"component" => "data_columns_recv",
"block_root" => ?block_root,
"status" => pending_components.status_str(block_epoch, self.sampling_column_count, &self.spec),
);
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components);
drop(write_lock);
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
@@ -603,7 +649,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
pending_components.reconstruction_started = true;
ReconstructColumnsDecision::Yes(pending_components.clone_without_column_recv())
ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone())
}
/// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`.
@@ -643,15 +689,23 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
// Merge in the block.
pending_components.merge_block(diet_executed_block);
debug!(log, "Component added to data availability checker";
"component" => "block",
"block_root" => ?block_root,
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
);
// Check if we have all components and entire set is consistent.
if pending_components.is_available(self.sampling_column_count, log) {
if let Some(available_block) =
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
self.state_cache.recover_pending_executed_block(block)
})?
{
// We keep the pending components in the availability cache during block import (#5845).
// `data_column_recv` is returned as part of the available block and is no longer needed here.
write_lock.put(block_root, pending_components.clone_without_column_recv());
write_lock.put(block_root, pending_components);
drop(write_lock);
pending_components.make_available(&self.spec, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
Ok(Availability::Available(Box::new(available_block)))
} else {
write_lock.put(block_root, pending_components);
Ok(Availability::MissingComponents(block_root))
@@ -882,7 +936,6 @@ mod test {
parent_eth1_finalization_data,
confirmed_state_roots: vec![],
consensus_context,
data_column_recv: None,
};
let payload_verification_outcome = PayloadVerificationOutcome {
@@ -989,7 +1042,7 @@ mod test {
for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger())
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger())
.expect("should put blob");
if blob_index == blobs_expected - 1 {
assert!(matches!(availability, Availability::Available(_)));
@@ -1017,11 +1070,10 @@ mod test {
for gossip_blob in blobs {
kzg_verified_blobs.push(gossip_blob.into_inner());
let availability = cache
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger())
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger())
.expect("should put blob");
assert_eq!(
availability,
Availability::MissingComponents(root),
assert!(
matches!(availability, Availability::MissingComponents(_)),
"should be pending block"
);
assert_eq!(cache.critical.read().len(), 1);
@@ -1273,7 +1325,6 @@ mod pending_components_tests {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
},
payload_verification_outcome: PayloadVerificationOutcome {
payload_verification_status: PayloadVerificationStatus::Verified,

View File

@@ -136,7 +136,6 @@ impl<T: BeaconChainTypes> StateLRUCache<T> {
consensus_context: diet_executed_block
.consensus_context
.into_consensus_context(),
data_column_recv: None,
},
payload_verification_outcome: diet_executed_block.payload_verification_outcome,
})

View File

@@ -1,4 +1,4 @@
use crate::data_availability_checker::AvailableBlock;
use crate::data_availability_checker::{AvailableBlock, AvailableBlockData};
use crate::{
attester_cache::{CommitteeLengths, Error},
metrics,
@@ -52,7 +52,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
pub fn add_head_block(
&self,
beacon_block_root: Hash256,
block: AvailableBlock<E>,
block: &AvailableBlock<E>,
proto_block: ProtoBlock,
state: &BeaconState<E>,
spec: &ChainSpec,
@@ -70,14 +70,23 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
},
};
let (_, block, blobs, data_columns) = block.deconstruct();
let (blobs, data_columns) = match block.data() {
AvailableBlockData::NoData => (None, None),
AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None),
AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())),
// TODO(das): Once the columns are received, they will not be available in
// the early attester cache. If someone does a query to us via RPC we
// will get downscored.
AvailableBlockData::DataColumnsRecv(_) => (None, None),
};
let item = CacheItem {
epoch,
committee_lengths,
beacon_block_root,
source,
target,
block,
block: block.block_cloned(),
blobs,
data_columns,
proto_block,

View File

@@ -1,4 +1,4 @@
use crate::data_availability_checker::AvailableBlock;
use crate::data_availability_checker::{AvailableBlock, AvailableBlockData};
use crate::{metrics, BeaconChain, BeaconChainTypes};
use itertools::Itertools;
use slog::debug;
@@ -105,7 +105,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let blob_batch_size = blocks_to_import
.iter()
.filter(|available_block| available_block.blobs().is_some())
.filter(|available_block| available_block.has_blobs())
.count()
.saturating_mul(n_blob_ops_per_block);
@@ -114,14 +114,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;
let mut blob_batch = Vec::with_capacity(blob_batch_size);
let mut blob_batch = Vec::<KeyValueStoreOp>::with_capacity(blob_batch_size);
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
let mut hot_batch = Vec::with_capacity(blocks_to_import.len());
let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
for available_block in blocks_to_import.into_iter().rev() {
let (block_root, block, maybe_blobs, maybe_data_columns) =
available_block.deconstruct();
let (block_root, block, block_data) = available_block.deconstruct();
if block_root != expected_block_root {
return Err(HistoricalBlockError::MismatchedBlockRoot {
@@ -144,17 +143,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
// Store the blobs too
if let Some(blobs) = maybe_blobs {
new_oldest_blob_slot = Some(block.slot());
self.store
.blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
match &block_data {
AvailableBlockData::NoData => {}
AvailableBlockData::Blobs(..) => {
new_oldest_blob_slot = Some(block.slot());
}
AvailableBlockData::DataColumns(_) | AvailableBlockData::DataColumnsRecv(_) => {
new_oldest_data_column_slot = Some(block.slot());
}
}
// Store the data columns too
if let Some(data_columns) = maybe_data_columns {
new_oldest_data_column_slot = Some(block.slot());
self.store
.data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
// Store the blobs or data columns too
if let Some(op) = self
.get_blobs_or_columns_store_op(block_root, block_data)
.map_err(|e| {
HistoricalBlockError::StoreError(StoreError::DBError {
message: format!("get_blobs_or_columns_store_op error {e:?}"),
})
})?
{
blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?);
}
// Store block roots, including at all skip slots in the freezer DB.

View File

@@ -242,7 +242,7 @@ async fn produces_attestations() {
.early_attester_cache
.add_head_block(
block_root,
available_block,
&available_block,
proto_block,
&state,
&chain.spec,
@@ -310,7 +310,7 @@ async fn early_attester_cache_old_request() {
.early_attester_cache
.add_head_block(
head.beacon_block_root,
available_block,
&available_block,
head_proto_block,
&head.beacon_state,
&harness.chain.spec,

View File

@@ -2517,18 +2517,13 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
// Corrupt the signature on the 1st block to ensure that the backfill processor is checking
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
let mut batch_with_invalid_first_block = available_blocks.clone();
let mut batch_with_invalid_first_block =
available_blocks.iter().map(clone_block).collect::<Vec<_>>();
batch_with_invalid_first_block[0] = {
let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct();
let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
let mut corrupt_block = (*block).clone();
*corrupt_block.signature_mut() = Signature::empty();
AvailableBlock::__new_for_testing(
block_root,
Arc::new(corrupt_block),
blobs,
data_columns,
Arc::new(spec),
)
AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec))
};
// Importing the invalid batch should error.
@@ -2540,8 +2535,9 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
));
// Importing the batch with valid signatures should succeed.
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
beacon_chain
.import_historical_block_batch(available_blocks.clone())
.import_historical_block_batch(available_blocks_dup)
.unwrap();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
@@ -3690,3 +3686,7 @@ fn get_blocks(
.map(|checkpoint| checkpoint.beacon_block_root.into())
.collect()
}
fn clone_block<E: EthSpec>(block: &AvailableBlock<E>) -> AvailableBlock<E> {
block.__clone_without_recv().unwrap()
}