From 47782a68c31c4b4c10a597562d6a94032546e9d8 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Thu, 12 Feb 2026 21:27:39 -0800 Subject: [PATCH] delay cache, and remove some todos --- beacon_node/beacon_chain/src/beacon_chain.rs | 16 +- beacon_node/beacon_chain/src/builder.rs | 1 + .../beacon_chain/src/envelope_times_cache.rs | 197 ++++++++++++++++++ beacon_node/beacon_chain/src/lib.rs | 2 +- beacon_node/beacon_chain/src/metrics.rs | 38 ++++ .../gossip_verified_envelope.rs | 17 +- .../import.rs} | 172 ++++++++------- .../src/payload_envelope_verification/mod.rs | 58 +++--- .../payload_notifier.rs | 8 +- .../payload_envelope_verification/tests.rs | 23 -- .../beacon_chain/src/validator_monitor.rs | 2 +- .../beacon_chain/tests/block_verification.rs | 2 +- beacon_node/network/src/metrics.rs | 10 + .../gossip_methods.rs | 82 +++++--- .../src/network_beacon_processor/mod.rs | 8 +- beacon_node/network/src/router.rs | 1 + 16 files changed, 459 insertions(+), 178 deletions(-) create mode 100644 beacon_node/beacon_chain/src/envelope_times_cache.rs rename beacon_node/beacon_chain/src/{payload_envelope_import/mod.rs => payload_envelope_verification/import.rs} (68%) delete mode 100644 beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 59dd0fd1fe..4af147addb 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -27,6 +27,7 @@ use crate::data_availability_checker::{ }; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; +use crate::envelope_times_cache::EnvelopeTimesCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::events::ServerSentEventHandler; use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload}; @@ -56,7 +57,9 @@ use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; -use crate::payload_envelope_verification::{ExecutedEnvelope, ExecutionPendingEnvelope}; +use crate::payload_envelope_verification::{ + EnvelopeError, ExecutedEnvelope, ExecutionPendingEnvelope, +}; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::persist_custody_context; use crate::persisted_fork_choice::PersistedForkChoice; @@ -460,6 +463,8 @@ pub struct BeaconChain { pub early_attester_cache: EarlyAttesterCache, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, + /// A cache used to keep track of various envelope timings. + pub envelope_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, /// A cache used to produce light_client server messages @@ -1158,8 +1163,8 @@ impl BeaconChain { match self.store.try_get_full_block(block_root)? { Some(DatabaseBlock::Full(block)) => Ok(Some(block)), Some(DatabaseBlock::Blinded(_)) => { - // TODO(gloas) this should error out - todo!() + // TODO(gloas) should we return None here? + Ok(None) } None => Ok(None), } @@ -3556,7 +3561,7 @@ impl BeaconChain { pub async fn into_executed_payload_envelope( self: Arc, pending_envelope: ExecutionPendingEnvelope, - ) -> Result, BlockError> { + ) -> Result, EnvelopeError> { let ExecutionPendingEnvelope { signed_envelope, import_data, @@ -4168,7 +4173,7 @@ impl BeaconChain { Ok(block_root) } - pub(crate) fn handle_import_block_db_write_error( + fn handle_import_block_db_write_error( &self, // We don't actually need this value, however it's always present when we call this function // and it needs to be dropped to prevent a dead-lock. Requiring it to be passed here is @@ -6698,6 +6703,7 @@ impl BeaconChain { // sync anyway). self.naive_aggregation_pool.write().prune(slot); self.block_times_cache.write().prune(slot); + self.envelope_times_cache.write().prune(slot); // Don't run heavy-weight tasks during sync. if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index f673519f5f..3a912016dc 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1055,6 +1055,7 @@ where )), beacon_proposer_cache, block_times_cache: <_>::default(), + envelope_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), validator_pubkey_cache: RwLock::new(validator_pubkey_cache), early_attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/envelope_times_cache.rs b/beacon_node/beacon_chain/src/envelope_times_cache.rs new file mode 100644 index 0000000000..84c936c210 --- /dev/null +++ b/beacon_node/beacon_chain/src/envelope_times_cache.rs @@ -0,0 +1,197 @@ +//! This module provides the `EnvelopeTimesCache` which contains information regarding payload +//! envelope timings. +//! +//! This provides `BeaconChain` and associated functions with access to the timestamps of when a +//! payload envelope was observed, verified, executed, and imported. +//! This allows for better traceability and allows us to determine the root cause for why an +//! envelope was imported late. +//! This allows us to distinguish between the following scenarios: +//! - The envelope was observed late. +//! - Consensus verification was slow. +//! - Execution verification was slow. +//! - The DB write was slow. + +use eth2::types::{Hash256, Slot}; +use std::collections::HashMap; +use std::time::Duration; + +type BlockRoot = Hash256; + +#[derive(Clone, Default)] +pub struct EnvelopeTimestamps { + /// When the envelope was first observed (gossip or RPC). + pub observed: Option, + /// When consensus verification (state transition) completed. + pub consensus_verified: Option, + /// When execution layer verification started. + pub started_execution: Option, + /// When execution layer verification completed. + pub executed: Option, + /// When the envelope was imported into the DB. + pub imported: Option, +} + +/// Delay data for envelope processing, computed relative to the slot start time. +#[derive(Debug, Default)] +pub struct EnvelopeDelays { + /// Time after start of slot we saw the envelope. + pub observed: Option, + /// The time it took to complete consensus verification of the envelope. + pub consensus_verification_time: Option, + /// The time it took to complete execution verification of the envelope. + pub execution_time: Option, + /// Time after execution until the envelope was imported. + pub imported: Option, +} + +impl EnvelopeDelays { + fn new(times: EnvelopeTimestamps, slot_start_time: Duration) -> EnvelopeDelays { + let observed = times + .observed + .and_then(|observed_time| observed_time.checked_sub(slot_start_time)); + let consensus_verification_time = times + .consensus_verified + .and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?)); + let execution_time = times + .executed + .and_then(|executed| executed.checked_sub(times.started_execution?)); + let imported = times + .imported + .and_then(|imported_time| imported_time.checked_sub(times.executed?)); + EnvelopeDelays { + observed, + consensus_verification_time, + execution_time, + imported, + } + } +} + +pub struct EnvelopeTimesCacheValue { + pub slot: Slot, + pub timestamps: EnvelopeTimestamps, + pub peer_id: Option, +} + +impl EnvelopeTimesCacheValue { + fn new(slot: Slot) -> Self { + EnvelopeTimesCacheValue { + slot, + timestamps: Default::default(), + peer_id: None, + } + } +} + +#[derive(Default)] +pub struct EnvelopeTimesCache { + pub cache: HashMap, +} + +impl EnvelopeTimesCache { + /// Set the observation time for `block_root` to `timestamp` if `timestamp` is less than + /// any previous timestamp at which this envelope was observed. + pub fn set_time_observed( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + peer_id: Option, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + match entry.timestamps.observed { + Some(existing) if existing <= timestamp => { + // Existing timestamp is earlier, do nothing. + } + _ => { + entry.timestamps.observed = Some(timestamp); + entry.peer_id = peer_id; + } + } + } + + /// Set the timestamp for `field` if that timestamp is less than any previously known value. + fn set_time_if_less( + &mut self, + block_root: BlockRoot, + slot: Slot, + field: impl Fn(&mut EnvelopeTimestamps) -> &mut Option, + timestamp: Duration, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + let existing_timestamp = field(&mut entry.timestamps); + if existing_timestamp.is_none_or(|prev| timestamp < prev) { + *existing_timestamp = Some(timestamp); + } + } + + pub fn set_time_consensus_verified( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.consensus_verified, + timestamp, + ) + } + + pub fn set_time_started_execution( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.started_execution, + timestamp, + ) + } + + pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.executed, + timestamp, + ) + } + + pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.imported, + timestamp, + ) + } + + pub fn get_envelope_delays( + &self, + block_root: BlockRoot, + slot_start_time: Duration, + ) -> EnvelopeDelays { + if let Some(entry) = self.cache.get(&block_root) { + EnvelopeDelays::new(entry.timestamps.clone(), slot_start_time) + } else { + EnvelopeDelays::default() + } + } + + /// Prune the cache to only store the most recent 2 epochs. + pub fn prune(&mut self, current_slot: Slot) { + self.cache + .retain(|_, entry| entry.slot > current_slot.saturating_sub(64_u64)); + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index f0151eed01..5acc3edadd 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -20,6 +20,7 @@ pub mod custody_context; pub mod data_availability_checker; pub mod data_column_verification; mod early_attester_cache; +pub mod envelope_times_cache; mod errors; pub mod events; pub mod execution_payload; @@ -42,7 +43,6 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; -pub mod payload_envelope_import; pub mod payload_envelope_verification; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 9de67ca93f..775f5a3df0 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -21,6 +21,44 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &st pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str = "validator_monitor_attestation_simulator_source_attester_miss_total"; +/* +* Execution Payload Envelope Procsesing +*/ + +pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_requests_total", + "Count of payload envelopes submitted for processing", + ) +}); +pub static ENVELOPE_PROCESSING_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_successes_total", + "Count of payload envelopes processed without error", + ) +}); +pub static ENVELOPE_PROCESSING_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "payload_envelope_processing_seconds", + "Full runtime of payload envelope processing", + ) +}); +pub static ENVELOPE_PROCESSING_DB_WRITE: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "payload_envelope_processing_db_write_seconds", + "Time spent writing a newly processed payload envelope and state to DB", + ) +}); +pub static ENVELOPE_PROCESSING_POST_EXEC_PROCESSING: LazyLock> = + LazyLock::new(|| { + try_create_histogram_with_buckets( + "payload_envelope_processing_post_exec_pre_attestable_seconds", + "Time between finishing execution processing and the payload envelope + becoming attestable", + linear_buckets(0.01, 0.01, 15), + ) + }); + /* * Block Processing */ diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs index 5af6fe3984..c9bef630aa 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -10,7 +10,7 @@ use types::{ }; use crate::{ - BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, NotifyExecutionLayer, + BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, PayloadVerificationOutcome, payload_envelope_verification::{ EnvelopeError, EnvelopeImportData, EnvelopeProcessingSnapshot, ExecutionPendingEnvelope, @@ -151,8 +151,7 @@ impl GossipVerifiedEnvelope { ); (is_valid, opt_snapshot) } else { - // TODO(gloas) we should probably introduce a builder cache or some type of - // global cache. + // TODO(gloas) if we implement a builder pubkey cache, we'll need to use it here. // External builder: must load the state to get the builder pubkey. let snapshot = load_snapshot(signed_envelope.as_ref(), chain)?; let is_valid = @@ -181,7 +180,7 @@ impl IntoExecutionPendingEnvelope for GossipVerifiedEnve self, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockError> { + ) -> Result, EnvelopeError> { let signed_envelope = self.signed_envelope; let envelope = &signed_envelope.message; let payload = &envelope.payload; @@ -198,13 +197,11 @@ impl IntoExecutionPendingEnvelope for GossipVerifiedEnve let payload_verification_future = async move { let chain = payload_notifier.chain.clone(); - // TODO:(gloas): timing metrics if let Some(started_execution) = chain.slot_clock.now_duration() { - chain.block_times_cache.write().set_time_started_execution( - block_root, - slot, - started_execution, - ); + chain + .envelope_times_cache + .write() + .set_time_started_execution(block_root, slot, started_execution); } let payload_verification_status = payload_notifier.notify_new_payload().await?; diff --git a/beacon_node/beacon_chain/src/payload_envelope_import/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs similarity index 68% rename from beacon_node/beacon_chain/src/payload_envelope_import/mod.rs rename to beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index ab7de5c15d..3628712721 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_import/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -1,20 +1,23 @@ use std::sync::Arc; +use std::time::Duration; use fork_choice::PayloadVerificationStatus; use logging::crit; +use slot_clock::SlotClock; use store::StoreOp; use tracing::{debug, error, info_span, instrument}; -use types::{BeaconState, BlockImportSource, EthSpec, Hash256, SignedBeaconBlock}; +use types::{BeaconState, BlockImportSource, Hash256, SignedBeaconBlock, Slot}; +use super::{ + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, + ExecutedEnvelope, IntoExecutionPendingEnvelope, +}; use crate::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, + AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, - payload_envelope_verification::{ - AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeImportData, ExecutedEnvelope, - IntoExecutionPendingEnvelope, - }, - validator_monitor::timestamp_now, + metrics, + validator_monitor::{get_slot_delay_ms, timestamp_now}, }; impl BeaconChain { @@ -36,21 +39,26 @@ impl BeaconChain { unverified_envelope: P, notify_execution_layer: NotifyExecutionLayer, block_source: BlockImportSource, - publish_fn: impl FnOnce() -> Result<(), BlockError>, - ) -> Result { + publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, + ) -> Result { let block_slot = unverified_envelope.envelope().slot(); - // TODO(gloas) Set observed time if not already set. Usually this should be set by gossip or RPC, + // Set observed time if not already set. Usually this should be set by gossip or RPC, // but just in case we set it again here (useful for tests). + if let Some(seen_timestamp) = self.slot_clock.now_duration() { + self.envelope_times_cache.write().set_time_observed( + block_root, + block_slot, + seen_timestamp, + None, + ); + } - // TODO(gloas) if we decide to insert the payload envelope into the new DA checker - // we should insert the pre executed envelope here. + // TODO(gloas) insert the pre-executed envelope into some type of cache. - // TODO(gloas) Start the Prometheus timer. - // let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); + let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); - // TODO(gloas) Increment the Prometheus counter for envelope processing requests. - // metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS); + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS); // A small closure to group the verification and import errors. let chain = self.clone(); @@ -59,12 +67,16 @@ impl BeaconChain { .into_execution_pending_envelope(&chain, notify_execution_layer)?; publish_fn()?; - // TODO(gloas) Record the time it took to complete consensus verification. - // if let Some(timestamp) = self.slot_clock.now_duration() { - // self.block_times_cache - // .write() - // .set_time_consensus_verified(block_root, block_slot, timestamp) - // } + // Record the time it took to complete consensus verification. + if let Some(timestamp) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_consensus_verified(block_root, block_slot, timestamp); + } + + let envelope_times_cache = chain.envelope_times_cache.clone(); + let slot_clock = chain.slot_clock.clone(); let executed_envelope = chain .into_executed_payload_envelope(execution_pending) @@ -75,28 +87,23 @@ impl BeaconChain { // reprocess this block until the block is evicted from DA checker, causing the // chain to get stuck temporarily if the block is canonical. Therefore we remove // it from the cache if execution fails. - - //self.data_availability_checker - // .remove_block_on_execution_error(&block_root); })?; - // TODO(gloas) Record the *additional* time it took to wait for execution layer verification. - // if let Some(timestamp) = self.slot_clock.now_duration() { - // self.block_times_cache - // .write() - // .set_time_executed(block_root, block_slot, timestamp) - // } + // Record the time it took to wait for execution layer verification. + if let Some(timestamp) = slot_clock.now_duration() { + envelope_times_cache + .write() + .set_time_executed(block_root, block_slot, timestamp); + } match executed_envelope { ExecutedEnvelope::Available(envelope) => { self.import_available_execution_payload_envelope(Box::new(envelope)) .await } - ExecutedEnvelope::AvailabilityPending() => { - return Err(BlockError::InternalError( - "Pending payload envelope not yet implemented".to_owned(), - )); - } + ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( + "Pending payload envelope not yet implemented".to_owned(), + )), } }; @@ -111,8 +118,7 @@ impl BeaconChain { "Envelope imported" ); - // TODO(gloas) Increment the Prometheus counter for block processing successes. - // metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES); Ok(status) } @@ -121,7 +127,7 @@ impl BeaconChain { Ok(status) } - Err(BlockError::BeaconChainError(e)) => { + Err(EnvelopeError::BeaconChainError(e)) => { match e.as_ref() { BeaconChainError::TokioJoin(e) => { debug!( @@ -138,7 +144,7 @@ impl BeaconChain { ); } }; - Err(BlockError::BeaconChainError(e)) + Err(EnvelopeError::BeaconChainError(e)) } // The block failed verification. Err(other) => { @@ -152,7 +158,7 @@ impl BeaconChain { pub async fn import_available_execution_payload_envelope( self: &Arc, envelope: Box>, - ) -> Result { + ) -> Result { let AvailableExecutedEnvelope { envelope, import_data, @@ -165,8 +171,6 @@ impl BeaconChain { post_state, } = import_data; - // TODO(gloas) Record the time at which this block's blobs became available. - let block_root = { // Capture the current span before moving into the blocking task let current_span = tracing::Span::current(); @@ -202,21 +206,18 @@ impl BeaconChain { &self, signed_envelope: AvailableEnvelope, block_root: Hash256, - mut state: BeaconState, - payload_verification_status: PayloadVerificationStatus, + state: BeaconState, + _payload_verification_status: PayloadVerificationStatus, parent_block: Arc>, - ) -> Result { + ) -> Result { // ----------------------------- ENVELOPE NOT YET ATTESTABLE ---------------------------------- // Everything in this initial section is on the hot path between processing the envelope and // being able to attest to it. DO NOT add any extra processing in this initial section // unless it must run before fork choice. // ----------------------------------------------------------------------------------------- - let current_slot = self.slot()?; - let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch()); - let envelope = signed_envelope.message(); - // TODO(gloas) implement metrics - // let post_exec_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_POST_EXEC_PROCESSING); + let post_exec_timer = + metrics::start_timer(&metrics::ENVELOPE_PROCESSING_POST_EXEC_PROCESSING); // Check the payloads parent block against weak subjectivity checkpoint. self.check_block_against_weak_subjectivity_checkpoint( @@ -228,26 +229,24 @@ impl BeaconChain { // Take an upgradable read lock on fork choice so we can check if this block has already // been imported. We don't want to repeat work importing a block that is already imported. let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); - if fork_choice_reader.contains_block(&block_root) { - return Err(BlockError::DuplicateFullyImported(block_root)); + if !fork_choice_reader.contains_block(&block_root) { + return Err(EnvelopeError::BlockRootUnknown { block_root }); } + // TODO(gloas) no fork choice logic yet // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by // avoiding taking other locks whilst holding this lock. - let mut fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); + // let fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); // TODO(gloas) Do we need this check? Do not import a block that doesn't descend from the finalized root. // let signed_block = check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?; - // TODO(gloas)Do we want to use an early attester cache like mechanism for payload enevelopes? + // TODO(gloas) Do we want to use an early attester cache like mechanism for payload enevelopes? // TODO(gloas) emit SSE event if the payload became the new head payload - // drop(post_exec_timer); + drop(post_exec_timer); - // ---------------------------- BLOCK PROBABLY ATTESTABLE ---------------------------------- - // Most blocks are now capable of being attested to thanks to the `early_attester_cache` - // cache above. Resume non-essential processing. - // - // It is important NOT to return errors here before the database commit, because the block + // ---------------------------- ENVELOPE PROBABLY ATTESTABLE ---------------------------------- + // It is important NOT to return errors here before the database commit, because the envelope // has already been added to fork choice and the database would be left in an inconsistent // state if we returned early without committing. In other words, an error here would // corrupt the node's database permanently. @@ -278,15 +277,13 @@ impl BeaconChain { ?block_root, "Failed to store data columns into the database" ); - return Err(self - .handle_import_block_db_write_error(fork_choice) - .err() - .unwrap_or(BlockError::InternalError(e))); + // TODO(gloas) implement failed write handling to fork choice + // let _ = self.handle_import_block_db_write_error(fork_choice); + return Err(EnvelopeError::InternalError(e)); } } - // TODO(gloas) metrics - // let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); + let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE); ops.push(StoreOp::PutPayloadEnvelope( block_root, @@ -299,7 +296,6 @@ impl BeaconChain { let db_span = info_span!("persist_payloads_and_blobs").entered(); - // TODO(gloas) do i need this if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { error!( msg = "Restoring fork choice from disk", @@ -315,25 +311,49 @@ impl BeaconChain { drop(db_span); + // TODO(gloas) drop fork choice lock // The fork choice write-lock is dropped *after* the on-disk database has been updated. // This prevents inconsistency between the two at the expense of concurrency. - drop(fork_choice); + // drop(fork_choice); // We're declaring the envelope "imported" at this point, since fork choice and the DB know // about it. let envelope_time_imported = timestamp_now(); // TODO(gloas) depending on what happens with light clients - // we might need to do some computations here + // we might need to do some light client related computations here - // TODO(gloas) metrics - // metrics::stop_timer(db_write_timer); + metrics::stop_timer(db_write_timer); + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES); - // TODO(gloas) metrics - // metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); + self.import_envelope_update_metrics_and_events( + block_root, + signed_envelope.slot(), + envelope_time_imported, + ); - // TODO(gloas) we might want to implement something similar - // to `import_block_update_metrics_and_events` Ok(block_root) } + + fn import_envelope_update_metrics_and_events( + &self, + block_root: Hash256, + envelope_slot: Slot, + envelope_time_imported: Duration, + ) { + let envelope_delay_total = + get_slot_delay_ms(envelope_time_imported, envelope_slot, &self.slot_clock); + + // Do not write to the cache for envelopes older than 2 epochs, this helps reduce writes + // to the cache during sync. + if envelope_delay_total < self.slot_clock.slot_duration().saturating_mul(64) { + self.envelope_times_cache.write().set_time_imported( + block_root, + envelope_slot, + envelope_time_imported, + ); + } + + // TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks). + } } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index 4dd8d351d7..80e62f93b7 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -1,8 +1,9 @@ //! The incremental processing steps (e.g., signatures verified but not the state transition) is //! represented as a sequence of wrapper-types around the block. There is a linear progression of -//! types, starting at a `SignedBeaconBlock` and finishing with a `Fully VerifiedBlock` (see +//! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see //! diagram below). //! +//! // TODO(gloas) we might want to update this diagram to include `AvailabelExecutedEnvelope` //! ```ignore //! START //! | @@ -28,9 +29,9 @@ use std::sync::Arc; -use state_processing::{ - BlockProcessingError, ConsensusContext, envelope_processing::EnvelopeProcessingError, -}; +use store::Error as DBError; + +use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError}; use tracing::instrument; use types::{ BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, @@ -45,15 +46,15 @@ use crate::{ }; pub mod gossip_verified_envelope; +pub mod import; mod payload_notifier; -mod tests; pub trait IntoExecutionPendingEnvelope: Sized { fn into_execution_pending_envelope( self, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockError>; + ) -> Result, EnvelopeError>; fn envelope(&self) -> &Arc>; } @@ -74,8 +75,7 @@ pub struct EnvelopeImportData { #[derive(Debug)] #[allow(dead_code)] pub struct AvailableEnvelope { - // TODO(EIP-7732): rename to execution_block_hash - block_hash: ExecutionBlockHash, + execution_block_hash: ExecutionBlockHash, envelope: Arc>, columns: DataColumnSidecarList, /// Timestamp at which this block first became available (UNIX timestamp, time since 1970). @@ -222,6 +222,10 @@ pub enum EnvelopeError { EnvelopeProcessingError(EnvelopeProcessingError), // Error verifying the execution payload ExecutionPayloadError(ExecutionPayloadError), + // An error from block-level checks reused during envelope import + BlockError(BlockError), + // Internal error + InternalError(String), } impl std::fmt::Display for EnvelopeError { @@ -248,6 +252,18 @@ impl From for EnvelopeError { } } +impl From for EnvelopeError { + fn from(e: DBError) -> Self { + EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::DBError(e))) + } +} + +impl From for EnvelopeError { + fn from(e: BlockError) -> Self { + EnvelopeError::BlockError(e) + } +} + /// Pull errors up from EnvelopeProcessingError to EnvelopeError impl From for EnvelopeError { fn from(e: EnvelopeProcessingError) -> Self { @@ -274,14 +290,14 @@ impl From for EnvelopeError { pub(crate) fn load_snapshot( envelope: &SignedExecutionPayloadEnvelope, chain: &BeaconChain, -) -> Result, BlockError> { - // Reject any block if its block is not known to fork choice. +) -> Result, EnvelopeError> { + // Reject any envelope if its block is not known to fork choice. // // A block that is not in fork choice is either: // - // - Not yet imported: we should reject this block because we should only import a child - // envelope after its parent has been fully imported. - // - Pre-finalized: if the block is _prior_ to finalization, we should ignore the envelope + // - Not yet imported: we should reject this envelope because we should only import it after its parent block + // has been fully imported. + // - Pre-finalized: if the parent block is _prior_ to finalization, we should ignore the envelope // because it will revert finalization. Note that the finalized block is stored in fork // choice, so we will not reject any child of the finalized block (this is relevant during // genesis). @@ -289,8 +305,8 @@ pub(crate) fn load_snapshot( let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock(); let beacon_block_root = envelope.beacon_block_root(); let Some(proto_beacon_block) = fork_choice_read_lock.get_block(&beacon_block_root) else { - return Err(BlockError::ParentUnknown { - parent_root: beacon_block_root, + return Err(EnvelopeError::BlockRootUnknown { + block_root: beacon_block_root, }); }; drop(fork_choice_read_lock); @@ -304,7 +320,7 @@ pub(crate) fn load_snapshot( let state = chain .store .get_hot_state(&block_state_root, cache_state) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))? + .map_err(EnvelopeError::from)? .ok_or_else(|| { BeaconChainError::DBInconsistent(format!( "Missing state for envelope block {block_state_root:?}", @@ -325,8 +341,7 @@ impl IntoExecutionPendingEnvelope self, chain: &Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result, BlockError> { - // TODO(EIP-7732): figure out how this should be refactored.. + ) -> Result, EnvelopeError> { GossipVerifiedEnvelope::new(self, chain)? .into_execution_pending_envelope(chain, notify_execution_layer) } @@ -335,10 +350,3 @@ impl IntoExecutionPendingEnvelope self } } - -#[derive(Clone, Debug, PartialEq)] -pub struct PayloadEnvelopeImportData { - pub block_root: Hash256, - pub state: BeaconState, - pub consensus_context: ConsensusContext, -} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs index 4ac556f1f7..5b1f332b5a 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -7,7 +7,7 @@ use tracing::warn; use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use crate::{ - BeaconChain, BeaconChainTypes, BlockError, ExecutionPayloadError, NotifyExecutionLayer, + BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, execution_payload::notify_new_payload, payload_envelope_verification::EnvelopeError, }; @@ -25,15 +25,13 @@ impl PayloadNotifier { envelope: Arc>, block: Arc>, notify_execution_layer: NotifyExecutionLayer, - ) -> Result { + ) -> Result { let payload_verification_status = { let payload_message = &envelope.message; match notify_execution_layer { NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => { - // TODO(gloas) unwrap - let new_payload_request = - Self::build_new_payload_request(&envelope, &block).unwrap(); + let new_payload_request = Self::build_new_payload_request(&envelope, &block)?; if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() { warn!( block_number = ?payload_message.payload.block_number, diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs deleted file mode 100644 index 2545a900ad..0000000000 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/tests.rs +++ /dev/null @@ -1,23 +0,0 @@ -use std::sync::Arc; - -use crate::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, - payload_envelope_verification::{ExecutedEnvelope, ExecutionPendingEnvelope}, -}; - -async fn import_execution_pending_envelope( - chain: Arc>, - execution_pending_envelope: ExecutionPendingEnvelope, -) -> Result { - match chain - .clone() - .into_executed_payload_envelope(execution_pending_envelope) - .await - .unwrap() - { - ExecutedEnvelope::Available(envelope) => todo!(), - ExecutedEnvelope::AvailabilityPending() => { - Err("AvailabilityPending not expected in this test. Block not imported.".to_string()) - } - } -} diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index d8c161f8c1..fdc7d27320 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -30,7 +30,7 @@ use types::{ Attestation, AttestationData, AttesterSlashingRef, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, IndexedAttestation, IndexedAttestationRef, ProposerSlashing, SignedAggregateAndProof, SignedContributionAndProof, - SignedExecutionPayloadEnvelope, Slot, SyncCommitteeMessage, VoluntaryExit, + Slot, SyncCommitteeMessage, VoluntaryExit, }; /// Used for Prometheus labels. diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index d214ea6b15..779e196495 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,5 +1,5 @@ #![cfg(not(debug_assertions))] - +// TODO(gloas) we probably need similar test for payload envelope verification use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 0016f66c01..098d6d8324 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -532,6 +532,16 @@ pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new ) }); +/* + * Execution Payload Envelope Delay Metrics + */ +pub static ENVELOPE_DELAY_GOSSIP: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "payload_envelope_delay_gossip", + "The first time we see this payload envelope from gossip as a delay from the start of the slot", + ) +}); + /* * Block Delay Metrics */ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index bdcdd68860..1dcfea96d9 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -20,9 +20,7 @@ use beacon_chain::{ }; use beacon_chain::{ blob_verification::{GossipBlobError, GossipVerifiedBlob}, - payload_envelope_verification::{ - EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, - }, + payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope, }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; @@ -3230,24 +3228,34 @@ impl NetworkBeaconProcessor { } pub async fn process_gossip_execution_payload_envelope( - self: &Arc, + self: Arc, message_id: MessageId, peer_id: PeerId, - envelope: SignedExecutionPayloadEnvelope, + envelope: Arc>, + seen_timestamp: Duration, ) { - // TODO(EIP-7732): Implement proper execution payload envelope gossip processing. - // This should integrate with the envelope_verification.rs module once it's implemented. + if let Some(gossip_verified_envelope) = self + .process_gossip_unverified_execution_payload_envelope( + message_id, + peer_id, + envelope.clone(), + seen_timestamp, + ) + .await + { + let beacon_block_root = gossip_verified_envelope.signed_envelope.beacon_block_root(); - trace!( - %peer_id, - builder_index = envelope.message.builder_index, - slot = %envelope.message.slot, - beacon_block_root = %envelope.message.beacon_block_root, - "Processing execution payload envelope" - ); + Span::current().record("beacon_block_root", beacon_block_root.to_string()); - // For now, ignore all envelopes since verification is not implemented - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + // TODO(gloas) in process_gossip_block here we check_and_insert on the duplicate cache + // before calling gossip_verified_block + + self.process_gossip_verified_execution_payload_envelope( + peer_id, + gossip_verified_envelope, + ) + .await; + } } async fn process_gossip_unverified_execution_payload_envelope( @@ -3255,8 +3263,10 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, envelope: Arc>, + seen_duration: Duration, ) -> Option> { - // TODO(glos) add payload envelope delay metrics + let envelope_delay = + get_slot_delay_ms(seen_duration, envelope.slot(), &self.chain.slot_clock); let verification_result = self .chain @@ -3264,11 +3274,21 @@ impl NetworkBeaconProcessor { .verify_envelope_for_gossip(envelope.clone()) .await; - // TODO(gloas) delay metrics and write the time that the payload was observed into - // the delay cache - let verified_envelope = match verification_result { Ok(verified_envelope) => { + metrics::set_gauge( + &metrics::ENVELOPE_DELAY_GOSSIP, + envelope_delay.as_millis() as i64, + ); + + // Write the time the envelope was observed into the delay cache. + self.chain.envelope_times_cache.write().set_time_observed( + verified_envelope.signed_envelope.beacon_block_root(), + envelope.slot(), + seen_duration, + Some(peer_id.to_string()), + ); + info!( slot = %verified_envelope.signed_envelope.slot(), root = ?verified_envelope.signed_envelope.beacon_block_root(), @@ -3302,7 +3322,7 @@ impl NetworkBeaconProcessor { // TODO(gloas) update metrics to note how early the envelope arrived let inner_self = self.clone(); - let process_fn = Box::pin(async move { + let _process_fn = Box::pin(async move { inner_self .process_gossip_verified_execution_payload_envelope( peer_id, @@ -3333,27 +3353,26 @@ impl NetworkBeaconProcessor { peer_id: PeerId, verified_envelope: GossipVerifiedEnvelope, ) { - let processing_start_time = Instant::now(); - let envelope = verified_envelope.envelope_cloned(); + let _processing_start_time = Instant::now(); let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); let result = self .chain .process_execution_payload_envelope( - block_root, + beacon_block_root, verified_envelope, - notify_execution_layer, - block_source, - publish_fn, + NotifyExecutionLayer::Yes, + BlockImportSource::Gossip, + || Ok(()), ) .await; - register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); + // TODO(gloas) metrics + // register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); match &result { Ok(AvailabilityProcessingStatus::Imported(block_root)) => { // TODO(gloas) do we need to send a `PayloadImported` event to the reporcess queue? - debug!( ?block_root, %peer_id, @@ -3374,7 +3393,10 @@ impl NetworkBeaconProcessor { ) } - Err(_) => todo!(), + Err(_) => { + // TODO(gloas) implement peer penalties + warn!("process_gossip_verified_execution_payload_envelope_failed") + } } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index fd67fcde82..ccd36b528e 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -429,11 +429,17 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, execution_payload: Box>, + seen_timestamp: Duration, ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { processor - .process_gossip_execution_payload(message_id, peer_id, *execution_payload) + .process_gossip_execution_payload_envelope( + message_id, + peer_id, + Arc::new(*execution_payload), + seen_timestamp, + ) .await }; diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 8373dec322..77d64c92e6 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -493,6 +493,7 @@ impl Router { message_id, peer_id, signed_execution_payload_envelope, + timestamp_now(), ), ) }