diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cdec442276..e9ec8740a0 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,3 +1,4 @@ /beacon_node/network/ @jxs /beacon_node/lighthouse_network/ @jxs /beacon_node/store/ @michaelsproul +/.github/forbidden-files.txt @michaelsproul diff --git a/.github/forbidden-files.txt b/.github/forbidden-files.txt new file mode 100644 index 0000000000..a08a6b4e98 --- /dev/null +++ b/.github/forbidden-files.txt @@ -0,0 +1,10 @@ +# Files that have been intentionally deleted and should not be re-added. +# This prevents accidentally reviving files during botched merges. +# Add one file path per line (relative to repo root). + +beacon_node/beacon_chain/src/otb_verification_service.rs +beacon_node/store/src/partial_beacon_state.rs +beacon_node/store/src/consensus_context.rs +beacon_node/beacon_chain/src/block_reward.rs +beacon_node/http_api/src/block_rewards.rs +common/eth2/src/lighthouse/block_rewards.rs diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index 72ea9d41ae..d9efbfc148 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -72,6 +72,27 @@ jobs: steps: - name: Check that the pull request is not targeting the stable branch run: test ${{ github.base_ref }} != "stable" + + forbidden-files-check: + name: forbidden-files-check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Check for forbidden files + run: | + if [ -f .github/forbidden-files.txt ]; then + status=0 + while IFS= read -r file || [ -n "$file" ]; do + # Skip comments and empty lines + [[ "$file" =~ ^#.*$ || -z "$file" ]] && continue + if [ -f "$file" ]; then + echo "::error::Forbidden file exists: $file" + status=1 + fi + done < .github/forbidden-files.txt + exit $status + fi + release-tests-ubuntu: name: release-tests-ubuntu needs: [check-labels] @@ -430,6 +451,7 @@ jobs: needs: [ 'check-labels', 'target-branch-check', + 'forbidden-files-check', 'release-tests-ubuntu', 'beacon-chain-tests', 'op-pool-tests', diff --git a/Cargo.lock b/Cargo.lock index 5998cab204..2d349f504b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,7 +4,7 @@ version = 4 [[package]] name = "account_manager" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "bls", @@ -1285,7 +1285,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_chain", @@ -1548,7 +1548,7 @@ dependencies = [ [[package]] name = "boot_node" -version = "8.1.0" +version = "8.1.1" dependencies = [ "beacon_node", "bytes", @@ -4803,9 +4803,9 @@ dependencies = [ [[package]] name = "keccak" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ "cpufeatures", ] @@ -4868,7 +4868,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "lcli" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_chain", @@ -5354,7 +5354,7 @@ dependencies = [ [[package]] name = "lighthouse" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_manager", "account_utils", @@ -5486,7 +5486,7 @@ dependencies = [ [[package]] name = "lighthouse_version" -version = "8.1.0" +version = "8.1.1" dependencies = [ "regex", ] @@ -9535,7 +9535,7 @@ dependencies = [ [[package]] name = "validator_client" -version = "8.1.0" +version = "8.1.1" dependencies = [ "account_utils", "beacon_node_fallback", @@ -10522,8 +10522,7 @@ dependencies = [ [[package]] name = "yamux" version = "0.13.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deab71f2e20691b4728b349c6cee8fc7223880fa67b6b4f92225ec32225447e5" +source = "git+https://github.com/sigp/rust-yamux?rev=575b17c0f44f4253079a6bafaa2de74ca1d6dfaa#575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" dependencies = [ "futures", "log", diff --git a/Cargo.toml b/Cargo.toml index f735b97540..222392bcb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ resolver = "2" [workspace.package] edition = "2024" -version = "8.1.0" +version = "8.1.1" [workspace.dependencies] account_utils = { path = "common/account_utils" } @@ -166,20 +166,7 @@ initialized_validators = { path = "validator_client/initialized_validators" } int_to_bytes = { path = "consensus/int_to_bytes" } itertools = "0.14" kzg = { path = "crypto/kzg" } -libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", default-features = false, features = [ - "identify", - "yamux", - "noise", - "dns", - "tcp", - "tokio", - "secp256k1", - "macros", - "metrics", - "quic", - "upnp", - "gossipsub", -] } +libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", default-features = false, features = ["identify", "yamux", "noise", "dns", "tcp", "tokio", "secp256k1", "macros", "metrics", "quic", "upnp", "gossipsub"] } libsecp256k1 = "0.7" lighthouse_network = { path = "beacon_node/lighthouse_network" } lighthouse_validator_store = { path = "validator_client/lighthouse_validator_store" } @@ -219,12 +206,7 @@ r2d2 = "0.8" rand = "0.9.0" rayon = "1.7" regex = "1" -reqwest = { version = "0.12", default-features = false, features = [ - "blocking", - "json", - "stream", - "rustls-tls", -] } +reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "stream", "rustls-tls"] } ring = "0.17" rpds = "0.11" rusqlite = { version = "0.38", features = ["bundled"] } @@ -253,12 +235,7 @@ sysinfo = "0.26" system_health = { path = "common/system_health" } task_executor = { path = "common/task_executor" } tempfile = "3" -tokio = { version = "1", features = [ - "rt-multi-thread", - "sync", - "signal", - "macros", -] } +tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal", "macros"] } tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = { version = "0.7", features = ["codec", "compat", "time"] } tracing = "0.1.40" @@ -302,3 +279,4 @@ debug = true [patch.crates-io] quick-protobuf = { git = "https://github.com/sigp/quick-protobuf.git", rev = "681f413312404ab6e51f0b46f39b0075c6f4ebfd" } +yamux = { git = "https://github.com/sigp/rust-yamux", rev = "575b17c0f44f4253079a6bafaa2de74ca1d6dfaa" } diff --git a/Makefile b/Makefile index 9786c17cc9..ad1bbbb8e8 100644 --- a/Makefile +++ b/Makefile @@ -321,8 +321,8 @@ make-ef-tests-nightly: # Verifies that crates compile with fuzzing features enabled arbitrary-fuzz: - cargo check -p state_processing --features arbitrary-fuzz,$(TEST_FEATURES) - cargo check -p slashing_protection --features arbitrary-fuzz,$(TEST_FEATURES) + cargo check -p state_processing --features arbitrary,$(TEST_FEATURES) + cargo check -p slashing_protection --features arbitrary,$(TEST_FEATURES) # Runs cargo audit (Audit Cargo.lock files for crates with security vulnerabilities reported to the RustSec Advisory Database) audit: install-audit audit-CI diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index 8dd50cbc6e..05e6f12554 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -1,10 +1,7 @@ [package] name = "account_manager" version = { workspace = true } -authors = [ - "Paul Hauner ", - "Luke Anderson ", -] +authors = ["Paul Hauner ", "Luke Anderson "] edition = { workspace = true } [dependencies] diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 5352814dd5..ebefa6a451 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,10 +1,7 @@ [package] name = "beacon_node" version = { workspace = true } -authors = [ - "Paul Hauner ", - "Age Manning ", "Age Manning { pub early_attester_cache: EarlyAttesterCache, /// A cache used to keep track of various block timings. pub block_times_cache: Arc>, + /// A cache used to keep track of various envelope timings. + pub envelope_times_cache: Arc>, /// A cache used to track pre-finalization block roots for quick rejection. pub pre_finalization_block_cache: PreFinalizationBlockCache, /// A cache used to produce light_client server messages @@ -662,7 +662,18 @@ impl BeaconChain { .custody_context() .as_ref() .into(); - debug!(?custody_context, "Persisting custody context to store"); + + // Pattern match to avoid accidentally missing fields and to ignore deprecated fields. + let CustodyContextSsz { + validator_custody_at_head, + epoch_validator_custody_requirements, + persisted_is_supernode: _, + } = &custody_context; + debug!( + validator_custody_at_head, + ?epoch_validator_custody_requirements, + "Persisting custody context to store" + ); persist_custody_context::( self.store.clone(), @@ -4032,23 +4043,10 @@ impl BeaconChain { // See https://github.com/sigp/lighthouse/issues/2028 let (_, signed_block, block_data) = signed_block.deconstruct(); - match self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) { - Ok(Some(blobs_or_columns_store_op)) => { - ops.push(blobs_or_columns_store_op); - } - Ok(None) => {} - Err(e) => { - error!( - msg = "Restoring fork choice from disk", - error = &e, - ?block_root, - "Failed to store data columns into the database" - ); - return Err(self - .handle_import_block_db_write_error(fork_choice) - .err() - .unwrap_or(BlockError::InternalError(e))); - } + if let Some(blobs_or_columns_store_op) = + self.get_blobs_or_columns_store_op(block_root, signed_block.slot(), block_data) + { + ops.push(blobs_or_columns_store_op); } let block = signed_block.message(); @@ -4078,7 +4076,7 @@ impl BeaconChain { // We're declaring the block "imported" at this point, since fork choice and the DB know // about it. - let block_time_imported = timestamp_now(); + let block_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX); // compute state proofs for light client updates before inserting the state into the // snapshot cache. @@ -4147,7 +4145,7 @@ impl BeaconChain { } /// Check block's consistentency with any configured weak subjectivity checkpoint. - fn check_block_against_weak_subjectivity_checkpoint( + pub(crate) fn check_block_against_weak_subjectivity_checkpoint( &self, block: BeaconBlockRef, block_root: Hash256, @@ -6401,6 +6399,7 @@ impl BeaconChain { // sync anyway). self.naive_aggregation_pool.write().prune(slot); self.block_times_cache.write().prune(slot); + self.envelope_times_cache.write().prune(slot); // Don't run heavy-weight tasks during sync. if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot { @@ -6460,62 +6459,14 @@ impl BeaconChain { accessor: impl Fn(&EpochBlockProposers) -> Result, state_provider: impl FnOnce() -> Result<(Hash256, BeaconState), E>, ) -> Result { - let cache_entry = self - .beacon_proposer_cache - .lock() - .get_or_insert_key(proposal_epoch, shuffling_decision_block); - - // If the cache entry is not initialised, run the code to initialise it inside a OnceCell. - // This prevents duplication of work across multiple threads. - // - // If it is already initialised, then `get_or_try_init` will return immediately without - // executing the initialisation code at all. - let epoch_block_proposers = cache_entry.get_or_try_init(|| { - // Fetch the state on-demand if the required epoch was missing from the cache. - // If the caller wants to not compute the state they must return an error here and then - // catch it at the call site. - let (state_root, mut state) = state_provider()?; - - // Ensure the state can compute proposer duties for `epoch`. - ensure_state_can_determine_proposers_for_epoch( - &mut state, - state_root, - proposal_epoch, - &self.spec, - )?; - - // Sanity check the state. - let latest_block_root = state.get_latest_block_root(state_root); - let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch( - proposal_epoch, - latest_block_root, - &self.spec, - )?; - if state_decision_block_root != shuffling_decision_block { - return Err(Error::ProposerCacheIncorrectState { - state_decision_block_root, - requested_decision_block_root: shuffling_decision_block, - } - .into()); - } - - let proposers = state.get_beacon_proposer_indices(proposal_epoch, &self.spec)?; - - // Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have - // advanced the state completely into the new epoch. - let fork = self.spec.fork_at_epoch(proposal_epoch); - - debug!( - ?shuffling_decision_block, - epoch = %proposal_epoch, - "Priming proposer shuffling cache" - ); - - Ok::<_, E>(EpochBlockProposers::new(proposal_epoch, fork, proposers)) - })?; - - // Run the accessor function on the computed epoch proposers. - accessor(epoch_block_proposers).map_err(Into::into) + crate::beacon_proposer_cache::with_proposer_cache( + &self.beacon_proposer_cache, + shuffling_decision_block, + proposal_epoch, + accessor, + state_provider, + &self.spec, + ) } /// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head @@ -7191,16 +7142,16 @@ impl BeaconChain { block_root: Hash256, block_slot: Slot, block_data: AvailableBlockData, - ) -> Result>, String> { + ) -> Option> { match block_data { - AvailableBlockData::NoData => Ok(None), + AvailableBlockData::NoData => None, AvailableBlockData::Blobs(blobs) => { debug!( %block_root, count = blobs.len(), "Writing blobs to store" ); - Ok(Some(StoreOp::PutBlobs(block_root, blobs))) + Some(StoreOp::PutBlobs(block_root, blobs)) } AvailableBlockData::DataColumns(mut data_columns) => { let columns_to_custody = self.custody_columns_for_epoch(Some( @@ -7216,7 +7167,7 @@ impl BeaconChain { count = data_columns.len(), "Writing data columns to store" ); - Ok(Some(StoreOp::PutDataColumns(block_root, data_columns))) + Some(StoreOp::PutDataColumns(block_root, data_columns)) } } } diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index 912f7f3bad..b258d7471f 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -12,12 +12,13 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use fork_choice::ExecutionStatus; use lru::LruCache; use once_cell::sync::OnceCell; +use parking_lot::Mutex; use safe_arith::SafeArith; use smallvec::SmallVec; use state_processing::state_advance::partial_state_advance; use std::num::NonZeroUsize; use std::sync::Arc; -use tracing::instrument; +use tracing::{debug, instrument}; use typenum::Unsigned; use types::new_non_zero_usize; use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot}; @@ -164,6 +165,82 @@ impl BeaconProposerCache { } } +/// Access the proposer cache, computing and caching the proposers if necessary. +/// +/// This is a free function that operates on references to the cache and spec, decoupled from +/// `BeaconChain`. The `accessor` is called with the cached `EpochBlockProposers` for the given +/// `(proposal_epoch, shuffling_decision_block)` key. If the cache entry is missing, the +/// `state_provider` closure is called to produce a state which is then used to compute and +/// cache the proposers. +pub fn with_proposer_cache( + beacon_proposer_cache: &Mutex, + shuffling_decision_block: Hash256, + proposal_epoch: Epoch, + accessor: impl Fn(&EpochBlockProposers) -> Result, + state_provider: impl FnOnce() -> Result<(Hash256, BeaconState), Err>, + spec: &ChainSpec, +) -> Result +where + Spec: EthSpec, + Err: From + From, +{ + let cache_entry = beacon_proposer_cache + .lock() + .get_or_insert_key(proposal_epoch, shuffling_decision_block); + + // If the cache entry is not initialised, run the code to initialise it inside a OnceCell. + // This prevents duplication of work across multiple threads. + // + // If it is already initialised, then `get_or_try_init` will return immediately without + // executing the initialisation code at all. + let epoch_block_proposers = cache_entry.get_or_try_init(|| { + // Fetch the state on-demand if the required epoch was missing from the cache. + // If the caller wants to not compute the state they must return an error here and then + // catch it at the call site. + let (state_root, mut state) = state_provider()?; + + // Ensure the state can compute proposer duties for `epoch`. + ensure_state_can_determine_proposers_for_epoch( + &mut state, + state_root, + proposal_epoch, + spec, + )?; + + // Sanity check the state. + let latest_block_root = state.get_latest_block_root(state_root); + let state_decision_block_root = state.proposer_shuffling_decision_root_at_epoch( + proposal_epoch, + latest_block_root, + spec, + )?; + if state_decision_block_root != shuffling_decision_block { + return Err(BeaconChainError::ProposerCacheIncorrectState { + state_decision_block_root, + requested_decision_block_root: shuffling_decision_block, + } + .into()); + } + + let proposers = state.get_beacon_proposer_indices(proposal_epoch, spec)?; + + // Use fork_at_epoch rather than the state's fork, because post-Fulu we may not have + // advanced the state completely into the new epoch. + let fork = spec.fork_at_epoch(proposal_epoch); + + debug!( + ?shuffling_decision_block, + epoch = %proposal_epoch, + "Priming proposer shuffling cache" + ); + + Ok::<_, Err>(EpochBlockProposers::new(proposal_epoch, fork, proposers)) + })?; + + // Run the accessor function on the computed epoch proposers. + accessor(epoch_block_proposers).map_err(Into::into) +} + /// Compute the proposer duties using the head state without cache. /// /// Return: diff --git a/beacon_node/beacon_chain/src/block_reward.rs b/beacon_node/beacon_chain/src/block_reward.rs deleted file mode 100644 index f3924bb473..0000000000 --- a/beacon_node/beacon_chain/src/block_reward.rs +++ /dev/null @@ -1,140 +0,0 @@ -use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; -use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta}; -use operation_pool::{ - AttMaxCover, MaxCover, PROPOSER_REWARD_DENOMINATOR, RewardCache, SplitAttestation, -}; -use state_processing::{ - common::get_attesting_indices_from_state, - per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards, -}; -use types::{AbstractExecPayload, BeaconBlockRef, BeaconState, EthSpec, Hash256}; - -impl BeaconChain { - pub fn compute_block_reward>( - &self, - block: BeaconBlockRef<'_, T::EthSpec, Payload>, - block_root: Hash256, - state: &BeaconState, - reward_cache: &mut RewardCache, - include_attestations: bool, - ) -> Result { - if block.slot() != state.slot() { - return Err(BeaconChainError::BlockRewardSlotError); - } - - reward_cache.update(state)?; - - let total_active_balance = state.get_total_active_balance()?; - - let split_attestations = block - .body() - .attestations() - .map(|att| { - let attesting_indices = get_attesting_indices_from_state(state, att)?; - Ok(SplitAttestation::new( - att.clone_as_attestation(), - attesting_indices, - )) - }) - .collect::, BeaconChainError>>()?; - - let mut per_attestation_rewards = split_attestations - .iter() - .map(|att| { - AttMaxCover::new( - att.as_ref(), - state, - reward_cache, - total_active_balance, - &self.spec, - ) - .ok_or(BeaconChainError::BlockRewardAttestationError) - }) - .collect::, _>>()?; - - // Update the attestation rewards for each previous attestation included. - // This is O(n^2) in the number of attestations n. - for i in 0..per_attestation_rewards.len() { - let (updated, to_update) = per_attestation_rewards.split_at_mut(i + 1); - let latest_att = &updated[i]; - - for att in to_update { - att.update_covering_set(latest_att.intermediate(), latest_att.covering_set()); - } - } - - let mut prev_epoch_total = 0; - let mut curr_epoch_total = 0; - - for cover in &per_attestation_rewards { - if cover.att.data.slot.epoch(T::EthSpec::slots_per_epoch()) == state.current_epoch() { - curr_epoch_total += cover.score() as u64; - } else { - prev_epoch_total += cover.score() as u64; - } - } - - let attestation_total = prev_epoch_total + curr_epoch_total; - - // Drop the covers. - let per_attestation_rewards = per_attestation_rewards - .into_iter() - .map(|cover| { - // Divide each reward numerator by the denominator. This can lead to the total being - // less than the sum of the individual rewards due to the fact that integer division - // does not distribute over addition. - let mut rewards = cover.fresh_validators_rewards; - rewards - .values_mut() - .for_each(|reward| *reward /= PROPOSER_REWARD_DENOMINATOR); - rewards - }) - .collect(); - - // Add the attestation data if desired. - let attestations = if include_attestations { - block - .body() - .attestations() - .map(|a| a.data().clone()) - .collect() - } else { - vec![] - }; - - let attestation_rewards = AttestationRewards { - total: attestation_total, - prev_epoch_total, - curr_epoch_total, - per_attestation_rewards, - attestations, - }; - - // Sync committee rewards. - let sync_committee_rewards = if let Ok(sync_aggregate) = block.body().sync_aggregate() { - let (_, proposer_reward_per_bit) = compute_sync_aggregate_rewards(state, &self.spec) - .map_err(|_| BeaconChainError::BlockRewardSyncError)?; - sync_aggregate.sync_committee_bits.num_set_bits() as u64 * proposer_reward_per_bit - } else { - 0 - }; - - // Total, metadata - let total = attestation_total + sync_committee_rewards; - - let meta = BlockRewardMeta { - slot: block.slot(), - parent_slot: state.latest_block_header().slot, - proposer_index: block.proposer_index(), - graffiti: block.body().graffiti().as_utf8_lossy(), - }; - - Ok(BlockReward { - total, - block_root, - meta, - attestation_rewards, - sync_committee_rewards, - }) - } -} diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index ab60b8b955..4822120cb0 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -681,7 +681,8 @@ pub struct SignatureVerifiedBlock { } /// Used to await the result of executing payload with an EE. -type PayloadVerificationHandle = JoinHandle>>; +pub type PayloadVerificationHandle = + JoinHandle>>; /// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and /// ready to import into the `BeaconChain`. The validation includes: @@ -1357,7 +1358,7 @@ impl ExecutionPendingBlock { /// verification must be done upstream (e.g., via a `SignatureVerifiedBlock` /// /// Returns an error if the block is invalid, or if the block was unable to be verified. - #[instrument(skip_all, level = "debug")] + #[instrument(skip_all, level = "debug", fields(?block_root))] pub fn from_signature_verified_components( block: MaybeAvailableBlock, block_root: Hash256, @@ -1571,24 +1572,6 @@ impl ExecutionPendingBlock { metrics::stop_timer(committee_timer); - /* - * If we have block reward listeners, compute the block reward and push it to the - * event handler. - */ - if let Some(ref event_handler) = chain.event_handler - && event_handler.has_block_reward_subscribers() - { - let mut reward_cache = Default::default(); - let block_reward = chain.compute_block_reward( - block.message(), - block_root, - &state, - &mut reward_cache, - true, - )?; - event_handler.register(EventKind::BlockReward(block_reward)); - } - /* * Perform `per_block_processing` on the block and state, returning early if the block is * invalid. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 2c1dae9215..d5935b492a 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1023,6 +1023,7 @@ where )), beacon_proposer_cache, block_times_cache: <_>::default(), + envelope_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), validator_pubkey_cache: RwLock::new(validator_pubkey_cache), early_attester_cache: <_>::default(), @@ -1083,6 +1084,11 @@ where let cgc_change_effective_slot = cgc_changed.effective_epoch.start_slot(E::slots_per_epoch()); beacon_chain.update_data_column_custody_info(Some(cgc_change_effective_slot)); + + // Persist change to disk. + beacon_chain + .persist_custody_context() + .map_err(|e| format!("Failed writing updated CGC: {e:?}"))?; } info!( diff --git a/beacon_node/beacon_chain/src/envelope_times_cache.rs b/beacon_node/beacon_chain/src/envelope_times_cache.rs new file mode 100644 index 0000000000..84c936c210 --- /dev/null +++ b/beacon_node/beacon_chain/src/envelope_times_cache.rs @@ -0,0 +1,197 @@ +//! This module provides the `EnvelopeTimesCache` which contains information regarding payload +//! envelope timings. +//! +//! This provides `BeaconChain` and associated functions with access to the timestamps of when a +//! payload envelope was observed, verified, executed, and imported. +//! This allows for better traceability and allows us to determine the root cause for why an +//! envelope was imported late. +//! This allows us to distinguish between the following scenarios: +//! - The envelope was observed late. +//! - Consensus verification was slow. +//! - Execution verification was slow. +//! - The DB write was slow. + +use eth2::types::{Hash256, Slot}; +use std::collections::HashMap; +use std::time::Duration; + +type BlockRoot = Hash256; + +#[derive(Clone, Default)] +pub struct EnvelopeTimestamps { + /// When the envelope was first observed (gossip or RPC). + pub observed: Option, + /// When consensus verification (state transition) completed. + pub consensus_verified: Option, + /// When execution layer verification started. + pub started_execution: Option, + /// When execution layer verification completed. + pub executed: Option, + /// When the envelope was imported into the DB. + pub imported: Option, +} + +/// Delay data for envelope processing, computed relative to the slot start time. +#[derive(Debug, Default)] +pub struct EnvelopeDelays { + /// Time after start of slot we saw the envelope. + pub observed: Option, + /// The time it took to complete consensus verification of the envelope. + pub consensus_verification_time: Option, + /// The time it took to complete execution verification of the envelope. + pub execution_time: Option, + /// Time after execution until the envelope was imported. + pub imported: Option, +} + +impl EnvelopeDelays { + fn new(times: EnvelopeTimestamps, slot_start_time: Duration) -> EnvelopeDelays { + let observed = times + .observed + .and_then(|observed_time| observed_time.checked_sub(slot_start_time)); + let consensus_verification_time = times + .consensus_verified + .and_then(|consensus_verified| consensus_verified.checked_sub(times.observed?)); + let execution_time = times + .executed + .and_then(|executed| executed.checked_sub(times.started_execution?)); + let imported = times + .imported + .and_then(|imported_time| imported_time.checked_sub(times.executed?)); + EnvelopeDelays { + observed, + consensus_verification_time, + execution_time, + imported, + } + } +} + +pub struct EnvelopeTimesCacheValue { + pub slot: Slot, + pub timestamps: EnvelopeTimestamps, + pub peer_id: Option, +} + +impl EnvelopeTimesCacheValue { + fn new(slot: Slot) -> Self { + EnvelopeTimesCacheValue { + slot, + timestamps: Default::default(), + peer_id: None, + } + } +} + +#[derive(Default)] +pub struct EnvelopeTimesCache { + pub cache: HashMap, +} + +impl EnvelopeTimesCache { + /// Set the observation time for `block_root` to `timestamp` if `timestamp` is less than + /// any previous timestamp at which this envelope was observed. + pub fn set_time_observed( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + peer_id: Option, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + match entry.timestamps.observed { + Some(existing) if existing <= timestamp => { + // Existing timestamp is earlier, do nothing. + } + _ => { + entry.timestamps.observed = Some(timestamp); + entry.peer_id = peer_id; + } + } + } + + /// Set the timestamp for `field` if that timestamp is less than any previously known value. + fn set_time_if_less( + &mut self, + block_root: BlockRoot, + slot: Slot, + field: impl Fn(&mut EnvelopeTimestamps) -> &mut Option, + timestamp: Duration, + ) { + let entry = self + .cache + .entry(block_root) + .or_insert_with(|| EnvelopeTimesCacheValue::new(slot)); + let existing_timestamp = field(&mut entry.timestamps); + if existing_timestamp.is_none_or(|prev| timestamp < prev) { + *existing_timestamp = Some(timestamp); + } + } + + pub fn set_time_consensus_verified( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.consensus_verified, + timestamp, + ) + } + + pub fn set_time_started_execution( + &mut self, + block_root: BlockRoot, + slot: Slot, + timestamp: Duration, + ) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.started_execution, + timestamp, + ) + } + + pub fn set_time_executed(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.executed, + timestamp, + ) + } + + pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) { + self.set_time_if_less( + block_root, + slot, + |timestamps| &mut timestamps.imported, + timestamp, + ) + } + + pub fn get_envelope_delays( + &self, + block_root: BlockRoot, + slot_start_time: Duration, + ) -> EnvelopeDelays { + if let Some(entry) = self.cache.get(&block_root) { + EnvelopeDelays::new(entry.timestamps.clone(), slot_start_time) + } else { + EnvelopeDelays::default() + } + } + + /// Prune the cache to only store the most recent 2 epochs. + pub fn prune(&mut self, current_slot: Slot) { + self.cache + .retain(|_, entry| entry.slot > current_slot.saturating_sub(64_u64)); + } +} diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 63be944eea..276edc3fe6 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -21,7 +21,6 @@ pub struct ServerSentEventHandler { late_head: Sender>, light_client_finality_update_tx: Sender>, light_client_optimistic_update_tx: Sender>, - block_reward_tx: Sender>, proposer_slashing_tx: Sender>, attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, @@ -48,7 +47,6 @@ impl ServerSentEventHandler { let (late_head, _) = broadcast::channel(capacity); let (light_client_finality_update_tx, _) = broadcast::channel(capacity); let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity); - let (block_reward_tx, _) = broadcast::channel(capacity); let (proposer_slashing_tx, _) = broadcast::channel(capacity); let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); @@ -69,7 +67,6 @@ impl ServerSentEventHandler { late_head, light_client_finality_update_tx, light_client_optimistic_update_tx, - block_reward_tx, proposer_slashing_tx, attester_slashing_tx, bls_to_execution_change_tx, @@ -142,10 +139,6 @@ impl ServerSentEventHandler { .light_client_optimistic_update_tx .send(kind) .map(|count| log_count("light client optimistic update", count)), - EventKind::BlockReward(_) => self - .block_reward_tx - .send(kind) - .map(|count| log_count("block reward", count)), EventKind::ProposerSlashing(_) => self .proposer_slashing_tx .send(kind) @@ -224,10 +217,6 @@ impl ServerSentEventHandler { self.light_client_optimistic_update_tx.subscribe() } - pub fn subscribe_block_reward(&self) -> Receiver> { - self.block_reward_tx.subscribe() - } - pub fn subscribe_attester_slashing(&self) -> Receiver> { self.attester_slashing_tx.subscribe() } @@ -292,10 +281,6 @@ impl ServerSentEventHandler { self.late_head.receiver_count() > 0 } - pub fn has_block_reward_subscribers(&self) -> bool { - self.block_reward_tx.receiver_count() > 0 - } - pub fn has_proposer_slashing_subscribers(&self) -> bool { self.proposer_slashing_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index a2ebed32ee..2b03a095f1 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -25,7 +25,6 @@ use state_processing::per_block_processing::{ use std::sync::Arc; use tokio::task::JoinHandle; use tracing::{Instrument, debug_span, warn}; -use tree_hash::TreeHash; use types::execution::BlockProductionVersion; use types::*; @@ -109,12 +108,18 @@ impl PayloadNotifier { if let Some(precomputed_status) = self.payload_verification_status { Ok(precomputed_status) } else { - notify_new_payload(&self.chain, self.block.message()).await + notify_new_payload( + &self.chain, + self.block.message().slot(), + self.block.message().parent_root(), + self.block.message().try_into()?, + ) + .await } } } -/// Verify that `execution_payload` contained by `block` is considered valid by an execution +/// Verify that `execution_payload` is considered valid by an execution /// engine. /// /// ## Specification @@ -123,17 +128,21 @@ impl PayloadNotifier { /// contains a few extra checks by running `partially_verify_execution_payload` first: /// /// https://github.com/ethereum/consensus-specs/blob/v1.1.9/specs/bellatrix/beacon-chain.md#notify_new_payload -async fn notify_new_payload( +pub async fn notify_new_payload( chain: &Arc>, - block: BeaconBlockRef<'_, T::EthSpec>, + slot: Slot, + parent_beacon_block_root: Hash256, + new_payload_request: NewPayloadRequest<'_, T::EthSpec>, ) -> Result { let execution_layer = chain .execution_layer .as_ref() .ok_or(ExecutionPayloadError::NoExecutionConnection)?; - let execution_block_hash = block.execution_payload()?.block_hash(); - let new_payload_response = execution_layer.notify_new_payload(block.try_into()?).await; + let execution_block_hash = new_payload_request.execution_payload_ref().block_hash(); + let new_payload_response = execution_layer + .notify_new_payload(new_payload_request.clone()) + .await; match new_payload_response { Ok(status) => match status { @@ -149,10 +158,7 @@ async fn notify_new_payload( ?validation_error, ?latest_valid_hash, ?execution_block_hash, - root = ?block.tree_hash_root(), - graffiti = block.body().graffiti().as_utf8_lossy(), - proposer_index = block.proposer_index(), - slot = %block.slot(), + %slot, method = "new_payload", "Invalid execution payload" ); @@ -175,11 +181,9 @@ async fn notify_new_payload( { // This block has not yet been applied to fork choice, so the latest block that was // imported to fork choice was the parent. - let latest_root = block.parent_root(); - chain .process_invalid_execution_payload(&InvalidationOperation::InvalidateMany { - head_block_root: latest_root, + head_block_root: parent_beacon_block_root, always_invalidate_head: false, latest_valid_ancestor: latest_valid_hash, }) @@ -194,10 +198,7 @@ async fn notify_new_payload( warn!( ?validation_error, ?execution_block_hash, - root = ?block.tree_hash_root(), - graffiti = block.body().graffiti().as_utf8_lossy(), - proposer_index = block.proposer_index(), - slot = %block.slot(), + %slot, method = "new_payload", "Invalid execution payload block hash" ); diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 1dae2258f6..bfda52558e 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -165,13 +165,8 @@ impl BeaconChain { } // Store the blobs or data columns too - if let Some(op) = self - .get_blobs_or_columns_store_op(block_root, block.slot(), block_data) - .map_err(|e| { - HistoricalBlockError::StoreError(StoreError::DBError { - message: format!("get_blobs_or_columns_store_op error {e:?}"), - }) - })? + if let Some(op) = + self.get_blobs_or_columns_store_op(block_root, block.slot(), block_data) { blob_batch.extend(self.store.convert_to_kv_batch(vec![op])?); } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index e1a190ffb3..4efd90bd22 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -10,7 +10,6 @@ mod beacon_snapshot; pub mod bellatrix_readiness; pub mod blob_verification; mod block_production; -pub mod block_reward; mod block_times_cache; mod block_verification; pub mod block_verification_types; @@ -21,6 +20,7 @@ pub mod custody_context; pub mod data_availability_checker; pub mod data_column_verification; mod early_attester_cache; +pub mod envelope_times_cache; mod errors; pub mod events; pub mod execution_payload; @@ -42,6 +42,7 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; +pub mod payload_envelope_verification; pub mod pending_payload_envelopes; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 9de67ca93f..786daa09da 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -21,6 +21,34 @@ pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &st pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str = "validator_monitor_attestation_simulator_source_attester_miss_total"; +/* +* Execution Payload Envelope Processing +*/ + +pub static ENVELOPE_PROCESSING_REQUESTS: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_requests_total", + "Count of payload envelopes submitted for processing", + ) +}); +pub static ENVELOPE_PROCESSING_SUCCESSES: LazyLock> = LazyLock::new(|| { + try_create_int_counter( + "payload_envelope_processing_successes_total", + "Count of payload envelopes processed without error", + ) +}); +pub static ENVELOPE_PROCESSING_TIMES: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "payload_envelope_processing_seconds", + "Full runtime of payload envelope processing", + ) +}); +pub static ENVELOPE_PROCESSING_DB_WRITE: LazyLock> = LazyLock::new(|| { + try_create_histogram( + "payload_envelope_processing_db_write_seconds", + "Time spent writing a newly processed payload envelope and state to DB", + ) +}); /* * Block Processing */ diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs new file mode 100644 index 0000000000..86f9293c8f --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -0,0 +1,105 @@ +use std::sync::Arc; + +use slot_clock::SlotClock; +use state_processing::{ + VerifySignatures, + envelope_processing::{VerifyStateRoot, process_execution_payload_envelope}, +}; +use types::EthSpec; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, + PayloadVerificationOutcome, + block_verification::PayloadVerificationHandle, + payload_envelope_verification::{ + EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope, + gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root, + payload_notifier::PayloadNotifier, + }, +}; + +pub struct ExecutionPendingEnvelope { + pub signed_envelope: MaybeAvailableEnvelope, + pub import_data: EnvelopeImportData, + pub payload_verification_handle: PayloadVerificationHandle, +} + +impl GossipVerifiedEnvelope { + pub fn into_execution_pending_envelope( + self, + chain: &Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result, EnvelopeError> { + let signed_envelope = self.signed_envelope; + let envelope = &signed_envelope.message; + let payload = &envelope.payload; + + // Define a future that will verify the execution payload with an execution engine. + // + // We do this as early as possible so that later parts of this function can run in parallel + // with the payload verification. + let payload_notifier = PayloadNotifier::new( + chain.clone(), + signed_envelope.clone(), + self.block.clone(), + notify_execution_layer, + )?; + let block_root = envelope.beacon_block_root; + let slot = self.block.slot(); + + let payload_verification_future = async move { + let chain = payload_notifier.chain.clone(); + if let Some(started_execution) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_started_execution(block_root, slot, started_execution); + } + + let payload_verification_status = payload_notifier.notify_new_payload().await?; + Ok(PayloadVerificationOutcome { + payload_verification_status, + }) + }; + // Spawn the payload verification future as a new task, but don't wait for it to complete. + // The `payload_verification_future` will be awaited later to ensure verification completed + // successfully. + let payload_verification_handle = chain + .task_executor + .spawn_handle( + payload_verification_future, + "execution_payload_verification", + ) + .ok_or(BeaconChainError::RuntimeShutdown)?; + + let snapshot = if let Some(snapshot) = self.snapshot { + *snapshot + } else { + load_snapshot_from_state_root::(block_root, self.block.state_root(), &chain.store)? + }; + let mut state = snapshot.pre_state; + + // All the state modifications are done in envelope_processing + process_execution_payload_envelope( + &mut state, + Some(snapshot.state_root), + &signed_envelope, + // verify signature already done for GossipVerifiedEnvelope + VerifySignatures::False, + VerifyStateRoot::True, + &chain.spec, + )?; + + Ok(ExecutionPendingEnvelope { + signed_envelope: MaybeAvailableEnvelope::AvailabilityPending { + block_hash: payload.block_hash, + envelope: signed_envelope, + }, + import_data: EnvelopeImportData { + block_root, + post_state: Box::new(state), + }, + payload_verification_handle, + }) + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs new file mode 100644 index 0000000000..03a3a91ac5 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/gossip_verified_envelope.rs @@ -0,0 +1,445 @@ +use std::sync::Arc; + +use educe::Educe; +use parking_lot::{Mutex, RwLock}; +use store::DatabaseBlock; +use tracing::{Span, debug}; +use types::{ + ChainSpec, EthSpec, ExecutionPayloadBid, ExecutionPayloadEnvelope, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, consts::gloas::BUILDER_INDEX_SELF_BUILD, +}; + +use crate::{ + BeaconChain, BeaconChainError, BeaconChainTypes, BeaconStore, + beacon_proposer_cache::{self, BeaconProposerCache}, + canonical_head::CanonicalHead, + payload_envelope_verification::{ + EnvelopeError, EnvelopeProcessingSnapshot, load_snapshot_from_state_root, + }, + validator_pubkey_cache::ValidatorPubkeyCache, +}; + +/// Bundles only the dependencies needed for gossip verification of execution payload envelopes, +/// decoupling `GossipVerifiedEnvelope::new` from the full `BeaconChain`. +pub struct GossipVerificationContext<'a, T: BeaconChainTypes> { + pub canonical_head: &'a CanonicalHead, + pub store: &'a BeaconStore, + pub spec: &'a ChainSpec, + pub beacon_proposer_cache: &'a Mutex, + pub validator_pubkey_cache: &'a RwLock>, + pub genesis_validators_root: Hash256, +} + +/// Verify that an execution payload envelope is consistent with its beacon block +/// and execution bid. +pub(crate) fn verify_envelope_consistency( + envelope: &ExecutionPayloadEnvelope, + block: &SignedBeaconBlock, + execution_bid: &ExecutionPayloadBid, + latest_finalized_slot: Slot, +) -> Result<(), EnvelopeError> { + // Check that the envelope's slot isn't from a slot prior + // to the latest finalized slot. + if envelope.slot < latest_finalized_slot { + return Err(EnvelopeError::PriorToFinalization { + payload_slot: envelope.slot, + latest_finalized_slot, + }); + } + + // Check that the slot of the envelope matches the slot of the block. + if envelope.slot != block.slot() { + return Err(EnvelopeError::SlotMismatch { + block: block.slot(), + envelope: envelope.slot, + }); + } + + // Builder index matches committed bid. + if envelope.builder_index != execution_bid.builder_index { + return Err(EnvelopeError::BuilderIndexMismatch { + committed_bid: execution_bid.builder_index, + envelope: envelope.builder_index, + }); + } + + // The block hash should match the block hash of the execution bid. + if envelope.payload.block_hash != execution_bid.block_hash { + return Err(EnvelopeError::BlockHashMismatch { + committed_bid: execution_bid.block_hash, + envelope: envelope.payload.block_hash, + }); + } + + Ok(()) +} + +/// A wrapper around a `SignedExecutionPayloadEnvelope` that indicates it has been approved for re-gossiping on +/// the p2p network. +#[derive(Educe)] +#[educe(Debug(bound = "T: BeaconChainTypes"))] +pub struct GossipVerifiedEnvelope { + pub signed_envelope: Arc>, + pub block: Arc>, + pub snapshot: Option>>, +} + +impl GossipVerifiedEnvelope { + pub fn new( + signed_envelope: Arc>, + ctx: &GossipVerificationContext<'_, T>, + ) -> Result { + let envelope = &signed_envelope.message; + let beacon_block_root = envelope.beacon_block_root; + + // Check that we've seen the beacon block for this envelope and that it passes validation. + // TODO(EIP-7732): We might need some type of status table in order to differentiate between: + // If we have a block_processing_table, we could have a Processed(Bid, bool) state that is only + // entered post adding to fork choice. That way, we could potentially need only a single call to make + // sure the block is valid and to do all consequent checks with the bid + // + // 1. Blocks we haven't seen (IGNORE), and + // 2. Blocks we've seen that are invalid (REJECT). + // + // Presently these two cases are conflated. + let fork_choice_read_lock = ctx.canonical_head.fork_choice_read_lock(); + let Some(proto_block) = fork_choice_read_lock.get_block(&beacon_block_root) else { + return Err(EnvelopeError::BlockRootUnknown { + block_root: beacon_block_root, + }); + }; + + drop(fork_choice_read_lock); + + let latest_finalized_slot = ctx + .canonical_head + .cached_head() + .finalized_checkpoint() + .epoch + .start_slot(T::EthSpec::slots_per_epoch()); + + // TODO(EIP-7732): check that we haven't seen another valid `SignedExecutionPayloadEnvelope` + // for this block root from this builder - envelope status table check + let block = match ctx.store.try_get_full_block(&beacon_block_root)? { + Some(DatabaseBlock::Full(block)) => Arc::new(block), + Some(DatabaseBlock::Blinded(_)) | None => { + return Err(EnvelopeError::from(BeaconChainError::MissingBeaconBlock( + beacon_block_root, + ))); + } + }; + let execution_bid = &block + .message() + .body() + .signed_execution_payload_bid()? + .message; + + verify_envelope_consistency(envelope, &block, execution_bid, latest_finalized_slot)?; + + // Verify the envelope signature. + // + // For self-built envelopes, we can use the proposer cache for the fork and the + // validator pubkey cache for the proposer's pubkey, avoiding a state load from disk. + // For external builder envelopes, we must load the state to access the builder registry. + let builder_index = envelope.builder_index; + let block_slot = envelope.slot; + let envelope_epoch = block_slot.epoch(T::EthSpec::slots_per_epoch()); + // Since the payload's block is already guaranteed to be imported, the associated `proto_block.current_epoch_shuffling_id` + // already carries the correct `shuffling_decision_block`. + let proposer_shuffling_decision_block = proto_block + .current_epoch_shuffling_id + .shuffling_decision_block; + + let (signature_is_valid, opt_snapshot) = if builder_index == BUILDER_INDEX_SELF_BUILD { + // Fast path: self-built envelopes can be verified without loading the state. + let mut opt_snapshot = None; + let proposer = beacon_proposer_cache::with_proposer_cache( + ctx.beacon_proposer_cache, + proposer_shuffling_decision_block, + envelope_epoch, + |proposers| proposers.get_slot::(block_slot), + || { + debug!( + %beacon_block_root, + "Proposer shuffling cache miss for envelope verification" + ); + let snapshot = load_snapshot_from_state_root::( + beacon_block_root, + proto_block.state_root, + ctx.store, + )?; + opt_snapshot = Some(Box::new(snapshot.clone())); + Ok::<_, EnvelopeError>((snapshot.state_root, snapshot.pre_state)) + }, + ctx.spec, + )?; + let expected_proposer = proposer.index; + let fork = proposer.fork; + + if block.message().proposer_index() != expected_proposer as u64 { + return Err(EnvelopeError::IncorrectBlockProposer { + proposer_index: block.message().proposer_index(), + local_shuffling: expected_proposer as u64, + }); + } + + let pubkey_cache = ctx.validator_pubkey_cache.read(); + let pubkey = pubkey_cache + .get(block.message().proposer_index() as usize) + .ok_or_else(|| EnvelopeError::UnknownValidator { + proposer_index: block.message().proposer_index(), + })?; + let is_valid = signed_envelope.verify_signature( + pubkey, + &fork, + ctx.genesis_validators_root, + ctx.spec, + ); + (is_valid, opt_snapshot) + } else { + // TODO(gloas) if we implement a builder pubkey cache, we'll need to use it here. + // External builder: must load the state to get the builder pubkey. + let snapshot = load_snapshot_from_state_root::( + beacon_block_root, + proto_block.state_root, + ctx.store, + )?; + let is_valid = + signed_envelope.verify_signature_with_state(&snapshot.pre_state, ctx.spec)?; + (is_valid, Some(Box::new(snapshot))) + }; + + if !signature_is_valid { + return Err(EnvelopeError::BadSignature); + } + + Ok(Self { + signed_envelope, + block, + snapshot: opt_snapshot, + }) + } + + pub fn envelope_cloned(&self) -> Arc> { + self.signed_envelope.clone() + } +} + +impl BeaconChain { + /// Build a `GossipVerificationContext` from this `BeaconChain`. + pub fn gossip_verification_context(&self) -> GossipVerificationContext<'_, T> { + GossipVerificationContext { + canonical_head: &self.canonical_head, + store: &self.store, + spec: &self.spec, + beacon_proposer_cache: &self.beacon_proposer_cache, + validator_pubkey_cache: &self.validator_pubkey_cache, + genesis_validators_root: self.genesis_validators_root, + } + } + + /// Returns `Ok(GossipVerifiedEnvelope)` if the supplied `envelope` should be forwarded onto the + /// gossip network. The envelope is not imported into the chain, it is just partially verified. + /// + /// The returned `GossipVerifiedEnvelope` should be provided to `Self::process_execution_payload_envelope` immediately + /// after it is returned, unless some other circumstance decides it should not be imported at + /// all. + /// + /// ## Errors + /// + /// Returns an `Err` if the given envelope was invalid, or an error was encountered during verification. + pub async fn verify_envelope_for_gossip( + self: &Arc, + envelope: Arc>, + ) -> Result, EnvelopeError> { + let chain = self.clone(); + let span = Span::current(); + self.task_executor + .clone() + .spawn_blocking_handle( + move || { + let _guard = span.enter(); + let slot = envelope.slot(); + let beacon_block_root = envelope.message.beacon_block_root; + + let ctx = chain.gossip_verification_context(); + match GossipVerifiedEnvelope::new(envelope, &ctx) { + Ok(verified) => { + debug!( + %slot, + ?beacon_block_root, + "Successfully verified gossip envelope" + ); + + Ok(verified) + } + Err(e) => { + debug!( + error = e.to_string(), + ?beacon_block_root, + %slot, + "Rejected gossip envelope" + ); + + Err(e) + } + } + }, + "gossip_envelope_verification_handle", + ) + .ok_or(BeaconChainError::RuntimeShutdown)? + .await + .map_err(BeaconChainError::TokioJoin)? + } +} + +#[cfg(test)] +mod tests { + use std::marker::PhantomData; + + use bls::Signature; + use ssz_types::VariableList; + use types::{ + BeaconBlock, BeaconBlockBodyGloas, BeaconBlockGloas, Eth1Data, ExecutionBlockHash, + ExecutionPayloadBid, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, + Graffiti, Hash256, MinimalEthSpec, SignedBeaconBlock, SignedExecutionPayloadBid, Slot, + SyncAggregate, + }; + + use super::verify_envelope_consistency; + use crate::payload_envelope_verification::EnvelopeError; + + type E = MinimalEthSpec; + + fn make_envelope( + slot: Slot, + builder_index: u64, + block_hash: ExecutionBlockHash, + ) -> ExecutionPayloadEnvelope { + ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas { + block_hash, + ..ExecutionPayloadGloas::default() + }, + execution_requests: ExecutionRequests::default(), + builder_index, + beacon_block_root: Hash256::ZERO, + slot, + state_root: Hash256::ZERO, + } + } + + fn make_block(slot: Slot) -> SignedBeaconBlock { + let block = BeaconBlock::Gloas(BeaconBlockGloas { + slot, + proposer_index: 0, + parent_root: Hash256::ZERO, + state_root: Hash256::ZERO, + body: BeaconBlockBodyGloas { + randao_reveal: Signature::empty(), + eth1_data: Eth1Data { + deposit_root: Hash256::ZERO, + block_hash: Hash256::ZERO, + deposit_count: 0, + }, + graffiti: Graffiti::default(), + proposer_slashings: VariableList::empty(), + attester_slashings: VariableList::empty(), + attestations: VariableList::empty(), + deposits: VariableList::empty(), + voluntary_exits: VariableList::empty(), + sync_aggregate: SyncAggregate::empty(), + bls_to_execution_changes: VariableList::empty(), + signed_execution_payload_bid: SignedExecutionPayloadBid::empty(), + payload_attestations: VariableList::empty(), + _phantom: PhantomData, + }, + }); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + + fn make_bid(builder_index: u64, block_hash: ExecutionBlockHash) -> ExecutionPayloadBid { + ExecutionPayloadBid { + builder_index, + block_hash, + ..ExecutionPayloadBid::default() + } + } + + #[test] + fn test_valid_envelope() { + let slot = Slot::new(10); + let builder_index = 5; + let block_hash = ExecutionBlockHash::repeat_byte(0xaa); + + let envelope = make_envelope(slot, builder_index, block_hash); + let block = make_block(slot); + let bid = make_bid(builder_index, block_hash); + + assert!(verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)).is_ok()); + } + + #[test] + fn test_prior_to_finalization() { + let slot = Slot::new(5); + let builder_index = 1; + let block_hash = ExecutionBlockHash::repeat_byte(0xbb); + + let envelope = make_envelope(slot, builder_index, block_hash); + let block = make_block(slot); + let bid = make_bid(builder_index, block_hash); + let latest_finalized_slot = Slot::new(10); + + let result = + verify_envelope_consistency::(&envelope, &block, &bid, latest_finalized_slot); + assert!(matches!( + result, + Err(EnvelopeError::PriorToFinalization { .. }) + )); + } + + #[test] + fn test_slot_mismatch() { + let builder_index = 1; + let block_hash = ExecutionBlockHash::repeat_byte(0xcc); + + let envelope = make_envelope(Slot::new(10), builder_index, block_hash); + let block = make_block(Slot::new(20)); + let bid = make_bid(builder_index, block_hash); + + let result = verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)); + assert!(matches!(result, Err(EnvelopeError::SlotMismatch { .. }))); + } + + #[test] + fn test_builder_index_mismatch() { + let slot = Slot::new(10); + let block_hash = ExecutionBlockHash::repeat_byte(0xdd); + + let envelope = make_envelope(slot, 1, block_hash); + let block = make_block(slot); + let bid = make_bid(2, block_hash); + + let result = verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)); + assert!(matches!( + result, + Err(EnvelopeError::BuilderIndexMismatch { .. }) + )); + } + + #[test] + fn test_block_hash_mismatch() { + let slot = Slot::new(10); + let builder_index = 1; + + let envelope = make_envelope(slot, builder_index, ExecutionBlockHash::repeat_byte(0xee)); + let block = make_block(slot); + let bid = make_bid(builder_index, ExecutionBlockHash::repeat_byte(0xff)); + + let result = verify_envelope_consistency::(&envelope, &block, &bid, Slot::new(0)); + assert!(matches!( + result, + Err(EnvelopeError::BlockHashMismatch { .. }) + )); + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs new file mode 100644 index 0000000000..2ee315e559 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -0,0 +1,354 @@ +use std::sync::Arc; +use std::time::Duration; + +use fork_choice::PayloadVerificationStatus; +use slot_clock::SlotClock; +use store::StoreOp; +use tracing::{debug, error, info, info_span, instrument, warn}; +use types::{BeaconState, BlockImportSource, Hash256, Slot}; + +use super::{ + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, + ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, +}; +use crate::{ + AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, + NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, + payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, +}; + +const ENVELOPE_METRICS_CACHE_SLOT_LIMIT: u32 = 64; + +impl BeaconChain { + /// Returns `Ok(status)` if the given `unverified_envelope` was successfully verified and + /// imported into the chain. + /// + /// ## Errors + /// + /// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during + /// verification. + #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] + pub async fn process_execution_payload_envelope( + self: &Arc, + block_root: Hash256, + unverified_envelope: GossipVerifiedEnvelope, + notify_execution_layer: NotifyExecutionLayer, + block_source: BlockImportSource, + publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, + ) -> Result { + let block_slot = unverified_envelope.signed_envelope.slot(); + + // Set observed time if not already set. Usually this should be set by gossip or RPC, + // but just in case we set it again here (useful for tests). + if let Some(seen_timestamp) = self.slot_clock.now_duration() { + self.envelope_times_cache.write().set_time_observed( + block_root, + block_slot, + seen_timestamp, + None, + ); + } + + // TODO(gloas) insert the pre-executed envelope into some type of cache. + + let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); + + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS); + + // A small closure to group the verification and import errors. + let chain = self.clone(); + let import_envelope = async move { + let execution_pending = unverified_envelope + .into_execution_pending_envelope(&chain, notify_execution_layer)?; + publish_fn()?; + + // Record the time it took to complete consensus verification. + if let Some(timestamp) = chain.slot_clock.now_duration() { + chain + .envelope_times_cache + .write() + .set_time_consensus_verified(block_root, block_slot, timestamp); + } + + let envelope_times_cache = chain.envelope_times_cache.clone(); + let slot_clock = chain.slot_clock.clone(); + + // TODO(gloas): rename/refactor these `into_` names to be less similar and more clear + // about what the function actually does. + let executed_envelope = chain + .into_executed_payload_envelope(execution_pending) + .await + .inspect_err(|_| { + // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline), + // and we keep it in the cache, then the node will NOT perform lookup and + // reprocess this block until the block is evicted from DA checker, causing the + // chain to get stuck temporarily if the block is canonical. Therefore we remove + // it from the cache if execution fails. + })?; + + // Record the time it took to wait for execution layer verification. + if let Some(timestamp) = slot_clock.now_duration() { + envelope_times_cache + .write() + .set_time_executed(block_root, block_slot, timestamp); + } + + match executed_envelope { + ExecutedEnvelope::Available(envelope) => { + self.import_available_execution_payload_envelope(Box::new(envelope)) + .await + } + ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( + "Pending payload envelope not yet implemented".to_owned(), + )), + } + }; + + // Verify and import the payload envelope. + match import_envelope.await { + // The payload envelope was successfully verified and imported. + Ok(status @ AvailabilityProcessingStatus::Imported(block_root)) => { + info!( + ?block_root, + %block_slot, + source = %block_source, + "Execution payload envelope imported" + ); + + // TODO(gloas) do we need to send a `PayloadImported` event to the reprocess queue? + // TODO(gloas) do we need to recompute head? + // should canonical_head return the block and the payload now? + self.recompute_head_at_current_slot().await; + + metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES); + + Ok(status) + } + Ok(status @ AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + debug!(?block_root, %slot, "Payload envelope awaiting blobs"); + + Ok(status) + } + Err(EnvelopeError::BeaconChainError(e)) => { + if matches!(e.as_ref(), BeaconChainError::TokioJoin(_)) { + debug!(error = ?e, "Envelope processing cancelled"); + } else { + warn!(error = ?e, "Execution payload envelope rejected"); + } + Err(EnvelopeError::BeaconChainError(e)) + } + Err(other) => { + warn!( + reason = other.to_string(), + "Execution payload envelope rejected" + ); + Err(other) + } + } + } + + /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to + /// get a fully `ExecutedEnvelope`. + /// + /// An error is returned if the verification handle couldn't be awaited. + #[instrument(skip_all, level = "debug")] + async fn into_executed_payload_envelope( + self: Arc, + pending_envelope: ExecutionPendingEnvelope, + ) -> Result, EnvelopeError> { + let ExecutionPendingEnvelope { + signed_envelope, + import_data, + payload_verification_handle, + } = pending_envelope; + + let payload_verification_outcome = payload_verification_handle + .await + .map_err(BeaconChainError::TokioJoin)? + .ok_or(BeaconChainError::RuntimeShutdown)??; + + Ok(ExecutedEnvelope::new( + signed_envelope, + import_data, + payload_verification_outcome, + )) + } + + #[instrument(skip_all)] + pub async fn import_available_execution_payload_envelope( + self: &Arc, + envelope: Box>, + ) -> Result { + let AvailableExecutedEnvelope { + envelope, + import_data, + payload_verification_outcome, + } = *envelope; + + let EnvelopeImportData { + block_root, + post_state, + } = import_data; + + let block_root = { + // Capture the current span before moving into the blocking task + let current_span = tracing::Span::current(); + let chain = self.clone(); + self.spawn_blocking_handle( + move || { + // Enter the captured span in the blocking thread + let _guard = current_span.enter(); + chain.import_execution_payload_envelope( + envelope, + block_root, + *post_state, + payload_verification_outcome.payload_verification_status, + ) + }, + "payload_verification_handle", + ) + .await?? + }; + + Ok(AvailabilityProcessingStatus::Imported(block_root)) + } + + /// Accepts a fully-verified and available envelope and imports it into the chain without performing any + /// additional verification. + /// + /// An error is returned if the envelope was unable to be imported. It may be partially imported + /// (i.e., this function is not atomic). + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + fn import_execution_payload_envelope( + &self, + signed_envelope: AvailableEnvelope, + block_root: Hash256, + state: BeaconState, + _payload_verification_status: PayloadVerificationStatus, + ) -> Result { + // Everything in this initial section is on the hot path for processing the envelope. + // Take an upgradable read lock on fork choice so we can check if this block has already + // been imported. We don't want to repeat work importing a block that is already imported. + let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); + if !fork_choice_reader.contains_block(&block_root) { + return Err(EnvelopeError::BlockRootUnknown { block_root }); + } + + // TODO(gloas) add defensive check to see if payload envelope is already in fork choice + // Note that a duplicate cache/payload status table should prevent this from happening + // but it doesnt hurt to be defensive. + + // TODO(gloas) when the code below is implemented we can delete this drop + drop(fork_choice_reader); + + // TODO(gloas) no fork choice logic yet + // Take an exclusive write-lock on fork choice. It's very important to prevent deadlocks by + // avoiding taking other locks whilst holding this lock. + // let fork_choice = parking_lot::RwLockUpgradableReadGuard::upgrade(fork_choice_reader); + + // TODO(gloas) Do we need this check? Do not import a block that doesn't descend from the finalized root. + // let signed_block = check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?; + + // TODO(gloas) emit SSE event if the payload became the new head payload + + // It is important NOT to return errors here before the database commit, because the envelope + // has already been added to fork choice and the database would be left in an inconsistent + // state if we returned early without committing. In other words, an error here would + // corrupt the node's database permanently. + + // Store the envelope, its post-state, and any data columns. + // If the write fails, revert fork choice to the version from disk, else we can + // end up with envelopes in fork choice that are missing from disk. + // See https://github.com/sigp/lighthouse/issues/2028 + let (signed_envelope, columns) = signed_envelope.deconstruct(); + + let mut ops = vec![]; + + if let Some(blobs_or_columns_store_op) = self.get_blobs_or_columns_store_op( + block_root, + signed_envelope.slot(), + AvailableBlockData::DataColumns(columns), + ) { + ops.push(blobs_or_columns_store_op); + } + + let db_write_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_DB_WRITE); + + ops.push(StoreOp::PutPayloadEnvelope( + block_root, + signed_envelope.clone(), + )); + ops.push(StoreOp::PutState( + signed_envelope.message.state_root, + &state, + )); + + let db_span = info_span!("persist_payloads_and_blobs").entered(); + + if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { + error!( + msg = "Restoring fork choice from disk", + error = ?e, + "Database write failed!" + ); + return Err(e.into()); + // TODO(gloas) handle db write failure + // return Err(self + // .handle_import_block_db_write_error(fork_choice) + // .err() + // .unwrap_or(e.into())); + } + + drop(db_span); + + // TODO(gloas) drop fork choice lock + // The fork choice write-lock is dropped *after* the on-disk database has been updated. + // This prevents inconsistency between the two at the expense of concurrency. + // drop(fork_choice); + + // We're declaring the envelope "imported" at this point, since fork choice and the DB know + // about it. + let envelope_time_imported = self.slot_clock.now_duration().unwrap_or(Duration::MAX); + + // TODO(gloas) depending on what happens with light clients + // we might need to do some light client related computations here + + metrics::stop_timer(db_write_timer); + + self.import_envelope_update_metrics_and_events( + block_root, + signed_envelope.slot(), + envelope_time_imported, + ); + + Ok(block_root) + } + + fn import_envelope_update_metrics_and_events( + &self, + block_root: Hash256, + envelope_slot: Slot, + envelope_time_imported: Duration, + ) { + let envelope_delay_total = + get_slot_delay_ms(envelope_time_imported, envelope_slot, &self.slot_clock); + + // Do not write to the cache for envelopes older than 2 epochs, this helps reduce writes + // to the cache during sync. + if envelope_delay_total + < self + .slot_clock + .slot_duration() + .saturating_mul(ENVELOPE_METRICS_CACHE_SLOT_LIMIT) + { + self.envelope_times_cache.write().set_time_imported( + block_root, + envelope_slot, + envelope_time_imported, + ); + } + + // TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks). + } +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs new file mode 100644 index 0000000000..c707d62dc7 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -0,0 +1,285 @@ +//! The incremental processing steps (e.g., signatures verified but not the state transition) is +//! represented as a sequence of wrapper-types around the envelope. There is a linear progression of +//! types, starting at a `SignedExecutionPayloadEnvelope` and finishing with an `AvailableExecutedEnvelope` (see +//! diagram below). +//! +//! ```ignore +//! SignedExecutionPayloadEnvelope +//! | +//! â–¼ +//! GossipVerifiedEnvelope +//! | +//! â–¼ +//! ExecutionPendingEnvelope +//! | +//! await +//! â–¼ +//! ExecutedEnvelope +//! +//! ``` + +use std::sync::Arc; + +use store::Error as DBError; + +use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError}; +use tracing::instrument; +use types::{ + BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, + ExecutionPayloadEnvelope, Hash256, SignedExecutionPayloadEnvelope, Slot, +}; + +use crate::{ + BeaconChainError, BeaconChainTypes, BeaconStore, BlockError, ExecutionPayloadError, + PayloadVerificationOutcome, +}; + +pub mod execution_pending_envelope; +pub mod gossip_verified_envelope; +pub mod import; +mod payload_notifier; + +pub use execution_pending_envelope::ExecutionPendingEnvelope; + +#[derive(PartialEq)] +pub struct EnvelopeImportData { + pub block_root: Hash256, + pub post_state: Box>, +} + +#[derive(Debug)] +#[allow(dead_code)] +pub struct AvailableEnvelope { + execution_block_hash: ExecutionBlockHash, + envelope: Arc>, + columns: DataColumnSidecarList, + /// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970). + columns_available_timestamp: Option, + pub spec: Arc, +} + +impl AvailableEnvelope { + pub fn message(&self) -> &ExecutionPayloadEnvelope { + &self.envelope.message + } + + #[allow(clippy::type_complexity)] + pub fn deconstruct( + self, + ) -> ( + Arc>, + DataColumnSidecarList, + ) { + let AvailableEnvelope { + envelope, columns, .. + } = self; + (envelope, columns) + } +} + +pub enum MaybeAvailableEnvelope { + Available(AvailableEnvelope), + AvailabilityPending { + block_hash: ExecutionBlockHash, + envelope: Arc>, + }, +} + +/// This snapshot is to be used for verifying a payload envelope. +#[derive(Debug, Clone)] +pub struct EnvelopeProcessingSnapshot { + /// This state is equivalent to the `self.beacon_block.state_root()` before applying the envelope. + pub pre_state: BeaconState, + pub state_root: Hash256, + pub beacon_block_root: Hash256, +} + +/// A payload envelope that has gone through processing checks and execution by an EL client. +/// This envelope hasn't necessarily completed data availability checks. +/// +/// +/// It contains 2 variants: +/// 1. `Available`: This envelope has been executed and also contains all data to consider it +/// fully available. +/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it +/// fully available. +pub enum ExecutedEnvelope { + Available(AvailableExecutedEnvelope), + // TODO(gloas) implement availability pending + AvailabilityPending(), +} + +impl ExecutedEnvelope { + pub fn new( + envelope: MaybeAvailableEnvelope, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + match envelope { + MaybeAvailableEnvelope::Available(available_envelope) => { + Self::Available(AvailableExecutedEnvelope::new( + available_envelope, + import_data, + payload_verification_outcome, + )) + } + // TODO(gloas) implement availability pending + MaybeAvailableEnvelope::AvailabilityPending { + block_hash: _, + envelope: _, + } => Self::AvailabilityPending(), + } + } +} + +/// A payload envelope that has completed all payload processing checks including verification +/// by an EL client **and** has all requisite blob data to be imported into fork choice. +pub struct AvailableExecutedEnvelope { + pub envelope: AvailableEnvelope, + pub import_data: EnvelopeImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailableExecutedEnvelope { + pub fn new( + envelope: AvailableEnvelope, + import_data: EnvelopeImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + envelope, + import_data, + payload_verification_outcome, + } + } +} + +#[derive(Debug)] +pub enum EnvelopeError { + /// The envelope's block root is unknown. + BlockRootUnknown { block_root: Hash256 }, + /// The signature is invalid. + BadSignature, + /// The builder index doesn't match the committed bid + BuilderIndexMismatch { committed_bid: u64, envelope: u64 }, + /// The envelope slot doesn't match the block + SlotMismatch { block: Slot, envelope: Slot }, + /// The validator index is unknown + UnknownValidator { proposer_index: u64 }, + /// The block hash doesn't match the committed bid + BlockHashMismatch { + committed_bid: ExecutionBlockHash, + envelope: ExecutionBlockHash, + }, + /// The block's proposer_index does not match the locally computed proposer + IncorrectBlockProposer { + proposer_index: u64, + local_shuffling: u64, + }, + /// The slot belongs to a block that is from a slot prior than + /// to most recently finalized slot + PriorToFinalization { + payload_slot: Slot, + latest_finalized_slot: Slot, + }, + /// Some Beacon Chain Error + BeaconChainError(Arc), + /// Some Beacon State error + BeaconStateError(BeaconStateError), + /// Some BlockProcessingError (for electra operations) + BlockProcessingError(BlockProcessingError), + /// Some EnvelopeProcessingError + EnvelopeProcessingError(EnvelopeProcessingError), + /// Error verifying the execution payload + ExecutionPayloadError(ExecutionPayloadError), + /// An error from block-level checks reused during envelope import + BlockError(BlockError), + /// Internal error + InternalError(String), +} + +impl std::fmt::Display for EnvelopeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl From for EnvelopeError { + fn from(e: BeaconChainError) -> Self { + EnvelopeError::BeaconChainError(Arc::new(e)) + } +} + +impl From for EnvelopeError { + fn from(e: ExecutionPayloadError) -> Self { + EnvelopeError::ExecutionPayloadError(e) + } +} + +impl From for EnvelopeError { + fn from(e: BeaconStateError) -> Self { + EnvelopeError::BeaconStateError(e) + } +} + +impl From for EnvelopeError { + fn from(e: DBError) -> Self { + EnvelopeError::BeaconChainError(Arc::new(BeaconChainError::DBError(e))) + } +} + +impl From for EnvelopeError { + fn from(e: BlockError) -> Self { + EnvelopeError::BlockError(e) + } +} + +/// Pull errors up from EnvelopeProcessingError to EnvelopeError +impl From for EnvelopeError { + fn from(e: EnvelopeProcessingError) -> Self { + match e { + EnvelopeProcessingError::BadSignature => EnvelopeError::BadSignature, + EnvelopeProcessingError::BeaconStateError(e) => EnvelopeError::BeaconStateError(e), + EnvelopeProcessingError::BlockHashMismatch { + committed_bid, + envelope, + } => EnvelopeError::BlockHashMismatch { + committed_bid, + envelope, + }, + EnvelopeProcessingError::BlockProcessingError(e) => { + EnvelopeError::BlockProcessingError(e) + } + e => EnvelopeError::EnvelopeProcessingError(e), + } + } +} + +#[instrument(skip_all, level = "debug", fields(beacon_block_root = %beacon_block_root))] +/// Load state from store given a known state root and block root. +/// Use this when the proto block has already been looked up from fork choice. +pub(crate) fn load_snapshot_from_state_root( + beacon_block_root: Hash256, + block_state_root: Hash256, + store: &BeaconStore, +) -> Result, EnvelopeError> { + // TODO(EIP-7732): add metrics here + + // We can use `get_hot_state` here rather than `get_advanced_hot_state` because the envelope + // must be from the same slot as its block (so no advance is required). + let cache_state = true; + let state = store + .get_hot_state(&block_state_root, cache_state) + .map_err(EnvelopeError::from)? + .ok_or_else(|| { + BeaconChainError::DBInconsistent(format!( + "Missing state for envelope block {block_state_root:?}", + )) + })?; + + Ok(EnvelopeProcessingSnapshot { + pre_state: state, + state_root: block_state_root, + beacon_block_root, + }) +} diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs new file mode 100644 index 0000000000..df21d33493 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -0,0 +1,94 @@ +use std::sync::Arc; + +use execution_layer::{NewPayloadRequest, NewPayloadRequestGloas}; +use fork_choice::PayloadVerificationStatus; +use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; +use tracing::warn; +use types::{SignedBeaconBlock, SignedExecutionPayloadEnvelope}; + +use crate::{ + BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, + execution_payload::notify_new_payload, payload_envelope_verification::EnvelopeError, +}; + +/// Used to await the result of executing payload with a remote EE. +pub struct PayloadNotifier { + pub chain: Arc>, + envelope: Arc>, + block: Arc>, + payload_verification_status: Option, +} + +impl PayloadNotifier { + pub fn new( + chain: Arc>, + envelope: Arc>, + block: Arc>, + notify_execution_layer: NotifyExecutionLayer, + ) -> Result { + let payload_verification_status = { + let payload_message = &envelope.message; + + match notify_execution_layer { + NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => { + let new_payload_request = Self::build_new_payload_request(&envelope, &block)?; + // TODO(gloas): check and test RLP block hash calculation post-Gloas + if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() { + warn!( + block_number = ?payload_message.payload.block_number, + info = "you can silence this warning with --disable-optimistic-finalized-sync", + error = ?e, + "Falling back to slow block hash verification" + ); + None + } else { + Some(PayloadVerificationStatus::Optimistic) + } + } + _ => None, + } + }; + + Ok(Self { + chain, + envelope, + block, + payload_verification_status, + }) + } + + pub async fn notify_new_payload(self) -> Result { + if let Some(precomputed_status) = self.payload_verification_status { + Ok(precomputed_status) + } else { + let parent_root = self.block.message().parent_root(); + let request = Self::build_new_payload_request(&self.envelope, &self.block)?; + notify_new_payload(&self.chain, self.envelope.slot(), parent_root, request).await + } + } + + fn build_new_payload_request<'a>( + envelope: &'a SignedExecutionPayloadEnvelope, + block: &'a SignedBeaconBlock, + ) -> Result, BlockError> { + let bid = &block + .message() + .body() + .signed_execution_payload_bid() + .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))? + .message; + + let versioned_hashes = bid + .blob_kzg_commitments + .iter() + .map(kzg_commitment_to_versioned_hash) + .collect(); + + Ok(NewPayloadRequest::Gloas(NewPayloadRequestGloas { + execution_payload: &envelope.message.payload, + versioned_hashes, + parent_beacon_block_root: block.message().parent_root(), + execution_requests: &envelope.message.execution_requests, + })) + } +} diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index e8ee628f28..acf326430b 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -1,4 +1,5 @@ #![cfg(not(debug_assertions))] +#![allow(clippy::result_large_err)] use beacon_chain::attestation_verification::{ Error, batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations, diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index e94e64e91d..e385e0dc48 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,5 +1,5 @@ #![cfg(not(debug_assertions))] - +// TODO(gloas) we probably need similar test for payload envelope verification use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 8b51151f9f..f5937078e0 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -1,4 +1,5 @@ #![cfg(not(debug_assertions))] +#![allow(clippy::result_large_err)] use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{ diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index cfc53c8ce0..b6d729cc61 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,4 +1,5 @@ #![cfg(not(debug_assertions))] +#![allow(clippy::result_large_err)] use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::block_verification_types::RpcBlock; diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index d6796f6a05..90968fa213 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -2048,7 +2048,7 @@ fn verify_builder_bid( .cloned() .map(|withdrawals| { Withdrawals::::try_from(withdrawals) - .map_err(InvalidBuilderPayload::SszTypesError) + .map_err(|e| Box::new(InvalidBuilderPayload::SszTypesError(e))) .map(|w| w.tree_hash_root()) }) .transpose()?; diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs deleted file mode 100644 index 891f024bf9..0000000000 --- a/beacon_node/http_api/src/block_rewards.rs +++ /dev/null @@ -1,178 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped}; -use eth2::lighthouse::{BlockReward, BlockRewardsQuery}; -use lru::LruCache; -use state_processing::BlockReplayer; -use std::num::NonZeroUsize; -use std::sync::Arc; -use tracing::{debug, warn}; -use types::block::BlindedBeaconBlock; -use types::new_non_zero_usize; -use warp_utils::reject::{beacon_state_error, custom_bad_request, unhandled_error}; - -const STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(2); - -/// Fetch block rewards for blocks from the canonical chain. -pub fn get_block_rewards( - query: BlockRewardsQuery, - chain: Arc>, -) -> Result, warp::Rejection> { - let start_slot = query.start_slot; - let end_slot = query.end_slot; - let prior_slot = start_slot - 1; - - if start_slot > end_slot || start_slot == 0 { - return Err(custom_bad_request(format!( - "invalid start and end: {}, {}", - start_slot, end_slot - ))); - } - - let end_block_root = chain - .block_root_at_slot(end_slot, WhenSlotSkipped::Prev) - .map_err(unhandled_error)? - .ok_or_else(|| custom_bad_request(format!("block at end slot {} unknown", end_slot)))?; - - let blocks = chain - .store - .load_blocks_to_replay(start_slot, end_slot, end_block_root) - .map_err(|e| unhandled_error(BeaconChainError::from(e)))?; - - let state_root = chain - .state_root_at_slot(prior_slot) - .map_err(unhandled_error)? - .ok_or_else(|| custom_bad_request(format!("prior state at slot {} unknown", prior_slot)))?; - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let mut state = chain - .get_state(&state_root, Some(prior_slot), true) - .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) - .map_err(unhandled_error)?; - - state - .build_caches(&chain.spec) - .map_err(beacon_state_error)?; - - let mut reward_cache = Default::default(); - let mut block_rewards = Vec::with_capacity(blocks.len()); - - let block_replayer = BlockReplayer::new(state, &chain.spec) - .pre_block_hook(Box::new(|state, block| { - state.build_all_committee_caches(&chain.spec)?; - - // Compute block reward. - let block_reward = chain.compute_block_reward( - block.message(), - block.canonical_root(), - state, - &mut reward_cache, - query.include_attestations, - )?; - block_rewards.push(block_reward); - Ok(()) - })) - .state_root_iter( - chain - .forwards_iter_state_roots_until(prior_slot, end_slot) - .map_err(unhandled_error)?, - ) - .no_signature_verification() - .minimal_block_root_verification() - .apply_blocks(blocks, None) - .map_err(unhandled_error)?; - - if block_replayer.state_root_miss() { - warn!(%start_slot, %end_slot, "Block reward state root miss"); - } - - drop(block_replayer); - - Ok(block_rewards) -} - -/// Compute block rewards for blocks passed in as input. -pub fn compute_block_rewards( - blocks: Vec>, - chain: Arc>, -) -> Result, warp::Rejection> { - let mut block_rewards = Vec::with_capacity(blocks.len()); - let mut state_cache = LruCache::new(STATE_CACHE_SIZE); - let mut reward_cache = Default::default(); - - for block in blocks { - let parent_root = block.parent_root(); - - // Check LRU cache for a constructed state from a previous iteration. - let state = if let Some(state) = state_cache.get(&(parent_root, block.slot())) { - debug!( - ?parent_root, - slot = %block.slot(), - "Re-using cached state for block rewards" - ); - state - } else { - debug!( - ?parent_root, - slot = %block.slot(), - "Fetching state for block rewards" - ); - let parent_block = chain - .get_blinded_block(&parent_root) - .map_err(unhandled_error)? - .ok_or_else(|| { - custom_bad_request(format!( - "parent block not known or not canonical: {:?}", - parent_root - )) - })?; - - // This branch is reached from the HTTP API. We assume the user wants - // to cache states so that future calls are faster. - let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) - .map_err(unhandled_error)? - .ok_or_else(|| { - custom_bad_request(format!( - "no state known for parent block: {:?}", - parent_root - )) - })?; - - let block_replayer = BlockReplayer::new(parent_state, &chain.spec) - .no_signature_verification() - .state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter()) - .minimal_block_root_verification() - .apply_blocks(vec![], Some(block.slot())) - .map_err(unhandled_error::)?; - - if block_replayer.state_root_miss() { - warn!( - parent_slot = %parent_block.slot(), - slot = %block.slot(), - "Block reward state root miss" - ); - } - - let mut state = block_replayer.into_state(); - state - .build_all_committee_caches(&chain.spec) - .map_err(beacon_state_error)?; - - state_cache.get_or_insert((parent_root, block.slot()), || state) - }; - - // Compute block reward. - let block_reward = chain - .compute_block_reward( - block.to_ref(), - block.canonical_root(), - state, - &mut reward_cache, - true, - ) - .map_err(unhandled_error)?; - block_rewards.push(block_reward); - } - - Ok(block_rewards) -} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index fd59d372d9..698394b1a5 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(clippy::result_large_err)] //! This crate contains a HTTP server which serves the endpoints listed here: //! //! https://github.com/ethereum/beacon-APIs @@ -11,7 +12,6 @@ mod attester_duties; mod beacon; mod block_id; mod block_packing_efficiency; -mod block_rewards; mod build_block_contents; mod builder_states; mod custody; @@ -3077,34 +3077,6 @@ pub fn serve( }, ); - // GET lighthouse/analysis/block_rewards - let get_lighthouse_block_rewards = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("block_rewards")) - .and(warp::query::()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then(|query, task_spawner: TaskSpawner, chain| { - task_spawner.blocking_json_task(Priority::P1, move || { - block_rewards::get_block_rewards(query, chain) - }) - }); - - // POST lighthouse/analysis/block_rewards - let post_lighthouse_block_rewards = warp::path("lighthouse") - .and(warp::path("analysis")) - .and(warp::path("block_rewards")) - .and(warp_utils::json::json()) - .and(warp::path::end()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .then(|blocks, task_spawner: TaskSpawner, chain| { - task_spawner.blocking_json_task(Priority::P1, move || { - block_rewards::compute_block_rewards(blocks, chain) - }) - }); - // GET lighthouse/analysis/attestation_performance/{index} let get_lighthouse_attestation_performance = warp::path("lighthouse") .and(warp::path("analysis")) @@ -3195,9 +3167,6 @@ pub fn serve( api_types::EventTopic::LightClientOptimisticUpdate => { event_handler.subscribe_light_client_optimistic_update() } - api_types::EventTopic::BlockReward => { - event_handler.subscribe_block_reward() - } api_types::EventTopic::AttesterSlashing => { event_handler.subscribe_attester_slashing() } @@ -3374,7 +3343,6 @@ pub fn serve( .uor(get_lighthouse_staking) .uor(get_lighthouse_database_info) .uor(get_lighthouse_custody_info) - .uor(get_lighthouse_block_rewards) .uor(get_lighthouse_attestation_performance) .uor(get_beacon_light_client_optimistic_update) .uor(get_beacon_light_client_finality_update) @@ -3425,7 +3393,6 @@ pub fn serve( .uor(post_validator_liveness_epoch) .uor(post_lighthouse_liveness) .uor(post_lighthouse_database_reconstruct) - .uor(post_lighthouse_block_rewards) .uor(post_lighthouse_ui_validator_metrics) .uor(post_lighthouse_ui_validator_info) .uor(post_lighthouse_finalize) diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index df237d9f9b..a9082df715 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -727,6 +727,18 @@ pub fn post_validator_prepare_beacon_proposer( debug!(error = %e, "Could not send message to the network service. \ Likely shutdown") }); + + // Write the updated custody context to disk. This happens at most 128 + // times ever, so the I/O burden should be extremely minimal. Without a + // write here we risk forgetting custody backfill progress upon an + // unclean shutdown. The custody context is otherwise only persisted in + // `BeaconChain::drop`. + if let Err(error) = chain.persist_custody_context() { + error!( + ?error, + "Failed to persist custody context after CGC update" + ); + } } } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 0016f66c01..2119acf946 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -462,6 +462,13 @@ pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING: LazyLock> ]), ) }); +pub static SYNCING_CHAIN_BATCHES: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "sync_batches", + "Number of batches in sync chains by sync type and state", + &["sync_type", "state"], + ) +}); pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock> = LazyLock::new(|| { try_create_int_gauge( "sync_single_block_lookups", @@ -532,6 +539,16 @@ pub static SYNC_RPC_REQUEST_TIME: LazyLock> = LazyLock::new ) }); +/* + * Execution Payload Envelope Delay Metrics + */ +pub static ENVELOPE_DELAY_GOSSIP: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "payload_envelope_delay_gossip", + "The first time we see this payload envelope from gossip as a delay from the start of the slot", + ) +}); + /* * Block Delay Metrics */ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index e90018c851..3335315157 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,7 +4,6 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::store::Error; @@ -19,6 +18,10 @@ use beacon_chain::{ sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, }; +use beacon_chain::{ + blob_verification::{GossipBlobError, GossipVerifiedBlob}, + payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope, +}; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{Client, MessageAcceptance, MessageId, PeerAction, PeerId, ReportSource}; use logging::crit; @@ -3248,25 +3251,166 @@ impl NetworkBeaconProcessor { } } - pub async fn process_gossip_execution_payload( + #[allow(clippy::too_many_arguments)] + #[instrument( + name = "lh_process_execution_payload_envelope", + parent = None, + level = "debug", + skip_all, + fields(beacon_block_root = tracing::field::Empty), + )] + pub async fn process_gossip_execution_payload_envelope( + self: Arc, + message_id: MessageId, + peer_id: PeerId, + envelope: Arc>, + seen_timestamp: Duration, + ) { + if let Some(gossip_verified_envelope) = self + .process_gossip_unverified_execution_payload_envelope( + message_id, + peer_id, + envelope.clone(), + seen_timestamp, + ) + .await + { + let beacon_block_root = gossip_verified_envelope.signed_envelope.beacon_block_root(); + + Span::current().record("beacon_block_root", beacon_block_root.to_string()); + + // TODO(gloas) in process_gossip_block here we check_and_insert on the duplicate cache + // before calling gossip_verified_block. We need this to ensure we dont try to execute the + // payload multiple times. + + self.process_gossip_verified_execution_payload_envelope( + peer_id, + gossip_verified_envelope, + ) + .await; + } + } + + async fn process_gossip_unverified_execution_payload_envelope( self: &Arc, message_id: MessageId, peer_id: PeerId, - execution_payload: SignedExecutionPayloadEnvelope, + envelope: Arc>, + seen_duration: Duration, + ) -> Option> { + let envelope_delay = + get_slot_delay_ms(seen_duration, envelope.slot(), &self.chain.slot_clock); + + let verification_result = self + .chain + .clone() + .verify_envelope_for_gossip(envelope.clone()) + .await; + + let verified_envelope = match verification_result { + Ok(verified_envelope) => { + metrics::set_gauge( + &metrics::ENVELOPE_DELAY_GOSSIP, + envelope_delay.as_millis() as i64, + ); + + // Write the time the envelope was observed into the delay cache. + self.chain.envelope_times_cache.write().set_time_observed( + verified_envelope.signed_envelope.beacon_block_root(), + envelope.slot(), + seen_duration, + Some(peer_id.to_string()), + ); + + info!( + slot = %verified_envelope.signed_envelope.slot(), + root = ?verified_envelope.signed_envelope.beacon_block_root(), + "New envelope received" + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + verified_envelope + } + // TODO(gloas) penalize peers accordingly + Err(_) => return None, + }; + + let envelope_slot = verified_envelope.signed_envelope.slot(); + let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); + match self.chain.slot() { + // We only need to do a simple check about the envelope slot vs the current slot because + // `verify_envelope_for_gossip` already ensures that the envelope slot is within tolerance + // for envelope imports. + Ok(current_slot) if envelope_slot > current_slot => { + warn!( + ?envelope_slot, + ?beacon_block_root, + msg = "if this happens consistently, check system clock", + "envelope arrived early" + ); + + // TODO(gloas) update metrics to note how early the envelope arrived + + let inner_self = self.clone(); + let _process_fn = Box::pin(async move { + inner_self + .process_gossip_verified_execution_payload_envelope( + peer_id, + verified_envelope, + ) + .await; + }); + + // TODO(gloas) send to reprocess queue + None + } + Ok(_) => Some(verified_envelope), + Err(e) => { + error!( + error = ?e, + %envelope_slot, + ?beacon_block_root, + location = "envelope gossip", + "Failed to defer envelope import" + ); + None + } + } + } + + async fn process_gossip_verified_execution_payload_envelope( + self: Arc, + _peer_id: PeerId, + verified_envelope: GossipVerifiedEnvelope, ) { - // TODO(EIP-7732): Implement proper execution payload envelope gossip processing. - // This should integrate with the envelope_verification.rs module once it's implemented. + let _processing_start_time = Instant::now(); + let beacon_block_root = verified_envelope.signed_envelope.beacon_block_root(); - trace!( - %peer_id, - builder_index = execution_payload.message.builder_index, - slot = %execution_payload.message.slot, - beacon_block_root = %execution_payload.message.beacon_block_root, - "Processing execution payload envelope" - ); + #[allow(clippy::result_large_err)] + let result = self + .chain + .process_execution_payload_envelope( + beacon_block_root, + verified_envelope, + NotifyExecutionLayer::Yes, + BlockImportSource::Gossip, + || Ok(()), + ) + .await; - // For now, ignore all envelopes since verification is not implemented - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); + // TODO(gloas) metrics + // register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope"); + + match &result { + Ok(AvailabilityProcessingStatus::Imported(_)) + | Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + // Nothing to do + } + Err(_) => { + // TODO(gloas) implement peer penalties + } + } } pub fn process_gossip_execution_payload_bid( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index e1adf860de..357d6c08fd 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -429,11 +429,17 @@ impl NetworkBeaconProcessor { message_id: MessageId, peer_id: PeerId, execution_payload: Box>, + seen_timestamp: Duration, ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { processor - .process_gossip_execution_payload(message_id, peer_id, *execution_payload) + .process_gossip_execution_payload_envelope( + message_id, + peer_id, + Arc::new(*execution_payload), + seen_timestamp, + ) .await }; diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 8373dec322..77d64c92e6 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -493,6 +493,7 @@ impl Router { message_id, peer_id, signed_execution_payload_envelope, + timestamp_now(), ), ) } diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 7ef72c7f3a..801c9eca4d 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -8,9 +8,11 @@ //! If a batch fails, the backfill sync cannot progress. In this scenario, we mark the backfill //! sync as failed, log an error and attempt to retry once a new peer joins the node. +use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::batch::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome, + BatchProcessingResult, BatchState, }; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::manager::BatchProcessResult; @@ -31,6 +33,7 @@ use std::collections::{ use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::sync::Arc; +use strum::IntoEnumIterator; use tracing::{debug, error, info, warn}; use types::{ColumnIndex, Epoch, EthSpec}; @@ -1181,6 +1184,21 @@ impl BackFillSync { .epoch(T::EthSpec::slots_per_epoch()) } + pub fn register_metrics(&self) { + for state in BatchMetricsState::iter() { + let count = self + .batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &["backfill", state.into()], + count as i64, + ); + } + } + /// Updates the global network state indicating the current state of a backfill sync. fn set_state(&self, state: BackFillState) { *self.network_globals.backfill_state.write() = state; diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index f9a1fcce39..e87ffd119e 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -10,10 +10,22 @@ use std::marker::PhantomData; use std::ops::Sub; use std::time::Duration; use std::time::Instant; -use strum::Display; +use strum::{Display, EnumIter, IntoStaticStr}; use types::Slot; use types::{DataColumnSidecarList, Epoch, EthSpec}; +/// Batch states used as metrics labels. +#[derive(Debug, Clone, Copy, PartialEq, Eq, EnumIter, IntoStaticStr)] +#[strum(serialize_all = "snake_case")] +pub enum BatchMetricsState { + AwaitingDownload, + Downloading, + AwaitingProcessing, + Processing, + AwaitingValidation, + Failed, +} + pub type BatchId = Epoch; /// Type of expected batch. @@ -142,6 +154,18 @@ impl BatchState { pub fn poison(&mut self) -> BatchState { std::mem::replace(self, BatchState::Poisoned) } + + /// Returns the metrics state for this batch. + pub fn metrics_state(&self) -> BatchMetricsState { + match self { + BatchState::AwaitingDownload => BatchMetricsState::AwaitingDownload, + BatchState::Downloading(_) => BatchMetricsState::Downloading, + BatchState::AwaitingProcessing(..) => BatchMetricsState::AwaitingProcessing, + BatchState::Processing(_) => BatchMetricsState::Processing, + BatchState::AwaitingValidation(_) => BatchMetricsState::AwaitingValidation, + BatchState::Poisoned | BatchState::Failed => BatchMetricsState::Failed, + } + } } impl BatchInfo { diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs index a964ad9a3c..fe4c7dfe4c 100644 --- a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs @@ -12,14 +12,16 @@ use lighthouse_network::{ }; use logging::crit; use std::hash::{DefaultHasher, Hash, Hasher}; +use strum::IntoEnumIterator; use tracing::{debug, error, info, info_span, warn}; use types::{DataColumnSidecarList, Epoch, EthSpec}; +use crate::metrics; use crate::sync::{ backfill_sync::{BACKFILL_EPOCHS_PER_BATCH, ProcessResult, SyncStart}, batch::{ - BatchConfig, BatchId, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, - ByRangeRequestType, + BatchConfig, BatchId, BatchInfo, BatchMetricsState, BatchOperationOutcome, + BatchProcessingResult, BatchState, ByRangeRequestType, }, block_sidecar_coupling::CouplingError, manager::CustodyBatchProcessResult, @@ -1114,6 +1116,21 @@ impl CustodyBackFillSync { *self.network_globals.custody_sync_state.write() = state; } + pub fn register_metrics(&self) { + for state in BatchMetricsState::iter() { + let count = self + .batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &["custody_backfill", state.into()], + count as i64, + ); + } + } + /// A fully synced peer has joined us. /// If we are in a failed state, update a local variable to indicate we are able to restart /// the failed sync on the next attempt. diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index c1ab6221dd..7e618d8980 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -787,6 +787,9 @@ impl SyncManager { } _ = register_metrics_interval.tick() => { self.network.register_metrics(); + self.range_sync.register_metrics(); + self.backfill_sync.register_metrics(); + self.custody_backfill_sync.register_metrics(); } _ = epoch_interval.tick() => { self.update_sync_state(); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 25ea1af76a..e3ff638121 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -3,7 +3,8 @@ use crate::metrics; use crate::network_beacon_processor::ChainSegmentProcessId; use crate::sync::batch::BatchId; use crate::sync::batch::{ - BatchConfig, BatchInfo, BatchOperationOutcome, BatchProcessingResult, BatchState, + BatchConfig, BatchInfo, BatchMetricsState, BatchOperationOutcome, BatchProcessingResult, + BatchState, }; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::{RangeRequestId, RpcRequestSendError, RpcResponseError}; @@ -234,6 +235,14 @@ impl SyncingChain { .sum() } + /// Returns the number of batches in the given metrics state. + pub fn count_batches_in_state(&self, state: BatchMetricsState) -> usize { + self.batches + .values() + .filter(|b| b.state().metrics_state() == state) + .count() + } + /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { diff --git a/beacon_node/network/src/sync/range_sync/chain_collection.rs b/beacon_node/network/src/sync/range_sync/chain_collection.rs index b91b88b55c..a087fdecdf 100644 --- a/beacon_node/network/src/sync/range_sync/chain_collection.rs +++ b/beacon_node/network/src/sync/range_sync/chain_collection.rs @@ -6,6 +6,7 @@ use super::chain::{ChainId, ProcessingResult, RemoveChain, SyncingChain}; use super::sync_type::RangeSyncType; use crate::metrics; +use crate::sync::batch::BatchMetricsState; use crate::sync::network_context::SyncNetworkContext; use beacon_chain::{BeaconChain, BeaconChainTypes}; use fnv::FnvHashMap; @@ -17,6 +18,7 @@ use smallvec::SmallVec; use std::collections::HashMap; use std::collections::hash_map::Entry; use std::sync::Arc; +use strum::IntoEnumIterator; use tracing::{debug, error}; use types::EthSpec; use types::{Epoch, Hash256, Slot}; @@ -372,7 +374,8 @@ impl ChainCollection { .iter() .map(|(id, chain)| (chain.available_peers(), !chain.is_syncing(), *id)) .collect::>(); - preferred_ids.sort_unstable(); + // Sort in descending order + preferred_ids.sort_unstable_by(|a, b| b.cmp(a)); let mut syncing_chains = SmallVec::<[Id; PARALLEL_HEAD_CHAINS]>::new(); for (_, _, id) in preferred_ids { @@ -540,6 +543,25 @@ impl ChainCollection { } } + pub fn register_metrics(&self) { + for (sync_type, chains) in [ + ("range_finalized", &self.finalized_chains), + ("range_head", &self.head_chains), + ] { + for state in BatchMetricsState::iter() { + let count: usize = chains + .values() + .map(|chain| chain.count_batches_in_state(state)) + .sum(); + metrics::set_gauge_vec( + &metrics::SYNCING_CHAIN_BATCHES, + &[sync_type, state.into()], + count as i64, + ); + } + } + } + fn update_metrics(&self) { metrics::set_gauge_vec( &metrics::SYNCING_CHAINS_COUNT, diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 86625444be..9fd72ac98a 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -376,6 +376,10 @@ where .update(network, &local, &mut self.awaiting_head_peers); } + pub fn register_metrics(&self) { + self.chains.register_metrics(); + } + /// Kickstarts sync. pub fn resume(&mut self, network: &mut SyncNetworkContext) { for (removed_chain, sync_type, remove_reason) in diff --git a/book/src/api_lighthouse.md b/book/src/api_lighthouse.md index 0442bf4ec0..2fd7290cb2 100644 --- a/book/src/api_lighthouse.md +++ b/book/src/api_lighthouse.md @@ -590,60 +590,6 @@ Caveats: This is because the state *prior* to the `start_epoch` needs to be loaded from the database, and loading a state on a boundary is most efficient. -## `/lighthouse/analysis/block_rewards` - -Fetch information about the block rewards paid to proposers for a range of consecutive blocks. - -Two query parameters are required: - -- `start_slot` (inclusive): the slot of the first block to compute rewards for. -- `end_slot` (inclusive): the slot of the last block to compute rewards for. - -Example: - -```bash -curl -X GET "http://localhost:5052/lighthouse/analysis/block_rewards?start_slot=1&end_slot=1" | jq -``` - -The first few lines of the response would look like: - -```json -[ - { - "total": 637260, - "block_root": "0x4a089c5e390bb98e66b27358f157df825128ea953cee9d191229c0bcf423a4f6", - "meta": { - "slot": "1", - "parent_slot": "0", - "proposer_index": 93, - "graffiti": "EF #vm-eth2-raw-iron-101" - }, - "attestation_rewards": { - "total": 637260, - "prev_epoch_total": 0, - "curr_epoch_total": 637260, - "per_attestation_rewards": [ - { - "50102": 780, - } - ] - } - } -] -``` - -Caveats: - -- Presently only attestation and sync committee rewards are computed. -- The output format is verbose and subject to change. Please see [`BlockReward`][block_reward_src] - in the source. -- For maximum efficiency the `start_slot` should satisfy `start_slot % slots_per_restore_point == 1`. - This is because the state *prior* to the `start_slot` needs to be loaded from the database, and - loading a state on a boundary is most efficient. - -[block_reward_src]: -https://github.com/sigp/lighthouse/tree/unstable/common/eth2/src/lighthouse/block_rewards.rs - ## `/lighthouse/analysis/block_packing` Fetch information about the block packing efficiency of blocks for a range of consecutive diff --git a/common/eip_3076/Cargo.toml b/common/eip_3076/Cargo.toml index 058e1fd1a0..157fe12cb3 100644 --- a/common/eip_3076/Cargo.toml +++ b/common/eip_3076/Cargo.toml @@ -6,7 +6,7 @@ edition = { workspace = true } [features] default = [] -arbitrary-fuzz = ["dep:arbitrary", "types/arbitrary"] +arbitrary = ["dep:arbitrary", "types/arbitrary"] json = ["dep:serde_json"] [dependencies] diff --git a/common/eip_3076/src/lib.rs b/common/eip_3076/src/lib.rs index cdd05d7b1e..0bf1a94d0e 100644 --- a/common/eip_3076/src/lib.rs +++ b/common/eip_3076/src/lib.rs @@ -13,7 +13,7 @@ pub enum Error { #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct InterchangeMetadata { #[serde(with = "serde_utils::quoted_u64::require_quotes")] pub interchange_format_version: u64, @@ -22,7 +22,7 @@ pub struct InterchangeMetadata { #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct InterchangeData { pub pubkey: PublicKeyBytes, pub signed_blocks: Vec, @@ -31,7 +31,7 @@ pub struct InterchangeData { #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct SignedBlock { #[serde(with = "serde_utils::quoted_u64::require_quotes")] pub slot: Slot, @@ -41,7 +41,7 @@ pub struct SignedBlock { #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] #[serde(deny_unknown_fields)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct SignedAttestation { #[serde(with = "serde_utils::quoted_u64::require_quotes")] pub source_epoch: Epoch, @@ -52,7 +52,7 @@ pub struct SignedAttestation { } #[derive(Debug, Clone, PartialEq, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct Interchange { pub metadata: InterchangeMetadata, pub data: Vec, diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index 993c263cbf..3c039b16b3 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -2,12 +2,11 @@ mod attestation_performance; mod block_packing_efficiency; -mod block_rewards; mod custody; pub mod sync_state; use crate::{ - BeaconNodeHttpClient, DepositData, Error, Hash256, Slot, + BeaconNodeHttpClient, DepositData, Error, Hash256, lighthouse::sync_state::SyncState, types::{AdminPeer, Epoch, GenericResponse, ValidatorId}, }; @@ -22,7 +21,6 @@ pub use attestation_performance::{ pub use block_packing_efficiency::{ BlockPackingEfficiency, BlockPackingEfficiencyQuery, ProposerInfo, UniqueAttestation, }; -pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery}; pub use custody::CustodyInfo; // Define "legacy" implementations of `Option` which use four bytes for encoding the union @@ -317,27 +315,6 @@ impl BeaconNodeHttpClient { Analysis endpoints. */ - /// `GET` lighthouse/analysis/block_rewards?start_slot,end_slot - pub async fn get_lighthouse_analysis_block_rewards( - &self, - start_slot: Slot, - end_slot: Slot, - ) -> Result, Error> { - let mut path = self.server.expose_full().clone(); - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("lighthouse") - .push("analysis") - .push("block_rewards"); - - path.query_pairs_mut() - .append_pair("start_slot", &start_slot.to_string()) - .append_pair("end_slot", &end_slot.to_string()); - - self.get(path).await - } - /// `GET` lighthouse/analysis/block_packing?start_epoch,end_epoch pub async fn get_lighthouse_analysis_block_packing( &self, diff --git a/common/eth2/src/lighthouse/block_rewards.rs b/common/eth2/src/lighthouse/block_rewards.rs deleted file mode 100644 index 38070f3539..0000000000 --- a/common/eth2/src/lighthouse/block_rewards.rs +++ /dev/null @@ -1,60 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use types::{AttestationData, Hash256, Slot}; - -/// Details about the rewards paid to a block proposer for proposing a block. -/// -/// All rewards in GWei. -/// -/// Presently this only counts attestation rewards, but in future should be expanded -/// to include information on slashings and sync committee aggregates too. -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockReward { - /// Sum of all reward components. - pub total: u64, - /// Block root of the block that these rewards are for. - pub block_root: Hash256, - /// Metadata about the block, particularly reward-relevant metadata. - pub meta: BlockRewardMeta, - /// Rewards due to attestations. - pub attestation_rewards: AttestationRewards, - /// Sum of rewards due to sync committee signatures. - pub sync_committee_rewards: u64, -} - -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockRewardMeta { - pub slot: Slot, - pub parent_slot: Slot, - pub proposer_index: u64, - pub graffiti: String, -} - -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct AttestationRewards { - /// Total block reward from attestations included. - pub total: u64, - /// Total rewards from previous epoch attestations. - pub prev_epoch_total: u64, - /// Total rewards from current epoch attestations. - pub curr_epoch_total: u64, - /// Vec of attestation rewards for each attestation included. - /// - /// Each element of the vec is a map from validator index to reward. - pub per_attestation_rewards: Vec>, - /// The attestations themselves (optional). - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub attestations: Vec, -} - -/// Query parameters for the `/lighthouse/block_rewards` endpoint. -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -pub struct BlockRewardsQuery { - /// Lower slot limit for block rewards returned (inclusive). - pub start_slot: Slot, - /// Upper slot limit for block rewards returned (inclusive). - pub end_slot: Slot, - /// Include the full attestations themselves? - #[serde(default)] - pub include_attestations: bool, -} diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index f8376d430c..2f86170812 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -37,9 +37,6 @@ pub mod beacon_response { pub use crate::beacon_response::*; } -#[cfg(feature = "lighthouse")] -use crate::lighthouse::BlockReward; - // Re-export error types from the unified error module pub use crate::error::{ErrorMessage, Failure, IndexedErrorMessage, ResponseError as Error}; @@ -1199,8 +1196,6 @@ pub enum EventKind { LateHead(SseLateHead), LightClientFinalityUpdate(Box>>), LightClientOptimisticUpdate(Box>>), - #[cfg(feature = "lighthouse")] - BlockReward(BlockReward), PayloadAttributes(VersionedSsePayloadAttributes), ProposerSlashing(Box), AttesterSlashing(Box>), @@ -1225,8 +1220,6 @@ impl EventKind { EventKind::LateHead(_) => "late_head", EventKind::LightClientFinalityUpdate(_) => "light_client_finality_update", EventKind::LightClientOptimisticUpdate(_) => "light_client_optimistic_update", - #[cfg(feature = "lighthouse")] - EventKind::BlockReward(_) => "block_reward", EventKind::ProposerSlashing(_) => "proposer_slashing", EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", @@ -1302,10 +1295,6 @@ impl EventKind { })?), ))) } - #[cfg(feature = "lighthouse")] - "block_reward" => Ok(EventKind::BlockReward(serde_json::from_str(data).map_err( - |e| ServerError::InvalidServerSentEvent(format!("Block Reward: {:?}", e)), - )?)), "attester_slashing" => Ok(EventKind::AttesterSlashing( serde_json::from_str(data).map_err(|e| { ServerError::InvalidServerSentEvent(format!("Attester Slashing: {:?}", e)) @@ -1355,8 +1344,6 @@ pub enum EventTopic { PayloadAttributes, LightClientFinalityUpdate, LightClientOptimisticUpdate, - #[cfg(feature = "lighthouse")] - BlockReward, AttesterSlashing, ProposerSlashing, BlsToExecutionChange, @@ -1382,8 +1369,6 @@ impl FromStr for EventTopic { "late_head" => Ok(EventTopic::LateHead), "light_client_finality_update" => Ok(EventTopic::LightClientFinalityUpdate), "light_client_optimistic_update" => Ok(EventTopic::LightClientOptimisticUpdate), - #[cfg(feature = "lighthouse")] - "block_reward" => Ok(EventTopic::BlockReward), "attester_slashing" => Ok(EventTopic::AttesterSlashing), "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), @@ -1410,8 +1395,6 @@ impl fmt::Display for EventTopic { EventTopic::LateHead => write!(f, "late_head"), EventTopic::LightClientFinalityUpdate => write!(f, "light_client_finality_update"), EventTopic::LightClientOptimisticUpdate => write!(f, "light_client_optimistic_update"), - #[cfg(feature = "lighthouse")] - EventTopic::BlockReward => write!(f, "block_reward"), EventTopic::AttesterSlashing => write!(f, "attester_slashing"), EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), diff --git a/common/logging/Cargo.toml b/common/logging/Cargo.toml index 41c82dbd61..1606b8ceb4 100644 --- a/common/logging/Cargo.toml +++ b/common/logging/Cargo.toml @@ -5,7 +5,8 @@ authors = ["blacktemplar "] edition = { workspace = true } [features] -test_logger = [] # Print log output to stderr when running tests instead of dropping it +# Print log output to stderr when running tests instead of dropping it. +test_logger = [] [dependencies] chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } @@ -13,7 +14,7 @@ logroller = { workspace = true } metrics = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -tokio = { workspace = true, features = [ "time" ] } +tokio = { workspace = true, features = ["time"] } tracing = { workspace = true } tracing-appender = { workspace = true } tracing-core = { workspace = true } diff --git a/common/malloc_utils/Cargo.toml b/common/malloc_utils/Cargo.toml index 1052128852..e90490bf09 100644 --- a/common/malloc_utils/Cargo.toml +++ b/common/malloc_utils/Cargo.toml @@ -35,7 +35,4 @@ tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats"] } # Jemalloc's background_threads feature requires Linux (pthreads). [target.'cfg(target_os = "linux")'.dependencies] -tikv-jemallocator = { version = "0.6.0", optional = true, features = [ - "stats", - "background_threads", -] } +tikv-jemallocator = { version = "0.6.0", optional = true, features = ["stats", "background_threads"] } diff --git a/consensus/state_processing/Cargo.toml b/consensus/state_processing/Cargo.toml index 7426995439..ae0af03231 100644 --- a/consensus/state_processing/Cargo.toml +++ b/consensus/state_processing/Cargo.toml @@ -7,10 +7,10 @@ edition = { workspace = true } [features] default = [] fake_crypto = ["bls/fake_crypto"] -arbitrary-fuzz = [ +arbitrary = [ "dep:arbitrary", "smallvec/arbitrary", - "types/arbitrary-fuzz", + "types/arbitrary", "merkle_proof/arbitrary", "ethereum_ssz/arbitrary", "ssz_types/arbitrary", diff --git a/consensus/state_processing/src/envelope_processing.rs b/consensus/state_processing/src/envelope_processing.rs index c2cfeae5d3..be6b7c1b29 100644 --- a/consensus/state_processing/src/envelope_processing.rs +++ b/consensus/state_processing/src/envelope_processing.rs @@ -21,7 +21,7 @@ macro_rules! envelope_verify { } /// The strategy to be used when validating the payloads state root. -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] #[derive(PartialEq, Clone, Copy)] pub enum VerifyStateRoot { /// Validate state root. diff --git a/consensus/state_processing/src/per_block_processing.rs b/consensus/state_processing/src/per_block_processing.rs index 037e1c7cc7..5aa610e98e 100644 --- a/consensus/state_processing/src/per_block_processing.rs +++ b/consensus/state_processing/src/per_block_processing.rs @@ -55,12 +55,12 @@ use crate::common::update_progressive_balances_cache::{ initialize_progressive_balances_cache, update_progressive_balances_metrics, }; use crate::epoch_cache::initialize_epoch_cache; -#[cfg(feature = "arbitrary-fuzz")] +#[cfg(feature = "arbitrary")] use arbitrary::Arbitrary; use tracing::instrument; /// The strategy to be used when validating the block's signatures. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(PartialEq, Clone, Copy, Debug)] pub enum BlockSignatureStrategy { /// Do not validate any signature. Use with caution. @@ -74,7 +74,7 @@ pub enum BlockSignatureStrategy { } /// The strategy to be used when validating the block's signatures. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(PartialEq, Clone, Copy)] pub enum VerifySignatures { /// Validate all signatures encountered. @@ -90,7 +90,7 @@ impl VerifySignatures { } /// Control verification of the latest block header. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(PartialEq, Clone, Copy)] pub enum VerifyBlockRoot { True, diff --git a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs index c5ec80b92a..3e4f7e8189 100644 --- a/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/validator_statuses.rs @@ -2,7 +2,7 @@ use crate::common::attesting_indices_base::get_attesting_indices; use safe_arith::SafeArith; use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, PendingAttestation}; -#[cfg(feature = "arbitrary-fuzz")] +#[cfg(feature = "arbitrary")] use arbitrary::Arbitrary; /// Sets the boolean `var` on `self` to be true if it is true on `other`. Otherwise leaves `self` @@ -16,7 +16,7 @@ macro_rules! set_self_if_other_is_true { } /// The information required to reward a block producer for including an attestation in a block. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(Debug, Clone, Copy, PartialEq)] pub struct InclusionInfo { /// The distance between the attestation slot and the slot that attestation was included in a @@ -48,7 +48,7 @@ impl InclusionInfo { } /// Information required to reward some validator during the current and previous epoch. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(Debug, Default, Clone, PartialEq)] pub struct ValidatorStatus { /// True if the validator has been slashed, ever. @@ -118,7 +118,7 @@ impl ValidatorStatus { /// epochs. #[derive(Clone, Debug, PartialEq)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct TotalBalances { /// The effective balance increment from the spec. effective_balance_increment: u64, @@ -175,7 +175,7 @@ impl TotalBalances { /// Summarised information about validator participation in the _previous and _current_ epochs of /// some `BeaconState`. -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[derive(Debug, Clone)] pub struct ValidatorStatuses { /// Information about each individual validator from the state's validator registry. diff --git a/consensus/state_processing/src/verify_operation.rs b/consensus/state_processing/src/verify_operation.rs index a13786f9f6..1e9c3d5fe3 100644 --- a/consensus/state_processing/src/verify_operation.rs +++ b/consensus/state_processing/src/verify_operation.rs @@ -7,7 +7,7 @@ use crate::per_block_processing::{ verify_attester_slashing, verify_bls_to_execution_change, verify_exit, verify_proposer_slashing, }; -#[cfg(feature = "arbitrary-fuzz")] +#[cfg(feature = "arbitrary")] use arbitrary::Arbitrary; use educe::Educe; use smallvec::{SmallVec, smallvec}; @@ -41,14 +41,14 @@ pub trait TransformPersist { /// The inner `op` field is private, meaning instances of this type can only be constructed /// by calling `validate`. #[derive(Educe, Debug, Clone)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] #[educe( PartialEq, Eq, Hash(bound(T: TransformPersist + std::hash::Hash, E: EthSpec)) )] #[cfg_attr( - feature = "arbitrary-fuzz", + feature = "arbitrary", arbitrary(bound = "T: TransformPersist + Arbitrary<'arbitrary>, E: EthSpec") )] pub struct SigVerifiedOp { @@ -139,7 +139,7 @@ struct SigVerifiedOpDecode { /// We need to store multiple `ForkVersion`s because attester slashings contain two indexed /// attestations which may be signed using different versions. #[derive(Debug, PartialEq, Eq, Clone, Hash, Encode, Decode, TestRandom)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(Arbitrary))] pub struct VerifiedAgainst { fork_versions: SmallVec<[ForkVersion; MAX_FORKS_VERIFIED_AGAINST]>, } diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index e7e382714b..c09e3d6931 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -1,10 +1,7 @@ [package] name = "types" version = "0.2.1" -authors = [ - "Paul Hauner ", - "Age Manning ", -] +authors = ["Paul Hauner ", "Age Manning "] edition = { workspace = true } [features] @@ -22,7 +19,6 @@ arbitrary = [ "ssz_types/arbitrary", "swap_or_not_shuffle/arbitrary", ] -arbitrary-fuzz = ["arbitrary"] portable = ["bls/supranational-portable"] [dependencies] diff --git a/slasher/service/src/lib.rs b/slasher/service/src/lib.rs index ac15b49ee9..69ec59aa2c 100644 --- a/slasher/service/src/lib.rs +++ b/slasher/service/src/lib.rs @@ -1,3 +1,4 @@ +#![allow(clippy::result_large_err)] mod service; pub use service::SlasherService; diff --git a/validator_client/http_api/Cargo.toml b/validator_client/http_api/Cargo.toml index 2bd57867ac..e334ab9db0 100644 --- a/validator_client/http_api/Cargo.toml +++ b/validator_client/http_api/Cargo.toml @@ -8,14 +8,17 @@ authors = ["Sigma Prime "] name = "validator_http_api" path = "src/lib.rs" +[features] +testing = ["dep:deposit_contract", "dep:doppelganger_service", "dep:tempfile"] + [dependencies] account_utils = { workspace = true } beacon_node_fallback = { workspace = true } bls = { workspace = true } -deposit_contract = { workspace = true } +deposit_contract = { workspace = true, optional = true } directory = { workspace = true } dirs = { workspace = true } -doppelganger_service = { workspace = true } +doppelganger_service = { workspace = true, optional = true } eth2 = { workspace = true, features = ["lighthouse"] } eth2_keystore = { workspace = true } ethereum_serde_utils = { workspace = true } @@ -38,7 +41,7 @@ slot_clock = { workspace = true } sysinfo = { workspace = true } system_health = { workspace = true } task_executor = { workspace = true } -tempfile = { workspace = true } +tempfile = { workspace = true, optional = true } tokio = { workspace = true } tokio-stream = { workspace = true } tracing = { workspace = true } @@ -53,7 +56,10 @@ warp_utils = { workspace = true } zeroize = { workspace = true } [dev-dependencies] +deposit_contract = { workspace = true } +doppelganger_service = { workspace = true } futures = { workspace = true } itertools = { workspace = true } rand = { workspace = true, features = ["small_rng"] } ssz_types = { workspace = true } +tempfile = { workspace = true } diff --git a/validator_client/http_api/src/lib.rs b/validator_client/http_api/src/lib.rs index a35b4ec6c6..8e9c077e57 100644 --- a/validator_client/http_api/src/lib.rs +++ b/validator_client/http_api/src/lib.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "testing")] +pub mod test_utils; + mod api_secret; mod create_signed_voluntary_exit; mod create_validator; @@ -6,7 +9,6 @@ mod keystores; mod remotekeys; mod tests; -pub mod test_utils; pub use api_secret::PK_FILENAME; use graffiti::{delete_graffiti, get_graffiti, set_graffiti}; diff --git a/validator_client/slashing_protection/Cargo.toml b/validator_client/slashing_protection/Cargo.toml index 695a693385..8017941ca6 100644 --- a/validator_client/slashing_protection/Cargo.toml +++ b/validator_client/slashing_protection/Cargo.toml @@ -6,7 +6,7 @@ edition = { workspace = true } autotests = false [features] -arbitrary-fuzz = ["dep:arbitrary", "types/arbitrary-fuzz", "eip_3076/arbitrary-fuzz"] +arbitrary = ["dep:arbitrary", "types/arbitrary", "eip_3076/arbitrary"] portable = ["types/portable"] [dependencies] diff --git a/validator_client/slashing_protection/src/interchange_test.rs b/validator_client/slashing_protection/src/interchange_test.rs index c5c3df7ea4..996116dd1c 100644 --- a/validator_client/slashing_protection/src/interchange_test.rs +++ b/validator_client/slashing_protection/src/interchange_test.rs @@ -11,7 +11,7 @@ use tempfile::tempdir; use types::{Epoch, Hash256, Slot}; #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct MultiTestCase { pub name: String, pub genesis_validators_root: Hash256, @@ -19,7 +19,7 @@ pub struct MultiTestCase { } #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct TestCase { pub should_succeed: bool, pub contains_slashable_data: bool, @@ -29,7 +29,7 @@ pub struct TestCase { } #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct TestBlock { pub pubkey: PublicKeyBytes, pub slot: Slot, @@ -39,7 +39,7 @@ pub struct TestBlock { } #[derive(Debug, Clone, Deserialize, Serialize)] -#[cfg_attr(feature = "arbitrary-fuzz", derive(arbitrary::Arbitrary))] +#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))] pub struct TestAttestation { pub pubkey: PublicKeyBytes, pub source_epoch: Epoch, diff --git a/validator_manager/Cargo.toml b/validator_manager/Cargo.toml index 16ce1e023f..d0155698b4 100644 --- a/validator_manager/Cargo.toml +++ b/validator_manager/Cargo.toml @@ -29,4 +29,4 @@ beacon_chain = { workspace = true } http_api = { workspace = true } regex = { workspace = true } tempfile = { workspace = true } -validator_http_api = { workspace = true } +validator_http_api = { workspace = true, features = ["testing"] }