From 4535753c9b562ed35876e93cd45b1118f0022ae2 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Mon, 27 Apr 2026 11:36:09 +0200 Subject: [PATCH] starting to cell-ize --- beacon_node/beacon_chain/src/beacon_chain.rs | 16 +- .../beacon_chain/src/block_verification.rs | 2 +- beacon_node/beacon_chain/src/builder.rs | 2 +- .../src/data_availability_checker.rs | 4 +- .../src/data_availability_router.rs | 128 ++++++++------- .../src/data_column_verification.rs | 14 +- .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 1 + .../beacon_chain/src/fetch_blobs/mod.rs | 2 +- beacon_node/beacon_chain/src/lib.rs | 2 +- .../payload_envelope_verification/import.rs | 8 +- .../mod.rs | 69 +++----- .../pending_payload_cache/pending_column.rs | 63 ++++++++ .../pending_components.rs | 152 +++++++----------- beacon_node/beacon_chain/src/test_utils.rs | 12 +- .../beacon_chain/tests/block_verification.rs | 46 ++++-- beacon_node/beacon_chain/tests/store_tests.rs | 2 +- .../network/src/sync/network_context.rs | 5 +- beacon_node/network/src/sync/tests/lookups.rs | 5 +- 18 files changed, 297 insertions(+), 236 deletions(-) rename beacon_node/beacon_chain/src/{data_availability_checker_v2 => pending_payload_cache}/mod.rs (96%) create mode 100644 beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs rename beacon_node/beacon_chain/src/{data_availability_checker_v2 => pending_payload_cache}/pending_components.rs (62%) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c52220d787..e7cfe615a1 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,7 +23,6 @@ use crate::data_availability_checker::{ DataColumnReconstructionResult as DataColumnReconstructionResultV1, }; -use crate::data_availability_checker_v2::DataColumnReconstructionResult as DataColumnReconstructionResultV2; use crate::data_availability_router::{ AvailabilityOutcome, DataAvailabilityRouter, ReconstructionOutcome, }; @@ -68,6 +67,7 @@ use crate::partial_data_column_assembler::PartialMergeResult; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; +use crate::pending_payload_cache::DataColumnReconstructionResult as DataColumnReconstructionResultV2; use crate::pending_payload_envelopes::PendingPayloadEnvelopes; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::persist_custody_context; @@ -2388,7 +2388,11 @@ impl BeaconChain { let _timer = metrics::start_timer( &metrics::PARTIAL_DATA_COLUMN_SIDECAR_HEADER_GOSSIP_VERIFICATION_TIMES, ); - let Some(assembler) = self.data_availability_checker.partial_assembler() else { + let Some(assembler) = self + .data_availability_checker + .pending_block_cache() + .partial_assembler() + else { return Err(GossipPartialDataColumnError::PartialColumnsDisabled); }; if let Some(cached_header) = assembler.get_header(&block_root) { @@ -3341,7 +3345,11 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } - let Some(assembler) = self.data_availability_checker.partial_assembler() else { + let Some(assembler) = self + .data_availability_checker + .pending_block_cache() + .partial_assembler() + else { // Partial messages are apparently not activated return Ok(None); }; @@ -3369,6 +3377,7 @@ impl BeaconChain { ); self.emit_sse_data_column_sidecar_events( + slot, &block_root, merge_result .full_columns @@ -3380,6 +3389,7 @@ impl BeaconChain { .data_availability_checker .put_kzg_verified_custody_data_columns( block_root, + slot, merge_result.full_columns.clone(), )?; diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index a58a76a2eb..688824d35e 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1203,7 +1203,7 @@ impl SignatureVerifiedBlock { block, AvailableBlockData::NoData, // TODO(gloas) shouldnt matter which da checker we pass? - chain.data_availability_checker.v1(), + chain.data_availability_checker.pending_block_cache(), chain.spec.clone(), ) .map_err(BlockError::AvailabilityCheck)?, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 4d1964a0e4..b8aeef0700 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -6,7 +6,6 @@ use crate::beacon_chain::{ use crate::beacon_proposer_cache::BeaconProposerCache; use crate::custody_context::NodeCustodyType; use crate::data_availability_checker::DataAvailabilityChecker; -use crate::data_availability_checker_v2::DataAvailabilityChecker as DataAvailabilityCheckerV2; use crate::data_availability_router::DataAvailabilityRouter; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; @@ -14,6 +13,7 @@ use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sideca use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; +use crate::pending_payload_cache::PendingPayloadCache as DataAvailabilityCheckerV2; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 8e6bccb9b3..2150d7598b 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -689,12 +689,12 @@ pub fn start_availability_cache_maintenance_service( if chain.spec.deneb_fork_epoch.is_some() { let overflow_cache = chain .data_availability_checker - .v1() + .pending_block_cache() .availability_cache .clone(); let partial_assembler = chain .data_availability_checker - .v1() + .pending_block_cache() .partial_assembler .clone(); executor.spawn( diff --git a/beacon_node/beacon_chain/src/data_availability_router.rs b/beacon_node/beacon_chain/src/data_availability_router.rs index cca5ff207d..0e45a847d3 100644 --- a/beacon_node/beacon_chain/src/data_availability_router.rs +++ b/beacon_node/beacon_chain/src/data_availability_router.rs @@ -1,8 +1,8 @@ //! Abstraction layer for data availability operations across different DA checkers. //! //! This module provides a unified interface for availability operations that are shared -//! between the legacy `DataAvailabilityChecker` (v1, for blocks) and -//! `DataAvailabilityChecker` v2 (for payload envelopes after Gloas). +//! between the legacy `DataAvailabilityChecker` (for blocks) and +//! `DataAvailabilityCache` (for payload envelopes after Gloas). //! //! ## Design //! @@ -21,20 +21,20 @@ use crate::custody_context::CustodyContext; use crate::data_availability_checker::{ Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, DataAvailabilityCheckerMetrics as BlockMetrics, - DataColumnReconstructionResult as BlockReconstructionResult, -}; -use crate::data_availability_checker_v2::{ - Availability as PayloadAvailability, DataAvailabilityChecker as DataAvailabilityCheckerV2, - DataAvailabilityCheckerMetrics as PayloadMetrics, - DataColumnReconstructionResult as PayloadReconstructionResult, + DataColumnReconstructionResult as BlockReconstructionResult, MissingCellsError, }; use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; use crate::observed_data_sidecars::ObservationStrategy; +use crate::pending_payload_cache::{ + Availability as PayloadAvailability, DataAvailabilityCheckerMetrics as PayloadMetrics, + DataColumnReconstructionResult as PayloadReconstructionResult, PendingPayloadCache, +}; use std::sync::Arc; use types::data::{BlobIdentifier, FixedBlobSidecarList}; use types::{ BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot, + DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, PartialDataColumnSidecarRef, + SignedBeaconBlock, Slot, }; /// Unified result from operations that can come from either DA checker. @@ -65,7 +65,6 @@ impl AvailabilityOutcome { match self { Self::Block(BlockAvailability::Available(block)) => block.import_data.block_root, Self::Block(BlockAvailability::MissingComponents(root)) => *root, - // For payload availability, the first element of the tuple is the block root Self::Payload(PayloadAvailability::Available(available_data)) => { available_data.envelope.message().beacon_block_root } @@ -139,19 +138,23 @@ impl ReconstructionOutcome { /// we can use the V2 DA checker directly. pub struct DataAvailabilityRouter { /// Legacy DA checker for pre-Gloas blocks - v1: Arc>, + pending_block_cache: Arc>, /// Gloas DA checker for payload envelopes - v2: Arc>, + pending_payload_cache: Arc>, spec: Arc, } impl DataAvailabilityRouter { pub fn new( - v1: Arc>, - v2: Arc>, + pending_block_cache: Arc>, + pending_payload_cache: Arc>, spec: Arc, ) -> Self { - Self { v1, v2, spec } + Self { + pending_block_cache, + pending_payload_cache, + spec, + } } /// Returns true if the given slot is in the Gloas fork or later. @@ -166,7 +169,7 @@ impl DataAvailabilityRouter { /// Returns the custody context (same for both checkers). pub fn custody_context(&self) -> &Arc> { // Both checkers share the same custody context - self.v1.custody_context() + self.pending_block_cache.custody_context() } /// Query data columns from the appropriate checker based on fork. @@ -176,22 +179,23 @@ impl DataAvailabilityRouter { fork_name: ForkName, ) -> Option> { if fork_name.gloas_enabled() { - self.v2.get_data_columns(block_root) + self.pending_payload_cache.get_data_columns(block_root) } else { - self.v1.get_data_columns(block_root) + self.pending_block_cache.get_data_columns(block_root) } } - pub fn is_data_column_cached( - &self, + pub fn missing_cells_for_column_sidecar<'a>( + &'_ self, slot: Slot, - block_root: &Hash256, - data_column: &DataColumnSidecar, - ) -> bool { + data_column: &'a DataColumnSidecar, + ) -> Result>, MissingCellsError> { if self.is_gloas(slot) { - self.v2.is_data_column_cached(block_root, data_column) + self.pending_payload_cache + .missing_cells_for_column_sidecar(data_column) } else { - self.v1.is_data_column_cached(block_root, data_column) + self.pending_block_cache + .missing_cells_for_column_sidecar(data_column) } } @@ -202,9 +206,11 @@ impl DataAvailabilityRouter { slot: Slot, ) -> Option> { if self.is_gloas(slot) { - self.v2.cached_data_column_indexes(block_root) + self.pending_payload_cache + .cached_data_column_indexes(block_root) } else { - self.v1.cached_data_column_indexes(block_root) + self.pending_block_cache + .cached_data_column_indexes(block_root) } } @@ -216,11 +222,11 @@ impl DataAvailabilityRouter { custody_columns: DataColumnSidecarList, ) -> Result, AvailabilityCheckError> { if self.is_gloas(slot) { - self.v2 + self.pending_payload_cache .put_rpc_custody_columns(block_root, slot, custody_columns) .map(AvailabilityOutcome::Payload) } else { - self.v1 + self.pending_block_cache .put_rpc_custody_columns(block_root, slot, custody_columns) .map(AvailabilityOutcome::Block) } @@ -234,11 +240,11 @@ impl DataAvailabilityRouter { data_columns: Vec>, ) -> Result, AvailabilityCheckError> { if self.is_gloas(slot) { - self.v2 + self.pending_payload_cache .put_gossip_verified_data_columns(block_root, slot, data_columns) .map(AvailabilityOutcome::Payload) } else { - self.v1 + self.pending_block_cache .put_gossip_verified_data_columns(block_root, slot, data_columns) .map(AvailabilityOutcome::Block) } @@ -252,11 +258,11 @@ impl DataAvailabilityRouter { custody_columns: Vec>, ) -> Result, AvailabilityCheckError> { if self.is_gloas(slot) { - self.v2 + self.pending_payload_cache .put_kzg_verified_custody_data_columns(block_root, custody_columns) .map(AvailabilityOutcome::Payload) } else { - self.v1 + self.pending_block_cache .put_kzg_verified_custody_data_columns(block_root, custody_columns) .map(AvailabilityOutcome::Block) } @@ -269,11 +275,11 @@ impl DataAvailabilityRouter { slot: Slot, ) -> Result, AvailabilityCheckError> { if self.is_gloas(slot) { - self.v2 + self.pending_payload_cache .reconstruct_data_columns(block_root) .map(ReconstructionOutcome::Payload) } else { - self.v1 + self.pending_block_cache .reconstruct_data_columns(block_root) .map(ReconstructionOutcome::Block) } @@ -283,22 +289,23 @@ impl DataAvailabilityRouter { /// Returns the data availability boundary epoch (v1). pub fn data_availability_boundary(&self) -> Option { - self.v1.data_availability_boundary() + self.pending_block_cache.data_availability_boundary() } /// Returns whether a DA check is required for the given epoch (v1). pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool { - self.v1.da_check_required_for_epoch(epoch) + self.pending_block_cache.da_check_required_for_epoch(epoch) } /// Returns whether blobs are required for the given epoch (v1). pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool { - self.v1.blobs_required_for_epoch(epoch) + self.pending_block_cache.blobs_required_for_epoch(epoch) } /// Returns whether data columns are required for the given epoch (v1). pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool { - self.v1.data_columns_required_for_epoch(epoch) + self.pending_block_cache + .data_columns_required_for_epoch(epoch) } /// Verifies KZG commitments for a single available block (v1). @@ -306,7 +313,8 @@ impl DataAvailabilityRouter { &self, available_block: &AvailableBlock, ) -> Result<(), AvailabilityCheckError> { - self.v1.verify_kzg_for_available_block(available_block) + self.pending_block_cache + .verify_kzg_for_available_block(available_block) } /// Batch verifies KZG commitments for multiple available blocks (v1). @@ -314,7 +322,7 @@ impl DataAvailabilityRouter { &self, available_blocks: &[AvailableBlock], ) -> Result<(), AvailabilityCheckError> { - self.v1 + self.pending_block_cache .batch_verify_kzg_for_available_blocks(available_blocks) } @@ -323,17 +331,17 @@ impl DataAvailabilityRouter { &self, blob_id: &BlobIdentifier, ) -> Result>>, AvailabilityCheckError> { - self.v1.get_blob(blob_id) + self.pending_block_cache.get_blob(blob_id) } /// Returns the cached blob indexes for a given block root (v1). pub fn cached_blob_indexes(&self, block_root: &Hash256) -> Option> { - self.v1.cached_blob_indexes(block_root) + self.pending_block_cache.cached_blob_indexes(block_root) } /// Returns the cached block for a given block root (v1). pub fn get_cached_block(&self, block_root: &Hash256) -> Option> { - self.v1.get_cached_block(block_root) + self.pending_block_cache.get_cached_block(block_root) } /// Inserts a pre-execution block into the cache (v1). @@ -343,7 +351,8 @@ impl DataAvailabilityRouter { block: Arc>, source: BlockImportSource, ) -> Result<(), AvailabilityCheckError> { - self.v1.put_pre_execution_block(block_root, block, source) + self.pending_block_cache + .put_pre_execution_block(block_root, block, source) } /// Insert an executed block and check availability (v1). @@ -351,12 +360,13 @@ impl DataAvailabilityRouter { &self, executed_block: AvailabilityPendingExecutedBlock, ) -> Result, AvailabilityCheckError> { - self.v1.put_executed_block(executed_block) + self.pending_block_cache.put_executed_block(executed_block) } /// Removes a pre-execution block from the cache on execution error (v1). pub fn remove_block_on_execution_error(&self, block_root: &Hash256) { - self.v1.remove_block_on_execution_error(block_root) + self.pending_block_cache + .remove_block_on_execution_error(block_root) } /// Insert blobs received via RPC and check availability (v1). @@ -365,7 +375,7 @@ impl DataAvailabilityRouter { block_root: Hash256, blobs: FixedBlobSidecarList, ) -> Result, AvailabilityCheckError> { - self.v1.put_rpc_blobs(block_root, blobs) + self.pending_block_cache.put_rpc_blobs(block_root, blobs) } /// Insert KZG-verified blobs and check availability (v1). @@ -374,7 +384,8 @@ impl DataAvailabilityRouter { block_root: Hash256, blobs: I, ) -> Result, AvailabilityCheckError> { - self.v1.put_kzg_verified_blobs(block_root, blobs) + self.pending_block_cache + .put_kzg_verified_blobs(block_root, blobs) } /// Insert gossip-verified blobs into the v1 checker. @@ -386,15 +397,16 @@ impl DataAvailabilityRouter { block_root: Hash256, blobs: I, ) -> Result, AvailabilityCheckError> { - self.v1.put_gossip_verified_blobs(block_root, blobs) + self.pending_block_cache + .put_gossip_verified_blobs(block_root, blobs) } // ── Metrics ── pub fn metrics(&self) -> DataAvailabilityRouterMetrics { DataAvailabilityRouterMetrics { - block: self.v1.metrics(), - payload: self.v2.metrics(), + block: self.pending_block_cache.metrics(), + payload: self.pending_payload_cache.metrics(), } } @@ -402,14 +414,14 @@ impl DataAvailabilityRouter { /// Direct access to the block-level DA checker (pre-Gloas). /// Used for block availability checks, range sync, and blob verification. - pub fn v1(&self) -> &Arc> { - &self.v1 + pub fn pending_block_cache(&self) -> &Arc> { + &self.pending_block_cache } /// Direct access to the envelope-level DA checker (Gloas). /// Used for payload envelope availability checks and column verification. - pub fn v2(&self) -> &Arc> { - &self.v2 + pub fn pending_payload_cache(&self) -> &Arc> { + &self.pending_payload_cache } } @@ -426,7 +438,5 @@ pub fn start_availability_cache_maintenance_service( executor.clone(), chain.clone(), ); - crate::data_availability_checker_v2::start_availability_cache_maintenance_service( - executor, chain, - ); + crate::pending_payload_cache::start_availability_cache_maintenance_service(executor, chain); } diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index c2be03be0b..c360f90301 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -541,7 +541,11 @@ impl GossipVerifiedPartialDataColumnHeader { let header = Arc::new(header); // Cache the valid header - let Some(assembler) = chain.data_availability_checker.partial_assembler() else { + let Some(assembler) = chain + .data_availability_checker + .pending_block_cache() + .partial_assembler() + else { return Err(GossipPartialDataColumnError::PartialColumnsDisabled); }; let newly_cached = assembler.init(group_id, header.clone()); @@ -1005,7 +1009,11 @@ pub fn validate_partial_data_column_sidecar_for_gossip( } } } else { - let Some(assembler) = chain.data_availability_checker.partial_assembler() else { + let Some(assembler) = chain + .data_availability_checker + .pending_block_cache() + .partial_assembler() + else { return PartialColumnVerificationResult::Err( GossipPartialDataColumnError::PartialColumnsDisabled, ); @@ -1062,6 +1070,7 @@ pub fn validate_partial_data_column_sidecar_for_gossip( let column = Arc::from(column); let cells_to_kzg_verify = match chain .data_availability_checker + .pending_block_cache() .missing_cells_for_partial_column_sidecar(&column) { Ok(Some(cells_to_kzg_verify)) => cells_to_kzg_verify, @@ -1622,6 +1631,7 @@ mod test { harness .chain .data_availability_checker + .pending_block_cache() .partial_assembler() .unwrap() .init(block_root, header.clone()); diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index 7547a04e32..aaefb5cd3e 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -39,6 +39,7 @@ impl FetchBlobsBeaconAdapter { pub(crate) fn partial_assembler(&self) -> Option>> { self.chain .data_availability_checker + .pending_block_cache() .partial_assembler() .cloned() } diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index de65cfcc2e..e2ac20509b 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -445,7 +445,7 @@ async fn compute_custody_columns_to_import( // Only consider columns that are not already known to data availability. if let Some(known_columns) = - chain_adapter_cloned.cached_data_column_indexes(block.slot(), &block_root) + chain_adapter_cloned.cached_data_column_indexes(header.slot(), &block_root) { custody_columns.retain(|col| !known_columns.contains(&col.index())); if custody_columns.is_empty() { diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 524b5ad639..4af5b1627e 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -18,7 +18,6 @@ pub mod canonical_head; pub mod chain_config; pub mod custody_context; pub mod data_availability_checker; -pub mod data_availability_checker_v2; pub mod data_availability_router; pub mod data_column_verification; mod early_attester_cache; @@ -49,6 +48,7 @@ pub mod partial_data_column_assembler; pub mod payload_bid_verification; pub mod payload_envelope_streamer; pub mod payload_envelope_verification; +pub mod pending_payload_cache; pub mod pending_payload_envelopes; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 4576d9892e..37ba718111 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -12,7 +12,7 @@ use super::{ AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, }; -use crate::data_availability_checker_v2::Availability as PayloadAvailability; +use crate::pending_payload_cache::Availability as PayloadAvailability; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, @@ -58,7 +58,7 @@ impl BeaconChain { } self.data_availability_checker - .v2() + .pending_payload_cache() .put_pre_executed_payload_envelope( unverified_envelope.envelope_cloned(), envelope_source, @@ -98,7 +98,7 @@ impl BeaconChain { // chain to get stuck temporarily if the envelope is canonical. Therefore we remove // it from the cache if execution fails. self.data_availability_checker - .v2() + .pending_payload_cache() .remove_pre_executed_payload_envelope(&block_root); })?; @@ -202,7 +202,7 @@ impl BeaconChain { let slot = envelope.envelope.slot(); let availability = AvailabilityOutcome::Payload( self.data_availability_checker - .v2() + .pending_payload_cache() .put_executed_payload_envelope(envelope)?, ); self.process_payload_envelope_availability(slot, availability, || Ok(())) diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs similarity index 96% rename from beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs rename to beacon_node/beacon_chain/src/pending_payload_cache/mod.rs index c6d757cfca..fde5c98327 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -37,7 +37,7 @@ //! │ ▼ //! | -> AvailableExecutedEnvelope (all columns present, payload executed against the EL, ready to import) -use crate::data_availability_checker::AvailabilityCheckError; +use crate::data_availability_checker::{AvailabilityCheckError, MissingCellsError}; use crate::payload_envelope_verification::{ AvailabilityPendingExecutedEnvelope, AvailableExecutedEnvelope, }; @@ -54,9 +54,11 @@ use task_executor::TaskExecutor; use tracing::{Span, debug, error, instrument, trace}; use types::{ BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, - EthSpec, Hash256, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, Slot, + EthSpec, Hash256, PartialDataColumnSidecarRef, SignedExecutionPayloadBid, + SignedExecutionPayloadEnvelope, Slot, }; +mod pending_column; mod pending_components; use crate::data_column_verification::{ @@ -130,7 +132,7 @@ pub enum DataColumnReconstructionResult { /// Usually data becomes available on its slot within a second of receiving its first component /// over gossip. However, data may never become available if a malicious proposer does not /// publish its data, or there are network issues. Components are only removed via LRU eviction. -pub struct DataAvailabilityChecker { +pub struct PendingPayloadCache { /// Contains all the data we keep in memory, protected by an RwLock availability_cache: RwLock>>, kzg: Arc, @@ -138,7 +140,7 @@ pub struct DataAvailabilityChecker { spec: Arc, } -impl DataAvailabilityChecker { +impl PendingPayloadCache { pub fn new( kzg: Arc, custody_context: Arc>, @@ -166,7 +168,9 @@ impl DataAvailabilityChecker { components.map(|c| { c.verified_data_columns .iter() - .map(|col| col.clone_arc()) + .filter_map(|(col_idx, col)| { + col.try_to_sidecar(*col_idx, c.slot, block_root, c.num_blobs_expected) + }) .collect() }) }) @@ -182,36 +186,10 @@ impl DataAvailabilityChecker { /// Checks if a specific data column is cached for the given block root. #[instrument(skip_all, level = "trace")] - pub fn is_data_column_cached( - &self, - block_root: &Hash256, - data_column: &DataColumnSidecar, - ) -> bool { - self.peek_pending_components(block_root, |components| { - components.is_some_and(|components| { - let cached_column_opt = components.get_cached_data_column(*data_column.index()); - cached_column_opt.is_some_and(|cached| *cached == *data_column) - }) - }) - } - - /// Returns the envelope processing status for the given `block_root`. - pub fn get_envelope_processing_status( - &self, - block_root: &Hash256, - ) -> Option> { - self.peek_pending_components(block_root, |components| { - components.and_then(|c| { - c.envelope.as_ref().map(|envelope| match envelope { - pending_components::CachedPayloadEnvelope::PreExecution(e, source) => { - PayloadEnvelopeProcessingStatus::NotValidated(e.clone(), *source) - } - pending_components::CachedPayloadEnvelope::Executed(e) => { - PayloadEnvelopeProcessingStatus::ExecutionValidated(e.envelope.clone()) - } - }) - }) - }) + pub fn missing_cells_for_column_sidecar<'a>( + &'_ self, + data_column: &'a DataColumnSidecar, + ) -> Result>, MissingCellsError> { } /// Insert an executed payload envelope into the cache and performs an availability check @@ -628,7 +606,10 @@ pub fn start_availability_cache_maintenance_service( chain: Arc>, ) { if chain.spec.gloas_fork_epoch.is_some() { - let da_checker = chain.data_availability_checker.v2().clone(); + let da_checker = chain + .data_availability_checker + .pending_payload_cache() + .clone(); executor.spawn( async move { availability_cache_maintenance_service(chain, da_checker).await }, "availability_cache_service", @@ -640,7 +621,7 @@ pub fn start_availability_cache_maintenance_service( async fn availability_cache_maintenance_service( chain: Arc>, - da_checker: Arc>, + da_checker: Arc>, ) { let epoch_duration = chain.slot_clock.slot_duration() * T::EthSpec::slots_per_epoch() as u32; loop { @@ -725,8 +706,8 @@ mod data_availability_checker_tests { use store::{HotColdDB, StoreConfig, database::interface::BeaconNodeBackend}; use tempfile::{TempDir, tempdir}; use types::{ - BeaconState, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, - FullPayload, MinimalEthSpec, SignedBeaconBlock, Slot, + ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, FullPayload, + MinimalEthSpec, SignedBeaconBlock, Slot, }; type E = MinimalEthSpec; @@ -783,7 +764,7 @@ mod data_availability_checker_tests { async fn setup_harness_and_cache() -> ( BeaconChainHarness>, - Arc>, + Arc>, TempDir, ) where @@ -804,12 +785,8 @@ mod data_availability_checker_tests { )); let cache = Arc::new( - DataAvailabilityChecker::::new( - harness.chain.kzg.clone(), - custody_context, - spec.clone(), - ) - .expect("should create cache"), + PendingPayloadCache::::new(harness.chain.kzg.clone(), custody_context, spec.clone()) + .expect("should create cache"), ); (harness, cache, chain_db_path) } diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs new file mode 100644 index 0000000000..66cb8b6334 --- /dev/null +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs @@ -0,0 +1,63 @@ +use kzg::KzgProof; +use ssz_types::VariableList; +use std::sync::Arc; +use types::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarGloas, EthSpec, Hash256, Slot}; + +pub struct PendingColumn { + cells: Vec, KzgProof)>>, +} + +impl PendingColumn { + pub fn new_with_capacity(blobs: usize) -> Self { + Self { + cells: vec![None; blobs], + } + } + + pub fn insert(&mut self, index: usize, cell: &Cell, proof: &KzgProof) { + if let Some(existing_cell) = self.cells.get_mut(index) + && existing_cell.is_none() + { + *existing_cell = Some((cell.clone(), proof.clone())); + } + } + + // TODO(gloas): insert_from_partial + + pub fn is_complete(&self, blob_count: usize) -> bool { + self.cells.len() == blob_count && self.cells.iter().all(|cell| cell.is_some()) + } + + pub fn try_to_sidecar( + &self, + index: ColumnIndex, + slot: Slot, + beacon_block_root: Hash256, + blob_count: usize, + ) -> Option>> { + if self.cells.len() != blob_count { + return None; + } + + let mut column = Vec::with_capacity(self.cells.len()); + let mut kzg_proofs = Vec::with_capacity(self.cells.len()); + + for cell in self.cells.iter() { + let Some((cell, proof)) = cell else { + return None; + }; + // TODO(gloas): we likely want to go and arc all cells + column.push(cell.clone()); + kzg_proofs.push(proof.clone()); + } + + Some(Arc::new(DataColumnSidecar::Gloas(DataColumnSidecarGloas { + index, + // TODO(gloas): this should not error, but we need to catch it + column: VariableList::try_from(column).ok()?, + kzg_proofs: VariableList::try_from(kzg_proofs).ok()?, + slot, + beacon_block_root, + }))) + } +} diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs similarity index 62% rename from beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components.rs rename to beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs index 3f9d7e54d0..758a1705f3 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs @@ -3,52 +3,40 @@ use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope; use crate::payload_envelope_verification::AvailableEnvelope; use crate::payload_envelope_verification::AvailableExecutedEnvelope; +use crate::pending_payload_cache::pending_column::PendingColumn; use std::cmp::Ordering; +use std::collections::HashMap; use std::sync::Arc; use tracing::{Span, debug, debug_span}; -use types::BlockImportSource; +use types::Slot; use types::{ - ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedExecutionPayloadBid, + ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedExecutionPayloadEnvelope, }; -pub enum CachedPayloadEnvelope { - PreExecution(Arc>, BlockImportSource), - Executed(Box>), -} - /// This represents the components of a payload pending data availability. /// /// The columns are all gossip and kzg verified. /// The payload is considered "available" when all required columns are received. pub struct PendingComponents { - /// The execution payload bid containing blob_kzg_commitments. - pub bid: Option>>, - /// a cached pre or post executed payload envelope - pub envelope: Option>, - pub verified_data_columns: Vec>, + pub slot: Slot, + pub num_blobs_expected: usize, + /// a cached post executed payload envelope + pub envelope: Option>, + pub verified_data_columns: HashMap>, pub reconstruction_started: bool, pub(crate) span: Span, spec: Arc, } impl PendingComponents { - /// Returns an immutable reference to the cached data column. - pub fn get_cached_data_column( - &self, - data_column_index: u64, - ) -> Option>> { - self.verified_data_columns - .iter() - .find(|d| d.index() == data_column_index) - .map(|d| d.clone_arc()) - } - /// Returns the indices of cached custody columns pub fn get_cached_data_columns_indices(&self) -> Vec { self.verified_data_columns .iter() - .map(|d| d.index()) + .filter_map(|(col_idx, col)| { + col.is_complete(self.num_blobs_expected).then_some(*col_idx) + }) .collect() } @@ -58,60 +46,49 @@ impl PendingComponents { kzg_verified_data_columns: I, ) -> Result<(), AvailabilityCheckError> { for data_column in kzg_verified_data_columns { - if self.get_cached_data_column(data_column.index()).is_none() { - self.verified_data_columns.push(data_column); + let data_column = data_column.as_data_column(); + let col = self + .verified_data_columns + .entry(*data_column.index()) + .or_insert_with(|| PendingColumn::new_with_capacity(self.num_blobs_expected)); + for (cell_idx, (cell, proof)) in data_column + .column() + .iter() + .zip(data_column.kzg_proofs().iter()) + .enumerate() + { + col.insert(cell_idx, cell, proof); } } Ok(()) } - /// Inserts an execution payload bid into the cache. - pub fn insert_bid(&mut self, bid: Arc>) { - self.bid = Some(bid); - } + // TODO(gloas): merge partial columns /// Inserts an executed payload envelope into the cache. pub fn insert_executed_payload_envelope( &mut self, envelope: AvailabilityPendingExecutedEnvelope, ) { - self.envelope = Some(CachedPayloadEnvelope::Executed(Box::new(envelope))) - } - - /// Inserts a pre-executed payload envelope into the cache. - pub fn insert_pre_executed_payload_envelope( - &mut self, - envelope: Arc>, - import_source: BlockImportSource, - ) { - self.envelope = Some(CachedPayloadEnvelope::PreExecution(envelope, import_source)) + self.envelope = Some(envelope); } /// Returns the number of blobs expected by reading the bid's kzg commitments. /// Returns an error if the bid is not cached. This function should only be called /// after ensuring that the bid has been cached. - pub fn num_blobs_expected(&self) -> Result { - let bid = self - .bid - .as_ref() - .ok_or_else(|| AvailabilityCheckError::Unexpected("No bid available".to_string()))?; - - Ok(bid.message.blob_kzg_commitments.len()) + pub fn num_blobs_expected(&self) -> usize { + self.num_blobs_expected } /// Returns `Some` if the envelope and all required data columns have been received. pub fn make_available( &self, + block_hash: Hash256, num_expected_columns: usize, ) -> Result>, AvailabilityCheckError> { - // If no bid has been received, we can start verifying the columns - if self.bid.is_none() { - return Ok(None); - } - // Check if the payload has been received and executed - let Some(CachedPayloadEnvelope::Executed(envelope)) = self.envelope.as_ref() else { + let Some(envelope) = &self.envelope else { return Ok(None); }; @@ -119,33 +96,30 @@ impl PendingComponents { envelope, import_data, payload_verification_outcome, - } = envelope.as_ref(); + } = envelope; - // Get the number of blobs expected from the bid - let num_expected_blobs = self.num_blobs_expected()?; - - let columns = if num_expected_blobs == 0 { + let columns = if self.num_blobs_expected == 0 { self.span.in_scope(|| { debug!("Bid has no blobs, data is available"); }); vec![] } else { - let num_received_columns = self.verified_data_columns.len(); - match num_received_columns.cmp(&num_expected_columns) { + let data_columns: Vec<_> = self + .verified_data_columns + .iter() + .filter_map(|(col_idx, col)| { + col.try_to_sidecar(*col_idx, self.slot, block_hash, self.num_blobs_expected) + }) + .collect(); + let num_completed_columns = data_columns.len(); + match num_completed_columns.cmp(&num_expected_columns) { Ordering::Greater => { // Should never happen return Err(AvailabilityCheckError::Unexpected(format!( - "too many columns got {num_received_columns} expected {num_expected_columns}" + "too many columns got {num_completed_columns} expected {num_expected_columns}" ))); } Ordering::Equal => { - // We have enough columns - let data_columns = self - .verified_data_columns - .iter() - .map(|d| d.clone().into_inner()) - .collect::>(); - self.span.in_scope(|| { debug!("All data columns received, data is available"); }); @@ -175,13 +149,19 @@ impl PendingComponents { } /// Returns an empty `PendingComponents` object with the given block root. - pub fn empty(block_root: Hash256, spec: Arc) -> Self { - let span = debug_span!(parent: None, "lh_pending_components", %block_root); + pub fn empty( + block_root: Hash256, + slot: Slot, + num_blobs_expected: usize, + spec: Arc, + ) -> Self { + let span = debug_span!(parent: None, "lh_pending_components", %block_root, %slot); let _guard = span.clone().entered(); Self { - bid: None, + slot, + num_blobs_expected, envelope: None, - verified_data_columns: vec![], + verified_data_columns: HashMap::new(), reconstruction_started: false, span, spec, @@ -189,18 +169,8 @@ impl PendingComponents { } /// Returns the epoch of the bid or first data column, if available. - pub fn epoch(&self) -> Option { - // Get epoch from bid - if let Some(bid) = &self.bid { - return Some(bid.message.slot.epoch(E::slots_per_epoch())); - } - - // Or, get epoch from first data column - if let Some(data_column) = self.verified_data_columns.first() { - return Some(data_column.as_data_column().epoch()); - } - - None + pub fn epoch(&self) -> Epoch { + self.slot.epoch(E::slots_per_epoch()) } pub fn status_str(&self, num_expected_columns: usize) -> String { @@ -221,6 +191,7 @@ pub(crate) enum ReconstructColumnsDecision { No(&'static str), } +/* #[cfg(test)] mod pending_components_tests { use crate::test_utils::test_spec; @@ -230,18 +201,6 @@ mod pending_components_tests { type E = MinimalEthSpec; - #[test] - fn test_empty_pending_components() { - let spec = Arc::new(test_spec::()); - let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root, spec); - - assert!(components.bid.is_none()); - assert!(components.verified_data_columns.is_empty()); - assert!(!components.reconstruction_started); - assert!(components.epoch().is_none()); - } - #[test] fn test_get_cached_data_columns_indices_empty() { let spec = Arc::new(test_spec::()); @@ -289,3 +248,4 @@ mod pending_components_tests { assert!(result.unwrap().is_none()); } } +*/ diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 0f02d528a4..39da3ce0ae 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2826,7 +2826,7 @@ where return RangeSyncBlock::new( block, AvailableBlockData::NoData, - self.chain.data_availability_checker.v1(), + self.chain.data_availability_checker.pending_block_cache(), self.chain.spec.clone(), ) .unwrap(); @@ -2845,7 +2845,7 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.v1(), + self.chain.data_availability_checker.pending_block_cache(), self.chain.spec.clone(), ) .unwrap() @@ -2860,7 +2860,7 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.v1(), + self.chain.data_availability_checker.pending_block_cache(), self.chain.spec.clone(), ) .unwrap() @@ -2889,14 +2889,14 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.v1(), + self.chain.data_availability_checker.pending_block_cache(), self.chain.spec.clone(), )? } else { RangeSyncBlock::new( block, AvailableBlockData::NoData, - self.chain.data_availability_checker.v1(), + self.chain.data_availability_checker.pending_block_cache(), self.chain.spec.clone(), )? } @@ -2916,7 +2916,7 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.v1(), + self.chain.data_availability_checker.pending_block_cache(), self.chain.spec.clone(), )? }) diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index e0e87dde9e..5e27985558 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -165,7 +165,7 @@ where RangeSyncBlock::new( block, block_data, - chain.data_availability_checker.v1(), + chain.data_availability_checker.pending_block_cache(), chain.spec.clone(), ) .unwrap() @@ -180,7 +180,7 @@ where RangeSyncBlock::new( block, block_data, - chain.data_availability_checker.v1(), + chain.data_availability_checker.pending_block_cache(), chain.spec.clone(), ) .unwrap() @@ -188,7 +188,7 @@ where None => RangeSyncBlock::new( block, AvailableBlockData::NoData, - chain.data_availability_checker.v1(), + chain.data_availability_checker.pending_block_cache(), chain.spec.clone(), ) .unwrap(), @@ -462,7 +462,10 @@ async fn chain_segment_non_linear_parent_roots() { blocks[3] = RangeSyncBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), blocks[3].block_data().clone(), - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.spec.clone(), ) .unwrap(); @@ -502,7 +505,10 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RangeSyncBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), blocks[3].block_data().clone(), - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.spec.clone(), ) .unwrap(); @@ -532,7 +538,10 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RangeSyncBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), blocks[3].block_data().clone(), - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.chain.spec.clone(), ) .unwrap(); @@ -1714,7 +1723,10 @@ async fn add_base_block_to_altair_chain() { let base_range_sync_block = RangeSyncBlock::new( Arc::new(base_block.clone()), AvailableBlockData::NoData, - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.spec.clone(), ) .unwrap(); @@ -1958,7 +1970,10 @@ async fn import_duplicate_block_unrealized_justification() { let range_sync_block = RangeSyncBlock::new( block.clone(), AvailableBlockData::NoData, - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.spec.clone(), ) .unwrap(); @@ -2092,7 +2107,10 @@ async fn range_sync_block_construction_fails_with_wrong_blob_count() { let result = RangeSyncBlock::new( Arc::new(block), block_data, - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.chain.spec.clone(), ); @@ -2170,7 +2188,10 @@ async fn range_sync_block_rejects_missing_custody_columns() { let result = RangeSyncBlock::new( Arc::new(block), block_data, - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.chain.spec.clone(), ); @@ -2261,7 +2282,10 @@ async fn rpc_block_allows_construction_past_da_boundary() { let result = RangeSyncBlock::new( Arc::new(block), AvailableBlockData::NoData, - harness.chain.data_availability_checker.v1(), + harness + .chain + .data_availability_checker + .pending_block_cache(), harness.chain.spec.clone(), ); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 5d6f644dad..3040f91342 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3300,7 +3300,7 @@ async fn weak_subjectivity_sync_test( AvailableBlock::new( Arc::new(corrupt_block), data, - beacon_chain.data_availability_checker.v1(), + beacon_chain.data_availability_checker.pending_block_cache(), Arc::new(spec), ) .expect("available block") diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 11722663ad..87addbfd8b 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -778,7 +778,10 @@ impl SyncNetworkContext { let range_req = entry.get_mut(); if let Some(blocks_result) = range_req.responses( - self.chain.data_availability_checker.v1().clone(), + self.chain + .data_availability_checker + .pending_block_cache() + .clone(), self.chain.spec.clone(), ) { if let Err(CouplingError::DataColumnPeerFailure { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 2de00bb219..9a3f0a5311 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1110,7 +1110,10 @@ impl TestRig { let range_sync_block = RangeSyncBlock::new( block, block_data, - self.harness.chain.data_availability_checker.v1(), + self.harness + .chain + .data_availability_checker + .pending_block_cache(), self.harness.chain.spec.clone(), ) .unwrap();