error handling and wiring up

This commit is contained in:
Daniel Knopik
2026-04-29 15:35:31 +02:00
parent 58fd3dde40
commit 2d3354551e
15 changed files with 132 additions and 84 deletions

View File

@@ -3275,7 +3275,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let block_root = blob.block_root();
// If this block has already been imported to forkchoice it must have been available, so
@@ -3285,12 +3285,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::DuplicateFullyImported(blob.block_root()).into());
return Err(BlockError::DuplicateFullyImported(blob.block_root()));
}
// No need to process and import blobs beyond the PeerDAS epoch.
if self.spec.is_peer_das_enabled_for_epoch(blob.epoch()) {
return Err(BlockError::BlobNotRequired(blob.slot()).into());
return Err(BlockError::BlobNotRequired(blob.slot()));
}
self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));
@@ -3456,7 +3456,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
@@ -3464,7 +3464,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::DuplicateFullyImported(block_root).into());
return Err(BlockError::DuplicateFullyImported(block_root));
}
// Reject RPC blobs referencing unknown parents. Otherwise we allow potentially invalid data
@@ -3479,7 +3479,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&parent_root)
{
return Err(BlockError::ParentUnknown { parent_root }.into());
return Err(BlockError::ParentUnknown { parent_root });
}
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
@@ -3684,9 +3684,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(None);
};
self.process_payload_availability(slot, availability, || Ok(()))
Ok(self
.process_payload_availability(slot, availability, || Ok(()))
.await
.map(|status| Some((status, data_columns_to_publish)))
.map(|status| Some((status, data_columns_to_publish)))?)
}
DataColumnReconstructionResultV2::NotStarted(reason)
| DataColumnReconstructionResultV2::RecoveredColumnsNotImported(reason) => {
@@ -3698,14 +3699,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
} else {
let pending_block_cache = self.data_availability_checker.clone();
let data_availability_checker = self.data_availability_checker.clone();
let result = self
.task_executor
.spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || {
pending_block_cache.reconstruct_data_columns(&block_root)
data_availability_checker.reconstruct_data_columns(&block_root)
})
.await
.map_err(|_| BeaconChainError::RuntimeShutdown)??;
.map_err(|_| BlockError::from(BeaconChainError::RuntimeShutdown))?
.map_err(BlockError::from)?;
match result {
DataColumnReconstructionResultV1::Success((
@@ -3716,9 +3718,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(None);
};
self.process_availability(slot, availability, || Ok(()))
Ok(self
.process_availability(slot, availability, || Ok(()))
.await
.map(|status| Some((status, data_columns_to_publish)))
.map(|status| Some((status, data_columns_to_publish)))?)
}
DataColumnReconstructionResultV1::NotStarted(reason)
| DataColumnReconstructionResultV1::RecoveredColumnsNotImported(reason) => {
@@ -3939,7 +3942,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let slot = blob.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(blob.signed_block_header());
@@ -3980,15 +3983,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
{
let availability = self
.pending_payload_cache
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
self.process_payload_availability(slot, availability, publish_fn)
.await
.put_gossip_verified_data_columns(block_root, slot, data_columns)
.map_err(EnvelopeError::from)?;
Ok(self
.process_payload_availability(slot, availability, publish_fn)
.await?)
} else {
let availability = self
.data_availability_checker
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
self.process_availability(slot, availability, publish_fn)
.await
.put_gossip_verified_data_columns(block_root, slot, data_columns)
.map_err(BlockError::from)?;
Ok(self
.process_availability(slot, availability, publish_fn)
.await?)
}
}
@@ -4029,7 +4036,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockOrEnvelopeError> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blob_header_signature_and_slashability(
block_root,
blobs.iter().flatten().map(Arc::as_ref),
@@ -4057,10 +4064,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)?;
let availability = self
.data_availability_checker
.put_kzg_verified_blobs(block_root, blobs)?;
.put_kzg_verified_blobs(block_root, blobs)
.map_err(BlockError::from)?;
self.process_availability(slot, availability, || Ok(()))
.await
Ok(self
.process_availability(slot, availability, || Ok(()))
.await?)
}
EngineGetBlobsOutput::CustodyColumns(data_columns) => {
// TODO(gloas) verify that this check is no longer relevant for gloas
@@ -4080,15 +4089,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
{
let availability = self
.pending_payload_cache
.put_kzg_verified_custody_data_columns(block_root, data_columns)?;
self.process_payload_availability(slot, availability, || Ok(()))
.await
.put_kzg_verified_custody_data_columns(block_root, data_columns)
.map_err(EnvelopeError::from)?;
Ok(self
.process_payload_availability(slot, availability, || Ok(()))
.await?)
} else {
let availability = self
.data_availability_checker
.put_kzg_verified_custody_data_columns(block_root, data_columns)?;
self.process_availability(slot, availability, || Ok(()))
.await
.put_kzg_verified_custody_data_columns(block_root, data_columns)
.map_err(BlockError::from)?;
Ok(self
.process_availability(slot, availability, || Ok(()))
.await?)
}
}
}
@@ -4116,21 +4129,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_name_at_slot::<T::EthSpec>(slot)
.gloas_enabled()
{
let availability = self.pending_payload_cache.put_rpc_custody_columns(
block_root,
slot,
custody_columns,
)?;
self.process_payload_availability(slot, availability, || Ok(()))
.await
let availability = self
.pending_payload_cache
.put_rpc_custody_columns(block_root, slot, custody_columns)
.map_err(EnvelopeError::from)?;
Ok(self
.process_payload_availability(slot, availability, || Ok(()))
.await?)
} else {
let availability = self.data_availability_checker.put_rpc_custody_columns(
block_root,
slot,
custody_columns,
)?;
self.process_availability(slot, availability, || Ok(()))
.await
let availability = self
.data_availability_checker
.put_rpc_custody_columns(block_root, slot, custody_columns)
.map_err(BlockError::from)?;
Ok(self
.process_availability(slot, availability, || Ok(()))
.await?)
}
}