diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7b825be2c3..0c10aaadc4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3555,7 +3555,7 @@ impl BeaconChain { #[instrument(skip_all, level = "debug")] pub async fn into_executed_payload_envelope( self: Arc, - pending_envelope: ExecutionPendingEnvelope, + pending_envelope: ExecutionPendingEnvelope, ) -> Result, BlockError> { let ExecutionPendingEnvelope { signed_envelope, @@ -4168,7 +4168,7 @@ impl BeaconChain { Ok(block_root) } - fn handle_import_block_db_write_error( + pub(crate) fn handle_import_block_db_write_error( &self, // We don't actually need this value, however it's always present when we call this function // and it needs to be dropped to prevent a dead-lock. Requiring it to be passed here is @@ -4202,7 +4202,7 @@ impl BeaconChain { } /// Check block's consistentency with any configured weak subjectivity checkpoint. - fn check_block_against_weak_subjectivity_checkpoint( + pub(crate) fn check_block_against_weak_subjectivity_checkpoint( &self, block: BeaconBlockRef, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 0be058231a..b0644ac8aa 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -160,7 +160,8 @@ pub async fn notify_new_payload( ?validation_error, ?latest_valid_hash, ?execution_block_hash, - // root = ?block.tree_hash_root(), + // TODO(gloas) are these other logs important? + root = ?beacon_block_root, // graffiti = block.body().graffiti().as_utf8_lossy(), // proposer_index = block.proposer_index(), // slot = %block.slot(), @@ -205,7 +206,8 @@ pub async fn notify_new_payload( warn!( ?validation_error, ?execution_block_hash, - // root = ?block.tree_hash_root(), + // TODO(gloas) are these other logs important? 
+ root = ?beacon_block_root, // graffiti = block.body().graffiti().as_utf8_lossy(), // proposer_index = block.proposer_index(), // slot = %block.slot(), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4120ed86fc..f0151eed01 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -42,6 +42,7 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; +pub mod payload_envelope_import; pub mod payload_envelope_verification; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/payload_envelope_import/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_import/mod.rs new file mode 100644 index 0000000000..ab7de5c15d --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_import/mod.rs @@ -0,0 +1,339 @@ +use std::sync::Arc; + +use fork_choice::PayloadVerificationStatus; +use logging::crit; +use store::StoreOp; +use tracing::{debug, error, info_span, instrument}; +use types::{BeaconState, BlockImportSource, EthSpec, Hash256, SignedBeaconBlock}; + +use crate::{ + AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, + NotifyExecutionLayer, + block_verification_types::{AsBlock, AvailableBlockData}, + payload_envelope_verification::{ + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeImportData, ExecutedEnvelope, + IntoExecutionPendingEnvelope, + }, + validator_monitor::timestamp_now, +}; + +impl BeaconChain { + /// Returns `Ok(block_root)` if the given `unverified_envelope` was successfully verified and + /// imported into the chain. + /// + /// Items that implement `IntoExecutionPendingEnvelope` include: + /// + /// - `GossipVerifiedEnvelope` + /// + /// ## Errors + /// + /// Returns an `Err` if the given block was invalid, or an error was encountered during + /// verification. 
+ #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] + pub async fn process_execution_payload_envelope>( + self: &Arc, + block_root: Hash256, + unverified_envelope: P, + notify_execution_layer: NotifyExecutionLayer, + block_source: BlockImportSource, + publish_fn: impl FnOnce() -> Result<(), BlockError>, + ) -> Result { + let block_slot = unverified_envelope.envelope().slot(); + + // TODO(gloas) Set observed time if not already set. Usually this should be set by gossip or RPC, + // but just in case we set it again here (useful for tests). + + // TODO(gloas) if we decide to insert the payload envelope into the new DA checker + // we should insert the pre executed envelope here. + + // TODO(gloas) Start the Prometheus timer. + // let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); + + // TODO(gloas) Increment the Prometheus counter for envelope processing requests. + // metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); + + // A small closure to group the verification and import errors. + let chain = self.clone(); + let import_envelope = async move { + let execution_pending = unverified_envelope + .into_execution_pending_envelope(&chain, notify_execution_layer)?; + publish_fn()?; + + // TODO(gloas) Record the time it took to complete consensus verification. + // if let Some(timestamp) = self.slot_clock.now_duration() { + // self.block_times_cache + // .write() + // .set_time_consensus_verified(block_root, block_slot, timestamp) + // } + + let executed_envelope = chain + .into_executed_payload_envelope(execution_pending) + .await + .inspect_err(|_| { + // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline), + // and we keep it in the cache, then the node will NOT perform lookup and + // reprocess this block until the block is evicted from DA checker, causing the + // chain to get stuck temporarily if the block is canonical. 
Therefore we remove + // it from the cache if execution fails. + + //self.data_availability_checker + // .remove_block_on_execution_error(&block_root); + })?; + + // TODO(gloas) Record the *additional* time it took to wait for execution layer verification. + // if let Some(timestamp) = self.slot_clock.now_duration() { + // self.block_times_cache + // .write() + // .set_time_executed(block_root, block_slot, timestamp) + // } + + match executed_envelope { + ExecutedEnvelope::Available(envelope) => { + self.import_available_execution_payload_envelope(Box::new(envelope)) + .await + } + ExecutedEnvelope::AvailabilityPending() => { + return Err(BlockError::InternalError( + "Pending payload envelope not yet implemented".to_owned(), + )); + } + } + }; + + // Verify and import the block. + match import_envelope.await { + // The block was successfully verified and imported. Yay. + Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { + debug!( + ?block_root, + %block_slot, + source = %block_source, + "Envelope imported" + ); + + // TODO(gloas) Increment the Prometheus counter for block processing successes. + // metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + + Ok(status) + } + Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + debug!(?block_root, %slot, "Beacon block awaiting blobs"); + + Ok(status) + } + Err(BlockError::BeaconChainError(e)) => { + match e.as_ref() { + BeaconChainError::TokioJoin(e) => { + debug!( + error = ?e, + "Envelope processing cancelled" + ); + } + _ => { + // There was an error whilst attempting to verify and import the block. The block might + // be partially verified or partially imported. + crit!( + error = ?e, + "Envelope processing error" + ); + } + }; + Err(BlockError::BeaconChainError(e)) + } + // The block failed verification. 
+ Err(other) => { + debug!(reason = other.to_string(), " Envelope rejected"); + Err(other) + } + } + } + + #[instrument(skip_all)] + pub async fn import_available_execution_payload_envelope( + self: &Arc, + envelope: Box>, + ) -> Result { + let AvailableExecutedEnvelope { + envelope, + import_data, + payload_verification_outcome, + } = *envelope; + + let EnvelopeImportData { + block_root, + block, + post_state, + } = import_data; + + // TODO(gloas) Record the time at which this block's blobs became available. + + let block_root = { + // Capture the current span before moving into the blocking task + let current_span = tracing::Span::current(); + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + // Enter the captured span in the blocking thread + let _guard = current_span.enter(); + chain.import_execution_payload_envelope( + envelope, + block_root, + *post_state, + payload_verification_outcome.payload_verification_status, + block, + ) + }, + "payload_verification_handle", + ) + .await?? + }; + + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + + /// Accepts a fully-verified and available envelope and imports it into the chain without performing any + /// additional verification. + /// + /// An error is returned if the envelope was unable to be imported. It may be partially imported + /// (i.e., this function is not atomic). + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + fn import_execution_payload_envelope( + &self, + signed_envelope: AvailableEnvelope, + block_root: Hash256, + mut state: BeaconState, + payload_verification_status: PayloadVerificationStatus, + parent_block: Arc>, + ) -> Result { + // ----------------------------- ENVELOPE NOT YET ATTESTABLE ---------------------------------- + // Everything in this initial section is on the hot path between processing the envelope and + // being able to attest to it. DO NOT add any extra processing in this initial section + // unless it must run before fork choice. 
+ // ----------------------------------------------------------------------------------------- + let current_slot = self.slot()?; + let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); + let envelope = signed_envelope.message(); + + // TODO(gloas) implement metrics + // let post_exec_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_POST_EXEC_PROCESSING); + + // Check the payloads parent block against weak subjectivity checkpoint. + self.check_block_against_weak_subjectivity_checkpoint( + parent_block.message(), + block_root, + &state, + )?; + + // Take an upgradable read lock on fork choice so we can check if this block has already + // been imported. We don't want to repeat work importing a block that is already imported. + let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); + if fork_choice_reader.contains_block(&block_root) { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + + // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by + // avoiding taking other locks whilst holding this lock. + let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); + + // TODO(gloas) Do we need this check? Do not import a block that doesn't descend from the finalized root. + // let signed_block = check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?; + + // TODO(gloas)Do we want to use an early attester cache like mechanism for payload enevelopes? + // TODO(gloas) emit SSE event if the payload became the new head payload + // drop(post_exec_timer); + + // ---------------------------- BLOCK PROBABLY ATTESTABLE ---------------------------------- + // Most blocks are now capable of being attested to thanks to the `early_attester_cache` + // cache above. Resume non-essential processing. 
+ // + // It is important NOT to return errors here before the database commit, because the block + // has already been added to fork choice and the database would be left in an inconsistent + // state if we returned early without committing. In other words, an error here would + // corrupt the node's database permanently. + // ----------------------------------------------------------------------------------------- + + // Store the envelope and its state, and execute the confirmation batch for the intermediate + // states, which will delete their temporary flags. + // If the write fails, revert fork choice to the version from disk, else we can + // end up with envelopes in fork choice that are missing from disk. + // See https://github.com/sigp/lighthouse/issues/2028 + let (signed_envelope, columns) = signed_envelope.deconstruct(); + + let mut ops = vec![]; + + match self.get_blobs_or_columns_store_op( + block_root, + signed_envelope.slot(), + AvailableBlockData::DataColumns(columns), + ) { + Ok(Some(blobs_or_columns_store_op)) => { + ops.push(blobs_or_columns_store_op); + } + Ok(None) => {} + Err(e) => { + error!( + msg = "Restoring fork choice from disk", + error = &e, + ?block_root, + "Failed to store data columns into the database" + ); + return Err(self + .handle_import_block_db_write_error(fork_choice) + .err() + .unwrap_or(BlockError::InternalError(e))); + } + } + + // TODO(gloas) metrics + // let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); + + ops.push(StoreOp::PutPayloadEnvelope( + block_root, + signed_envelope.clone(), + )); + ops.push(StoreOp::PutState( + signed_envelope.message.state_root, + &state, + )); + + let db_span = info_span!("persist_payloads_and_blobs").entered(); + + // TODO(gloas) do i need this + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { + error!( + msg = "Restoring fork choice from disk", + error = ?e, + "Database write failed!" 
+ ); + // TODO(gloas) handle db write failure + // return Err(self + // .handle_import_block_db_write_error(fork_choice) + // .err() + // .unwrap_or(e.into())); + } + + drop(db_span); + + // The fork choice write-lock is dropped *after* the on-disk database has been updated. + // This prevents inconsistency between the two at the expense of concurrency. + drop(fork_choice); + + // We're declaring the envelope "imported" at this point, since fork choice and the DB know + // about it. + let envelope_time_imported = timestamp_now(); + + // TODO(gloas) depending on what happens with light clients + // we might need to do some computations here + + // TODO(gloas) metrics + // metrics::stop_timer(db_write_timer); + + // TODO(gloas) metrics + // metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + + // TODO(gloas) we might want to implement something similar + // to `import_block_update_metrics_and_events` + Ok(block_root) + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs deleted file mode 100644 index 7dffd1c09c..0000000000 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs +++ /dev/null @@ -1,28 +0,0 @@ -use task_executor::JoinHandle; -use types::{EthSpec, FullPayload}; - -use crate::{ - BeaconChainTypes, PayloadVerificationOutcome, - payload_envelope_verification::{MaybeAvailableEnvelope, PayloadEnvelopeImportData}, -}; - -/// Used to await the result of executing payload with an EE. -pub type PayloadVerificationHandle = - JoinHandle>>>; - -/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and -/// ready to import into the `BeaconChain`. 
The validation includes: -/// -/// - Parent is known -/// - Signatures -/// - State root check -/// - Block processing -/// -/// Note: a `ExecutionPendingEnvelope` is not _forever_ valid to be imported, it may later become invalid -/// due to finality or some other event. A `ExecutionPendingEnvelope` should be imported into the -/// `BeaconChain` immediately after it is instantiated. -pub struct ExecutionPendingEnvelope { - pub block: MaybeAvailableEnvelope, - pub import_data: PayloadEnvelopeImportData, - pub payload_verification_handle: PayloadVerificationHandle, -} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs index a7ccc056ac..5af6fe3984 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -3,14 +3,14 @@ use std::sync::Arc; use educe::Educe; use slot_clock::SlotClock; use state_processing::{VerifySignatures, envelope_processing::process_execution_payload_envelope}; -use tracing::debug; +use tracing::{Span, debug}; use types::{ EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, consts::gloas::BUILDER_INDEX_SELF_BUILD, }; use crate::{ - BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, + BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, NotifyExecutionLayer, PayloadVerificationOutcome, payload_envelope_verification::{ EnvelopeError, EnvelopeImportData, EnvelopeProcessingSnapshot, ExecutionPendingEnvelope, @@ -181,7 +181,7 @@ impl IntoExecutionPendingEnvelope for GossipVerifiedEnve self, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, EnvelopeError> { + ) -> Result, BlockError> { let signed_envelope = self.signed_envelope; let envelope = &signed_envelope.message; let payload = &envelope.payload; @@ -255,4 +255,63 @@ 
impl IntoExecutionPendingEnvelope for GossipVerifiedEnve payload_verification_handle, }) } + + fn envelope(&self) -> &Arc> { + &self.signed_envelope + } +} + +impl BeaconChain { + /// Returns `Ok(GossipVerifiedEnvelope)` if the supplied `envelope` should be forwarded onto the + /// gossip network. The envelope is not imported into the chain, it is just partially verified. + /// + /// The returned `GossipVerifiedEnvelope` should be provided to `Self::process_execution_payload_envelope` immediately + /// after it is returned, unless some other circumstance decides it should not be imported at + /// all. + /// + /// ## Errors + /// + /// Returns an `Err` if the given envelope was invalid, or an error was encountered during + pub async fn verify_envelope_for_gossip( + self: &Arc, + envelope: Arc>, + ) -> Result, EnvelopeError> { + let chain = self.clone(); + let span = Span::current(); + self.task_executor + .clone() + .spawn_blocking_handle( + move || { + let _guard = span.enter(); + let slot = envelope.slot(); + let beacon_block_root = envelope.message.beacon_block_root; + + match GossipVerifiedEnvelope::new(envelope, &chain) { + Ok(verified) => { + debug!( + %slot, + ?beacon_block_root, + "Successfully verified gossip envelope" + ); + + Ok(verified) + } + Err(e) => { + debug!( + error = e.to_string(), + ?beacon_block_root, + %slot, + "Rejected gossip envelope" + ); + + Err(e) + } + } + }, + "gossip_envelope_verification_handle", + ) + .ok_or(BeaconChainError::RuntimeShutdown)? + .await + .map_err(BeaconChainError::TokioJoin)? 
+ } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 7ddc4b5c64..4dd8d351d7 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -34,17 +34,16 @@ use state_processing::{ use tracing::instrument; use types::{ BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, - Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, + ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; use crate::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, - ExecutionPayloadError, NotifyExecutionLayer, PayloadVerificationOutcome, - block_verification::PayloadVerificationHandle, block_verification_types::BlockImportData, + BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError, + NotifyExecutionLayer, PayloadVerificationOutcome, + block_verification::PayloadVerificationHandle, payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope, }; -pub mod execution_pending_envelope; pub mod gossip_verified_envelope; mod payload_notifier; mod tests; @@ -54,12 +53,14 @@ pub trait IntoExecutionPendingEnvelope: Sized { self, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, EnvelopeError>; + ) -> Result, BlockError>; + + fn envelope(&self) -> &Arc>; } -pub struct ExecutionPendingEnvelope { - pub signed_envelope: MaybeAvailableEnvelope, - pub import_data: EnvelopeImportData, +pub struct ExecutionPendingEnvelope { + pub signed_envelope: MaybeAvailableEnvelope, + pub import_data: EnvelopeImportData, pub payload_verification_handle: PayloadVerificationHandle, } @@ -82,6 +83,25 @@ pub struct AvailableEnvelope { pub spec: Arc, } +impl AvailableEnvelope { + pub fn message(&self) -> &ExecutionPayloadEnvelope { + 
&self.envelope.message + } + + #[allow(clippy::type_complexity)] + pub fn deconstruct( + self, + ) -> ( + Arc>, + DataColumnSidecarList, + ) { + let AvailableEnvelope { + envelope, columns, .. + } = self; + (envelope, columns) + } +} + pub enum MaybeAvailableEnvelope { Available(AvailableEnvelope), AvailabilityPending { @@ -204,6 +224,12 @@ pub enum EnvelopeError { ExecutionPayloadError(ExecutionPayloadError), } +impl std::fmt::Display for EnvelopeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + impl From for EnvelopeError { fn from(e: BeaconChainError) -> Self { EnvelopeError::BeaconChainError(Arc::new(e)) @@ -248,7 +274,7 @@ impl From for EnvelopeError { pub(crate) fn load_snapshot( envelope: &SignedExecutionPayloadEnvelope, chain: &BeaconChain, -) -> Result, EnvelopeError> { +) -> Result, BlockError> { // Reject any block if its block is not known to fork choice. // // A block that is not in fork choice is either: @@ -263,8 +289,8 @@ pub(crate) fn load_snapshot( let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock(); let beacon_block_root = envelope.beacon_block_root(); let Some(proto_beacon_block) = fork_choice_read_lock.get_block(&beacon_block_root) else { - return Err(EnvelopeError::BlockRootUnknown { - block_root: beacon_block_root, + return Err(BlockError::ParentUnknown { + parent_root: beacon_block_root, }); }; drop(fork_choice_read_lock); @@ -278,7 +304,7 @@ pub(crate) fn load_snapshot( let state = chain .store .get_hot_state(&block_state_root, cache_state) - .map_err(|e| EnvelopeError::BeaconChainError(Arc::new(e.into())))? + .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))? 
.ok_or_else(|| { BeaconChainError::DBInconsistent(format!( "Missing state for envelope block {block_state_root:?}", @@ -299,11 +325,15 @@ impl IntoExecutionPendingEnvelope self, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, EnvelopeError> { + ) -> Result, BlockError> { // TODO(EIP-7732): figure out how this should be refactored.. GossipVerifiedEnvelope::new(self, chain)? .into_execution_pending_envelope(chain, notify_execution_layer) } + + fn envelope(&self) -> &Arc> { + self + } } #[derive(Clone, Debug, PartialEq)] diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs index b485af0bfa..4ac556f1f7 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -8,7 +8,7 @@ use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use crate::{ BeaconChain, BeaconChainTypes, BlockError, ExecutionPayloadError, NotifyExecutionLayer, - execution_payload::notify_new_payload, + execution_payload::notify_new_payload, payload_envelope_verification::EnvelopeError, }; /// Used to await the result of executing payload with a remote EE. 
diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs index 381a5eaa4f..2545a900ad 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs @@ -1,10 +1,13 @@ use std::sync::Arc; -use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, payload_envelope_verification::{ExecutedEnvelope, ExecutionPendingEnvelope}}; +use crate::{ + AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, + payload_envelope_verification::{ExecutedEnvelope, ExecutionPendingEnvelope}, +}; async fn import_execution_pending_envelope( chain: Arc>, - execution_pending_envelope: ExecutionPendingEnvelope, + execution_pending_envelope: ExecutionPendingEnvelope, ) -> Result { match chain .clone() diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index fdc7d27320..d8c161f8c1 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -30,7 +30,7 @@ use types::{ Attestation, AttestationData, AttesterSlashingRef, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, IndexedAttestationRef, ProposerSlashing, SignedAggregateAndProof, SignedContributionAndProof, - Slot, SyncCommitteeMessage, VoluntaryExit, + SignedExecutionPayloadEnvelope, Slot, SyncCommitteeMessage, VoluntaryExit, }; /// Used for Prometheus labels. 
diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index a4125f3df0..bdcdd68860 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,7 +4,6 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::store::Error; @@ -19,6 +18,12 @@ use beacon_chain::{ sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; +use beacon_chain::{ + blob_verification::{GossipBlobError, GossipVerifiedBlob}, + payload_envelope_verification::{ + EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, + }, +}; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use logging::crit; @@ -3224,20 +3229,20 @@ impl NetworkBeaconProcessor { } } - pub async fn process_gossip_execution_payload( + pub async fn process_gossip_execution_payload_envelope( self: &Arc, message_id: MessageId, peer_id: PeerId, - execution_payload: SignedExecutionPayloadEnvelope, + envelope: SignedExecutionPayloadEnvelope, ) { // TODO(EIP-7732): Implement proper execution payload envelope gossip processing. // This should integrate with the envelope_verification.rs module once it's implemented. 
trace!( %peer_id, - builder_index = execution_payload.message.builder_index, - slot = %execution_payload.message.slot, - beacon_block_root = %execution_payload.message.beacon_block_root, + builder_index = envelope.message.builder_index, + slot = %envelope.message.slot, + beacon_block_root = %envelope.message.beacon_block_root, "Processing execution payload envelope" ); @@ -3245,6 +3250,134 @@ impl NetworkBeaconProcessor { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } + async fn process_gossip_unverified_execution_payload_envelope( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + envelope: Arc>, + ) -> Option> { + // TODO(gloas) add payload envelope delay metrics + + let verification_result = self + .chain + .clone() + .verify_envelope_for_gossip(envelope.clone()) + .await; + + // TODO(gloas) delay metrics and write the time that the payload was observed into + // the delay cache + + let verified_envelope = match verification_result { + Ok(verified_envelope) => { + info!( + slot = %verified_envelope.signed_envelope.slot(), + root = ?verified_envelope.signed_envelope.beacon_block_root(), + "New envelope received" + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + verified_envelope + } + // TODO(gloas) penalize peers accordingly + Err(_) => return None, + }; + + // TODO(gloas) do we need to register the payload with monitored validators? + + let envelope_slot = verified_envelope.signed_envelope.slot(); + let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); + match self.chain.slot() { + // We only need to do a simple check about the envelope slot vs the current slot because + // `verify_envelope_for_gossip` already ensures that the envelope slot is within tolerance + // for envelope imports. 
+ Ok(current_slot) if envelope_slot > current_slot => { + warn!( + ?envelope_slot, + ?beacon_block_root, + msg = "if this happens consistently, check system clock", + "envelope arrived early" + ); + + // TODO(gloas) update metrics to note how early the envelope arrived + + let inner_self = self.clone(); + let process_fn = Box::pin(async move { + inner_self + .process_gossip_verified_execution_payload_envelope( + peer_id, + verified_envelope, + ) + .await; + }); + + // TODO(gloas) send to reprocess queue + None + } + Ok(_) => Some(verified_envelope), + Err(e) => { + error!( + error = ?e, + %envelope_slot, + ?beacon_block_root, + location = "envelope gossip", + "Failed to defer envelope import" + ); + None + } + } + } + + async fn process_gossip_verified_execution_payload_envelope( + self: Arc, + peer_id: PeerId, + verified_envelope: GossipVerifiedEnvelope, + ) { + let processing_start_time = Instant::now(); + let envelope = verified_envelope.envelope_cloned(); + let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); + + let result = self + .chain + .process_execution_payload_envelope( + block_root, + verified_envelope, + notify_execution_layer, + block_source, + publish_fn, + ) + .await; + + register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); + + match &result { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { + // TODO(gloas) do we need to send a `PayloadImported` event to the reprocess queue? + + debug!( + ?block_root, + %peer_id, + "Gossipsub envelope processed" + ); + + // TODO(gloas) do we need to recompute head? + // should canonical_head return the block and the payload now?
+ self.chain.recompute_head_at_current_slot().await; + + // TODO(gloas) metrics + } + Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + trace!( + %slot, + %block_root, + "Processed envelope, waiting for other components" + ) + } + + Err(_) => todo!(), + } + } + pub fn process_gossip_execution_payload_bid( self: &Arc, message_id: MessageId,