mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-01 05:37:05 +00:00
nuke router
This commit is contained in:
@@ -23,9 +23,7 @@ use crate::data_availability_checker::{
|
||||
DataColumnReconstructionResult as DataColumnReconstructionResultV1,
|
||||
};
|
||||
|
||||
use crate::data_availability_router::{
|
||||
AvailabilityOutcome, DataAvailabilityRouter, ReconstructionOutcome,
|
||||
};
|
||||
use crate::data_availability_checker::DataAvailabilityChecker;
|
||||
use crate::data_column_verification::{
|
||||
GossipDataColumnError, GossipPartialDataColumnError, GossipVerifiedDataColumn,
|
||||
GossipVerifiedPartialDataColumnHeader, KzgVerifiedCustodyPartialDataColumn,
|
||||
@@ -70,6 +68,7 @@ use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBid
|
||||
#[cfg(not(test))]
|
||||
use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream};
|
||||
use crate::pending_payload_cache::DataColumnReconstructionResult as DataColumnReconstructionResultV2;
|
||||
use crate::pending_payload_cache::{Availability as PayloadAvailability, PendingPayloadCache};
|
||||
use crate::pending_payload_envelopes::PendingPayloadEnvelopes;
|
||||
use crate::persisted_beacon_chain::PersistedBeaconChain;
|
||||
use crate::persisted_custody::persist_custody_context;
|
||||
@@ -503,9 +502,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
|
||||
/// The slot at which blocks are downloaded back to.
|
||||
pub genesis_backfill_slot: Slot,
|
||||
/// Provides a KZG verification and temporary storage for blocks and blobs as
|
||||
/// they are collected and combined.
|
||||
pub data_availability_checker: Arc<DataAvailabilityRouter<T>>,
|
||||
/// Provides KZG verification and temporary storage for pre-Gloas blocks and blobs.
|
||||
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
|
||||
/// Provides KZG verification and temporary storage for post-Gloas payload envelopes.
|
||||
pub pending_payload_cache: Arc<PendingPayloadCache<T>>,
|
||||
/// The KZG trusted setup used by this chain.
|
||||
pub kzg: Arc<Kzg>,
|
||||
/// RNG instance used by the chain. Currently used for shuffling column sidecars in block publishing.
|
||||
@@ -1183,10 +1183,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
indices: &[ColumnIndex],
|
||||
fork_name: ForkName,
|
||||
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
|
||||
let all_cached_columns_opt = self
|
||||
.data_availability_checker
|
||||
.get_data_columns(block_root, fork_name)
|
||||
.or_else(|| self.early_attester_cache.get_data_columns(block_root));
|
||||
let all_cached_columns_opt = if fork_name.gloas_enabled() {
|
||||
self.pending_payload_cache.get_data_columns(block_root)
|
||||
} else {
|
||||
self.data_availability_checker.get_data_columns(block_root)
|
||||
}
|
||||
.or_else(|| self.early_attester_cache.get_data_columns(block_root));
|
||||
|
||||
if let Some(mut all_cached_columns) = all_cached_columns_opt {
|
||||
all_cached_columns.retain(|col| indices.contains(col.index()));
|
||||
@@ -2420,11 +2422,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let _timer = metrics::start_timer(
|
||||
&metrics::PARTIAL_DATA_COLUMN_SIDECAR_HEADER_GOSSIP_VERIFICATION_TIMES,
|
||||
);
|
||||
let Some(assembler) = self
|
||||
.data_availability_checker
|
||||
.pending_block_cache()
|
||||
.partial_assembler()
|
||||
else {
|
||||
let Some(assembler) = self.data_availability_checker.partial_assembler() else {
|
||||
return Err(GossipPartialDataColumnError::PartialColumnsDisabled);
|
||||
};
|
||||
if let Some(cached_header) = assembler.get_header(&block_root) {
|
||||
@@ -3377,11 +3375,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
return Err(BlockError::DuplicateFullyImported(block_root));
|
||||
}
|
||||
|
||||
let Some(assembler) = self
|
||||
.data_availability_checker
|
||||
.pending_block_cache()
|
||||
.partial_assembler()
|
||||
else {
|
||||
let Some(assembler) = self.data_availability_checker.partial_assembler() else {
|
||||
// Partial messages are apparently not activated
|
||||
return Ok(None);
|
||||
};
|
||||
@@ -3417,16 +3411,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.map(|column| column.as_data_column()),
|
||||
);
|
||||
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_kzg_verified_custody_data_columns(
|
||||
block_root,
|
||||
slot,
|
||||
merge_result.full_columns.clone(),
|
||||
)?;
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await?
|
||||
if self
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(slot)
|
||||
.gloas_enabled()
|
||||
{
|
||||
let availability = self
|
||||
.pending_payload_cache
|
||||
.put_kzg_verified_custody_data_columns(
|
||||
block_root,
|
||||
merge_result.full_columns.clone(),
|
||||
)?;
|
||||
self.process_payload_availability(slot, availability, || Ok(()))
|
||||
.await?
|
||||
} else {
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_kzg_verified_custody_data_columns(
|
||||
block_root,
|
||||
merge_result.full_columns.clone(),
|
||||
)?;
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await?
|
||||
}
|
||||
} else {
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root)
|
||||
};
|
||||
@@ -3540,10 +3547,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
if let Some(event_handler) = self.event_handler.as_ref()
|
||||
&& event_handler.has_data_column_sidecar_subscribers()
|
||||
{
|
||||
let imported_data_columns = self
|
||||
.data_availability_checker
|
||||
.cached_data_column_indexes(block_root, slot)
|
||||
.unwrap_or_default();
|
||||
let imported_data_columns = if self
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(slot)
|
||||
.gloas_enabled()
|
||||
{
|
||||
self.pending_payload_cache
|
||||
.cached_data_column_indexes(block_root)
|
||||
} else {
|
||||
self.data_availability_checker
|
||||
.cached_data_column_indexes(block_root)
|
||||
}
|
||||
.unwrap_or_default();
|
||||
let new_data_columns =
|
||||
data_columns_iter.filter(|b| !imported_data_columns.contains(b.index()));
|
||||
|
||||
@@ -3636,80 +3651,73 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let data_availability_checker = self.data_availability_checker.clone();
|
||||
let is_gloas = self
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(slot)
|
||||
.gloas_enabled();
|
||||
|
||||
let result = self
|
||||
.task_executor
|
||||
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
|
||||
data_availability_checker.reconstruct_data_columns(&block_root, slot)
|
||||
})
|
||||
.await
|
||||
.map_err(|_| BeaconChainError::RuntimeShutdown)??;
|
||||
if is_gloas {
|
||||
let pending_payload_cache = self.pending_payload_cache.clone();
|
||||
let result = self
|
||||
.task_executor
|
||||
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
|
||||
pending_payload_cache.reconstruct_data_columns(&block_root)
|
||||
})
|
||||
.await
|
||||
.map_err(|_| BeaconChainError::RuntimeShutdown)??;
|
||||
|
||||
match result {
|
||||
ReconstructionOutcome::Block(data_column_reconstruction_result) => {
|
||||
match data_column_reconstruction_result {
|
||||
DataColumnReconstructionResultV1::Success((
|
||||
availability,
|
||||
data_columns_to_publish,
|
||||
)) => {
|
||||
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
|
||||
// This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success.
|
||||
return Ok(None);
|
||||
};
|
||||
match result {
|
||||
DataColumnReconstructionResultV2::Success((
|
||||
availability,
|
||||
data_columns_to_publish,
|
||||
)) => {
|
||||
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.process_availability(
|
||||
slot,
|
||||
AvailabilityOutcome::Block(availability),
|
||||
|| Ok(()),
|
||||
)
|
||||
self.process_payload_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
.map(|availability_processing_status| {
|
||||
Some((availability_processing_status, data_columns_to_publish))
|
||||
})
|
||||
}
|
||||
DataColumnReconstructionResultV1::NotStarted(reason)
|
||||
| DataColumnReconstructionResultV1::RecoveredColumnsNotImported(reason) => {
|
||||
// We use metric here because logging this would be *very* noisy.
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
|
||||
&[reason],
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
.map(|status| Some((status, data_columns_to_publish)))
|
||||
}
|
||||
DataColumnReconstructionResultV2::NotStarted(reason)
|
||||
| DataColumnReconstructionResultV2::RecoveredColumnsNotImported(reason) => {
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
|
||||
&[reason],
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
// TODO(gloas) handle data column reconstruction for gloas.
|
||||
ReconstructionOutcome::Payload(data_column_reconstruction_result) => {
|
||||
match data_column_reconstruction_result {
|
||||
DataColumnReconstructionResultV2::Success((
|
||||
availability,
|
||||
data_columns_to_publish,
|
||||
)) => {
|
||||
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
|
||||
// This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success.
|
||||
return Ok(None);
|
||||
};
|
||||
} else {
|
||||
let pending_block_cache = self.data_availability_checker.clone();
|
||||
let result = self
|
||||
.task_executor
|
||||
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
|
||||
pending_block_cache.reconstruct_data_columns(&block_root)
|
||||
})
|
||||
.await
|
||||
.map_err(|_| BeaconChainError::RuntimeShutdown)??;
|
||||
|
||||
self.process_availability(
|
||||
slot,
|
||||
AvailabilityOutcome::Payload(availability),
|
||||
|| Ok(()),
|
||||
)
|
||||
match result {
|
||||
DataColumnReconstructionResultV1::Success((
|
||||
availability,
|
||||
data_columns_to_publish,
|
||||
)) => {
|
||||
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
.map(|availability_processing_status| {
|
||||
Some((availability_processing_status, data_columns_to_publish))
|
||||
})
|
||||
}
|
||||
DataColumnReconstructionResultV2::NotStarted(reason)
|
||||
| DataColumnReconstructionResultV2::RecoveredColumnsNotImported(reason) => {
|
||||
// We use metric here because logging this would be *very* noisy.
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
|
||||
&[reason],
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
.map(|status| Some((status, data_columns_to_publish)))
|
||||
}
|
||||
DataColumnReconstructionResultV1::NotStarted(reason)
|
||||
| DataColumnReconstructionResultV1::RecoveredColumnsNotImported(reason) => {
|
||||
metrics::inc_counter_vec(
|
||||
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
|
||||
&[reason],
|
||||
);
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3912,8 +3920,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
let slot = block.block.slot();
|
||||
let availability =
|
||||
AvailabilityOutcome::Block(self.data_availability_checker.put_executed_block(block)?);
|
||||
let availability = self.data_availability_checker.put_executed_block(block)?;
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
}
|
||||
@@ -3928,10 +3935,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
if let Some(slasher) = self.slasher.as_ref() {
|
||||
slasher.accept_block_header(blob.signed_block_header());
|
||||
}
|
||||
let availability = AvailabilityOutcome::Block(
|
||||
self.data_availability_checker
|
||||
.put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?,
|
||||
);
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?;
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
@@ -3958,12 +3964,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}
|
||||
}
|
||||
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
|
||||
|
||||
self.process_availability(slot, availability, publish_fn)
|
||||
.await
|
||||
if self
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(slot)
|
||||
.gloas_enabled()
|
||||
{
|
||||
let availability = self
|
||||
.pending_payload_cache
|
||||
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
|
||||
self.process_payload_availability(slot, availability, publish_fn)
|
||||
.await
|
||||
} else {
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
|
||||
self.process_availability(slot, availability, publish_fn)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
fn check_blob_header_signature_and_slashability<'a>(
|
||||
@@ -4008,10 +4025,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block_root,
|
||||
blobs.iter().flatten().map(Arc::as_ref),
|
||||
)?;
|
||||
let availability = AvailabilityOutcome::Block(
|
||||
self.data_availability_checker
|
||||
.put_rpc_blobs(block_root, blobs)?,
|
||||
);
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_rpc_blobs(block_root, blobs)?;
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
@@ -4023,7 +4039,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
block_root: Hash256,
|
||||
engine_get_blobs_output: EngineGetBlobsOutput<T>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
let availability = match engine_get_blobs_output {
|
||||
match engine_get_blobs_output {
|
||||
EngineGetBlobsOutput::Blobs(blobs) => {
|
||||
self.check_blob_header_signature_and_slashability(
|
||||
block_root,
|
||||
@@ -4033,7 +4049,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.data_availability_checker
|
||||
.put_kzg_verified_blobs(block_root, blobs)?;
|
||||
|
||||
AvailabilityOutcome::Block(availability)
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
}
|
||||
EngineGetBlobsOutput::CustodyColumns(data_columns) => {
|
||||
// TODO(gloas) verify that this check is no longer relevant for gloas
|
||||
@@ -4046,13 +4063,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
_ => None,
|
||||
}),
|
||||
)?;
|
||||
self.data_availability_checker
|
||||
.put_kzg_verified_custody_data_columns(block_root, slot, data_columns)?
|
||||
if self
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(slot)
|
||||
.gloas_enabled()
|
||||
{
|
||||
let availability = self
|
||||
.pending_payload_cache
|
||||
.put_kzg_verified_custody_data_columns(block_root, data_columns)?;
|
||||
self.process_payload_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
} else {
|
||||
let availability = self
|
||||
.data_availability_checker
|
||||
.put_kzg_verified_custody_data_columns(block_root, data_columns)?;
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the provided columns can make any cached blocks available, and imports immediately
|
||||
@@ -4072,16 +4101,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
}),
|
||||
)?;
|
||||
|
||||
// This slot value is purely informative for the consumers of
|
||||
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
|
||||
let availability = self.data_availability_checker.put_rpc_custody_columns(
|
||||
block_root,
|
||||
slot,
|
||||
custody_columns,
|
||||
)?;
|
||||
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
if self
|
||||
.spec
|
||||
.fork_name_at_slot::<T::EthSpec>(slot)
|
||||
.gloas_enabled()
|
||||
{
|
||||
let availability = self.pending_payload_cache.put_rpc_custody_columns(
|
||||
block_root,
|
||||
slot,
|
||||
custody_columns,
|
||||
)?;
|
||||
self.process_payload_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
} else {
|
||||
let availability = self.data_availability_checker.put_rpc_custody_columns(
|
||||
block_root,
|
||||
slot,
|
||||
custody_columns,
|
||||
)?;
|
||||
self.process_availability(slot, availability, || Ok(()))
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
fn check_data_column_sidecar_header_signature_and_slashability<'a>(
|
||||
@@ -4124,25 +4164,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
async fn process_availability(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
availability: AvailabilityOutcome<T::EthSpec>,
|
||||
availability: BlockAvailability<T::EthSpec>,
|
||||
publish_fn: impl FnOnce() -> Result<(), BlockError>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
match availability {
|
||||
AvailabilityOutcome::Block(availability) => {
|
||||
match availability {
|
||||
BlockAvailability::Available(block) => {
|
||||
publish_fn()?;
|
||||
// Block is fully available, import into fork choice
|
||||
self.import_available_block(block).await
|
||||
}
|
||||
BlockAvailability::MissingComponents(block_root) => Ok(
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
|
||||
),
|
||||
}
|
||||
BlockAvailability::Available(block) => {
|
||||
publish_fn()?;
|
||||
self.import_available_block(block).await
|
||||
}
|
||||
AvailabilityOutcome::Payload(_) => {
|
||||
Err(BlockError::InternalError("Received a payload envelope availability outcome variant when a block variant was expected".to_string()))
|
||||
},
|
||||
BlockAvailability::MissingComponents(block_root) => Ok(
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_payload_availability(
|
||||
self: &Arc<Self>,
|
||||
slot: Slot,
|
||||
availability: PayloadAvailability<T::EthSpec>,
|
||||
publish_fn: impl FnOnce() -> Result<(), BlockError>,
|
||||
) -> Result<AvailabilityProcessingStatus, BlockError> {
|
||||
match availability {
|
||||
PayloadAvailability::Available(available_envelope) => {
|
||||
publish_fn()?;
|
||||
self.import_available_execution_payload_envelope(available_envelope)
|
||||
.await
|
||||
.map_err(|e| BlockError::InternalError(e.to_string()))
|
||||
}
|
||||
PayloadAvailability::MissingComponents(block_root) => Ok(
|
||||
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user