mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-31 21:27:12 +00:00
Add da router, and initial logic
This commit is contained in:
@@ -25,6 +25,10 @@ use crate::data_availability_checker::{
|
||||
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
|
||||
DataAvailabilityChecker, DataColumnReconstructionResult,
|
||||
};
|
||||
use crate::data_availability_checker_v2::DataAvailabilityChecker as DataAvailabilityCheckerV2;
|
||||
use crate::data_column_availability_cache::{
|
||||
AvailabilityOutcome, DataAvailabilityRouter, ReconstructionOutcome,
|
||||
};
|
||||
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||
use crate::early_attester_cache::EarlyAttesterCache;
|
||||
use crate::errors::{BeaconChainError as Error, BlockProductionError};
|
||||
@@ -476,7 +480,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
pub genesis_backfill_slot: Slot,
|
||||
/// Provides a KZG verification and temporary storage for blocks and blobs as
|
||||
/// they are collected and combined.
|
||||
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
|
||||
pub data_availability_checker:
|
||||
Arc<DataAvailabilityRouter<T, DataAvailabilityChecker<T>, DataAvailabilityCheckerV2<T>>>,
|
||||
/// The KZG trusted setup used by this chain.
|
||||
pub kzg: Arc<Kzg>,
|
||||
/// RNG instance used by the chain. Currently used for shuffling column sidecars in block publishing.
|
||||
@@ -1123,10 +1128,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
indices: &[ColumnIndex],
|
||||
fork_name: ForkName,
|
||||
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
|
||||
let all_cached_columns_opt = self
|
||||
.data_availability_checker
|
||||
.get_data_columns(block_root)
|
||||
.get_data_columns(block_root, fork_name)
|
||||
.or_else(|| self.early_attester_cache.get_data_columns(block_root));
|
||||
|
||||
if let Some(mut all_cached_columns) = all_cached_columns_opt {
|
||||
@@ -1286,7 +1292,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
|
||||
/// processing attempts.
|
||||
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus<T::EthSpec> {
|
||||
if let Some(cached_block) = self.data_availability_checker.get_cached_block(block_root) {
|
||||
if let Some(cached_block) = self
|
||||
.data_availability_checker
|
||||
.v1()
|
||||
.get_cached_block(block_root)
|
||||
{
|
||||
return cached_block;
|
||||
}
|
||||
|
||||
@@ -3060,6 +3070,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
self.emit_sse_data_column_sidecar_events(
|
||||
slot,
|
||||
&block_root,
|
||||
data_columns.iter().map(|column| column.as_data_column()),
|
||||
);
|
||||
@@ -3136,6 +3147,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
EngineGetBlobsOutput::CustodyColumns(columns) => {
|
||||
self.emit_sse_data_column_sidecar_events(
|
||||
slot,
|
||||
&block_root,
|
||||
columns.iter().map(|column| column.as_data_column()),
|
||||
);
|
||||
@@ -3155,6 +3167,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
{
|
||||
let imported_blobs = self
|
||||
.data_availability_checker
|
||||
.v1()
|
||||
.cached_blob_indexes(block_root)
|
||||
.unwrap_or_default();
|
||||
let new_blobs = blobs_iter.filter(|b| !imported_blobs.contains(&b.index));
|
||||
@@ -3169,6 +3182,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
fn emit_sse_data_column_sidecar_events<'a, I>(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
block_root: &Hash256,
|
||||
data_columns_iter: I,
|
||||
) where
|
||||
@@ -3179,7 +3193,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
{
|
||||
let imported_data_columns = self
|
||||
.data_availability_checker
|
||||
.cached_data_column_indexes(block_root)
|
||||
.cached_data_column_indexes(block_root, slot)
|
||||
.unwrap_or_default();
|
||||
let new_data_columns =
|
||||
data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index));
|
||||
@@ -3232,6 +3246,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
|
||||
self.emit_sse_data_column_sidecar_events(
|
||||
slot,
|
||||
&block_root,
|
||||
custody_columns.iter().map(|column| column.as_ref()),
|
||||
);
|
||||
@@ -3242,6 +3257,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
|
||||
pub async fn reconstruct_data_columns(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
block_root: Hash256,
|
||||
) -> Result<
|
||||
Option<(
|
||||
@@ -3268,33 +3284,45 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.task_executor
|
||||
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
|
||||
let _guard = current_span.enter();
|
||||
data_availability_checker.reconstruct_data_columns(&block_root)
|
||||
data_availability_checker.reconstruct_data_columns(&block_root, slot)
|
||||
})
|
||||
.await
|
||||
.map_err(|_| BeaconChainError::RuntimeShutdown)??;
|
||||
|
||||
match result {
|
||||
DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => {
|
||||
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
|
||||
// This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success.
|
||||
return Ok(None);
|
||||
};
|
||||
ReconstructionOutcome::Block(data_column_reconstruction_result) => {
|
||||
match data_column_reconstruction_result {
|
||||
DataColumnReconstructionResult::Success((
|
||||
availability,
|
||||
data_columns_to_publish,
|
||||
)) => {
|
||||
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
|
||||
// This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success.
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
.map(|availability_processing_status| {
|
||||
Some((availability_processing_status, data_columns_to_publish))
|
||||
})
|
||||
}
|
||||
DataColumnReconstructionResult::NotStarted(reason)
|
||||
| DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => {
|
||||
// We use metric here because logging this would be *very* noisy.
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
|
||||
&[reason],
|
||||
);
|
||||
Ok(None)
|
||||
self.process_availability(
|
||||
slot,
|
||||
AvailabilityOutcome::Block(availability),
|
||||
|| Ok(()),
|
||||
)
|
||||
.await
|
||||
.map(|availability_processing_status| {
|
||||
Some((availability_processing_status, data_columns_to_publish))
|
||||
})
|
||||
}
|
||||
DataColumnReconstructionResult::NotStarted(reason)
|
||||
| DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => {
|
||||
// We use metric here because logging this would be *very* noisy.
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
|
||||
&[reason],
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
ReconstructionOutcome::Payload(_data_column_reconstruction_result) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3343,11 +3371,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
);
|
||||
}
|
||||
|
||||
self.data_availability_checker.put_pre_execution_block(
|
||||
block_root,
|
||||
unverified_block.block_cloned(),
|
||||
block_source,
|
||||
)?;
|
||||
self.data_availability_checker
|
||||
.v1()
|
||||
.put_pre_execution_block(block_root, unverified_block.block_cloned(), block_source)?;
|
||||
|
||||
// Start the Prometheus timer.
|
||||
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
|
||||
@@ -3382,6 +3408,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
// chain to get stuck temporarily if the block is canonical. Therefore we remove
|
||||
// it from the cache if execution fails.
|
||||
self.data_availability_checker
|
||||
.v1()
|
||||
.remove_block_on_execution_error(&block_root);
|
||||
})?;
|
||||
|
||||
@@ -3509,7 +3536,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
let slot = block.block.slot();
|
||||
let availability = self.data_availability_checker.put_executed_block(block)?;
|
||||
let availability = AvailabilityOutcome::Block(
|
||||
self.data_availability_checker
|
||||
.v1()
|
||||
.put_executed_block(block)?,
|
||||
);
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
}
|
||||
@@ -3524,9 +3555,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
if let Some(slasher) = self.slasher.as_ref() {
|
||||
slasher.accept_block_header(blob.signed_block_header());
|
||||
}
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?;
|
||||
let availability = AvailabilityOutcome::Block(
|
||||
self.data_availability_checker
|
||||
.v1()
|
||||
.put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?,
|
||||
);
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
@@ -3597,9 +3630,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block_root,
|
||||
blobs.iter().flatten().map(Arc::as_ref),
|
||||
)?;
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_rpc_blobs(block_root, blobs)?;
|
||||
let availability = AvailabilityOutcome::Block(
|
||||
self.data_availability_checker
|
||||
.v1()
|
||||
.put_rpc_blobs(block_root, blobs)?,
|
||||
);
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
@@ -3617,8 +3652,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block_root,
|
||||
blobs.iter().map(|b| b.as_blob()),
|
||||
)?;
|
||||
self.data_availability_checker
|
||||
.put_kzg_verified_blobs(block_root, blobs)?
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.v1()
|
||||
.put_kzg_verified_blobs(block_root, blobs)?;
|
||||
|
||||
AvailabilityOutcome::Block(availability)
|
||||
}
|
||||
EngineGetBlobsOutput::CustodyColumns(data_columns) => {
|
||||
self.check_data_column_sidecar_header_signature_and_slashability(
|
||||
@@ -3626,7 +3665,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
data_columns.iter().map(|c| c.as_data_column()),
|
||||
)?;
|
||||
self.data_availability_checker
|
||||
.put_kzg_verified_custody_data_columns(block_root, data_columns)?
|
||||
.put_kzg_verified_custody_data_columns(block_root, slot, data_columns)?
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3699,18 +3738,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
async fn process_availability(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
availability: Availability<T::EthSpec>,
|
||||
availability: AvailabilityOutcome<T::EthSpec>,
|
||||
publish_fn: impl FnOnce() -> Result<(), BlockError>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
match availability {
|
||||
Availability::Available(block) => {
|
||||
publish_fn()?;
|
||||
// Block is fully available, import into fork choice
|
||||
self.import_available_block(block).await
|
||||
AvailabilityOutcome::Block(availability) => {
|
||||
match availability {
|
||||
Availability::Available(block) => {
|
||||
publish_fn()?;
|
||||
// Block is fully available, import into fork choice
|
||||
self.import_available_block(block).await
|
||||
}
|
||||
Availability::MissingComponents(block_root) => Ok(
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
|
||||
),
|
||||
}
|
||||
}
|
||||
Availability::MissingComponents(block_root) => Ok(
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
|
||||
),
|
||||
AvailabilityOutcome::Payload(_availability) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7300,12 +7344,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// The epoch at which we require a data availability check in block processing.
|
||||
/// `None` if the `Deneb` fork is disabled.
|
||||
pub fn data_availability_boundary(&self) -> Option<Epoch> {
|
||||
self.data_availability_checker.data_availability_boundary()
|
||||
self.data_availability_checker
|
||||
.v1()
|
||||
.data_availability_boundary()
|
||||
}
|
||||
|
||||
/// Returns true if epoch is within the data availability boundary
|
||||
pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool {
|
||||
self.data_availability_checker
|
||||
.v1()
|
||||
.da_check_required_for_epoch(epoch)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user