From d9c21f5e3301570090bf10cdff4b46c9942a704e Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 27 Jan 2026 19:32:30 -0800 Subject: [PATCH] Add da router, and initial logic --- beacon_node/beacon_chain/src/beacon_chain.rs | 143 +- .../beacon_chain/src/block_verification.rs | 2 + beacon_node/beacon_chain/src/builder.rs | 45 +- .../beacon_chain/src/custody_context.rs | 9 +- .../src/data_availability_checker.rs | 226 +-- .../src/data_availability_checker_v2.rs | 1098 ++++++++++++++ .../overflow_lru_cache.rs | 1306 +++++++++++++++++ .../state_lru_cache.rs | 138 ++ .../src/data_column_availability_cache.rs | 388 +++++ .../src/data_column_verification.rs | 19 +- .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 9 +- .../beacon_chain/src/fetch_blobs/mod.rs | 2 +- .../beacon_chain/src/fetch_blobs/tests.rs | 2 +- beacon_node/beacon_chain/src/kzg_utils.rs | 2 +- beacon_node/beacon_chain/src/lib.rs | 3 + beacon_node/beacon_chain/src/metrics.rs | 3 +- .../src/payload_verification_types.rs | 74 + beacon_node/beacon_chain/src/test_utils.rs | 74 + .../lighthouse_network/src/rpc/codec.rs | 2 + .../lighthouse_network/src/rpc/methods.rs | 8 +- .../lighthouse_network/tests/rpc_tests.rs | 2 + .../gossip_methods.rs | 29 +- .../src/network_beacon_processor/mod.rs | 4 +- .../network_beacon_processor/rpc_methods.rs | 4 +- .../network_beacon_processor/sync_methods.rs | 2 + .../network/src/sync/block_lookups/common.rs | 2 +- .../sync/block_lookups/single_block_lookup.rs | 6 +- .../network/src/sync/network_context.rs | 6 +- .../requests/data_columns_by_root.rs | 1 + beacon_node/network/src/sync/tests/lookups.rs | 4 + 30 files changed, 3405 insertions(+), 208 deletions(-) create mode 100644 beacon_node/beacon_chain/src/data_availability_checker_v2.rs create mode 100644 beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs create mode 100644 beacon_node/beacon_chain/src/data_availability_checker_v2/state_lru_cache.rs create mode 100644 
beacon_node/beacon_chain/src/data_column_availability_cache.rs create mode 100644 beacon_node/beacon_chain/src/payload_verification_types.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e5bdda384f..2b71734f15 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -25,6 +25,10 @@ use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData, DataAvailabilityChecker, DataColumnReconstructionResult, }; +use crate::data_availability_checker_v2::DataAvailabilityChecker as DataAvailabilityCheckerV2; +use crate::data_column_availability_cache::{ + AvailabilityOutcome, DataAvailabilityRouter, ReconstructionOutcome, +}; use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; @@ -476,7 +480,8 @@ pub struct BeaconChain { pub genesis_backfill_slot: Slot, /// Provides a KZG verification and temporary storage for blocks and blobs as /// they are collected and combined. - pub data_availability_checker: Arc>, + pub data_availability_checker: + Arc, DataAvailabilityCheckerV2>>, /// The KZG trusted setup used by this chain. pub kzg: Arc, /// RNG instance used by the chain. Currently used for shuffling column sidecars in block publishing. @@ -1123,10 +1128,11 @@ impl BeaconChain { &self, block_root: Hash256, indices: &[ColumnIndex], + fork_name: ForkName, ) -> Result, Error> { let all_cached_columns_opt = self .data_availability_checker - .get_data_columns(block_root) + .get_data_columns(block_root, fork_name) .or_else(|| self.early_attester_cache.get_data_columns(block_root)); if let Some(mut all_cached_columns) = all_cached_columns_opt { @@ -1286,7 +1292,11 @@ impl BeaconChain { /// chain. 
Used by sync to learn the status of a block and prevent repeated downloads / /// processing attempts. pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus { - if let Some(cached_block) = self.data_availability_checker.get_cached_block(block_root) { + if let Some(cached_block) = self + .data_availability_checker + .v1() + .get_cached_block(block_root) + { return cached_block; } @@ -3060,6 +3070,7 @@ impl BeaconChain { } self.emit_sse_data_column_sidecar_events( + slot, &block_root, data_columns.iter().map(|column| column.as_data_column()), ); @@ -3136,6 +3147,7 @@ impl BeaconChain { } EngineGetBlobsOutput::CustodyColumns(columns) => { self.emit_sse_data_column_sidecar_events( + slot, &block_root, columns.iter().map(|column| column.as_data_column()), ); @@ -3155,6 +3167,7 @@ impl BeaconChain { { let imported_blobs = self .data_availability_checker + .v1() .cached_blob_indexes(block_root) .unwrap_or_default(); let new_blobs = blobs_iter.filter(|b| !imported_blobs.contains(&b.index)); @@ -3169,6 +3182,7 @@ impl BeaconChain { fn emit_sse_data_column_sidecar_events<'a, I>( self: &Arc, + slot: Slot, block_root: &Hash256, data_columns_iter: I, ) where @@ -3179,7 +3193,7 @@ impl BeaconChain { { let imported_data_columns = self .data_availability_checker - .cached_data_column_indexes(block_root) + .cached_data_column_indexes(block_root, slot) .unwrap_or_default(); let new_data_columns = data_columns_iter.filter(|b| !imported_data_columns.contains(&b.index)); @@ -3232,6 +3246,7 @@ impl BeaconChain { } self.emit_sse_data_column_sidecar_events( + slot, &block_root, custody_columns.iter().map(|column| column.as_ref()), ); @@ -3242,6 +3257,7 @@ impl BeaconChain { pub async fn reconstruct_data_columns( self: &Arc, + slot: Slot, block_root: Hash256, ) -> Result< Option<( @@ -3268,33 +3284,45 @@ impl BeaconChain { .task_executor .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { let _guard = current_span.enter(); - 
data_availability_checker.reconstruct_data_columns(&block_root) + data_availability_checker.reconstruct_data_columns(&block_root, slot) }) .await .map_err(|_| BeaconChainError::RuntimeShutdown)??; match result { - DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { - let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { - // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. - return Ok(None); - }; + ReconstructionOutcome::Block(data_column_reconstruction_result) => { + match data_column_reconstruction_result { + DataColumnReconstructionResult::Success(( + availability, + data_columns_to_publish, + )) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. + return Ok(None); + }; - self.process_availability(slot, availability, || Ok(())) - .await - .map(|availability_processing_status| { - Some((availability_processing_status, data_columns_to_publish)) - }) - } - DataColumnReconstructionResult::NotStarted(reason) - | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { - // We use metric here because logging this would be *very* noisy. - metrics::inc_counter_vec( - &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, - &[reason], - ); - Ok(None) + self.process_availability( + slot, + AvailabilityOutcome::Block(availability), + || Ok(()), + ) + .await + .map(|availability_processing_status| { + Some((availability_processing_status, data_columns_to_publish)) + }) + } + DataColumnReconstructionResult::NotStarted(reason) + | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { + // We use metric here because logging this would be *very* noisy. 
+ metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) + } + } } + ReconstructionOutcome::Payload(_data_column_reconstruction_result) => todo!(), } } @@ -3343,11 +3371,9 @@ impl BeaconChain { ); } - self.data_availability_checker.put_pre_execution_block( - block_root, - unverified_block.block_cloned(), - block_source, - )?; + self.data_availability_checker + .v1() + .put_pre_execution_block(block_root, unverified_block.block_cloned(), block_source)?; // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -3382,6 +3408,7 @@ impl BeaconChain { // chain to get stuck temporarily if the block is canonical. Therefore we remove // it from the cache if execution fails. self.data_availability_checker + .v1() .remove_block_on_execution_error(&block_root); })?; @@ -3509,7 +3536,11 @@ impl BeaconChain { block: AvailabilityPendingExecutedBlock, ) -> Result { let slot = block.block.slot(); - let availability = self.data_availability_checker.put_executed_block(block)?; + let availability = AvailabilityOutcome::Block( + self.data_availability_checker + .v1() + .put_executed_block(block)?, + ); self.process_availability(slot, availability, || Ok(())) .await } @@ -3524,9 +3555,11 @@ impl BeaconChain { if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(blob.signed_block_header()); } - let availability = self - .data_availability_checker - .put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?; + let availability = AvailabilityOutcome::Block( + self.data_availability_checker + .v1() + .put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?, + ); self.process_availability(slot, availability, || Ok(())) .await @@ -3597,9 +3630,11 @@ impl BeaconChain { block_root, blobs.iter().flatten().map(Arc::as_ref), )?; - let availability = self - .data_availability_checker - .put_rpc_blobs(block_root, blobs)?; + let availability = 
AvailabilityOutcome::Block( + self.data_availability_checker + .v1() + .put_rpc_blobs(block_root, blobs)?, + ); self.process_availability(slot, availability, || Ok(())) .await @@ -3617,8 +3652,12 @@ impl BeaconChain { block_root, blobs.iter().map(|b| b.as_blob()), )?; - self.data_availability_checker - .put_kzg_verified_blobs(block_root, blobs)? + let availability = self + .data_availability_checker + .v1() + .put_kzg_verified_blobs(block_root, blobs)?; + + AvailabilityOutcome::Block(availability) } EngineGetBlobsOutput::CustodyColumns(data_columns) => { self.check_data_column_sidecar_header_signature_and_slashability( @@ -3626,7 +3665,7 @@ impl BeaconChain { data_columns.iter().map(|c| c.as_data_column()), )?; self.data_availability_checker - .put_kzg_verified_custody_data_columns(block_root, data_columns)? + .put_kzg_verified_custody_data_columns(block_root, slot, data_columns)? } }; @@ -3699,18 +3738,23 @@ impl BeaconChain { async fn process_availability( self: &Arc, slot: Slot, - availability: Availability, + availability: AvailabilityOutcome, publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { match availability { - Availability::Available(block) => { - publish_fn()?; - // Block is fully available, import into fork choice - self.import_available_block(block).await + AvailabilityOutcome::Block(availability) => { + match availability { + Availability::Available(block) => { + publish_fn()?; + // Block is fully available, import into fork choice + self.import_available_block(block).await + } + Availability::MissingComponents(block_root) => Ok( + AvailabilityProcessingStatus::MissingComponents(slot, block_root), + ), + } } - Availability::MissingComponents(block_root) => Ok( - AvailabilityProcessingStatus::MissingComponents(slot, block_root), - ), + AvailabilityOutcome::Payload(_availability) => todo!(), } } @@ -7300,12 +7344,15 @@ impl BeaconChain { /// The epoch at which we require a data availability check in block processing. 
/// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { - self.data_availability_checker.data_availability_boundary() + self.data_availability_checker + .v1() + .data_availability_boundary() } /// Returns true if epoch is within the data availability boundary pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool { self.data_availability_checker + .v1() .da_check_required_for_epoch(epoch) } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index df8c49f8de..6100335fbc 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -649,6 +649,7 @@ pub fn signature_verify_chain_segment( let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); let maybe_available_blocks = chain .data_availability_checker + .v1() .verify_kzg_for_rpc_blocks(blocks)?; // zip it back up let mut signature_verified_blocks = roots @@ -1299,6 +1300,7 @@ impl IntoExecutionPendingBlock for RpcBlock .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; let maybe_available = chain .data_availability_checker + .v1() .verify_kzg_for_rpc_block(self.clone()) .map_err(|e| { BlockSlashInfo::SignatureNotChecked( diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index dc38fc1c29..f5ee72f7e7 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -6,6 +6,8 @@ use crate::beacon_chain::{ use crate::beacon_proposer_cache::BeaconProposerCache; use crate::custody_context::NodeCustodyType; use crate::data_availability_checker::DataAvailabilityChecker; +use crate::data_availability_checker_v2::DataAvailabilityChecker as DataAvailabilityCheckerV2; +use crate::data_column_availability_cache::DataAvailabilityRouter; use crate::fork_choice_signal::ForkChoiceSignalTx; use 
crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; @@ -975,6 +977,37 @@ where }; debug!(?custody_context, "Loaded persisted custody context"); + let custody_context = Arc::new(custody_context); + let da_checker_v1 = Arc::new( + DataAvailabilityChecker::new( + complete_blob_backfill, + slot_clock.clone(), + self.kzg.clone(), + store.clone(), + custody_context.clone(), + self.spec.clone(), + ) + .map_err(|e| format!("Error initializing DataAvailabilityCheckerV1: {:?}", e))?, + ); + + let da_checker_v2 = Arc::new( + DataAvailabilityCheckerV2::new( + complete_blob_backfill, + slot_clock.clone(), + self.kzg.clone(), + store.clone(), + custody_context.clone(), + self.spec.clone(), + ) + .map_err(|e| format!("Error initializing DataAvailabilityCheckerV2: {:?}", e))?, + ); + + let data_availability_checker = Arc::new(DataAvailabilityRouter::new( + da_checker_v1, + da_checker_v2, + self.spec.clone(), + )); + let beacon_chain = BeaconChain { spec: self.spec.clone(), config: self.chain_config, @@ -1043,17 +1076,7 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, - data_availability_checker: Arc::new( - DataAvailabilityChecker::new( - complete_blob_backfill, - slot_clock, - self.kzg.clone(), - store, - Arc::new(custody_context), - self.spec, - ) - .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, - ), + data_availability_checker, kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), }; diff --git a/beacon_node/beacon_chain/src/custody_context.rs b/beacon_node/beacon_chain/src/custody_context.rs index c512ce616a..cebb256a02 100644 --- a/beacon_node/beacon_chain/src/custody_context.rs +++ b/beacon_node/beacon_chain/src/custody_context.rs @@ -7,7 +7,7 @@ use std::{ sync::atomic::{AtomicU64, Ordering}, }; use tracing::{debug, warn}; -use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, 
Slot}; +use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, SignedExecutionPayloadEnvelope, Slot}; /// A delay before making the CGC change effective to the data availability checker. pub const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; @@ -527,6 +527,13 @@ impl CustodyContext { .write() .reset_validator_custody_requirements(effective_epoch); } + + pub fn data_columns_required_for_payload( + &self, + _payload: &SignedExecutionPayloadEnvelope, + ) -> bool { + todo!() + } } /// Indicates that the custody group count (CGC) has increased. diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 7aec24b8e5..dc35965c0d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -7,6 +7,7 @@ use crate::block_verification_types::{ use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, }; +use crate::data_column_availability_cache::DataColumnCache; use crate::{ BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics, }; @@ -142,10 +143,6 @@ impl DataAvailabilityChecker { }) } - pub fn custody_context(&self) -> &Arc> { - &self.custody_context - } - /// Checks if the block root is currently in the availability cache awaiting import because /// of missing components. /// @@ -169,30 +166,6 @@ impl DataAvailabilityChecker { }) } - /// Return the set of cached custody column indexes for `block_root`. Returns None if there is - /// no block component for `block_root`. - pub fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { - self.availability_cache - .peek_pending_components(block_root, |components| { - components.map(|components| components.get_cached_data_columns_indices()) - }) - } - - /// Check if the exact data column is in the availability cache. 
- pub fn is_data_column_cached( - &self, - block_root: &Hash256, - data_column: &DataColumnSidecar, - ) -> bool { - self.availability_cache - .peek_pending_components(block_root, |components| { - components.is_some_and(|components| { - let cached_column_opt = components.get_cached_data_column(data_column.index); - cached_column_opt.is_some_and(|cached| *cached == *data_column) - }) - }) - } - /// Get a blob from the availability cache. pub fn get_blob( &self, @@ -201,14 +174,6 @@ impl DataAvailabilityChecker { self.availability_cache.peek_blob(blob_id) } - /// Get data columns for a block from the availability cache. - pub fn get_data_columns( - &self, - block_root: Hash256, - ) -> Option> { - self.availability_cache.peek_data_columns(block_root) - } - /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. #[instrument(skip_all, level = "trace")] @@ -236,39 +201,6 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, verified_blobs) } - /// Put a list of custody columns received via RPC into the availability cache. This performs KZG - /// verification on the blobs in the list. - #[allow(clippy::type_complexity)] - #[instrument(skip_all, level = "trace")] - pub fn put_rpc_custody_columns( - &self, - block_root: Hash256, - slot: Slot, - custody_columns: DataColumnSidecarList, - ) -> Result, AvailabilityCheckError> { - // Attributes fault to the specific peer that sent an invalid column - let kzg_verified_columns = - KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn)?; - - // Filter out columns that aren't required for custody for this slot - // This is required because `data_columns_by_root` requests the **latest** CGC that _may_ - // not be yet effective for data availability check, as CGC changes are only effecive from - // a new epoch. 
- let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_columns = self - .custody_context - .sampling_columns_for_epoch(epoch, &self.spec); - let verified_custody_columns = kzg_verified_columns - .into_iter() - .filter(|col| sampling_columns.contains(&col.index())) - .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) - .collect::>(); - - self.availability_cache - .put_kzg_verified_data_columns(block_root, verified_custody_columns) - } - /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. @@ -297,47 +229,6 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, blobs) } - /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also - /// have a block cached, return the `Availability` variant triggering block import. - /// Otherwise cache the data column sidecar. - /// - /// This should only accept gossip verified data columns, so we should not have to worry about dupes. 
- #[instrument(skip_all, level = "trace")] - pub fn put_gossip_verified_data_columns< - O: ObservationStrategy, - I: IntoIterator>, - >( - &self, - block_root: Hash256, - slot: Slot, - data_columns: I, - ) -> Result, AvailabilityCheckError> { - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_columns = self - .custody_context - .sampling_columns_for_epoch(epoch, &self.spec); - let custody_columns = data_columns - .into_iter() - .filter(|col| sampling_columns.contains(&col.index())) - .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) - .collect::>(); - - self.availability_cache - .put_kzg_verified_data_columns(block_root, custody_columns) - } - - #[instrument(skip_all, level = "trace")] - pub fn put_kzg_verified_custody_data_columns< - I: IntoIterator>, - >( - &self, - block_root: Hash256, - custody_columns: I, - ) -> Result, AvailabilityCheckError> { - self.availability_cache - .put_kzg_verified_data_columns(block_root, custody_columns) - } - /// Check if we have all the blobs for a block. Returns `Availability` which has information /// about whether all components have been received or more are required. pub fn put_executed_block( @@ -573,9 +464,116 @@ impl DataAvailabilityChecker { block_cache_size: self.availability_cache.block_cache_size(), } } +} + +impl DataColumnCache for DataAvailabilityChecker { + type Availability = Availability; + type ReconstructionResult = DataColumnReconstructionResult; + + fn custody_context(&self) -> &Arc> { + &self.custody_context + } + + /// Get data columns for a block from the availability cache. + fn get_data_columns(&self, block_root: Hash256) -> Option> { + self.availability_cache.peek_data_columns(block_root) + } + + /// Return the set of cached custody column indices for `block_root`. Returns None if there is + /// no block component for `block_root`. 
+ fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.map(|components| components.get_cached_data_columns_indices()) + }) + } + + /// Check if the exact data column is in the availability cache. + fn is_data_column_cached( + &self, + block_root: &Hash256, + data_column: &DataColumnSidecar, + ) -> bool { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.is_some_and(|components| { + let cached_column_opt = components.get_cached_data_column(data_column.index); + cached_column_opt.is_some_and(|cached| *cached == *data_column) + }) + }) + } + + /// Put a list of custody columns received via RPC into the availability cache. This performs KZG + /// verification on the blobs in the list. + #[allow(clippy::type_complexity)] + #[instrument(skip_all, level = "trace")] + fn put_rpc_custody_columns( + &self, + block_root: Hash256, + slot: Slot, + custody_columns: DataColumnSidecarList, + ) -> Result, AvailabilityCheckError> { + // Attributes fault to the specific peer that sent an invalid column + let kzg_verified_columns = + KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn)?; + + // Filter out columns that aren't required for custody for this slot + // This is required because `data_columns_by_root` requests the **latest** CGC that _may_ + // not be yet effective for data availability check, as CGC changes are only effective from + // a new epoch.
+ let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let verified_custody_columns = kzg_verified_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) + .collect::>(); + + self.availability_cache + .put_kzg_verified_data_columns(block_root, verified_custody_columns) + } + + /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also + /// have a block cached, return the `Availability` variant triggering block import. + /// Otherwise cache the data column sidecar. + /// + /// This should only accept gossip verified data columns, so we should not have to worry about dupes. + #[instrument(skip_all, level = "trace")] + fn put_gossip_verified_data_columns( + &self, + block_root: Hash256, + slot: Slot, + data_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let custody_columns = data_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) + .collect::>(); + + self.availability_cache + .put_kzg_verified_data_columns(block_root, custody_columns) + } + + #[instrument(skip_all, level = "trace")] + fn put_kzg_verified_custody_data_columns( + &self, + block_root: Hash256, + custody_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_kzg_verified_data_columns(block_root, custody_columns) + } #[instrument(skip_all, level = "debug")] - pub fn reconstruct_data_columns( + fn reconstruct_data_columns( &self, block_root: &Hash256, ) -> Result, AvailabilityCheckError> { @@ -675,7 +673,11 @@ pub fn start_availability_cache_maintenance_service( ) 
{ // this cache only needs to be maintained if deneb is configured if chain.spec.deneb_fork_epoch.is_some() { - let overflow_cache = chain.data_availability_checker.availability_cache.clone(); + let overflow_cache = chain + .data_availability_checker + .v1() + .availability_cache + .clone(); executor.spawn( async move { availability_cache_maintenance_service(chain, overflow_cache).await }, "availability_cache_service", diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2.rs new file mode 100644 index 0000000000..3378e41e25 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2.rs @@ -0,0 +1,1098 @@ +use crate::data_availability_checker_v2::overflow_lru_cache::{ + DataAvailabilityCheckerInner, ReconstructColumnsDecision, +}; + +use crate::data_availability_checker::AvailabilityCheckError; +use crate::data_column_availability_cache::DataColumnCache; +use crate::payload_verification_types::{ + AvailabilityPendingExecutedPayload, AvailableExecutedPayload, PayloadProcessStatus, +}; +use crate::{BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext, metrics}; +use educe::Educe; +use kzg::Kzg; +use slot_clock::SlotClock; +use std::collections::HashSet; +use std::fmt; +use std::fmt::Debug; +use std::num::NonZeroUsize; +use std::sync::Arc; +use std::time::Duration; +use task_executor::TaskExecutor; +use tracing::{debug, error, instrument}; +use types::{ + BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, + Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, +}; + +mod overflow_lru_cache; +mod state_lru_cache; + +use crate::data_column_verification::{ + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, + verify_kzg_for_data_column_list, +}; +use crate::metrics::{ + KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, +}; +use 
crate::observed_data_sidecars::ObservationStrategy; +use types::new_non_zero_usize; + +/// The LRU Cache stores `PendingComponents`, which store payload and its associated column data: +/// +/// With `MAX_BLOBS_PER_BLOCK` = 48 for example, the maximum size of data columns +/// in `PendingComponents` is ~12.29 MB. Setting this to 32 means the maximum size of the cache is +/// approximately 0.4 GB. +/// +/// `PendingComponents` are now never removed from the cache manually and are only removed via LRU +/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time. +const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32); +const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32); + +/// Cache to hold fully valid data that can't be imported to fork-choice yet. After the Gloas hard-fork +/// beacon blocks can be immediately imported into fork choice. The execution payload is now separated out from +/// the beacon block. The payload envelope and data columns are received separately from the network. The block +/// is now always considered "available". Availability checks are now made on the payload and it is considered +/// "fully available" when the payload and all required columns are inserted into this cache. +/// +/// Usually a payload becomes available on its slot within a second of receiving its first component +/// over gossip. However, a payload may never become available if a malicious proposer does not +/// publish its data, or there are network issues that prevent us from receiving it. If the payload +/// does not become available after some time we can safely forget about it. Consider these two +/// cases: +/// +/// - Global unavailability: If nobody has received the payload components it's likely that the +/// builder never made the payload available. So we can safely forget about the payload as it will +/// never become available.
+/// - Local unavailability: Some fraction of the network has received all payload components, but not us. +/// Some of our peers will eventually attest to a descendant of that block and lookup sync will +/// fetch its components. Therefore it's not strictly necessary to hold on to the partially available +/// payload for too long as we can recover from other peers. +/// +/// Even in periods of non-finality, the builder is expected to publish the payload's data +/// immediately. Because this cache only holds fully valid data, its capacity is bound to 1 block +/// per slot and fork: before inserting into this cache we check the proposer signature and correct +/// proposer. Having a capacity > 1 is an optimization to prevent sync lookup from having to re-fetch +/// data during moments of unstable network conditions. +pub struct DataAvailabilityChecker { + #[allow(dead_code)] + complete_blob_backfill: bool, + availability_cache: Arc>, + #[allow(dead_code)] + slot_clock: T::SlotClock, + kzg: Arc, + custody_context: Arc>, + spec: Arc, +} + +pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSidecarList); + +#[derive(Debug)] +pub enum DataColumnReconstructionResult { + Success(AvailabilityAndReconstructedColumns), + NotStarted(&'static str), + RecoveredColumnsNotImported(&'static str), +} + +/// This type is returned after adding a payload / column to the `DataAvailabilityChecker`. +/// +/// Indicates if the payload is fully `Available` or if we need columns or payload +/// to "complete" the requirements for an `AvailablePayload`.
+pub enum Availability { + MissingComponents(Hash256), + Available(Box>), +} + +impl Debug for Availability { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::MissingComponents(block_root) => { + write!(f, "MissingComponents({})", block_root) + } + Self::Available(payload) => write!(f, "Available({:?})", payload.payload.block_root), + } + } +} + +impl DataColumnCache for DataAvailabilityChecker { + type Availability = Availability; + type ReconstructionResult = DataColumnReconstructionResult; + + /// Returns the custody context used by this checker. + fn custody_context(&self) -> &Arc> { + &self.custody_context + } + + /// Returns all cached data columns for the given block root, if any. + #[instrument(skip_all, level = "trace")] + fn get_data_columns(&self, block_root: Hash256) -> Option> { + self.availability_cache.peek_data_columns(block_root) + } + + /// Returns the indices of cached data columns for the given block root. + #[instrument(skip_all, level = "trace")] + fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.map(|components| components.get_cached_data_columns_indices()) + }) + } + + /// Checks if a specific data column is cached for the given block root. + #[instrument(skip_all, level = "trace")] + fn is_data_column_cached( + &self, + block_root: &Hash256, + data_column: &DataColumnSidecar, + ) -> bool { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.is_some_and(|components| { + let cached_column_opt = components.get_cached_data_column(data_column.index); + cached_column_opt.is_some_and(|cached| *cached == *data_column) + }) + }) + } + + /// Insert RPC custody columns and check if the block/payload becomes available. 
+ #[instrument(skip_all, level = "trace")] + fn put_rpc_custody_columns( + &self, + block_root: Hash256, + slot: Slot, + custody_columns: DataColumnSidecarList, + ) -> Result, AvailabilityCheckError> { + // Attributes fault to the specific peer that sent an invalid column + let kzg_verified_columns = + KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn)?; + + // Filter out columns that aren't required for custody for this slot + // This is required because `data_columns_by_root` requests the **latest** CGC that _may_ + // not be yet effective for data availability check, as CGC changes are only effecive from + // a new epoch. + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let verified_custody_columns = kzg_verified_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) + .collect::>(); + + self.availability_cache + .put_kzg_verified_data_columns(block_root, verified_custody_columns) + } + + /// Check if we've cached other data columns for this payload. If it satisfies the custody requirement and we also + /// have a payload cached, return the `Availability` variant triggering payload import. + /// Otherwise cache the data column sidecar. + /// + /// This should only accept gossip verified data columns, so we should not have to worry about dupes. 
+ #[instrument(skip_all, level = "trace")] + fn put_gossip_verified_data_columns( + &self, + block_root: Hash256, + slot: Slot, + data_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let custody_columns = data_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) + .collect::>(); + + self.availability_cache + .put_kzg_verified_data_columns(block_root, custody_columns) + } + + #[instrument(skip_all, level = "trace")] + fn put_kzg_verified_custody_data_columns( + &self, + block_root: Hash256, + custody_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_kzg_verified_data_columns(block_root, custody_columns) + } + + #[instrument(skip_all, level = "debug")] + fn reconstruct_data_columns( + &self, + block_root: &Hash256, + ) -> Result, AvailabilityCheckError> { + let verified_data_columns = match self + .availability_cache + .check_and_set_reconstruction_started(block_root) + { + ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, + ReconstructColumnsDecision::No(reason) => { + return Ok(DataColumnReconstructionResult::NotStarted(reason)); + } + }; + + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); + let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); + + let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( + &self.kzg, + &verified_data_columns, + &self.spec, + ) + .map_err(|e| { + error!( + ?block_root, + error = ?e, + "Error reconstructing data columns" + ); + self.availability_cache + .handle_reconstruction_failure(block_root); + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES); + AvailabilityCheckError::ReconstructColumnsError(e) + })?; + + // Check 
indices from cache again to make sure we don't publish components we've already received. + let Some(existing_column_indices) = self.cached_data_column_indexes(block_root) else { + return Err(AvailabilityCheckError::Unexpected( + "block no longer exists in the data availability checker".to_string(), + )); + }; + + let Some(slot) = all_data_columns.first().map(|d| d.as_data_column().slot()) else { + return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported( + "No new columns to import and publish", + )); + }; + + let columns_to_sample = self + .custody_context() + .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec); + + // We only need to import and publish columns that we need to sample + // and columns that we haven't already received + let data_columns_to_import_and_publish = all_data_columns + .into_iter() + .filter(|d| { + columns_to_sample.contains(&d.index()) + && !existing_column_indices.contains(&d.index()) + }) + .collect::>(); + + metrics::stop_timer(timer); + metrics::inc_counter_by( + &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, + data_columns_to_import_and_publish.len() as u64, + ); + + debug!( + count = data_columns_to_import_and_publish.len(), + ?block_root, + %slot, + "Reconstructed columns" + ); + + self.availability_cache + .put_kzg_verified_data_columns(*block_root, data_columns_to_import_and_publish.clone()) + .map(|availability| { + DataColumnReconstructionResult::Success(( + availability, + data_columns_to_import_and_publish + .into_iter() + .map(|d| d.clone_arc()) + .collect::>(), + )) + }) + } +} + +impl DataAvailabilityChecker { + pub fn new( + complete_blob_backfill: bool, + slot_clock: T::SlotClock, + kzg: Arc, + store: BeaconStore, + custody_context: Arc>, + spec: Arc, + ) -> Result { + let inner = DataAvailabilityCheckerInner::new( + OVERFLOW_LRU_CAPACITY_NON_ZERO, + store, + custody_context.clone(), + spec.clone(), + )?; + Ok(Self { + complete_blob_backfill, + availability_cache: 
Arc::new(inner), + slot_clock, + kzg, + custody_context, + spec, + }) + } + + pub fn custody_context(&self) -> &Arc> { + &self.custody_context + } + + /// Checks if the payload associated with the given block root is currently in the availability cache awaiting import because + /// of missing components. + /// + /// Returns the cached payload wrapped in a `PayloadProcessStatus` enum if it exists. + pub fn get_cached_payload( + &self, + block_root: &Hash256, + ) -> Option> { + self.availability_cache.get_cached_payload(block_root) + } + + /// Check if we have all required columns for a payload. Returns `Availability` which has information + /// about whether all components have been received or more are required. + pub fn put_executed_payload( + &self, + executed_payload: AvailabilityPendingExecutedPayload, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_executed_payload(executed_payload) + } + + /// Inserts a pre-execution payload into the cache. + /// This does NOT override an existing executed payload. + pub fn put_pre_execution_payload( + &self, + block_root: Hash256, + payload: Arc>, + source: BlockImportSource, + ) -> Result<(), AvailabilityCheckError> { + self.availability_cache + .put_pre_execution_payload(block_root, payload, source) + } + + /// Removes a pre-execution payload from the cache. + /// This does NOT remove an existing executed payload. + pub fn remove_payload_on_execution_error(&self, block_root: &Hash256) { + self.availability_cache + .remove_pre_execution_payload(block_root); + } + + /// Verifies kzg commitments for an `AvailableBlock`.` + /// + /// WARNING: This function assumes all required blobs are already present, it does NOT + /// check if there are any missing blobs. 
+ pub fn verify_kzg_for_available_payload( + &self, + available_payload: &AvailablePayload, + ) -> Result<(), AvailabilityCheckError> { + let block_data_required = self + .custody_context + .data_columns_required_for_payload(&available_payload.payload); + match available_payload.data() { + AvailablePayloadData::NoData => { + if block_data_required { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + } + AvailablePayloadData::DataColumns(data_columns) => { + verify_kzg_for_data_column_list(data_columns.iter(), &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn)?; + } + } + + Ok(()) + } + + /// Performs batch kzg verification for a vector of `AvailablePayloads`. This is more efficient than + /// calling `verify_kzg_for_available_block` in a loop. + /// + /// WARNING: This function assumes all required blobs are already present, it does NOT + /// check if there are any missing blobs. + #[instrument(skip_all)] + pub fn batch_verify_kzg_for_available_payloads( + &self, + available_payloads: &Vec>, + ) -> Result<(), AvailabilityCheckError> { + let all_data_columns = available_payloads + .iter() + .filter(|available_payload| { + self.custody_context + .data_columns_required_for_payload(&available_payload.payload) + }) + // this clone is cheap as it's cloning an Arc + .filter_map(|available_payload| available_payload.column_data.data_columns()) + .flatten() + .collect::>(); + + for available_payload in available_payloads { + let payload_data_required = self + .custody_context + .data_columns_required_for_payload(&available_payload.payload); + if let AvailablePayloadData::NoData = available_payload.data() + && payload_data_required + { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + } + + // verify kzg for all data columns at once + if !all_data_columns.is_empty() { + // Attributes fault to the specific peer that sent an invalid column + verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) + 
/// Helper struct to group data availability checker metrics.
///
/// Populated by `DataAvailabilityChecker::metrics`.
pub struct DataAvailabilityCheckerMetrics {
    /// Value reported by the availability cache's `state_cache_size()`.
    pub state_cache_size: usize,
    /// Value reported by the availability cache's `payload_cache_size()`.
    pub payload_cache_size: usize,
}
additional_delay).await; + + let Some(gloas_fork_epoch) = chain.spec.gloas_fork_epoch else { + // shutdown service if gloas fork epoch not set + break; + }; + + debug!("Availability cache maintenance service firing"); + let Some(current_epoch) = chain + .slot_clock + .now() + .map(|slot| slot.epoch(T::EthSpec::slots_per_epoch())) + else { + continue; + }; + + if current_epoch < gloas_fork_epoch { + // we are not in gloas yet + continue; + } + + let finalized_epoch = chain + .canonical_head + .fork_choice_read_lock() + .finalized_checkpoint() + .epoch; + + let Some(min_epochs_for_blobs) = chain + .spec + .min_epoch_data_availability_boundary(current_epoch) + else { + // Shutdown service if deneb fork epoch not set. Unreachable as the same check is performed above. + break; + }; + + // any data belonging to an epoch before this should be pruned + let cutoff_epoch = std::cmp::max(finalized_epoch + 1, min_epochs_for_blobs); + + if let Err(e) = overflow_cache.do_maintenance(cutoff_epoch) { + error!(error = ?e,"Failed to maintain availability cache"); + } + } + None => { + error!("Failed to read slot clock"); + // If we can't read the slot clock, just wait another slot. 
+ tokio::time::sleep(chain.slot_clock.slot_duration()).await; + } + }; + } +} + +#[derive(Debug, Clone)] +// TODO(gloas) Move this to `payload_verification_types.rs` +pub enum AvailablePayloadData { + /// Payload has zero blobs + NoData, + /// Payload has more than zero blobs + DataColumns(DataColumnSidecarList), +} + +impl AvailablePayloadData { + pub fn new_with_data_columns(columns: DataColumnSidecarList) -> Self { + if columns.is_empty() { + Self::NoData + } else { + Self::DataColumns(columns) + } + } + + pub fn data_columns(&self) -> Option> { + match self { + AvailablePayloadData::NoData => None, + AvailablePayloadData::DataColumns(data_columns) => Some(data_columns.clone()), + } + } + + pub fn data_columns_len(&self) -> usize { + if let Some(data_columns) = self.data_columns() { + data_columns.len() + } else { + 0 + } + } +} + +/// A fully available payload that is ready to be imported into fork choice. +#[derive(Debug, Clone, Educe)] +#[educe(Hash(bound(E: EthSpec)))] +pub struct AvailablePayload { + block_root: Hash256, + block: Arc>, + payload: Arc>, + #[educe(Hash(ignore))] + column_data: AvailablePayloadData, + #[educe(Hash(ignore))] + /// Timestamp at which this payload first became available (UNIX timestamp, time since 1970). + payload_available_timestamp: Option, + #[educe(Hash(ignore))] + pub spec: Arc, +} + +impl AvailablePayload { + /// Constructs an `AvailablePayload` from a payload and optional data. + /// - If `column_data` is `DataColumns`, constructs `AvailablePayload` variant after column validation. + /// - If `column_data` is `NoData`, constructs `AvailablePayload` after verifying that the payload is not expecting columns. 
+ /// Returns `AvailabilityCheckError` if: + /// - `column_data` contains data not required by the block + /// - Required `column_data` is missing + /// - Blob count doesn't match expected + /// - Custody columns are incomplete + pub fn new( + payload: Arc>, + block: Arc>, + column_data: AvailablePayloadData, + da_checker: &DataAvailabilityChecker, + spec: Arc, + ) -> Result + where + T: BeaconChainTypes, + { + // Ensure payload availability + let columns_required = da_checker + .custody_context() + .data_columns_required_for_payload(&payload); + + match &column_data { + AvailablePayloadData::NoData => { + if columns_required { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + } + AvailablePayloadData::DataColumns(data_columns) => { + if !columns_required { + // TODO(gloas) potential refactor here + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + + let mut column_indices = da_checker + .custody_context + .custody_columns_for_epoch( + Some(payload.message.slot.epoch(T::EthSpec::slots_per_epoch())), + &spec, + ) + .iter() + .collect::>(); + + for data_column in data_columns { + column_indices.remove(&data_column.index); + } + + if !column_indices.is_empty() { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + } + } + + Ok(Self { + block_root: payload.message.beacon_block_root, + block, + payload, + column_data, + payload_available_timestamp: None, + spec: spec.clone(), + }) + } + + pub fn payload(&self) -> &SignedExecutionPayloadEnvelope { + &self.payload + } + pub fn payload_cloned(&self) -> Arc> { + self.payload.clone() + } + + pub fn payload_available_timestamp(&self) -> Option { + self.payload_available_timestamp + } + + pub fn data(&self) -> &AvailablePayloadData { + &self.column_data + } + + pub fn block_root(&self) -> Hash256 { + self.block_root + } + + #[allow(clippy::type_complexity)] + pub fn deconstruct( + self, + ) -> ( + Hash256, + Arc>, + AvailablePayloadData, + ) { + let AvailablePayload { + block_root, 
+ payload, + column_data, + .. + } = self; + (block_root, payload, column_data) + } + + /// Only used for testing + pub fn __clone_without_recv(&self) -> Self { + Self { + block_root: self.block_root, + payload: self.payload.clone(), + block: self.block.clone(), + column_data: match &self.column_data { + AvailablePayloadData::NoData => AvailablePayloadData::NoData, + AvailablePayloadData::DataColumns(data_columns) => { + AvailablePayloadData::DataColumns(data_columns.clone()) + } + }, + payload_available_timestamp: self.payload_available_timestamp, + spec: self.spec.clone(), + } + } +} + +#[derive(Debug)] +pub enum MaybeAvailablePayload { + /// This payload is fully available. + Available(AvailablePayload), + /// This variant is not fully available and requires blobs to become fully available. + AvailabilityPending { + block_root: Hash256, + payload: Arc>, + }, +} + +impl MaybeAvailablePayload { + pub fn block_cloned(&self) -> Arc> { + match self { + Self::Available(payload) => payload.payload_cloned(), + Self::AvailabilityPending { payload, .. 
} => payload.clone(), + } + } +} + +// #[cfg(test)] +// mod test { +// use super::*; +// use crate::CustodyContext; +// use crate::block_verification_types::RpcBlock; +// use crate::custody_context::NodeCustodyType; +// use crate::data_column_verification::CustodyDataColumn; +// use crate::test_utils::{ +// EphemeralHarnessType, NumBlobs, generate_data_column_indices_rand_order, +// generate_rand_block_and_data_columns, get_kzg, +// }; +// use rand::SeedableRng; +// use rand::prelude::StdRng; +// use slot_clock::{SlotClock, TestingSlotClock}; +// use std::collections::HashSet; +// use std::sync::Arc; +// use std::time::Duration; +// use store::HotColdDB; +// use types::data::DataColumn; +// use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot}; + +// type E = MainnetEthSpec; +// type T = EphemeralHarnessType; + +// /// Test to verify any extra RPC columns received that are not part of the "effective" CGC for +// /// the slot are excluded from import. +// #[test] +// fn should_exclude_rpc_columns_not_required_for_sampling() { +// // SETUP +// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); +// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + +// let da_checker = new_da_checker(spec.clone()); +// let custody_context = &da_checker.custody_context; + +// // GIVEN a single 32 ETH validator is attached slot 0 +// let epoch = Epoch::new(0); +// let validator_0 = 0; +// custody_context.register_validators( +// vec![(validator_0, 32_000_000_000)], +// epoch.start_slot(E::slots_per_epoch()), +// &spec, +// ); +// assert_eq!( +// custody_context.num_of_data_columns_to_sample(epoch, &spec), +// spec.validator_custody_requirement as usize, +// "sampling size should be the minimal custody requirement == 8" +// ); + +// // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch +// let validator_1 = 1; +// let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); +// 
custody_context.register_validators( +// vec![(validator_1, 32_000_000_000 * 9)], +// cgc_change_slot, +// &spec, +// ); +// // AND custody columns (8) and any new extra columns (2) are received via RPC responses. +// // NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown. +// let (_, data_columns) = generate_rand_block_and_data_columns::( +// ForkName::Fulu, +// NumBlobs::Number(1), +// &mut rng, +// &spec, +// ); +// let block_root = Hash256::random(); +// let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); +// let requested_columns = &custody_columns[..10]; +// da_checker +// .put_rpc_custody_columns( +// block_root, +// cgc_change_slot, +// data_columns +// .into_iter() +// .filter(|d| requested_columns.contains(&d.index)) +// .collect(), +// ) +// .expect("should put rpc custody columns"); + +// // THEN the sampling size for the end slot of the same epoch remains unchanged +// let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); +// assert_eq!( +// sampling_columns.len(), +// spec.validator_custody_requirement as usize // 8 +// ); +// // AND any extra columns received via RPC responses are excluded from import. +// let actual_cached: HashSet = da_checker +// .cached_data_column_indexes(&block_root) +// .expect("should have cached data columns") +// .into_iter() +// .collect(); +// let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); +// assert_eq!( +// actual_cached, expected_sampling_columns, +// "should cache only the effective sampling columns" +// ); +// assert!( +// actual_cached.len() < requested_columns.len(), +// "extra columns should be excluded" +// ) +// } + +// /// Test to verify any extra gossip columns received that are not part of the "effective" CGC for +// /// the slot are excluded from import. 
+// #[test] +// fn should_exclude_gossip_columns_not_required_for_sampling() { +// // SETUP +// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); +// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + +// let da_checker = new_da_checker(spec.clone()); +// let custody_context = &da_checker.custody_context; + +// // GIVEN a single 32 ETH validator is attached slot 0 +// let epoch = Epoch::new(0); +// let validator_0 = 0; +// custody_context.register_validators( +// vec![(validator_0, 32_000_000_000)], +// epoch.start_slot(E::slots_per_epoch()), +// &spec, +// ); +// assert_eq!( +// custody_context.num_of_data_columns_to_sample(epoch, &spec), +// spec.validator_custody_requirement as usize, +// "sampling size should be the minimal custody requirement == 8" +// ); + +// // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch +// let validator_1 = 1; +// let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); +// custody_context.register_validators( +// vec![(validator_1, 32_000_000_000 * 9)], +// cgc_change_slot, +// &spec, +// ); +// // AND custody columns (8) and any new extra columns (2) are received via gossip. +// // NOTE: CGC updates results in new topics subscriptions immediately, and extra columns may start to +// // arrive via gossip. 
+// let (_, data_columns) = generate_rand_block_and_data_columns::( +// ForkName::Fulu, +// NumBlobs::Number(1), +// &mut rng, +// &spec, +// ); +// let block_root = Hash256::random(); +// let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); +// let requested_columns = &custody_columns[..10]; +// let gossip_columns = data_columns +// .into_iter() +// .filter(|d| requested_columns.contains(&d.index)) +// .map(GossipVerifiedDataColumn::::__new_for_testing) +// .collect::>(); +// da_checker +// .put_gossip_verified_data_columns(block_root, cgc_change_slot, gossip_columns) +// .expect("should put gossip custody columns"); + +// // THEN the sampling size for the end slot of the same epoch remains unchanged +// let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); +// assert_eq!( +// sampling_columns.len(), +// spec.validator_custody_requirement as usize // 8 +// ); +// // AND any extra columns received via gossip responses are excluded from import. 
+// let actual_cached: HashSet = da_checker +// .cached_data_column_indexes(&block_root) +// .expect("should have cached data columns") +// .into_iter() +// .collect(); +// let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); +// assert_eq!( +// actual_cached, expected_sampling_columns, +// "should cache only the effective sampling columns" +// ); +// assert!( +// actual_cached.len() < requested_columns.len(), +// "extra columns should be excluded" +// ) +// } + +// /// Regression test for KZG verification truncation bug (https://github.com/sigp/lighthouse/pull/7927) +// #[test] +// fn verify_kzg_for_rpc_blocks_should_not_truncate_data_columns() { +// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); +// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); +// let da_checker = new_da_checker(spec.clone()); + +// // GIVEN multiple RPC blocks with data columns totalling more than 128 +// let blocks_with_columns = (0..2) +// .map(|index| { +// let (block, data_columns) = generate_rand_block_and_data_columns::( +// ForkName::Fulu, +// NumBlobs::Number(1), +// &mut rng, +// &spec, +// ); + +// let custody_columns = if index == 0 { +// // 128 valid data columns in the first block +// data_columns +// } else { +// // invalid data columns in the second block +// data_columns +// .into_iter() +// .map(|d| { +// let invalid_sidecar = DataColumnSidecar { +// column: DataColumn::::empty(), +// ..d.as_ref().clone() +// }; +// CustodyDataColumn::from_asserted_custody(Arc::new(invalid_sidecar)) +// .as_data_column() +// .clone() +// }) +// .collect::>() +// }; + +// let block_data = AvailableBlockData::new_with_data_columns(custody_columns); +// let da_checker = Arc::new(new_da_checker(spec.clone())); +// RpcBlock::new(Arc::new(block), Some(block_data), &da_checker, spec.clone()) +// .expect("should create RPC block with custody columns") +// }) +// .collect::>(); + +// let available_blocks = blocks_with_columns +// .iter() 
+// .filter_map(|block| match block { +// RpcBlock::FullyAvailable(available_block) => Some(available_block.clone()), +// RpcBlock::BlockOnly { .. } => None, +// }) +// .collect::>(); + +// // WHEN verifying all blocks together (totalling 256 data columns) +// let verification_result = +// da_checker.batch_verify_kzg_for_available_blocks(&available_blocks); + +// // THEN batch block verification should fail due to 128 invalid columns in the second block +// verification_result.expect_err("should have failed to verify blocks"); +// } + +// #[test] +// fn should_exclude_reconstructed_columns_not_required_for_sampling() { +// // SETUP +// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); +// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + +// let da_checker = new_da_checker(spec.clone()); +// let custody_context = &da_checker.custody_context; + +// // Set custody requirement to 65 columns (enough to trigger reconstruction) +// let epoch = Epoch::new(1); +// custody_context.register_validators( +// vec![(0, 2_048_000_000_000), (1, 32_000_000_000)], // 64 + 1 +// Slot::new(0), +// &spec, +// ); +// let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch, &spec); +// assert_eq!( +// sampling_requirement, 65, +// "sampling requirement should be 65" +// ); + +// let (block, data_columns) = generate_rand_block_and_data_columns::( +// ForkName::Fulu, +// NumBlobs::Number(1), +// &mut rng, +// &spec, +// ); +// let block_root = Hash256::random(); +// // Add the block to the DA checker +// da_checker +// .availability_cache +// .put_pre_execution_block(block_root, Arc::new(block), BlockImportSource::Gossip) +// .expect("should put block"); + +// // Add 64 columns to the da checker (enough to be able to reconstruct) +// // Order by all_column_indices_ordered, then take first 64 +// let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); +// let custody_columns = custody_columns +// .iter() +// 
.filter_map(|&col_idx| data_columns.iter().find(|d| d.index == col_idx).cloned()) +// .take(64) +// .map(|d| { +// KzgVerifiedCustodyDataColumn::from_asserted_custody( +// KzgVerifiedDataColumn::__new_for_testing(d), +// ) +// }) +// .collect::>(); + +// da_checker +// .availability_cache +// .put_kzg_verified_data_columns(block_root, custody_columns) +// .expect("should put custody columns"); + +// // Try reconstrucing +// let reconstruction_result = da_checker +// .reconstruct_data_columns(&block_root) +// .expect("should reconstruct columns"); + +// // Reconstruction should succeed +// let (_availability, reconstructed_columns) = match reconstruction_result { +// DataColumnReconstructionResult::Success(result) => result, +// e => { +// panic!("Expected successful reconstruction {:?}", e); +// } +// }; + +// // Remaining 64 columns should be reconstructed +// assert_eq!( +// reconstructed_columns.len(), +// sampling_requirement - spec.number_of_custody_groups as usize / 2, +// "should reconstruct the remaining 1 columns" +// ); + +// // Only the columns required for custody (65) should be imported into the cache +// let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); +// let actual_cached: HashSet = da_checker +// .cached_data_column_indexes(&block_root) +// .expect("should have cached data columns") +// .into_iter() +// .collect(); +// let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); +// assert_eq!( +// actual_cached, expected_sampling_columns, +// "should cache only the required custody columns, not all reconstructed columns" +// ); +// } + +// fn new_da_checker(spec: Arc) -> DataAvailabilityChecker { +// let slot_clock = TestingSlotClock::new( +// Slot::new(0), +// Duration::from_secs(0), +// Duration::from_secs(spec.seconds_per_slot), +// ); +// let kzg = get_kzg(&spec); +// let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap()); +// let ordered_custody_column_indices = 
generate_data_column_indices_rand_order::(); +// let custody_context = Arc::new(CustodyContext::new( +// NodeCustodyType::Fullnode, +// ordered_custody_column_indices, +// &spec, +// )); +// let complete_blob_backfill = false; +// DataAvailabilityChecker::new( +// complete_blob_backfill, +// slot_clock, +// kzg, +// store, +// custody_context, +// spec, +// ) +// .expect("should initialise data availability checker") +// } +// } diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs new file mode 100644 index 0000000000..aace5c91ba --- /dev/null +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs @@ -0,0 +1,1306 @@ +use super::state_lru_cache::{DietAvailabilityPendingExecutedPayload, StateLRUCache}; +use crate::BeaconChainTypes; +use crate::CustodyContext; +use crate::beacon_chain::BeaconStore; +use crate::data_availability_checker::AvailabilityCheckError; +use crate::data_availability_checker_v2::{Availability, AvailablePayload, AvailablePayloadData}; +use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::payload_verification_types::PayloadProcessStatus; +use crate::payload_verification_types::{ + AvailabilityPendingExecutedPayload, AvailableExecutedPayload, +}; +use lru::LruCache; +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::cmp::Ordering; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tracing::{Span, debug, debug_span}; +use types::kzg_ext::KzgCommitments; +use types::{ + BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, + EthSpec, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, +}; + +#[derive(Clone)] +pub enum CachedPayload { + PreExecution(Arc>, BlockImportSource), + Executed(Box>), +} + +impl CachedPayload { + pub fn get_commitments(&self) -> KzgCommitments { + let 
payload = self.as_payload(); + payload.message.blob_kzg_commitments.clone() + } + + fn as_payload(&self) -> &SignedExecutionPayloadEnvelope { + match self { + CachedPayload::PreExecution(p, _) => p, + CachedPayload::Executed(p) => p.as_payload(), + } + } + + pub fn num_blobs_expected(&self) -> usize { + self.as_payload().message.blob_kzg_commitments.len() + } +} + +/// This represents the components of a partially available payload +/// +/// The columns are all gossip and kzg verified. +/// The payload has completed all verifications except the availability check. +pub struct PendingComponents { + pub block_root: Hash256, + pub block: Option>>, + pub verified_data_columns: Vec>, + pub payload: Option>, + pub reconstruction_started: bool, + span: Span, +} + +impl PendingComponents { + #[cfg(test)] + fn get_diet_payload(&self) -> Option<&DietAvailabilityPendingExecutedPayload> { + self.payload.as_ref().and_then(|payload| match payload { + CachedPayload::Executed(payload) => Some(payload.as_ref()), + _ => None, + }) + } + + /// Returns an immutable reference to the cached data column. + pub fn get_cached_data_column( + &self, + data_column_index: u64, + ) -> Option>> { + self.verified_data_columns + .iter() + .find(|d| d.index() == data_column_index) + .map(|d| d.clone_arc()) + } + + /// Returns the indices of cached custody columns + pub fn get_cached_data_columns_indices(&self) -> Vec { + self.verified_data_columns + .iter() + .map(|d| d.index()) + .collect() + } + + /// Inserts an executed payload into the cache. + pub fn insert_executed_payload(&mut self, payload: DietAvailabilityPendingExecutedPayload) { + self.payload = Some(CachedPayload::Executed(Box::new(payload))) + } + + /// Inserts a pre-execution payload into the cache. + /// This does NOT override an existing executed payload. 
+ pub fn insert_pre_execution_payload( + &mut self, + payload: Arc>, + source: BlockImportSource, + ) { + if self.payload.is_none() { + self.payload = Some(CachedPayload::PreExecution(payload, source)) + } + } + + /// Merges a given set of data columns into the cache. + fn merge_data_columns>>( + &mut self, + kzg_verified_data_columns: I, + ) -> Result<(), AvailabilityCheckError> { + for data_column in kzg_verified_data_columns { + if self.get_cached_data_column(data_column.index()).is_none() { + self.verified_data_columns.push(data_column); + } + } + + Ok(()) + } + + /// Inserts a new payload. + pub fn merge_payload(&mut self, payload: DietAvailabilityPendingExecutedPayload) { + self.insert_executed_payload(payload); + } + + /// Returns Some if the payload has received all its required data for import. The return value + /// must be persisted in the DB along with the block. + /// + /// WARNING: This function can potentially take a lot of time if the state needs to be + /// reconstructed from disk. Ensure you are not holding any write locks while calling this. 
+ pub fn make_available( + &self, + spec: &Arc, + num_expected_columns: usize, + recover: R, + ) -> Result>, AvailabilityCheckError> + where + R: FnOnce( + DietAvailabilityPendingExecutedPayload, + &Span, + ) -> Result, AvailabilityCheckError>, + { + let Some(CachedPayload::Executed(payload)) = &self.payload else { + // Payload not available yet + return Ok(None); + }; + + let num_expected_blobs = payload.num_blobs_expected(); + let column_data = if num_expected_blobs == 0 { + Some(AvailablePayloadData::NoData) + } else { + let num_received_columns = self.verified_data_columns.len(); + match num_received_columns.cmp(&num_expected_columns) { + Ordering::Greater => { + // Should never happen + return Err(AvailabilityCheckError::Unexpected(format!( + "too many columns got {num_received_columns} expected {num_expected_columns}" + ))); + } + Ordering::Equal => { + // Block is post-peerdas, and we got enough columns + let data_columns = self + .verified_data_columns + .iter() + .map(|d| d.clone().into_inner()) + .collect::>(); + Some(AvailablePayloadData::DataColumns(data_columns)) + } + Ordering::Less => { + // Not enough data columns received yet + None + } + } + }; + + // Payload's data not available yet + let Some(column_data) = column_data else { + return Ok(None); + }; + + let Some(block) = self.block.clone() else { + // This should never happen + return Err(AvailabilityCheckError::Unexpected(format!( + "Payload is being made available but no block exists" + ))); + }; + + // Payload is available, construct `AvailableExecutedPayload` + + let payload_available_timestamp = match column_data { + AvailablePayloadData::NoData => None, + // TODO(gloas): fix with https://github.com/sigp/lighthouse/issues/7477 + AvailablePayloadData::DataColumns(_) => None, + }; + + let AvailabilityPendingExecutedPayload { + payload, + import_data, + payload_verification_outcome, + } = recover(*payload.clone(), &self.span)?; + + let available_payload = AvailablePayload { + block_root: 
payload.message.beacon_block_root, + payload, + block, + column_data, + payload_available_timestamp, + spec: spec.clone(), + }; + + self.span.in_scope(|| { + debug!("Payload and all data components are available"); + }); + Ok(Some(AvailableExecutedPayload::new( + available_payload, + import_data, + payload_verification_outcome, + ))) + } + + /// Returns an empty `PendingComponents` object with the given block root. + pub fn empty(block_root: Hash256) -> Self { + let span = debug_span!(parent: None, "lh_pending_components", %block_root); + let _guard = span.clone().entered(); + Self { + block_root, + block: None, + verified_data_columns: vec![], + payload: None, + reconstruction_started: false, + span, + } + } + + /// Returns the epoch of: + /// - The payload if it is cached + /// Otherwise, returns None + pub fn epoch(&self) -> Option { + // Get epoch from cached block + if let Some(payload) = &self.payload { + return Some( + payload + .as_payload() + .message + .slot + .epoch(E::slots_per_epoch()), + ); + } + + // Or, get epoch from first data column + if let Some(data_column) = self.verified_data_columns.first() { + return Some(data_column.as_data_column().epoch()); + } + + None + } + + pub fn status_str(&self, num_expected_columns: usize) -> String { + let payload_count = if self.payload.is_some() { 1 } else { 0 }; + format!( + "payload {} data_columns {}/{}", + payload_count, + self.verified_data_columns.len(), + num_expected_columns + ) + } +} + +/// This is the main struct for this module. Outside methods should +/// interact with the cache through this. +pub struct DataAvailabilityCheckerInner { + /// Contains all the data we keep in memory, protected by an RwLock + critical: RwLock>>, + /// This cache holds a limited number of states in memory and reconstructs them + /// from disk when necessary. 
This is necessary until we merge tree-states + state_cache: StateLRUCache, + custody_context: Arc>, + spec: Arc, +} + +// This enum is only used internally within the crate in the reconstruction function to improve +// readability, so it's OK to not box the variant value, and it shouldn't impact memory much with +// the current usage, as it's deconstructed immediately. +#[allow(clippy::large_enum_variant)] +pub(crate) enum ReconstructColumnsDecision { + Yes(Vec>), + No(&'static str), +} + +impl DataAvailabilityCheckerInner { + pub fn new( + capacity: NonZeroUsize, + beacon_store: BeaconStore, + custody_context: Arc>, + spec: Arc, + ) -> Result { + Ok(Self { + critical: RwLock::new(LruCache::new(capacity)), + state_cache: StateLRUCache::new(beacon_store, spec.clone()), + custody_context, + spec, + }) + } + + /// Returns true if the payload with the given block root is known, without altering the LRU ordering + pub fn get_cached_payload( + &self, + block_root: &Hash256, + ) -> Option> { + self.critical + .read() + .peek(block_root) + .and_then(|pending_components| { + pending_components + .payload + .as_ref() + .map(|payload| match payload { + CachedPayload::PreExecution(p, source) => { + PayloadProcessStatus::NotValidated(p.clone(), *source) + } + CachedPayload::Executed(p) => { + PayloadProcessStatus::ExecutionValidated(p.payload_cloned()) + } + }) + }) + } + + /// Fetch data columns of a given `block_root` from the cache without affecting the LRU ordering + pub fn peek_data_columns( + &self, + block_root: Hash256, + ) -> Option> { + self.critical + .read() + .peek(&block_root) + .map(|pending_components| { + pending_components + .verified_data_columns + .iter() + .map(|col| col.clone_arc()) + .collect() + }) + } + + pub fn peek_pending_components>) -> R>( + &self, + block_root: &Hash256, + f: F, + ) -> R { + f(self.critical.read().peek(block_root)) + } + + #[allow(clippy::type_complexity)] + pub fn put_kzg_verified_data_columns< + I: IntoIterator>, + >( + &self, + 
block_root: Hash256, + kzg_verified_data_columns: I, + ) -> Result, AvailabilityCheckError> { + let mut kzg_verified_data_columns = kzg_verified_data_columns.into_iter().peekable(); + let Some(epoch) = kzg_verified_data_columns + .peek() + .map(|verified_blob| verified_blob.as_data_column().epoch()) + else { + // No columns are processed. This can occur if all received columns were filtered out + // before this point, e.g. due to a CGC change that caused extra columns to be downloaded + // // before the new CGC took effect. + // Return `Ok` without marking the block as available. + return Ok(Availability::MissingComponents(block_root)); + }; + + let pending_components = self + .update_or_insert_pending_components(block_root, |pending_components| { + pending_components.merge_data_columns(kzg_verified_data_columns) + })?; + + let num_expected_columns = self.get_num_expected_columns(epoch); + + pending_components.span.in_scope(|| { + debug!( + component = "data_columns", + status = pending_components.status_str(num_expected_columns), + "Component added to data availability checker" + ); + }); + + self.check_availability_and_cache_components( + block_root, + pending_components, + num_expected_columns, + ) + } + + fn check_availability_and_cache_components( + &self, + block_root: Hash256, + pending_components: MappedRwLockReadGuard<'_, PendingComponents>, + num_expected_columns: usize, + ) -> Result, AvailabilityCheckError> { + if let Some(available_payload) = pending_components.make_available( + &self.spec, + num_expected_columns, + |payload, span| { + self.state_cache + .recover_pending_executed_payload(payload, span) + }, + )? { + // Explicitly drop read lock before acquiring write lock + drop(pending_components); + if let Some(components) = self.critical.write().get_mut(&block_root) { + // Clean up span now that block is available + components.span = Span::none(); + } + + // We never remove the pending components manually to avoid race conditions. 
+ // This ensures components remain available during and right after payload import, + // preventing a race condition where a component was removed after the payload was + // imported, but re-inserted immediately, causing partial pending components to be + // stored and served to peers. + // Components are only removed via LRU eviction as finality advances. + Ok(Availability::Available(Box::new(available_payload))) + } else { + Ok(Availability::MissingComponents(block_root)) + } + } + + /// Updates or inserts a new `PendingComponents` if it doesn't exist, and then apply the + /// `update_fn` while holding the write lock. + /// + /// Once the update is complete, the write lock is downgraded and a read guard with a + /// reference of the updated `PendingComponents` is returned. + fn update_or_insert_pending_components( + &self, + block_root: Hash256, + update_fn: F, + ) -> Result>, AvailabilityCheckError> + where + F: FnOnce(&mut PendingComponents) -> Result<(), AvailabilityCheckError>, + { + let mut write_lock = self.critical.write(); + + { + let pending_components = + write_lock.get_or_insert_mut(block_root, || PendingComponents::empty(block_root)); + update_fn(pending_components)? + } + + RwLockReadGuard::try_map(RwLockWriteGuard::downgrade(write_lock), |cache| { + cache.peek(&block_root) + }) + .map_err(|_| { + AvailabilityCheckError::Unexpected("pending components should exist".to_string()) + }) + } + + /// Check whether data column reconstruction should be attempted. + /// + /// Potentially trigger reconstruction if all the following satisfy: + /// - Our custody requirement is more than 50% of total columns, + /// - We haven't received all required columns + /// - Reconstruction hasn't been started for the block + /// + /// If reconstruction is required, returns `PendingComponents` which contains the + /// components to be used as inputs to reconstruction, otherwise returns a `reason`. 
+ pub fn check_and_set_reconstruction_started( + &self, + block_root: &Hash256, + ) -> ReconstructColumnsDecision { + let mut write_lock = self.critical.write(); + let Some(pending_components) = write_lock.get_mut(block_root) else { + // Block may have been imported as it does not exist in availability cache. + return ReconstructColumnsDecision::No("block already imported"); + }; + + let Some(epoch) = pending_components + .verified_data_columns + .first() + .map(|c| c.as_data_column().epoch()) + else { + return ReconstructColumnsDecision::No("not enough columns"); + }; + + let total_column_count = T::EthSpec::number_of_columns(); + let sampling_column_count = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + let received_column_count = pending_components.verified_data_columns.len(); + + if pending_components.reconstruction_started { + return ReconstructColumnsDecision::No("already started"); + } + if received_column_count >= sampling_column_count { + return ReconstructColumnsDecision::No("all sampling columns received"); + } + if received_column_count < total_column_count / 2 { + return ReconstructColumnsDecision::No("not enough columns"); + } + + pending_components.reconstruction_started = true; + ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone()) + } + + /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. + /// In this case, we remove all data columns in `PendingComponents`, reset reconstruction + /// status so that we can attempt to retrieve columns from peers again. + pub fn handle_reconstruction_failure(&self, block_root: &Hash256) { + if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) { + pending_components_mut.verified_data_columns = vec![]; + pending_components_mut.reconstruction_started = false; + } + } + + /// Inserts a pre executed payload into the cache. 
+ /// - This does NOT trigger the availability check as the payload still needs to be executed. + /// - This does NOT override an existing cached payload to avoid overwriting an executed payload. + pub fn put_pre_execution_payload( + &self, + block_root: Hash256, + payload: Arc>, + source: BlockImportSource, + ) -> Result<(), AvailabilityCheckError> { + let epoch = payload.message.slot.epoch(T::EthSpec::slots_per_epoch()); + let pending_components = + self.update_or_insert_pending_components(block_root, |pending_components| { + pending_components.insert_pre_execution_payload(payload, source); + Ok(()) + })?; + + let num_expected_columns_opt = self.get_num_expected_columns(epoch); + + pending_components.span.in_scope(|| { + debug!( + component = "pre execution payload", + status = pending_components.status_str(num_expected_columns_opt), + "Component added to data availability checker" + ); + }); + + Ok(()) + } + + /// Removes a pre-execution payload from the cache. + /// This does NOT remove an existing executed payload. + pub fn remove_pre_execution_payload(&self, block_root: &Hash256) { + // The read lock is immediately dropped so we can safely remove the block from the cache. + if let Some(PayloadProcessStatus::NotValidated(_, _)) = self.get_cached_payload(block_root) + { + self.critical.write().pop(block_root); + } + } + + /// Check if we have all the columns for a payload. If we do, return the Availability variant that + /// triggers import of the payload. 
+ pub fn put_executed_payload( + &self, + executed_payload: AvailabilityPendingExecutedPayload, + ) -> Result, AvailabilityCheckError> { + let epoch = executed_payload + .as_payload() + .message + .slot + .epoch(T::EthSpec::slots_per_epoch()); + let block_root = executed_payload.payload.message.beacon_block_root; + + // register the payload to get the diet block + let diet_executed_payload = self + .state_cache + .register_pending_executed_payload(executed_payload); + + let pending_components = + self.update_or_insert_pending_components(block_root, |pending_components| { + pending_components.merge_payload(diet_executed_payload); + Ok(()) + })?; + + let num_expected_columns = self.get_num_expected_columns(epoch); + + pending_components.span.in_scope(|| { + debug!( + component = "payload", + status = pending_components.status_str(num_expected_columns), + "Component added to data availability checker" + ); + }); + + self.check_availability_and_cache_components( + block_root, + pending_components, + num_expected_columns, + ) + } + + fn get_num_expected_columns(&self, epoch: Epoch) -> usize { + self.custody_context + .num_of_data_columns_to_sample(epoch, &self.spec) + } + + /// maintain the cache + pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { + // clean up any lingering states in the state cache + self.state_cache.do_maintenance(cutoff_epoch); + + // Collect keys of pending payloads from a previous epoch to cutoff + let mut write_lock = self.critical.write(); + let mut keys_to_remove = vec![]; + for (key, value) in write_lock.iter() { + if let Some(epoch) = value.epoch() + && epoch < cutoff_epoch + { + keys_to_remove.push(*key); + } + } + // Now remove keys + for key in keys_to_remove { + write_lock.pop(&key); + } + + Ok(()) + } + + #[cfg(test)] + /// get the state cache for inspection (used only for tests) + pub fn state_lru_cache(&self) -> &StateLRUCache { + &self.state_cache + } + + /// Number of states stored in memory in 
the cache. + pub fn state_cache_size(&self) -> usize { + self.state_cache.lru_cache().read().len() + } + + /// Number of pending component entries in memory in the cache. + pub fn payload_cache_size(&self) -> usize { + self.critical.read().len() + } +} + +#[cfg(test)] +mod test { + use super::*; + + use crate::test_utils::generate_data_column_indices_rand_order; + use crate::{ + blob_verification::GossipVerifiedBlob, + block_verification::PayloadVerificationOutcome, + block_verification_types::{AsBlock, BlockImportData}, + custody_context::NodeCustodyType, + data_availability_checker::STATE_LRU_CAPACITY_NON_ZERO, + test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType}, + }; + use fork_choice::PayloadVerificationStatus; + use logging::create_test_tracing_subscriber; + use state_processing::ConsensusContext; + use std::collections::VecDeque; + use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend}; + use tempfile::{TempDir, tempdir}; + use tracing::{debug_span, info}; + use types::new_non_zero_usize; + use types::{ExecPayload, MinimalEthSpec}; + + const LOW_VALIDATOR_COUNT: usize = 32; + const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get(); + + fn get_store_with_spec( + db_path: &TempDir, + spec: Arc, + ) -> Arc, BeaconNodeBackend>> { + let hot_path = db_path.path().join("hot_db"); + let cold_path = db_path.path().join("cold_db"); + let blobs_path = db_path.path().join("blobs_db"); + let config = StoreConfig::default(); + + HotColdDB::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + config, + spec, + ) + .expect("disk store should initialize") + } + async fn get_gloas_chain( + db_path: &TempDir, + ) -> BeaconChainHarness> { + let altair_fork_epoch = Epoch::new(0); + let bellatrix_fork_epoch = Epoch::new(0); + let capella_fork_epoch = Epoch::new(0); + let deneb_fork_epoch = Epoch::new(0); + let electra_fork_epoch = Epoch::new(0); + let fulu_fork_epoch = Epoch::new(0); + let 
gloas_fork_epoch = Epoch::new(0); + + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(altair_fork_epoch); + spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch); + spec.capella_fork_epoch = Some(capella_fork_epoch); + spec.deneb_fork_epoch = Some(deneb_fork_epoch); + spec.electra_fork_epoch = Some(electra_fork_epoch); + spec.fulu_fork_epoch = Some(fulu_fork_epoch); + spec.gloas_fork_epoch = Some(gloas_fork_epoch); + let spec = Arc::new(spec); + + let chain_store = get_store_with_spec::(db_path, spec.clone()); + let validators_keypairs = + types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone()) + .keypairs(validators_keypairs) + .fresh_disk_store(chain_store) + .mock_execution_layer() + .build(); + + // go to gloas slot + let gloas_fork_slot = gloas_fork_epoch.start_slot(E::slots_per_epoch()); + harness.extend_to_slot(gloas_fork_slot).await; + let gloas_head = &harness.chain.head_snapshot().beacon_block; + assert!(gloas_head.as_gloas().is_ok()); + assert_eq!(gloas_head.slot(), gloas_fork_slot); + assert!( + gloas_head + .message() + .body() + .execution_payload() + .is_err() + "Gloas block has no payload" + ); + harness + } + + async fn availability_pending_payload( + harness: &BeaconChainHarness>, + ) -> ( + AvailabilityPendingExecutedPayload, + Vec>>, + ) + where + E: EthSpec, + Hot: ItemStore, + Cold: ItemStore, + { + let chain = &harness.chain; + let head = chain.head_snapshot(); + let parent_state = head.beacon_state.clone(); + + let target_slot = chain.slot().expect("should get slot") + 1; + let parent_root = head.beacon_block_root; + let parent_block = chain + .get_blinded_block(&parent_root) + .expect("should get block") + .expect("should have block"); + + let (signed_beacon_block_hash, (block, maybe_blobs), state) = harness + .add_block_at_slot(target_slot, parent_state) + .await + .expect("should add block"); + let block_root = 
signed_beacon_block_hash.into(); + assert_eq!( + block_root, + block.canonical_root(), + "block root should match" + ); + + // log kzg commitments + info!("printing kzg commitments"); + for comm in Vec::from( + block + .message() + .body() + .blob_kzg_commitments() + .expect("should be deneb fork") + .clone(), + ) { + info!(commitment = ?comm, "kzg commitment"); + } + info!("done printing kzg commitments"); + + let gossip_verified_columns = if let Some((kzg_proofs, blobs)) = maybe_blobs { + let sidecars = + DataColumnSidecar::build_sidecars(blobs, &block, kzg_proofs, &chain.spec).unwrap(); + Vec::from(sidecars) + .into_iter() + .map(|sidecar| { + let subnet = sidecar.index; + GossipVerifiedDataColumn::new(sidecar, subnet, &harness.chain) + .expect("should validate column") + }) + .collect() + } else { + vec![] + }; + + let slot = block.slot(); + let consensus_context = ConsensusContext::::new(slot); + let import_data: BlockImportData = BlockImportData { + block_root, + state, + parent_block, + consensus_context, + }; + + let payload_verification_outcome = PayloadVerificationOutcome { + payload_verification_status: PayloadVerificationStatus::Verified, + is_valid_merge_transition_block: false, + }; + + let availability_pending_block = AvailabilityPendingExecutedBlock { + block, + import_data, + payload_verification_outcome, + }; + + (availability_pending_block, gossip_verified_blobs) + } + + async fn setup_harness_and_cache( + capacity: usize, + ) -> ( + BeaconChainHarness>, + Arc>, + TempDir, + ) + where + E: EthSpec, + T: BeaconChainTypes< + HotStore = BeaconNodeBackend, + ColdStore = BeaconNodeBackend, + EthSpec = E, + >, + { + create_test_tracing_subscriber(); + let chain_db_path = tempdir().expect("should get temp dir"); + let harness = get_deneb_chain(&chain_db_path).await; + let spec = harness.spec.clone(); + let test_store = harness.chain.store.clone(); + let capacity_non_zero = new_non_zero_usize(capacity); + let custody_context = 
Arc::new(CustodyContext::new( + NodeCustodyType::Fullnode, + generate_data_column_indices_rand_order::(), + &spec, + )); + let cache = Arc::new( + DataAvailabilityCheckerInner::::new( + capacity_non_zero, + test_store, + custody_context, + spec.clone(), + ) + .expect("should create cache"), + ); + (harness, cache, chain_db_path) + } + + #[tokio::test] + async fn overflow_cache_test_insert_components() { + type E = MinimalEthSpec; + type T = DiskHarnessType; + let capacity = 4; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let (pending_block, blobs) = availability_pending_block(&harness).await; + let root = pending_block.import_data.block_root; + + let blobs_expected = pending_block.num_blobs_expected(); + assert_eq!( + blobs.len(), + blobs_expected, + "should have expected number of blobs" + ); + assert!(cache.critical.read().is_empty(), "cache should be empty"); + let availability = cache + .put_executed_block(pending_block) + .expect("should put block"); + if blobs_expected == 0 { + assert!( + matches!(availability, Availability::Available(_)), + "block doesn't have blobs, should be available" + ); + assert_eq!( + cache.critical.read().len(), + 1, + "cache should still have block as it hasn't been imported yet" + ); + } else { + assert!( + matches!(availability, Availability::MissingComponents(_)), + "should be pending blobs" + ); + assert_eq!( + cache.critical.read().len(), + 1, + "cache should have one block" + ); + assert!( + cache.critical.read().peek(&root).is_some(), + "newly inserted block should exist in memory" + ); + } + + let mut kzg_verified_blobs = Vec::new(); + for (blob_index, gossip_blob) in blobs.into_iter().enumerate() { + kzg_verified_blobs.push(gossip_blob.into_inner()); + let availability = cache + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone()) + .expect("should put blob"); + if blob_index == blobs_expected - 1 { + assert!(matches!(availability, Availability::Available(_))); + } else { + 
assert!(matches!(availability, Availability::MissingComponents(_))); + assert_eq!(cache.critical.read().len(), 1); + } + } + + let (pending_block, blobs) = availability_pending_block(&harness).await; + let blobs_expected = pending_block.num_blobs_expected(); + assert_eq!( + blobs.len(), + blobs_expected, + "should have expected number of blobs" + ); + let root = pending_block.import_data.block_root; + let mut kzg_verified_blobs = vec![]; + for gossip_blob in blobs { + kzg_verified_blobs.push(gossip_blob.into_inner()); + let availability = cache + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone()) + .expect("should put blob"); + assert!( + matches!(availability, Availability::MissingComponents(_)), + "should be pending block" + ); + assert_eq!( + cache.critical.read().len(), + 2, + "cache should have two blocks now" + ); + } + let availability = cache + .put_executed_block(pending_block) + .expect("should put block"); + assert!( + matches!(availability, Availability::Available(_)), + "block should be available: {:?}", + availability + ); + assert!( + cache.critical.read().len() == 2, + "cache should still have available block" + ); + } + + #[tokio::test] + // ensure the state cache keeps memory usage low and that it can properly recover states + // THIS TEST CAN BE DELETED ONCE TREE STATES IS MERGED AND WE RIP OUT THE STATE CACHE + async fn overflow_cache_test_state_cache() { + type E = MinimalEthSpec; + type T = DiskHarnessType; + let capacity = STATE_LRU_CAPACITY * 2; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let mut pending_blocks = VecDeque::new(); + let mut states = Vec::new(); + let mut state_roots = Vec::new(); + // Get enough blocks to fill the cache to capacity, ensuring all blocks have blobs + while pending_blocks.len() < capacity { + let (mut pending_block, _) = availability_pending_block(&harness).await; + if pending_block.num_blobs_expected() == 0 { + // we need blocks with blobs + continue; + } + let 
state_root = pending_block.import_data.state.canonical_root().unwrap(); + states.push(pending_block.import_data.state.clone()); + pending_blocks.push_back(pending_block); + state_roots.push(state_root); + } + + let state_cache = cache.state_lru_cache().lru_cache(); + let mut pushed_diet_blocks = VecDeque::new(); + + for i in 0..capacity { + let pending_block = pending_blocks.pop_front().expect("should have block"); + let block_root = pending_block.as_block().canonical_root(); + + assert_eq!( + state_cache.read().len(), + std::cmp::min(i, STATE_LRU_CAPACITY), + "state cache should be empty at start" + ); + + if i >= STATE_LRU_CAPACITY { + let lru_root = state_roots[i - STATE_LRU_CAPACITY]; + assert_eq!( + state_cache.read().peek_lru().map(|(root, _)| root), + Some(&lru_root), + "lru block should be in cache" + ); + } + + // put the block in the cache + let availability = cache + .put_executed_block(pending_block) + .expect("should put block"); + + // grab the diet block from the cache for later testing + let diet_block = cache + .critical + .read() + .peek(&block_root) + .and_then(|pending_components| pending_components.get_diet_block().cloned()) + .expect("should exist"); + pushed_diet_blocks.push_back(diet_block); + + // should be unavailable since we made sure all blocks had blobs + assert!( + matches!(availability, Availability::MissingComponents(_)), + "should be pending blobs" + ); + + if i >= STATE_LRU_CAPACITY { + let evicted_index = i - STATE_LRU_CAPACITY; + let evicted_root = state_roots[evicted_index]; + assert!( + state_cache.read().peek(&evicted_root).is_none(), + "lru root should be evicted" + ); + // get the diet block via direct conversion (testing only) + let diet_block = pushed_diet_blocks.pop_front().expect("should have block"); + // reconstruct the pending block by replaying the block on the parent state + let recovered_pending_block = cache + .state_lru_cache() + .recover_pending_executed_block(diet_block, &debug_span!("test")) + .expect("should 
reconstruct pending block"); + + // assert the recovered state is the same as the original + assert_eq!( + recovered_pending_block.import_data.state, states[evicted_index], + "recovered state should be the same as the original" + ); + } + } + + // now check the last block + let last_block = pushed_diet_blocks.pop_back().expect("should exist").clone(); + // the state should still be in the cache + assert!( + state_cache + .read() + .peek(&last_block.as_block().state_root()) + .is_some(), + "last block state should still be in cache" + ); + // get the diet block via direct conversion (testing only) + let diet_block = last_block.clone(); + // recover the pending block from the cache + let recovered_pending_block = cache + .state_lru_cache() + .recover_pending_executed_block(diet_block, &debug_span!("test")) + .expect("should reconstruct pending block"); + // assert the recovered state is the same as the original + assert_eq!( + Some(&recovered_pending_block.import_data.state), + states.last(), + "recovered state should be the same as the original" + ); + } +} + +#[cfg(test)] +mod pending_components_tests { + use super::*; + use crate::PayloadVerificationOutcome; + use crate::payload_verification_types::PayloadImportData; + use crate::test_utils::{NumBlobs, generate_rand_block_and_blobs, test_spec}; + use fixed_bytes::FixedBytesExtended; + use fork_choice::PayloadVerificationStatus; + use kzg::KzgCommitment; + use rand::SeedableRng; + use rand::rngs::StdRng; + use state_processing::ConsensusContext; + use types::test_utils::TestRandom; + use types::{BeaconState, ForkName, MainnetEthSpec, SignedBeaconBlock, Slot}; + + type E = MainnetEthSpec; + + type Setup = ( + SignedBeaconBlock, + RuntimeFixedVector>>>, + RuntimeFixedVector>>>, + usize, + ); + + pub fn pre_setup() -> Setup { + let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); + let spec = test_spec::(); + let (block, blobs_vec) = + generate_rand_block_and_blobs::(ForkName::Deneb, NumBlobs::Random, &mut 
rng); + let max_len = spec.max_blobs_per_block(block.epoch()) as usize; + let mut blobs: RuntimeFixedVector>>> = + RuntimeFixedVector::default(max_len); + + for blob in blobs_vec { + if let Some(b) = blobs.get_mut(blob.index as usize) { + *b = Some(Arc::new(blob)); + } + } + + let mut invalid_blobs: RuntimeFixedVector>>> = + RuntimeFixedVector::default(max_len); + for (index, blob) in blobs.iter().enumerate() { + if let Some(invalid_blob) = blob { + let mut blob_copy = invalid_blob.as_ref().clone(); + blob_copy.kzg_commitment = KzgCommitment::random_for_test(&mut rng); + *invalid_blobs.get_mut(index).unwrap() = Some(Arc::new(blob_copy)); + } + } + + (block, blobs, invalid_blobs, max_len) + } + + type PendingComponentsSetup = ( + DietAvailabilityPendingExecutedBlock, + RuntimeFixedVector>>, + RuntimeFixedVector>>, + ); + + pub fn setup_pending_components( + payload: SignedExecutionPayloadEnvelope, + valid_columns: RuntimeFixedVector>>>, + invalid_columns: RuntimeFixedVector>>>, + ) -> PendingComponentsSetup { + let columns = RuntimeFixedVector::new( + valid_columns + .iter() + .map(|column_opt| { + column_opt + .as_ref() + .map(|column| KzgVerifiedDataColumn::__assumed_valid(column.clone())) + }) + .collect::>(), + ); + let invalid_columns = RuntimeFixedVector::new( + invalid_columns + .iter() + .map(|column_opt| { + column_opt + .as_ref() + .map(|column| KzgVerifiedDataColumn::__assumed_valid(column.clone())) + }) + .collect::>(), + ); + let block = AvailabilityPendingExecutedBlock { + payload: Arc::new(payload), + import_data: PayloadImportData { + state: BeaconState::new(0, Default::default(), &ChainSpec::minimal()), + consensus_context: ConsensusContext::new(Slot::new(0)), + }, + payload_verification_outcome: PayloadVerificationOutcome { + payload_verification_status: PayloadVerificationStatus::Verified, + is_valid_merge_transition_block: false, + }, + }; + (payload, columns, invalid_blobs) + } + + pub fn assert_cache_consistent(cache: PendingComponents, 
max_len: usize) { + if let Some(cached_payload) = &cache.payload { + let cached_payload_commitments = cached_payload.get_commitments(); + for index in 0..max_len { + let payload_commitment = cached_payload_commitments.get(index).copied(); + let column_commitment_opt = cache.get_cached_data_column().get(index).unwrap(); + let column_commitment = column_commitment_opt.as_ref().map(|c| *c.get_commitment()); + assert_eq!(payload_commitment, column_commitment); + } + } else { + panic!("No cached payload") + } + } + + pub fn assert_empty_column_cache(cache: PendingComponents) { + for column_indices in cache.get_cached_data_columns_indices().iter() { + panic!("assert_empty_column_cache failed"); + } + } + + #[test] + fn valid_block_invalid_blobs_valid_blobs() { + let (block_commitments, blobs, random_blobs, max_len) = pre_setup(); + let (block_commitments, blobs, random_blobs) = + setup_pending_components(block_commitments, blobs, random_blobs); + let block_root = Hash256::zero(); + let mut cache = >::empty(block_root, max_len); + cache.merge_block(block_commitments); + cache.merge_blobs(random_blobs); + cache.merge_blobs(blobs); + + assert_cache_consistent(cache, max_len); + } + + #[test] + fn invalid_blobs_block_valid_blobs() { + let (block_commitments, blobs, random_blobs, max_len) = pre_setup(); + let (block_commitments, blobs, random_blobs) = + setup_pending_components(block_commitments, blobs, random_blobs); + let block_root = Hash256::zero(); + let mut cache = >::empty(block_root, max_len); + cache.merge_blobs(random_blobs); + cache.merge_block(block_commitments); + cache.merge_blobs(blobs); + + assert_cache_consistent(cache, max_len); + } + + #[test] + fn invalid_blobs_valid_blobs_block() { + let (block_commitments, blobs, random_blobs, max_len) = pre_setup(); + let (block_commitments, blobs, random_blobs) = + setup_pending_components(block_commitments, blobs, random_blobs); + + let block_root = Hash256::zero(); + let mut cache = >::empty(block_root, max_len); + 
cache.merge_blobs(random_blobs); + cache.merge_blobs(blobs); + cache.merge_block(block_commitments); + + assert_empty_blob_cache(cache); + } + + #[test] + fn block_valid_blobs_invalid_blobs() { + let (block_commitments, blobs, random_blobs, max_len) = pre_setup(); + let (block_commitments, blobs, random_blobs) = + setup_pending_components(block_commitments, blobs, random_blobs); + + let block_root = Hash256::zero(); + let mut cache = >::empty(block_root, max_len); + cache.merge_block(block_commitments); + cache.merge_blobs(blobs); + cache.merge_blobs(random_blobs); + + assert_cache_consistent(cache, max_len); + } + + #[test] + fn valid_blobs_block_invalid_blobs() { + let (block_commitments, blobs, random_blobs, max_len) = pre_setup(); + let (block_commitments, blobs, random_blobs) = + setup_pending_components(block_commitments, blobs, random_blobs); + + let block_root = Hash256::zero(); + let mut cache = >::empty(block_root, max_len); + cache.merge_blobs(blobs); + cache.merge_block(block_commitments); + cache.merge_blobs(random_blobs); + + assert_cache_consistent(cache, max_len); + } + + #[test] + fn valid_blobs_invalid_blobs_block() { + let (block_commitments, blobs, random_blobs, max_len) = pre_setup(); + let (block_commitments, blobs, random_blobs) = + setup_pending_components(block_commitments, blobs, random_blobs); + + let block_root = Hash256::zero(); + let mut cache = >::empty(block_root, max_len); + cache.merge_blobs(blobs); + cache.merge_blobs(random_blobs); + cache.merge_block(block_commitments); + + assert_cache_consistent(cache, max_len); + } + + #[test] + fn should_not_insert_pre_execution_block_if_executed_block_exists() { + let (pre_execution_block, blobs, random_blobs, max_len) = pre_setup(); + let (executed_block, _blobs, _random_blobs) = + setup_pending_components(pre_execution_block.clone(), blobs, random_blobs); + + let block_root = pre_execution_block.canonical_root(); + let mut pending_component = >::empty(block_root, max_len); + + let 
pre_execution_block = Arc::new(pre_execution_block); + pending_component + .insert_pre_execution_block(pre_execution_block.clone(), BlockImportSource::Gossip); + assert!( + matches!( + pending_component.block, + Some(CachedBlock::PreExecution(_, _)) + ), + "pre execution block inserted" + ); + + pending_component.insert_executed_block(executed_block); + assert!( + matches!(pending_component.block, Some(CachedBlock::Executed(_))), + "executed block inserted" + ); + + pending_component + .insert_pre_execution_block(pre_execution_block, BlockImportSource::Gossip); + assert!( + matches!(pending_component.block, Some(CachedBlock::Executed(_))), + "executed block should remain" + ); + } +} diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/state_lru_cache.rs new file mode 100644 index 0000000000..0eb4a7b7e2 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/state_lru_cache.rs @@ -0,0 +1,138 @@ +use crate::payload_verification_types::{AvailabilityPendingExecutedPayload, PayloadImportData}; +use crate::{ + BeaconChainTypes, BeaconStore, PayloadVerificationOutcome, + data_availability_checker_v2::{AvailabilityCheckError, STATE_LRU_CAPACITY_NON_ZERO}, +}; +use lru::LruCache; +use parking_lot::RwLock; +use std::sync::Arc; +use store::OnDiskConsensusContext; +use tracing::{Span, instrument}; +use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedExecutionPayloadEnvelope}; + +/// This mirrors everything in the `AvailabilityPendingExecutedPayload`, except +/// that it is much smaller because it contains only a state root instead of +/// a full `BeaconState`. 
+#[derive(Clone)] +pub struct DietAvailabilityPendingExecutedPayload { + payload: Arc>, + state_root: Hash256, + consensus_context: OnDiskConsensusContext, + payload_verification_outcome: PayloadVerificationOutcome, +} + +/// Implementing the same methods as `AvailabilityPendingExecutedPayload` +impl DietAvailabilityPendingExecutedPayload { + pub fn as_payload(&self) -> &SignedExecutionPayloadEnvelope { + &self.payload + } + + pub fn payload_cloned(&self) -> Arc> { + self.payload.clone() + } + + pub fn num_blobs_expected(&self) -> usize { + self.payload.message.blob_kzg_commitments.len() + } +} + +/// This LRU cache holds BeaconStates used for payload import. If the cache overflows, +/// the least recently used state will be dropped. If the dropped state is needed +/// later on, it will be recovered from the parent state and replaying the payload. +/// +/// WARNING: This cache assumes the parent block of any `AvailabilityPendingExecutedPayload` +/// has already been imported into ForkChoice. If this is not the case, the cache +/// will fail to recover the state when the cache overflows because it can't load +/// the parent state! +pub struct StateLRUCache { + states: RwLock>>, + store: BeaconStore, + spec: Arc, +} + +impl StateLRUCache { + pub fn new(store: BeaconStore, spec: Arc) -> Self { + Self { + states: RwLock::new(LruCache::new(STATE_LRU_CAPACITY_NON_ZERO)), + store, + spec, + } + } + + /// This will store the state in the LRU cache and return a + /// `DietAvailabilityPendingExecutedPayload` which is much cheaper to + /// keep around in memory. 
+ pub fn register_pending_executed_payload( + &self, + executed_payload: AvailabilityPendingExecutedPayload, + ) -> DietAvailabilityPendingExecutedPayload { + let state = executed_payload.import_data.state; + let state_root = executed_payload.payload.message.state_root; + self.states.write().put(state_root, state); + + DietAvailabilityPendingExecutedPayload { + payload: executed_payload.payload, + state_root, + consensus_context: OnDiskConsensusContext::from_consensus_context( + executed_payload.import_data.consensus_context, + ), + payload_verification_outcome: executed_payload.payload_verification_outcome, + } + } + + /// Recover the `AvailabilityPendingExecutedPayload` from the diet version. + /// This method will first check the cache and if the state is not found + /// it will reconstruct the state by loading the parent state from disk and + /// replaying the block. + #[instrument(skip_all, parent = _span, level = "debug")] + pub fn recover_pending_executed_payload( + &self, + diet_executed_payload: DietAvailabilityPendingExecutedPayload, + _span: &Span, + ) -> Result, AvailabilityCheckError> { + // Keep the state in the cache to prevent reconstruction in race conditions + let state = if let Some(state) = self.states.write().get(&diet_executed_payload.state_root) + { + state.clone() + } else { + self.reconstruct_state(&diet_executed_payload)? + }; + Ok(AvailabilityPendingExecutedPayload { + payload: diet_executed_payload.payload, + import_data: PayloadImportData { + state, + consensus_context: diet_executed_payload + .consensus_context + .into_consensus_context(), + }, + payload_verification_outcome: diet_executed_payload.payload_verification_outcome, + }) + } + + /// Reconstruct the state by loading the parent state from disk and replaying + /// the block. 
+ #[instrument(skip_all, level = "debug")] + fn reconstruct_state( + &self, + diet_executed_block: &DietAvailabilityPendingExecutedPayload, + ) -> Result, AvailabilityCheckError> { + todo!() + } + + /// returns the state cache for inspection + pub fn lru_cache(&self) -> &RwLock>> { + &self.states + } + + /// remove any states from the cache from before the given epoch + pub fn do_maintenance(&self, cutoff_epoch: Epoch) { + let mut write_lock = self.states.write(); + while let Some((_, state)) = write_lock.peek_lru() { + if state.slot().epoch(T::EthSpec::slots_per_epoch()) < cutoff_epoch { + write_lock.pop_lru(); + } else { + break; + } + } + } +} diff --git a/beacon_node/beacon_chain/src/data_column_availability_cache.rs b/beacon_node/beacon_chain/src/data_column_availability_cache.rs new file mode 100644 index 0000000000..e8f7afe229 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_column_availability_cache.rs @@ -0,0 +1,388 @@ +//! Abstraction layer for data column storage across different DA checkers. +//! +//! This module provides a unified interface for data column operations that are shared +//! between the legacy `DataAvailabilityChecker` (v1, for blocks) and +//! `DataAvailabilityChecker` v2 (for payload envelopes after Gloas). +//! +//! ## Design +//! +//! - **Read operations**: Unified via the `DataColumnCache` trait +//! - **Write operations**: Return `AvailabilityOutcome` enum that wraps both checker types +//! - **Processing**: `BeaconChain::process_availability_outcome()` handles both cases +//! +//! After Gloas is fully activated and v1 is deprecated, this can be deleted and we can +//! use the Gloas DA checker directly. 
+ +use crate::BeaconChainTypes; +use crate::custody_context::CustodyContext; +use crate::data_availability_checker::{ + Availability as BlockAvailability, AvailabilityCheckError, + DataColumnReconstructionResult as BlockReconstructionResult, +}; +use crate::data_availability_checker_v2::{ + Availability as PayloadAvailability, + DataColumnReconstructionResult as PayloadReconstructionResult, +}; +use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; +use crate::observed_data_sidecars::ObservationStrategy; +use std::sync::Arc; +use types::{ + ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, ForkName, Hash256, + Slot, +}; + +/// Unified result from write operations that can come from either DA checker. +/// +/// This enum allows callers to handle availability from both v1 (blocks) and v2 (payloads) +/// through a single type, with downstream processing handled by `BeaconChain::process_availability_outcome()`. +#[derive(Debug)] +pub enum AvailabilityOutcome { + /// Block became available (pre-Gloas, from v1 checker) + Block(BlockAvailability), + /// Payload became available (post-Gloas, from v2 checker) + Payload(PayloadAvailability), +} + +impl AvailabilityOutcome { + /// Returns `true` if data is fully available and ready for import. + pub fn is_available(&self) -> bool { + match self { + Self::Block(BlockAvailability::Available(_)) => true, + Self::Block(BlockAvailability::MissingComponents(_)) => false, + Self::Payload(PayloadAvailability::Available(_)) => true, + Self::Payload(PayloadAvailability::MissingComponents(_)) => false, + } + } + + /// Returns the block root, regardless of availability status. 
+ pub fn block_root(&self) -> Hash256 { + match self { + Self::Block(BlockAvailability::Available(block)) => block.import_data.block_root, + Self::Block(BlockAvailability::MissingComponents(root)) => *root, + Self::Payload(PayloadAvailability::Available(payload)) => payload.payload.block_root(), + Self::Payload(PayloadAvailability::MissingComponents(root)) => *root, + } + } + + /// Converts to the inner block availability if this is a block outcome. + pub fn into_block(self) -> Option> { + match self { + Self::Block(avail) => Some(avail), + Self::Payload(_) => None, + } + } + + /// Converts to the inner payload availability if this is a payload outcome. + pub fn into_payload(self) -> Option> { + match self { + Self::Block(_) => None, + Self::Payload(avail) => Some(avail), + } + } +} + +/// Unified result from reconstruction operations. +#[derive(Debug)] +pub enum ReconstructionOutcome { + /// Block reconstruction result (pre-Gloas) + Block(BlockReconstructionResult), + /// Payload reconstruction result (post-Gloas) + Payload(PayloadReconstructionResult), +} + +impl ReconstructionOutcome { + /// Returns the reconstructed columns if successful, regardless of type. + pub fn reconstructed_columns(&self) -> Option<&DataColumnSidecarList> { + match self { + Self::Block(BlockReconstructionResult::Success((_, cols))) => Some(cols), + Self::Payload(PayloadReconstructionResult::Success((_, cols))) => Some(cols), + _ => None, + } + } + + /// Returns true if reconstruction was successful. + pub fn is_success(&self) -> bool { + matches!( + self, + Self::Block(BlockReconstructionResult::Success(_)) + | Self::Payload(PayloadReconstructionResult::Success(_)) + ) + } + + /// Returns the reason if reconstruction was not started or columns not imported. 
+ pub fn reason(&self) -> Option<&'static str> { + match self { + Self::Block(BlockReconstructionResult::NotStarted(r)) => Some(r), + Self::Block(BlockReconstructionResult::RecoveredColumnsNotImported(r)) => Some(r), + Self::Payload(PayloadReconstructionResult::NotStarted(r)) => Some(r), + Self::Payload(PayloadReconstructionResult::RecoveredColumnsNotImported(r)) => Some(r), + _ => None, + } + } +} + +/// Trait for data column operations on availability checkers. +/// +/// Both `DataAvailabilityChecker` (v1) and `DataAvailabilityChecker` (v2) implement +/// this trait. The associated types differ: +/// - V1: Returns `Availability` containing `AvailableExecutedBlock` +/// - V2: Returns `Availability` containing `AvailableExecutedPayload` +pub trait DataColumnCache: Send + Sync { + /// The availability type returned by write operations. + /// V1 returns block availability, V2 returns payload availability. + type Availability; + + /// The reconstruction result type. + /// V1 returns `DataColumnReconstructionResult` with block availability. + /// V2 returns `DataColumnReconstructionResult` with payload availability. + type ReconstructionResult; + + /// Returns the custody context used by this checker. + fn custody_context(&self) -> &Arc>; + + /// Returns all cached data columns for the given block root, if any. + fn get_data_columns(&self, block_root: Hash256) -> Option>; + + /// Returns the indices of cached data columns for the given block root. + fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option>; + + /// Checks if a specific data column is cached for the given block root. + fn is_data_column_cached( + &self, + block_root: &Hash256, + data_column: &DataColumnSidecar, + ) -> bool; + + /// Insert RPC custody columns and check if the block/payload becomes available. 
+ fn put_rpc_custody_columns( + &self, + block_root: Hash256, + slot: Slot, + custody_columns: DataColumnSidecarList, + ) -> Result; + + /// Insert gossip-verified data columns and check availability. + fn put_gossip_verified_data_columns( + &self, + block_root: Hash256, + slot: Slot, + data_columns: Vec>, + ) -> Result; + + /// Insert KZG-verified custody data columns and check availability. + fn put_kzg_verified_custody_data_columns( + &self, + block_root: Hash256, + custody_columns: Vec>, + ) -> Result; + + /// Attempt to reconstruct missing data columns from available ones. + fn reconstruct_data_columns( + &self, + block_root: &Hash256, + ) -> Result; +} + +/// Router that directs data availability checker operations to the appropriate version based on fork. +/// +/// This wraps both the legacy (v1) and Gloas (v2) DA checkers, providing: +/// - Unified read operations that query both checkers +/// - Fork-aware routing for write operations that return `AvailabilityOutcome` +/// +/// After Gloas is fully activated and v1 is deprecated, this router can be deleted and +/// we can use the Gloas DA checker directly. 
+pub struct DataAvailabilityRouter +where + V1: DataColumnCache< + T, + Availability = BlockAvailability, + ReconstructionResult = BlockReconstructionResult, + >, + V2: DataColumnCache< + T, + Availability = PayloadAvailability, + ReconstructionResult = PayloadReconstructionResult, + >, +{ + /// Legacy DA checker for pre-Gloas blocks + v1: Arc, + /// Gloas DA checker for payload envelopes + v2: Arc, + spec: Arc, + _phantom: std::marker::PhantomData, +} + +impl DataAvailabilityRouter +where + V1: DataColumnCache< + T, + Availability = BlockAvailability, + ReconstructionResult = BlockReconstructionResult, + >, + V2: DataColumnCache< + T, + Availability = PayloadAvailability, + ReconstructionResult = PayloadReconstructionResult, + >, +{ + pub fn new(v1: Arc, v2: Arc, spec: Arc) -> Self { + Self { + v1, + v2, + spec, + _phantom: std::marker::PhantomData, + } + } + + /// Returns true if the given slot is in the Gloas fork or later. + fn is_gloas(&self, slot: Slot) -> bool { + self.spec + .fork_name_at_slot::(slot) + .gloas_enabled() + } + + /// Returns the custody context (same for both checkers). + pub fn custody_context(&self) -> &Arc> { + // Both checkers share the same custody context + self.v1.custody_context() + } + + /// Query data columns from the appropriate checker based on the given fork name. + pub fn get_data_columns( + &self, + block_root: Hash256, + fork_name: ForkName, + ) -> Option> { + if fork_name.gloas_enabled() { + self.v2.get_data_columns(block_root) + } else { + self.v1.get_data_columns(block_root) + } + } + + /// Query data columns from both checkers, returning the first match. + /// + /// Use this when you don't know which fork the block belongs to, or during + /// the transition period when data might be in either checker. 
+ pub fn get_data_columns_any( + &self, + block_root: Hash256, + ) -> Option> { + self.v1 + .get_data_columns(block_root) + .or_else(|| self.v2.get_data_columns(block_root)) + } + + pub fn is_data_column_cached( + &self, + slot: Slot, + block_root: &Hash256, + data_column: &DataColumnSidecar, + ) -> bool { + if self.is_gloas(slot) { + self.v2.is_data_column_cached(block_root, data_column) + } else { + self.v1.is_data_column_cached(block_root, data_column) + } + } + + /// Get cached column indexes from the appropriate checker based on slot. + pub fn cached_data_column_indexes( + &self, + block_root: &Hash256, + slot: Slot, + ) -> Option> { + if self.is_gloas(slot) { + self.v2.cached_data_column_indexes(block_root) + } else { + self.v1.cached_data_column_indexes(block_root) + } + } + + /// Insert RPC custody columns, routing to the correct checker based on fork. + pub fn put_rpc_custody_columns( + &self, + block_root: Hash256, + slot: Slot, + custody_columns: DataColumnSidecarList, + ) -> Result, AvailabilityCheckError> { + if self.is_gloas(slot) { + self.v2 + .put_rpc_custody_columns(block_root, slot, custody_columns) + .map(AvailabilityOutcome::Payload) + } else { + self.v1 + .put_rpc_custody_columns(block_root, slot, custody_columns) + .map(AvailabilityOutcome::Block) + } + } + + /// Insert gossip-verified data columns, routing to the correct checker based on fork. + pub fn put_gossip_verified_data_columns( + &self, + block_root: Hash256, + slot: Slot, + data_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + if self.is_gloas(slot) { + self.v2 + .put_gossip_verified_data_columns(block_root, slot, data_columns) + .map(AvailabilityOutcome::Payload) + } else { + self.v1 + .put_gossip_verified_data_columns(block_root, slot, data_columns) + .map(AvailabilityOutcome::Block) + } + } + + /// Insert KZG-verified custody data columns, routing to the correct checker based on fork. 
+ pub fn put_kzg_verified_custody_data_columns( + &self, + block_root: Hash256, + slot: Slot, + custody_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + if self.is_gloas(slot) { + self.v2 + .put_kzg_verified_custody_data_columns(block_root, custody_columns) + .map(AvailabilityOutcome::Payload) + } else { + self.v1 + .put_kzg_verified_custody_data_columns(block_root, custody_columns) + .map(AvailabilityOutcome::Block) + } + } + + /// Attempt to reconstruct missing data columns, routing to the correct checker based on fork. + pub fn reconstruct_data_columns( + &self, + block_root: &Hash256, + slot: Slot, + ) -> Result, AvailabilityCheckError> { + if self.is_gloas(slot) { + self.v2 + .reconstruct_data_columns(block_root) + .map(ReconstructionOutcome::Payload) + } else { + self.v1 + .reconstruct_data_columns(block_root) + .map(ReconstructionOutcome::Block) + } + } + + /// Direct access to v1 checker (for block-specific operations). + /// + /// Use this for operations that are specific to the legacy block-based DA checker, + /// such as `put_executed_block`, `get_cached_block`, blob operations, etc. + pub fn v1(&self) -> &V1 { + &self.v1 + } + + /// Direct access to v2 checker (for payload-specific operations). + /// + /// Use this for operations that are specific to the Gloas payload-based DA checker, + /// such as `put_executed_payload`, `get_cached_payload`, etc. + pub fn v2(&self) -> &V2 { + &self.v2 + } +} diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 7bb139756d..752a2350b6 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -229,6 +229,7 @@ impl GossipVerifiedDataColumn column_sidecar: Arc>, chain: &BeaconChain, ) -> Result { + let slot = column_sidecar.slot(); verify_data_column_sidecar(&column_sidecar, &chain.spec)?; // Check if the data column is already in the DA checker cache. 
This happens when data columns @@ -238,10 +239,11 @@ impl GossipVerifiedDataColumn // In this case, we should accept it for gossip propagation. verify_is_unknown_sidecar(chain, &column_sidecar)?; - if chain - .data_availability_checker - .is_data_column_cached(&column_sidecar.block_root(), &column_sidecar) - { + if chain.data_availability_checker.is_data_column_cached( + slot, + &column_sidecar.block_root(), + &column_sidecar, + ) { // Observe this data column so we don't process it again. if O::observe() { observe_gossip_data_column(&column_sidecar, chain)?; @@ -495,10 +497,11 @@ pub fn validate_data_column_sidecar_for_gossip FetchBlobsBeaconAdapter { pub(crate) fn cached_blob_indexes(&self, block_root: &Hash256) -> Option> { self.chain .data_availability_checker + .v1() .cached_blob_indexes(block_root) } - pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { + pub(crate) fn cached_data_column_indexes( + &self, + slot: Slot, + block_root: &Hash256, + ) -> Option> { self.chain .data_availability_checker - .cached_data_column_indexes(block_root) + .cached_data_column_indexes(block_root, slot) } pub(crate) async fn process_engine_blobs( diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index 6559f24d23..71578807a8 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -402,7 +402,7 @@ async fn compute_custody_columns_to_import( // Only consider columns that are not already known to data availability. 
if let Some(known_columns) = - chain_adapter_cloned.cached_data_column_indexes(&block_root) + chain_adapter_cloned.cached_data_column_indexes(block.slot(), &block_root) { custody_columns.retain(|col| !known_columns.contains(&col.index())); if custody_columns.is_empty() { diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index cbe2f78fbd..aba35b687a 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -197,7 +197,7 @@ mod get_blobs_v2 { .returning(|_| None); mock_adapter .expect_cached_data_column_indexes() - .returning(|_| None); + .returning(|_, _| None); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index a1c255e3b3..8f8b497c91 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -11,7 +11,7 @@ use types::kzg_ext::KzgCommitments; use types::{ Blob, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, KzgCommitment, KzgProof, SignedBeaconBlock, SignedBeaconBlockHeader, - SignedBlindedBeaconBlock, + SignedBlindedBeaconBlock, Slot, }; /// Converts a blob ssz List object to an array to be used with the kzg diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index f92030a671..5db200e6ba 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -18,6 +18,8 @@ pub mod canonical_head; pub mod chain_config; pub mod custody_context; pub mod data_availability_checker; +pub mod data_availability_checker_v2; +pub mod data_column_availability_cache; pub mod data_column_verification; mod early_attester_cache; mod errors; @@ -42,6 +44,7 @@ pub mod observed_block_producers; pub mod observed_data_sidecars; pub mod 
observed_operations; mod observed_slashable; +pub mod payload_verification_types; pub mod persisted_beacon_chain; pub mod persisted_custody; mod persisted_fork_choice; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 6be07faa24..8afe32b7c6 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1978,7 +1978,8 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { beacon_chain.store.state_cache_len(), ); - let da_checker_metrics = beacon_chain.data_availability_checker.metrics(); + let da_checker_metrics = beacon_chain.data_availability_checker.v1().metrics(); + set_gauge_by_usize( &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, da_checker_metrics.block_cache_size, diff --git a/beacon_node/beacon_chain/src/payload_verification_types.rs b/beacon_node/beacon_chain/src/payload_verification_types.rs new file mode 100644 index 0000000000..f54c67ecb1 --- /dev/null +++ b/beacon_node/beacon_chain/src/payload_verification_types.rs @@ -0,0 +1,74 @@ +use std::sync::Arc; + +use state_processing::ConsensusContext; +use types::{BeaconState, BlockImportSource, EthSpec, SignedExecutionPayloadEnvelope}; + +use crate::{PayloadVerificationOutcome, data_availability_checker_v2::AvailablePayload}; + +#[derive(Debug, PartialEq)] +pub struct PayloadImportData { + pub state: BeaconState, + pub consensus_context: ConsensusContext, +} + +/// A payload that has completed payload verification by an EL client but does not +/// have all requisite column data to get imported into fork choice. 
+pub struct AvailabilityPendingExecutedPayload { + pub payload: Arc>, + pub import_data: PayloadImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailabilityPendingExecutedPayload { + pub fn new( + payload: Arc>, + import_data: PayloadImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + payload, + import_data, + payload_verification_outcome, + } + } + + pub fn as_payload(&self) -> &SignedExecutionPayloadEnvelope { + &self.payload + } + + pub fn num_blobs_expected(&self) -> usize { + self.payload.message.blob_kzg_commitments.len() + } +} + +/// A payload that has completed all payload verification by an EL client +/// **and** has all requisite column data to be imported into fork choice. +pub struct AvailableExecutedPayload { + pub payload: AvailablePayload, + pub import_data: PayloadImportData, + pub payload_verification_outcome: PayloadVerificationOutcome, +} + +impl AvailableExecutedPayload { + pub fn new( + payload: AvailablePayload, + import_data: PayloadImportData, + payload_verification_outcome: PayloadVerificationOutcome, + ) -> Self { + Self { + payload, + import_data, + payload_verification_outcome, + } + } +} + +pub enum PayloadProcessStatus { + /// Payload is not in any pre-import cache. Payload may be in the data-base or in the fork-choice. + Unknown, + /// Payload is currently processing but not yet validated. + NotValidated(Arc>, BlockImportSource), + /// Payload is fully valid, but not yet imported. It's cached in the da_checker while awaiting + /// columns. + ExecutionValidated(Arc>), +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index b6c235a4cb..b49e986762 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -3280,6 +3280,42 @@ macro_rules! add_blob_transactions { }}; } +macro_rules! 
add_blob_transactions_gloas { + ($message:expr, $num_blobs:expr, $rng:expr, $fork_name:expr) => {{ + let num_blobs = match $num_blobs { + NumBlobs::Random => $rng.random_range(DEFAULT_MIN_BLOBS..=DEFAULT_MAX_BLOBS), + NumBlobs::Number(n) => n, + NumBlobs::None => 0, + }; + let (bundle, transactions) = + execution_layer::test_utils::generate_blobs::(num_blobs, $fork_name).unwrap(); + + let payload = &mut $message.payload; + payload.transactions = <_>::default(); + for tx in Vec::from(transactions) { + payload.transactions.push(tx).unwrap(); + } + $message.blob_kzg_commitments = bundle.commitments.clone(); + bundle + }}; +} + +pub fn generate_rand_payloads_and_columns( + fork_name: ForkName, + num_blobs: NumBlobs, + rng: &mut impl Rng, +) -> (SignedExecutionPayloadEnvelope, Vec>) { + let mut payload = SignedExecutionPayloadEnvelope::random_for_test(rng); + + let mut data_column_sidecars = vec![]; + + let bundle = add_blob_transactions_gloas!(payload.message, num_blobs, rng, fork_name); + + let data_columns = generate_data_column_sidecars_from_block(&block, spec); + + todo!() +} + pub fn generate_rand_block_and_blobs( fork_name: ForkName, num_blobs: NumBlobs, @@ -3398,6 +3434,44 @@ pub fn generate_data_column_sidecars_from_block( .unwrap() } +/// Generate data column sidecars from pre-computed cells and proofs for gloas payloads. +pub fn generate_data_column_sidecars_from_payload( + payload: &SignedExecutionPayloadEnvelope, + spec: &ChainSpec, +) -> DataColumnSidecarList { + let kzg_commitments = payload.message.blob_kzg_commitments; + if kzg_commitments.is_empty() { + return vec![]; + } + + // load the precomputed column sidecar to avoid computing them for every block in the tests. + let template_data_columns = RuntimeVariableList::>::from_ssz_bytes( + TEST_DATA_COLUMN_SIDECARS_SSZ, + E::number_of_columns(), + ) + .unwrap(); + + let (cells, proofs) = template_data_columns + .into_iter() + .map(|sidecar| { + let DataColumnSidecar { + column, kzg_proofs, .. 
+ } = sidecar; + // There's only one cell per column for a single blob + let cell_bytes: Vec = column.into_iter().next().unwrap().into(); + let kzg_cell = cell_bytes.try_into().unwrap(); + let kzg_proof = kzg_proofs.into_iter().next().unwrap(); + (kzg_cell, kzg_proof) + }) + .collect::<(Vec<_>, Vec<_>)>(); + + // Repeat the cells and proofs for every blob + let blob_cells_and_proofs_vec = + vec![(cells.try_into().unwrap(), proofs.try_into().unwrap()); kzg_commitments.len()]; + + build_data_column_sidecars(kzg_commitments.clone(), blob_cells_and_proofs_vec, spec).unwrap() +} + pub fn generate_data_column_indices_rand_order() -> Vec { let mut indices = (0..E::number_of_columns() as u64).collect::>(); indices.shuffle(&mut StdRng::seed_from_u64(42)); diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 3611f02391..57698a10b9 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -566,6 +566,7 @@ fn handle_rpc_request( decoded_buffer, spec.max_request_blocks(current_fork), )?, + fork_name: current_fork, }, ))), SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping { @@ -1089,6 +1090,7 @@ mod tests { spec.max_request_blocks(fork_name), ) .unwrap(), + fork_name, } } diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 0539877c72..da7ce901b0 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -12,6 +12,7 @@ use std::ops::Deref; use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; +use types::ForkName; use types::data::BlobIdentifier; use types::light_client::consts::MAX_REQUEST_LIGHT_CLIENT_UPDATES; use types::{ @@ -528,16 +529,21 @@ impl BlobsByRootRequest { pub struct DataColumnsByRootRequest { /// The list of beacon block roots and column indices being requested. 
pub data_column_ids: RuntimeVariableList>, + pub fork_name: ForkName, } impl DataColumnsByRootRequest { pub fn new( data_column_ids: Vec>, + fork_name: ForkName, max_request_blocks: usize, ) -> Result { let data_column_ids = RuntimeVariableList::new(data_column_ids, max_request_blocks) .map_err(|_| "DataColumnsByRootRequest too many column IDs")?; - Ok(Self { data_column_ids }) + Ok(Self { + data_column_ids, + fork_name, + }) } pub fn max_requested(&self) -> usize { diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 2a17a04b90..809c50c002 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -992,6 +992,7 @@ fn test_tcp_columns_by_root_chunked_rpc() { }; max_request_blocks ], + current_fork_name, max_request_blocks, ) .unwrap(); @@ -1002,6 +1003,7 @@ fn test_tcp_columns_by_root_chunked_rpc() { spec.max_request_blocks(current_fork_name), ) .unwrap(), + fork_name: current_fork_name, }; assert_eq!(req, req_decoded); let rpc_request = RequestType::DataColumnsByRoot(req); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index e51e73b756..cb134fcbdd 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1028,7 +1028,7 @@ impl NetworkBeaconProcessor { .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column"); - match &result { + match result { Ok(availability) => match availability { AvailabilityProcessingStatus::Imported(block_root) => { debug!( @@ -1041,6 +1041,14 @@ impl NetworkBeaconProcessor { &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION, processing_start_time.elapsed().as_millis() as i64, ); + + // If a block is in the da_checker, sync maybe awaiting for an event when block is finally + // 
imported. A block can become imported both after processing a block or data column. If a + // importing a block results in `Imported`, notify. Do not notify of data column errors. + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: true, + }); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { trace!( @@ -1072,11 +1080,14 @@ impl NetworkBeaconProcessor { work: Work::Reprocess( ReprocessQueueMessage::DelayColumnReconstruction( QueuedColumnReconstruction { - block_root, - slot: *slot, + block_root: block_root.into(), + slot, process_fn: Box::pin(async move { cloned_self - .attempt_data_column_reconstruction(block_root) + .attempt_data_column_reconstruction( + slot, + block_root.into(), + ) .await; }), }, @@ -1111,16 +1122,6 @@ impl NetworkBeaconProcessor { ); } } - - // If a block is in the da_checker, sync maybe awaiting for an event when block is finally - // imported. A block can become imported both after processing a block or data column. If a - // importing a block results in `Imported`, notify. Do not notify of data column errors. - if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: true, - }); - } } /// Process the beacon block received from the gossip network and: diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index fd9c2c1e55..9c1faefb28 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -835,8 +835,8 @@ impl NetworkBeaconProcessor { /// Attempts to reconstruct all data columns if the conditions checked in /// [`DataAvailabilityCheckerInner::check_and_set_reconstruction_started`] are satisfied. 
#[instrument(level = "debug", skip_all, fields(?block_root))] - async fn attempt_data_column_reconstruction(self: &Arc, block_root: Hash256) { - let result = self.chain.reconstruct_data_columns(block_root).await; + async fn attempt_data_column_reconstruction(self: &Arc, slot: Slot, block_root: Hash256) { + let result = self.chain.reconstruct_data_columns(slot, block_root).await; match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 7cf7c01416..a711da61c3 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -306,6 +306,7 @@ impl NetworkBeaconProcessor { let block_root = blob_id.block_root; self.chain .data_availability_checker + .v1() .get_cached_block(&block_root) .and_then(|status| match status { BlockProcessStatus::NotValidated(block, _source) => Some(block), @@ -333,7 +334,7 @@ impl NetworkBeaconProcessor { } // First attempt to get the blobs from the RPC cache. 
- if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) { + if let Ok(Some(blob)) = self.chain.data_availability_checker.v1().get_blob(id) { self.send_response( peer_id, inbound_request_id, @@ -443,6 +444,7 @@ impl NetworkBeaconProcessor { match self.chain.get_data_columns_checking_all_caches( data_column_ids_by_root.block_root, &indices_to_retrieve, + request.fork_name, ) { Ok(data_columns) => { send_data_column_count += data_columns.len(); diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6ba8bd4d3e..79dd77e61f 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -717,9 +717,11 @@ impl NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); + // TODO(gloas) make this work across both v1 and v2 let available_blocks = match self .chain .data_availability_checker + .v1() .verify_kzg_for_rpc_blocks(downloaded_blocks) { Ok(blocks) => blocks diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index edd99345b4..b209e051bc 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -172,7 +172,7 @@ impl RequestState for CustodyRequestState { _: usize, cx: &mut SyncNetworkContext, ) -> Result { - cx.custody_lookup_request(id, self.block_root, lookup_peers) + cx.custody_lookup_request(id, self.slot, self.block_root, lookup_peers) .map_err(LookupRequestError::SendFailedNetwork) } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 43bfe29a84..0250e52468 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ 
b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -233,7 +233,7 @@ impl SingleBlockLookup { ); } else if cx.chain.should_fetch_custody_columns(block_epoch) { self.component_requests = ComponentRequests::ActiveCustodyRequest( - CustodyRequestState::new(self.block_root), + CustodyRequestState::new(block.slot(), self.block_root), ); } else { self.component_requests = ComponentRequests::NotNeeded("outside da window"); @@ -391,13 +391,15 @@ impl BlobRequestState { pub struct CustodyRequestState { #[educe(Debug(ignore))] pub block_root: Hash256, + pub slot: Slot, pub state: SingleLookupRequestState>, } impl CustodyRequestState { - pub fn new(block_root: Hash256) -> Self { + pub fn new(slot: Slot, block_root: Hash256) -> Self { Self { block_root, + slot, state: SingleLookupRequestState::new(), } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 069d51764f..a58847319c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -964,6 +964,7 @@ impl SyncNetworkContext { let imported_blob_indexes = self .chain .data_availability_checker + .v1() .cached_blob_indexes(&block_root) .unwrap_or_default(); // Include only the blob indexes not yet imported (received through gossip) @@ -1078,13 +1079,14 @@ impl SyncNetworkContext { pub fn custody_lookup_request( &mut self, lookup_id: SingleLookupId, + slot: Slot, block_root: Hash256, lookup_peers: Arc>>, ) -> Result { let custody_indexes_imported = self .chain .data_availability_checker - .cached_data_column_indexes(&block_root) + .cached_data_column_indexes(&block_root, slot) .unwrap_or_default(); let current_epoch = self.chain.epoch().map_err(|e| { @@ -1366,12 +1368,14 @@ impl SyncNetworkContext { if self .chain .data_availability_checker + .v1() .data_columns_required_for_epoch(epoch) { ByRangeRequestType::BlocksAndColumns } else if self .chain .data_availability_checker + .v1() 
.blobs_required_for_epoch(epoch) { ByRangeRequestType::BlocksAndBlobs diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index 34df801eaa..d06645e3af 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -26,6 +26,7 @@ impl DataColumnsByRootSingleBlockRequest { block_root: self.block_root, columns, }], + fork_name, spec.max_request_blocks(fork_name), ) } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 715928906e..371c1f260d 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1079,6 +1079,7 @@ impl TestRig { .harness .chain .data_availability_checker + .v1() .put_executed_block(executed_block) .unwrap() { @@ -1094,6 +1095,7 @@ impl TestRig { .harness .chain .data_availability_checker + .v1() .put_gossip_verified_blobs( blob.block_root(), std::iter::once(GossipVerifiedBlob::<_, Observe>::__assumed_valid( @@ -1113,6 +1115,7 @@ impl TestRig { self.harness .chain .data_availability_checker + .v1() .put_pre_execution_block(block.canonical_root(), block, BlockImportSource::Gossip) .unwrap(); } @@ -1121,6 +1124,7 @@ impl TestRig { self.harness .chain .data_availability_checker + .v1() .remove_block_on_execution_error(&block_root); self.send_sync_message(SyncMessage::GossipBlockProcessResult {