mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-04 17:21:44 +00:00
Block availability data enum (#6866)
PeerDAS has undergone multiple refactors + the blending with the get_blobs optimization has generated technical debt.
A function signature like this
f008b84079/beacon_node/beacon_chain/src/beacon_chain.rs (L7171-L7178)
Allows at least the following combination of states:
- blobs: Some / None
- data_columns: Some / None
- data_column_recv: Some / None
- Block has data? Yes / No
- Block post-PeerDAS? Yes / No
In reality, we don't have that many possible states, only:
- `NoData`: pre-deneb, pre-PeerDAS with 0 blobs or post-PeerDAS with 0 blobs
- `Blobs(BlobSidecarList<E>)`: post-Deneb pre-PeerDAS with > 0 blobs
- `DataColumns(DataColumnSidecarList<E>)`: post-PeerDAS with > 0 blobs
- `DataColumnsRecv(oneshot::Receiver<DataColumnSidecarList<E>>)`: post-PeerDAS with > 0 blobs, but we obtained the columns via reconstruction
^ this are the variants of the new `AvailableBlockData` enum
So we go from 2^5 states to 4 well-defined. Downstream code benefits nicely from this clarity and I think it makes the whole feature much more maintainable.
Currently `is_available` returns a bool, and then we construct the available block in `make_available`. In a way the availability condition is duplicated in both functions. Instead, this PR constructs `AvailableBlockData` in `is_available` so the availability conditions are written once
```rust
if let Some(block_data) = is_available(..) {
let available_block = make_available(block_data);
}
```
This commit is contained in:
@@ -21,8 +21,8 @@ use crate::block_verification_types::{
|
||||
pub use crate::canonical_head::CanonicalHead;
|
||||
use crate::chain_config::ChainConfig;
|
||||
use crate::data_availability_checker::{
|
||||
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
|
||||
DataColumnReconstructionResult,
|
||||
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
|
||||
DataAvailabilityChecker, DataColumnReconstructionResult,
|
||||
};
|
||||
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||
use crate::early_attester_cache::EarlyAttesterCache;
|
||||
@@ -3169,7 +3169,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
return Err(BlockError::DuplicateFullyImported(block_root));
|
||||
}
|
||||
|
||||
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
|
||||
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
|
||||
// consumers don't expect the blobs event to fire erratically.
|
||||
if !self
|
||||
.spec
|
||||
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
|
||||
{
|
||||
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
|
||||
}
|
||||
|
||||
let r = self
|
||||
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
|
||||
@@ -3640,9 +3647,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
self.check_blobs_for_slashability(block_root, &blobs)?;
|
||||
let availability =
|
||||
self.data_availability_checker
|
||||
.put_engine_blobs(block_root, blobs, data_column_recv)?;
|
||||
let availability = self.data_availability_checker.put_engine_blobs(
|
||||
block_root,
|
||||
slot.epoch(T::EthSpec::slots_per_epoch()),
|
||||
blobs,
|
||||
data_column_recv,
|
||||
)?;
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
@@ -3727,7 +3737,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
parent_eth1_finalization_data,
|
||||
confirmed_state_roots,
|
||||
consensus_context,
|
||||
data_column_recv,
|
||||
} = import_data;
|
||||
|
||||
// Record the time at which this block's blobs became available.
|
||||
@@ -3755,7 +3764,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
parent_block,
|
||||
parent_eth1_finalization_data,
|
||||
consensus_context,
|
||||
data_column_recv,
|
||||
)
|
||||
},
|
||||
"payload_verification_handle",
|
||||
@@ -3794,7 +3802,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
|
||||
parent_eth1_finalization_data: Eth1FinalizationData,
|
||||
mut consensus_context: ConsensusContext<T::EthSpec>,
|
||||
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
|
||||
) -> Result<Hash256, BlockError> {
|
||||
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
|
||||
// Everything in this initial section is on the hot path between processing the block and
|
||||
@@ -3892,7 +3899,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
if let Some(proto_block) = fork_choice.get_block(&block_root) {
|
||||
if let Err(e) = self.early_attester_cache.add_head_block(
|
||||
block_root,
|
||||
signed_block.clone(),
|
||||
&signed_block,
|
||||
proto_block,
|
||||
&state,
|
||||
&self.spec,
|
||||
@@ -3961,15 +3968,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// If the write fails, revert fork choice to the version from disk, else we can
|
||||
// end up with blocks in fork choice that are missing from disk.
|
||||
// See https://github.com/sigp/lighthouse/issues/2028
|
||||
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
|
||||
let (_, signed_block, block_data) = signed_block.deconstruct();
|
||||
|
||||
match self.get_blobs_or_columns_store_op(
|
||||
block_root,
|
||||
signed_block.epoch(),
|
||||
blobs,
|
||||
data_columns,
|
||||
data_column_recv,
|
||||
) {
|
||||
match self.get_blobs_or_columns_store_op(block_root, block_data) {
|
||||
Ok(Some(blobs_or_columns_store_op)) => {
|
||||
ops.push(blobs_or_columns_store_op);
|
||||
}
|
||||
@@ -7218,29 +7219,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_blobs_or_columns_store_op(
|
||||
pub(crate) fn get_blobs_or_columns_store_op(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
block_epoch: Epoch,
|
||||
blobs: Option<BlobSidecarList<T::EthSpec>>,
|
||||
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
|
||||
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
|
||||
block_data: AvailableBlockData<T::EthSpec>,
|
||||
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
|
||||
if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
|
||||
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
|
||||
// custody columns: https://github.com/sigp/lighthouse/issues/6465
|
||||
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
|
||||
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
|
||||
// custody columns: https://github.com/sigp/lighthouse/issues/6465
|
||||
let _custody_columns_count = self.data_availability_checker.get_sampling_column_count();
|
||||
|
||||
let custody_columns_available = data_columns
|
||||
.as_ref()
|
||||
.as_ref()
|
||||
.is_some_and(|columns| columns.len() == custody_columns_count);
|
||||
|
||||
let data_columns_to_persist = if custody_columns_available {
|
||||
// If the block was made available via custody columns received from gossip / rpc, use them
|
||||
// since we already have them.
|
||||
data_columns
|
||||
} else if let Some(data_column_recv) = data_column_recv {
|
||||
match block_data {
|
||||
AvailableBlockData::NoData => Ok(None),
|
||||
AvailableBlockData::Blobs(blobs) => {
|
||||
debug!(
|
||||
self.log, "Writing blobs to store";
|
||||
"block_root" => %block_root,
|
||||
"count" => blobs.len(),
|
||||
);
|
||||
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
|
||||
}
|
||||
AvailableBlockData::DataColumns(data_columns) => {
|
||||
debug!(
|
||||
self.log, "Writing data columns to store";
|
||||
"block_root" => %block_root,
|
||||
"count" => data_columns.len(),
|
||||
);
|
||||
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
|
||||
}
|
||||
AvailableBlockData::DataColumnsRecv(data_column_recv) => {
|
||||
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
|
||||
let _column_recv_timer =
|
||||
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
|
||||
@@ -7250,34 +7256,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let computed_data_columns = data_column_recv
|
||||
.blocking_recv()
|
||||
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
|
||||
Some(computed_data_columns)
|
||||
} else {
|
||||
// No blobs in the block.
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(data_columns) = data_columns_to_persist {
|
||||
if !data_columns.is_empty() {
|
||||
debug!(
|
||||
self.log, "Writing data_columns to store";
|
||||
"block_root" => %block_root,
|
||||
"count" => data_columns.len(),
|
||||
);
|
||||
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
|
||||
}
|
||||
}
|
||||
} else if let Some(blobs) = blobs {
|
||||
if !blobs.is_empty() {
|
||||
debug!(
|
||||
self.log, "Writing blobs to store";
|
||||
self.log, "Writing data columns to store";
|
||||
"block_root" => %block_root,
|
||||
"count" => blobs.len(),
|
||||
"count" => computed_data_columns.len(),
|
||||
);
|
||||
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
|
||||
// TODO(das): Store only this node's custody columns
|
||||
Ok(Some(StoreOp::PutDataColumns(
|
||||
block_root,
|
||||
computed_data_columns,
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1707,7 +1707,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
|
||||
parent_eth1_finalization_data,
|
||||
confirmed_state_roots,
|
||||
consensus_context,
|
||||
data_column_recv: None,
|
||||
},
|
||||
payload_verification_handle,
|
||||
})
|
||||
|
||||
@@ -7,11 +7,10 @@ use derivative::Derivative;
|
||||
use state_processing::ConsensusContext;
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
use types::blob_sidecar::BlobIdentifier;
|
||||
use types::{
|
||||
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
|
||||
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
|
||||
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
|
||||
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
|
||||
};
|
||||
|
||||
/// A block that has been received over RPC. It has 2 internal variants:
|
||||
@@ -265,7 +264,6 @@ impl<E: EthSpec> ExecutedBlock<E> {
|
||||
|
||||
/// A block that has completed all pre-deneb block processing checks including verification
|
||||
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
|
||||
#[derive(PartialEq)]
|
||||
pub struct AvailableExecutedBlock<E: EthSpec> {
|
||||
pub block: AvailableBlock<E>,
|
||||
pub import_data: BlockImportData<E>,
|
||||
@@ -338,8 +336,7 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Derivative)]
|
||||
#[derivative(PartialEq)]
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct BlockImportData<E: EthSpec> {
|
||||
pub block_root: Hash256,
|
||||
pub state: BeaconState<E>,
|
||||
@@ -347,12 +344,6 @@ pub struct BlockImportData<E: EthSpec> {
|
||||
pub parent_eth1_finalization_data: Eth1FinalizationData,
|
||||
pub confirmed_state_roots: Vec<Hash256>,
|
||||
pub consensus_context: ConsensusContext<E>,
|
||||
#[derivative(PartialEq = "ignore")]
|
||||
/// An optional receiver for `DataColumnSidecarList`.
|
||||
///
|
||||
/// This field is `Some` when data columns are being computed asynchronously.
|
||||
/// The resulting `DataColumnSidecarList` will be sent through this receiver.
|
||||
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
|
||||
}
|
||||
|
||||
impl<E: EthSpec> BlockImportData<E> {
|
||||
@@ -371,7 +362,6 @@ impl<E: EthSpec> BlockImportData<E> {
|
||||
},
|
||||
confirmed_state_roots: vec![],
|
||||
consensus_context: ConsensusContext::new(Slot::new(0)),
|
||||
data_column_recv: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -91,7 +91,6 @@ pub enum DataColumnReconstructionResult<E: EthSpec> {
|
||||
///
|
||||
/// Indicates if the block is fully `Available` or if we need blobs or blocks
|
||||
/// to "complete" the requirements for an `AvailableBlock`.
|
||||
#[derive(PartialEq)]
|
||||
pub enum Availability<E: EthSpec> {
|
||||
MissingComponents(Hash256),
|
||||
Available(Box<AvailableExecutedBlock<E>>),
|
||||
@@ -219,7 +218,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
.map_err(AvailabilityCheckError::InvalidBlobs)?;
|
||||
|
||||
self.availability_cache
|
||||
.put_kzg_verified_blobs(block_root, verified_blobs, None, &self.log)
|
||||
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
|
||||
}
|
||||
|
||||
/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
|
||||
@@ -253,23 +252,29 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
pub fn put_engine_blobs(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
block_epoch: Epoch,
|
||||
blobs: FixedBlobSidecarList<T::EthSpec>,
|
||||
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
|
||||
data_columns_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let seen_timestamp = self
|
||||
.slot_clock
|
||||
.now_duration()
|
||||
.ok_or(AvailabilityCheckError::SlotClockError)?;
|
||||
|
||||
let verified_blobs =
|
||||
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp);
|
||||
|
||||
self.availability_cache.put_kzg_verified_blobs(
|
||||
block_root,
|
||||
verified_blobs,
|
||||
data_column_recv,
|
||||
&self.log,
|
||||
)
|
||||
// `data_columns_recv` is always Some if block_root is post-PeerDAS
|
||||
if let Some(data_columns_recv) = data_columns_recv {
|
||||
self.availability_cache.put_computed_data_columns_recv(
|
||||
block_root,
|
||||
block_epoch,
|
||||
data_columns_recv,
|
||||
&self.log,
|
||||
)
|
||||
} else {
|
||||
let seen_timestamp = self
|
||||
.slot_clock
|
||||
.now_duration()
|
||||
.ok_or(AvailabilityCheckError::SlotClockError)?;
|
||||
self.availability_cache.put_kzg_verified_blobs(
|
||||
block_root,
|
||||
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp),
|
||||
&self.log,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if we've cached other blobs for this block. If it completes a set and we also
|
||||
@@ -284,7 +289,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
self.availability_cache.put_kzg_verified_blobs(
|
||||
gossip_blob.block_root(),
|
||||
vec![gossip_blob.into_inner()],
|
||||
None,
|
||||
&self.log,
|
||||
)
|
||||
}
|
||||
@@ -338,15 +342,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
) -> Result<MaybeAvailableBlock<T::EthSpec>, AvailabilityCheckError> {
|
||||
let (block_root, block, blobs, data_columns) = block.deconstruct();
|
||||
if self.blobs_required_for_block(&block) {
|
||||
return if let Some(blob_list) = blobs.as_ref() {
|
||||
return if let Some(blob_list) = blobs {
|
||||
verify_kzg_for_blob_list(blob_list.iter(), &self.kzg)
|
||||
.map_err(AvailabilityCheckError::InvalidBlobs)?;
|
||||
Ok(MaybeAvailableBlock::Available(AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs,
|
||||
blob_data: AvailableBlockData::Blobs(blob_list),
|
||||
blobs_available_timestamp: None,
|
||||
data_columns: None,
|
||||
spec: self.spec.clone(),
|
||||
}))
|
||||
} else {
|
||||
@@ -365,14 +368,13 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
Ok(MaybeAvailableBlock::Available(AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs: None,
|
||||
blobs_available_timestamp: None,
|
||||
data_columns: Some(
|
||||
blob_data: AvailableBlockData::DataColumns(
|
||||
data_column_list
|
||||
.into_iter()
|
||||
.map(|d| d.clone_arc())
|
||||
.collect(),
|
||||
),
|
||||
blobs_available_timestamp: None,
|
||||
spec: self.spec.clone(),
|
||||
}))
|
||||
} else {
|
||||
@@ -383,9 +385,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
Ok(MaybeAvailableBlock::Available(AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs: None,
|
||||
blob_data: AvailableBlockData::NoData,
|
||||
blobs_available_timestamp: None,
|
||||
data_columns: None,
|
||||
spec: self.spec.clone(),
|
||||
}))
|
||||
}
|
||||
@@ -437,27 +438,25 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
let (block_root, block, blobs, data_columns) = block.deconstruct();
|
||||
|
||||
let maybe_available_block = if self.blobs_required_for_block(&block) {
|
||||
if blobs.is_some() {
|
||||
if let Some(blobs) = blobs {
|
||||
MaybeAvailableBlock::Available(AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs,
|
||||
blob_data: AvailableBlockData::Blobs(blobs),
|
||||
blobs_available_timestamp: None,
|
||||
data_columns: None,
|
||||
spec: self.spec.clone(),
|
||||
})
|
||||
} else {
|
||||
MaybeAvailableBlock::AvailabilityPending { block_root, block }
|
||||
}
|
||||
} else if self.data_columns_required_for_block(&block) {
|
||||
if data_columns.is_some() {
|
||||
if let Some(data_columns) = data_columns {
|
||||
MaybeAvailableBlock::Available(AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs: None,
|
||||
data_columns: data_columns.map(|data_columns| {
|
||||
data_columns.into_iter().map(|d| d.into_inner()).collect()
|
||||
}),
|
||||
blob_data: AvailableBlockData::DataColumns(
|
||||
data_columns.into_iter().map(|d| d.into_inner()).collect(),
|
||||
),
|
||||
blobs_available_timestamp: None,
|
||||
spec: self.spec.clone(),
|
||||
})
|
||||
@@ -468,8 +467,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
MaybeAvailableBlock::Available(AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs: None,
|
||||
data_columns: None,
|
||||
blob_data: AvailableBlockData::NoData,
|
||||
blobs_available_timestamp: None,
|
||||
spec: self.spec.clone(),
|
||||
})
|
||||
@@ -545,11 +543,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
) -> Result<DataColumnReconstructionResult<T::EthSpec>, AvailabilityCheckError> {
|
||||
let pending_components = match self
|
||||
let verified_data_columns = match self
|
||||
.availability_cache
|
||||
.check_and_set_reconstruction_started(block_root)
|
||||
{
|
||||
ReconstructColumnsDecision::Yes(pending_components) => pending_components,
|
||||
ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns,
|
||||
ReconstructColumnsDecision::No(reason) => {
|
||||
return Ok(DataColumnReconstructionResult::NotStarted(reason));
|
||||
}
|
||||
@@ -560,7 +558,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
|
||||
let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns(
|
||||
&self.kzg,
|
||||
&pending_components.verified_data_columns,
|
||||
&verified_data_columns,
|
||||
&self.spec,
|
||||
)
|
||||
.map_err(|e| {
|
||||
@@ -713,13 +711,25 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum AvailableBlockData<E: EthSpec> {
|
||||
/// Block is pre-Deneb or has zero blobs
|
||||
NoData,
|
||||
/// Block is post-Deneb, pre-PeerDAS and has more than zero blobs
|
||||
Blobs(BlobSidecarList<E>),
|
||||
/// Block is post-PeerDAS and has more than zero blobs
|
||||
DataColumns(DataColumnSidecarList<E>),
|
||||
/// Block is post-PeerDAS, has more than zero blobs and we recomputed the columns from the EL's
|
||||
/// mempool blobs
|
||||
DataColumnsRecv(oneshot::Receiver<DataColumnSidecarList<E>>),
|
||||
}
|
||||
|
||||
/// A fully available block that is ready to be imported into fork choice.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
#[derive(Debug)]
|
||||
pub struct AvailableBlock<E: EthSpec> {
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<E>>,
|
||||
blobs: Option<BlobSidecarList<E>>,
|
||||
data_columns: Option<DataColumnSidecarList<E>>,
|
||||
blob_data: AvailableBlockData<E>,
|
||||
/// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
|
||||
blobs_available_timestamp: Option<Duration>,
|
||||
pub spec: Arc<ChainSpec>,
|
||||
@@ -729,15 +739,13 @@ impl<E: EthSpec> AvailableBlock<E> {
|
||||
pub fn __new_for_testing(
|
||||
block_root: Hash256,
|
||||
block: Arc<SignedBeaconBlock<E>>,
|
||||
blobs: Option<BlobSidecarList<E>>,
|
||||
data_columns: Option<DataColumnSidecarList<E>>,
|
||||
data: AvailableBlockData<E>,
|
||||
spec: Arc<ChainSpec>,
|
||||
) -> Self {
|
||||
Self {
|
||||
block_root,
|
||||
block,
|
||||
blobs,
|
||||
data_columns,
|
||||
blob_data: data,
|
||||
blobs_available_timestamp: None,
|
||||
spec,
|
||||
}
|
||||
@@ -750,39 +758,56 @@ impl<E: EthSpec> AvailableBlock<E> {
|
||||
self.block.clone()
|
||||
}
|
||||
|
||||
pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
|
||||
self.blobs.as_ref()
|
||||
}
|
||||
|
||||
pub fn blobs_available_timestamp(&self) -> Option<Duration> {
|
||||
self.blobs_available_timestamp
|
||||
}
|
||||
|
||||
pub fn data_columns(&self) -> Option<&DataColumnSidecarList<E>> {
|
||||
self.data_columns.as_ref()
|
||||
pub fn data(&self) -> &AvailableBlockData<E> {
|
||||
&self.blob_data
|
||||
}
|
||||
|
||||
pub fn has_blobs(&self) -> bool {
|
||||
match self.blob_data {
|
||||
AvailableBlockData::NoData => false,
|
||||
AvailableBlockData::Blobs(..) => true,
|
||||
AvailableBlockData::DataColumns(_) => false,
|
||||
AvailableBlockData::DataColumnsRecv(_) => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn deconstruct(
|
||||
self,
|
||||
) -> (
|
||||
Hash256,
|
||||
Arc<SignedBeaconBlock<E>>,
|
||||
Option<BlobSidecarList<E>>,
|
||||
Option<DataColumnSidecarList<E>>,
|
||||
) {
|
||||
pub fn deconstruct(self) -> (Hash256, Arc<SignedBeaconBlock<E>>, AvailableBlockData<E>) {
|
||||
let AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs,
|
||||
data_columns,
|
||||
blob_data,
|
||||
..
|
||||
} = self;
|
||||
(block_root, block, blobs, data_columns)
|
||||
(block_root, block, blob_data)
|
||||
}
|
||||
|
||||
/// Only used for testing
|
||||
pub fn __clone_without_recv(&self) -> Result<Self, String> {
|
||||
Ok(Self {
|
||||
block_root: self.block_root,
|
||||
block: self.block.clone(),
|
||||
blob_data: match &self.blob_data {
|
||||
AvailableBlockData::NoData => AvailableBlockData::NoData,
|
||||
AvailableBlockData::Blobs(blobs) => AvailableBlockData::Blobs(blobs.clone()),
|
||||
AvailableBlockData::DataColumns(data_columns) => {
|
||||
AvailableBlockData::DataColumns(data_columns.clone())
|
||||
}
|
||||
AvailableBlockData::DataColumnsRecv(_) => {
|
||||
return Err("Can't clone DataColumnsRecv".to_owned())
|
||||
}
|
||||
},
|
||||
blobs_available_timestamp: self.blobs_available_timestamp,
|
||||
spec: self.spec.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub enum MaybeAvailableBlock<E: EthSpec> {
|
||||
/// This variant is fully available.
|
||||
/// i.e. for pre-deneb blocks, it contains a (`SignedBeaconBlock`, `Blobs::None`) and for
|
||||
|
||||
@@ -10,7 +10,7 @@ pub enum Error {
|
||||
blob_commitment: KzgCommitment,
|
||||
block_commitment: KzgCommitment,
|
||||
},
|
||||
Unexpected,
|
||||
Unexpected(&'static str),
|
||||
SszTypes(ssz_types::Error),
|
||||
MissingBlobs,
|
||||
MissingCustodyColumns,
|
||||
@@ -40,7 +40,7 @@ impl Error {
|
||||
| Error::MissingCustodyColumns
|
||||
| Error::StoreError(_)
|
||||
| Error::DecodeError(_)
|
||||
| Error::Unexpected
|
||||
| Error::Unexpected(_)
|
||||
| Error::ParentStateMissing(_)
|
||||
| Error::BlockReplayError(_)
|
||||
| Error::RebuildingStateCaches(_)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache};
|
||||
use super::AvailableBlockData;
|
||||
use crate::beacon_chain::BeaconStore;
|
||||
use crate::blob_verification::KzgVerifiedBlob;
|
||||
use crate::block_verification_types::{
|
||||
@@ -10,6 +11,7 @@ use crate::BeaconChainTypes;
|
||||
use lru::LruCache;
|
||||
use parking_lot::RwLock;
|
||||
use slog::{debug, Logger};
|
||||
use std::cmp::Ordering;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
@@ -39,19 +41,6 @@ pub struct PendingComponents<E: EthSpec> {
|
||||
}
|
||||
|
||||
impl<E: EthSpec> PendingComponents<E> {
|
||||
/// Clones the `PendingComponent` without cloning `data_column_recv`, as `Receiver` is not cloneable.
|
||||
/// This should only be used when the receiver is no longer needed.
|
||||
pub fn clone_without_column_recv(&self) -> Self {
|
||||
PendingComponents {
|
||||
block_root: self.block_root,
|
||||
verified_blobs: self.verified_blobs.clone(),
|
||||
verified_data_columns: self.verified_data_columns.clone(),
|
||||
executed_block: self.executed_block.clone(),
|
||||
reconstruction_started: self.reconstruction_started,
|
||||
data_column_recv: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns an immutable reference to the cached block.
|
||||
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
|
||||
&self.executed_block
|
||||
@@ -95,26 +84,6 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a
|
||||
/// block.
|
||||
///
|
||||
/// This corresponds to the number of commitments that are present in a block.
|
||||
pub fn block_kzg_commitments_count(&self) -> Option<usize> {
|
||||
self.get_cached_block()
|
||||
.as_ref()
|
||||
.map(|b| b.get_commitments().len())
|
||||
}
|
||||
|
||||
/// Returns the number of blobs that have been received and are stored in the cache.
|
||||
pub fn num_received_blobs(&self) -> usize {
|
||||
self.get_cached_blobs().iter().flatten().count()
|
||||
}
|
||||
|
||||
/// Returns the number of data columns that have been received and are stored in the cache.
|
||||
pub fn num_received_data_columns(&self) -> usize {
|
||||
self.verified_data_columns.len()
|
||||
}
|
||||
|
||||
/// Returns the indices of cached custody columns
|
||||
pub fn get_cached_data_columns_indices(&self) -> Vec<ColumnIndex> {
|
||||
self.verified_data_columns
|
||||
@@ -189,51 +158,121 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
self.merge_blobs(reinsert);
|
||||
}
|
||||
|
||||
/// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are
|
||||
/// available in the cache.
|
||||
/// Returns Some if the block has received all its required data for import. The return value
|
||||
/// must be persisted in the DB along with the block.
|
||||
///
|
||||
/// Returns `true` if both the block exists and the number of received blobs / custody columns
|
||||
/// matches the number of expected blobs / custody columns.
|
||||
pub fn is_available(&self, custody_column_count: usize, log: &Logger) -> bool {
|
||||
let block_kzg_commitments_count_opt = self.block_kzg_commitments_count();
|
||||
let expected_blobs_msg = block_kzg_commitments_count_opt
|
||||
.as_ref()
|
||||
.map(|num| num.to_string())
|
||||
.unwrap_or("unknown".to_string());
|
||||
/// WARNING: This function can potentially take a lot of time if the state needs to be
|
||||
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
|
||||
pub fn make_available<R>(
|
||||
&mut self,
|
||||
custody_column_count: usize,
|
||||
spec: &Arc<ChainSpec>,
|
||||
recover: R,
|
||||
) -> Result<Option<AvailableExecutedBlock<E>>, AvailabilityCheckError>
|
||||
where
|
||||
R: FnOnce(
|
||||
DietAvailabilityPendingExecutedBlock<E>,
|
||||
) -> Result<AvailabilityPendingExecutedBlock<E>, AvailabilityCheckError>,
|
||||
{
|
||||
let Some(block) = &self.executed_block else {
|
||||
// Block not available yet
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// No data columns when there are 0 blobs
|
||||
let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| {
|
||||
if blob_count > 0 {
|
||||
custody_column_count
|
||||
} else {
|
||||
0
|
||||
let num_expected_blobs = block.num_blobs_expected();
|
||||
|
||||
let blob_data = if num_expected_blobs == 0 {
|
||||
Some(AvailableBlockData::NoData)
|
||||
} else if spec.is_peer_das_enabled_for_epoch(block.epoch()) {
|
||||
match self.verified_data_columns.len().cmp(&custody_column_count) {
|
||||
Ordering::Greater => {
|
||||
// Should never happen
|
||||
return Err(AvailabilityCheckError::Unexpected("too many columns"));
|
||||
}
|
||||
Ordering::Equal => {
|
||||
// Block is post-peerdas, and we got enough columns
|
||||
let data_columns = self
|
||||
.verified_data_columns
|
||||
.iter()
|
||||
.map(|d| d.clone().into_inner())
|
||||
.collect::<Vec<_>>();
|
||||
Some(AvailableBlockData::DataColumns(data_columns))
|
||||
}
|
||||
Ordering::Less => {
|
||||
// The data_columns_recv is an infallible promise that we will receive all expected
|
||||
// columns, so we consider the block available.
|
||||
// We take the receiver as it can't be cloned, and make_available should never
|
||||
// be called again once it returns `Some`.
|
||||
self.data_column_recv
|
||||
.take()
|
||||
.map(AvailableBlockData::DataColumnsRecv)
|
||||
}
|
||||
}
|
||||
});
|
||||
let expected_columns_msg = expected_columns_opt
|
||||
.as_ref()
|
||||
.map(|num| num.to_string())
|
||||
.unwrap_or("unknown".to_string());
|
||||
} else {
|
||||
// Before PeerDAS, blobs
|
||||
let num_received_blobs = self.verified_blobs.iter().flatten().count();
|
||||
match num_received_blobs.cmp(&num_expected_blobs) {
|
||||
Ordering::Greater => {
|
||||
// Should never happen
|
||||
return Err(AvailabilityCheckError::Unexpected("too many blobs"));
|
||||
}
|
||||
Ordering::Equal => {
|
||||
let max_blobs = spec.max_blobs_per_block(block.epoch()) as usize;
|
||||
let blobs_vec = self
|
||||
.verified_blobs
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|blob| blob.clone().to_blob())
|
||||
.collect::<Vec<_>>();
|
||||
let blobs = RuntimeVariableList::new(blobs_vec, max_blobs)
|
||||
.map_err(|_| AvailabilityCheckError::Unexpected("over max_blobs"))?;
|
||||
Some(AvailableBlockData::Blobs(blobs))
|
||||
}
|
||||
Ordering::Less => {
|
||||
// Not enough blobs received yet
|
||||
None
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let num_received_blobs = self.num_received_blobs();
|
||||
let num_received_columns = self.num_received_data_columns();
|
||||
// Block's data not available yet
|
||||
let Some(blob_data) = blob_data else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
debug!(
|
||||
log,
|
||||
"Component(s) added to data availability checker";
|
||||
"block_root" => ?self.block_root,
|
||||
"received_blobs" => num_received_blobs,
|
||||
"expected_blobs" => expected_blobs_msg,
|
||||
"received_columns" => num_received_columns,
|
||||
"expected_columns" => expected_columns_msg,
|
||||
);
|
||||
// Block is available, construct `AvailableExecutedBlock`
|
||||
|
||||
let all_blobs_received = block_kzg_commitments_count_opt
|
||||
.is_some_and(|num_expected_blobs| num_expected_blobs == num_received_blobs);
|
||||
let blobs_available_timestamp = match blob_data {
|
||||
AvailableBlockData::NoData => None,
|
||||
AvailableBlockData::Blobs(_) => self
|
||||
.verified_blobs
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|blob| blob.seen_timestamp())
|
||||
.max(),
|
||||
// TODO(das): To be fixed with https://github.com/sigp/lighthouse/pull/6850
|
||||
AvailableBlockData::DataColumns(_) => None,
|
||||
AvailableBlockData::DataColumnsRecv(_) => None,
|
||||
};
|
||||
|
||||
let all_columns_received = expected_columns_opt
|
||||
.is_some_and(|num_expected_columns| num_expected_columns == num_received_columns);
|
||||
let AvailabilityPendingExecutedBlock {
|
||||
block,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
} = recover(block.clone())?;
|
||||
|
||||
all_blobs_received || all_columns_received
|
||||
let available_block = AvailableBlock {
|
||||
block_root: self.block_root,
|
||||
block,
|
||||
blob_data,
|
||||
blobs_available_timestamp,
|
||||
spec: spec.clone(),
|
||||
};
|
||||
Ok(Some(AvailableExecutedBlock::new(
|
||||
available_block,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns an empty `PendingComponents` object with the given block root.
|
||||
@@ -248,87 +287,6 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Verifies an `SignedBeaconBlock` against a set of KZG verified blobs.
|
||||
/// This does not check whether a block *should* have blobs, these checks should have been
|
||||
/// completed when producing the `AvailabilityPendingBlock`.
|
||||
///
|
||||
/// WARNING: This function can potentially take a lot of time if the state needs to be
|
||||
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
|
||||
pub fn make_available<R>(
|
||||
self,
|
||||
spec: &Arc<ChainSpec>,
|
||||
recover: R,
|
||||
) -> Result<Availability<E>, AvailabilityCheckError>
|
||||
where
|
||||
R: FnOnce(
|
||||
DietAvailabilityPendingExecutedBlock<E>,
|
||||
) -> Result<AvailabilityPendingExecutedBlock<E>, AvailabilityCheckError>,
|
||||
{
|
||||
let Self {
|
||||
block_root,
|
||||
verified_blobs,
|
||||
verified_data_columns,
|
||||
executed_block,
|
||||
data_column_recv,
|
||||
..
|
||||
} = self;
|
||||
|
||||
let blobs_available_timestamp = verified_blobs
|
||||
.iter()
|
||||
.flatten()
|
||||
.map(|blob| blob.seen_timestamp())
|
||||
.max();
|
||||
|
||||
let Some(diet_executed_block) = executed_block else {
|
||||
return Err(AvailabilityCheckError::Unexpected);
|
||||
};
|
||||
|
||||
let is_peer_das_enabled = spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch());
|
||||
let (blobs, data_columns) = if is_peer_das_enabled {
|
||||
let data_columns = verified_data_columns
|
||||
.into_iter()
|
||||
.map(|d| d.into_inner())
|
||||
.collect::<Vec<_>>();
|
||||
(None, Some(data_columns))
|
||||
} else {
|
||||
let num_blobs_expected = diet_executed_block.num_blobs_expected();
|
||||
let Some(verified_blobs) = verified_blobs
|
||||
.into_iter()
|
||||
.map(|b| b.map(|b| b.to_blob()))
|
||||
.take(num_blobs_expected)
|
||||
.collect::<Option<Vec<_>>>()
|
||||
else {
|
||||
return Err(AvailabilityCheckError::Unexpected);
|
||||
};
|
||||
let max_len = spec.max_blobs_per_block(diet_executed_block.as_block().epoch()) as usize;
|
||||
(
|
||||
Some(RuntimeVariableList::new(verified_blobs, max_len)?),
|
||||
None,
|
||||
)
|
||||
};
|
||||
let executed_block = recover(diet_executed_block)?;
|
||||
|
||||
let AvailabilityPendingExecutedBlock {
|
||||
block,
|
||||
mut import_data,
|
||||
payload_verification_outcome,
|
||||
} = executed_block;
|
||||
|
||||
import_data.data_column_recv = data_column_recv;
|
||||
|
||||
let available_block = AvailableBlock {
|
||||
block_root,
|
||||
block,
|
||||
blobs,
|
||||
data_columns,
|
||||
blobs_available_timestamp,
|
||||
spec: spec.clone(),
|
||||
};
|
||||
Ok(Availability::Available(Box::new(
|
||||
AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome),
|
||||
)))
|
||||
}
|
||||
|
||||
/// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob.
|
||||
pub fn epoch(&self) -> Option<Epoch> {
|
||||
self.executed_block
|
||||
@@ -354,6 +312,41 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
pub fn status_str(
|
||||
&self,
|
||||
block_epoch: Epoch,
|
||||
sampling_column_count: usize,
|
||||
spec: &ChainSpec,
|
||||
) -> String {
|
||||
let block_count = if self.executed_block.is_some() { 1 } else { 0 };
|
||||
if spec.is_peer_das_enabled_for_epoch(block_epoch) {
|
||||
let data_column_recv_count = if self.data_column_recv.is_some() {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
format!(
|
||||
"block {} data_columns {}/{} data_columns_recv {}",
|
||||
block_count,
|
||||
self.verified_data_columns.len(),
|
||||
sampling_column_count,
|
||||
data_column_recv_count,
|
||||
)
|
||||
} else {
|
||||
let num_expected_blobs = if let Some(block) = self.get_cached_block() {
|
||||
&block.num_blobs_expected().to_string()
|
||||
} else {
|
||||
"?"
|
||||
};
|
||||
format!(
|
||||
"block {} blobs {}/{}",
|
||||
block_count,
|
||||
self.verified_blobs.len(),
|
||||
num_expected_blobs
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is the main struct for this module. Outside methods should
|
||||
@@ -374,7 +367,7 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
|
||||
// the current usage, as it's deconstructed immediately.
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
|
||||
Yes(PendingComponents<E>),
|
||||
Yes(Vec<KzgVerifiedCustodyDataColumn<E>>),
|
||||
No(&'static str),
|
||||
}
|
||||
|
||||
@@ -455,16 +448,10 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
}
|
||||
|
||||
/// Puts the KZG verified blobs into the availability cache as pending components.
|
||||
///
|
||||
/// The `data_column_recv` parameter is an optional `Receiver` for data columns that are
|
||||
/// computed asynchronously. This method remains **used** after PeerDAS activation, because
|
||||
/// blocks can be made available if the EL already has the blobs and returns them via the
|
||||
/// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs).
|
||||
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
kzg_verified_blobs: I,
|
||||
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
|
||||
log: &Logger,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let mut kzg_verified_blobs = kzg_verified_blobs.into_iter().peekable();
|
||||
@@ -474,7 +461,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
.map(|verified_blob| verified_blob.as_blob().epoch())
|
||||
else {
|
||||
// Verified blobs list should be non-empty.
|
||||
return Err(AvailabilityCheckError::Unexpected);
|
||||
return Err(AvailabilityCheckError::Unexpected("empty blobs"));
|
||||
};
|
||||
|
||||
let mut fixed_blobs =
|
||||
@@ -499,21 +486,22 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
// Merge in the blobs.
|
||||
pending_components.merge_blobs(fixed_blobs);
|
||||
|
||||
if data_column_recv.is_some() {
|
||||
// If `data_column_recv` is `Some`, it means we have all the blobs from engine, and have
|
||||
// started computing data columns. We store the receiver in `PendingComponents` for
|
||||
// later use when importing the block.
|
||||
pending_components.data_column_recv = data_column_recv;
|
||||
}
|
||||
debug!(log, "Component added to data availability checker";
|
||||
"component" => "blobs",
|
||||
"block_root" => ?block_root,
|
||||
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
|
||||
);
|
||||
|
||||
if pending_components.is_available(self.sampling_column_count, log) {
|
||||
if let Some(available_block) =
|
||||
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
|
||||
self.state_cache.recover_pending_executed_block(block)
|
||||
})?
|
||||
{
|
||||
// We keep the pending components in the availability cache during block import (#5845).
|
||||
// `data_column_recv` is returned as part of the available block and is no longer needed here.
|
||||
write_lock.put(block_root, pending_components.clone_without_column_recv());
|
||||
write_lock.put(block_root, pending_components);
|
||||
drop(write_lock);
|
||||
pending_components.make_available(&self.spec, |diet_block| {
|
||||
self.state_cache.recover_pending_executed_block(diet_block)
|
||||
})
|
||||
Ok(Availability::Available(Box::new(available_block)))
|
||||
} else {
|
||||
write_lock.put(block_root, pending_components);
|
||||
Ok(Availability::MissingComponents(block_root))
|
||||
@@ -535,7 +523,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
.map(|verified_blob| verified_blob.as_data_column().epoch())
|
||||
else {
|
||||
// Verified data_columns list should be non-empty.
|
||||
return Err(AvailabilityCheckError::Unexpected);
|
||||
return Err(AvailabilityCheckError::Unexpected("empty columns"));
|
||||
};
|
||||
|
||||
let mut write_lock = self.critical.write();
|
||||
@@ -551,14 +539,72 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
// Merge in the data columns.
|
||||
pending_components.merge_data_columns(kzg_verified_data_columns)?;
|
||||
|
||||
if pending_components.is_available(self.sampling_column_count, log) {
|
||||
debug!(log, "Component added to data availability checker";
|
||||
"component" => "data_columns",
|
||||
"block_root" => ?block_root,
|
||||
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
|
||||
);
|
||||
|
||||
if let Some(available_block) =
|
||||
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
|
||||
self.state_cache.recover_pending_executed_block(block)
|
||||
})?
|
||||
{
|
||||
// We keep the pending components in the availability cache during block import (#5845).
|
||||
// `data_column_recv` is returned as part of the available block and is no longer needed here.
|
||||
write_lock.put(block_root, pending_components.clone_without_column_recv());
|
||||
write_lock.put(block_root, pending_components);
|
||||
drop(write_lock);
|
||||
pending_components.make_available(&self.spec, |diet_block| {
|
||||
self.state_cache.recover_pending_executed_block(diet_block)
|
||||
})
|
||||
Ok(Availability::Available(Box::new(available_block)))
|
||||
} else {
|
||||
write_lock.put(block_root, pending_components);
|
||||
Ok(Availability::MissingComponents(block_root))
|
||||
}
|
||||
}
|
||||
|
||||
/// The `data_column_recv` parameter is a `Receiver` for data columns that are computed
|
||||
/// asynchronously. This method is used if the EL already has the blobs and returns them via the
|
||||
/// `getBlobsV1` engine method. More details in [fetch_blobs.rs](https://github.com/sigp/lighthouse/blob/44f8add41ea2252769bb967864af95b3c13af8ca/beacon_node/beacon_chain/src/fetch_blobs.rs).
|
||||
pub fn put_computed_data_columns_recv(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
block_epoch: Epoch,
|
||||
data_column_recv: oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>,
|
||||
log: &Logger,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
let mut write_lock = self.critical.write();
|
||||
|
||||
// Grab existing entry or create a new entry.
|
||||
let mut pending_components = write_lock
|
||||
.pop_entry(&block_root)
|
||||
.map(|(_, v)| v)
|
||||
.unwrap_or_else(|| {
|
||||
PendingComponents::empty(
|
||||
block_root,
|
||||
self.spec.max_blobs_per_block(block_epoch) as usize,
|
||||
)
|
||||
});
|
||||
|
||||
// We have all the blobs from engine, and have started computing data columns. We store the
|
||||
// receiver in `PendingComponents` for later use when importing the block.
|
||||
// TODO(das): Error or log if we overwrite a prior receiver https://github.com/sigp/lighthouse/issues/6764
|
||||
pending_components.data_column_recv = Some(data_column_recv);
|
||||
|
||||
debug!(log, "Component added to data availability checker";
|
||||
"component" => "data_columns_recv",
|
||||
"block_root" => ?block_root,
|
||||
"status" => pending_components.status_str(block_epoch, self.sampling_column_count, &self.spec),
|
||||
);
|
||||
|
||||
if let Some(available_block) =
|
||||
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
|
||||
self.state_cache.recover_pending_executed_block(block)
|
||||
})?
|
||||
{
|
||||
// We keep the pending components in the availability cache during block import (#5845).
|
||||
// `data_column_recv` is returned as part of the available block and is no longer needed here.
|
||||
write_lock.put(block_root, pending_components);
|
||||
drop(write_lock);
|
||||
Ok(Availability::Available(Box::new(available_block)))
|
||||
} else {
|
||||
write_lock.put(block_root, pending_components);
|
||||
Ok(Availability::MissingComponents(block_root))
|
||||
@@ -603,7 +649,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
}
|
||||
|
||||
pending_components.reconstruction_started = true;
|
||||
ReconstructColumnsDecision::Yes(pending_components.clone_without_column_recv())
|
||||
ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone())
|
||||
}
|
||||
|
||||
/// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`.
|
||||
@@ -643,15 +689,23 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
// Merge in the block.
|
||||
pending_components.merge_block(diet_executed_block);
|
||||
|
||||
debug!(log, "Component added to data availability checker";
|
||||
"component" => "block",
|
||||
"block_root" => ?block_root,
|
||||
"status" => pending_components.status_str(epoch, self.sampling_column_count, &self.spec),
|
||||
);
|
||||
|
||||
// Check if we have all components and entire set is consistent.
|
||||
if pending_components.is_available(self.sampling_column_count, log) {
|
||||
if let Some(available_block) =
|
||||
pending_components.make_available(self.sampling_column_count, &self.spec, |block| {
|
||||
self.state_cache.recover_pending_executed_block(block)
|
||||
})?
|
||||
{
|
||||
// We keep the pending components in the availability cache during block import (#5845).
|
||||
// `data_column_recv` is returned as part of the available block and is no longer needed here.
|
||||
write_lock.put(block_root, pending_components.clone_without_column_recv());
|
||||
write_lock.put(block_root, pending_components);
|
||||
drop(write_lock);
|
||||
pending_components.make_available(&self.spec, |diet_block| {
|
||||
self.state_cache.recover_pending_executed_block(diet_block)
|
||||
})
|
||||
Ok(Availability::Available(Box::new(available_block)))
|
||||
} else {
|
||||
write_lock.put(block_root, pending_components);
|
||||
Ok(Availability::MissingComponents(block_root))
|
||||
@@ -882,7 +936,6 @@ mod test {
|
||||
parent_eth1_finalization_data,
|
||||
confirmed_state_roots: vec![],
|
||||
consensus_context,
|
||||
data_column_recv: None,
|
||||
};
|
||||
|
||||
let payload_verification_outcome = PayloadVerificationOutcome {
|
||||
@@ -989,7 +1042,7 @@ mod test {
|
||||
for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
|
||||
kzg_verified_blobs.push(gossip_blob.into_inner());
|
||||
let availability = cache
|
||||
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger())
|
||||
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger())
|
||||
.expect("should put blob");
|
||||
if blob_index == blobs_expected - 1 {
|
||||
assert!(matches!(availability, Availability::Available(_)));
|
||||
@@ -1017,11 +1070,10 @@ mod test {
|
||||
for gossip_blob in blobs {
|
||||
kzg_verified_blobs.push(gossip_blob.into_inner());
|
||||
let availability = cache
|
||||
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger())
|
||||
.put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger())
|
||||
.expect("should put blob");
|
||||
assert_eq!(
|
||||
availability,
|
||||
Availability::MissingComponents(root),
|
||||
assert!(
|
||||
matches!(availability, Availability::MissingComponents(_)),
|
||||
"should be pending block"
|
||||
);
|
||||
assert_eq!(cache.critical.read().len(), 1);
|
||||
@@ -1273,7 +1325,6 @@ mod pending_components_tests {
|
||||
},
|
||||
confirmed_state_roots: vec![],
|
||||
consensus_context: ConsensusContext::new(Slot::new(0)),
|
||||
data_column_recv: None,
|
||||
},
|
||||
payload_verification_outcome: PayloadVerificationOutcome {
|
||||
payload_verification_status: PayloadVerificationStatus::Verified,
|
||||
|
||||
@@ -136,7 +136,6 @@ impl<T: BeaconChainTypes> StateLRUCache<T> {
|
||||
consensus_context: diet_executed_block
|
||||
.consensus_context
|
||||
.into_consensus_context(),
|
||||
data_column_recv: None,
|
||||
},
|
||||
payload_verification_outcome: diet_executed_block.payload_verification_outcome,
|
||||
})
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::data_availability_checker::AvailableBlock;
|
||||
use crate::data_availability_checker::{AvailableBlock, AvailableBlockData};
|
||||
use crate::{
|
||||
attester_cache::{CommitteeLengths, Error},
|
||||
metrics,
|
||||
@@ -52,7 +52,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
|
||||
pub fn add_head_block(
|
||||
&self,
|
||||
beacon_block_root: Hash256,
|
||||
block: AvailableBlock<E>,
|
||||
block: &AvailableBlock<E>,
|
||||
proto_block: ProtoBlock,
|
||||
state: &BeaconState<E>,
|
||||
spec: &ChainSpec,
|
||||
@@ -70,14 +70,23 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
|
||||
},
|
||||
};
|
||||
|
||||
let (_, block, blobs, data_columns) = block.deconstruct();
|
||||
let (blobs, data_columns) = match block.data() {
|
||||
AvailableBlockData::NoData => (None, None),
|
||||
AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None),
|
||||
AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())),
|
||||
// TODO(das): Once the columns are received, they will not be available in
|
||||
// the early attester cache. If someone does a query to us via RPC we
|
||||
// will get downscored.
|
||||
AvailableBlockData::DataColumnsRecv(_) => (None, None),
|
||||
};
|
||||
|
||||
let item = CacheItem {
|
||||
epoch,
|
||||
committee_lengths,
|
||||
beacon_block_root,
|
||||
source,
|
||||
target,
|
||||
block,
|
||||
block: block.block_cloned(),
|
||||
blobs,
|
||||
data_columns,
|
||||
proto_block,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::data_availability_checker::AvailableBlock;
|
||||
use crate::data_availability_checker::{AvailableBlock, AvailableBlockData};
|
||||
use crate::{metrics, BeaconChain, BeaconChainTypes};
|
||||
use itertools::Itertools;
|
||||
use slog::debug;
|
||||
@@ -105,7 +105,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
let blob_batch_size = blocks_to_import
|
||||
.iter()
|
||||
.filter(|available_block| available_block.blobs().is_some())
|
||||
.filter(|available_block| available_block.has_blobs())
|
||||
.count()
|
||||
.saturating_mul(n_blob_ops_per_block);
|
||||
|
||||
@@ -114,14 +114,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
|
||||
let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot;
|
||||
|
||||
let mut blob_batch = Vec::with_capacity(blob_batch_size);
|
||||
let mut blob_batch = Vec::<KeyValueStoreOp>::with_capacity(blob_batch_size);
|
||||
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
|
||||
let mut hot_batch = Vec::with_capacity(blocks_to_import.len());
|
||||
let mut signed_blocks = Vec::with_capacity(blocks_to_import.len());
|
||||
|
||||
for available_block in blocks_to_import.into_iter().rev() {
|
||||
let (block_root, block, maybe_blobs, maybe_data_columns) =
|
||||
available_block.deconstruct();
|
||||
let (block_root, block, block_data) = available_block.deconstruct();
|
||||
|
||||
if block_root != expected_block_root {
|
||||
return Err(HistoricalBlockError::MismatchedBlockRoot {
|
||||
@@ -144,17 +143,26 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
);
|
||||
}
|
||||
|
||||
// Store the blobs too
|
||||
if let Some(blobs) = maybe_blobs {
|
||||
new_oldest_blob_slot = Some(block.slot());
|
||||
self.store
|
||||
.blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch);
|
||||
match &block_data {
|
||||
AvailableBlockData::NoData => {}
|
||||
AvailableBlockData::Blobs(..) => {
|
||||
new_oldest_blob_slot = Some(block.slot());
|
||||
}
|
||||
AvailableBlockData::DataColumns(_) | AvailableBlockData::DataColumnsRecv(_) => {
|
||||
new_oldest_data_column_slot = Some(block.slot());
|
||||
}
|
||||
}
|
||||
// Store the data columns too
|
||||
if let Some(data_columns) = maybe_data_columns {
|
||||
new_oldest_data_column_slot = Some(block.slot());
|
||||
self.store
|
||||
.data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch);
|
||||
|
||||
// Store the blobs or data columns too
|
||||
if let Some(op) = self
|
||||
.get_blobs_or_columns_store_op(block_root, block_data)
|
||||
.map_err(|e| {
|
||||
HistoricalBlockError::StoreError(StoreError::DBError {
|
||||
message: format!("get_blobs_or_columns_store_op error {e:?}"),
|
||||
})
|
||||
})?
|
||||
{
|
||||
blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?);
|
||||
}
|
||||
|
||||
// Store block roots, including at all skip slots in the freezer DB.
|
||||
|
||||
@@ -242,7 +242,7 @@ async fn produces_attestations() {
|
||||
.early_attester_cache
|
||||
.add_head_block(
|
||||
block_root,
|
||||
available_block,
|
||||
&available_block,
|
||||
proto_block,
|
||||
&state,
|
||||
&chain.spec,
|
||||
@@ -310,7 +310,7 @@ async fn early_attester_cache_old_request() {
|
||||
.early_attester_cache
|
||||
.add_head_block(
|
||||
head.beacon_block_root,
|
||||
available_block,
|
||||
&available_block,
|
||||
head_proto_block,
|
||||
&head.beacon_state,
|
||||
&harness.chain.spec,
|
||||
|
||||
@@ -2517,18 +2517,13 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
||||
|
||||
// Corrupt the signature on the 1st block to ensure that the backfill processor is checking
|
||||
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
|
||||
let mut batch_with_invalid_first_block = available_blocks.clone();
|
||||
let mut batch_with_invalid_first_block =
|
||||
available_blocks.iter().map(clone_block).collect::<Vec<_>>();
|
||||
batch_with_invalid_first_block[0] = {
|
||||
let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct();
|
||||
let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
|
||||
let mut corrupt_block = (*block).clone();
|
||||
*corrupt_block.signature_mut() = Signature::empty();
|
||||
AvailableBlock::__new_for_testing(
|
||||
block_root,
|
||||
Arc::new(corrupt_block),
|
||||
blobs,
|
||||
data_columns,
|
||||
Arc::new(spec),
|
||||
)
|
||||
AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec))
|
||||
};
|
||||
|
||||
// Importing the invalid batch should error.
|
||||
@@ -2540,8 +2535,9 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
||||
));
|
||||
|
||||
// Importing the batch with valid signatures should succeed.
|
||||
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
|
||||
beacon_chain
|
||||
.import_historical_block_batch(available_blocks.clone())
|
||||
.import_historical_block_batch(available_blocks_dup)
|
||||
.unwrap();
|
||||
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
|
||||
|
||||
@@ -3690,3 +3686,7 @@ fn get_blocks(
|
||||
.map(|checkpoint| checkpoint.beacon_block_root.into())
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn clone_block<E: EthSpec>(block: &AvailableBlock<E>) -> AvailableBlock<E> {
|
||||
block.__clone_without_recv().unwrap()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user