From 561898fc1c74c11a1a765f252ae504f35263f6ed Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Thu, 19 Feb 2026 06:08:56 +0530 Subject: [PATCH 01/13] Process head_chains in descending order of number of peers (#8859) N/A Another find by @gitToki. Sort the preferred_ids in descending order as originally intended from the comment in the function. Co-Authored-By: Pawan Dhananjay --- beacon_node/network/src/sync/range_sync/chain_collection.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index 1d57ee6c3d..bd4dd6c181 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -351,7 +351,8 @@ impl ChainCollection { .iter() .map(|(id, chain)| (chain.available_peers(), !chain.is_syncing(), *id)) .collect::>(); - preferred_ids.sort_unstable(); + // Sort in descending order + preferred_ids.sort_unstable_by(|a, b| b.cmp(a)); let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new(); for (_, _, id) in preferred_ids { From 4588971085840dc56cedc85ba0f12bcaa99be8ed Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 19 Feb 2026 12:57:53 +1100 Subject: [PATCH 02/13] Add sync batch state metrics (#8847) Co-Authored-By: Jimmy Chen --- beacon_node/network/src/metrics.rs | 7 +++++ .../network/src/sync/backfill_sync/mod.rs | 20 +++++++++++++- beacon_node/network/src/sync/batch.rs | 26 ++++++++++++++++++- .../src/sync/custody_backfill_sync/mod.rs | 21 +++++++++++++-- beacon_node/network/src/sync/manager.rs | 3 +++ .../network/src/sync/range_sync/chain.rs | 11 +++++++- .../src/sync/range_sync/chain_collection.rs | 21 +++++++++++++++ .../network/src/sync/range_sync/range.rs | 4 +++ 8 files changed, 108 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index cea06a28c8..0fa95b4758 
100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -462,6 +462,13 @@ pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: LazyLock> ]), ) }); +pub static SYNCING_CHAIN_BATCHES: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "sync_batches", + "Number of batches in sync chains by sync type and state", + &["sync_type", "state"], + ) +}); pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock> = LazyLock::new(|| { try_create_int_gauge( "sync_single_block_lookups", diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 9802ec56a1..f18d31863b 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -8,9 +8,11 @@ //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! sync as failed, log an error and attempt to retry once a new peer joins the node. +use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::batch::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome, + BatchProcessingResult, BatchState, }; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::manager::BatchProcessResult; @@ -31,6 +33,7 @@ use std::collections::{ use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::sync::Arc; +use strum::IntoEnumIterator; use tracing::{debug, error, info, warn}; use types::{ColumnIndex, Epoch, EthSpec}; @@ -1181,6 +1184,21 @@ impl BackFillSync { .epoch(T::EthSpec::slots_per_epoch()) } + pub fn register_metrics(&self) { + for state in BatchMetricsState::iter() { + let count = self + .batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &["backfill", state.into()], + count as 
i64, + ); + } + } + /// Updates the global network state indicating the current state of a backfill sync. fn set_state(&self, state: BackFillState) { *self.network_globals.backfill_state.write() = state; diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index 8de386f5be..8f8d39ca4b 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -10,10 +10,22 @@ use std::marker::PhantomData; use std::ops::Sub; use std::time::Duration; use std::time::Instant; -use strum::Display; +use strum::{Display, EnumIter, IntoStaticStr}; use types::Slot; use types::{DataColumnSidecarList, Epoch, EthSpec}; +/// Batch states used as metrics labels. +#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum BatchMetricsState { + AwaitingDownload, + Downloading, + AwaitingProcessing, + Processing, + AwaitingValidation, + Failed, +} + pub type BatchId = Epoch; /// Type of expected batch. @@ -142,6 +154,18 @@ impl BatchState { pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } + + /// Returns the metrics state for this batch. + pub fn metrics_state(&self) -> BatchMetricsState { + match self { + BatchState::AwaitingDownload => BatchMetricsState::AwaitingDownload, + BatchState::Downloading(_) => BatchMetricsState::Downloading, + BatchState::AwaitingProcessing(..) 
=> BatchMetricsState::AwaitingProcessing, + BatchState::Processing(_) => BatchMetricsState::Processing, + BatchState::AwaitingValidation(_) => BatchMetricsState::AwaitingValidation, + BatchState::Poisoned | BatchState::Failed => BatchMetricsState::Failed, + } + } } impl BatchInfo { diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs index fa8b70c8b4..893aa849d3 100644 --- a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs @@ -12,14 +12,16 @@ use lighthouse_network::{ }; use logging::crit; use std::hash::{DefaultHasher, Hash, Hasher}; +use strum::IntoEnumIterator; use tracing::{debug, error, info, info_span, warn}; use types::{DataColumnSidecarList, Epoch, EthSpec}; +use crate::metrics; use crate::sync::{ backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart}, batch::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, - ByRangeRequestType, + BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome, + BatchProcessingResult, BatchState, ByRangeRequestType, }, block_sidecar_coupling::CouplingError, manager::CustodyBatchProcessResult, @@ -1114,6 +1116,21 @@ impl CustodyBackFillSync { *self.network_globals.custody_sync_state.write() = state; } + pub fn register_metrics(&self) { + for state in BatchMetricsState::iter() { + let count = self + .batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &["custody_backfill", state.into()], + count as i64, + ); + } + } + /// A fully synced peer has joined us. /// If we are in a failed state, update a local variable to indicate we are able to restart /// the failed sync on the next attempt. 
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 096ed9c328..c2faff5b62 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -784,6 +784,9 @@ impl SyncManager { } _ = register_metrics_interval.tick() => { self.network.register_metrics(); + self.range_sync.register_metrics(); + self.backfill_sync.register_metrics(); + self.custody_backfill_sync.register_metrics(); } _ = epoch_interval.tick() => { self.update_sync_state(); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index d67d6468a9..61161ae6f4 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -3,7 +3,8 @@ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::batch::BatchId; use crate::sync::batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchInfo, BatchMetricsState, BatchOperationOutcome, BatchProcessingResult, + BatchState, }; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; @@ -234,6 +235,14 @@ impl SyncingChain { .sum() } + /// Returns the number of batches in the given metrics state. + pub fn count_batches_in_state(&self, state: BatchMetricsState) -> usize { + self.batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count() + } + /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. 
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index bd4dd6c181..b430b7c572 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -6,6 +6,7 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::metrics; +use crate::sync::batch::BatchMetricsState; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; use fnv::FnvHashMap; @@ -17,6 +18,7 @@ use smallvec::SmallVec; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::Arc; +use strum::IntoEnumIterator; use tracing::{debug, error}; use types::EthSpec; use types::{Epoch, Hash256, Slot}; @@ -516,6 +518,25 @@ impl ChainCollection { } } + pub fn register_metrics(&self) { + for (sync_type, chains) in [ + ("range_finalized", &self.finalized_chains), + ("range_head", &self.head_chains), + ] { + for state in BatchMetricsState::iter() { + let count: usize = chains + .values() + .map(|chain| chain.count_batches_in_state(state)) + .sum(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &[sync_type, state.into()], + count as i64, + ); + } + } + } + fn update_metrics(&self) { metrics::set_gauge_vec( &metrics::SYNCING_CHAINS_COUNT, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c9656ad1d0..4c2123451a 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -371,6 +371,10 @@ where .update(network, &local, &mut self.awaiting_head_peers); } + pub fn register_metrics(&self) { + self.chains.register_metrics(); + } + /// Kickstarts sync. 
pub fn resume(&mut self, network: &mut SyncNetworkContext) { for (removed_chain, sync_type, remove_reason) in From 2f43d234d88a9c43518f032d5e4b34fe1c889068 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 27 Feb 2026 08:05:57 +1100 Subject: [PATCH 03/13] Add CI job to check for deleted files (#8727) Co-Authored-By: Michael Sproul Co-Authored-By: Michael Sproul --- .github/CODEOWNERS | 1 + .github/forbidden-files.txt | 7 +++++++ .github/workflows/test-suite.yml | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+) create mode 100644 .github/forbidden-files.txt diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cdec442276..e9ec8740a0 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,3 +1,4 @@ /beacon_node/network/ @jxs /beacon_node/lighthouse_network/ @jxs /beacon_node/store/ @michaelsproul +/.github/forbidden-files.txt @michaelsproul diff --git a/.github/forbidden-files.txt b/.github/forbidden-files.txt new file mode 100644 index 0000000000..ec89bd2e4b --- /dev/null +++ b/.github/forbidden-files.txt @@ -0,0 +1,7 @@ +# Files that have been intentionally deleted and should not be re-added. +# This prevents accidentally reviving files during botched merges. +# Add one file path per line (relative to repo root). 
+ +beacon_node/beacon_chain/src/otb_verification_service.rs +beacon_node/store/src/partial_beacon_state.rs +beacon_node/store/src/consensus_context.rs diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 72ea9d41ae..d9efbfc148 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -72,6 +72,27 @@ jobs: steps: - name: Check that the pull request is not targeting the stable branch run: test ${{ github.base_ref }} != "stable" + + forbidden-files-check: + name: forbidden-files-check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check for forbidden files + run: | + if [ -f .github/forbidden-files.txt ]; then + status=0 + while IFS= read -r file || [ -n "$file" ]; do + # Skip comments and empty lines + [[ "$file" =~ ^#.*$ || -z "$file" ]] && continue + if [ -f "$file" ]; then + echo "::error::Forbidden file exists: $file" + status=1 + fi + done < .github/forbidden-files.txt + exit $status + fi + release-tests-ubuntu: name: release-tests-ubuntu needs: [check-labels] @@ -430,6 +451,7 @@ jobs: needs: [ 'check-labels', 'target-branch-check', + 'forbidden-files-check', 'release-tests-ubuntu', 'beacon-chain-tests', 'op-pool-tests', From 8cf6ffac4b1954bfc61939e116afd5e2ab349dbb Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 25 Feb 2026 23:10:41 +1100 Subject: [PATCH 04/13] Update yanked keccak 0.1.5 to 0.1.6 (#8900) Co-Authored-By: Jimmy Chen --- Cargo.lock | 7 +++---- Cargo.toml | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d75f5c197..dd1637045b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4832,9 +4832,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ 
"cpufeatures", ] @@ -10607,8 +10607,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.13.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deab71f2e20691b4728b349c6cee8fc7223880fa67b6b4f92225ec32225447e5" +source = "git+https://github.com/sigp/rust-yamux?rev=575b17c0f44f4253079a6bafaa2de74ca1d6dfaa#575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index aac26e060b..61caacf5df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -303,3 +303,4 @@ debug = true [patch.crates-io] quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" } +yamux = { git = "https://github.com/sigp/rust-yamux", rev = "575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" } From 95f12d0927831971ca9e1acf7ca7e87fdda56f4a Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 27 Feb 2026 16:48:56 +1100 Subject: [PATCH 05/13] Bump version to v8.1.1 (#8853) --- Cargo.lock | 14 +++++++------- Cargo.toml | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dd1637045b..40c550f4c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,7 +4,7 @@ version = 4 [[package]] name = "account_manager" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "bls", @@ -1276,7 +1276,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_chain", @@ -1513,7 +1513,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "8.1.0" +version = "8.1.1" dependencies = [ "beacon_node", "bytes", @@ -4897,7 +4897,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_chain", @@ -5383,7 +5383,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "8.1.0" +version = "8.1.1" dependencies = [ 
"account_manager", "account_utils", @@ -5515,7 +5515,7 @@ dependencies = [ [[package]] name = "lighthouse_version" -version = "8.1.0" +version = "8.1.1" dependencies = [ "regex", ] @@ -9622,7 +9622,7 @@ dependencies = [ [[package]] name = "validator_client" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_node_fallback", diff --git a/Cargo.toml b/Cargo.toml index 61caacf5df..5f6f43d2f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ resolver = "2" [workspace.package] edition = "2024" -version = "8.1.0" +version = "8.1.1" [workspace.dependencies] account_utils = { path = "common/account_utils" } From 6194dddc5b9ea176fc38796fb5de6c7fac8a8143 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 2 Mar 2026 17:43:51 +1100 Subject: [PATCH 06/13] Persist custody context more readily (#8921) We received a bug report of a node restarting custody backfill unnecessarily after upgrading to Lighthouse v8.1.1. What happened is: - User started LH v8.0.1 many months ago, CGC updated 0 -> N but the CGC was not eagerly persisted. - LH experienced an unclean shutdown (not sure of what type). - Upon restarting (still running v8.0.1), the custody context read from disk contains CGC=0: `DEBUG Loaded persisted custody context custody_context: CustodyContext { validator_custody_count: 0, ...`). - CGC updates again to N, retriggering custody backfill: `DEBUG Validator count at head updated old_count: 0, new_count: N`. - Custody backfill does a bunch of downloading for no gain: `DEBUG Imported historical data columns epoch: Epoch(428433), total_imported: 0` - While custody backfill is running user updated to v8.1.1, and we see logs for the CGC=N being peristed upon clean shutdown, and then correctly read on startup with v8.1.1. - Custody backfill keeps running and downloading due to the CGC change still being considered in progress. 
- Call `persist_custody_context` inside the `register_validators` handler so that it is written to disk eagerly whenever it changes. The performance impact of this should be minimal as the amount of data is very small and this call can only happen at most ~128 times (once for each change) in the entire life of a beacon node. - Call `persist_custody_context` inside `BeaconChainBuilder::build` so that changes caused by CLI flags are persisted (otherwise starting a node with `--semi-supernode` and no validators, then shutting it down uncleanly would cause use to forget the CGC). These changes greatly reduce the timespan during which an unclean shutdown can create inconsistency. In the worst case, we only lose backfill progress that runs concurrently with the `register_validators` handler (should be extremely minimal, nigh impossible). Co-Authored-By: Michael Sproul --- beacon_node/beacon_chain/src/beacon_chain.rs | 13 ++++++++++++- beacon_node/beacon_chain/src/builder.rs | 5 +++++ beacon_node/http_api/src/validator/mod.rs | 12 ++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9d204ac7f2..703ed24420 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -662,7 +662,18 @@ impl BeaconChain { .custody_context() .as_ref() .into(); - debug!(?custody_context, "Persisting custody context to store"); + + // Pattern match to avoid accidentally missing fields and to ignore deprecated fields. 
+ let CustodyContextSsz { + validator_custody_at_head, + epoch_validator_custody_requirements, + persisted_is_supernode: _, + } = &custody_context; + debug!( + validator_custody_at_head, + ?epoch_validator_custody_requirements, + "Persisting custody context to store" + ); persist_custody_context::( self.store.clone(), diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2c1dae9215..66a54d46e8 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1083,6 +1083,11 @@ where let cgc_change_effective_slot = cgc_changed.effective_epoch.start_slot(E::slots_per_epoch()); beacon_chain.update_data_column_custody_info(Some(cgc_change_effective_slot)); + + // Persist change to disk. + beacon_chain + .persist_custody_context() + .map_err(|e| format!("Failed writing updated CGC: {e:?}"))?; } info!( diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index df237d9f9b..a9082df715 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -727,6 +727,18 @@ pub fn post_validator_prepare_beacon_proposer( debug!(error = %e, "Could not send message to the network service. \ Likely shutdown") }); + + // Write the updated custody context to disk. This happens at most 128 + // times ever, so the I/O burden should be extremely minimal. Without a + // write here we risk forgetting custody backfill progress upon an + // unclean shutdown. The custody context is otherwise only persisted in + // `BeaconChain::drop`. 
+ if let Err(error) = chain.persist_custody_context() { + error!( + ?error, + "Failed to persist custody context after CGC update" + ); + } } } From f4b5b033a227fcacdb8e8514bbca6cf6702f3a24 Mon Sep 17 00:00:00 2001 From: Mac L Date: Mon, 2 Mar 2026 09:19:41 +0200 Subject: [PATCH 07/13] Add `testing` feature to validator_client/http_api (#8909) Create a `testing` feature which we can use to gate off `test_utils.rs` and its associated dependencies from the rest of the crate. Co-Authored-By: Mac L --- validator_client/http_api/Cargo.toml | 12 +++++++++--- validator_client/http_api/src/lib.rs | 4 +++- validator_manager/Cargo.toml | 2 +- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/validator_client/http_api/Cargo.toml b/validator_client/http_api/Cargo.toml index 2bd57867ac..e334ab9db0 100644 --- a/validator_client/http_api/Cargo.toml +++ b/validator_client/http_api/Cargo.toml @@ -8,14 +8,17 @@ authors = ["Sigma Prime "] name = "validator_http_api" path = "src/lib.rs" +[features] +testing = ["dep:deposit_contract", "dep:doppelganger_service", "dep:tempfile"] + [dependencies] account_utils = { workspace = true } beacon_node_fallback = { workspace = true } bls = { workspace = true } -deposit_contract = { workspace = true } +deposit_contract = { workspace = true, optional = true } directory = { workspace = true } dirs = { workspace = true } -doppelganger_service = { workspace = true } +doppelganger_service = { workspace = true, optional = true } eth2 = { workspace = true, features = ["lighthouse"] } eth2_keystore = { workspace = true } ethereum_serde_utils = { workspace = true } @@ -38,7 +41,7 @@ slot_clock = { workspace = true } sysinfo = { workspace = true } system_health = { workspace = true } task_executor = { workspace = true } -tempfile = { workspace = true } +tempfile = { workspace = true, optional = true } tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } @@ -53,7 +56,10 @@ warp_utils = { workspace = 
true } zeroize = { workspace = true } [dev-dependencies] +deposit_contract = { workspace = true } +doppelganger_service = { workspace = true } futures = { workspace = true } itertools = { workspace = true } rand = { workspace = true, features = ["small_rng"] } ssz_types = { workspace = true } +tempfile = { workspace = true } diff --git a/validator_client/http_api/src/lib.rs b/validator_client/http_api/src/lib.rs index a35b4ec6c6..8e9c077e57 100644 --- a/validator_client/http_api/src/lib.rs +++ b/validator_client/http_api/src/lib.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "testing")] +pub mod test_utils; + mod api_secret; mod create_signed_voluntary_exit; mod create_validator; @@ -6,7 +9,6 @@ mod keystores; mod remotekeys; mod tests; -pub mod test_utils; pub use api_secret::PK_FILENAME; use graffiti::{delete_graffiti, get_graffiti, set_graffiti}; diff --git a/validator_manager/Cargo.toml b/validator_manager/Cargo.toml index 16ce1e023f..d0155698b4 100644 --- a/validator_manager/Cargo.toml +++ b/validator_manager/Cargo.toml @@ -29,4 +29,4 @@ beacon_chain = { workspace = true } http_api = { workspace = true } regex = { workspace = true } tempfile = { workspace = true } -validator_http_api = { workspace = true } +validator_http_api = { workspace = true, features = ["testing"] } From 9dccfb540f3cd9c5a0f0f9ac18a007f39b3a3b89 Mon Sep 17 00:00:00 2001 From: chonghe <44791194+chong-he@users.noreply.github.com> Date: Thu, 5 Mar 2026 11:48:30 +0800 Subject: [PATCH 08/13] update cargo-sort (#8933) Co-Authored-By: Tan Chee Keong --- Cargo.toml | 29 +++-------------------------- common/logging/Cargo.toml | 2 +- common/malloc_utils/Cargo.toml | 5 +---- 3 files changed, 5 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 667ba1f803..222392bcb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,20 +166,7 @@ initialized_validators = { path = "validator_client/initialized_validators" } int_to_bytes = { path = "consensus/int_to_bytes" } itertools = "0.14" kzg = { path = 
"crypto/kzg" } -libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", default-features = false, features = [ - "identify", - "yamux", - "noise", - "dns", - "tcp", - "tokio", - "secp256k1", - "macros", - "metrics", - "quic", - "upnp", - "gossipsub", -] } +libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", default-features = false, features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "secp256k1", "macros", "metrics", "quic", "upnp", "gossipsub"] } libsecp256k1 = "0.7" lighthouse_network = { path = "beacon_node/lighthouse_network" } lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" } @@ -219,12 +206,7 @@ r2d2 = "0.8" rand = "0.9.0" rayon = "1.7" regex = "1" -reqwest = { version = "0.12", default-features = false, features = [ - "blocking", - "json", - "stream", - "rustls-tls", -] } +reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "stream", "rustls-tls"] } ring = "0.17" rpds = "0.11" rusqlite = { version = "0.38", features = ["bundled"] } @@ -253,12 +235,7 @@ sysinfo = "0.26" system_health = { path = "common/system_health" } task_executor = { path = "common/task_executor" } tempfile = "3" -tokio = { version = "1", features = [ - "rt-multi-thread", - "sync", - "signal", - "macros", -] } +tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal", "macros"] } tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "time"] } tracing = "0.1.40" diff --git a/common/logging/Cargo.toml b/common/logging/Cargo.toml index 41c82dbd61..cbebd1a501 100644 --- a/common/logging/Cargo.toml +++ b/common/logging/Cargo.toml @@ -13,7 +13,7 @@ logroller = { workspace = true } metrics = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = [ "time" ] } +tokio = { workspace = true, features = ["time"] } tracing = { workspace = true } 
tracing-appender = { workspace = true } tracing-core = { workspace = true } diff --git a/common/malloc_utils/Cargo.toml b/common/malloc_utils/Cargo.toml index 1052128852..e90490bf09 100644 --- a/common/malloc_utils/Cargo.toml +++ b/common/malloc_utils/Cargo.toml @@ -35,7 +35,4 @@ tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats"] } # Jemalloc's background_threads feature requires Linux (pthreads). [target.'cfg(target_os = "linux")'.dependencies] -tikv-jemallocator = { version = "0.6.0", optional = true, features = [ - "stats", - "background_threads", -] } +tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats", "background_threads"] } From 9c4715c251ea19b2cc4c7688916b5cddfa2b1778 Mon Sep 17 00:00:00 2001 From: Mac L Date: Fri, 6 Mar 2026 09:54:43 +0200 Subject: [PATCH 09/13] Fix lints for Rust v1.94.0 (#8939) Following the release of Rust v1.94.0 there are new Clippy lints which do not pass and are blocking CI (which pulls in the latest version of Rust) This is pretty much the minimum just to get CI running again. Most of the errors involve error types being too large. For now I've added allows but later it might be worth doing a refactor to `Box` or otherwise remove the problematic error types. 
Co-Authored-By: Mac L --- beacon_node/beacon_chain/tests/attestation_verification.rs | 1 + beacon_node/beacon_chain/tests/payload_invalidation.rs | 1 + beacon_node/beacon_chain/tests/store_tests.rs | 1 + beacon_node/execution_layer/src/lib.rs | 2 +- beacon_node/http_api/src/lib.rs | 1 + slasher/service/src/lib.rs | 1 + 6 files changed, 6 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index e8ee628f28..acf326430b 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -1,4 +1,5 @@ #![cfg(not(debug_assertions))] +#![allow(clippy::result_large_err)] use beacon_chain::attestation_verification::{ Error, batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations, diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index b282adecd5..bcc50990ec 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -1,4 +1,5 @@ #![cfg(not(debug_assertions))] +#![allow(clippy::result_large_err)] use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{ diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index cfc53c8ce0..b6d729cc61 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,4 +1,5 @@ #![cfg(not(debug_assertions))] +#![allow(clippy::result_large_err)] use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::block_verification_types::RpcBlock; diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index d6796f6a05..90968fa213 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ 
-2048,7 +2048,7 @@ fn verify_builder_bid( .cloned() .map(|withdrawals| { Withdrawals::::try_from(withdrawals) - .map_err(InvalidBuilderPayload::SszTypesError) + .map_err(|e| Box::new(InvalidBuilderPayload::SszTypesError(e))) .map(|w| w.tree_hash_root()) }) .transpose()?; diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 74710c4ed2..e9dfa2876a 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(clippy::result_large_err)] //! This crate contains a HTTP server which serves the endpoints listed here: //! //! https://github.com/ethereum/beacon-APIs diff --git a/slasher/service/src/lib.rs b/slasher/service/src/lib.rs index ac15b49ee9..69ec59aa2c 100644 --- a/slasher/service/src/lib.rs +++ b/slasher/service/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(clippy::result_large_err)] mod service; pub use service::SlasherService; From dbfb6fd9231f5a7c74667e5adbdaddacf4f1b768 Mon Sep 17 00:00:00 2001 From: Mac L Date: Sat, 7 Mar 2026 01:09:31 +0200 Subject: [PATCH 10/13] Remove `arbitrary-fuzz` (#8936) We have duplicated features which enable `arbitrary` throughout the codebase. These are `arbitrary` and `arbitrary-fuzz`. I think historically these were supposed to be distinct however in practice these function identically and so we can unify them into a single feature to avoid confusion. 
Co-Authored-By: Mac L --- Makefile | 4 ++-- common/eip_3076/Cargo.toml | 2 +- common/eip_3076/src/lib.rs | 10 +++++----- consensus/state_processing/Cargo.toml | 4 ++-- consensus/state_processing/src/envelope_processing.rs | 2 +- consensus/state_processing/src/per_block_processing.rs | 8 ++++---- .../per_epoch_processing/base/validator_statuses.rs | 10 +++++----- consensus/state_processing/src/verify_operation.rs | 8 ++++---- consensus/types/Cargo.toml | 1 - validator_client/slashing_protection/Cargo.toml | 2 +- .../slashing_protection/src/interchange_test.rs | 8 ++++---- 11 files changed, 29 insertions(+), 30 deletions(-) diff --git a/Makefile b/Makefile index 9786c17cc9..ad1bbbb8e8 100644 --- a/Makefile +++ b/Makefile @@ -321,8 +321,8 @@ make-ef-tests-nightly: # Verifies that crates compile with fuzzing features enabled arbitrary-fuzz: - cargo check -p state_processing --features arbitrary-fuzz,$(TEST_FEATURES) - cargo check -p slashing_protection --features arbitrary-fuzz,$(TEST_FEATURES) + cargo check -p state_processing --features arbitrary,$(TEST_FEATURES) + cargo check -p slashing_protection --features arbitrary,$(TEST_FEATURES) # Runs cargo audit (Audit Cargo.lock files for crates with security vulnerabilities reported to the RustSec Advisory Database) audit: install-audit audit-CI diff --git a/common/eip_3076/Cargo.toml b/common/eip_3076/Cargo.toml index 058e1fd1a0..157fe12cb3 100644 --- a/common/eip_3076/Cargo.toml +++ b/common/eip_3076/Cargo.toml @@ -6,7 +6,7 @@ edition = { workspace = true } [features] default = [] -arbitrary-fuzz = ["dep:arbitrary", "types/arbitrary"] +arbitrary = ["dep:arbitrary", "types/arbitrary"] json = ["dep:serde_json"] [dependencies] diff --git a/common/eip_3076/src/lib.rs b/common/eip_3076/src/lib.rs index cdd05d7b1e..0bf1a94d0e 100644 --- a/common/eip_3076/src/lib.rs +++ b/common/eip_3076/src/lib.rs @@ -13,7 +13,7 @@ pub enum Error { #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] #[serde(deny_unknown_fields)] 
-#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct InterchangeMetadata { #[serde(with = "serde_utils::quoted_u64::require_quotes")] pub interchange_format_version: u64, @@ -22,7 +22,7 @@ pub struct InterchangeMetadata { #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct InterchangeData { pub pubkey: PublicKeyBytes, pub signed_blocks: Vec, @@ -31,7 +31,7 @@ pub struct InterchangeData { #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct SignedBlock { #[serde(with = "serde_utils::quoted_u64::require_quotes")] pub slot: Slot, @@ -41,7 +41,7 @@ pub struct SignedBlock { #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct SignedAttestation { #[serde(with = "serde_utils::quoted_u64::require_quotes")] pub source_epoch: Epoch, @@ -52,7 +52,7 @@ pub struct SignedAttestation { } #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct Interchange { pub metadata: InterchangeMetadata, pub data: Vec, diff --git a/consensus/state_processing/Cargo.toml b/consensus/state_processing/Cargo.toml index 7426995439..ae0af03231 100644 --- a/consensus/state_processing/Cargo.toml +++ b/consensus/state_processing/Cargo.toml @@ -7,10 +7,10 @@ edition = { workspace = true } 
[features] default = [] fake_crypto = ["bls/fake_crypto"] -arbitrary-fuzz = [ +arbitrary = [ "dep:arbitrary", "smallvec/arbitrary", - "types/arbitrary-fuzz", + "types/arbitrary", "merkle_proof/arbitrary", "ethereum_ssz/arbitrary", "ssz_types/arbitrary", diff --git a/consensus/state_processing/src/envelope_processing.rs b/consensus/state_processing/src/envelope_processing.rs index c2cfeae5d3..be6b7c1b29 100644 --- a/consensus/state_processing/src/envelope_processing.rs +++ b/consensus/state_processing/src/envelope_processing.rs @@ -21,7 +21,7 @@ macro_rules! envelope_verify { } /// The strategy to be used when validating the payloads state root. -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[derive(PartialEq, Clone, Copy)] pub enum VerifyStateRoot { /// Validate state root. diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index 037e1c7cc7..5aa610e98e 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -55,12 +55,12 @@ use crate::common::update_progressive_balances_cache::{ initialize_progressive_balances_cache, update_progressive_balances_metrics, }; use crate::epoch_cache::initialize_epoch_cache; -#[cfg(feature = "arbitrary-fuzz")] +#[cfg(feature = "arbitrary")] use arbitrary::Arbitrary; use tracing::instrument; /// The strategy to be used when validating the block's signatures. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(PartialEq, Clone, Copy, Debug)] pub enum BlockSignatureStrategy { /// Do not validate any signature. Use with caution. @@ -74,7 +74,7 @@ pub enum BlockSignatureStrategy { } /// The strategy to be used when validating the block's signatures. 
-#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(PartialEq, Clone, Copy)] pub enum VerifySignatures { /// Validate all signatures encountered. @@ -90,7 +90,7 @@ impl VerifySignatures { } /// Control verification of the latest block header. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(PartialEq, Clone, Copy)] pub enum VerifyBlockRoot { True, diff --git a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs index c5ec80b92a..3e4f7e8189 100644 --- a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs @@ -2,7 +2,7 @@ use crate::common::attesting_indices_base::get_attesting_indices; use safe_arith::SafeArith; use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, PendingAttestation}; -#[cfg(feature = "arbitrary-fuzz")] +#[cfg(feature = "arbitrary")] use arbitrary::Arbitrary; /// Sets the boolean `var` on `self` to be true if it is true on `other`. Otherwise leaves `self` @@ -16,7 +16,7 @@ macro_rules! set_self_if_other_is_true { } /// The information required to reward a block producer for including an attestation in a block. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(Debug, Clone, Copy, PartialEq)] pub struct InclusionInfo { /// The distance between the attestation slot and the slot that attestation was included in a @@ -48,7 +48,7 @@ impl InclusionInfo { } /// Information required to reward some validator during the current and previous epoch. 
-#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(Debug, Default, Clone, PartialEq)] pub struct ValidatorStatus { /// True if the validator has been slashed, ever. @@ -118,7 +118,7 @@ impl ValidatorStatus { /// epochs. #[derive(Clone, Debug, PartialEq)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct TotalBalances { /// The effective balance increment from the spec. effective_balance_increment: u64, @@ -175,7 +175,7 @@ impl TotalBalances { /// Summarised information about validator participation in the _previous and _current_ epochs of /// some `BeaconState`. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(Debug, Clone)] pub struct ValidatorStatuses { /// Information about each individual validator from the state's validator registry. diff --git a/consensus/state_processing/src/verify_operation.rs b/consensus/state_processing/src/verify_operation.rs index a13786f9f6..1e9c3d5fe3 100644 --- a/consensus/state_processing/src/verify_operation.rs +++ b/consensus/state_processing/src/verify_operation.rs @@ -7,7 +7,7 @@ use crate::per_block_processing::{ verify_attester_slashing, verify_bls_to_execution_change, verify_exit, verify_proposer_slashing, }; -#[cfg(feature = "arbitrary-fuzz")] +#[cfg(feature = "arbitrary")] use arbitrary::Arbitrary; use educe::Educe; use smallvec::{SmallVec, smallvec}; @@ -41,14 +41,14 @@ pub trait TransformPersist { /// The inner `op` field is private, meaning instances of this type can only be constructed /// by calling `validate`. 
#[derive(Educe, Debug, Clone)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[educe( PartialEq, Eq, Hash(bound(T: TransformPersist + std::hash::Hash, E: EthSpec)) )] #[cfg_attr( - feature = "arbitrary-fuzz", + feature = "arbitrary", arbitrary(bound = "T: TransformPersist + Arbitrary<'arbitrary>, E: EthSpec") )] pub struct SigVerifiedOp { @@ -139,7 +139,7 @@ struct SigVerifiedOpDecode { /// We need to store multiple `ForkVersion`s because attester slashings contain two indexed /// attestations which may be signed using different versions. #[derive(Debug, PartialEq, Eq, Clone, Hash, Encode, Decode, TestRandom)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct VerifiedAgainst { fork_versions: SmallVec<[ForkVersion; MAX_FORKS_VERIFIED_AGAINST]>, } diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index e7e382714b..c5ced83320 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -22,7 +22,6 @@ arbitrary = [ "ssz_types/arbitrary", "swap_or_not_shuffle/arbitrary", ] -arbitrary-fuzz = ["arbitrary"] portable = ["bls/supranational-portable"] [dependencies] diff --git a/validator_client/slashing_protection/Cargo.toml b/validator_client/slashing_protection/Cargo.toml index 695a693385..8017941ca6 100644 --- a/validator_client/slashing_protection/Cargo.toml +++ b/validator_client/slashing_protection/Cargo.toml @@ -6,7 +6,7 @@ edition = { workspace = true } autotests = false [features] -arbitrary-fuzz = ["dep:arbitrary", "types/arbitrary-fuzz", "eip_3076/arbitrary-fuzz"] +arbitrary = ["dep:arbitrary", "types/arbitrary", "eip_3076/arbitrary"] portable = ["types/portable"] [dependencies] diff --git a/validator_client/slashing_protection/src/interchange_test.rs b/validator_client/slashing_protection/src/interchange_test.rs index c5c3df7ea4..996116dd1c 100644 --- 
a/validator_client/slashing_protection/src/interchange_test.rs +++ b/validator_client/slashing_protection/src/interchange_test.rs @@ -11,7 +11,7 @@ use tempfile::tempdir; use types::{Epoch, Hash256, Slot}; #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct MultiTestCase { pub name: String, pub genesis_validators_root: Hash256, @@ -19,7 +19,7 @@ pub struct MultiTestCase { } #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct TestCase { pub should_succeed: bool, pub contains_slashable_data: bool, @@ -29,7 +29,7 @@ pub struct TestCase { } #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct TestBlock { pub pubkey: PublicKeyBytes, pub slot: Slot, @@ -39,7 +39,7 @@ pub struct TestBlock { } #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct TestAttestation { pub pubkey: PublicKeyBytes, pub source_epoch: Epoch, From efe43f769967a971fa3006ec764e622109b04e6d Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Sat, 7 Mar 2026 08:09:33 +0900 Subject: [PATCH 11/13] Fix cargo-sort errors (#8945) The `cargo-sort` job in CI is [failing](https://github.com/sigp/lighthouse/actions/runs/22781651620/job/66088700318?pr=8932) since [cargo-sort v2.1.1](https://github.com/DevinR528/cargo-sort/releases/tag/v2.1.1) has been released, which reports new errors for our Cargo.toml files. Ran `cargo-sort` formatter locally with the new version. 
Co-Authored-By: ackintosh --- account_manager/Cargo.toml | 5 +---- beacon_node/Cargo.toml | 13 +++++-------- beacon_node/beacon_chain/Cargo.toml | 9 ++++++--- common/logging/Cargo.toml | 3 ++- consensus/types/Cargo.toml | 5 +---- 5 files changed, 15 insertions(+), 20 deletions(-) diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index 8dd50cbc6e..05e6f12554 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -1,10 +1,7 @@ [package] name = "account_manager" version = { workspace = true } -authors = [ - "Paul Hauner ", - "Luke Anderson ", -] +authors = ["Paul Hauner ", "Luke Anderson "] edition = { workspace = true } [dependencies] diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 5352814dd5..ebefa6a451 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,10 +1,7 @@ [package] name = "beacon_node" version = { workspace = true } -authors = [ - "Paul Hauner ", - "Age Manning ", "Age Manning "] edition = { workspace = true } [features] -test_logger = [] # Print log output to stderr when running tests instead of dropping it +# Print log output to stderr when running tests instead of dropping it. +test_logger = [] [dependencies] chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index c5ced83320..c09e3d6931 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -1,10 +1,7 @@ [package] name = "types" version = "0.2.1" -authors = [ - "Paul Hauner ", - "Age Manning ", -] +authors = ["Paul Hauner ", "Age Manning "] edition = { workspace = true } [features] From 537c2ba8b3e49dd9e93dd5df5b67003f5bb91f42 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 9 Mar 2026 11:35:52 +1100 Subject: [PATCH 12/13] Remove `/lighthouse/analysis/block_rewards` APIs (#8935) Mark pointed out that these APIs will require updates for Gloas, so I figured we may as well get rid of them. 
As far as I know, blockprint was the only use case and it is now defunct. The consensus block value is included in getBlock API responses, so there's no reason for VCs to use the `POST` API, and there is now a standard API for the rewards of canonical blocks. The SSE event was non-standard, and likely only used by blockprint as well. Co-Authored-By: Michael Sproul --- .github/forbidden-files.txt | 3 + beacon_node/beacon_chain/src/block_reward.rs | 140 -------------- .../beacon_chain/src/block_verification.rs | 18 -- beacon_node/beacon_chain/src/events.rs | 15 -- beacon_node/beacon_chain/src/lib.rs | 1 - beacon_node/http_api/src/block_rewards.rs | 178 ------------------ beacon_node/http_api/src/lib.rs | 34 ---- book/src/api_lighthouse.md | 54 ------ common/eth2/src/lighthouse.rs | 25 +-- common/eth2/src/lighthouse/block_rewards.rs | 60 ------ common/eth2/src/types.rs | 17 -- 11 files changed, 4 insertions(+), 541 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/block_reward.rs delete mode 100644 beacon_node/http_api/src/block_rewards.rs delete mode 100644 common/eth2/src/lighthouse/block_rewards.rs diff --git a/.github/forbidden-files.txt b/.github/forbidden-files.txt index ec89bd2e4b..a08a6b4e98 100644 --- a/.github/forbidden-files.txt +++ b/.github/forbidden-files.txt @@ -5,3 +5,6 @@ beacon_node/beacon_chain/src/otb_verification_service.rs beacon_node/store/src/partial_beacon_state.rs beacon_node/store/src/consensus_context.rs +beacon_node/beacon_chain/src/block_reward.rs +beacon_node/http_api/src/block_rewards.rs +common/eth2/src/lighthouse/block_rewards.rs diff --git a/beacon_node/beacon_chain/src/block_reward.rs b/beacon_node/beacon_chain/src/block_reward.rs deleted file mode 100644 index f3924bb473..0000000000 --- a/beacon_node/beacon_chain/src/block_reward.rs +++ /dev/null @@ -1,140 +0,0 @@ -use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; -use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta}; -use 
operation_pool::{ - AttMaxCover, MaxCover, PROPOSER_REWARD_DENOMINATOR, RewardCache, SplitAttestation, -}; -use state_processing::{ - common::get_attesting_indices_from_state, - per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards, -}; -use types::{AbstractExecPayload, BeaconBlockRef, BeaconState, EthSpec, Hash256}; - -impl BeaconChain { - pub fn compute_block_reward>( - &self, - block: BeaconBlockRef<'_, T::EthSpec, Payload>, - block_root: Hash256, - state: &BeaconState, - reward_cache: &mut RewardCache, - include_attestations: bool, - ) -> Result { - if block.slot() != state.slot() { - return Err(BeaconChainError::BlockRewardSlotError); - } - - reward_cache.update(state)?; - - let total_active_balance = state.get_total_active_balance()?; - - let split_attestations = block - .body() - .attestations() - .map(|att| { - let attesting_indices = get_attesting_indices_from_state(state, att)?; - Ok(SplitAttestation::new( - att.clone_as_attestation(), - attesting_indices, - )) - }) - .collect::, BeaconChainError>>()?; - - let mut per_attestation_rewards = split_attestations - .iter() - .map(|att| { - AttMaxCover::new( - att.as_ref(), - state, - reward_cache, - total_active_balance, - &self.spec, - ) - .ok_or(BeaconChainError::BlockRewardAttestationError) - }) - .collect::, _>>()?; - - // Update the attestation rewards for each previous attestation included. - // This is O(n^2) in the number of attestations n. 
- for i in 0..per_attestation_rewards.len() { - let (updated, to_update) = per_attestation_rewards.split_at_mut(i + 1); - let latest_att = &updated[i]; - - for att in to_update { - att.update_covering_set(latest_att.intermediate(), latest_att.covering_set()); - } - } - - let mut prev_epoch_total = 0; - let mut curr_epoch_total = 0; - - for cover in &per_attestation_rewards { - if cover.att.data.slot.epoch(T::EthSpec::slots_per_epoch()) == state.current_epoch() { - curr_epoch_total += cover.score() as u64; - } else { - prev_epoch_total += cover.score() as u64; - } - } - - let attestation_total = prev_epoch_total + curr_epoch_total; - - // Drop the covers. - let per_attestation_rewards = per_attestation_rewards - .into_iter() - .map(|cover| { - // Divide each reward numerator by the denominator. This can lead to the total being - // less than the sum of the individual rewards due to the fact that integer division - // does not distribute over addition. - let mut rewards = cover.fresh_validators_rewards; - rewards - .values_mut() - .for_each(|reward| *reward /= PROPOSER_REWARD_DENOMINATOR); - rewards - }) - .collect(); - - // Add the attestation data if desired. - let attestations = if include_attestations { - block - .body() - .attestations() - .map(|a| a.data().clone()) - .collect() - } else { - vec![] - }; - - let attestation_rewards = AttestationRewards { - total: attestation_total, - prev_epoch_total, - curr_epoch_total, - per_attestation_rewards, - attestations, - }; - - // Sync committee rewards. 
- let sync_committee_rewards = if let Ok(sync_aggregate) = block.body().sync_aggregate() { - let (_, proposer_reward_per_bit) = compute_sync_aggregate_rewards(state, &self.spec) - .map_err(|_| BeaconChainError::BlockRewardSyncError)?; - sync_aggregate.sync_committee_bits.num_set_bits() as u64 * proposer_reward_per_bit - } else { - 0 - }; - - // Total, metadata - let total = attestation_total + sync_committee_rewards; - - let meta = BlockRewardMeta { - slot: block.slot(), - parent_slot: state.latest_block_header().slot, - proposer_index: block.proposer_index(), - graffiti: block.body().graffiti().as_utf8_lossy(), - }; - - Ok(BlockReward { - total, - block_root, - meta, - attestation_rewards, - sync_committee_rewards, - }) - } -} diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index d126c3af00..2021b0d952 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1571,24 +1571,6 @@ impl ExecutionPendingBlock { metrics::stop_timer(committee_timer); - /* - * If we have block reward listeners, compute the block reward and push it to the - * event handler. - */ - if let Some(ref event_handler) = chain.event_handler - && event_handler.has_block_reward_subscribers() - { - let mut reward_cache = Default::default(); - let block_reward = chain.compute_block_reward( - block.message(), - block_root, - &state, - &mut reward_cache, - true, - )?; - event_handler.register(EventKind::BlockReward(block_reward)); - } - /* * Perform `per_block_processing` on the block and state, returning early if the block is * invalid. 
diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 63be944eea..276edc3fe6 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -21,7 +21,6 @@ pub struct ServerSentEventHandler { late_head: Sender>, light_client_finality_update_tx: Sender>, light_client_optimistic_update_tx: Sender>, - block_reward_tx: Sender>, proposer_slashing_tx: Sender>, attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, @@ -48,7 +47,6 @@ impl ServerSentEventHandler { let (late_head, _) = broadcast::channel(capacity); let (light_client_finality_update_tx, _) = broadcast::channel(capacity); let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity); - let (block_reward_tx, _) = broadcast::channel(capacity); let (proposer_slashing_tx, _) = broadcast::channel(capacity); let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); @@ -69,7 +67,6 @@ impl ServerSentEventHandler { late_head, light_client_finality_update_tx, light_client_optimistic_update_tx, - block_reward_tx, proposer_slashing_tx, attester_slashing_tx, bls_to_execution_change_tx, @@ -142,10 +139,6 @@ impl ServerSentEventHandler { .light_client_optimistic_update_tx .send(kind) .map(|count| log_count("light client optimistic update", count)), - EventKind::BlockReward(_) => self - .block_reward_tx - .send(kind) - .map(|count| log_count("block reward", count)), EventKind::ProposerSlashing(_) => self .proposer_slashing_tx .send(kind) @@ -224,10 +217,6 @@ impl ServerSentEventHandler { self.light_client_optimistic_update_tx.subscribe() } - pub fn subscribe_block_reward(&self) -> Receiver> { - self.block_reward_tx.subscribe() - } - pub fn subscribe_attester_slashing(&self) -> Receiver> { self.attester_slashing_tx.subscribe() } @@ -292,10 +281,6 @@ impl ServerSentEventHandler { self.late_head.receiver_count() > 0 } - pub fn 
has_block_reward_subscribers(&self) -> bool { - self.block_reward_tx.receiver_count() > 0 - } - pub fn has_proposer_slashing_subscribers(&self) -> bool { self.proposer_slashing_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index e1a190ffb3..4d3c3e193e 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -10,7 +10,6 @@ mod beacon_snapshot; pub mod bellatrix_readiness; pub mod blob_verification; mod block_production; -pub mod block_reward; mod block_times_cache; mod block_verification; pub mod block_verification_types; diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs deleted file mode 100644 index 891f024bf9..0000000000 --- a/beacon_node/http_api/src/block_rewards.rs +++ /dev/null @@ -1,178 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use eth2::lighthouse::{BlockReward, BlockRewardsQuery}; -use lru::LruCache; -use state_processing::BlockReplayer; -use std::num::NonZeroUsize; -use std::sync::Arc; -use tracing::{debug, warn}; -use types::block::BlindedBeaconBlock; -use types::new_non_zero_usize; -use warp_utils::reject::{beacon_state_error, custom_bad_request, unhandled_error}; - -const STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(2); - -/// Fetch block rewards for blocks from the canonical chain. -pub fn get_block_rewards( - query: BlockRewardsQuery, - chain: Arc>, -) -> Result, warp::Rejection> { - let start_slot = query.start_slot; - let end_slot = query.end_slot; - let prior_slot = start_slot - 1; - - if start_slot > end_slot || start_slot == 0 { - return Err(custom_bad_request(format!( - "invalid start and end: {}, {}", - start_slot, end_slot - ))); - } - - let end_block_root = chain - .block_root_at_slot(end_slot, WhenSlotSkipped::Prev) - .map_err(unhandled_error)? 
- .ok_or_else(|| custom_bad_request(format!("block at end slot {} unknown", end_slot)))?; - - let blocks = chain - .store - .load_blocks_to_replay(start_slot, end_slot, end_block_root) - .map_err(|e| unhandled_error(BeaconChainError::from(e)))?; - - let state_root = chain - .state_root_at_slot(prior_slot) - .map_err(unhandled_error)? - .ok_or_else(|| custom_bad_request(format!("prior state at slot {} unknown", prior_slot)))?; - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let mut state = chain - .get_state(&state_root, Some(prior_slot), true) - .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) - .map_err(unhandled_error)?; - - state - .build_caches(&chain.spec) - .map_err(beacon_state_error)?; - - let mut reward_cache = Default::default(); - let mut block_rewards = Vec::with_capacity(blocks.len()); - - let block_replayer = BlockReplayer::new(state, &chain.spec) - .pre_block_hook(Box::new(|state, block| { - state.build_all_committee_caches(&chain.spec)?; - - // Compute block reward. - let block_reward = chain.compute_block_reward( - block.message(), - block.canonical_root(), - state, - &mut reward_cache, - query.include_attestations, - )?; - block_rewards.push(block_reward); - Ok(()) - })) - .state_root_iter( - chain - .forwards_iter_state_roots_until(prior_slot, end_slot) - .map_err(unhandled_error)?, - ) - .no_signature_verification() - .minimal_block_root_verification() - .apply_blocks(blocks, None) - .map_err(unhandled_error)?; - - if block_replayer.state_root_miss() { - warn!(%start_slot, %end_slot, "Block reward state root miss"); - } - - drop(block_replayer); - - Ok(block_rewards) -} - -/// Compute block rewards for blocks passed in as input. 
-pub fn compute_block_rewards( - blocks: Vec>, - chain: Arc>, -) -> Result, warp::Rejection> { - let mut block_rewards = Vec::with_capacity(blocks.len()); - let mut state_cache = LruCache::new(STATE_CACHE_SIZE); - let mut reward_cache = Default::default(); - - for block in blocks { - let parent_root = block.parent_root(); - - // Check LRU cache for a constructed state from a previous iteration. - let state = if let Some(state) = state_cache.get(&(parent_root, block.slot())) { - debug!( - ?parent_root, - slot = %block.slot(), - "Re-using cached state for block rewards" - ); - state - } else { - debug!( - ?parent_root, - slot = %block.slot(), - "Fetching state for block rewards" - ); - let parent_block = chain - .get_blinded_block(&parent_root) - .map_err(unhandled_error)? - .ok_or_else(|| { - custom_bad_request(format!( - "parent block not known or not canonical: {:?}", - parent_root - )) - })?; - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) - .map_err(unhandled_error)? - .ok_or_else(|| { - custom_bad_request(format!( - "no state known for parent block: {:?}", - parent_root - )) - })?; - - let block_replayer = BlockReplayer::new(parent_state, &chain.spec) - .no_signature_verification() - .state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter()) - .minimal_block_root_verification() - .apply_blocks(vec![], Some(block.slot())) - .map_err(unhandled_error::)?; - - if block_replayer.state_root_miss() { - warn!( - parent_slot = %parent_block.slot(), - slot = %block.slot(), - "Block reward state root miss" - ); - } - - let mut state = block_replayer.into_state(); - state - .build_all_committee_caches(&chain.spec) - .map_err(beacon_state_error)?; - - state_cache.get_or_insert((parent_root, block.slot()), || state) - }; - - // Compute block reward. 
- let block_reward = chain - .compute_block_reward( - block.to_ref(), - block.canonical_root(), - state, - &mut reward_cache, - true, - ) - .map_err(unhandled_error)?; - block_rewards.push(block_reward); - } - - Ok(block_rewards) -} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index e9dfa2876a..0a0ae683ca 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -12,7 +12,6 @@ mod attester_duties; mod beacon; mod block_id; mod block_packing_efficiency; -mod block_rewards; mod build_block_contents; mod builder_states; mod custody; @@ -3066,34 +3065,6 @@ pub fn serve( }, ); - // GET lighthouse/analysis/block_rewards - let get_lighthouse_block_rewards = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("block_rewards")) - .and(warp::query::()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then(|query, task_spawner: TaskSpawner, chain| { - task_spawner.blocking_json_task(Priority::P1, move || { - block_rewards::get_block_rewards(query, chain) - }) - }); - - // POST lighthouse/analysis/block_rewards - let post_lighthouse_block_rewards = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("block_rewards")) - .and(warp_utils::json::json()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then(|blocks, task_spawner: TaskSpawner, chain| { - task_spawner.blocking_json_task(Priority::P1, move || { - block_rewards::compute_block_rewards(blocks, chain) - }) - }); - // GET lighthouse/analysis/attestation_performance/{index} let get_lighthouse_attestation_performance = warp::path("lighthouse") .and(warp::path("analysis")) @@ -3184,9 +3155,6 @@ pub fn serve( api_types::EventTopic::LightClientOptimisticUpdate => { event_handler.subscribe_light_client_optimistic_update() } - api_types::EventTopic::BlockReward => { - event_handler.subscribe_block_reward() - } 
api_types::EventTopic::AttesterSlashing => { event_handler.subscribe_attester_slashing() } @@ -3363,7 +3331,6 @@ pub fn serve( .uor(get_lighthouse_staking) .uor(get_lighthouse_database_info) .uor(get_lighthouse_custody_info) - .uor(get_lighthouse_block_rewards) .uor(get_lighthouse_attestation_performance) .uor(get_beacon_light_client_optimistic_update) .uor(get_beacon_light_client_finality_update) @@ -3414,7 +3381,6 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) - .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) .uor(post_lighthouse_finalize) diff --git a/book/src/api_lighthouse.md b/book/src/api_lighthouse.md index 0442bf4ec0..2fd7290cb2 100644 --- a/book/src/api_lighthouse.md +++ b/book/src/api_lighthouse.md @@ -590,60 +590,6 @@ Caveats: This is because the state *prior* to the `start_epoch` needs to be loaded from the database, and loading a state on a boundary is most efficient. -## `/lighthouse/analysis/block_rewards` - -Fetch information about the block rewards paid to proposers for a range of consecutive blocks. - -Two query parameters are required: - -- `start_slot` (inclusive): the slot of the first block to compute rewards for. -- `end_slot` (inclusive): the slot of the last block to compute rewards for. 
- -Example: - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/block_rewards?start_slot=1&end_slot=1" | jq -``` - -The first few lines of the response would look like: - -```json -[ - { - "total": 637260, - "block_root": "0x4a089c5e390bb98e66b27358f157df825128ea953cee9d191229c0bcf423a4f6", - "meta": { - "slot": "1", - "parent_slot": "0", - "proposer_index": 93, - "graffiti": "EF #vm-eth2-raw-iron-101" - }, - "attestation_rewards": { - "total": 637260, - "prev_epoch_total": 0, - "curr_epoch_total": 637260, - "per_attestation_rewards": [ - { - "50102": 780, - } - ] - } - } -] -``` - -Caveats: - -- Presently only attestation and sync committee rewards are computed. -- The output format is verbose and subject to change. Please see [`BlockReward`][block_reward_src] - in the source. -- For maximum efficiency the `start_slot` should satisfy `start_slot % slots_per_restore_point == 1`. - This is because the state *prior* to the `start_slot` needs to be loaded from the database, and - loading a state on a boundary is most efficient. 
- -[block_reward_src]: -https://github.com/sigp/lighthouse/tree/unstable/common/eth2/src/lighthouse/block_rewards.rs - ## `/lighthouse/analysis/block_packing` Fetch information about the block packing efficiency of blocks for a range of consecutive diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 993c263cbf..3c039b16b3 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -2,12 +2,11 @@ mod attestation_performance; mod block_packing_efficiency; -mod block_rewards; mod custody; pub mod sync_state; use crate::{ - BeaconNodeHttpClient, DepositData, Error, Hash256, Slot, + BeaconNodeHttpClient, DepositData, Error, Hash256, lighthouse::sync_state::SyncState, types::{AdminPeer, Epoch, GenericResponse, ValidatorId}, }; @@ -22,7 +21,6 @@ pub use attestation_performance::{ pub use block_packing_efficiency::{ BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, }; -pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use custody::CustodyInfo; // Define "legacy" implementations of `Option` which use four bytes for encoding the union @@ -317,27 +315,6 @@ impl BeaconNodeHttpClient { Analysis endpoints. */ - /// `GET` lighthouse/analysis/block_rewards?start_slot,end_slot - pub async fn get_lighthouse_analysis_block_rewards( - &self, - start_slot: Slot, - end_slot: Slot, - ) -> Result, Error> { - let mut path = self.server.expose_full().clone(); - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? 
- .push("lighthouse") - .push("analysis") - .push("block_rewards"); - - path.query_pairs_mut() - .append_pair("start_slot", &start_slot.to_string()) - .append_pair("end_slot", &end_slot.to_string()); - - self.get(path).await - } - /// `GET` lighthouse/analysis/block_packing?start_epoch,end_epoch pub async fn get_lighthouse_analysis_block_packing( &self, diff --git a/common/eth2/src/lighthouse/block_rewards.rs b/common/eth2/src/lighthouse/block_rewards.rs deleted file mode 100644 index 38070f3539..0000000000 --- a/common/eth2/src/lighthouse/block_rewards.rs +++ /dev/null @@ -1,60 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use types::{AttestationData, Hash256, Slot}; - -/// Details about the rewards paid to a block proposer for proposing a block. -/// -/// All rewards in GWei. -/// -/// Presently this only counts attestation rewards, but in future should be expanded -/// to include information on slashings and sync committee aggregates too. -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockReward { - /// Sum of all reward components. - pub total: u64, - /// Block root of the block that these rewards are for. - pub block_root: Hash256, - /// Metadata about the block, particularly reward-relevant metadata. - pub meta: BlockRewardMeta, - /// Rewards due to attestations. - pub attestation_rewards: AttestationRewards, - /// Sum of rewards due to sync committee signatures. - pub sync_committee_rewards: u64, -} - -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockRewardMeta { - pub slot: Slot, - pub parent_slot: Slot, - pub proposer_index: u64, - pub graffiti: String, -} - -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationRewards { - /// Total block reward from attestations included. - pub total: u64, - /// Total rewards from previous epoch attestations. - pub prev_epoch_total: u64, - /// Total rewards from current epoch attestations. 
- pub curr_epoch_total: u64, - /// Vec of attestation rewards for each attestation included. - /// - /// Each element of the vec is a map from validator index to reward. - pub per_attestation_rewards: Vec>, - /// The attestations themselves (optional). - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub attestations: Vec, -} - -/// Query parameters for the `/lighthouse/block_rewards` endpoint. -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockRewardsQuery { - /// Lower slot limit for block rewards returned (inclusive). - pub start_slot: Slot, - /// Upper slot limit for block rewards returned (inclusive). - pub end_slot: Slot, - /// Include the full attestations themselves? - #[serde(default)] - pub include_attestations: bool, -} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index f8376d430c..2f86170812 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -37,9 +37,6 @@ pub mod beacon_response { pub use crate::beacon_response::*; } -#[cfg(feature = "lighthouse")] -use crate::lighthouse::BlockReward; - // Re-export error types from the unified error module pub use crate::error::{ErrorMessage, Failure, IndexedErrorMessage, ResponseError as Error}; @@ -1199,8 +1196,6 @@ pub enum EventKind { LateHead(SseLateHead), LightClientFinalityUpdate(Box>>), LightClientOptimisticUpdate(Box>>), - #[cfg(feature = "lighthouse")] - BlockReward(BlockReward), PayloadAttributes(VersionedSsePayloadAttributes), ProposerSlashing(Box), AttesterSlashing(Box>), @@ -1225,8 +1220,6 @@ impl EventKind { EventKind::LateHead(_) => "late_head", EventKind::LightClientFinalityUpdate(_) => "light_client_finality_update", EventKind::LightClientOptimisticUpdate(_) => "light_client_optimistic_update", - #[cfg(feature = "lighthouse")] - EventKind::BlockReward(_) => "block_reward", EventKind::ProposerSlashing(_) => "proposer_slashing", EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => 
"bls_to_execution_change", @@ -1302,10 +1295,6 @@ impl EventKind { })?), ))) } - #[cfg(feature = "lighthouse")] - "block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err( - |e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)), - )?)), "attester_slashing" => Ok(EventKind::AttesterSlashing( serde_json::from_str(data).map_err(|e| { ServerError::InvalidServerSentEvent(format!("Attester Slashing: {:?}", e)) @@ -1355,8 +1344,6 @@ pub enum EventTopic { PayloadAttributes, LightClientFinalityUpdate, LightClientOptimisticUpdate, - #[cfg(feature = "lighthouse")] - BlockReward, AttesterSlashing, ProposerSlashing, BlsToExecutionChange, @@ -1382,8 +1369,6 @@ impl FromStr for EventTopic { "late_head" => Ok(EventTopic::LateHead), "light_client_finality_update" => Ok(EventTopic::LightClientFinalityUpdate), "light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate), - #[cfg(feature = "lighthouse")] - "block_reward" => Ok(EventTopic::BlockReward), "attester_slashing" => Ok(EventTopic::AttesterSlashing), "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), @@ -1410,8 +1395,6 @@ impl fmt::Display for EventTopic { EventTopic::LateHead => write!(f, "late_head"), EventTopic::LightClientFinalityUpdate => write!(f, "light_client_finality_update"), EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"), - #[cfg(feature = "lighthouse")] - EventTopic::BlockReward => write!(f, "block_reward"), EventTopic::AttesterSlashing => write!(f, "attester_slashing"), EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), From 7dab32dd16c997f592836f9728b2a2b4bf3ba756 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Mon, 9 Mar 2026 14:23:34 +0900 Subject: [PATCH 13/13] Gloas payload envelope processing (#8806) Co-Authored-By: Eitan Seri- Levi 
Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Jimmy Chen Co-Authored-By: Michael Sproul Co-Authored-By: Michael Sproul --- beacon_node/beacon_chain/src/beacon_chain.rs | 106 +---- .../beacon_chain/src/beacon_proposer_cache.rs | 79 +++- .../beacon_chain/src/block_verification.rs | 5 +- beacon_node/beacon_chain/src/builder.rs | 1 + .../beacon_chain/src/envelope_times_cache.rs | 197 ++++++++ .../beacon_chain/src/execution_payload.rs | 37 +- .../beacon_chain/src/historical_blocks.rs | 9 +- beacon_node/beacon_chain/src/lib.rs | 2 + beacon_node/beacon_chain/src/metrics.rs | 28 ++ .../execution_pending_envelope.rs | 105 +++++ .../gossip_verified_envelope.rs | 445 ++++++++++++++++++ .../payload_envelope_verification/import.rs | 354 ++++++++++++++ .../src/payload_envelope_verification/mod.rs | 285 +++++++++++ .../payload_notifier.rs | 94 ++++ .../beacon_chain/tests/block_verification.rs | 2 +- beacon_node/network/src/metrics.rs | 10 + .../gossip_methods.rs | 172 ++++++- .../src/network_beacon_processor/mod.rs | 8 +- beacon_node/network/src/router.rs | 1 + 19 files changed, 1813 insertions(+), 127 deletions(-) create mode 100644 beacon_node/beacon_chain/src/envelope_times_cache.rs create mode 100644 beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs create mode 100644 beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs create mode 100644 beacon_node/beacon_chain/src/payload_envelope_verification/import.rs create mode 100644 beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs create mode 100644 beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 703ed24420..07f3bb01fa 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -4,9 +4,7 @@ use crate::attestation_verification::{ 
batch_verify_unaggregated_attestations, }; use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches}; -use crate::beacon_proposer_cache::{ - BeaconProposerCache, EpochBlockProposers, ensure_state_can_determine_proposers_for_epoch, -}; +use crate::beacon_proposer_cache::{BeaconProposerCache, EpochBlockProposers}; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::{ @@ -26,6 +24,7 @@ use crate::data_availability_checker::{ }; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; +use crate::envelope_times_cache::EnvelopeTimesCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::events::ServerSentEventHandler; use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_execution_payload}; @@ -66,7 +65,6 @@ use crate::sync_committee_verification::{ }; use crate::validator_monitor::{ HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS, ValidatorMonitor, get_slot_delay_ms, - timestamp_now, }; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ @@ -462,6 +460,8 @@ pub struct BeaconChain { pub early_attester_cache: EarlyAttesterCache, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, + /// A cache used to keep track of various envelope timings. + pub envelope_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. 
pub pre_finalization_block_cache: PreFinalizationBlockCache, /// A cache used to produce light_client server messages @@ -4042,23 +4042,10 @@ impl BeaconChain { // See https://github.com/sigp/lighthouse/issues/2028 let (_, signed_block, block_data) = signed_block.deconstruct(); - match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) { - Ok(Some(blobs_or_columns_store_op)) => { - ops.push(blobs_or_columns_store_op); - } - Ok(None) => {} - Err(e) => { - error!( - msg = "Restoring fork choice from disk", - error = &e, - ?block_root, - "Failed to store data columns into the database" - ); - return Err(self - .handle_import_block_db_write_error(fork_choice) - .err() - .unwrap_or(BlockError::InternalError(e))); - } + if let Some(blobs_or_columns_store_op) = + self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) + { + ops.push(blobs_or_columns_store_op); } let block = signed_block.message(); @@ -4088,7 +4075,7 @@ impl BeaconChain { // We're declaring the block "imported" at this point, since fork choice and the DB know // about it. - let block_time_imported = timestamp_now(); + let block_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX); // compute state proofs for light client updates before inserting the state into the // snapshot cache. @@ -4157,7 +4144,7 @@ impl BeaconChain { } /// Check block's consistentency with any configured weak subjectivity checkpoint. - fn check_block_against_weak_subjectivity_checkpoint( + pub(crate) fn check_block_against_weak_subjectivity_checkpoint( &self, block: BeaconBlockRef, block_root: Hash256, @@ -6407,6 +6394,7 @@ impl BeaconChain { // sync anyway). self.naive_aggregation_pool.write().prune(slot); self.block_times_cache.write().prune(slot); + self.envelope_times_cache.write().prune(slot); // Don't run heavy-weight tasks during sync. 
if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot { @@ -6466,62 +6454,14 @@ impl BeaconChain { accessor: impl Fn(&EpochBlockProposers) -> Result, state_provider: impl FnOnce() -> Result<(Hash256, BeaconState), E>, ) -> Result { - let cache_entry = self - .beacon_proposer_cache - .lock() - .get_or_insert_key(proposal_epoch, shuffling_decision_block); - - // If the cache entry is not initialised, run the code to initialise it inside a OnceCell. - // This prevents duplication of work across multiple threads. - // - // If it is already initialised, then `get_or_try_init` will return immediately without - // executing the initialisation code at all. - let epoch_block_proposers = cache_entry.get_or_try_init(|| { - // Fetch the state on-demand if the required epoch was missing from the cache. - // If the caller wants to not compute the state they must return an error here and then - // catch it at the call site. - let (state_root, mut state) = state_provider()?; - - // Ensure the state can compute proposer duties for `epoch`. - ensure_state_can_determine_proposers_for_epoch( - &mut state, - state_root, - proposal_epoch, - &self.spec, - )?; - - // Sanity check the state. - let latest_block_root = state.get_latest_block_root(state_root); - let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch( - proposal_epoch, - latest_block_root, - &self.spec, - )?; - if state_decision_block_root != shuffling_decision_block { - return Err(Error::ProposerCacheIncorrectState { - state_decision_block_root, - requested_decision_block_root: shuffling_decision_block, - } - .into()); - } - - let proposers = state.get_beacon_proposer_indices(proposal_epoch, &self.spec)?; - - // Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have - // advanced the state completely into the new epoch. 
- let fork = self.spec.fork_at_epoch(proposal_epoch); - - debug!( - ?shuffling_decision_block, - epoch = %proposal_epoch, - "Priming proposer shuffling cache" - ); - - Ok::<_, E>(EpochBlockProposers::new(proposal_epoch, fork, proposers)) - })?; - - // Run the accessor function on the computed epoch proposers. - accessor(epoch_block_proposers).map_err(Into::into) + crate::beacon_proposer_cache::with_proposer_cache( + &self.beacon_proposer_cache, + shuffling_decision_block, + proposal_epoch, + accessor, + state_provider, + &self.spec, + ) } /// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head @@ -7197,16 +7137,16 @@ impl BeaconChain { block_root: Hash256, block_slot: Slot, block_data: AvailableBlockData, - ) -> Result>, String> { + ) -> Option> { match block_data { - AvailableBlockData::NoData => Ok(None), + AvailableBlockData::NoData => None, AvailableBlockData::Blobs(blobs) => { debug!( %block_root, count = blobs.len(), "Writing blobs to store" ); - Ok(Some(StoreOp::PutBlobs(block_root, blobs))) + Some(StoreOp::PutBlobs(block_root, blobs)) } AvailableBlockData::DataColumns(mut data_columns) => { let columns_to_custody = self.custody_columns_for_epoch(Some( @@ -7222,7 +7162,7 @@ impl BeaconChain { count = data_columns.len(), "Writing data columns to store" ); - Ok(Some(StoreOp::PutDataColumns(block_root, data_columns))) + Some(StoreOp::PutDataColumns(block_root, data_columns)) } } } diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index 912f7f3bad..b258d7471f 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -12,12 +12,13 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use fork_choice::ExecutionStatus; use lru::LruCache; use once_cell::sync::OnceCell; +use parking_lot::Mutex; use safe_arith::SafeArith; use smallvec::SmallVec; use 
state_processing::state_advance::partial_state_advance; use std::num::NonZeroUsize; use std::sync::Arc; -use tracing::instrument; +use tracing::{debug, instrument}; use typenum::Unsigned; use types::new_non_zero_usize; use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot}; @@ -164,6 +165,82 @@ impl BeaconProposerCache { } } +/// Access the proposer cache, computing and caching the proposers if necessary. +/// +/// This is a free function that operates on references to the cache and spec, decoupled from +/// `BeaconChain`. The `accessor` is called with the cached `EpochBlockProposers` for the given +/// `(proposal_epoch, shuffling_decision_block)` key. If the cache entry is missing, the +/// `state_provider` closure is called to produce a state which is then used to compute and +/// cache the proposers. +pub fn with_proposer_cache( + beacon_proposer_cache: &Mutex, + shuffling_decision_block: Hash256, + proposal_epoch: Epoch, + accessor: impl Fn(&EpochBlockProposers) -> Result, + state_provider: impl FnOnce() -> Result<(Hash256, BeaconState), Err>, + spec: &ChainSpec, +) -> Result +where + Spec: EthSpec, + Err: From + From, +{ + let cache_entry = beacon_proposer_cache + .lock() + .get_or_insert_key(proposal_epoch, shuffling_decision_block); + + // If the cache entry is not initialised, run the code to initialise it inside a OnceCell. + // This prevents duplication of work across multiple threads. + // + // If it is already initialised, then `get_or_try_init` will return immediately without + // executing the initialisation code at all. + let epoch_block_proposers = cache_entry.get_or_try_init(|| { + // Fetch the state on-demand if the required epoch was missing from the cache. + // If the caller wants to not compute the state they must return an error here and then + // catch it at the call site. + let (state_root, mut state) = state_provider()?; + + // Ensure the state can compute proposer duties for `epoch`. 
+ ensure_state_can_determine_proposers_for_epoch( + &mut state, + state_root, + proposal_epoch, + spec, + )?; + + // Sanity check the state. + let latest_block_root = state.get_latest_block_root(state_root); + let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch( + proposal_epoch, + latest_block_root, + spec, + )?; + if state_decision_block_root != shuffling_decision_block { + return Err(BeaconChainError::ProposerCacheIncorrectState { + state_decision_block_root, + requested_decision_block_root: shuffling_decision_block, + } + .into()); + } + + let proposers = state.get_beacon_proposer_indices(proposal_epoch, spec)?; + + // Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have + // advanced the state completely into the new epoch. + let fork = spec.fork_at_epoch(proposal_epoch); + + debug!( + ?shuffling_decision_block, + epoch = %proposal_epoch, + "Priming proposer shuffling cache" + ); + + Ok::<_, Err>(EpochBlockProposers::new(proposal_epoch, fork, proposers)) + })?; + + // Run the accessor function on the computed epoch proposers. + accessor(epoch_block_proposers).map_err(Into::into) +} + /// Compute the proposer duties using the head state without cache. /// /// Return: diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 2021b0d952..b748bf5c6c 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -681,7 +681,8 @@ pub struct SignatureVerifiedBlock { } /// Used to await the result of executing payload with an EE. -type PayloadVerificationHandle = JoinHandle>>; +pub type PayloadVerificationHandle = + JoinHandle>>; /// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and /// ready to import into the `BeaconChain`. 
The validation includes: @@ -1357,7 +1358,7 @@ impl ExecutionPendingBlock { /// verification must be done upstream (e.g., via a `SignatureVerifiedBlock` /// /// Returns an error if the block is invalid, or if the block was unable to be verified. - #[instrument(skip_all, level = "debug")] + #[instrument(skip_all, level = "debug", fields(?block_root))] pub fn from_signature_verified_components( block: MaybeAvailableBlock, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 66a54d46e8..d5935b492a 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1023,6 +1023,7 @@ where )), beacon_proposer_cache, block_times_cache: <_>::default(), + envelope_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), validator_pubkey_cache: RwLock::new(validator_pubkey_cache), early_attester_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/envelope_times_cache.rs b/beacon_node/beacon_chain/src/envelope_times_cache.rs new file mode 100644 index 0000000000..84c936c210 --- /dev/null +++ b/beacon_node/beacon_chain/src/envelope_times_cache.rs @@ -0,0 +1,197 @@ +//! This module provides the `EnvelopeTimesCache` which contains information regarding payload +//! envelope timings. +//! +//! This provides `BeaconChain` and associated functions with access to the timestamps of when a +//! payload envelope was observed, verified, executed, and imported. +//! This allows for better traceability and allows us to determine the root cause for why an +//! envelope was imported late. +//! This allows us to distinguish between the following scenarios: +//! - The envelope was observed late. +//! - Consensus verification was slow. +//! - Execution verification was slow. +//! - The DB write was slow. 
+ +use eth2::types::{Hash256, Slot}; +use std::collections::HashMap; +use std::time::Duration; + +type BlockRoot = Hash256; + +#[derive(Clone, Default)] +pub struct EnvelopeTimestamps { + /// When the envelope was first observed (gossip or RPC). + pub observed: Option, + /// When consensus verification (state transition) completed. + pub consensus_verified: Option, + /// When execution layer verification started. + pub started_execution: Option, + /// When execution layer verification completed. + pub executed: Option, + /// When the envelope was imported into the DB. + pub imported: Option, +} + +/// Delay data for envelope processing, computed relative to the slot start time. +#[derive(Debug, Default)] +pub struct EnvelopeDelays { + /// Time after start of slot we saw the envelope. + pub observed: Option, + /// The time it took to complete consensus verification of the envelope. + pub consensus_verification_time: Option, + /// The time it took to complete execution verification of the envelope. + pub execution_time: Option, + /// Time after execution until the envelope was imported. 
+ pub imported: Option, +} + +impl EnvelopeDelays { + fn new(times: EnvelopeTimestamps, slot_start_time: Duration) -> EnvelopeDelays { + let observed = times + .observed + .and_then(|observed_time| observed_time.checked_sub(slot_start_time)); + let consensus_verification_time = times + .consensus_verified + .and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?)); + let execution_time = times + .executed + .and_then(|executed| executed.checked_sub(times.started_execution?)); + let imported = times + .imported + .and_then(|imported_time| imported_time.checked_sub(times.executed?)); + EnvelopeDelays { + observed, + consensus_verification_time, + execution_time, + imported, + } + } +} + +pub struct EnvelopeTimesCacheValue { + pub slot: Slot, + pub timestamps: EnvelopeTimestamps, + pub peer_id: Option, +} + +impl EnvelopeTimesCacheValue { + fn new(slot: Slot) -> Self { + EnvelopeTimesCacheValue { + slot, + timestamps: Default::default(), + peer_id: None, + } + } +} + +#[derive(Default)] +pub struct EnvelopeTimesCache { + pub cache: HashMap, +} + +impl EnvelopeTimesCache { + /// Set the observation time for `block_root` to `timestamp` if `timestamp` is less than + /// any previous timestamp at which this envelope was observed. + pub fn set_time_observed( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + peer_id: Option, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + match entry.timestamps.observed { + Some(existing) if existing <= timestamp => { + // Existing timestamp is earlier, do nothing. + } + _ => { + entry.timestamps.observed = Some(timestamp); + entry.peer_id = peer_id; + } + } + } + + /// Set the timestamp for `field` if that timestamp is less than any previously known value. 
+ fn set_time_if_less( + &mut self, + block_root: BlockRoot, + slot: Slot, + field: impl Fn(&mut EnvelopeTimestamps) -> &mut Option, + timestamp: Duration, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + let existing_timestamp = field(&mut entry.timestamps); + if existing_timestamp.is_none_or(|prev| timestamp < prev) { + *existing_timestamp = Some(timestamp); + } + } + + pub fn set_time_consensus_verified( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.consensus_verified, + timestamp, + ) + } + + pub fn set_time_started_execution( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.started_execution, + timestamp, + ) + } + + pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.executed, + timestamp, + ) + } + + pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.imported, + timestamp, + ) + } + + pub fn get_envelope_delays( + &self, + block_root: BlockRoot, + slot_start_time: Duration, + ) -> EnvelopeDelays { + if let Some(entry) = self.cache.get(&block_root) { + EnvelopeDelays::new(entry.timestamps.clone(), slot_start_time) + } else { + EnvelopeDelays::default() + } + } + + /// Prune the cache to only store the most recent 2 epochs. 
+ pub fn prune(&mut self, current_slot: Slot) { + self.cache + .retain(|_, entry| entry.slot > current_slot.saturating_sub(64_u64)); + } +} diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index a2ebed32ee..2b03a095f1 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -25,7 +25,6 @@ use state_processing::per_block_processing::{ use std::sync::Arc; use tokio::task::JoinHandle; use tracing::{Instrument, debug_span, warn}; -use tree_hash::TreeHash; use types::execution::BlockProductionVersion; use types::*; @@ -109,12 +108,18 @@ impl PayloadNotifier { if let Some(precomputed_status) = self.payload_verification_status { Ok(precomputed_status) } else { - notify_new_payload(&self.chain, self.block.message()).await + notify_new_payload( + &self.chain, + self.block.message().slot(), + self.block.message().parent_root(), + self.block.message().try_into()?, + ) + .await } } } -/// Verify that `execution_payload` contained by `block` is considered valid by an execution +/// Verify that `execution_payload` is considered valid by an execution /// engine. 
/// /// ## Specification @@ -123,17 +128,21 @@ impl PayloadNotifier { /// contains a few extra checks by running `partially_verify_execution_payload` first: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload -async fn notify_new_payload( +pub async fn notify_new_payload( chain: &Arc>, - block: BeaconBlockRef<'_, T::EthSpec>, + slot: Slot, + parent_beacon_block_root: Hash256, + new_payload_request: NewPayloadRequest<'_, T::EthSpec>, ) -> Result { let execution_layer = chain .execution_layer .as_ref() .ok_or(ExecutionPayloadError::NoExecutionConnection)?; - let execution_block_hash = block.execution_payload()?.block_hash(); - let new_payload_response = execution_layer.notify_new_payload(block.try_into()?).await; + let execution_block_hash = new_payload_request.execution_payload_ref().block_hash(); + let new_payload_response = execution_layer + .notify_new_payload(new_payload_request.clone()) + .await; match new_payload_response { Ok(status) => match status { @@ -149,10 +158,7 @@ async fn notify_new_payload( ?validation_error, ?latest_valid_hash, ?execution_block_hash, - root = ?block.tree_hash_root(), - graffiti = block.body().graffiti().as_utf8_lossy(), - proposer_index = block.proposer_index(), - slot = %block.slot(), + %slot, method = "new_payload", "Invalid execution payload" ); @@ -175,11 +181,9 @@ async fn notify_new_payload( { // This block has not yet been applied to fork choice, so the latest block that was // imported to fork choice was the parent. 
- let latest_root = block.parent_root(); - chain .process_invalid_execution_payload(&InvalidationOperation::InvalidateMany { - head_block_root: latest_root, + head_block_root: parent_beacon_block_root, always_invalidate_head: false, latest_valid_ancestor: latest_valid_hash, }) @@ -194,10 +198,7 @@ async fn notify_new_payload( warn!( ?validation_error, ?execution_block_hash, - root = ?block.tree_hash_root(), - graffiti = block.body().graffiti().as_utf8_lossy(), - proposer_index = block.proposer_index(), - slot = %block.slot(), + %slot, method = "new_payload", "Invalid execution payload block hash" ); diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 1dae2258f6..bfda52558e 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -165,13 +165,8 @@ impl BeaconChain { } // Store the blobs or data columns too - if let Some(op) = self - .get_blobs_or_columns_store_op(block_root, block.slot(), block_data) - .map_err(|e| { - HistoricalBlockError::StoreError(StoreError::DBError { - message: format!("get_blobs_or_columns_store_op error {e:?}"), - }) - })? 
+ if let Some(op) = + self.get_blobs_or_columns_store_op(block_root, block.slot(), block_data) { blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?); } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4d3c3e193e..4efd90bd22 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -20,6 +20,7 @@ pub mod custody_context; pub mod data_availability_checker; pub mod data_column_verification; mod early_attester_cache; +pub mod envelope_times_cache; mod errors; pub mod events; pub mod execution_payload; @@ -41,6 +42,7 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; +pub mod payload_envelope_verification; pub mod pending_payload_envelopes; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 9de67ca93f..786daa09da 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -21,6 +21,34 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &st pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str = "validator_monitor_attestation_simulator_source_attester_miss_total"; +/* +* Execution Payload Envelope Processing +*/ + +pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_requests_total", + "Count of payload envelopes submitted for processing", + ) +}); +pub static ENVELOPE_PROCESSING_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_successes_total", + "Count of payload envelopes processed without error", + ) +}); +pub static ENVELOPE_PROCESSING_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "payload_envelope_processing_seconds", + "Full runtime of payload envelope 
processing", + ) +}); +pub static ENVELOPE_PROCESSING_DB_WRITE: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "payload_envelope_processing_db_write_seconds", + "Time spent writing a newly processed payload envelope and state to DB", + ) +}); /* * Block Processing */ diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs new file mode 100644 index 0000000000..86f9293c8f --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -0,0 +1,105 @@ +use std::sync::Arc; + +use slot_clock::SlotClock; +use state_processing::{ + VerifySignatures, + envelope_processing::{VerifyStateRoot, process_execution_payload_envelope}, +}; +use types::EthSpec; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, + PayloadVerificationOutcome, + block_verification::PayloadVerificationHandle, + payload_envelope_verification::{ + EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope, + gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root, + payload_notifier::PayloadNotifier, + }, +}; + +pub struct ExecutionPendingEnvelope { + pub signed_envelope: MaybeAvailableEnvelope, + pub import_data: EnvelopeImportData, + pub payload_verification_handle: PayloadVerificationHandle, +} + +impl GossipVerifiedEnvelope { + pub fn into_execution_pending_envelope( + self, + chain: &Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result, EnvelopeError> { + let signed_envelope = self.signed_envelope; + let envelope = &signed_envelope.message; + let payload = &envelope.payload; + + // Define a future that will verify the execution payload with an execution engine. + // + // We do this as early as possible so that later parts of this function can run in parallel + // with the payload verification. 
+ let payload_notifier = PayloadNotifier::new( + chain.clone(), + signed_envelope.clone(), + self.block.clone(), + notify_execution_layer, + )?; + let block_root = envelope.beacon_block_root; + let slot = self.block.slot(); + + let payload_verification_future = async move { + let chain = payload_notifier.chain.clone(); + if let Some(started_execution) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_started_execution(block_root, slot, started_execution); + } + + let payload_verification_status = payload_notifier.notify_new_payload().await?; + Ok(PayloadVerificationOutcome { + payload_verification_status, + }) + }; + // Spawn the payload verification future as a new task, but don't wait for it to complete. + // The `payload_verification_future` will be awaited later to ensure verification completed + // successfully. + let payload_verification_handle = chain + .task_executor + .spawn_handle( + payload_verification_future, + "execution_payload_verification", + ) + .ok_or(BeaconChainError::RuntimeShutdown)?; + + let snapshot = if let Some(snapshot) = self.snapshot { + *snapshot + } else { + load_snapshot_from_state_root::(block_root, self.block.state_root(), &chain.store)? 
+ }; + let mut state = snapshot.pre_state; + + // All the state modifications are done in envelope_processing + process_execution_payload_envelope( + &mut state, + Some(snapshot.state_root), + &signed_envelope, + // verify signature already done for GossipVerifiedEnvelope + VerifySignatures::False, + VerifyStateRoot::True, + &chain.spec, + )?; + + Ok(ExecutionPendingEnvelope { + signed_envelope: MaybeAvailableEnvelope::AvailabilityPending { + block_hash: payload.block_hash, + envelope: signed_envelope, + }, + import_data: EnvelopeImportData { + block_root, + post_state: Box::new(state), + }, + payload_verification_handle, + }) + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs new file mode 100644 index 0000000000..03a3a91ac5 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -0,0 +1,445 @@ +use std::sync::Arc; + +use educe::Educe; +use parking_lot::{Mutex, RwLock}; +use store::DatabaseBlock; +use tracing::{Span, debug}; +use types::{ + ChainSpec, EthSpec, ExecutionPayloadBid, ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, consts::gloas::BUILDER_INDEX_SELF_BUILD, +}; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, BeaconStore, + beacon_proposer_cache::{self, BeaconProposerCache}, + canonical_head::CanonicalHead, + payload_envelope_verification::{ + EnvelopeError, EnvelopeProcessingSnapshot, load_snapshot_from_state_root, + }, + validator_pubkey_cache::ValidatorPubkeyCache, +}; + +/// Bundles only the dependencies needed for gossip verification of execution payload envelopes, +/// decoupling `GossipVerifiedEnvelope::new` from the full `BeaconChain`. 
+pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { + pub canonical_head: &'a CanonicalHead, + pub store: &'a BeaconStore, + pub spec: &'a ChainSpec, + pub beacon_proposer_cache: &'a Mutex, + pub validator_pubkey_cache: &'a RwLock>, + pub genesis_validators_root: Hash256, +} + +/// Verify that an execution payload envelope is consistent with its beacon block +/// and execution bid. +pub(crate) fn verify_envelope_consistency( + envelope: &ExecutionPayloadEnvelope, + block: &SignedBeaconBlock, + execution_bid: &ExecutionPayloadBid, + latest_finalized_slot: Slot, +) -> Result<(), EnvelopeError> { + // Check that the envelope's slot isn't from a slot prior + // to the latest finalized slot. + if envelope.slot < latest_finalized_slot { + return Err(EnvelopeError::PriorToFinalization { + payload_slot: envelope.slot, + latest_finalized_slot, + }); + } + + // Check that the slot of the envelope matches the slot of the block. + if envelope.slot != block.slot() { + return Err(EnvelopeError::SlotMismatch { + block: block.slot(), + envelope: envelope.slot, + }); + } + + // Builder index matches committed bid. + if envelope.builder_index != execution_bid.builder_index { + return Err(EnvelopeError::BuilderIndexMismatch { + committed_bid: execution_bid.builder_index, + envelope: envelope.builder_index, + }); + } + + // The block hash should match the block hash of the execution bid. + if envelope.payload.block_hash != execution_bid.block_hash { + return Err(EnvelopeError::BlockHashMismatch { + committed_bid: execution_bid.block_hash, + envelope: envelope.payload.block_hash, + }); + } + + Ok(()) +} + +/// A wrapper around a `SignedExecutionPayloadEnvelope` that indicates it has been approved for re-gossiping on +/// the p2p network. 
+#[derive(Educe)] +#[educe(Debug(bound = "T: BeaconChainTypes"))] +pub struct GossipVerifiedEnvelope { + pub signed_envelope: Arc>, + pub block: Arc>, + pub snapshot: Option>>, +} + +impl GossipVerifiedEnvelope { + pub fn new( + signed_envelope: Arc>, + ctx: &GossipVerificationContext<'_, T>, + ) -> Result { + let envelope = &signed_envelope.message; + let beacon_block_root = envelope.beacon_block_root; + + // Check that we've seen the beacon block for this envelope and that it passes validation. + // TODO(EIP-7732): We might need some type of status table in order to differentiate between: + // If we have a block_processing_table, we could have a Processed(Bid, bool) state that is only + // entered post adding to fork choice. That way, we could potentially need only a single call to make + // sure the block is valid and to do all consequent checks with the bid + // + // 1. Blocks we haven't seen (IGNORE), and + // 2. Blocks we've seen that are invalid (REJECT). + // + // Presently these two cases are conflated. + let fork_choice_read_lock = ctx.canonical_head.fork_choice_read_lock(); + let Some(proto_block) = fork_choice_read_lock.get_block(&beacon_block_root) else { + return Err(EnvelopeError::BlockRootUnknown { + block_root: beacon_block_root, + }); + }; + + drop(fork_choice_read_lock); + + let latest_finalized_slot = ctx + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + // TODO(EIP-7732): check that we haven't seen another valid `SignedExecutionPayloadEnvelope` + // for this block root from this builder - envelope status table check + let block = match ctx.store.try_get_full_block(&beacon_block_root)? 
{ + Some(DatabaseBlock::Full(block)) => Arc::new(block), + Some(DatabaseBlock::Blinded(_)) | None => { + return Err(EnvelopeError::from(BeaconChainError::MissingBeaconBlock( + beacon_block_root, + ))); + } + }; + let execution_bid = &block + .message() + .body() + .signed_execution_payload_bid()? + .message; + + verify_envelope_consistency(envelope, &block, execution_bid, latest_finalized_slot)?; + + // Verify the envelope signature. + // + // For self-built envelopes, we can use the proposer cache for the fork and the + // validator pubkey cache for the proposer's pubkey, avoiding a state load from disk. + // For external builder envelopes, we must load the state to access the builder registry. + let builder_index = envelope.builder_index; + let block_slot = envelope.slot; + let envelope_epoch = block_slot.epoch(T::EthSpec::slots_per_epoch()); + // Since the payload's block is already guaranteed to be imported, the associated `proto_block.current_epoch_shuffling_id` + // already carries the correct `shuffling_decision_block`. + let proposer_shuffling_decision_block = proto_block + .current_epoch_shuffling_id + .shuffling_decision_block; + + let (signature_is_valid, opt_snapshot) = if builder_index == BUILDER_INDEX_SELF_BUILD { + // Fast path: self-built envelopes can be verified without loading the state. 
+ let mut opt_snapshot = None; + let proposer = beacon_proposer_cache::with_proposer_cache( + ctx.beacon_proposer_cache, + proposer_shuffling_decision_block, + envelope_epoch, + |proposers| proposers.get_slot::(block_slot), + || { + debug!( + %beacon_block_root, + "Proposer shuffling cache miss for envelope verification" + ); + let snapshot = load_snapshot_from_state_root::( + beacon_block_root, + proto_block.state_root, + ctx.store, + )?; + opt_snapshot = Some(Box::new(snapshot.clone())); + Ok::<_, EnvelopeError>((snapshot.state_root, snapshot.pre_state)) + }, + ctx.spec, + )?; + let expected_proposer = proposer.index; + let fork = proposer.fork; + + if block.message().proposer_index() != expected_proposer as u64 { + return Err(EnvelopeError::IncorrectBlockProposer { + proposer_index: block.message().proposer_index(), + local_shuffling: expected_proposer as u64, + }); + } + + let pubkey_cache = ctx.validator_pubkey_cache.read(); + let pubkey = pubkey_cache + .get(block.message().proposer_index() as usize) + .ok_or_else(|| EnvelopeError::UnknownValidator { + proposer_index: block.message().proposer_index(), + })?; + let is_valid = signed_envelope.verify_signature( + pubkey, + &fork, + ctx.genesis_validators_root, + ctx.spec, + ); + (is_valid, opt_snapshot) + } else { + // TODO(gloas) if we implement a builder pubkey cache, we'll need to use it here. + // External builder: must load the state to get the builder pubkey. 
+ let snapshot = load_snapshot_from_state_root::( + beacon_block_root, + proto_block.state_root, + ctx.store, + )?; + let is_valid = + signed_envelope.verify_signature_with_state(&snapshot.pre_state, ctx.spec)?; + (is_valid, Some(Box::new(snapshot))) + }; + + if !signature_is_valid { + return Err(EnvelopeError::BadSignature); + } + + Ok(Self { + signed_envelope, + block, + snapshot: opt_snapshot, + }) + } + + pub fn envelope_cloned(&self) -> Arc> { + self.signed_envelope.clone() + } +} + +impl BeaconChain { + /// Build a `GossipVerificationContext` from this `BeaconChain`. + pub fn gossip_verification_context(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + canonical_head: &self.canonical_head, + store: &self.store, + spec: &self.spec, + beacon_proposer_cache: &self.beacon_proposer_cache, + validator_pubkey_cache: &self.validator_pubkey_cache, + genesis_validators_root: self.genesis_validators_root, + } + } + + /// Returns `Ok(GossipVerifiedEnvelope)` if the supplied `envelope` should be forwarded onto the + /// gossip network. The envelope is not imported into the chain, it is just partially verified. + /// + /// The returned `GossipVerifiedEnvelope` should be provided to `Self::process_execution_payload_envelope` immediately + /// after it is returned, unless some other circumstance decides it should not be imported at + /// all. + /// + /// ## Errors + /// + /// Returns an `Err` if the given envelope was invalid, or an error was encountered during verification. 
+ pub async fn verify_envelope_for_gossip( + self: &Arc, + envelope: Arc>, + ) -> Result, EnvelopeError> { + let chain = self.clone(); + let span = Span::current(); + self.task_executor + .clone() + .spawn_blocking_handle( + move || { + let _guard = span.enter(); + let slot = envelope.slot(); + let beacon_block_root = envelope.message.beacon_block_root; + + let ctx = chain.gossip_verification_context(); + match GossipVerifiedEnvelope::new(envelope, &ctx) { + Ok(verified) => { + debug!( + %slot, + ?beacon_block_root, + "Successfully verified gossip envelope" + ); + + Ok(verified) + } + Err(e) => { + debug!( + error = e.to_string(), + ?beacon_block_root, + %slot, + "Rejected gossip envelope" + ); + + Err(e) + } + } + }, + "gossip_envelope_verification_handle", + ) + .ok_or(BeaconChainError::RuntimeShutdown)? + .await + .map_err(BeaconChainError::TokioJoin)? + } +} + +#[cfg(test)] +mod tests { + use std::marker::PhantomData; + + use bls::Signature; + use ssz_types::VariableList; + use types::{ + BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, Eth1Data, ExecutionBlockHash, + ExecutionPayloadBid, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, + Graffiti, Hash256, MinimalEthSpec, SignedBeaconBlock, SignedExecutionPayloadBid, Slot, + SyncAggregate, + }; + + use super::verify_envelope_consistency; + use crate::payload_envelope_verification::EnvelopeError; + + type E = MinimalEthSpec; + + fn make_envelope( + slot: Slot, + builder_index: u64, + block_hash: ExecutionBlockHash, + ) -> ExecutionPayloadEnvelope { + ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas { + block_hash, + ..ExecutionPayloadGloas::default() + }, + execution_requests: ExecutionRequests::default(), + builder_index, + beacon_block_root: Hash256::ZERO, + slot, + state_root: Hash256::ZERO, + } + } + + fn make_block(slot: Slot) -> SignedBeaconBlock { + let block = BeaconBlock::Gloas(BeaconBlockGloas { + slot, + proposer_index: 0, + parent_root: Hash256::ZERO, + state_root: 
Hash256::ZERO, + body: BeaconBlockBodyGloas { + randao_reveal: Signature::empty(), + eth1_data: Eth1Data { + deposit_root: Hash256::ZERO, + block_hash: Hash256::ZERO, + deposit_count: 0, + }, + graffiti: Graffiti::default(), + proposer_slashings: VariableList::empty(), + attester_slashings: VariableList::empty(), + attestations: VariableList::empty(), + deposits: VariableList::empty(), + voluntary_exits: VariableList::empty(), + sync_aggregate: SyncAggregate::empty(), + bls_to_execution_changes: VariableList::empty(), + signed_execution_payload_bid: SignedExecutionPayloadBid::empty(), + payload_attestations: VariableList::empty(), + _phantom: PhantomData, + }, + }); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + + fn make_bid(builder_index: u64, block_hash: ExecutionBlockHash) -> ExecutionPayloadBid { + ExecutionPayloadBid { + builder_index, + block_hash, + ..ExecutionPayloadBid::default() + } + } + + #[test] + fn test_valid_envelope() { + let slot = Slot::new(10); + let builder_index = 5; + let block_hash = ExecutionBlockHash::repeat_byte(0xaa); + + let envelope = make_envelope(slot, builder_index, block_hash); + let block = make_block(slot); + let bid = make_bid(builder_index, block_hash); + + assert!(verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)).is_ok()); + } + + #[test] + fn test_prior_to_finalization() { + let slot = Slot::new(5); + let builder_index = 1; + let block_hash = ExecutionBlockHash::repeat_byte(0xbb); + + let envelope = make_envelope(slot, builder_index, block_hash); + let block = make_block(slot); + let bid = make_bid(builder_index, block_hash); + let latest_finalized_slot = Slot::new(10); + + let result = + verify_envelope_consistency::(&envelope, &block, &bid, latest_finalized_slot); + assert!(matches!( + result, + Err(EnvelopeError::PriorToFinalization { .. 
}) + )); + } + + #[test] + fn test_slot_mismatch() { + let builder_index = 1; + let block_hash = ExecutionBlockHash::repeat_byte(0xcc); + + let envelope = make_envelope(Slot::new(10), builder_index, block_hash); + let block = make_block(Slot::new(20)); + let bid = make_bid(builder_index, block_hash); + + let result = verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)); + assert!(matches!(result, Err(EnvelopeError::SlotMismatch { .. }))); + } + + #[test] + fn test_builder_index_mismatch() { + let slot = Slot::new(10); + let block_hash = ExecutionBlockHash::repeat_byte(0xdd); + + let envelope = make_envelope(slot, 1, block_hash); + let block = make_block(slot); + let bid = make_bid(2, block_hash); + + let result = verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)); + assert!(matches!( + result, + Err(EnvelopeError::BuilderIndexMismatch { .. }) + )); + } + + #[test] + fn test_block_hash_mismatch() { + let slot = Slot::new(10); + let builder_index = 1; + + let envelope = make_envelope(slot, builder_index, ExecutionBlockHash::repeat_byte(0xee)); + let block = make_block(slot); + let bid = make_bid(builder_index, ExecutionBlockHash::repeat_byte(0xff)); + + let result = verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)); + assert!(matches!( + result, + Err(EnvelopeError::BlockHashMismatch { .. 
}) + )); + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs new file mode 100644 index 0000000000..2ee315e559 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -0,0 +1,354 @@ +use std::sync::Arc; +use std::time::Duration; + +use fork_choice::PayloadVerificationStatus; +use slot_clock::SlotClock; +use store::StoreOp; +use tracing::{debug, error, info, info_span, instrument, warn}; +use types::{BeaconState, BlockImportSource, Hash256, Slot}; + +use super::{ + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, + ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, +}; +use crate::{ + AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, + NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, + payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, +}; + +const ENVELOPE_METRICS_CACHE_SLOT_LIMIT: u32 = 64; + +impl BeaconChain { + /// Returns `Ok(status)` if the given `unverified_envelope` was successfully verified and + /// imported into the chain. + /// + /// ## Errors + /// + /// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during + /// verification. + #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] + pub async fn process_execution_payload_envelope( + self: &Arc, + block_root: Hash256, + unverified_envelope: GossipVerifiedEnvelope, + notify_execution_layer: NotifyExecutionLayer, + block_source: BlockImportSource, + publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, + ) -> Result { + let block_slot = unverified_envelope.signed_envelope.slot(); + + // Set observed time if not already set. Usually this should be set by gossip or RPC, + // but just in case we set it again here (useful for tests). 
+ if let Some(seen_timestamp) = self.slot_clock.now_duration() { + self.envelope_times_cache.write().set_time_observed( + block_root, + block_slot, + seen_timestamp, + None, + ); + } + + // TODO(gloas) insert the pre-executed envelope into some type of cache. + + let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); + + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS); + + // A small closure to group the verification and import errors. + let chain = self.clone(); + let import_envelope = async move { + let execution_pending = unverified_envelope + .into_execution_pending_envelope(&chain, notify_execution_layer)?; + publish_fn()?; + + // Record the time it took to complete consensus verification. + if let Some(timestamp) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_consensus_verified(block_root, block_slot, timestamp); + } + + let envelope_times_cache = chain.envelope_times_cache.clone(); + let slot_clock = chain.slot_clock.clone(); + + // TODO(gloas): rename/refactor these `into_` names to be less similar and more clear + // about what the function actually does. + let executed_envelope = chain + .into_executed_payload_envelope(execution_pending) + .await + .inspect_err(|_| { + // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline), + // and we keep it in the cache, then the node will NOT perform lookup and + // reprocess this block until the block is evicted from DA checker, causing the + // chain to get stuck temporarily if the block is canonical. Therefore we remove + // it from the cache if execution fails. + })?; + + // Record the time it took to wait for execution layer verification. 
+ if let Some(timestamp) = slot_clock.now_duration() { + envelope_times_cache + .write() + .set_time_executed(block_root, block_slot, timestamp); + } + + match executed_envelope { + ExecutedEnvelope::Available(envelope) => { + self.import_available_execution_payload_envelope(Box::new(envelope)) + .await + } + ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( + "Pending payload envelope not yet implemented".to_owned(), + )), + } + }; + + // Verify and import the payload envelope. + match import_envelope.await { + // The payload envelope was successfully verified and imported. + Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { + info!( + ?block_root, + %block_slot, + source = %block_source, + "Execution payload envelope imported" + ); + + // TODO(gloas) do we need to send a `PayloadImported` event to the reprocess queue? + // TODO(gloas) do we need to recompute head? + // should canonical_head return the block and the payload now? + self.recompute_head_at_current_slot().await; + + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES); + + Ok(status) + } + Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + debug!(?block_root, %slot, "Payload envelope awaiting blobs"); + + Ok(status) + } + Err(EnvelopeError::BeaconChainError(e)) => { + if matches!(e.as_ref(), BeaconChainError::TokioJoin(_)) { + debug!(error = ?e, "Envelope processing cancelled"); + } else { + warn!(error = ?e, "Execution payload envelope rejected"); + } + Err(EnvelopeError::BeaconChainError(e)) + } + Err(other) => { + warn!( + reason = other.to_string(), + "Execution payload envelope rejected" + ); + Err(other) + } + } + } + + /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to + /// get a fully `ExecutedEnvelope`. + /// + /// An error is returned if the verification handle couldn't be awaited. 
+ #[instrument(skip_all, level = "debug")] + async fn into_executed_payload_envelope( + self: Arc, + pending_envelope: ExecutionPendingEnvelope, + ) -> Result, EnvelopeError> { + let ExecutionPendingEnvelope { + signed_envelope, + import_data, + payload_verification_handle, + } = pending_envelope; + + let payload_verification_outcome = payload_verification_handle + .await + .map_err(BeaconChainError::TokioJoin)? + .ok_or(BeaconChainError::RuntimeShutdown)??; + + Ok(ExecutedEnvelope::new( + signed_envelope, + import_data, + payload_verification_outcome, + )) + } + + #[instrument(skip_all)] + pub async fn import_available_execution_payload_envelope( + self: &Arc, + envelope: Box>, + ) -> Result { + let AvailableExecutedEnvelope { + envelope, + import_data, + payload_verification_outcome, + } = *envelope; + + let EnvelopeImportData { + block_root, + post_state, + } = import_data; + + let block_root = { + // Capture the current span before moving into the blocking task + let current_span = tracing::Span::current(); + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + // Enter the captured span in the blocking thread + let _guard = current_span.enter(); + chain.import_execution_payload_envelope( + envelope, + block_root, + *post_state, + payload_verification_outcome.payload_verification_status, + ) + }, + "payload_verification_handle", + ) + .await?? + }; + + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + + /// Accepts a fully-verified and available envelope and imports it into the chain without performing any + /// additional verification. + /// + /// An error is returned if the envelope was unable to be imported. It may be partially imported + /// (i.e., this function is not atomic). 
+ #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + fn import_execution_payload_envelope( + &self, + signed_envelope: AvailableEnvelope, + block_root: Hash256, + state: BeaconState, + _payload_verification_status: PayloadVerificationStatus, + ) -> Result { + // Everything in this initial section is on the hot path for processing the envelope. + // Take an upgradable read lock on fork choice so we can check if this block has already + // been imported. We don't want to repeat work importing a block that is already imported. + let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); + if !fork_choice_reader.contains_block(&block_root) { + return Err(EnvelopeError::BlockRootUnknown { block_root }); + } + + // TODO(gloas) add defensive check to see if payload envelope is already in fork choice + // Note that a duplicate cache/payload status table should prevent this from happening + // but it doesnt hurt to be defensive. + + // TODO(gloas) when the code below is implemented we can delete this drop + drop(fork_choice_reader); + + // TODO(gloas) no fork choice logic yet + // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by + // avoiding taking other locks whilst holding this lock. + // let fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); + + // TODO(gloas) Do we need this check? Do not import a block that doesn't descend from the finalized root. + // let signed_block = check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?; + + // TODO(gloas) emit SSE event if the payload became the new head payload + + // It is important NOT to return errors here before the database commit, because the envelope + // has already been added to fork choice and the database would be left in an inconsistent + // state if we returned early without committing. In other words, an error here would + // corrupt the node's database permanently. 
+ + // Store the envelope, its post-state, and any data columns. + // If the write fails, revert fork choice to the version from disk, else we can + // end up with envelopes in fork choice that are missing from disk. + // See https://github.com/sigp/lighthouse/issues/2028 + let (signed_envelope, columns) = signed_envelope.deconstruct(); + + let mut ops = vec![]; + + if let Some(blobs_or_columns_store_op) = self.get_blobs_or_columns_store_op( + block_root, + signed_envelope.slot(), + AvailableBlockData::DataColumns(columns), + ) { + ops.push(blobs_or_columns_store_op); + } + + let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE); + + ops.push(StoreOp::PutPayloadEnvelope( + block_root, + signed_envelope.clone(), + )); + ops.push(StoreOp::PutState( + signed_envelope.message.state_root, + &state, + )); + + let db_span = info_span!("persist_payloads_and_blobs").entered(); + + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { + error!( + msg = "Restoring fork choice from disk", + error = ?e, + "Database write failed!" + ); + return Err(e.into()); + // TODO(gloas) handle db write failure + // return Err(self + // .handle_import_block_db_write_error(fork_choice) + // .err() + // .unwrap_or(e.into())); + } + + drop(db_span); + + // TODO(gloas) drop fork choice lock + // The fork choice write-lock is dropped *after* the on-disk database has been updated. + // This prevents inconsistency between the two at the expense of concurrency. + // drop(fork_choice); + + // We're declaring the envelope "imported" at this point, since fork choice and the DB know + // about it. 
+ let envelope_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX); + + // TODO(gloas) depending on what happens with light clients + // we might need to do some light client related computations here + + metrics::stop_timer(db_write_timer); + + self.import_envelope_update_metrics_and_events( + block_root, + signed_envelope.slot(), + envelope_time_imported, + ); + + Ok(block_root) + } + + fn import_envelope_update_metrics_and_events( + &self, + block_root: Hash256, + envelope_slot: Slot, + envelope_time_imported: Duration, + ) { + let envelope_delay_total = + get_slot_delay_ms(envelope_time_imported, envelope_slot, &self.slot_clock); + + // Do not write to the cache for envelopes older than 2 epochs, this helps reduce writes + // to the cache during sync. + if envelope_delay_total + < self + .slot_clock + .slot_duration() + .saturating_mul(ENVELOPE_METRICS_CACHE_SLOT_LIMIT) + { + self.envelope_times_cache.write().set_time_imported( + block_root, + envelope_slot, + envelope_time_imported, + ); + } + + // TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks). + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs new file mode 100644 index 0000000000..c707d62dc7 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -0,0 +1,285 @@ +//! The incremental processing steps (e.g., signatures verified but not the state transition) is +//! represented as a sequence of wrapper-types around the envelope. There is a linear progression of +//! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see +//! diagram below). +//! +//! ```ignore +//! SignedExecutionPayloadEnvelope +//! | +//! ▼ +//! GossipVerifiedEnvelope +//! | +//! ▼ +//! ExecutionPendingEnvelope +//! | +//! await +//! ▼ +//! ExecutedEnvelope +//! +//! 
``` + +use std::sync::Arc; + +use store::Error as DBError; + +use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError}; +use tracing::instrument; +use types::{ + BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, + ExecutionPayloadEnvelope, Hash256, SignedExecutionPayloadEnvelope, Slot, +}; + +use crate::{ + BeaconChainError, BeaconChainTypes, BeaconStore, BlockError, ExecutionPayloadError, + PayloadVerificationOutcome, +}; + +pub mod execution_pending_envelope; +pub mod gossip_verified_envelope; +pub mod import; +mod payload_notifier; + +pub use execution_pending_envelope::ExecutionPendingEnvelope; + +#[derive(PartialEq)] +pub struct EnvelopeImportData { + pub block_root: Hash256, + pub post_state: Box>, +} + +#[derive(Debug)] +#[allow(dead_code)] +pub struct AvailableEnvelope { + execution_block_hash: ExecutionBlockHash, + envelope: Arc>, + columns: DataColumnSidecarList, + /// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970). + columns_available_timestamp: Option, + pub spec: Arc, +} + +impl AvailableEnvelope { + pub fn message(&self) -> &ExecutionPayloadEnvelope { + &self.envelope.message + } + + #[allow(clippy::type_complexity)] + pub fn deconstruct( + self, + ) -> ( + Arc>, + DataColumnSidecarList, + ) { + let AvailableEnvelope { + envelope, columns, .. + } = self; + (envelope, columns) + } +} + +pub enum MaybeAvailableEnvelope { + Available(AvailableEnvelope), + AvailabilityPending { + block_hash: ExecutionBlockHash, + envelope: Arc>, + }, +} + +/// This snapshot is to be used for verifying a payload envelope. +#[derive(Debug, Clone)] +pub struct EnvelopeProcessingSnapshot { + /// This state is equivalent to the `self.beacon_block.state_root()` before applying the envelope. 
+ pub pre_state: BeaconState, + pub state_root: Hash256, + pub beacon_block_root: Hash256, +} + +/// A payload envelope that has gone through processing checks and execution by an EL client. +/// This envelope hasn't necessarily completed data availability checks. +/// +/// +/// It contains 2 variants: +/// 1. `Available`: This envelope has been executed and also contains all data to consider it +/// fully available. +/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it +/// fully available. +pub enum ExecutedEnvelope { + Available(AvailableExecutedEnvelope), + // TODO(gloas) implement availability pending + AvailabilityPending(), +} + +impl ExecutedEnvelope { + pub fn new( + envelope: MaybeAvailableEnvelope, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + match envelope { + MaybeAvailableEnvelope::Available(available_envelope) => { + Self::Available(AvailableExecutedEnvelope::new( + available_envelope, + import_data, + payload_verification_outcome, + )) + } + // TODO(gloas) implement availability pending + MaybeAvailableEnvelope::AvailabilityPending { + block_hash: _, + envelope: _, + } => Self::AvailabilityPending(), + } + } +} + +/// A payload envelope that has completed all payload processing checks including verification +/// by an EL client **and** has all requisite blob data to be imported into fork choice. +pub struct AvailableExecutedEnvelope { + pub envelope: AvailableEnvelope, + pub import_data: EnvelopeImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailableExecutedEnvelope { + pub fn new( + envelope: AvailableEnvelope, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + envelope, + import_data, + payload_verification_outcome, + } + } +} + +#[derive(Debug)] +pub enum EnvelopeError { + /// The envelope's block root is unknown. 
+ BlockRootUnknown { block_root: Hash256 },
+ /// The signature is invalid.
+ BadSignature,
+ /// The builder index doesn't match the committed bid
+ BuilderIndexMismatch { committed_bid: u64, envelope: u64 },
+ /// The envelope slot doesn't match the block
+ SlotMismatch { block: Slot, envelope: Slot },
+ /// The validator index is unknown
+ UnknownValidator { proposer_index: u64 },
+ /// The block hash doesn't match the committed bid
+ BlockHashMismatch {
+ committed_bid: ExecutionBlockHash,
+ envelope: ExecutionBlockHash,
+ },
+ /// The block's proposer_index does not match the locally computed proposer
+ IncorrectBlockProposer {
+ proposer_index: u64,
+ local_shuffling: u64,
+ },
+ /// The slot belongs to a block from a slot prior to
+ /// the most recently finalized slot
+ PriorToFinalization {
+ payload_slot: Slot,
+ latest_finalized_slot: Slot,
+ },
+ /// Some Beacon Chain Error
+ BeaconChainError(Arc),
+ /// Some Beacon State error
+ BeaconStateError(BeaconStateError),
+ /// Some BlockProcessingError (for electra operations)
+ BlockProcessingError(BlockProcessingError),
+ /// Some EnvelopeProcessingError
+ EnvelopeProcessingError(EnvelopeProcessingError),
+ /// Error verifying the execution payload
+ ExecutionPayloadError(ExecutionPayloadError),
+ /// An error from block-level checks reused during envelope import
+ BlockError(BlockError),
+ /// Internal error
+ InternalError(String),
+}
+
+impl std::fmt::Display for EnvelopeError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{:?}", self)
+ }
+}
+
+impl From for EnvelopeError {
+ fn from(e: BeaconChainError) -> Self {
+ EnvelopeError::BeaconChainError(Arc::new(e))
+ }
+}
+
+impl From for EnvelopeError {
+ fn from(e: ExecutionPayloadError) -> Self {
+ EnvelopeError::ExecutionPayloadError(e)
+ }
+}
+
+impl From for EnvelopeError {
+ fn from(e: BeaconStateError) -> Self {
+ EnvelopeError::BeaconStateError(e)
+ }
+}
+
+impl From for EnvelopeError {
+ fn from(e: 
DBError) -> Self { + EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::DBError(e))) + } +} + +impl From for EnvelopeError { + fn from(e: BlockError) -> Self { + EnvelopeError::BlockError(e) + } +} + +/// Pull errors up from EnvelopeProcessingError to EnvelopeError +impl From for EnvelopeError { + fn from(e: EnvelopeProcessingError) -> Self { + match e { + EnvelopeProcessingError::BadSignature => EnvelopeError::BadSignature, + EnvelopeProcessingError::BeaconStateError(e) => EnvelopeError::BeaconStateError(e), + EnvelopeProcessingError::BlockHashMismatch { + committed_bid, + envelope, + } => EnvelopeError::BlockHashMismatch { + committed_bid, + envelope, + }, + EnvelopeProcessingError::BlockProcessingError(e) => { + EnvelopeError::BlockProcessingError(e) + } + e => EnvelopeError::EnvelopeProcessingError(e), + } + } +} + +#[instrument(skip_all, level = "debug", fields(beacon_block_root = %beacon_block_root))] +/// Load state from store given a known state root and block root. +/// Use this when the proto block has already been looked up from fork choice. +pub(crate) fn load_snapshot_from_state_root( + beacon_block_root: Hash256, + block_state_root: Hash256, + store: &BeaconStore, +) -> Result, EnvelopeError> { + // TODO(EIP-7732): add metrics here + + // We can use `get_hot_state` here rather than `get_advanced_hot_state` because the envelope + // must be from the same slot as its block (so no advance is required). + let cache_state = true; + let state = store + .get_hot_state(&block_state_root, cache_state) + .map_err(EnvelopeError::from)? 
+ .ok_or_else(|| { + BeaconChainError::DBInconsistent(format!( + "Missing state for envelope block {block_state_root:?}", + )) + })?; + + Ok(EnvelopeProcessingSnapshot { + pre_state: state, + state_root: block_state_root, + beacon_block_root, + }) +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs new file mode 100644 index 0000000000..df21d33493 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use execution_layer::{NewPayloadRequest, NewPayloadRequestGloas}; +use fork_choice::PayloadVerificationStatus; +use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; +use tracing::warn; +use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope}; + +use crate::{ + BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, + execution_payload::notify_new_payload, payload_envelope_verification::EnvelopeError, +}; + +/// Used to await the result of executing payload with a remote EE. 
+pub struct PayloadNotifier { + pub chain: Arc>, + envelope: Arc>, + block: Arc>, + payload_verification_status: Option, +} + +impl PayloadNotifier { + pub fn new( + chain: Arc>, + envelope: Arc>, + block: Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result { + let payload_verification_status = { + let payload_message = &envelope.message; + + match notify_execution_layer { + NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => { + let new_payload_request = Self::build_new_payload_request(&envelope, &block)?; + // TODO(gloas): check and test RLP block hash calculation post-Gloas + if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() { + warn!( + block_number = ?payload_message.payload.block_number, + info = "you can silence this warning with --disable-optimistic-finalized-sync", + error = ?e, + "Falling back to slow block hash verification" + ); + None + } else { + Some(PayloadVerificationStatus::Optimistic) + } + } + _ => None, + } + }; + + Ok(Self { + chain, + envelope, + block, + payload_verification_status, + }) + } + + pub async fn notify_new_payload(self) -> Result { + if let Some(precomputed_status) = self.payload_verification_status { + Ok(precomputed_status) + } else { + let parent_root = self.block.message().parent_root(); + let request = Self::build_new_payload_request(&self.envelope, &self.block)?; + notify_new_payload(&self.chain, self.envelope.slot(), parent_root, request).await + } + } + + fn build_new_payload_request<'a>( + envelope: &'a SignedExecutionPayloadEnvelope, + block: &'a SignedBeaconBlock, + ) -> Result, BlockError> { + let bid = &block + .message() + .body() + .signed_execution_payload_bid() + .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))? 
+ .message; + + let versioned_hashes = bid + .blob_kzg_commitments + .iter() + .map(kzg_commitment_to_versioned_hash) + .collect(); + + Ok(NewPayloadRequest::Gloas(NewPayloadRequestGloas { + execution_payload: &envelope.message.payload, + versioned_hashes, + parent_beacon_block_root: block.message().parent_root(), + execution_requests: &envelope.message.execution_requests, + })) + } +} diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index e94e64e91d..e385e0dc48 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,5 +1,5 @@ #![cfg(not(debug_assertions))] - +// TODO(gloas) we probably need similar test for payload envelope verification use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 240fd70e01..2119acf946 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -539,6 +539,16 @@ pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new ) }); +/* + * Execution Payload Envelope Delay Metrics + */ +pub static ENVELOPE_DELAY_GOSSIP: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "payload_envelope_delay_gossip", + "The first time we see this payload envelope from gossip as a delay from the start of the slot", + ) +}); + /* * Block Delay Metrics */ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index e90018c851..3335315157 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,7 +4,6 @@ use crate::{ 
service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::store::Error; @@ -19,6 +18,10 @@ use beacon_chain::{ sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; +use beacon_chain::{ + blob_verification::{GossipBlobError, GossipVerifiedBlob}, + payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope, +}; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use logging::crit; @@ -3248,25 +3251,166 @@ impl NetworkBeaconProcessor { } } - pub async fn process_gossip_execution_payload( + #[allow(clippy::too_many_arguments)] + #[instrument( + name = "lh_process_execution_payload_envelope", + parent = None, + level = "debug", + skip_all, + fields(beacon_block_root = tracing::field::Empty), + )] + pub async fn process_gossip_execution_payload_envelope( + self: Arc, + message_id: MessageId, + peer_id: PeerId, + envelope: Arc>, + seen_timestamp: Duration, + ) { + if let Some(gossip_verified_envelope) = self + .process_gossip_unverified_execution_payload_envelope( + message_id, + peer_id, + envelope.clone(), + seen_timestamp, + ) + .await + { + let beacon_block_root = gossip_verified_envelope.signed_envelope.beacon_block_root(); + + Span::current().record("beacon_block_root", beacon_block_root.to_string()); + + // TODO(gloas) in process_gossip_block here we check_and_insert on the duplicate cache + // before calling gossip_verified_block. We need this to ensure we dont try to execute the + // payload multiple times. 
+ + self.process_gossip_verified_execution_payload_envelope( + peer_id, + gossip_verified_envelope, + ) + .await; + } + } + + async fn process_gossip_unverified_execution_payload_envelope( self: &Arc, message_id: MessageId, peer_id: PeerId, - execution_payload: SignedExecutionPayloadEnvelope, + envelope: Arc>, + seen_duration: Duration, + ) -> Option> { + let envelope_delay = + get_slot_delay_ms(seen_duration, envelope.slot(), &self.chain.slot_clock); + + let verification_result = self + .chain + .clone() + .verify_envelope_for_gossip(envelope.clone()) + .await; + + let verified_envelope = match verification_result { + Ok(verified_envelope) => { + metrics::set_gauge( + &metrics::ENVELOPE_DELAY_GOSSIP, + envelope_delay.as_millis() as i64, + ); + + // Write the time the envelope was observed into the delay cache. + self.chain.envelope_times_cache.write().set_time_observed( + verified_envelope.signed_envelope.beacon_block_root(), + envelope.slot(), + seen_duration, + Some(peer_id.to_string()), + ); + + info!( + slot = %verified_envelope.signed_envelope.slot(), + root = ?verified_envelope.signed_envelope.beacon_block_root(), + "New envelope received" + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + verified_envelope + } + // TODO(gloas) penalize peers accordingly + Err(_) => return None, + }; + + let envelope_slot = verified_envelope.signed_envelope.slot(); + let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); + match self.chain.slot() { + // We only need to do a simple check about the envelope slot vs the current slot because + // `verify_envelope_for_gossip` already ensures that the envelope slot is within tolerance + // for envelope imports. 
+ Ok(current_slot) if envelope_slot > current_slot => { + warn!( + ?envelope_slot, + ?beacon_block_root, + msg = "if this happens consistently, check system clock", + "envelope arrived early" + ); + + // TODO(gloas) update metrics to note how early the envelope arrived + + let inner_self = self.clone(); + let _process_fn = Box::pin(async move { + inner_self + .process_gossip_verified_execution_payload_envelope( + peer_id, + verified_envelope, + ) + .await; + }); + + // TODO(gloas) send to reprocess queue + None + } + Ok(_) => Some(verified_envelope), + Err(e) => { + error!( + error = ?e, + %envelope_slot, + ?beacon_block_root, + location = "envelope gossip", + "Failed to defer envelope import" + ); + None + } + } + } + + async fn process_gossip_verified_execution_payload_envelope( + self: Arc, + _peer_id: PeerId, + verified_envelope: GossipVerifiedEnvelope, ) { - // TODO(EIP-7732): Implement proper execution payload envelope gossip processing. - // This should integrate with the envelope_verification.rs module once it's implemented. 
+ let _processing_start_time = Instant::now(); + let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); - trace!( - %peer_id, - builder_index = execution_payload.message.builder_index, - slot = %execution_payload.message.slot, - beacon_block_root = %execution_payload.message.beacon_block_root, - "Processing execution payload envelope" - ); + #[allow(clippy::result_large_err)] + let result = self + .chain + .process_execution_payload_envelope( + beacon_block_root, + verified_envelope, + NotifyExecutionLayer::Yes, + BlockImportSource::Gossip, + || Ok(()), + ) + .await; - // For now, ignore all envelopes since verification is not implemented - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + // TODO(gloas) metrics + // register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); + + match &result { + Ok(AvailabilityProcessingStatus::Imported(_)) + | Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + // Nothing to do + } + Err(_) => { + // TODO(gloas) implement peer penalties + } + } } pub fn process_gossip_execution_payload_bid( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e1adf860de..357d6c08fd 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -429,11 +429,17 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, execution_payload: Box>, + seen_timestamp: Duration, ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { processor - .process_gossip_execution_payload(message_id, peer_id, *execution_payload) + .process_gossip_execution_payload_envelope( + message_id, + peer_id, + Arc::new(*execution_payload), + seen_timestamp, + ) .await }; diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 8373dec322..77d64c92e6 100644 --- 
a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -493,6 +493,7 @@ impl Router { message_id, peer_id, signed_execution_payload_envelope, + timestamp_now(), ), ) }