From c6b12990732b81a3d5895e7f84c3c8954c0f3c8e Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Wed, 18 Mar 2026 00:08:26 -0700 Subject: [PATCH] temp chhanges --- .../src/data_availability_checker_v2/mod.rs | 23 +++++++- .../pending_components_cache.rs | 59 ++++++++++++++++++- .../payload_envelope_verification/import.rs | 20 ++++--- .../src/payload_envelope_verification/mod.rs | 11 +++- 4 files changed, 102 insertions(+), 11 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs index 448d87cfcd..2d006ea4c6 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs @@ -14,8 +14,8 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tracing::{debug, error, instrument}; use types::{ - ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, - SignedExecutionPayloadBid, Slot, + BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, + Hash256, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, Slot, }; mod payload_envelope_cache; @@ -50,6 +50,16 @@ pub enum Availability { Available(Box>), } +pub enum PayloadEnvelopeProcessingStatus { + /// Envelope is not in any pre-import cache. Envelope may be in the data-base or in the fork-choice. + Unknown, + /// Envelope is currently processing but not yet validated. + NotValidated(Arc>, BlockImportSource), + /// Envelope is fully valid, but not yet imported. It's cached in the da_checker while awaiting + /// missing envelope components. + ExecutionValidated(Arc>), +} + impl Debug for Availability { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -148,6 +158,15 @@ impl DataAvailabilityChecker { }) } + pub fn put_pre_executed_payload_envelope( + &self, + envelope: Arc>, + source: BlockImportSource, + ) -> Result<(), AvailabilityCheckError> { + self.availability_cache + .put_pre_executed_payload_envelope(envelope, source) + } + /// Insert RPC custody columns and check if the payload becomes available. #[instrument(skip_all, level = "trace")] pub fn put_rpc_custody_columns( diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs index 1ccf953036..de353c32b0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components_cache.rs @@ -2,6 +2,7 @@ use crate::BeaconChainTypes; use crate::CustodyContext; use crate::data_availability_checker::AvailabilityCheckError; use crate::data_availability_checker_v2::Availability; +use crate::data_availability_checker_v2::PayloadEnvelopeProcessingStatus; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope; use crate::payload_envelope_verification::AvailableEnvelope; @@ -42,6 +43,7 @@ pub struct PendingComponents { } impl PendingComponents { + /// Returns an immutable reference to the cached data column. pub fn get_cached_data_column( &self, @@ -80,7 +82,7 @@ impl PendingComponents { self.bid = Some(bid); } - pub fn insert_pending_executed_envelope( + pub fn insert_pre_executed_envelope( &mut self, envelope: Arc>, import_source: BlockImportSource, @@ -249,6 +251,24 @@ impl DataAvailabilityCheckerInner { }) } + /// Returns the envelope processing status for the given `block_root`. A `None` response indicates that + /// the envelope has not yet been inserted into the cache. + pub fn get_envelope_processing_status(&self, block_root: &Hash256) -> Option> { + self.critical + .read() + .peek(block_root) + .and_then(|pending_components| { + pending_components.envelope.as_ref().map(|envelope| match envelope { + CachedPayloadEnvelope::PreExecution(e, source) => { + PayloadEnvelopeProcessingStatus::NotValidated(e.clone(), *source) + } + CachedPayloadEnvelope::Executed(e) => { + PayloadEnvelopeProcessingStatus::ExecutionValidated(e.envelope.clone()) + } + }) + }) + } + /// Fetch data columns of a given `block_root` from the cache without affecting the LRU ordering pub fn peek_data_columns( &self, @@ -301,6 +321,43 @@ impl DataAvailabilityCheckerInner { self.check_availability(block_root, pending_components, num_expected_columns) } + pub fn put_pre_executed_payload_envelope( + &self, + envelope: Arc>, + source: BlockImportSource, + ) -> Result<(), AvailabilityCheckError> { + let epoch = envelope.epoch(); + let beacon_block_root = envelope.beacon_block_root(); + let pending_components = + self.update_or_insert_pending_components(beacon_block_root, |pending_components| { + pending_components.insert_pre_executed_envelope(envelope, source); + Ok(()) + })?; + + let num_expected_columns_opt = self.get_num_expected_columns(epoch); + + pending_components.span.in_scope(|| { + debug!( + component = "pre executed payload envelope", + status = pending_components.status_str(num_expected_columns_opt), + "Component added to data availability checker" + ); + }); + + Ok(()) + } + + /// Removes a pre-executed envelope from the cache. + /// This does NOT remove an existing executed envelope. + pub fn remove_pre_executed_envelope(&self, block_root: &Hash256) { + // The read lock is immediately dropped so we can safely remove the envelope from the cache. + if let Some(PayloadEnvelopeProcessingStatus::NotValidated(_, _)) = self.get_envelope_processing_status(block_root) { + // If the envelope is execution invalid, this status is permanent and idempotent to this + // block_root. We drop its components (e.g. columns) because they will never be useful. + self.critical.write().pop(block_root); + } + } + #[allow(clippy::type_complexity)] pub fn put_kzg_verified_data_columns< I: IntoIterator>, diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 2ee315e559..4f754abd2c 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -27,13 +27,13 @@ impl BeaconChain { /// /// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during /// verification. - #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] + #[instrument(skip_all, fields(block_root = ?block_root, envelope_source = %envelope_source))] pub async fn process_execution_payload_envelope( self: &Arc, block_root: Hash256, unverified_envelope: GossipVerifiedEnvelope, notify_execution_layer: NotifyExecutionLayer, - block_source: BlockImportSource, + envelope_source: BlockImportSource, publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, ) -> Result { let block_slot = unverified_envelope.signed_envelope.slot(); @@ -49,7 +49,12 @@ impl BeaconChain { ); } - // TODO(gloas) insert the pre-executed envelope into some type of cache. + self.data_availability_checker + .v2() + .put_pre_executed_payload_envelope( + unverified_envelope.envelope_cloned(), + envelope_source, + )?; let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); @@ -79,11 +84,12 @@ impl BeaconChain { .into_executed_payload_envelope(execution_pending) .await .inspect_err(|_| { - // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline), + // If the envelope fails execution for whatever reason (e.g. engine offline), // and we keep it in the cache, then the node will NOT perform lookup and - // reprocess this block until the block is evicted from DA checker, causing the - // chain to get stuck temporarily if the block is canonical. Therefore we remove + // reprocess this envelope until the envelope is evicted from DA checker, causing the + // chain to get stuck temporarily if the envelope is canonical. Therefore we remove // it from the cache if execution fails. + // self.data_availability_checker.v2().remove_pre_executed_envelop(block_root); })?; // Record the time it took to wait for execution layer verification. @@ -111,7 +117,7 @@ impl BeaconChain { info!( ?block_root, %block_slot, - source = %block_source, + source = %envelope_source, "Execution payload envelope imported" ); diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index d9503a0272..908da9a609 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -39,6 +39,7 @@ pub mod gossip_verified_envelope; pub mod import; mod payload_notifier; +use crate::data_availability_checker::AvailabilityCheckError; pub use execution_pending_envelope::ExecutionPendingEnvelope; #[derive(Clone, Debug, PartialEq)] @@ -221,6 +222,9 @@ pub enum EnvelopeError { ExecutionPayloadError(ExecutionPayloadError), /// An error from block-level checks reused during envelope import BlockError(BlockError), + /// The envelope satisfied all validity conditions except consistency + /// with the corresponding columns that we received over gossip/rpc. + AvailabilityCheck(AvailabilityCheckError), /// Internal error InternalError(String), } @@ -261,7 +265,12 @@ impl From for EnvelopeError { } } -/// Pull errors up from EnvelopeProcessingError to EnvelopeError +impl From for EnvelopeError { + fn from(e: AvailabilityCheckError) -> Self { + EnvelopeError::AvailabilityCheck(e) + } +} + impl From for EnvelopeError { fn from(e: EnvelopeProcessingError) -> Self { match e {