diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ac74875398..3e776f17fd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -32,7 +32,7 @@ use crate::data_column_verification::{ }; use crate::early_attester_cache::EarlyAttesterCache; use crate::envelope_times_cache::EnvelopeTimesCache; -use crate::errors::{BeaconChainError as Error, BlockProductionError}; +use crate::errors::{BeaconChainError as Error, BlockOrEnvelopeError, BlockProductionError}; use crate::events::ServerSentEventHandler; use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload}; use crate::fetch_blobs::EngineGetBlobsOutput; @@ -67,8 +67,12 @@ use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; -use crate::pending_payload_cache::DataColumnReconstructionResult as DataColumnReconstructionResultV2; -use crate::pending_payload_cache::{Availability as PayloadAvailability, PendingPayloadCache}; +use crate::payload_envelope_verification::EnvelopeError; +use crate::pending_payload_cache::PendingPayloadCache; +use crate::pending_payload_cache::{ + Availability as PayloadAvailability, + DataColumnReconstructionResult as DataColumnReconstructionResultV2, +}; use crate::pending_payload_envelopes::PendingPayloadEnvelopes; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::persist_custody_context; @@ -3271,7 +3275,7 @@ impl BeaconChain { pub async fn process_gossip_blob( self: &Arc, blob: GossipVerifiedBlob, - ) -> Result { + ) -> Result { let block_root = blob.block_root(); // If this block has already been imported to forkchoice it must have been available, so @@ -3281,12 +3285,12 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(blob.block_root())); + return Err(BlockError::DuplicateFullyImported(blob.block_root()).into()); } // No need to process and import blobs beyond the PeerDAS epoch. if self.spec.is_peer_das_enabled_for_epoch(blob.epoch()) { - return Err(BlockError::BlobNotRequired(blob.slot())); + return Err(BlockError::BlobNotRequired(blob.slot()).into()); } self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob())); @@ -3302,7 +3306,7 @@ impl BeaconChain { self: &Arc, data_columns: Vec>, publish_fn: impl FnOnce() -> Result<(), BlockError>, - ) -> Result { + ) -> Result { let Ok((slot, block_root)) = data_columns .iter() .map(|c| (c.slot(), c.block_root())) @@ -3311,7 +3315,8 @@ impl BeaconChain { else { return Err(BlockError::InternalError( "Columns should be from the same block".to_string(), - )); + ) + .into()); }; // If this block has already been imported to forkchoice it must have been available, so @@ -3321,7 +3326,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root).into()); } self.emit_sse_data_column_sidecar_events( @@ -3347,7 +3352,7 @@ impl BeaconChain { verified_partial: KzgVerifiedPartialDataColumn, verified_header: GossipVerifiedPartialDataColumnHeader, slot: Slot, - ) -> Result, BlockError> { + ) -> Result, BlockOrEnvelopeError> { let block_root = verified_partial.block_root(); let partial = verified_partial.as_data_column(); let index_str = partial.index.to_string(); @@ -3372,7 +3377,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root).into()); } let Some(assembler) = self.data_availability_checker.partial_assembler() else { @@ -3421,7 +3426,8 @@ impl BeaconChain { .put_kzg_verified_custody_data_columns( block_root, merge_result.full_columns.clone(), - )?; + ) + .map_err(EnvelopeError::from)?; self.process_payload_availability(slot, availability, || Ok(())) .await? } else { @@ -3430,7 +3436,8 @@ impl BeaconChain { .put_kzg_verified_custody_data_columns( block_root, merge_result.full_columns.clone(), - )?; + ) + .map_err(BlockError::from)?; self.process_availability(slot, availability, || Ok(())) .await? } @@ -3449,7 +3456,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, blobs: FixedBlobSidecarList, - ) -> Result { + ) -> Result { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. if self @@ -3457,7 +3464,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root).into()); } // Reject RPC blobs referencing unknown parents. Otherwise we allow potentially invalid data @@ -3472,7 +3479,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&parent_root) { - return Err(BlockError::ParentUnknown { parent_root }); + return Err(BlockError::ParentUnknown { parent_root }.into()); } self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); @@ -3487,7 +3494,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, engine_get_blobs_output: EngineGetBlobsOutput, - ) -> Result { + ) -> Result { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. if self @@ -3495,7 +3502,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root).into()); } match &engine_get_blobs_output { @@ -3576,7 +3583,7 @@ impl BeaconChain { pub async fn process_rpc_custody_columns( self: &Arc, custody_columns: DataColumnSidecarList, - ) -> Result { + ) -> Result { let Ok((slot, block_root)) = custody_columns .iter() .map(|c| (c.slot(), c.block_root())) @@ -3585,7 +3592,8 @@ impl BeaconChain { else { return Err(BlockError::InternalError( "Columns should be from the same block".to_string(), - )); + ) + .into()); }; // If this block has already been imported to forkchoice it must have been available, so @@ -3597,7 +3605,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root).into()); } // Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data @@ -3616,7 +3624,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&parent_root) { - return Err(BlockError::ParentUnknown { parent_root }); + return Err(BlockError::ParentUnknown { parent_root }.into()); } self.emit_sse_data_column_sidecar_events( @@ -3638,7 +3646,7 @@ impl BeaconChain { AvailabilityProcessingStatus, DataColumnSidecarList, )>, - BlockError, + BlockOrEnvelopeError, > { // As of now we only reconstruct data columns on supernodes, so if the block is already // available on a supernode, there's no need to reconstruct as the node must already have @@ -3664,7 +3672,8 @@ impl BeaconChain { pending_payload_cache.reconstruct_data_columns(&block_root) }) .await - .map_err(|_| BeaconChainError::RuntimeShutdown)??; + .map_err(|_| EnvelopeError::from(BeaconChainError::RuntimeShutdown))? + .map_err(EnvelopeError::from)?; match result { DataColumnReconstructionResultV2::Success(( @@ -3930,7 +3939,7 @@ impl BeaconChain { async fn check_gossip_blob_availability_and_import( self: &Arc, blob: GossipVerifiedBlob, - ) -> Result { + ) -> Result { let slot = blob.slot(); if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(blob.signed_block_header()); @@ -3953,7 +3962,7 @@ impl BeaconChain { block_root: Hash256, data_columns: Vec>, publish_fn: impl FnOnce() -> Result<(), BlockError>, - ) -> Result { + ) -> Result { if let Some(slasher) = self.slasher.as_ref() { for data_column in &data_columns { // TODO(gloas) different gossip checks in gloas @@ -4020,14 +4029,15 @@ impl BeaconChain { slot: Slot, block_root: Hash256, blobs: FixedBlobSidecarList, - ) -> Result { + ) -> Result { self.check_blob_header_signature_and_slashability( block_root, blobs.iter().flatten().map(Arc::as_ref), )?; let availability = self .data_availability_checker - .put_rpc_blobs(block_root, blobs)?; + .put_rpc_blobs(block_root, blobs) + .map_err(BlockError::from)?; self.process_availability(slot, availability, || Ok(())) .await @@ -4038,7 +4048,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, engine_get_blobs_output: EngineGetBlobsOutput, - ) -> Result { + ) -> Result { match engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { self.check_blob_header_signature_and_slashability( @@ -4091,7 +4101,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, custody_columns: DataColumnSidecarList, - ) -> Result { + ) -> Result { // TODO(gloas) ensure that this check is no longer relevant post gloas self.check_data_column_sidecar_header_signature_and_slashability( block_root, @@ -4183,13 +4193,12 @@ impl BeaconChain { slot: Slot, availability: PayloadAvailability, publish_fn: impl FnOnce() -> Result<(), BlockError>, - ) -> Result { + ) -> Result { match availability { PayloadAvailability::Available(available_envelope) => { publish_fn()?; self.import_available_execution_payload_envelope(available_envelope) .await - .map_err(|e| BlockError::InternalError(e.to_string())) } PayloadAvailability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9802f091e0..68c560611c 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -1,3 +1,4 @@ +use crate::BlockError; use crate::beacon_block_streamer::Error as BlockStreamerError; use crate::beacon_chain::ForkChoiceError; use crate::beacon_fork_choice_store::Error as ForkChoiceStoreError; @@ -9,6 +10,7 @@ use crate::observed_attesters::Error as ObservedAttestersError; use crate::observed_block_producers::Error as ObservedBlockProducersError; use crate::observed_data_sidecars::Error as ObservedDataSidecarsError; use crate::payload_envelope_streamer::Error as EnvelopeStreamerError; +use crate::payload_envelope_verification::EnvelopeError; use bls::PublicKeyBytes; use execution_layer::PayloadStatus; use fork_choice::ExecutionStatus; @@ -334,3 +336,11 @@ easy_from_to!(SlotProcessingError, BlockProductionError); easy_from_to!(StateAdvanceError, BlockProductionError); easy_from_to!(ForkChoiceError, BlockProductionError); easy_from_to!(EpochCacheError, BlockProductionError); + +pub enum BlockOrEnvelopeError { + BlockError(BlockError), + EnvelopeError(EnvelopeError), +} + +easy_from_to!(BlockError, BlockOrEnvelopeError); +easy_from_to!(EnvelopeError, BlockOrEnvelopeError); diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs index 17799d27d7..b678bdbaea 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -2,20 +2,20 @@ use bls::Hash256; use slot_clock::SlotClock; use state_processing::{VerifySignatures, envelope_processing::verify_execution_payload_envelope}; use std::sync::Arc; -use types::EthSpec; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; use crate::{ BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, PayloadVerificationOutcome, block_verification::PayloadVerificationHandle, payload_envelope_verification::{ - EnvelopeError, MaybeAvailableEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, + EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root, payload_notifier::PayloadNotifier, }, }; pub struct ExecutionPendingEnvelope { - pub signed_envelope: MaybeAvailableEnvelope, + pub signed_envelope: Arc>, pub block_root: Hash256, pub payload_verification_handle: PayloadVerificationHandle, } @@ -28,7 +28,6 @@ impl GossipVerifiedEnvelope { ) -> Result, EnvelopeError> { let signed_envelope = self.signed_envelope; let envelope = &signed_envelope.message; - let payload = &envelope.payload; // Define a future that will verify the execution payload with an execution engine. // @@ -86,10 +85,7 @@ impl GossipVerifiedEnvelope { )?; Ok(ExecutionPendingEnvelope { - signed_envelope: MaybeAvailableEnvelope::AvailabilityPending { - block_hash: payload.block_hash, - envelope: signed_envelope, - }, + signed_envelope, block_root, payload_verification_handle, }) diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 15cd0ee3b4..2d371a5315 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -9,7 +9,7 @@ use tracing::{debug, error, info, info_span, instrument, warn}; use types::{BlockImportSource, Hash256, SignedExecutionPayloadEnvelope, Slot}; use super::{ - AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, ExecutedEnvelope, + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, }; use crate::pending_payload_cache::Availability as PayloadAvailability; @@ -91,15 +91,7 @@ impl BeaconChain { .set_time_executed(block_root, block_slot, timestamp); } - match executed_envelope { - ExecutedEnvelope::Available(envelope) => { - self.import_available_execution_payload_envelope(Box::new(envelope)) - .await - } - ExecutedEnvelope::AvailabilityPending(envelope) => { - self.check_envelope_availability_and_import(envelope).await - } - } + self.check_envelope_availability_and_import(executed_envelope) }; // Verify and import the payload envelope. @@ -188,7 +180,7 @@ impl BeaconChain { async fn into_executed_payload_envelope( self: Arc, pending_envelope: ExecutionPendingEnvelope, - ) -> Result, EnvelopeError> { + ) -> Result, EnvelopeError> { let ExecutionPendingEnvelope { signed_envelope, block_root, @@ -208,7 +200,7 @@ impl BeaconChain { return Err(EnvelopeError::OptimisticSyncNotSupported { block_root }); } - Ok(ExecutedEnvelope::new( + Ok(AvailabilityPendingExecutedEnvelope::new( signed_envelope, block_root, payload_verification_outcome,