This commit is contained in:
dapplion
2026-04-30 13:39:05 +02:00
parent bd8cfa35f4
commit 0b7397eb4e
16 changed files with 116 additions and 206 deletions

View File

@@ -32,7 +32,7 @@ use crate::data_column_verification::{
};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::envelope_times_cache::EnvelopeTimesCache;
use crate::errors::{BeaconChainError as Error, BlockOrEnvelopeError, BlockProductionError};
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::events::ServerSentEventHandler;
use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload};
use crate::fetch_blobs::EngineGetBlobsOutput;
@@ -67,7 +67,6 @@ use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage;
use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache;
#[cfg(not(test))]
use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream};
use crate::payload_envelope_verification::EnvelopeError;
use crate::pending_payload_cache::PendingPayloadCache;
use crate::pending_payload_cache::{
Availability as PayloadAvailability,
@@ -3304,7 +3303,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = data_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
@@ -3313,8 +3312,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
)
.into());
));
};
let is_gloas = self
@@ -3331,7 +3329,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::DuplicateFullyImported(block_root).into());
return Err(BlockError::DuplicateFullyImported(block_root));
}
self.emit_sse_data_column_sidecar_events(
@@ -3357,7 +3355,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
verified_partial: KzgVerifiedPartialDataColumn<T::EthSpec>,
verified_header: GossipVerifiedPartialDataColumnHeader<T::EthSpec>,
slot: Slot,
) -> Result<ProcessedPartialColumnStatus<T::EthSpec>, BlockOrEnvelopeError> {
) -> Result<ProcessedPartialColumnStatus<T::EthSpec>, BlockError> {
let block_root = verified_partial.block_root();
let partial = verified_partial.as_data_column();
let index_str = partial.index.to_string();
@@ -3382,7 +3380,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::DuplicateFullyImported(block_root).into());
return Err(BlockError::DuplicateFullyImported(block_root));
}
let Some(assembler) = self.data_availability_checker.partial_assembler() else {
@@ -3426,11 +3424,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_name_at_slot::<T::EthSpec>(slot)
.gloas_enabled()
{
let Some(bid) =
load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)?
else {
return Err(EnvelopeError::BlockRootUnknown { block_root }.into());
};
let bid = load_gloas_payload_bid(block_root, self)?
.ok_or(BlockError::EnvelopeBlockRootUnknown { block_root })?;
let availability = self
.pending_payload_cache
.put_kzg_verified_custody_data_columns(
@@ -3438,7 +3433,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
bid,
merge_result.full_columns.clone(),
)
.map_err(EnvelopeError::from)?;
.map_err(BlockError::from)?;
self.process_payload_availability(slot, availability, || Ok(()))
.await?
} else {
@@ -3505,7 +3500,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
engine_get_blobs_output: EngineGetBlobsOutput<T>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
@@ -3513,7 +3508,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::DuplicateFullyImported(block_root).into());
return Err(BlockError::DuplicateFullyImported(block_root));
}
match &engine_get_blobs_output {
@@ -3594,7 +3589,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let Ok((slot, block_root)) = custody_columns
.iter()
.map(|c| (c.slot(), c.block_root()))
@@ -3603,8 +3598,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
)
.into());
));
};
// If this block has already been imported to forkchoice it must have been available, so
@@ -3616,7 +3610,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::DuplicateFullyImported(block_root).into());
return Err(BlockError::DuplicateFullyImported(block_root));
}
// Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data
@@ -3635,7 +3629,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&parent_root)
{
return Err(BlockError::ParentUnknown { parent_root }.into());
return Err(BlockError::ParentUnknown { parent_root });
}
self.emit_sse_data_column_sidecar_events(
@@ -3644,9 +3638,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
custody_columns.iter().map(|column| column.as_ref()),
);
Ok(self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await?)
self.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await
}
pub async fn reconstruct_data_columns(
@@ -3658,7 +3651,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
AvailabilityProcessingStatus,
DataColumnSidecarList<T::EthSpec>,
)>,
BlockOrEnvelopeError,
BlockError,
> {
// As of now we only reconstruct data columns on supernodes, so if the block is already
// available on a supernode, there's no need to reconstruct as the node must already have
@@ -3677,11 +3670,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.gloas_enabled();
if is_gloas {
let Some(bid) =
load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)?
else {
return Err(EnvelopeError::BlockRootUnknown { block_root }.into());
};
let bid = load_gloas_payload_bid(block_root, self)?
.ok_or(BlockError::EnvelopeBlockRootUnknown { block_root })?;
let pending_payload_cache = self.pending_payload_cache.clone();
let result = self
.task_executor
@@ -3689,8 +3679,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pending_payload_cache.reconstruct_data_columns(&block_root, bid)
})
.await
.map_err(|_| EnvelopeError::from(BeaconChainError::RuntimeShutdown))?
.map_err(EnvelopeError::from)?;
.map_err(|_| BlockError::from(BeaconChainError::RuntimeShutdown))?
.map_err(BlockError::from)?;
match result {
DataColumnReconstructionResultGloas::Success((
@@ -3991,7 +3981,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
if let Some(slasher) = self.slasher.as_ref() {
for data_column in &data_columns {
// TODO(gloas) different gossip checks in gloas
@@ -4007,25 +3997,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_name_at_slot::<T::EthSpec>(slot)
.gloas_enabled()
{
let Some(bid) =
load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)?
else {
return Ok(AvailabilityProcessingStatus::MissingComponents(
slot, block_root,
));
};
let bid = load_gloas_payload_bid(block_root, self)?
.ok_or(BlockError::EnvelopeBlockRootUnknown { block_root })?;
let availability = self
.pending_payload_cache
.put_gossip_verified_data_columns(block_root, bid, data_columns)
.map_err(EnvelopeError::from)?;
.put_gossip_verified_data_columns(block_root, bid, data_columns)?;
Ok(self
.process_payload_availability(slot, availability, publish_fn)
.await?)
} else {
let availability = self
.data_availability_checker
.put_gossip_verified_data_columns(block_root, slot, data_columns)
.map_err(BlockError::from)?;
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
Ok(self
.process_availability(slot, availability, publish_fn)
.await?)
@@ -4088,7 +4071,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
engine_get_blobs_output: EngineGetBlobsOutput<T>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
match engine_get_blobs_output {
EngineGetBlobsOutput::Blobs(blobs) => {
self.check_blob_header_signature_and_slashability(
@@ -4120,17 +4103,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_name_at_slot::<T::EthSpec>(slot)
.gloas_enabled()
{
let Some(bid) =
load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)?
else {
return Ok(AvailabilityProcessingStatus::MissingComponents(
slot, block_root,
));
};
let bid = load_gloas_payload_bid(block_root, self)?
.ok_or(BlockError::EnvelopeBlockRootUnknown { block_root })?;
let availability = self
.pending_payload_cache
.put_kzg_verified_custody_data_columns(block_root, bid, data_columns)
.map_err(EnvelopeError::from)?;
.map_err(BlockError::from)?;
Ok(self
.process_payload_availability(slot, availability, || Ok(()))
.await?)
@@ -4169,23 +4147,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_name_at_slot::<T::EthSpec>(slot)
.gloas_enabled()
{
let Some(bid) = load_gloas_payload_bid(block_root, self).map_err(BlockError::from)?
else {
return Ok(AvailabilityProcessingStatus::MissingComponents(
slot, block_root,
));
};
let bid = load_gloas_payload_bid(block_root, self)?
.ok_or(BlockError::EnvelopeBlockRootUnknown { block_root })?;
let availability = self
.pending_payload_cache
.put_rpc_custody_columns(block_root, bid, custody_columns)
.map_err(BlockError::from)?;
Ok(self
.process_payload_availability(slot, availability, || Ok(()))
.await
.map_err(|error| match error {
EnvelopeError::BlockError(error) => error,
error => BlockError::InternalError(error.to_string()),
})?)
.await?)
} else {
let availability = self
.data_availability_checker
@@ -4256,7 +4226,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
availability: PayloadAvailability<T::EthSpec>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, EnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
PayloadAvailability::Available(available_envelope) => {
publish_fn()?;