diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index 308ddcf819..b79659ae3b 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -14,7 +14,7 @@ concurrency: jobs: dockerfile-ubuntu: - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} steps: - uses: actions/checkout@v5 @@ -31,7 +31,7 @@ jobs: retention-days: 3 run-local-testnet: - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} needs: dockerfile-ubuntu steps: - uses: actions/checkout@v5 @@ -173,7 +173,7 @@ jobs: # Tests checkpoint syncing to a live network (current fork) and a running devnet (usually next scheduled fork) checkpoint-sync-test: name: checkpoint-sync-test-${{ matrix.network }} - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} needs: dockerfile-ubuntu if: contains(github.event.pull_request.labels.*.name, 'syncing') continue-on-error: true @@ -216,7 +216,7 @@ jobs: # Test syncing from genesis on a local testnet. Aims to cover forward syncing both short and long distances. genesis-sync-test: name: genesis-sync-test-${{ matrix.fork }}-${{ matrix.offline_secs }}s - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} needs: dockerfile-ubuntu strategy: matrix: @@ -259,7 +259,7 @@ jobs: # a PR is safe to merge. New jobs should be added here. local-testnet-success: name: local-testnet-success - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} needs: [ 'dockerfile-ubuntu', 'run-local-testnet', diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index c2ce6f89be..c632042351 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -97,15 +97,18 @@ jobs: name: release-tests-ubuntu needs: [check-labels] if: needs.check-labels.outputs.skip_ci != 'true' - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} steps: - uses: actions/checkout@v5 # Set Java version to 21. (required since Web3Signer 24.12.0). - - uses: actions/setup-java@v4 + # On sigp/lighthouse, Java 21 is baked into the snapshot. + - if: github.repository != 'sigp/lighthouse' + uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '21' - - name: Get latest version of stable Rust + - if: github.repository != 'sigp/lighthouse' + name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: channel: stable @@ -113,6 +116,10 @@ jobs: bins: cargo-nextest env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - if: github.repository == 'sigp/lighthouse' + uses: Swatinem/rust-cache@v2 + with: + cache-provider: warpbuild - name: Run tests in release run: make test-release - name: Show cache stats @@ -123,34 +130,44 @@ jobs: name: beacon-chain-tests needs: [check-labels] if: needs.check-labels.outputs.skip_ci != 'true' - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} steps: - uses: actions/checkout@v5 - - name: Get latest version of stable Rust + - if: github.repository != 'sigp/lighthouse' + name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: channel: stable cache-target: release bins: cargo-nextest + - if: github.repository == 'sigp/lighthouse' + uses: Swatinem/rust-cache@v2 + with: + cache-provider: warpbuild - name: Run beacon_chain tests for all known forks run: make test-beacon-chain http-api-tests: name: http-api-tests needs: [check-labels] if: needs.check-labels.outputs.skip_ci != 'true' - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} steps: - uses: actions/checkout@v5 - - name: Get latest version of stable Rust + - if: github.repository != 'sigp/lighthouse' + name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: channel: stable cache-target: release bins: cargo-nextest + - if: github.repository == 'sigp/lighthouse' + uses: Swatinem/rust-cache@v2 + with: + cache-provider: warpbuild - name: Run http_api tests for all recent forks run: make test-http-api op-pool-tests: @@ -220,16 +237,21 @@ jobs: name: debug-tests-ubuntu needs: [check-labels] if: needs.check-labels.outputs.skip_ci != 'true' - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} steps: - uses: actions/checkout@v5 - - name: Get latest version of stable Rust + - if: github.repository != 'sigp/lighthouse' + name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: channel: stable bins: cargo-nextest + - if: github.repository == 'sigp/lighthouse' + uses: Swatinem/rust-cache@v2 + with: + cache-provider: warpbuild - name: Run tests in debug run: make test-debug state-transition-vectors-ubuntu: @@ -250,17 +272,22 @@ jobs: name: ef-tests-ubuntu needs: [check-labels] if: needs.check-labels.outputs.skip_ci != 'true' - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} steps: - uses: actions/checkout@v5 - - name: Get latest version of stable Rust + - if: github.repository != 'sigp/lighthouse' + name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: channel: stable cache-target: release bins: cargo-nextest + - if: github.repository == 'sigp/lighthouse' + uses: Swatinem/rust-cache@v2 + with: + cache-provider: warpbuild - name: Run consensus-spec-tests with blst and fake_crypto run: make test-ef basic-simulator-ubuntu: @@ -311,14 +338,14 @@ jobs: name: execution-engine-integration-ubuntu needs: [check-labels] if: needs.check-labels.outputs.skip_ci != 'true' - runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x' || 'ubuntu-latest' }} + runs-on: ${{ github.repository == 'sigp/lighthouse' && 'warp-ubuntu-latest-x64-8x;snapshot.key=lighthouse-ubuntu-latest-v1' || 'ubuntu-latest' }} steps: - uses: actions/checkout@v5 - - name: Get latest version of stable Rust + - if: github.repository != 'sigp/lighthouse' + name: Get latest version of stable Rust uses: moonrepo/setup-rust@v1 with: channel: stable - cache-target: release cache: false env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/warpbuild-ubuntu-latest-snapshot.yml b/.github/workflows/warpbuild-ubuntu-latest-snapshot.yml new file mode 100644 index 0000000000..f32a0f0545 --- /dev/null +++ b/.github/workflows/warpbuild-ubuntu-latest-snapshot.yml @@ -0,0 +1,63 @@ +name: Bake warpbuild snapshot (lighthouse-ubuntu-latest) + +on: + workflow_dispatch: + schedule: + # Every week (Sunday at 00:00 UTC) + - cron: "0 0 * * 0" + pull_request: + branches: [stable, unstable] + paths: + - '.github/workflows/warpbuild-ubuntu-latest-snapshot.yml' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + bake: + runs-on: warp-ubuntu-latest-x64-8x + steps: + - name: Install system deps + run: | + set -euxo pipefail + sudo apt-get update -qq + sudo apt-get install -y --no-install-recommends \ + pkg-config \ + libssl-dev \ + build-essential \ + cmake \ + clang \ + llvm-dev \ + libclang-dev \ + protobuf-compiler \ + git \ + gcc \ + g++ \ + make + + - name: Install Rust toolchain (stable) + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt,clippy + + - name: Install cargo bins + run: | + cargo install --locked cargo-nextest + cargo install --locked cargo-audit + cargo install --locked cargo-deny + cargo install --locked cargo-sort + cargo install --locked cargo-hack + + - name: Install Java (Temurin 21) + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '21' + + - name: Save snapshot + uses: WarpBuilds/snapshot-save@v1 + with: + alias: 'lighthouse-ubuntu-latest-v1' + fail-on-error: true + wait-timeout-minutes: 60 diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f618cf6321..5253c85bbc 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -19,13 +19,15 @@ pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; use crate::custody_context::CustodyContextSsz; use crate::data_availability_checker::{ - Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData, - DataAvailabilityChecker, DataColumnReconstructionResult, + Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, AvailableBlockData, + DataColumnReconstructionResult as DataColumnReconstructionResultV1, }; + +use crate::data_availability_checker::DataAvailabilityChecker; use crate::data_column_verification::{ GossipDataColumnError, GossipPartialDataColumnError, GossipVerifiedDataColumn, GossipVerifiedPartialDataColumnHeader, KzgVerifiedCustodyPartialDataColumn, - KzgVerifiedPartialDataColumn, PartialColumnVerificationResult, + KzgVerifiedPartialDataColumn, PartialColumnVerificationResult, load_gloas_payload_bid, validate_partial_data_column_sidecar_for_gossip, }; use crate::early_attester_cache::EarlyAttesterCache; @@ -65,6 +67,11 @@ use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; +use crate::pending_payload_cache::PendingPayloadCache; +use crate::pending_payload_cache::{ + Availability as PayloadAvailability, + DataColumnReconstructionResult as DataColumnReconstructionResultGloas, +}; use crate::pending_payload_envelopes::PendingPayloadEnvelopes; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::persist_custody_context; @@ -498,9 +505,10 @@ pub struct BeaconChain { pub validator_monitor: RwLock>, /// The slot at which blocks are downloaded back to. pub genesis_backfill_slot: Slot, - /// Provides a KZG verification and temporary storage for blocks and blobs as - /// they are collected and combined. + /// Provides KZG verification and temporary storage for pre-Gloas blocks and blobs. pub data_availability_checker: Arc>, + /// Provides KZG verification and temporary storage for post-Gloas payload envelopes. + pub pending_payload_cache: Arc>, /// The KZG trusted setup used by this chain. pub kzg: Arc, /// RNG instance used by the chain. Currently used for shuffling column sidecars in block publishing. @@ -1180,6 +1188,7 @@ impl BeaconChain { let all_cached_columns_opt = self .data_availability_checker .get_data_columns(block_root) + .or_else(|| self.pending_payload_cache.get_data_columns(block_root)) .or_else(|| self.early_attester_cache.get_data_columns(block_root)); if let Some(mut all_cached_columns) = all_cached_columns_opt { @@ -1198,6 +1207,24 @@ impl BeaconChain { } } + pub fn cached_data_column_indexes( + &self, + block_root: &Hash256, + slot: Slot, + ) -> Option> { + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + self.pending_payload_cache + .cached_data_column_indexes(block_root) + } else { + self.data_availability_checker + .cached_data_column_indexes(block_root) + } + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -1303,6 +1330,14 @@ impl BeaconChain { return Ok(None); }; + // Gloas removes the standalone `BlobSidecar` shape — KZG commitments live in the bid and + // there's no signed-block-header / inclusion-proof to populate a `BlobSidecar` from. The + // canonical data is the column sidecar set on disk; callers needing data for a Gloas + // block should consume columns directly via `get_data_columns`. + if block.fork_name_unchecked().gloas_enabled() { + return Ok(None); + } + if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let fork_name = self.spec.fork_name_at_epoch(block.epoch()); if let Some(columns) = self.store.get_data_columns(block_root, fork_name)? { @@ -3306,12 +3341,19 @@ impl BeaconChain { )); }; - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its samples again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) + let is_gloas = self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled(); + + // Before Gloas, if this block has already been imported to fork choice it must have been + // available, so we don't need to process its samples again. In Gloas the beacon block is + // imported before the payload envelope and data columns, so this check does not apply. + if !is_gloas + && self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) { return Err(BlockError::DuplicateFullyImported(block_root)); } @@ -3401,15 +3443,34 @@ impl BeaconChain { .map(|column| column.as_data_column()), ); - let availability = self - .data_availability_checker - .put_kzg_verified_custody_data_columns( - block_root, - merge_result.full_columns.clone(), - )?; - - self.process_availability(slot, availability, || Ok(())) - .await? + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let bid = load_gloas_payload_bid(block_root, self)? + .ok_or(BlockError::EnvelopeBlockRootUnknown(block_root))?; + let availability = self + .pending_payload_cache + .put_kzg_verified_custody_data_columns( + block_root, + bid, + &merge_result.full_columns, + ) + .map_err(BlockError::from)?; + self.process_payload_envelope_availability(slot, availability, || Ok(())) + .await? + } else { + let availability = self + .data_availability_checker + .put_kzg_verified_custody_data_columns( + block_root, + merge_result.full_columns.clone(), + ) + .map_err(BlockError::from)?; + self.process_availability(slot, availability, || Ok(())) + .await? + } } else { AvailabilityProcessingStatus::MissingComponents(slot, block_root) }; @@ -3521,9 +3582,12 @@ impl BeaconChain { if let Some(event_handler) = self.event_handler.as_ref() && event_handler.has_data_column_sidecar_subscribers() { + let mut data_columns_iter = data_columns_iter.peekable(); + let Some(slot) = data_columns_iter.peek().map(|col| col.slot()) else { + return; + }; let imported_data_columns = self - .data_availability_checker - .cached_data_column_indexes(block_root) + .cached_data_column_indexes(block_root, slot) .unwrap_or_default(); let new_data_columns = data_columns_iter.filter(|b| !imported_data_columns.contains(b.index())); @@ -3596,6 +3660,7 @@ impl BeaconChain { pub async fn reconstruct_data_columns( self: &Arc, + slot: Slot, block_root: Hash256, ) -> Result< Option<( @@ -3615,37 +3680,80 @@ impl BeaconChain { return Ok(None); } - let data_availability_checker = self.data_availability_checker.clone(); + let is_gloas = self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled(); - let result = self - .task_executor - .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { - data_availability_checker.reconstruct_data_columns(&block_root) - }) - .await - .map_err(|_| BeaconChainError::RuntimeShutdown)??; + if is_gloas { + let bid = load_gloas_payload_bid(block_root, self)? + .ok_or(BlockError::EnvelopeBlockRootUnknown(block_root))?; + let pending_payload_cache = self.pending_payload_cache.clone(); + let result = self + .task_executor + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + pending_payload_cache.reconstruct_data_columns(&block_root, bid) + }) + .await + .map_err(|_| BlockError::from(BeaconChainError::RuntimeShutdown))? + .map_err(BlockError::from)?; - match result { - DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { - let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { - // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. - return Ok(None); - }; + match result { + DataColumnReconstructionResultGloas::Success(( + availability, + data_columns_to_publish, + )) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + return Ok(None); + }; - self.process_availability(slot, availability, || Ok(())) - .await - .map(|availability_processing_status| { - Some((availability_processing_status, data_columns_to_publish)) - }) + Ok(self + .process_payload_envelope_availability(slot, availability, || Ok(())) + .await + .map(|status| Some((status, data_columns_to_publish)))?) + } + DataColumnReconstructionResultGloas::NotStarted(reason) + | DataColumnReconstructionResultGloas::RecoveredColumnsNotImported(reason) => { + metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) + } } - DataColumnReconstructionResult::NotStarted(reason) - | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { - // We use metric here because logging this would be *very* noisy. - metrics::inc_counter_vec( - &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, - &[reason], - ); - Ok(None) + } else { + let data_availability_checker = self.data_availability_checker.clone(); + let result = self + .task_executor + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + data_availability_checker.reconstruct_data_columns(&block_root) + }) + .await + .map_err(|_| BlockError::from(BeaconChainError::RuntimeShutdown))? + .map_err(BlockError::from)?; + + match result { + DataColumnReconstructionResultV1::Success(( + availability, + data_columns_to_publish, + )) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + return Ok(None); + }; + + Ok(self + .process_availability(slot, availability, || Ok(())) + .await + .map(|status| Some((status, data_columns_to_publish)))?) + } + DataColumnReconstructionResultV1::NotStarted(reason) + | DataColumnReconstructionResultV1::RecoveredColumnsNotImported(reason) => { + metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) + } } } } @@ -3723,6 +3831,19 @@ impl BeaconChain { &chain, notify_execution_layer, )?; + + let block = execution_pending.block.block_cloned(); + if block.fork_name_unchecked().gloas_enabled() { + let bid = Arc::new( + block + .message() + .body() + .signed_execution_payload_bid()? + .clone(), + ); + chain.pending_payload_cache.insert_bid(block_root, bid); + } + publish_fn()?; // Record the time it took to complete consensus verification. @@ -3891,12 +4012,27 @@ impl BeaconChain { } } - let availability = self - .data_availability_checker - .put_gossip_verified_data_columns(block_root, slot, data_columns)?; - - self.process_availability(slot, availability, publish_fn) - .await + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let bid = load_gloas_payload_bid(block_root, self)? + .ok_or(BlockError::EnvelopeBlockRootUnknown(block_root))?; + let availability = self + .pending_payload_cache + .put_gossip_verified_data_columns(block_root, bid, data_columns)?; + Ok(self + .process_payload_envelope_availability(slot, availability, publish_fn) + .await?) + } else { + let availability = self + .data_availability_checker + .put_gossip_verified_data_columns(block_root, slot, data_columns)?; + Ok(self + .process_availability(slot, availability, publish_fn) + .await?) + } } fn check_blob_header_signature_and_slashability<'a>( @@ -3943,7 +4079,8 @@ impl BeaconChain { )?; let availability = self .data_availability_checker - .put_rpc_blobs(block_root, blobs)?; + .put_rpc_blobs(block_root, blobs) + .map_err(BlockError::from)?; self.process_availability(slot, availability, || Ok(())) .await @@ -3955,14 +4092,20 @@ impl BeaconChain { block_root: Hash256, engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { - let availability = match engine_get_blobs_output { + match engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { self.check_blob_header_signature_and_slashability( block_root, blobs.iter().map(|b| b.as_blob()), )?; - self.data_availability_checker - .put_kzg_verified_blobs(block_root, blobs)? + let availability = self + .data_availability_checker + .put_kzg_verified_blobs(block_root, blobs) + .map_err(BlockError::from)?; + + Ok(self + .process_availability(slot, availability, || Ok(())) + .await?) } EngineGetBlobsOutput::CustodyColumns(data_columns) => { // TODO(gloas) verify that this check is no longer relevant for gloas @@ -3975,13 +4118,31 @@ impl BeaconChain { _ => None, }), )?; - self.data_availability_checker - .put_kzg_verified_custody_data_columns(block_root, data_columns)? + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let bid = load_gloas_payload_bid(block_root, self)? + .ok_or(BlockError::EnvelopeBlockRootUnknown(block_root))?; + let availability = self + .pending_payload_cache + .put_kzg_verified_custody_data_columns(block_root, bid, &data_columns) + .map_err(BlockError::from)?; + Ok(self + .process_payload_envelope_availability(slot, availability, || Ok(())) + .await?) + } else { + let availability = self + .data_availability_checker + .put_kzg_verified_custody_data_columns(block_root, data_columns) + .map_err(BlockError::from)?; + Ok(self + .process_availability(slot, availability, || Ok(())) + .await?) + } } - }; - - self.process_availability(slot, availability, || Ok(())) - .await + } } /// Checks if the provided columns can make any cached blocks available, and imports immediately @@ -4001,16 +4162,29 @@ impl BeaconChain { }), )?; - // This slot value is purely informative for the consumers of - // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. - let availability = self.data_availability_checker.put_rpc_custody_columns( - block_root, - slot, - custody_columns, - )?; - - self.process_availability(slot, availability, || Ok(())) - .await + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let bid = load_gloas_payload_bid(block_root, self)? + .ok_or(BlockError::EnvelopeBlockRootUnknown(block_root))?; + let availability = self + .pending_payload_cache + .put_rpc_custody_columns(block_root, bid, custody_columns) + .map_err(BlockError::from)?; + Ok(self + .process_payload_envelope_availability(slot, availability, || Ok(())) + .await?) + } else { + let availability = self + .data_availability_checker + .put_rpc_custody_columns(block_root, slot, custody_columns) + .map_err(BlockError::from)?; + Ok(self + .process_availability(slot, availability, || Ok(())) + .await?) + } } fn check_data_column_sidecar_header_signature_and_slashability<'a>( @@ -4053,16 +4227,33 @@ impl BeaconChain { async fn process_availability( self: &Arc, slot: Slot, - availability: Availability, + availability: BlockAvailability, publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { match availability { - Availability::Available(block) => { + BlockAvailability::Available(block) => { publish_fn()?; - // Block is fully available, import into fork choice self.import_available_block(block).await } - Availability::MissingComponents(block_root) => Ok( + BlockAvailability::MissingComponents(block_root) => Ok( + AvailabilityProcessingStatus::MissingComponents(slot, block_root), + ), + } + } + + pub(crate) async fn process_payload_envelope_availability( + self: &Arc, + slot: Slot, + availability: PayloadAvailability, + publish_fn: impl FnOnce() -> Result<(), BlockError>, + ) -> Result { + match availability { + PayloadAvailability::Available(available_envelope) => { + publish_fn()?; + self.import_available_execution_payload_envelope(available_envelope) + .await + } + PayloadAvailability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), ), } @@ -7572,7 +7763,7 @@ impl BeaconChain { ) } - pub(crate) fn get_blobs_or_columns_store_op( + pub fn get_blobs_or_columns_store_op( &self, block_root: Hash256, block_slot: Slot, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9a43147233..24f971f736 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -286,6 +286,10 @@ pub enum BlockError { /// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob. /// https://github.com/sigp/lighthouse/issues/4546 AvailabilityCheck(AvailabilityCheckError), + /// The payload envelope's block root is unknown. + EnvelopeBlockRootUnknown(Hash256), + /// Optimistic sync is not supported for Gloas payload envelopes. + OptimisticSyncNotSupported { block_root: Hash256 }, /// A Blob with a slot after PeerDAS is received and is not required to be imported. /// This can happen because we stay subscribed to the blob subnet after 2 epochs, as we could /// still receive valid blobs from a Deneb epoch after PeerDAS is activated. @@ -624,7 +628,8 @@ pub fn signature_verify_chain_segment( consensus_context, }); } - + // TODO(gloas) When implementing range and backfill sync for gloas + // we need a batch verify kzg function in the new da checker as well. chain .data_availability_checker .batch_verify_kzg_for_available_blocks(&available_blocks)?; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index d70561db9b..cccc965dee 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -12,6 +12,7 @@ use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sideca use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; +use crate::pending_payload_cache::PendingPayloadCache; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; @@ -986,6 +987,7 @@ where ) }; debug!(?custody_context, "Loaded persisted custody context"); + let custody_context = Arc::new(custody_context); let beacon_chain = BeaconChain { spec: self.spec.clone(), @@ -1061,14 +1063,22 @@ where data_availability_checker: Arc::new( DataAvailabilityChecker::new( complete_blob_backfill, - slot_clock, + slot_clock.clone(), self.kzg.clone(), - Arc::new(custody_context), - self.spec, + custody_context.clone(), + self.spec.clone(), enable_partial_columns, ) .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), + pending_payload_cache: Arc::new( + PendingPayloadCache::new( + self.kzg.clone(), + custody_context.clone(), + self.spec.clone(), + ) + .map_err(|e| format!("Error initializing PendingPayloadCache: {:?}", e))?, + ), kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), gossip_verified_payload_bid_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 9d8b76aaed..637e8acdc8 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -33,6 +33,7 @@ use crate::data_column_verification::{ GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, verify_kzg_for_data_column_list, }; +use crate::kzg_utils::validate_data_columns_with_commitments; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, }; @@ -490,8 +491,7 @@ impl DataAvailabilityChecker { AvailableBlockData::Blobs(blobs) => verify_kzg_for_blob_list(blobs.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidBlobs), AvailableBlockData::DataColumns(columns) => { - verify_kzg_for_data_column_list(columns.iter(), &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn) + verify_columns_against_block(&self.kzg, available_block.block(), columns) } } } @@ -504,13 +504,17 @@ impl DataAvailabilityChecker { available_blocks: &[AvailableBlock], ) -> Result<(), AvailabilityCheckError> { let mut all_blobs = Vec::new(); - let mut all_data_columns = Vec::new(); for available_block in available_blocks { - match available_block.data().to_owned() { + match available_block.data() { AvailableBlockData::NoData => {} - AvailableBlockData::Blobs(blobs) => all_blobs.extend(blobs), - AvailableBlockData::DataColumns(columns) => all_data_columns.extend(columns), + AvailableBlockData::Blobs(blobs) => all_blobs.extend(blobs.iter().cloned()), + AvailableBlockData::DataColumns(columns) => { + // Each block has its own commitments. For Gloas they live in the bid; for + // Fulu they live inline on the column. Verify per block and let the helper + // pick the right path. + verify_columns_against_block(&self.kzg, available_block.block(), columns)?; + } } } @@ -519,11 +523,6 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidBlobs)?; } - if !all_data_columns.is_empty() { - verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn)?; - } - Ok(()) } @@ -607,7 +606,10 @@ impl DataAvailabilityChecker { let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( &self.kzg, - &verified_data_columns, + verified_data_columns + .into_iter() + .map(|c| c.into_inner()) + .collect(), &self.spec, ) .map_err(|e| { @@ -676,6 +678,35 @@ impl DataAvailabilityChecker { } } +/// Verify a batch of data columns belonging to a single block, picking the right commitment +/// source for the block's fork (Fulu: inline on column; Gloas: from the embedded payload bid). +fn verify_columns_against_block( + kzg: &Kzg, + block: &SignedBeaconBlock, + columns: &[Arc>], +) -> Result<(), AvailabilityCheckError> { + if columns.is_empty() { + return Ok(()); + } + if block.fork_name_unchecked().gloas_enabled() { + let commitments = block + .message() + .body() + .signed_execution_payload_bid() + .map(|bid| bid.message.blob_kzg_commitments.clone()) + .map_err(|_| { + AvailabilityCheckError::Unexpected( + "Gloas block missing signed_execution_payload_bid".to_string(), + ) + })?; + validate_data_columns_with_commitments(kzg, columns.iter(), commitments.as_ref()) + .map_err(AvailabilityCheckError::InvalidColumn) + } else { + verify_kzg_for_data_column_list(columns.iter(), kzg) + .map_err(AvailabilityCheckError::InvalidColumn) + } +} + /// Helper struct to group data availability checker metrics. pub struct DataAvailabilityCheckerMetrics { pub block_cache_size: usize, @@ -874,10 +905,13 @@ impl AvailableBlock { match &block_data { AvailableBlockData::NoData => { - if columns_required { - return Err(AvailabilityCheckError::MissingCustodyColumns); - } else if blobs_required { - return Err(AvailabilityCheckError::MissingBlobs); + // For Gloas, DA is checked for the PayloadEnvelope, not for the block. + if !block.fork_name_unchecked().gloas_enabled() { + if columns_required { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } else if blobs_required { + return Err(AvailabilityCheckError::MissingBlobs); + } } } AvailableBlockData::Blobs(blobs) => { diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 8ea3c792f4..3710937b1d 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -3,7 +3,8 @@ use crate::block_verification::{ }; use crate::data_availability_checker::MissingCellsError; use crate::kzg_utils::{ - reconstruct_data_columns, validate_full_data_columns, validate_partial_data_columns, + reconstruct_data_columns, validate_data_columns_with_commitments, validate_full_data_columns, + validate_partial_data_columns, }; use crate::observed_data_sidecars::{ Error as ObservedDataSidecarsError, ObservationKey, ObservationStrategy, Observe, @@ -20,6 +21,7 @@ use std::iter; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; +use store::DatabaseBlock; use tracing::{debug, instrument}; use tree_hash::TreeHash; use types::data::{ @@ -27,8 +29,9 @@ use types::data::{ PartialDataColumnSidecarError, }; use types::{ - BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId, - EthSpec, Hash256, PartialDataColumnSidecarRef, SignedBeaconBlockHeader, Slot, + BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, + DataColumnSubnetId, EthSpec, Hash256, KzgCommitment, PartialDataColumnSidecarRef, + SignedBeaconBlockHeader, SignedExecutionPayloadBid, Slot, }; /// An error occurred while validating a gossip data column. @@ -131,6 +134,24 @@ pub enum GossipDataColumnError { parent_root: Hash256, slot: Slot, }, + /// The block referenced by the data column is unknown. + /// + /// ## Peer scoring + /// + /// We cannot process the column without the referenced block, the peer isn't necessarily faulty. + BlockRootUnknown { + block_root: Hash256, + slot: Slot, + }, + /// The data column slot does not match its referenced block slot. + /// + /// ## Peer scoring + /// + /// The column sidecar is invalid and the peer is faulty. + BlockSlotMismatch { + block_slot: Slot, + data_column_slot: Slot, + }, /// The column conflicts with finalization, no need to propagate. /// /// ## Peer scoring @@ -307,21 +328,32 @@ impl GossipVerifiedDataColumn let header = c.signed_block_header.clone(); // We only process slashing info if the gossip verification failed // since we do not process the data column any further in that case. - validate_data_column_sidecar_for_gossip_fulu::( - column_sidecar, + validate_data_column_sidecar_for_gossip_fulu::(c, subnet_id, chain).map_err( + |e| { + process_block_slash_info::<_, GossipDataColumnError>( + chain, + BlockSlashInfo::from_early_error_data_column(header, e), + ) + }, + )?; + } + DataColumnSidecar::Gloas(data_column_gloas) => { + validate_data_column_sidecar_for_gossip_gloas::( + data_column_gloas, subnet_id, chain, - ) - .map_err(|e| { - process_block_slash_info::<_, GossipDataColumnError>( - chain, - BlockSlashInfo::from_early_error_data_column(header, e), - ) - }) + )?; } - // TODO(gloas) support gloas data column variant - DataColumnSidecar::Gloas(_) => Err(GossipDataColumnError::InvalidVariant), } + + Ok(GossipVerifiedDataColumn { + block_root: column_sidecar.block_root(), + data_column: KzgVerifiedDataColumn { + data: column_sidecar, + seen_timestamp: chain.slot_clock.now_duration().unwrap_or_default(), + }, + _phantom: PhantomData, + }) } /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for block production ONLY. @@ -331,7 +363,28 @@ impl GossipVerifiedDataColumn column_sidecar: Arc>, chain: &BeaconChain, ) -> Result { - verify_data_column_sidecar(&column_sidecar, &chain.spec)?; + match column_sidecar.as_ref() { + DataColumnSidecar::Fulu(data_column_fulu) => { + verify_data_column_sidecar_with_commitments_len( + &column_sidecar, + data_column_fulu.kzg_commitments.len(), + &chain.spec, + )?; + } + DataColumnSidecar::Gloas(_) => { + let bid = load_gloas_payload_bid(column_sidecar.block_root(), chain)?.ok_or( + GossipDataColumnError::BlockRootUnknown { + block_root: column_sidecar.block_root(), + slot: column_sidecar.slot(), + }, + )?; + verify_data_column_sidecar_with_commitments_len( + &column_sidecar, + bid.message.blob_kzg_commitments.len(), + &chain.spec, + )?; + } + } // Check if the data column is already in the DA checker cache. This happens when data columns // are made available through the `engine_getBlobs` method. If it exists in the cache, we know @@ -340,28 +393,19 @@ impl GossipVerifiedDataColumn // In this case, we should accept it for gossip propagation. verify_is_unknown_sidecar(chain, &column_sidecar)?; - match chain - .data_availability_checker - .missing_cells_for_column_sidecar(&column_sidecar) - { - Ok(Some(_)) => Ok(Self { + match missing_cells_for_column_sidecar(chain, &column_sidecar)? { + Some(_) => Ok(Self { block_root: column_sidecar.block_root(), data_column: KzgVerifiedDataColumn::from_execution_verified(column_sidecar), _phantom: Default::default(), }), - Ok(None) => { + None => { // Observe this data column so we don't process it again. if O::observe() { observe_gossip_data_column(&column_sidecar, chain)?; } Err(GossipDataColumnError::PriorKnownUnpublished) } - Err(MissingCellsError::MismatchesCachedColumn) => { - Err(GossipDataColumnError::MismatchesCachedColumn) - } - Err(MissingCellsError::UnexpectedError(_)) => { - todo!("handle unexpected error") - } } } @@ -440,6 +484,22 @@ impl KzgVerifiedDataColumn { .collect()) } + pub fn from_batch_with_scoring_and_commitments( + data_columns: Vec>>, + kzg_commitments: &[KzgCommitment], + kzg: &Kzg, + ) -> Result, (Option, KzgError)> { + let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_BATCH_TIMES); + validate_data_columns_with_commitments(kzg, data_columns.iter(), kzg_commitments)?; + Ok(data_columns + .into_iter() + .map(|column| Self { + data: column, + seen_timestamp: timestamp_now(), + }) + .collect()) + } + pub fn to_data_column(self) -> Arc> { self.data } @@ -635,17 +695,11 @@ impl KzgVerifiedCustodyDataColumn { pub fn reconstruct_columns( kzg: &Kzg, - partial_set_of_columns: &[Self], + partial_set_of_columns: Vec>>, spec: &ChainSpec, ) -> Result>, KzgError> { - let all_data_columns = reconstruct_data_columns( - kzg, - partial_set_of_columns - .iter() - .map(|d| d.clone_arc()) - .collect::>(), - spec, - )?; + let all_data_columns = + reconstruct_data_columns(kzg, partial_set_of_columns.to_vec(), spec)?; let seen_timestamp = timestamp_now(); @@ -860,6 +914,26 @@ pub fn verify_kzg_for_data_column( }) } +#[instrument(skip_all, level = "debug")] +pub fn verify_kzg_for_data_column_with_commitments( + data_column: Arc>, + cells_to_verify: PartialDataColumnSidecarRef, + kzg_commitments: &[KzgCommitment], + kzg: &Kzg, + seen_timestamp: Duration, +) -> Result, (Option, KzgError)> { + let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); + validate_partial_data_columns( + kzg, + iter::once((*data_column.index(), cells_to_verify)), + kzg_commitments, + )?; + Ok(KzgVerifiedDataColumn { + data: data_column, + seen_timestamp, + }) +} + /// Complete kzg verification for a `VerifiablePartialDataColumn`. /// /// Returns an error if the kzg verification check fails. @@ -907,16 +981,18 @@ where level = "debug" )] pub fn validate_data_column_sidecar_for_gossip_fulu( - data_column: Arc>, + data_column_fulu: &DataColumnSidecarFulu, subnet: DataColumnSubnetId, chain: &BeaconChain, -) -> Result, GossipDataColumnError> { - let DataColumnSidecar::Fulu(data_column_fulu) = data_column.as_ref() else { - return Err(GossipDataColumnError::InvalidVariant); - }; - +) -> Result<(), GossipDataColumnError> { + let data_column = Arc::new(DataColumnSidecar::Fulu(data_column_fulu.clone())); let column_slot = data_column.slot(); - verify_data_column_sidecar(&data_column, &chain.spec)?; + + verify_data_column_sidecar_with_commitments_len( + &data_column, + data_column_fulu.kzg_commitments.len(), + &chain.spec, + )?; verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; verify_sidecar_not_from_future_slot(chain, column_slot)?; verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; @@ -953,13 +1029,12 @@ pub fn validate_data_column_sidecar_for_gossip_fulu( + data_column_gloas: &DataColumnSidecarGloas, + subnet: DataColumnSubnetId, + chain: &BeaconChain, +) -> Result<(), GossipDataColumnError> { + let data_column = Arc::new(DataColumnSidecar::Gloas(data_column_gloas.clone())); + let column_slot = data_column.slot(); + + if *data_column.index() >= T::EthSpec::number_of_columns() as u64 { + return Err(GossipDataColumnError::InvalidColumnIndex( + *data_column.index(), + )); + } + + if !chain + .spec + .fork_name_at_slot::(column_slot) + .gloas_enabled() + { + return Err(GossipDataColumnError::InvalidVariant); + } + + verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; + verify_sidecar_not_from_future_slot(chain, column_slot)?; + verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; + verify_is_unknown_sidecar(chain, &data_column)?; + + let bid = load_gloas_payload_bid(data_column.block_root(), chain)?.ok_or( + GossipDataColumnError::BlockRootUnknown { + block_root: data_column.block_root(), + slot: column_slot, + }, + )?; + if bid.message.slot != column_slot { + return Err(GossipDataColumnError::BlockSlotMismatch { + block_slot: bid.message.slot, + data_column_slot: column_slot, + }); + } + let kzg_commitments = &bid.message.blob_kzg_commitments; + verify_data_column_sidecar_with_commitments_len( + &data_column, + kzg_commitments.len(), + &chain.spec, + )?; + + let Some(cells_to_kzg_verify) = missing_cells_for_column_sidecar(chain, &data_column)? else { + // Observe this data column so we don't process it again. + if O::observe() { + observe_gossip_data_column(&data_column, chain)?; + } + return Err(GossipDataColumnError::PriorKnownUnpublished); + }; + + let kzg = &chain.kzg; + let seen_timestamp = chain.slot_clock.now_duration().unwrap_or_default(); + verify_kzg_for_data_column_with_commitments( + data_column.clone(), + cells_to_kzg_verify, + kzg_commitments.as_ref(), + kzg, + seen_timestamp, + ) + .map_err(|(_, e)| GossipDataColumnError::InvalidKzgProof(e))?; + + if O::observe() { + observe_gossip_data_column(&data_column, chain)?; + } + + Ok(()) } #[instrument(skip_all, level = "debug")] @@ -1115,9 +1266,9 @@ pub enum PartialColumnVerificationResult { Err(GossipPartialDataColumnError), } -/// Verify if the data column sidecar is valid. -fn verify_data_column_sidecar( +fn verify_data_column_sidecar_with_commitments_len( data_column: &DataColumnSidecar, + commitments_len: usize, spec: &ChainSpec, ) -> Result<(), GossipDataColumnError> { if *data_column.index() >= E::number_of_columns() as u64 { @@ -1126,12 +1277,6 @@ fn verify_data_column_sidecar( )); } - // TODO(gloas): implement Gloas verification that takes kzg_commitments from block as parameter - let commitments_len = match data_column { - DataColumnSidecar::Fulu(dc) => dc.kzg_commitments.len(), - DataColumnSidecar::Gloas(_) => return Err(GossipDataColumnError::InvalidVariant), - }; - if commitments_len == 0 { return Err(GossipDataColumnError::UnexpectedDataColumn); } @@ -1164,6 +1309,82 @@ fn verify_data_column_sidecar( Ok(()) } +pub(crate) fn load_gloas_payload_bid( + block_root: Hash256, + chain: &BeaconChain, +) -> Result>>, BeaconChainError> { + if let Some(bid) = chain.pending_payload_cache.get_bid(&block_root) { + return Ok(Some(bid)); + } + + let bid = if let Some(block) = chain.early_attester_cache.get_block(block_root) { + Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .map_err(BeaconChainError::BeaconStateError)? + .clone(), + ) + } else { + match chain + .store + .try_get_full_block(&block_root) + .map_err(BeaconChainError::DBError)? + { + Some(DatabaseBlock::Full(block)) => Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .map_err(BeaconChainError::BeaconStateError)? + .clone(), + ), + Some(DatabaseBlock::Blinded(block)) => Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .map_err(BeaconChainError::BeaconStateError)? + .clone(), + ), + None => { + return Ok(None); + } + } + }; + + chain + .pending_payload_cache + .insert_bid(block_root, bid.clone()); + + Ok(Some(bid)) +} + +fn missing_cells_for_column_sidecar<'a, T: BeaconChainTypes>( + chain: &'_ BeaconChain, + data_column: &'a DataColumnSidecar, +) -> Result>, GossipDataColumnError> { + let result = if chain + .spec + .fork_name_at_slot::(data_column.slot()) + .gloas_enabled() + { + chain + .pending_payload_cache + .missing_cells_for_column_sidecar(data_column) + } else { + chain + .data_availability_checker + .missing_cells_for_column_sidecar(data_column) + }; + + result.map_err(|err| match err { + MissingCellsError::MismatchesCachedColumn => GossipDataColumnError::MismatchesCachedColumn, + MissingCellsError::UnexpectedError(_) => todo!("handle unexpected error"), + }) +} + /// Verify that `column_sidecar` is not yet known, i.e. this is the first time `column_sidecar` has been received for the tuple: /// `(block_header.slot, block_header.proposer_index, column_sidecar.index)` fn verify_is_unknown_sidecar( @@ -1447,7 +1668,7 @@ mod test { let verify_fn = |column_sidecar: DataColumnSidecar| { let col_index = *column_sidecar.index(); validate_data_column_sidecar_for_gossip_fulu::<_, Observe>( - column_sidecar.into(), + column_sidecar.as_fulu().unwrap(), DataColumnSubnetId::from_column_index(col_index, &harness.spec), &harness.chain, ) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index c94fb036f8..31852a4a61 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -119,10 +119,12 @@ impl FetchBlobsBeaconAdapter { .cached_blob_indexes(block_root) } - pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { - self.chain - .data_availability_checker - .cached_data_column_indexes(block_root) + pub(crate) fn cached_data_column_indexes( + &self, + block_root: &Hash256, + slot: Slot, + ) -> Option> { + self.chain.cached_data_column_indexes(block_root, slot) } pub(crate) async fn process_engine_blobs( @@ -134,7 +136,7 @@ impl FetchBlobsBeaconAdapter { self.chain .process_engine_blobs(slot, block_root, blobs) .await - .map_err(FetchEngineBlobError::BlobProcessingError) + .map_err(|e| FetchEngineBlobError::BlobProcessingError(Box::new(e))) } pub(crate) fn fork_choice_contains_block(&self, block_root: &Hash256) -> bool { diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index f7b4b8a29e..f6e0b9345e 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -50,7 +50,7 @@ pub enum EngineGetBlobsOutput { pub enum FetchEngineBlobError { BeaconStateError(BeaconStateError), BeaconChainError(Box), - BlobProcessingError(BlockError), + BlobProcessingError(Box), BlobSidecarError(BlobSidecarError), DataColumnSidecarError(DataColumnSidecarError), ExecutionLayerMissing, @@ -445,7 +445,7 @@ async fn compute_custody_columns_to_import( // Only consider columns that are not already known to data availability. if let Some(known_columns) = - chain_adapter_cloned.cached_data_column_indexes(&block_root) + chain_adapter_cloned.cached_data_column_indexes(&block_root, header.slot()) { custody_columns.retain(|col| !known_columns.contains(&col.index())); if custody_columns.is_empty() { diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index ef282a3eaa..37d40f3a27 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -199,7 +199,7 @@ mod get_blobs_v2 { .returning(|_| None); mock_adapter .expect_cached_data_column_indexes() - .returning(|_| None); + .returning(|_, _| None); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index b05a896777..14eb7ee07a 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -111,6 +111,57 @@ pub fn validate_full_data_columns<'a, E: EthSpec>( kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) } +/// Validate a batch of full `DataColumnSidecar`s against commitments supplied out-of-band. +/// +/// Gloas sidecars do not carry commitments. Their commitments come from the block's +/// `ExecutionPayloadBid`. +pub fn validate_data_columns_with_commitments<'a, E: EthSpec>( + kzg: &Kzg, + data_column_iter: impl Iterator>>, + kzg_commitments: &[KzgCommitment], +) -> Result<(), (Option, KzgError)> { + let mut cells = Vec::new(); + let mut proofs = Vec::new(); + let mut column_indices = Vec::new(); + let mut commitments = Vec::new(); + + for data_column in data_column_iter { + let col_index = *data_column.index(); + + if data_column.column().is_empty() { + return Err((Some(col_index), KzgError::KzgVerificationFailed)); + } + + for cell in data_column.column() { + cells.push(ssz_cell_to_crypto_cell::(cell).map_err(|e| (Some(col_index), e))?); + column_indices.push(col_index); + } + + for &proof in data_column.kzg_proofs() { + proofs.push(proof.0); + } + + for &commitment in kzg_commitments { + commitments.push(commitment.0); + } + + let expected_len = column_indices.len(); + + // We make this check at each iteration so that the error is attributable to a specific column. + if cells.len() != expected_len + || proofs.len() != expected_len + || commitments.len() != expected_len + { + return Err(( + Some(col_index), + KzgError::InconsistentArrayLength("Invalid data column".to_string()), + )); + } + } + + kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) +} + /// Validate a batch of partial `VerifiablePartialDataColumn`s. /// /// Partial columns may have missing cells, indicated by a bitmap. We only verify present cells. @@ -625,12 +676,19 @@ pub fn reconstruct_blobs( let blob_indices: Vec = match blob_indices_opt { Some(indices) => indices.into_iter().map(|i| i as usize).collect(), None => { - // TODO(gloas): support blob reconstruction for Gloas - // https://github.com/sigp/lighthouse/issues/7413 - let num_of_blobs = first_data_column - .kzg_commitments() - .map_err(|_| "Gloas blob reconstruction not yet supported".to_string())? - .len(); + // Fulu columns carry commitments inline; Gloas columns don't, so fall back to + // the block's payload bid commitments. + let num_of_blobs = match first_data_column.kzg_commitments() { + Ok(commitments) => commitments.len(), + Err(_) => signed_block + .message() + .body() + .signed_execution_payload_bid() + .map(|bid| bid.message.blob_kzg_commitments.len()) + .map_err(|_| { + "Gloas blob reconstruction: block missing payload bid".to_string() + })?, + }; (0..num_of_blobs).collect() } }; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index d70fc1b3ec..804268a613 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -48,6 +48,7 @@ pub mod payload_attestation_verification; pub mod payload_bid_verification; pub mod payload_envelope_streamer; pub mod payload_envelope_verification; +pub mod pending_payload_cache; pub mod pending_payload_envelopes; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs index 4b8e7347cc..b678bdbaea 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -1,23 +1,22 @@ -use std::sync::Arc; - +use bls::Hash256; use slot_clock::SlotClock; use state_processing::{VerifySignatures, envelope_processing::verify_execution_payload_envelope}; -use types::EthSpec; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; use crate::{ BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, PayloadVerificationOutcome, block_verification::PayloadVerificationHandle, payload_envelope_verification::{ - EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope, - gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root, - payload_notifier::PayloadNotifier, + EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, + load_snapshot_from_state_root, payload_notifier::PayloadNotifier, }, }; pub struct ExecutionPendingEnvelope { - pub signed_envelope: MaybeAvailableEnvelope, - pub import_data: EnvelopeImportData, + pub signed_envelope: Arc>, + pub block_root: Hash256, pub payload_verification_handle: PayloadVerificationHandle, } @@ -29,7 +28,6 @@ impl GossipVerifiedEnvelope { ) -> Result, EnvelopeError> { let signed_envelope = self.signed_envelope; let envelope = &signed_envelope.message; - let payload = &envelope.payload; // Define a future that will verify the execution payload with an execution engine. // @@ -87,14 +85,8 @@ impl GossipVerifiedEnvelope { )?; Ok(ExecutionPendingEnvelope { - signed_envelope: MaybeAvailableEnvelope::AvailabilityPending { - block_hash: payload.block_hash, - envelope: signed_envelope, - }, - import_data: EnvelopeImportData { - block_root, - _phantom: Default::default(), - }, + signed_envelope, + block_root, payload_verification_handle, }) } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index b40e8337fb..e7c900dcd7 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -9,13 +9,19 @@ use tracing::{debug, error, info, info_span, instrument, warn}; use types::{BlockImportSource, Hash256, SignedExecutionPayloadEnvelope}; use super::{ - AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, - ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, + gossip_verified_envelope::GossipVerifiedEnvelope, }; +use crate::data_column_verification::load_gloas_payload_bid; use crate::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, - NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, - payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, + AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, + NotifyExecutionLayer, + block_verification_types::AvailableBlockData, + metrics, + payload_envelope_verification::{ + AvailabilityPendingExecutedEnvelope, ExecutionPendingEnvelope, + }, + validator_monitor::get_slot_delay_ms, }; const ENVELOPE_METRICS_CACHE_SLOT_LIMIT: u32 = 64; @@ -28,13 +34,13 @@ impl BeaconChain { /// /// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during /// verification. - #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] + #[instrument(skip_all, fields(block_root = ?block_root, envelope_source = %envelope_source))] pub async fn process_execution_payload_envelope( self: &Arc, block_root: Hash256, unverified_envelope: GossipVerifiedEnvelope, notify_execution_layer: NotifyExecutionLayer, - block_source: BlockImportSource, + envelope_source: BlockImportSource, publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, ) -> Result { let block_slot = unverified_envelope.signed_envelope.slot(); @@ -50,7 +56,7 @@ impl BeaconChain { ); } - // TODO(gloas) insert the pre-executed envelope into some type of cache. + // TODO(gloas) insert the pre-executed envelope into some type of cache? let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); @@ -79,12 +85,11 @@ impl BeaconChain { let executed_envelope = chain .into_executed_payload_envelope(execution_pending) .await - .inspect_err(|_| { - // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline), - // and we keep it in the cache, then the node will NOT perform lookup and - // reprocess this block until the block is evicted from DA checker, causing the - // chain to get stuck temporarily if the block is canonical. Therefore we remove - // it from the cache if execution fails. + .map_err(|error| match error { + BlockError::ExecutionPayloadError(error) => { + EnvelopeError::ExecutionPayloadError(error) + } + error => EnvelopeError::ImportError(error), })?; // Record the time it took to wait for execution layer verification. @@ -94,15 +99,9 @@ impl BeaconChain { .set_time_executed(block_root, block_slot, timestamp); } - match executed_envelope { - ExecutedEnvelope::Available(envelope) => { - self.import_available_execution_payload_envelope(Box::new(envelope)) - .await - } - ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( - "Pending payload envelope not yet implemented".to_owned(), - )), - } + self.check_envelope_availability_and_import(executed_envelope) + .await + .map_err(EnvelopeError::ImportError) }; // Verify and import the payload envelope. @@ -112,7 +111,7 @@ impl BeaconChain { info!( ?block_root, %block_slot, - source = %block_source, + source = %envelope_source, "Execution payload envelope imported" ); @@ -138,6 +137,14 @@ impl BeaconChain { } Err(EnvelopeError::BeaconChainError(e)) } + Err(EnvelopeError::ImportError(BlockError::BeaconChainError(e))) => { + if matches!(e.as_ref(), BeaconChainError::TokioJoin(_)) { + debug!(error = ?e, "Envelope processing cancelled"); + } else { + warn!(error = ?e, "Execution payload envelope rejected"); + } + Err(EnvelopeError::ImportError(BlockError::BeaconChainError(e))) + } Err(other) => { warn!( reason = other.to_string(), @@ -148,6 +155,22 @@ impl BeaconChain { } } + #[instrument(skip_all)] + async fn check_envelope_availability_and_import( + self: &Arc, + envelope: AvailabilityPendingExecutedEnvelope, + ) -> Result { + let slot = envelope.envelope.slot(); + let block_root = envelope.block_root; + let bid = load_gloas_payload_bid(block_root, self)? + .ok_or(BlockError::EnvelopeBlockRootUnknown(block_root))?; + let availability = self + .pending_payload_cache + .put_executed_payload_envelope(bid, envelope)?; + self.process_payload_envelope_availability(slot, availability, || Ok(())) + .await + } + /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to /// get a fully `ExecutedEnvelope`. /// @@ -156,10 +179,10 @@ impl BeaconChain { async fn into_executed_payload_envelope( self: Arc, pending_envelope: ExecutionPendingEnvelope, - ) -> Result, EnvelopeError> { + ) -> Result, BlockError> { let ExecutionPendingEnvelope { signed_envelope, - import_data, + block_root, payload_verification_handle, } = pending_envelope; @@ -173,16 +196,13 @@ impl BeaconChain { .payload_verification_status .is_optimistic() { - return Err(EnvelopeError::OptimisticSyncNotSupported { - block_root: import_data.block_root, - }); + return Err(BlockError::OptimisticSyncNotSupported { block_root }); } - Ok(ExecutedEnvelope::new( + Ok(AvailabilityPendingExecutedEnvelope::new( signed_envelope, - import_data, + block_root, payload_verification_outcome, - self.spec.clone(), )) } @@ -190,18 +210,13 @@ impl BeaconChain { pub async fn import_available_execution_payload_envelope( self: &Arc, envelope: Box>, - ) -> Result { + ) -> Result { let AvailableExecutedEnvelope { envelope, - import_data, + block_root, payload_verification_outcome, } = *envelope; - let EnvelopeImportData { - block_root, - _phantom, - } = import_data; - let block_root = { let chain = self.clone(); self.spawn_blocking_handle( @@ -232,13 +247,13 @@ impl BeaconChain { signed_envelope: AvailableEnvelope, block_root: Hash256, payload_verification_status: PayloadVerificationStatus, - ) -> Result { + ) -> Result { // Everything in this initial section is on the hot path for processing the envelope. // Take an upgradable read lock on fork choice so we can check if this block has already // been imported. We don't want to repeat work importing a block that is already imported. let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); if !fork_choice_reader.contains_block(&block_root) { - return Err(EnvelopeError::BlockRootUnknown { block_root }); + return Err(BlockError::EnvelopeBlockRootUnknown(block_root)); } // TODO(gloas) add defensive check to see if payload envelope is already in fork choice @@ -253,7 +268,7 @@ impl BeaconChain { // node which can be eligible for head. fork_choice .on_valid_payload_envelope_received(block_root) - .map_err(|e| EnvelopeError::InternalError(format!("{e:?}")))?; + .map_err(|e| BlockError::InternalError(format!("{e:?}")))?; // TODO(gloas) emit SSE event if the payload became the new head payload diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index b153a3cd6a..b4f53b9df8 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -18,14 +18,13 @@ //! //! ``` -use std::marker::PhantomData; +use state_processing::envelope_processing::EnvelopeProcessingError; use std::sync::Arc; - -use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError}; use store::Error as DBError; +use strum::AsRefStr; use tracing::instrument; use types::{ - BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, + BeaconState, BeaconStateError, DataColumnSidecarList, EthSpec, ExecutionBlockHash, ExecutionPayloadEnvelope, Hash256, SignedExecutionPayloadEnvelope, Slot, }; @@ -41,38 +40,26 @@ mod payload_notifier; pub use execution_pending_envelope::ExecutionPendingEnvelope; -// TODO(gloas): could remove this type completely, or remove the generic -#[derive(PartialEq)] -pub struct EnvelopeImportData { - pub block_root: Hash256, - _phantom: PhantomData, -} - #[derive(Debug)] -#[allow(dead_code)] pub struct AvailableEnvelope { - execution_block_hash: ExecutionBlockHash, envelope: Arc>, - columns: DataColumnSidecarList, + pub columns: DataColumnSidecarList, + // TODO(gloas) this field is unread, do we need it? + #[expect(dead_code)] /// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970). columns_available_timestamp: Option, - pub spec: Arc, } impl AvailableEnvelope { pub fn new( - execution_block_hash: ExecutionBlockHash, envelope: Arc>, columns: DataColumnSidecarList, columns_available_timestamp: Option, - spec: Arc, ) -> Self { Self { - execution_block_hash, envelope, columns, columns_available_timestamp, - spec, } } @@ -111,46 +98,25 @@ pub struct EnvelopeProcessingSnapshot { pub beacon_block_root: Hash256, } -/// A payload envelope that has gone through processing checks and execution by an EL client. -/// This envelope hasn't necessarily completed data availability checks. -/// -/// -/// It contains 2 variants: -/// 1. `Available`: This envelope has been executed and also contains all data to consider it -/// fully available. -/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it -/// fully available. -#[allow(dead_code)] -pub enum ExecutedEnvelope { - Available(AvailableExecutedEnvelope), - // TODO(gloas): check data column availability via DA checker - AvailabilityPending(), +/// A payload ernvelope that has completed all envelope procesing checks, verification +/// by an EL client but does not have all requisite columns to get imported into +/// fork choice. +pub struct AvailabilityPendingExecutedEnvelope { + pub envelope: Arc>, + pub block_root: Hash256, + pub payload_verification_outcome: PayloadVerificationOutcome, } -impl ExecutedEnvelope { +impl AvailabilityPendingExecutedEnvelope { pub fn new( - envelope: MaybeAvailableEnvelope, - import_data: EnvelopeImportData, + envelope: Arc>, + block_root: Hash256, payload_verification_outcome: PayloadVerificationOutcome, - spec: Arc, ) -> Self { - match envelope { - MaybeAvailableEnvelope::Available(available_envelope) => { - Self::Available(AvailableExecutedEnvelope::new( - available_envelope, - import_data, - payload_verification_outcome, - )) - } - // TODO(gloas): check data column availability via DA checker - MaybeAvailableEnvelope::AvailabilityPending { - block_hash, - envelope, - } => Self::Available(AvailableExecutedEnvelope::new( - AvailableEnvelope::new(block_hash, envelope, vec![], None, spec), - import_data, - payload_verification_outcome, - )), + Self { + envelope, + block_root, + payload_verification_outcome, } } } @@ -159,25 +125,25 @@ impl ExecutedEnvelope { /// by an EL client **and** has all requisite blob data to be imported into fork choice. pub struct AvailableExecutedEnvelope { pub envelope: AvailableEnvelope, - pub import_data: EnvelopeImportData, + pub block_root: Hash256, pub payload_verification_outcome: PayloadVerificationOutcome, } impl AvailableExecutedEnvelope { pub fn new( envelope: AvailableEnvelope, - import_data: EnvelopeImportData, + block_root: Hash256, payload_verification_outcome: PayloadVerificationOutcome, ) -> Self { Self { envelope, - import_data, + block_root, payload_verification_outcome, } } } -#[derive(Debug)] +#[derive(Debug, AsRefStr)] pub enum EnvelopeError { /// The envelope's block root is unknown. BlockRootUnknown { block_root: Hash256 }, @@ -205,22 +171,16 @@ pub enum EnvelopeError { payload_slot: Slot, latest_finalized_slot: Slot, }, - /// Optimistic sync is not supported for Gloas payload envelopes. - OptimisticSyncNotSupported { block_root: Hash256 }, /// Some Beacon Chain Error BeaconChainError(Arc), /// Some Beacon State error BeaconStateError(BeaconStateError), - /// Some BlockProcessingError (for electra operations) - BlockProcessingError(BlockProcessingError), /// Some EnvelopeProcessingError EnvelopeProcessingError(EnvelopeProcessingError), /// Error verifying the execution payload ExecutionPayloadError(ExecutionPayloadError), - /// An error from block-level checks reused during envelope import - BlockError(BlockError), - /// Internal error - InternalError(String), + /// An error from importing the envelope. + ImportError(BlockError), } impl std::fmt::Display for EnvelopeError { @@ -253,13 +213,6 @@ impl From for EnvelopeError { } } -impl From for EnvelopeError { - fn from(e: BlockError) -> Self { - EnvelopeError::BlockError(e) - } -} - -/// Pull errors up from EnvelopeProcessingError to EnvelopeError impl From for EnvelopeError { fn from(e: EnvelopeProcessingError) -> Self { match e { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs index eb5e13b0cc..0bbe32525a 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -31,7 +31,8 @@ impl PayloadNotifier { match notify_execution_layer { NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => { - let new_payload_request = Self::build_new_payload_request(&envelope, &block)?; + let new_payload_request = Self::build_new_payload_request(&envelope, &block) + .map_err(EnvelopeError::ImportError)?; // TODO(gloas): check and test RLP block hash calculation post-Gloas if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() { warn!( diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs new file mode 100644 index 0000000000..3eaad08193 --- /dev/null +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -0,0 +1,891 @@ +//! This module builds out the data availability cache for Gloas. When a beacon block is received +//! over gossip/p2p we insert its bid into this cache, keyed by block root. As soon as the bid +//! is received we can begin using it to verify data columns. +//! +//! When a payload envelope is received over gossip/p2p we first insert it as a pre-executed envelope. A separate +//! thread eventually executes the payload envelope against the EL. Assuming the payload is executed successfully +//! the envelope is updated in the cache from `PreExecuted` -> `Executed`. Once all required custody columns +//! have been kzg verified and the envelope has been executed we can import the envelope into fork choice and store it to disk. +//! +//! Note that the block must have arrived before the envelope for the envelope to pass upstream verification checks and reach this cache. +//! However data columns can potentially arrive before the block. + +use crate::data_availability_checker::{AvailabilityCheckError, MissingCellsError}; +use crate::payload_envelope_verification::{ + AvailabilityPendingExecutedEnvelope, AvailableExecutedEnvelope, +}; +use crate::{BeaconChain, BeaconChainTypes, CustodyContext, metrics}; +use kzg::Kzg; +use lru::LruCache; +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::fmt; +use std::fmt::Debug; +use std::num::NonZeroUsize; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tracing::{Span, debug, error, instrument, trace}; +use types::{ + ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, + PartialDataColumnSidecarRef, +}; + +mod pending_column; +mod pending_components; + +use crate::data_column_verification::{ + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, +}; +use crate::metrics::{ + KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, +}; +use crate::observed_data_sidecars::ObservationStrategy; +use pending_components::{PendingComponents, ReconstructColumnsDecision}; +use types::SignedExecutionPayloadBid; +use types::new_non_zero_usize; + +/// The LRU Cache stores `PendingComponents`, which store the block root, the execution payload bid, and its associated column data. +/// The execution payload bid stores the kzg commitments which we use to verify against incoming column data. +/// Setting this to 32 keeps memory usage reasonable. +/// +/// `PendingComponents` are now never removed from the cache manually and are only removed via LRU +/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time. +const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32); + +/// This type is returned after adding a bid / column to the `DataAvailabilityChecker`. +/// +/// Indicates if the payloads data is fully `Available` or if we need more columns. +pub enum Availability { + MissingComponents(Hash256), + Available(Box>), +} + +impl Debug for Availability { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::MissingComponents(block_root) => { + write!(f, "MissingComponents({})", block_root) + } + Self::Available(envelope) => { + write!(f, "Available({:?})", envelope.block_root) + } + } + } +} + +pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSidecarList); + +#[derive(Debug)] +pub enum DataColumnReconstructionResult { + Success(AvailabilityAndReconstructedColumns), + NotStarted(&'static str), + RecoveredColumnsNotImported(&'static str), +} + +/// Cache to hold data columns for payloads pending data availability. +/// +/// In Gloas, beacon blocks can be immediately imported into fork choice. The execution payload +/// bid contains the payloads kzg commitments. This cache tracks data columns for payloads until all +/// required columns are received. +/// +/// Usually data becomes available on its slot within a second of receiving its first component +/// over gossip. However, data may never become available if a malicious proposer does not +/// publish its data, or there are network issues. Components are only removed via LRU eviction. +pub struct PendingPayloadCache { + /// Contains all the data we keep in memory, protected by an RwLock + availability_cache: RwLock>>, + kzg: Arc, + custody_context: Arc>, + spec: Arc, +} + +impl PendingPayloadCache { + pub fn new( + kzg: Arc, + custody_context: Arc>, + spec: Arc, + ) -> Result { + Ok(Self { + availability_cache: RwLock::new(LruCache::new(OVERFLOW_LRU_CAPACITY_NON_ZERO)), + kzg, + custody_context, + spec, + }) + } + + pub fn custody_context(&self) -> &Arc> { + &self.custody_context + } + + /// Returns all cached data columns for the given block root, if any. + #[instrument(skip_all, level = "trace")] + pub fn get_data_columns( + &self, + block_root: Hash256, + ) -> Option> { + self.peek_pending_components(&block_root, |components| { + components.map(|c| c.get_cached_data_columns()) + }) + } + + /// Returns the indices of cached data columns for the given block root. + #[instrument(skip_all, level = "trace")] + pub fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { + self.peek_pending_components(block_root, |components| { + components.map(|components| components.get_cached_data_columns_indices()) + }) + } + + /// Return the cached Gloas payload bid for `block_root`, if present. + pub fn get_bid( + &self, + block_root: &Hash256, + ) -> Option>> { + self.peek_pending_components(block_root, |components| { + components.map(|components| components.bid.clone()) + }) + } + + /// Filter out cells that are already cached for the given column sidecar. + /// Returns the cells that still need KZG verification, or `None` if all cells are cached. + #[instrument(skip_all, level = "trace")] + pub fn missing_cells_for_column_sidecar<'a>( + &'_ self, + data_column: &'a DataColumnSidecar, + ) -> Result>, MissingCellsError> { + let block_root = data_column.block_root(); + let column_index = *data_column.index(); + + self.peek_pending_components(&block_root, |components| { + let Some(cached) = components.and_then(|c| c.verified_data_columns.get(&column_index)) + else { + return data_column.try_filter_to_partial_ref(|_, _, _| Ok(true)); + }; + + data_column.try_filter_to_partial_ref(|cell_idx, cell, proof| { + match cached.cell_matches(cell_idx, cell, proof) { + None => Ok(true), + Some(true) => Ok(false), + Some(false) => Err(MissingCellsError::MismatchesCachedColumn), + } + }) + }) + } + + /// Insert an executed payload envelope into the cache and performs an availability check + pub fn put_executed_payload_envelope( + &self, + bid: Arc>, + executed_envelope: AvailabilityPendingExecutedEnvelope, + ) -> Result, AvailabilityCheckError> { + let epoch = executed_envelope.envelope.epoch(); + let beacon_block_root = executed_envelope.envelope.beacon_block_root(); + let pending_components = + self.get_pending_components(beacon_block_root, bid, |pending_components| { + pending_components.insert_executed_payload_envelope(executed_envelope); + })?; + + let num_expected_columns = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + + pending_components.span.in_scope(|| { + debug!( + component = "executed envelope", + status = pending_components.status_str(num_expected_columns), + "Component added to data availability checker" + ); + }); + + self.check_availability(beacon_block_root, pending_components, num_expected_columns) + } + + /// Inserts a bid into the pending payload cache. + pub fn insert_bid(&self, block_root: Hash256, bid: Arc>) { + let mut write_lock = self.availability_cache.write(); + write_lock.get_or_insert_mut(block_root, || PendingComponents::new(block_root, bid)); + } + + /// Perform KZG verification on RPC custody columns and insert them into the cache. + /// After insertion check if the envelope becomes available. + #[instrument(skip_all, level = "trace")] + pub fn put_rpc_custody_columns( + &self, + block_root: Hash256, + bid: Arc>, + custody_columns: DataColumnSidecarList, + ) -> Result, AvailabilityCheckError> { + let kzg_verified_columns = KzgVerifiedDataColumn::from_batch_with_scoring_and_commitments( + custody_columns, + bid.message.blob_kzg_commitments.as_ref(), + &self.kzg, + ) + .map_err(AvailabilityCheckError::InvalidColumn)?; + + let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let verified_custody_columns = kzg_verified_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) + .collect::>(); + + self.put_kzg_verified_custody_data_columns(block_root, bid, &verified_custody_columns) + } + + /// Perform KZG verification on gossip verified custody columns and insert them into the cache. + /// After insertion check if the envelope becomes available + #[instrument(skip_all, level = "trace")] + pub fn put_gossip_verified_data_columns( + &self, + block_root: Hash256, + bid: Arc>, + data_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let custody_columns = data_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) + .collect::>(); + + self.put_kzg_verified_custody_data_columns(block_root, bid, &custody_columns) + } + + /// Insert KZG verified columns into the cache. + /// After insertion check if the envelope becomes available. + pub fn put_kzg_verified_custody_data_columns( + &self, + block_root: Hash256, + bid: Arc>, + kzg_verified_data_columns: &[KzgVerifiedCustodyDataColumn], + ) -> Result, AvailabilityCheckError> { + let pending_components = + self.get_pending_components(block_root, bid.clone(), |pending_components| { + pending_components.merge_data_columns(kzg_verified_data_columns) + })?; + + let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); + + let num_expected_columns = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + + pending_components.span.in_scope(|| { + debug!( + component = "data_columns", + status = pending_components.status_str(num_expected_columns), + "Component added to data availability checker" + ); + }); + + self.check_availability(block_root, pending_components, num_expected_columns) + } + + #[instrument(skip_all, level = "debug")] + pub fn reconstruct_data_columns( + &self, + block_root: &Hash256, + bid: Arc>, + ) -> Result, AvailabilityCheckError> { + let verified_data_columns = match self.check_and_set_reconstruction_started(block_root) { + ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, + ReconstructColumnsDecision::No(reason) => { + return Ok(DataColumnReconstructionResult::NotStarted(reason)); + } + }; + let existing_column_indices = verified_data_columns + .iter() + .map(|data_column| *data_column.index()) + .collect::>(); + + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); + let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); + + let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( + &self.kzg, + verified_data_columns, + &self.spec, + ) + .map_err(|e| { + error!( + ?block_root, + error = ?e, + "Error reconstructing data columns" + ); + self.handle_reconstruction_failure(block_root); + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES); + AvailabilityCheckError::ReconstructColumnsError(e) + })?; + + let Some(slot) = all_data_columns.first().map(|d| d.as_data_column().slot()) else { + return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported( + "No new columns to import and publish", + )); + }; + + let columns_to_sample = self + .custody_context() + .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec); + + let data_columns_to_import_and_publish = all_data_columns + .into_iter() + .filter(|d| { + columns_to_sample.contains(&d.index()) + && !existing_column_indices.contains(&d.index()) + }) + .collect::>(); + + metrics::stop_timer(timer); + metrics::inc_counter_by( + &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, + data_columns_to_import_and_publish.len() as u64, + ); + + debug!( + count = data_columns_to_import_and_publish.len(), + ?block_root, + %slot, + "Reconstructed columns" + ); + + self.put_kzg_verified_custody_data_columns( + *block_root, + bid, + &data_columns_to_import_and_publish, + ) + .map(|availability| { + DataColumnReconstructionResult::Success(( + availability, + data_columns_to_import_and_publish + .into_iter() + .map(|d| d.clone_arc()) + .collect::>(), + )) + }) + } + + // ── Metrics ── + + /// Collects metrics from the data availability checker. + pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { + DataAvailabilityCheckerMetrics { + block_cache_size: self.block_cache_size(), + } + } + + /// Number of pending component entries in memory in the cache. + pub fn block_cache_size(&self) -> usize { + self.availability_cache.read().len() + } + + // ── Internal helpers ── + + fn check_availability( + &self, + block_root: Hash256, + pending_components: MappedRwLockReadGuard<'_, PendingComponents>, + num_expected_columns: usize, + ) -> Result, AvailabilityCheckError> { + if let Some(available_envelope) = pending_components.make_available(num_expected_columns)? { + // Explicitly drop read lock before acquiring write lock + drop(pending_components); + if let Some(components) = self.availability_cache.write().get_mut(&block_root) { + // Clean up span now that data is available + components.span = Span::none(); + } + + // We never remove the pending components manually to avoid race conditions. + // Components are only removed via LRU eviction as finality advances. + Ok(Availability::Available(Box::new(available_envelope))) + } else { + Ok(Availability::MissingComponents(block_root)) + } + } + + /// Gets or creates `PendingComponents` and applies the `update_fn` while holding the write lock. + /// + /// Once the update is complete, the write lock is downgraded and a read guard with a + /// reference of the updated `PendingComponents` is returned. + /// + fn get_pending_components( + &self, + block_root: Hash256, + bid: Arc>, + update_fn: F, + ) -> Result>, AvailabilityCheckError> + where + F: FnOnce(&mut PendingComponents), + { + let mut write_lock = self.availability_cache.write(); + + { + let pending_components = write_lock + .get_or_insert_mut(block_root, || PendingComponents::new(block_root, bid)); + update_fn(pending_components) + } + + RwLockReadGuard::try_map(RwLockWriteGuard::downgrade(write_lock), |cache| { + cache.peek(&block_root) + }) + .map_err(|_| { + AvailabilityCheckError::Unexpected("pending components should exist".to_string()) + }) + } + + fn peek_pending_components>) -> R>( + &self, + block_root: &Hash256, + f: F, + ) -> R { + f(self.availability_cache.read().peek(block_root)) + } + + /// Check whether data column reconstruction should be attempted. + /// TODO(gloas): rethink reconstruction for the cell model + fn check_and_set_reconstruction_started( + &self, + block_root: &Hash256, + ) -> ReconstructColumnsDecision { + let mut write_lock = self.availability_cache.write(); + let Some(pending_components) = write_lock.get_mut(block_root) else { + return ReconstructColumnsDecision::No("block already imported"); + }; + + let epoch = pending_components.bid.epoch(); + + let total_column_count = T::EthSpec::number_of_columns(); + let sampling_column_count = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + + if pending_components.reconstruction_started { + return ReconstructColumnsDecision::No("already started"); + } + let received_column_count = pending_components.num_completed_columns(); + if received_column_count >= sampling_column_count { + return ReconstructColumnsDecision::No("all sampling columns received"); + } + if received_column_count < total_column_count / 2 { + return ReconstructColumnsDecision::No("not enough columns"); + } + + pending_components.reconstruction_started = true; + ReconstructColumnsDecision::Yes(pending_components.get_cached_data_columns()) + } + + /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. + /// In this case, we remove all data columns in `PendingComponents`, reset reconstruction + /// status so that we can attempt to retrieve columns from peers again. + fn handle_reconstruction_failure(&self, block_root: &Hash256) { + if let Some(pending_components_mut) = self.availability_cache.write().get_mut(block_root) { + pending_components_mut.verified_data_columns = HashMap::new(); + pending_components_mut.reconstruction_started = false; + } + } + + /// Maintain the cache by removing entries older than the cutoff epoch. + pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { + let mut write_lock = self.availability_cache.write(); + let mut keys_to_remove = vec![]; + for (key, value) in write_lock.iter() { + if value.bid.epoch() < cutoff_epoch { + keys_to_remove.push(*key); + } + } + for key in keys_to_remove { + write_lock.pop(&key); + } + + Ok(()) + } +} + +/// Helper struct to group data availability checker metrics. +pub struct DataAvailabilityCheckerMetrics { + pub block_cache_size: usize, +} + +pub fn start_availability_cache_maintenance_service( + executor: TaskExecutor, + chain: Arc>, +) { + if chain.spec.gloas_fork_epoch.is_some() { + let da_checker = chain.pending_payload_cache.clone(); + executor.spawn( + async move { availability_cache_maintenance_service(chain, da_checker).await }, + "availability_cache_service", + ); + } else { + trace!("Gloas fork not configured, not starting availability cache maintenance service"); + } +} + +async fn availability_cache_maintenance_service( + chain: Arc>, + da_checker: Arc>, +) { + let epoch_duration = chain.slot_clock.slot_duration() * T::EthSpec::slots_per_epoch() as u32; + loop { + match chain + .slot_clock + .duration_to_next_epoch(T::EthSpec::slots_per_epoch()) + { + Some(duration) => { + // this service should run 3/4 of the way through the epoch + let additional_delay = (epoch_duration * 3) / 4; + tokio::time::sleep(duration + additional_delay).await; + + let Some(gloas_fork_epoch) = chain.spec.gloas_fork_epoch else { + // shutdown service if gloas fork epoch not set + break; + }; + + debug!("Availability cache maintenance service firing"); + let Some(current_epoch) = chain + .slot_clock + .now() + .map(|slot| slot.epoch(T::EthSpec::slots_per_epoch())) + else { + continue; + }; + + if current_epoch < gloas_fork_epoch { + // we are not in gloas yet + continue; + } + + let finalized_epoch = chain + .canonical_head + .fork_choice_read_lock() + .finalized_checkpoint() + .epoch; + + let Some(min_epochs_for_blobs) = chain + .spec + .min_epoch_data_availability_boundary(current_epoch) + else { + // Shutdown service if deneb fork epoch not set. + break; + }; + + // any data belonging to an epoch before this should be pruned + let cutoff_epoch = std::cmp::max(finalized_epoch + 1, min_epochs_for_blobs); + + if let Err(e) = da_checker.do_maintenance(cutoff_epoch) { + error!(error = ?e,"Failed to maintain availability cache"); + } + } + None => { + error!("Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. + tokio::time::sleep(chain.slot_clock.slot_duration()).await; + } + }; + } +} + +#[cfg(test)] +mod data_availability_checker_tests { + use super::*; + + use crate::block_verification::PayloadVerificationOutcome; + use crate::test_utils::{ + NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns, + }; + use crate::{ + custody_context::NodeCustodyType, + test_utils::{BeaconChainHarness, DiskHarnessType}, + }; + use fork_choice::PayloadVerificationStatus; + use logging::create_test_tracing_subscriber; + use rand::SeedableRng; + use rand::rngs::StdRng; + use store::{HotColdDB, StoreConfig, database::interface::BeaconNodeBackend}; + use tempfile::{TempDir, tempdir}; + use types::{ + ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, + MinimalEthSpec, SignedExecutionPayloadEnvelope, + }; + + type E = MinimalEthSpec; + type T = DiskHarnessType; + + const LOW_VALIDATOR_COUNT: usize = 32; + const RNG_SEED: u64 = 0xDEADBEEF; + + fn gloas_spec() -> Arc { + Arc::new(ForkName::Gloas.make_genesis_spec(E::default_spec())) + } + + fn get_store_with_spec( + db_path: &TempDir, + spec: Arc, + ) -> Arc, BeaconNodeBackend>> { + let hot_path = db_path.path().join("hot_db"); + let cold_path = db_path.path().join("cold_db"); + let blobs_path = db_path.path().join("blobs_db"); + let config = StoreConfig::default(); + + HotColdDB::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + config, + spec, + ) + .expect("disk store should initialize") + } + + async fn get_gloas_chain( + db_path: &TempDir, + ) -> BeaconChainHarness> { + let spec = gloas_spec::(); + + let chain_store = get_store_with_spec::(db_path, spec.clone()); + let validators_keypairs = + types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT); + BeaconChainHarness::builder(E::default()) + .spec(spec.clone()) + .keypairs(validators_keypairs) + .fresh_disk_store(chain_store) + .mock_execution_layer() + .build() + } + + async fn setup() -> (BeaconChainHarness, Arc>, TempDir) { + create_test_tracing_subscriber(); + let chain_db_path = tempdir().expect("should get temp dir"); + let harness = get_gloas_chain::(&chain_db_path).await; + let spec = harness.spec.clone(); + let custody_context = Arc::new(CustodyContext::::new( + NodeCustodyType::Fullnode, + generate_data_column_indices_rand_order::(), + &spec, + )); + + let cache = Arc::new( + PendingPayloadCache::::new(harness.chain.kzg.clone(), custody_context, spec.clone()) + .expect("should create cache"), + ); + (harness, cache, chain_db_path) + } + + fn make_test_signed_envelope(block_root: Hash256) -> Arc> { + Arc::new(SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas::default(), + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root: block_root, + parent_beacon_block_root: Hash256::random(), + }, + signature: bls::Signature::infinity().expect("should create infinity sig"), + }) + } + + fn make_test_executed_envelope(block_root: Hash256) -> AvailabilityPendingExecutedEnvelope { + AvailabilityPendingExecutedEnvelope { + envelope: make_test_signed_envelope(block_root), + block_root, + payload_verification_outcome: PayloadVerificationOutcome { + payload_verification_status: PayloadVerificationStatus::Verified, + }, + } + } + + fn init_block( + cache: &PendingPayloadCache, + spec: &ChainSpec, + num_blobs: NumBlobs, + seed: u64, + ) -> ( + Arc>, + Hash256, + DataColumnSidecarList, + ) { + let mut rng = StdRng::seed_from_u64(seed); + let (block, data_columns) = + generate_rand_block_and_data_columns::(ForkName::Gloas, num_blobs, &mut rng, spec); + let block_root = block.canonical_root(); + let bid = Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .expect("should get payload bid") + .clone(), + ); + cache.insert_bid(block_root, bid.clone()); + (bid, block_root, data_columns) + } + + #[tokio::test] + async fn caches_and_deduplicates_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + let epoch = bid.message.slot.epoch(E::slots_per_epoch()); + let sampling_cols = cache + .custody_context() + .sampling_columns_for_epoch(epoch, &harness.spec); + let column = data_columns + .iter() + .find(|c| sampling_cols.contains(c.index())) + .cloned() + .expect("should have a sampling column"); + let column_index = *column.index(); + + for _ in 0..2 { + cache + .put_rpc_custody_columns(block_root, bid.clone(), vec![column.clone()]) + .expect("should put column"); + } + + assert_eq!( + cache.cached_data_column_indexes(&block_root), + Some(vec![column_index]) + ); + assert_eq!( + cache.get_data_columns(block_root).map(|cols| cols.len()), + Some(1) + ); + assert_eq!(cache.block_cache_size(), 1); + } + + #[tokio::test] + async fn requires_columns_and_executed_envelope() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + + let epoch = bid.message.slot.epoch(E::slots_per_epoch()); + let num_sampling_columns = cache + .custody_context() + .sampling_columns_for_epoch(epoch, &harness.spec) + .len(); + + let result = cache + .put_rpc_custody_columns(block_root, bid.clone(), data_columns) + .expect("should put columns"); + assert!(matches!(result, Availability::MissingComponents(_))); + + let result = cache + .put_executed_payload_envelope(bid, make_test_executed_envelope(block_root)) + .expect("should put executed envelope"); + let Availability::Available(envelope) = result else { + panic!("expected available envelope"); + }; + assert_eq!(envelope.block_root, block_root); + assert_eq!(envelope.envelope.columns.len(), num_sampling_columns); + } + + #[tokio::test] + async fn zero_blob_envelope_is_available_without_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, _columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(0), RNG_SEED); + + let result = cache + .put_executed_payload_envelope(bid, make_test_executed_envelope(block_root)) + .expect("should put executed envelope"); + let Availability::Available(envelope) = result else { + panic!("zero-blob block should be available"); + }; + assert!(envelope.envelope.columns.is_empty()); + } + + #[tokio::test] + async fn partial_columns_wait_for_missing_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + + cache + .put_executed_payload_envelope(bid.clone(), make_test_executed_envelope(block_root)) + .expect("should put executed envelope"); + + let columns = data_columns.into_iter().take(1).collect(); + let result = cache + .put_rpc_custody_columns(block_root, bid, columns) + .expect("should put columns"); + assert!(matches!(result, Availability::MissingComponents(_))); + } + + #[tokio::test] + async fn reconstruction_failure_clears_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + + let epoch = bid.message.slot.epoch(E::slots_per_epoch()); + let sampling_cols = cache + .custody_context() + .sampling_columns_for_epoch(epoch, &harness.spec); + let columns: Vec<_> = data_columns + .into_iter() + .filter(|c| sampling_cols.contains(c.index())) + .take(5) + .collect(); + let num_columns = columns.len(); + + cache + .put_rpc_custody_columns(block_root, bid, columns) + .expect("should put columns"); + assert_eq!( + cache + .cached_data_column_indexes(&block_root) + .map(|indices| indices.len()), + Some(num_columns) + ); + + cache.handle_reconstruction_failure(&block_root); + assert_eq!(cache.cached_data_column_indexes(&block_root), Some(vec![])); + } + + #[tokio::test] + async fn lru_eviction_keeps_cache_bounded() { + let (harness, cache, _path) = setup().await; + let mut roots = Vec::new(); + + for i in 0..33 { + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED + i); + let column = data_columns.first().cloned().expect("should have column"); + roots.push(block_root); + cache + .put_rpc_custody_columns(block_root, bid, vec![column]) + .expect("should put columns"); + } + + assert_eq!(cache.block_cache_size(), 32); + assert!(cache.get_data_columns(roots[0]).is_none()); + assert!(cache.get_data_columns(*roots.last().unwrap()).is_some()); + } + + #[tokio::test] + async fn maintenance_prunes_old_entries() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + let block_epoch = bid.message.slot.epoch(E::slots_per_epoch()); + let column = data_columns.first().cloned().expect("should have column"); + + cache + .put_rpc_custody_columns(block_root, bid, vec![column]) + .expect("should put columns"); + + assert_eq!(cache.block_cache_size(), 1); + cache + .do_maintenance(block_epoch + 1) + .expect("maintenance should succeed"); + assert_eq!(cache.block_cache_size(), 0); + } +} diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs new file mode 100644 index 0000000000..22bb6d3620 --- /dev/null +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs @@ -0,0 +1,82 @@ +use kzg::KzgProof; +use ssz_types::VariableList; +use std::sync::Arc; +use types::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarGloas, EthSpec, Hash256, Slot}; + +#[derive(Clone)] +pub struct PendingColumn { + cells: Vec, KzgProof)>>, +} + +impl PendingColumn { + /// Allocate a `PendingColumn` whose `cells` vec has space for `blob_count` entries, all + /// initialised to `None`. Required so that `insert(idx, ...)` can write into `cells[idx]`. + pub fn new_with_capacity(blob_count: usize) -> Self { + Self { + cells: vec![None; blob_count], + } + } + + pub fn insert(&mut self, index: usize, cell: &Cell, proof: &KzgProof) { + if let Some(existing_cell) = self.cells.get_mut(index) + && existing_cell.is_none() + { + *existing_cell = Some((cell.clone(), *proof)); + } + } + + pub fn cell_matches(&self, index: usize, cell: &Cell, proof: &KzgProof) -> Option { + self.cells + .get(index)? + .as_ref() + .map(|(c, p)| c == cell && p == proof) + } + + pub fn is_complete(&self, blob_count: usize) -> bool { + self.cells.len() == blob_count && self.cells.iter().all(|cell| cell.is_some()) + } + + /// Build a `DataColumnSidecar` from the cached cells. + /// + /// Caller MUST have checked `is_complete(blob_count)` first; this returns `Err` only on the + /// (currently theoretically impossible) `VariableList` size-bound failures, which we surface + /// as a typed error so the caller can log/metric it instead of silently producing nothing. + pub fn to_sidecar( + &self, + index: ColumnIndex, + slot: Slot, + beacon_block_root: Hash256, + ) -> Result>, PendingColumnError> { + let mut column = Vec::with_capacity(self.cells.len()); + let mut kzg_proofs = Vec::with_capacity(self.cells.len()); + + for cell in self.cells.iter() { + let (cell, proof) = cell.as_ref().ok_or(PendingColumnError::IncompleteColumn)?; + // TODO(gloas): we likely want to go and arc all cells + column.push(cell.clone()); + kzg_proofs.push(*proof); + } + + // TODO(gloas): this hard-codes the Gloas sidecar variant. Pass the fork in once + // post-Gloas variants are introduced (or move construction to a fork-aware helper). + Ok(Arc::new(DataColumnSidecar::Gloas(DataColumnSidecarGloas { + index, + column: VariableList::try_from(column) + .map_err(|_| PendingColumnError::ColumnSizeExceedsBound)?, + kzg_proofs: VariableList::try_from(kzg_proofs) + .map_err(|_| PendingColumnError::ProofsSizeExceedsBound)?, + slot, + beacon_block_root, + }))) + } +} + +/// Errors returned by [`PendingColumn::to_sidecar`]. `IncompleteColumn` should never fire if the +/// caller checks [`PendingColumn::is_complete`] first; the size-bound variants reflect spec-bound +/// invariants and should never fire in practice. +#[derive(Debug, Clone)] +pub enum PendingColumnError { + IncompleteColumn, + ColumnSizeExceedsBound, + ProofsSizeExceedsBound, +} diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs new file mode 100644 index 0000000000..9bf6eb6436 --- /dev/null +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs @@ -0,0 +1,199 @@ +use crate::data_availability_checker::AvailabilityCheckError; +use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope; +use crate::payload_envelope_verification::AvailableEnvelope; +use crate::payload_envelope_verification::AvailableExecutedEnvelope; +use crate::pending_payload_cache::pending_column::PendingColumn; +use std::cmp::Ordering; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::{Span, debug, debug_span, error}; +use types::DataColumnSidecar; +use types::{ColumnIndex, EthSpec, Hash256, SignedExecutionPayloadBid}; + +/// This represents the components of a payload pending data availability. +/// +/// The columns are all gossip and kzg verified. +/// The payload is considered "available" when all required columns are received. +pub struct PendingComponents { + pub block_root: Hash256, + pub bid: Arc>, + /// a cached post executed payload envelope + pub envelope: Option>, + pub verified_data_columns: HashMap>, + pub reconstruction_started: bool, + pub(crate) span: Span, +} + +impl PendingComponents { + pub fn num_blobs_expected(&self) -> usize { + self.bid.message.blob_kzg_commitments.len() + } + + /// Returns the completed custody columns. + /// + /// Skips columns that are not yet complete and logs an error if a complete column fails to + /// build (a spec-bound invariant would have to be violated; should never happen in practice). + pub fn get_cached_data_columns(&self) -> Vec>> { + let blob_count = self.num_blobs_expected(); + let slot = self.bid.message.slot; + let block_root = self.block_root; + self.verified_data_columns + .iter() + .filter(|(_, col)| col.is_complete(blob_count)) + .filter_map( + |(col_idx, col)| match col.to_sidecar(*col_idx, slot, block_root) { + Ok(sidecar) => Some(sidecar), + Err(e) => { + error!( + ?e, + column_index = %col_idx, + ?block_root, + "Failed to build sidecar for complete column" + ); + None + } + }, + ) + .collect() + } + + /// Returns the indices of cached custody columns + pub fn get_cached_data_columns_indices(&self) -> Vec { + self.verified_data_columns + .iter() + .filter_map(|(col_idx, col)| { + col.is_complete(self.num_blobs_expected()) + .then_some(*col_idx) + }) + .collect() + } + + /// Merges a given set of data columns into the cache. + pub(crate) fn merge_data_columns( + &mut self, + kzg_verified_data_columns: &[KzgVerifiedCustodyDataColumn], + ) { + let num_blobs_expected = self.num_blobs_expected(); + for data_column in kzg_verified_data_columns { + let data_column = data_column.as_data_column(); + // The Vec-backed `PendingColumn` keys cells by index, so we have to allocate up to + // `num_blobs_expected` entries before inserting; otherwise `cells.get_mut(idx)` returns + // None and the insert is a no-op. + let col = self + .verified_data_columns + .entry(*data_column.index()) + .or_insert_with(|| PendingColumn::new_with_capacity(num_blobs_expected)); + for (cell_idx, (cell, proof)) in data_column + .column() + .iter() + .zip(data_column.kzg_proofs().iter()) + .enumerate() + { + col.insert(cell_idx, cell, proof); + } + } + } + + // TODO(gloas): merge partial columns + + /// Inserts an executed payload envelope into the cache. + pub fn insert_executed_payload_envelope( + &mut self, + envelope: AvailabilityPendingExecutedEnvelope, + ) { + self.envelope = Some(envelope); + } + + pub fn num_completed_columns(&self) -> usize { + self.verified_data_columns + .values() + .filter(|col| col.is_complete(self.num_blobs_expected())) + .count() + } + + /// Returns `Some` if the envelope and all required data columns have been received. + pub fn make_available( + &self, + num_expected_columns: usize, + ) -> Result>, AvailabilityCheckError> { + // Check if the payload has been received and executed + let Some(envelope) = &self.envelope else { + return Ok(None); + }; + + let AvailabilityPendingExecutedEnvelope { + envelope, + block_root, + payload_verification_outcome, + } = envelope; + + let columns = if self.num_blobs_expected() == 0 { + self.span.in_scope(|| { + debug!("Bid has no blobs, data is available"); + }); + vec![] + } else { + let num_completed_columns = self.num_completed_columns(); + match num_completed_columns.cmp(&num_expected_columns) { + Ordering::Greater => { + // Should never happen + return Err(AvailabilityCheckError::Unexpected(format!( + "too many columns got {num_completed_columns} expected {num_expected_columns}" + ))); + } + Ordering::Equal => { + self.span.in_scope(|| { + debug!("All data columns received, data is available"); + }); + + self.get_cached_data_columns() + } + Ordering::Less => { + // Not enough data columns received yet + return Ok(None); + } + } + }; + + let available_envelope = AvailableEnvelope::new(envelope.clone(), columns, None); + + Ok(Some(AvailableExecutedEnvelope { + envelope: available_envelope, + block_root: *block_root, + payload_verification_outcome: payload_verification_outcome.clone(), + })) + } + + /// Constructs a fresh `PendingComponents` with no envelope and no columns yet. + pub fn new(block_root: Hash256, bid: Arc>) -> Self { + let span = debug_span!(parent: None, "lh_pending_components", %block_root); + let _guard = span.clone().entered(); + Self { + block_root, + bid, + envelope: None, + verified_data_columns: HashMap::new(), + reconstruction_started: false, + span, + } + } + + pub fn status_str(&self, num_expected_columns: usize) -> String { + format!( + "envelope {}, data_columns {}/{}", + self.envelope.is_some(), + self.verified_data_columns.len(), + num_expected_columns + ) + } +} + +// This enum is only used internally within the crate in the reconstruction function to improve +// readability, so it's OK to not box the variant value, and it shouldn't impact memory much with +// the current usage, as it's deconstructed immediately. +#[allow(clippy::large_enum_variant)] +pub(crate) enum ReconstructColumnsDecision { + Yes(Vec>>), + No(&'static str), +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 8f437998c7..68cec11b76 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2832,11 +2832,42 @@ where .await .expect("newPayload should succeed"); - // Store the envelope. + // Store the envelope and the data columns derived from the block. + // + // Production stores columns inside `import_available_execution_payload_envelope` after + // the cache is satisfied. The harness sidesteps that flow but must still persist columns + // or the `DataColumnMissing` invariant fires for any block with `num_expected_blobs > 0`. + let block = self + .chain + .store + .get_blinded_block(&block_root) + .expect("should read block from store") + .expect("block should exist in store"); + let mut ops = vec![]; + let block_with_full_payload = self + .chain + .store + .make_full_block(&block_root, block.clone()) + .expect("should reconstruct full block"); + let columns = + generate_data_column_sidecars_from_block(&block_with_full_payload, &self.spec); + if !columns.is_empty() + && let Some(store_op) = self.chain.get_blobs_or_columns_store_op( + block_root, + block.slot(), + AvailableBlockData::DataColumns(columns), + ) + { + ops.push(store_op); + } + ops.push(store::StoreOp::PutPayloadEnvelope( + block_root, + std::sync::Arc::new(signed_envelope), + )); self.chain .store - .put_payload_envelope(&block_root, &signed_envelope) - .expect("should store envelope"); + .do_atomically_with_block_and_blobs_cache(ops) + .expect("should persist envelope and columns"); // Update fork choice so it knows the payload was received. self.chain @@ -2857,11 +2888,10 @@ where block: Arc>, ) -> RangeSyncBlock { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - let has_blobs = block - .message() - .body() - .blob_kzg_commitments() - .is_ok_and(|c| !c.is_empty()); + // For Gloas, kzg commitments live in the bid (`signed_execution_payload_bid`), so the + // body's `blob_kzg_commitments()` accessor returns Err. `num_expected_blobs` already + // handles both shapes. + let has_blobs = block.num_expected_blobs() > 0; if !has_blobs { return RangeSyncBlock::new( block, @@ -3781,7 +3811,26 @@ pub fn generate_rand_block_and_blobs( SignedBeaconBlock::Fulu(SignedBeaconBlockFulu { ref mut message, .. }) => add_blob_transactions!(message, FullPayloadFulu, num_blobs, rng, fork_name), - // TODO(EIP-7732) Add `SignedBeaconBlock::Gloas` variant + SignedBeaconBlock::Gloas(SignedBeaconBlockGloas { + ref mut message, .. + }) => { + // For Gloas, commitments are in the bid, not directly in the body. + // BlobSidecars cannot be created for Gloas because there's no merkle proof + // from the block body to the commitments. Return early with empty blob_sidecars. + let num_blobs = match num_blobs { + NumBlobs::Random => rng.random_range(DEFAULT_MIN_BLOBS..=DEFAULT_MAX_BLOBS), + NumBlobs::Number(n) => n, + NumBlobs::None => 0, + }; + let (bundle, _transactions) = + execution_layer::test_utils::generate_blobs::(num_blobs, fork_name).unwrap(); + message + .body + .signed_execution_payload_bid + .message + .blob_kzg_commitments = bundle.commitments.clone(); + return (block, blob_sidecars); + } _ => return (block, blob_sidecars), }; diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 6646fe0b1e..38636540a1 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -323,18 +323,34 @@ fn update_data_column_signed_header( ) { for old_custody_column_sidecar in data_columns.as_mut_slice() { let old_column_sidecar = old_custody_column_sidecar.as_data_column(); - let new_column_sidecar = Arc::new(DataColumnSidecar::Fulu(DataColumnSidecarFulu { - index: *old_column_sidecar.index(), - column: old_column_sidecar.column().clone(), - kzg_commitments: old_column_sidecar.kzg_commitments().unwrap().clone(), - kzg_proofs: old_column_sidecar.kzg_proofs().clone(), - signed_block_header: signed_block.signed_block_header(), - kzg_commitments_inclusion_proof: signed_block - .message() - .body() - .kzg_commitments_merkle_proof() - .unwrap(), - })); + let new_column_sidecar = match old_column_sidecar.as_ref() { + DataColumnSidecar::Fulu(_) => { + Arc::new(DataColumnSidecar::Fulu(DataColumnSidecarFulu { + index: *old_column_sidecar.index(), + column: old_column_sidecar.column().clone(), + kzg_commitments: old_column_sidecar.kzg_commitments().unwrap().clone(), + kzg_proofs: old_column_sidecar.kzg_proofs().clone(), + signed_block_header: signed_block.signed_block_header(), + kzg_commitments_inclusion_proof: signed_block + .message() + .body() + .kzg_commitments_merkle_proof() + .unwrap(), + })) + } + // Gloas columns reference the block by `beacon_block_root` instead of holding the + // block header inline, so updating the parent root just means re-keying the column to + // the new canonical root. + DataColumnSidecar::Gloas(g) => { + Arc::new(DataColumnSidecar::Gloas(types::DataColumnSidecarGloas { + index: g.index, + column: g.column.clone(), + kzg_proofs: g.kzg_proofs.clone(), + slot: g.slot, + beacon_block_root: signed_block.canonical_root(), + })) + } + }; *old_custody_column_sidecar = CustodyDataColumn::from_asserted_custody(new_column_sidecar); } } @@ -2246,7 +2262,6 @@ async fn rpc_block_allows_construction_past_da_boundary() { // Now verify the block is past the DA boundary let da_boundary = harness .chain - .data_availability_checker .data_availability_boundary() .expect("DA boundary should be set"); assert!( diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index e943514c4e..a2ca6ce2dd 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -11,7 +11,8 @@ use types::data::FixedBlobSidecarList; use types::test_utils::TestRandom; use types::{ BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, EthSpec, - MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedRoot, Slot, + MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedExecutionPayloadBid, + SignedRoot, Slot, }; type E = MinimalEthSpec; @@ -84,6 +85,15 @@ async fn data_column_sidecar_event_on_process_gossip_data_column() { let epoch = slot.epoch(E::slots_per_epoch()); random_sidecar.slot = slot; random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch)[0]; + + // For gloas, the bid must be known, e.g. in the pending payload cache + let mut bid = SignedExecutionPayloadBid::::empty(); + bid.message.slot = Slot::new(10); + harness + .chain + .pending_payload_cache + .insert_bid(random_sidecar.beacon_block_root, Arc::new(bid)); + DataColumnSidecar::Gloas(random_sidecar) } else { let mut random_sidecar = DataColumnSidecarFulu::random_for_test(&mut rng); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 86adf50995..171c83b2a9 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3133,6 +3133,29 @@ async fn weak_subjectivity_sync_test( let beacon_chain = Arc::new(beacon_chain); let wss_block_root = wss_block.canonical_root(); + + // For Gloas, blobs aren't a standalone shape — the WSS data is the column sidecar set, which + // `get_or_reconstruct_blobs` returns `None` for. Copy the WSS block's columns straight from + // the source store so that the destination has them after checkpoint sync, matching what + // network-driven WSS would produce in production. + if wss_block.fork_name_unchecked().gloas_enabled() + && let Ok(Some(source_columns)) = harness + .chain + .store + .get_data_columns(&wss_block_root, ForkName::Gloas) + && !source_columns.is_empty() + && let Some(store_op) = beacon_chain.get_blobs_or_columns_store_op( + wss_block_root, + wss_block.slot(), + beacon_chain::block_verification_types::AvailableBlockData::DataColumns(source_columns), + ) + { + beacon_chain + .store + .do_atomically_with_block_and_blobs_cache(vec![store_op]) + .unwrap(); + } + let store_wss_block = harness .chain .get_block(&wss_block_root) @@ -3200,12 +3223,43 @@ async fn weak_subjectivity_sync_test( .await .unwrap(); - // Store the envelope and apply it to fork choice. + // Store the envelope, its columns, and apply to fork choice. if let Some(envelope) = &snapshot.execution_envelope { + // Persist data columns for Gloas blocks. This mirrors what production does in + // `import_available_execution_payload_envelope` and what the harness now does in + // `process_envelope` — the WSS forward-sync loop bypasses both, so do it directly. + let mut ops = vec![]; + let columns_block = beacon_chain + .store + .get_blinded_block(&block_root) + .unwrap() + .and_then(|b| beacon_chain.store.make_full_block(&block_root, b).ok()); + if let Some(full_block) = columns_block { + let columns = beacon_chain::test_utils::generate_data_column_sidecars_from_block( + &full_block, + &beacon_chain.spec, + ); + if !columns.is_empty() + && let Some(store_op) = beacon_chain.get_blobs_or_columns_store_op( + block_root, + full_block.slot(), + beacon_chain::block_verification_types::AvailableBlockData::DataColumns( + columns, + ), + ) + { + ops.push(store_op); + } + } + ops.push(store::StoreOp::PutPayloadEnvelope( + block_root, + std::sync::Arc::new(envelope.as_ref().clone()), + )); beacon_chain .store - .put_payload_envelope(&block_root, envelope) + .do_atomically_with_block_and_blobs_cache(ops) .unwrap(); + // Update fork choice so head selection accounts for Full payload status. beacon_chain .canonical_head diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 9dfb8304bc..7699d25816 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,8 +5,9 @@ use crate::compute_light_client_updates::{ use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use beacon_chain::attestation_simulator::start_attestation_simulator_service; -use beacon_chain::data_availability_checker::start_availability_cache_maintenance_service; +use beacon_chain::data_availability_checker::start_availability_cache_maintenance_service as start_block_cache_maintenance_service; use beacon_chain::graffiti_calculator::start_engine_version_cache_refresh_service; +use beacon_chain::pending_payload_cache::start_availability_cache_maintenance_service as start_payload_cache_maintenance_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; use beacon_chain::{ @@ -782,7 +783,11 @@ where } start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); - start_availability_cache_maintenance_service( + start_block_cache_maintenance_service( + runtime_context.executor.clone(), + beacon_chain.clone(), + ); + start_payload_cache_maintenance_service( runtime_context.executor.clone(), beacon_chain.clone(), ); diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index 65e1a83840..aea0ccdd89 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -7,6 +7,7 @@ use crate::version::{ execution_optimistic_finalized_beacon_response, }; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use beacon_chain::payload_envelope_verification::EnvelopeError; use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer}; use bytes::Bytes; use eth2::types as api_types; @@ -148,7 +149,7 @@ pub async fn publish_execution_payload_envelope( PubsubMessage::ExecutionPayload(Box::new(envelope_for_gossip)), ) .map_err(|_| { - beacon_chain::payload_envelope_verification::EnvelopeError::BeaconChainError(Arc::new( + EnvelopeError::BeaconChainError(Arc::new( beacon_chain::BeaconChainError::UnableToPublish, )) }) @@ -272,7 +273,8 @@ fn build_gloas_data_columns( let index = *col.index(); match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) { Ok(verified) => Some(verified), - Err(GossipDataColumnError::PriorKnownUnpublished) => None, + Err(GossipDataColumnError::PriorKnownUnpublished) + | Err(GossipDataColumnError::PriorKnown { .. }) => None, Err(e) => { warn!( %slot, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 644ade956a..e96c86b17f 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -246,7 +246,7 @@ pub async fn publish_block>( if let Err(e) = Box::pin(chain.process_gossip_data_columns(sampling_columns, publish_fn)).await { - let msg = format!("Invalid data column: {e}"); + let msg = format!("Invalid data column: {e:?}"); return if let BroadcastValidation::Gossip = validation_level { Err(warp_utils::reject::broadcast_without_import(msg)) } else { diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 184bfffc9a..b47f8e946a 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -382,6 +382,7 @@ pub async fn proposer_boost_re_org_weight_misprediction() { /// - `num_empty_votes`: percentage of comm of attestations for the parent block /// - `num_head_votes`: number of attestations for the head block /// - `should_re_org`: whether the proposer should build on the parent rather than the head +#[allow(clippy::large_stack_frames)] pub async fn proposer_boost_re_org_test( ReOrgTest { head_slot, diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index b09dc95db4..4b34d7bfc0 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -1,5 +1,5 @@ use beacon_chain::{ - AvailabilityProcessingStatus, BlockError, attestation_verification::Error as AttnError, + AvailabilityProcessingStatus, attestation_verification::Error as AttnError, light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, sync_committee_verification::Error as SyncCommitteeError, @@ -733,7 +733,7 @@ pub fn register_sync_committee_error(error: &SyncCommitteeError) { } pub(crate) fn register_process_result_metrics( - result: &std::result::Result, + result: &std::result::Result>, source: BlockSource, block_component: &'static str, ) { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 0135d7f5dd..b2b5960597 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -698,15 +698,6 @@ impl NetworkBeaconProcessor { } Err(err) => { match err { - GossipDataColumnError::InvalidVariant => { - // TODO(gloas) we should probably penalize the peer here - debug!( - %slot, - %block_root, - %index, - "Invalid gossip data column variant." - ) - } GossipDataColumnError::PriorKnownUnpublished => { debug!( %slot, @@ -732,6 +723,25 @@ impl NetworkBeaconProcessor { column_sidecar, )); } + GossipDataColumnError::BlockRootUnknown { + block_root: unknown_block_root, + .. + } => { + debug!( + action = "ignoring", + %unknown_block_root, + "Unknown block root for column" + ); + // TODO(gloas): wire this into proper lookup sync. Sending + // `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that + // mixes column processing with the attestation lookup path and is not + // the right primitive for Gloas column lookups. + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::BeaconChainError(_) => { crit!( @@ -739,10 +749,12 @@ impl NetworkBeaconProcessor { "Internal error when verifying column sidecar" ) } - GossipDataColumnError::ProposalSignatureInvalid + GossipDataColumnError::InvalidVariant + | GossipDataColumnError::ProposalSignatureInvalid | GossipDataColumnError::UnknownValidator(_) | GossipDataColumnError::ProposerIndexMismatch { .. } | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::BlockSlotMismatch { .. } | GossipDataColumnError::InvalidSubnetId { .. } | GossipDataColumnError::InvalidInclusionProof | GossipDataColumnError::InvalidKzgProof { .. } @@ -904,14 +916,6 @@ impl NetworkBeaconProcessor { ) { match err { GossipPartialDataColumnError::GossipDataColumnError(err) => match err { - GossipDataColumnError::InvalidVariant => { - // TODO(gloas) we should probably penalize the peer here - debug!( - %block_root, - %index, - "Invalid gossip partial data column variant." - ) - } GossipDataColumnError::PriorKnownUnpublished => { debug!( %block_root, @@ -933,6 +937,20 @@ impl NetworkBeaconProcessor { slot, }); } + GossipDataColumnError::BlockRootUnknown { + block_root: unknown_block_root, + .. + } => { + debug!( + action = "requesting block", + %unknown_block_root, + "Unknown block root for partial column" + ); + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + unknown_block_root, + )); + } GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::BeaconChainError(_) => { crit!( @@ -940,10 +958,12 @@ impl NetworkBeaconProcessor { "Internal error when verifying partial column sidecar" ) } - GossipDataColumnError::ProposalSignatureInvalid + GossipDataColumnError::InvalidVariant + | GossipDataColumnError::ProposalSignatureInvalid | GossipDataColumnError::UnknownValidator(_) | GossipDataColumnError::ProposerIndexMismatch { .. } | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::BlockSlotMismatch { .. } | GossipDataColumnError::InvalidSubnetId { .. } | GossipDataColumnError::InvalidInclusionProof | GossipDataColumnError::InvalidKzgProof { .. } @@ -1323,6 +1343,7 @@ impl NetworkBeaconProcessor { let data_column_slot = verified_data_column.slot(); let data_column_index = verified_data_column.index(); + // TODO(gloas): implement partial messages if let DataColumnSidecar::Fulu(col) = verified_data_column.as_data_column() && self .chain @@ -1353,7 +1374,7 @@ impl NetworkBeaconProcessor { .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column"); - match &result { + match result { Ok(availability) => match availability { AvailabilityProcessingStatus::Imported(block_root) => { debug!( @@ -1366,6 +1387,14 @@ impl NetworkBeaconProcessor { &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION, processing_start_time.elapsed().as_millis() as i64, ); + + // If a block is in the da_checker, sync maybe awaiting for an event when block is finally + // imported. A block can become imported both after processing a block or data column. If + // importing a block results in `Imported`, notify. Do not notify of data column errors. + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: true, + }); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { trace!( @@ -1375,7 +1404,7 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - self.check_reconstruction_trigger(*slot, block_root).await; + self.check_reconstruction_trigger(slot, &block_root).await; } }, Err(BlockError::DuplicateFullyImported(_)) => { @@ -1399,16 +1428,6 @@ impl NetworkBeaconProcessor { ); } } - - // If a block is in the da_checker, sync maybe awaiting for an event when block is finally - // imported. A block can become imported both after processing a block or data column. If a - // importing a block results in `Imported`, notify. Do not notify of data column errors. - if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: true, - }); - } } /// Process a gossip-verified partial data column by merging it in the assembler @@ -1575,7 +1594,7 @@ impl NetworkBeaconProcessor { slot, process_fn: Box::pin(async move { cloned_self - .attempt_data_column_reconstruction(block_root) + .attempt_data_column_reconstruction(slot, block_root) .await; }), }, @@ -1827,7 +1846,10 @@ impl NetworkBeaconProcessor { return None; } // BlobNotRequired is unreachable. Only constructed in `process_gossip_blob` - Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => { + Err(e @ BlockError::InternalError(_)) + | Err(e @ BlockError::BlobNotRequired(_)) + | Err(e @ BlockError::EnvelopeBlockRootUnknown(_)) + | Err(e @ BlockError::OptimisticSyncNotSupported { .. }) => { error!(error = %e, "Internal block gossip validation error"); return None; } @@ -3814,8 +3836,7 @@ impl NetworkBeaconProcessor { | EnvelopeError::UnknownValidator { .. } | EnvelopeError::IncorrectBlockProposer { .. } | EnvelopeError::ExecutionPayloadError(_) - | EnvelopeError::EnvelopeProcessingError(_) - | EnvelopeError::BlockError(_) => { + | EnvelopeError::EnvelopeProcessingError(_) => { self.propagate_validation_result( message_id, peer_id, @@ -3895,11 +3916,9 @@ impl NetworkBeaconProcessor { } EnvelopeError::PriorToFinalization { .. } - | EnvelopeError::OptimisticSyncNotSupported { .. } | EnvelopeError::BeaconChainError(_) | EnvelopeError::BeaconStateError(_) - | EnvelopeError::BlockProcessingError(_) - | EnvelopeError::InternalError(_) => { + | EnvelopeError::ImportError(_) => { self.propagate_validation_result( message_id, peer_id, @@ -3990,7 +4009,8 @@ impl NetworkBeaconProcessor { | EnvelopeError::BlockHashMismatch { .. } | EnvelopeError::UnknownValidator { .. } | EnvelopeError::IncorrectBlockProposer { .. } - | EnvelopeError::ExecutionPayloadError(_) => { + | EnvelopeError::ExecutionPayloadError(_) + | EnvelopeError::EnvelopeProcessingError(_) => { self.gossip_penalize_peer( peer_id, PeerAction::LowToleranceError, @@ -3998,22 +4018,11 @@ impl NetworkBeaconProcessor { ); } - EnvelopeError::EnvelopeProcessingError(_) - | EnvelopeError::BlockError(_) - | EnvelopeError::BlockRootUnknown { .. } => { - self.gossip_penalize_peer( - peer_id, - PeerAction::LowToleranceError, - "gossip_envelope_processing_error", - ); - } - - EnvelopeError::PriorToFinalization { .. } - | EnvelopeError::OptimisticSyncNotSupported { .. } + EnvelopeError::BlockRootUnknown { .. } + | EnvelopeError::PriorToFinalization { .. } | EnvelopeError::BeaconChainError(_) | EnvelopeError::BeaconStateError(_) - | EnvelopeError::BlockProcessingError(_) - | EnvelopeError::InternalError(_) => {} + | EnvelopeError::ImportError(_) => {} }, } } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index bfcff2088b..b11e78bd9b 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -980,9 +980,9 @@ impl NetworkBeaconProcessor { "Fetch blobs completed without import" ); } - Err(FetchEngineBlobError::BlobProcessingError(BlockError::DuplicateFullyImported( - .., - ))) => { + Err(FetchEngineBlobError::BlobProcessingError(e)) + if matches!(*e, BlockError::DuplicateFullyImported(..)) => + { debug!( %block_root, "Fetch blobs duplicate import" @@ -998,6 +998,7 @@ impl NetworkBeaconProcessor { } // Publish partial columns without eager send + // TODO(gloas): implement publish partial columns without eager send if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() { let columns = assembler.get_partials_and_mark_as_local_fetched(block_root, &header); if !columns.is_empty() { @@ -1018,8 +1019,8 @@ impl NetworkBeaconProcessor { /// Attempts to reconstruct all data columns if the conditions checked in /// [`DataAvailabilityCheckerInner::check_and_set_reconstruction_started`] are satisfied. #[instrument(level = "debug", skip_all, fields(?block_root))] - async fn attempt_data_column_reconstruction(self: &Arc, block_root: Hash256) { - let result = self.chain.reconstruct_data_columns(block_root).await; + async fn attempt_data_column_reconstruction(self: &Arc, slot: Slot, block_root: Hash256) { + let result = self.chain.reconstruct_data_columns(slot, block_root).await; match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8f89b66948..988a68c9dd 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -731,6 +731,8 @@ impl NetworkBeaconProcessor { .map(|block| block.into_available_block()) .collect::>(); + // TODO(gloas) when implementing backfill sync for gloas + // we need a batch verify kzg function in the new da checker match self .chain .data_availability_checker diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index c4e7f8f8d1..21745e12db 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -10,7 +10,7 @@ use crate::{ }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; -use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::test_utils::{ @@ -1185,12 +1185,8 @@ async fn accept_processed_gossip_data_columns_without_import() { .map(|data_column| { let subnet_id = DataColumnSubnetId::from_column_index(*data_column.index(), &rig.chain.spec); - validate_data_column_sidecar_for_gossip_fulu::<_, DoNotObserve>( - data_column, - subnet_id, - &rig.chain, - ) - .expect("should be valid data column") + GossipVerifiedDataColumn::<_, DoNotObserve>::new(data_column, subnet_id, &rig.chain) + .expect("should be valid data column") }) .collect(); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 98cf3e0a1f..6d96967fd0 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -549,12 +549,18 @@ mod tests { #[test] fn no_blobs_into_responses() { + let spec = Arc::new(test_spec::()); + let mut rng = XorShiftRng::from_seed([42; 16]); let blocks = (0..4) .map(|_| { - generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut rng) - .0 - .into() + generate_rand_block_and_blobs::( + spec.fork_name_at_epoch(Epoch::new(0)), + NumBlobs::None, + &mut rng, + ) + .0 + .into() }) .collect::>>>(); @@ -565,7 +571,6 @@ mod tests { // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); - let spec = Arc::new(test_spec::()); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); // Assert response is finished and RpcBlocks can be constructed diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 734295ac1d..68d193ff50 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -905,9 +905,13 @@ impl SyncManager { }), ); } - // TODO(gloas) support gloas data column variant DataColumnSidecar::Gloas(_) => { - error!("Gloas variant not yet supported") + // TODO(gloas): proper lookup sync for Gloas. Routing into + // `handle_unknown_block_root` here mixes column processing with the + // single-block-lookup path; the Gloas column-arrives-before-block + // case wants its own queue/wakeup. + debug!(%block_root, "Received unknown block data column message"); + self.handle_unknown_block_root(peer_id, block_root); } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b1ba87c75d..7cf8a67845 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1085,10 +1085,17 @@ impl SyncNetworkContext { block_root: Hash256, lookup_peers: Arc>>, ) -> Result { + let slot = self + .chain + .canonical_head + .fork_choice_read_lock() + .get_block(&block_root) + .map(|block| block.slot) + .unwrap_or_else(|| self.chain.slot().unwrap_or_default()); + let custody_indexes_imported = self .chain - .data_availability_checker - .cached_data_column_indexes(&block_root) + .cached_data_column_indexes(&block_root, slot) .unwrap_or_default(); let current_epoch = self.chain.epoch().map_err(|e| { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index a26996ec5e..7b5cd74150 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -2089,8 +2089,7 @@ async fn too_many_processing_failures(depth: usize) { r.build_chain_and_trigger_last_block(depth).await; // Simulate that a peer always returns empty r.simulate( - SimulateConfig::new() - .with_process_result(|| BlockProcessingResult::Err(BlockError::BlockSlotLimitReached)), + SimulateConfig::new().with_process_result(|| BlockError::BlockSlotLimitReached.into()), ) .await; // We register multiple penalties, the lookup fails and sync does not progress @@ -2158,9 +2157,10 @@ async fn test_single_block_lookup_duplicate_response() { let mut r = TestRig::default(); r.build_chain_and_trigger_last_block(1).await; // Send a DuplicateFullyImported response, the lookup should complete successfully - r.simulate(SimulateConfig::new().with_process_result(|| { - BlockProcessingResult::Err(BlockError::DuplicateFullyImported(Hash256::ZERO)) - })) + r.simulate( + SimulateConfig::new() + .with_process_result(|| BlockError::DuplicateFullyImported(Hash256::ZERO).into()), + ) .await; // The block was not actually imported r.assert_head_slot(0); diff --git a/consensus/types/src/block/signed_beacon_block.rs b/consensus/types/src/block/signed_beacon_block.rs index dd6f52426a..b4bbc1a6f7 100644 --- a/consensus/types/src/block/signed_beacon_block.rs +++ b/consensus/types/src/block/signed_beacon_block.rs @@ -354,6 +354,12 @@ impl> SignedBeaconBlock self.message() .body() .blob_kzg_commitments() + .or_else(|_| { + self.message() + .body() + .signed_execution_payload_bid() + .map(|bid| &bid.message.blob_kzg_commitments) + }) .map(|c| c.len()) .unwrap_or(0) } diff --git a/consensus/types/src/execution/signed_execution_payload_bid.rs b/consensus/types/src/execution/signed_execution_payload_bid.rs index 48da445332..37ccb9cf79 100644 --- a/consensus/types/src/execution/signed_execution_payload_bid.rs +++ b/consensus/types/src/execution/signed_execution_payload_bid.rs @@ -25,6 +25,14 @@ pub struct SignedExecutionPayloadBid { } impl SignedExecutionPayloadBid { + pub fn epoch(&self) -> crate::Epoch { + self.message.slot.epoch(E::slots_per_epoch()) + } + + pub fn slot(&self) -> crate::Slot { + self.message.slot + } + pub fn empty() -> Self { Self { message: ExecutionPayloadBid::default(),