From d047ace41fe9e22ab645316c11a085f1e2cc7244 Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Fri, 30 Jan 2026 09:24:41 -0800 Subject: [PATCH] new da checker doesn't need payloads --- beacon_node/beacon_chain/src/builder.rs | 1 - .../src/data_availability_checker_v2.rs | 782 +---------- .../overflow_lru_cache.rs | 1195 +++++++---------- .../src/data_availability_router.rs | 5 +- .../src/payload_verification_types.rs | 32 +- beacon_node/beacon_chain/src/test_utils.rs | 15 +- 6 files changed, 555 insertions(+), 1475 deletions(-) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index ecc55b19a9..04aba2d2a4 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -993,7 +993,6 @@ where let da_checker_v2 = Arc::new( DataAvailabilityCheckerV2::new( - complete_blob_backfill, slot_clock.clone(), self.kzg.clone(), custody_context.clone(), diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2.rs index 187cfc54b0..a386d1f5a0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2.rs @@ -4,24 +4,18 @@ use crate::data_availability_checker_v2::overflow_lru_cache::{ use crate::data_availability_checker::AvailabilityCheckError; use crate::data_availability_router::DataColumnCache; -use crate::payload_verification_types::{ - AvailabilityPendingExecutedPayload, AvailableExecutedPayload, PayloadProcessStatus, -}; use crate::{BeaconChain, BeaconChainTypes, CustodyContext, metrics}; -use educe::Educe; use kzg::Kzg; use slot_clock::SlotClock; -use std::collections::HashSet; use std::fmt; use std::fmt::Debug; use std::num::NonZeroUsize; use std::sync::Arc; -use std::time::Duration; use task_executor::TaskExecutor; use tracing::{debug, error, instrument}; use types::{ - BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, + ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, + SignedBeaconBlock, Slot, }; mod overflow_lru_cache; @@ -36,50 +30,33 @@ use crate::metrics::{ use crate::observed_data_sidecars::ObservationStrategy; use types::new_non_zero_usize; -/// The LRU Cache stores `PendingComponents`, which store payload and its associated column data: +/// The LRU Cache stores `PendingComponents`, which store block and its associated column data. +/// Setting this to 32 keeps memory usage reasonable. /// -/// With `MAX_BLOBS_PER_BLOCK` = 48 for exa,ple, the maximum size of data columns -/// in `PendingComponents` is ~12.29 MB. Setting this to 32 means the maximum size of the cache is -/// approximately 0.4 GB. -/// -/// `PendingComponents` are now never removed from the cache manually are only removed via LRU +/// `PendingComponents` are now never removed from the cache manually and are only removed via LRU /// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time. const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32); -/// Cache to hold fully valid data that can't be imported to fork-choice yet. After the Gloas hard-fork -/// beacon blocks can be immediately imported into fork choice. The execution payload is now separated out from -/// the beacon block. The payload envelope and data columns are received separately from the network. The block -/// is now always considered "available". Availability checks are now made on the payload and it is considered -/// "fully available" when the payload and all required columns are inserted into this cache. +/// Represents available data for a block - the block root and its data columns. +pub type AvailableData = (Hash256, DataColumnSidecarList); + +/// This type is returned after adding a block / column to the `DataAvailabilityChecker`. /// -/// Usually a payload becomes available on its slot within a second of receiving its first component -/// over gossip. However, a payload may never become available if a malicious proposer does not -/// publish its data, or there are network issues that prevent us from receiving it. If the payload -/// does not become available after some time we can safely forget about it. Consider these two -/// cases: -/// -/// - Global unavailability: If nobody has received the payload components it's likely that the -/// builder never made the payload available. So we can safely forget about the payload as it will -/// never become available. -/// - Local unavailability: Some fraction of the network has received all payload components, but not us. -/// Some of our peers will eventually attest to a descendant of that block and lookup sync will -/// fetch its components. Therefore it's not strictly necessary to hold to the partially available -/// payload for too long as we can recover from other peers. -/// -/// Even in periods of non-finality, the builder is expected to publish the payload's data -/// immediately. Because this cache only holds fully valid data, its capacity is bound to 1 block -/// per slot and fork: before inserting into this cache we check the proposer signature and correct -/// proposer. Having a capacity > 1 is an optimization to prevent sync lookup from having re-fetch -/// data during moments of unstable network conditions. -pub struct DataAvailabilityChecker { - #[allow(dead_code)] - complete_blob_backfill: bool, - availability_cache: Arc>, - #[allow(dead_code)] - slot_clock: T::SlotClock, - kzg: Arc, - custody_context: Arc>, - spec: Arc, +/// Indicates if the block's data is fully `Available` or if we need more columns. +pub enum Availability { + MissingComponents(Hash256), + Available(Box>), +} + +impl Debug for Availability { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::MissingComponents(block_root) => { + write!(f, "MissingComponents({})", block_root) + } + Self::Available(data) => write!(f, "Available({}, {} columns)", data.0, data.1.len()), + } + } } pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSidecarList); @@ -91,24 +68,22 @@ pub enum DataColumnReconstructionResult { RecoveredColumnsNotImported(&'static str), } -/// This type is returned after adding a payload / column to the `DataAvailabilityChecker`. +/// Cache to hold data columns for blocks pending data availability. /// -/// Indicates if the payload is fully `Available` or if we need columns or payload -/// to "complete" the requirements for an `AvailablePayload`. -pub enum Availability { - MissingComponents(Hash256), - Available(Box>), -} - -impl Debug for Availability { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::MissingComponents(block_root) => { - write!(f, "MissingComponents({})", block_root) - } - Self::Available(payload) => write!(f, "Available({:?})", payload.payload.block_root), - } - } +/// In Gloas, beacon blocks can be immediately imported into fork choice. The execution payload +/// is separated from the beacon block. This cache tracks data columns for payloads until all +/// required columns are received. +/// +/// Usually data becomes available on its slot within a second of receiving its first component +/// over gossip. However, data may never become available if a malicious proposer does not +/// publish its data, or there are network issues. Components are only removed via LRU eviction. +pub struct DataAvailabilityChecker { + availability_cache: Arc>, + #[allow(dead_code)] + slot_clock: T::SlotClock, + kzg: Arc, + custody_context: Arc>, + spec: Arc, } impl DataColumnCache for DataAvailabilityChecker { @@ -151,7 +126,7 @@ impl DataColumnCache for DataAvailabilityChecker { }) } - /// Insert RPC custody columns and check if the block/payload becomes available. + /// Insert RPC custody columns and check if the block becomes available. #[instrument(skip_all, level = "trace")] fn put_rpc_custody_columns( &self, @@ -165,9 +140,6 @@ impl DataColumnCache for DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidColumn)?; // Filter out columns that aren't required for custody for this slot - // This is required because `data_columns_by_root` requests the **latest** CGC that _may_ - // not be yet effective for data availability check, as CGC changes are only effecive from - // a new epoch. let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); let sampling_columns = self .custody_context @@ -182,11 +154,9 @@ impl DataColumnCache for DataAvailabilityChecker { .put_kzg_verified_data_columns(block_root, verified_custody_columns) } - /// Check if we've cached other data columns for this payload. If it satisfies the custody requirement and we also - /// have a payload cached, return the `Availability` variant triggering payload import. - /// Otherwise cache the data column sidecar. - /// - /// This should only accept gossip verified data columns, so we should not have to worry about dupes. + /// Check if we've cached other data columns for this block. If it satisfies the custody + /// requirement and we also have the block cached, return the `Availability` variant + /// triggering import. Otherwise cache the data column sidecar. #[instrument(skip_all, level = "trace")] fn put_gossip_verified_data_columns( &self, @@ -309,7 +279,6 @@ impl DataColumnCache for DataAvailabilityChecker { impl DataAvailabilityChecker { pub fn new( - complete_blob_backfill: bool, slot_clock: T::SlotClock, kzg: Arc, custody_context: Arc>, @@ -321,7 +290,6 @@ impl DataAvailabilityChecker { spec.clone(), )?; Ok(Self { - complete_blob_backfill, availability_cache: Arc::new(inner), slot_clock, kzg, @@ -334,125 +302,38 @@ impl DataAvailabilityChecker { &self.custody_context } - /// Checks if the payload associated with the given block root is currently in the availability cache awaiting import because - /// of missing components. - /// - /// Returns the cached payload wrapped in a `PayloadProcessStatus` enum if it exists. - pub fn get_cached_payload( - &self, - block_root: &Hash256, - ) -> Option> { - self.availability_cache.get_cached_payload(block_root) - } - - /// Check if we have all required columns for a payload. Returns `Availability` which has information - /// about whether all components have been received or more are required. - pub fn put_executed_payload( - &self, - executed_payload: AvailabilityPendingExecutedPayload, - ) -> Result, AvailabilityCheckError> { - self.availability_cache - .put_executed_payload(executed_payload) - } - - /// Inserts a pre-execution payload into the cache. - /// This does NOT override an existing executed payload. - pub fn put_pre_execution_payload( + /// Insert a block into the cache and check if data becomes available. + pub fn put_block( &self, block_root: Hash256, - payload: Arc>, - source: BlockImportSource, - ) -> Result<(), AvailabilityCheckError> { - self.availability_cache - .put_pre_execution_payload(block_root, payload, source) + block: Arc>, + ) -> Result, AvailabilityCheckError> { + self.availability_cache.put_block(block_root, block) } - /// Removes a pre-execution payload from the cache. - /// This does NOT remove an existing executed payload. - pub fn remove_payload_on_execution_error(&self, block_root: &Hash256) { - self.availability_cache - .remove_pre_execution_payload(block_root); - } - - /// Verifies kzg commitments for an `AvailableBlock`.` - /// - /// WARNING: This function assumes all required blobs are already present, it does NOT - /// check if there are any missing blobs. - pub fn verify_kzg_for_available_payload( + /// Verifies kzg commitments for data columns. + pub fn verify_kzg_for_data_columns( &self, - available_payload: &AvailablePayload, + data_columns: &DataColumnSidecarList, ) -> Result<(), AvailabilityCheckError> { - let block_data_required = self - .custody_context - .data_columns_required_for_payload(&available_payload.payload); - match available_payload.data() { - AvailablePayloadData::NoData => { - if block_data_required { - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - } - AvailablePayloadData::DataColumns(data_columns) => { - verify_kzg_for_data_column_list(data_columns.iter(), &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn)?; - } - } - - Ok(()) - } - - /// Performs batch kzg verification for a vector of `AvailablePayloads`. This is more efficient than - /// calling `verify_kzg_for_available_block` in a loop. - /// - /// WARNING: This function assumes all required blobs are already present, it does NOT - /// check if there are any missing blobs. - #[instrument(skip_all)] - pub fn batch_verify_kzg_for_available_payloads( - &self, - available_payloads: &Vec>, - ) -> Result<(), AvailabilityCheckError> { - let all_data_columns = available_payloads - .iter() - .filter(|available_payload| { - self.custody_context - .data_columns_required_for_payload(&available_payload.payload) - }) - // this clone is cheap as it's cloning an Arc - .filter_map(|available_payload| available_payload.column_data.data_columns()) - .flatten() - .collect::>(); - - for available_payload in available_payloads { - let payload_data_required = self - .custody_context - .data_columns_required_for_payload(&available_payload.payload); - if let AvailablePayloadData::NoData = available_payload.data() - && payload_data_required - { - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - } - - // verify kzg for all data columns at once - if !all_data_columns.is_empty() { - // Attributes fault to the specific peer that sent an invalid column - verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) + if !data_columns.is_empty() { + verify_kzg_for_data_column_list(data_columns.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; } - Ok(()) } /// Collects metrics from the data availability checker. pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { DataAvailabilityCheckerMetrics { - payload_cache_size: self.availability_cache.payload_cache_size(), + block_cache_size: self.availability_cache.block_cache_size(), } } } /// Helper struct to group data availability checker metrics. pub struct DataAvailabilityCheckerMetrics { - pub payload_cache_size: usize, + pub block_cache_size: usize, } pub fn start_availability_cache_maintenance_service( @@ -472,17 +353,6 @@ pub fn start_availability_cache_maintenance_service( } else { debug!("Gloas fork not configured, not starting availability cache maintenance service"); } - // TODO(gloas) - // this cache only needs to be maintained if deneb is configured - // if chain.spec.deneb_fork_epoch.is_some() { - // let overflow_cache = chain.data_availability_checker.availability_cache.clone(); - // executor.spawn( - // async move { availability_cache_maintenance_service(chain, overflow_cache).await }, - // "availability_cache_service", - // ); - // } else { - // debug!("Deneb fork not configured, not starting availability cache maintenance service"); - // } } async fn availability_cache_maintenance_service( @@ -529,7 +399,7 @@ async fn availability_cache_maintenance_service( .spec .min_epoch_data_availability_boundary(current_epoch) else { - // Shutdown service if deneb fork epoch not set. Unreachable as the same check is performed above. + // Shutdown service if deneb fork epoch not set. break; }; @@ -548,543 +418,3 @@ async fn availability_cache_maintenance_service( }; } } - -#[derive(Debug, Clone)] -// TODO(gloas) Move this to `payload_verification_types.rs` -pub enum AvailablePayloadData { - /// Payload has zero blobs - NoData, - /// Payload has more than zero blobs - DataColumns(DataColumnSidecarList), -} - -impl AvailablePayloadData { - pub fn new_with_data_columns(columns: DataColumnSidecarList) -> Self { - if columns.is_empty() { - Self::NoData - } else { - Self::DataColumns(columns) - } - } - - pub fn data_columns(&self) -> Option> { - match self { - AvailablePayloadData::NoData => None, - AvailablePayloadData::DataColumns(data_columns) => Some(data_columns.clone()), - } - } - - pub fn data_columns_len(&self) -> usize { - if let Some(data_columns) = self.data_columns() { - data_columns.len() - } else { - 0 - } - } -} - -/// A fully available payload that is ready to be imported into fork choice. -#[derive(Debug, Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -pub struct AvailablePayload { - block_root: Hash256, - block: Arc>, - payload: Arc>, - #[educe(Hash(ignore))] - column_data: AvailablePayloadData, - #[educe(Hash(ignore))] - /// Timestamp at which this payload first became available (UNIX timestamp, time since 1970). - payload_available_timestamp: Option, - #[educe(Hash(ignore))] - pub spec: Arc, -} - -impl AvailablePayload { - /// Constructs an `AvailablePayload` from a payload and optional data. - /// - If `column_data` is `DataColumns`, constructs `AvailablePayload` variant after column validation. - /// - If `column_data` is `NoData`, constructs `AvailablePayload` after verifying that the payload is not expecting columns. - /// Returns `AvailabilityCheckError` if: - /// - `column_data` contains data not required by the block - /// - Required `column_data` is missing - pub fn new( - payload: Arc>, - block: Arc>, - column_data: AvailablePayloadData, - da_checker: &DataAvailabilityChecker, - spec: Arc, - ) -> Result - where - T: BeaconChainTypes, - { - // Ensure payload availability - let columns_required = da_checker - .custody_context() - .data_columns_required_for_payload(&payload); - - match &column_data { - AvailablePayloadData::NoData => { - if columns_required { - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - } - AvailablePayloadData::DataColumns(data_columns) => { - if !columns_required { - // TODO(gloas) potential refactor here - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - - let mut column_indices = da_checker - .custody_context - .custody_columns_for_epoch( - Some(payload.message.slot.epoch(T::EthSpec::slots_per_epoch())), - &spec, - ) - .iter() - .collect::>(); - - for data_column in data_columns { - column_indices.remove(&data_column.index()); - } - - if !column_indices.is_empty() { - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - } - } - - Ok(Self { - block_root: payload.message.beacon_block_root, - block, - payload, - column_data, - payload_available_timestamp: None, - spec: spec.clone(), - }) - } - - pub fn payload(&self) -> &SignedExecutionPayloadEnvelope { - &self.payload - } - pub fn payload_cloned(&self) -> Arc> { - self.payload.clone() - } - - pub fn payload_available_timestamp(&self) -> Option { - self.payload_available_timestamp - } - - pub fn data(&self) -> &AvailablePayloadData { - &self.column_data - } - - pub fn block_root(&self) -> Hash256 { - self.block_root - } - - #[allow(clippy::type_complexity)] - pub fn deconstruct( - self, - ) -> ( - Hash256, - Arc>, - AvailablePayloadData, - ) { - let AvailablePayload { - block_root, - payload, - column_data, - .. - } = self; - (block_root, payload, column_data) - } - - /// Only used for testing - pub fn __clone_without_recv(&self) -> Self { - Self { - block_root: self.block_root, - payload: self.payload.clone(), - block: self.block.clone(), - column_data: match &self.column_data { - AvailablePayloadData::NoData => AvailablePayloadData::NoData, - AvailablePayloadData::DataColumns(data_columns) => { - AvailablePayloadData::DataColumns(data_columns.clone()) - } - }, - payload_available_timestamp: self.payload_available_timestamp, - spec: self.spec.clone(), - } - } -} - -#[derive(Debug)] -pub enum MaybeAvailablePayload { - /// This payload is fully available. - Available(AvailablePayload), - /// This variant is not fully available and requires blobs to become fully available. - AvailabilityPending { - block_root: Hash256, - payload: Arc>, - }, -} - -impl MaybeAvailablePayload { - pub fn block_cloned(&self) -> Arc> { - match self { - Self::Available(payload) => payload.payload_cloned(), - Self::AvailabilityPending { payload, .. } => payload.clone(), - } - } -} - -// #[cfg(test)] -// mod test { -// use super::*; -// use crate::CustodyContext; -// use crate::block_verification_types::RpcBlock; -// use crate::custody_context::NodeCustodyType; -// use crate::data_column_verification::CustodyDataColumn; -// use crate::test_utils::{ -// EphemeralHarnessType, NumBlobs, generate_data_column_indices_rand_order, -// generate_rand_block_and_data_columns, get_kzg, -// }; -// use rand::SeedableRng; -// use rand::prelude::StdRng; -// use slot_clock::{SlotClock, TestingSlotClock}; -// use std::collections::HashSet; -// use std::sync::Arc; -// use std::time::Duration; -// use store::HotColdDB; -// use types::data::DataColumn; -// use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot}; - -// type E = MainnetEthSpec; -// type T = EphemeralHarnessType; - -// /// Test to verify any extra RPC columns received that are not part of the "effective" CGC for -// /// the slot are excluded from import. -// #[test] -// fn should_exclude_rpc_columns_not_required_for_sampling() { -// // SETUP -// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); -// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); - -// let da_checker = new_da_checker(spec.clone()); -// let custody_context = &da_checker.custody_context; - -// // GIVEN a single 32 ETH validator is attached slot 0 -// let epoch = Epoch::new(0); -// let validator_0 = 0; -// custody_context.register_validators( -// vec![(validator_0, 32_000_000_000)], -// epoch.start_slot(E::slots_per_epoch()), -// &spec, -// ); -// assert_eq!( -// custody_context.num_of_data_columns_to_sample(epoch, &spec), -// spec.validator_custody_requirement as usize, -// "sampling size should be the minimal custody requirement == 8" -// ); - -// // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch -// let validator_1 = 1; -// let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); -// custody_context.register_validators( -// vec![(validator_1, 32_000_000_000 * 9)], -// cgc_change_slot, -// &spec, -// ); -// // AND custody columns (8) and any new extra columns (2) are received via RPC responses. -// // NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown. -// let (_, data_columns) = generate_rand_block_and_data_columns::( -// ForkName::Fulu, -// NumBlobs::Number(1), -// &mut rng, -// &spec, -// ); -// let block_root = Hash256::random(); -// let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); -// let requested_columns = &custody_columns[..10]; -// da_checker -// .put_rpc_custody_columns( -// block_root, -// cgc_change_slot, -// data_columns -// .into_iter() -// .filter(|d| requested_columns.contains(&d.index)) -// .collect(), -// ) -// .expect("should put rpc custody columns"); - -// // THEN the sampling size for the end slot of the same epoch remains unchanged -// let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); -// assert_eq!( -// sampling_columns.len(), -// spec.validator_custody_requirement as usize // 8 -// ); -// // AND any extra columns received via RPC responses are excluded from import. -// let actual_cached: HashSet = da_checker -// .cached_data_column_indexes(&block_root) -// .expect("should have cached data columns") -// .into_iter() -// .collect(); -// let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); -// assert_eq!( -// actual_cached, expected_sampling_columns, -// "should cache only the effective sampling columns" -// ); -// assert!( -// actual_cached.len() < requested_columns.len(), -// "extra columns should be excluded" -// ) -// } - -// /// Test to verify any extra gossip columns received that are not part of the "effective" CGC for -// /// the slot are excluded from import. -// #[test] -// fn should_exclude_gossip_columns_not_required_for_sampling() { -// // SETUP -// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); -// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); - -// let da_checker = new_da_checker(spec.clone()); -// let custody_context = &da_checker.custody_context; - -// // GIVEN a single 32 ETH validator is attached slot 0 -// let epoch = Epoch::new(0); -// let validator_0 = 0; -// custody_context.register_validators( -// vec![(validator_0, 32_000_000_000)], -// epoch.start_slot(E::slots_per_epoch()), -// &spec, -// ); -// assert_eq!( -// custody_context.num_of_data_columns_to_sample(epoch, &spec), -// spec.validator_custody_requirement as usize, -// "sampling size should be the minimal custody requirement == 8" -// ); - -// // WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch -// let validator_1 = 1; -// let cgc_change_slot = epoch.end_slot(E::slots_per_epoch()); -// custody_context.register_validators( -// vec![(validator_1, 32_000_000_000 * 9)], -// cgc_change_slot, -// &spec, -// ); -// // AND custody columns (8) and any new extra columns (2) are received via gossip. -// // NOTE: CGC updates results in new topics subscriptions immediately, and extra columns may start to -// // arrive via gossip. -// let (_, data_columns) = generate_rand_block_and_data_columns::( -// ForkName::Fulu, -// NumBlobs::Number(1), -// &mut rng, -// &spec, -// ); -// let block_root = Hash256::random(); -// let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); -// let requested_columns = &custody_columns[..10]; -// let gossip_columns = data_columns -// .into_iter() -// .filter(|d| requested_columns.contains(&d.index)) -// .map(GossipVerifiedDataColumn::::__new_for_testing) -// .collect::>(); -// da_checker -// .put_gossip_verified_data_columns(block_root, cgc_change_slot, gossip_columns) -// .expect("should put gossip custody columns"); - -// // THEN the sampling size for the end slot of the same epoch remains unchanged -// let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); -// assert_eq!( -// sampling_columns.len(), -// spec.validator_custody_requirement as usize // 8 -// ); -// // AND any extra columns received via gossip responses are excluded from import. -// let actual_cached: HashSet = da_checker -// .cached_data_column_indexes(&block_root) -// .expect("should have cached data columns") -// .into_iter() -// .collect(); -// let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); -// assert_eq!( -// actual_cached, expected_sampling_columns, -// "should cache only the effective sampling columns" -// ); -// assert!( -// actual_cached.len() < requested_columns.len(), -// "extra columns should be excluded" -// ) -// } - -// /// Regression test for KZG verification truncation bug (https://github.com/sigp/lighthouse/pull/7927) -// #[test] -// fn verify_kzg_for_rpc_blocks_should_not_truncate_data_columns() { -// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); -// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); -// let da_checker = new_da_checker(spec.clone()); - -// // GIVEN multiple RPC blocks with data columns totalling more than 128 -// let blocks_with_columns = (0..2) -// .map(|index| { -// let (block, data_columns) = generate_rand_block_and_data_columns::( -// ForkName::Fulu, -// NumBlobs::Number(1), -// &mut rng, -// &spec, -// ); - -// let custody_columns = if index == 0 { -// // 128 valid data columns in the first block -// data_columns -// } else { -// // invalid data columns in the second block -// data_columns -// .into_iter() -// .map(|d| { -// let invalid_sidecar = DataColumnSidecar { -// column: DataColumn::::empty(), -// ..d.as_ref().clone() -// }; -// CustodyDataColumn::from_asserted_custody(Arc::new(invalid_sidecar)) -// .as_data_column() -// .clone() -// }) -// .collect::>() -// }; - -// let block_data = AvailableBlockData::new_with_data_columns(custody_columns); -// let da_checker = Arc::new(new_da_checker(spec.clone())); -// RpcBlock::new(Arc::new(block), Some(block_data), &da_checker, spec.clone()) -// .expect("should create RPC block with custody columns") -// }) -// .collect::>(); - -// let available_blocks = blocks_with_columns -// .iter() -// .filter_map(|block| match block { -// RpcBlock::FullyAvailable(available_block) => Some(available_block.clone()), -// RpcBlock::BlockOnly { .. } => None, -// }) -// .collect::>(); - -// // WHEN verifying all blocks together (totalling 256 data columns) -// let verification_result = -// da_checker.batch_verify_kzg_for_available_blocks(&available_blocks); - -// // THEN batch block verification should fail due to 128 invalid columns in the second block -// verification_result.expect_err("should have failed to verify blocks"); -// } - -// #[test] -// fn should_exclude_reconstructed_columns_not_required_for_sampling() { -// // SETUP -// let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec())); -// let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); - -// let da_checker = new_da_checker(spec.clone()); -// let custody_context = &da_checker.custody_context; - -// // Set custody requirement to 65 columns (enough to trigger reconstruction) -// let epoch = Epoch::new(1); -// custody_context.register_validators( -// vec![(0, 2_048_000_000_000), (1, 32_000_000_000)], // 64 + 1 -// Slot::new(0), -// &spec, -// ); -// let sampling_requirement = custody_context.num_of_data_columns_to_sample(epoch, &spec); -// assert_eq!( -// sampling_requirement, 65, -// "sampling requirement should be 65" -// ); - -// let (block, data_columns) = generate_rand_block_and_data_columns::( -// ForkName::Fulu, -// NumBlobs::Number(1), -// &mut rng, -// &spec, -// ); -// let block_root = Hash256::random(); -// // Add the block to the DA checker -// da_checker -// .availability_cache -// .put_pre_execution_block(block_root, Arc::new(block), BlockImportSource::Gossip) -// .expect("should put block"); - -// // Add 64 columns to the da checker (enough to be able to reconstruct) -// // Order by all_column_indices_ordered, then take first 64 -// let custody_columns = custody_context.custody_columns_for_epoch(None, &spec); -// let custody_columns = custody_columns -// .iter() -// .filter_map(|&col_idx| data_columns.iter().find(|d| d.index == col_idx).cloned()) -// .take(64) -// .map(|d| { -// KzgVerifiedCustodyDataColumn::from_asserted_custody( -// KzgVerifiedDataColumn::__new_for_testing(d), -// ) -// }) -// .collect::>(); - -// da_checker -// .availability_cache -// .put_kzg_verified_data_columns(block_root, custody_columns) -// .expect("should put custody columns"); - -// // Try reconstrucing -// let reconstruction_result = da_checker -// .reconstruct_data_columns(&block_root) -// .expect("should reconstruct columns"); - -// // Reconstruction should succeed -// let (_availability, reconstructed_columns) = match reconstruction_result { -// DataColumnReconstructionResult::Success(result) => result, -// e => { -// panic!("Expected successful reconstruction {:?}", e); -// } -// }; - -// // Remaining 64 columns should be reconstructed -// assert_eq!( -// reconstructed_columns.len(), -// sampling_requirement - spec.number_of_custody_groups as usize / 2, -// "should reconstruct the remaining 1 columns" -// ); - -// // Only the columns required for custody (65) should be imported into the cache -// let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec); -// let actual_cached: HashSet = da_checker -// .cached_data_column_indexes(&block_root) -// .expect("should have cached data columns") -// .into_iter() -// .collect(); -// let expected_sampling_columns = sampling_columns.iter().copied().collect::>(); -// assert_eq!( -// actual_cached, expected_sampling_columns, -// "should cache only the required custody columns, not all reconstructed columns" -// ); -// } - -// fn new_da_checker(spec: Arc) -> DataAvailabilityChecker { -// let slot_clock = TestingSlotClock::new( -// Slot::new(0), -// Duration::from_secs(0), -// Duration::from_secs(spec.seconds_per_slot), -// ); -// let kzg = get_kzg(&spec); -// let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap()); -// let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); -// let custody_context = Arc::new(CustodyContext::new( -// NodeCustodyType::Fullnode, -// ordered_custody_column_indices, -// &spec, -// )); -// let complete_blob_backfill = false; -// DataAvailabilityChecker::new( -// complete_blob_backfill, -// slot_clock, -// kzg, -// store, -// custody_context, -// spec, -// ) -// .expect("should initialise data availability checker") -// } -// } diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs index fe5c0860f0..b88378648e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs @@ -1,66 +1,29 @@ use crate::BeaconChainTypes; use crate::CustodyContext; use crate::data_availability_checker::AvailabilityCheckError; -use crate::data_availability_checker_v2::{Availability, AvailablePayload, AvailablePayloadData}; +use crate::data_availability_checker_v2::Availability; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; -use crate::payload_verification_types::PayloadProcessStatus; -use crate::payload_verification_types::{ - AvailabilityPendingExecutedPayload, AvailableExecutedPayload, -}; use lru::LruCache; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::cmp::Ordering; use std::num::NonZeroUsize; use std::sync::Arc; use tracing::{Span, debug, debug_span}; -use types::kzg_ext::KzgCommitments; use types::{ - BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, - EthSpec, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, + ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, + SignedBeaconBlock, }; -#[derive(Clone)] -pub enum CachedPayload { - PreExecution(Arc>, BlockImportSource), - Executed(Box>), -} - -#[allow(dead_code)] -impl CachedPayload { - pub fn get_commitments(&self) -> KzgCommitments { - let payload = self.as_payload(); - payload.message.blob_kzg_commitments.clone() - } - - fn as_payload(&self) -> &SignedExecutionPayloadEnvelope { - match self { - CachedPayload::PreExecution(p, _) => p, - CachedPayload::Executed(p) => &p.payload, - } - } - - pub fn num_blobs_expected(&self) -> usize { - self.as_payload().message.blob_kzg_commitments.len() - } - - pub fn payload_cloned(&self) -> Arc> { - match self { - CachedPayload::PreExecution(p, _) => p.clone(), - CachedPayload::Executed(p) => p.payload.clone(), - } - } -} - -/// This represents the components of a partially available payload +/// This represents the components of a payload pending data availability. /// /// The columns are all gossip and kzg verified. -/// The payload has completed all verifications except the availability check. -#[allow(dead_code)] +/// The payload is considered "available" when all required columns are received. pub struct PendingComponents { + /// The block root is stored for tracing context in the span. + #[allow(dead_code)] pub block_root: Hash256, pub block: Option>>, pub verified_data_columns: Vec>, - pub payload: Option>, pub reconstruction_started: bool, span: Span, } @@ -85,23 +48,6 @@ impl PendingComponents { .collect() } - /// Inserts an executed payload into the cache. - pub fn insert_executed_payload(&mut self, payload: AvailabilityPendingExecutedPayload) { - self.payload = Some(CachedPayload::Executed(Box::new(payload))) - } - - /// Inserts a pre-execution payload into the cache. - /// This does NOT override an existing executed payload. - pub fn insert_pre_execution_payload( - &mut self, - payload: Arc>, - source: BlockImportSource, - ) { - if self.payload.is_none() { - self.payload = Some(CachedPayload::PreExecution(payload, source)) - } - } - /// Merges a given set of data columns into the cache. fn merge_data_columns>>( &mut self, @@ -116,94 +62,80 @@ impl PendingComponents { Ok(()) } - /// Inserts a new payload. - pub fn merge_payload(&mut self, payload: AvailabilityPendingExecutedPayload) { - self.insert_executed_payload(payload); + /// Inserts a block into the cache. + pub fn insert_block(&mut self, block: Arc>) { + self.block = Some(block); } - /// Returns Some if the payload has received all its required data for import. The return value - /// must be persisted in the DB along with the payload. + /// Returns the number of blobs expected for this block by reading the bid's kzg commitments. + /// Returns an error if the block is not cached or not a Gloas block. + pub fn num_blobs_expected(&self) -> Result { + let block = self.block.as_ref().ok_or_else(|| { + AvailabilityCheckError::Unexpected("No block available".to_string()) + })?; + + let bid = block + .message() + .body() + .signed_execution_payload_bid() + .map_err(|_| { + AvailabilityCheckError::Unexpected( + "Block does not have execution payload bid (not a Gloas block?)".to_string(), + ) + })?; + + Ok(bid.message.blob_kzg_commitments.len()) + } + + /// Returns Some if all required data columns have been received. pub fn make_available( &self, - spec: &Arc, num_expected_columns: usize, - ) -> Result>, AvailabilityCheckError> { - let Some(CachedPayload::Executed(executed_payload)) = &self.payload else { - // Payload not available yet + ) -> Result>, AvailabilityCheckError> { + // Check if we have a block - if not, still waiting + if self.block.is_none() { return Ok(None); - }; + } - let num_expected_blobs = executed_payload.num_blobs_expected(); - let column_data = if num_expected_blobs == 0 { - Some(AvailablePayloadData::NoData) - } else { - let num_received_columns = self.verified_data_columns.len(); - match num_received_columns.cmp(&num_expected_columns) { - Ordering::Greater => { - // Should never happen - return Err(AvailabilityCheckError::Unexpected(format!( - "too many columns got {num_received_columns} expected {num_expected_columns}" - ))); - } - Ordering::Equal => { - // We have enough columns - let data_columns = self - .verified_data_columns - .iter() - .map(|d| d.clone().into_inner()) - .collect::>(); - Some(AvailablePayloadData::DataColumns(data_columns)) - } - Ordering::Less => { - // Not enough data columns received yet - None - } + // Get the number of blobs expected from the block's bid + // This will error if the block doesn't have a bid (not Gloas) + let num_expected_blobs = self.num_blobs_expected()?; + + if num_expected_blobs == 0 { + // No blobs expected, data is available (empty) + self.span.in_scope(|| { + debug!("Block has no blobs, data is available"); + }); + return Ok(Some(vec![])); + } + + let num_received_columns = self.verified_data_columns.len(); + match num_received_columns.cmp(&num_expected_columns) { + Ordering::Greater => { + // Should never happen + Err(AvailabilityCheckError::Unexpected(format!( + "too many columns got {num_received_columns} expected {num_expected_columns}" + ))) } - }; + Ordering::Equal => { + // We have enough columns + let data_columns = self + .verified_data_columns + .iter() + .map(|d| d.clone().into_inner()) + .collect::>(); - // Payload's data not available yet - let Some(column_data) = column_data else { - return Ok(None); - }; + self.span.in_scope(|| { + debug!("All data columns received, data is available"); + }); - let Some(block) = self.block.clone() else { - // This should never happen - return Err(AvailabilityCheckError::Unexpected( - "Block doesn't exist for the payload being made available".to_owned(), - )); - }; - - // Payload is available, construct `AvailableExecutedPayload` - - let payload_available_timestamp = match column_data { - AvailablePayloadData::NoData => None, - // TODO(gloas): fix with https://github.com/sigp/lighthouse/issues/7477 - AvailablePayloadData::DataColumns(_) => None, - }; - - let AvailabilityPendingExecutedPayload { - payload, - import_data, - payload_verification_outcome, - } = executed_payload.as_ref().clone(); - - let available_payload = AvailablePayload { - block_root: payload.message.beacon_block_root, - payload, - block, - column_data, - payload_available_timestamp, - spec: spec.clone(), - }; - - self.span.in_scope(|| { - debug!("Payload and all data components are available"); - }); - Ok(Some(AvailableExecutedPayload::new( - available_payload, - import_data, - payload_verification_outcome, - ))) + Ok(Some(data_columns)) + } + Ordering::Less => { + // Not enough data columns received yet + Ok(None) + } + } } /// Returns an empty `PendingComponents` object with the given block root. @@ -214,25 +146,16 @@ impl PendingComponents { block_root, block: None, verified_data_columns: vec![], - payload: None, reconstruction_started: false, span, } } - /// Returns the epoch of: - /// - The payload if it is cached - /// Otherwise, returns None + /// Returns the epoch of the block or first data column, if available. pub fn epoch(&self) -> Option { - // Get epoch from cached payload - if let Some(payload) = &self.payload { - return Some( - payload - .as_payload() - .message - .slot - .epoch(E::slots_per_epoch()), - ); + // Get epoch from block + if let Some(block) = &self.block { + return Some(block.slot().epoch(E::slots_per_epoch())); } // Or, get epoch from first data column @@ -244,10 +167,10 @@ impl PendingComponents { } pub fn status_str(&self, num_expected_columns: usize) -> String { - let payload_count = if self.payload.is_some() { 1 } else { 0 }; + let block_status = if self.block.is_some() { "yes" } else { "no" }; format!( - "payload {} data_columns {}/{}", - payload_count, + "block {} data_columns {}/{}", + block_status, self.verified_data_columns.len(), num_expected_columns ) @@ -285,29 +208,6 @@ impl DataAvailabilityCheckerInner { }) } - /// Returns true if the payload with the given block root is known, without altering the LRU ordering - pub fn get_cached_payload( - &self, - block_root: &Hash256, - ) -> Option> { - self.critical - .read() - .peek(block_root) - .and_then(|pending_components| { - pending_components - .payload - .as_ref() - .map(|payload| match payload { - CachedPayload::PreExecution(p, source) => { - PayloadProcessStatus::NotValidated(p.clone(), *source) - } - CachedPayload::Executed(p) => { - PayloadProcessStatus::ExecutionValidated(p.payload.clone()) - } - }) - }) - } - /// Fetch data columns of a given `block_root` from the cache without affecting the LRU ordering pub fn peek_data_columns( &self, @@ -333,6 +233,33 @@ impl DataAvailabilityCheckerInner { f(self.critical.read().peek(block_root)) } + /// Insert a block into the cache and check if data becomes available. + pub fn put_block( + &self, + block_root: Hash256, + block: Arc>, + ) -> Result, AvailabilityCheckError> { + let epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + + let pending_components = + self.update_or_insert_pending_components(block_root, |pending_components| { + pending_components.insert_block(block); + Ok(()) + })?; + + let num_expected_columns = self.get_num_expected_columns(epoch); + + pending_components.span.in_scope(|| { + debug!( + component = "block", + status = pending_components.status_str(num_expected_columns), + "Component added to data availability checker" + ); + }); + + self.check_availability(block_root, pending_components, num_expected_columns) + } + #[allow(clippy::type_complexity)] pub fn put_kzg_verified_data_columns< I: IntoIterator>, @@ -344,12 +271,12 @@ impl DataAvailabilityCheckerInner { let mut kzg_verified_data_columns = kzg_verified_data_columns.into_iter().peekable(); let Some(epoch) = kzg_verified_data_columns .peek() - .map(|verified_blob| verified_blob.as_data_column().epoch()) + .map(|verified_col| verified_col.as_data_column().epoch()) else { // No columns are processed. This can occur if all received columns were filtered out // before this point, e.g. due to a CGC change that caused extra columns to be downloaded - // // before the new CGC took effect. - // Return `Ok` without marking the payload as available. + // before the new CGC took effect. + // Return `Ok` without marking the block as available. return Ok(Availability::MissingComponents(block_root)); }; @@ -368,36 +295,30 @@ impl DataAvailabilityCheckerInner { ); }); - self.check_availability_and_cache_components( - block_root, - pending_components, - num_expected_columns, - ) + self.check_availability(block_root, pending_components, num_expected_columns) } - fn check_availability_and_cache_components( + fn check_availability( &self, block_root: Hash256, pending_components: MappedRwLockReadGuard<'_, PendingComponents>, num_expected_columns: usize, ) -> Result, AvailabilityCheckError> { - if let Some(available_payload) = - pending_components.make_available(&self.spec, num_expected_columns)? - { + if let Some(columns) = pending_components.make_available(num_expected_columns)? { // Explicitly drop read lock before acquiring write lock drop(pending_components); if let Some(components) = self.critical.write().get_mut(&block_root) { - // Clean up span now that payload is available + // Clean up span now that data is available components.span = Span::none(); } // We never remove the pending components manually to avoid race conditions. - // This ensures components remain available during and right after payload import, - // preventing a race condition where a component was removed after the payload was + // This ensures components remain available during and right after block import, + // preventing a race condition where a component was removed after the block was // imported, but re-inserted immediately, causing partial pending components to be // stored and served to peers. // Components are only removed via LRU eviction as finality advances. - Ok(Availability::Available(Box::new(available_payload))) + Ok(Availability::Available(Box::new((block_root, columns)))) } else { Ok(Availability::MissingComponents(block_root)) } @@ -437,7 +358,7 @@ impl DataAvailabilityCheckerInner { /// Potentially trigger reconstruction if all the following satisfy: /// - Our custody requirement is more than 50% of total columns, /// - We haven't received all required columns - /// - Reconstruction hasn't been started for the payload + /// - Reconstruction hasn't been started for the block /// /// If reconstruction is required, returns `PendingComponents` which contains the /// components to be used as inputs to reconstruction, otherwise returns a `reason`. @@ -447,8 +368,8 @@ impl DataAvailabilityCheckerInner { ) -> ReconstructColumnsDecision { let mut write_lock = self.critical.write(); let Some(pending_components) = write_lock.get_mut(block_root) else { - // Payload may have been imported as it does not exist in availability cache. - return ReconstructColumnsDecision::No("payload already imported"); + // Block may have been imported as it does not exist in availability cache. + return ReconstructColumnsDecision::No("block already imported"); }; let Some(epoch) = pending_components @@ -489,81 +410,6 @@ impl DataAvailabilityCheckerInner { } } - /// Inserts a pre executed payload into the cache. - /// - This does NOT trigger the availability check as the payload still needs to be executed. - /// - This does NOT override an existing cached payload to avoid overwriting an executed payload. - pub fn put_pre_execution_payload( - &self, - block_root: Hash256, - payload: Arc>, - source: BlockImportSource, - ) -> Result<(), AvailabilityCheckError> { - let epoch = payload.message.slot.epoch(T::EthSpec::slots_per_epoch()); - let pending_components = - self.update_or_insert_pending_components(block_root, |pending_components| { - pending_components.insert_pre_execution_payload(payload, source); - Ok(()) - })?; - - let num_expected_columns_opt = self.get_num_expected_columns(epoch); - - pending_components.span.in_scope(|| { - debug!( - component = "pre execution payload", - status = pending_components.status_str(num_expected_columns_opt), - "Component added to data availability checker" - ); - }); - - Ok(()) - } - - /// Removes a pre-execution payload from the cache. - /// This does NOT remove an existing executed payload. - pub fn remove_pre_execution_payload(&self, block_root: &Hash256) { - // The read lock is immediately dropped so we can safely remove the payload from the cache. - if let Some(PayloadProcessStatus::NotValidated(_, _)) = self.get_cached_payload(block_root) - { - self.critical.write().pop(block_root); - } - } - - /// Check if we have all the columns for a payload. If we do, return the Availability variant that - /// triggers import of the payload. - pub fn put_executed_payload( - &self, - executed_payload: AvailabilityPendingExecutedPayload, - ) -> Result, AvailabilityCheckError> { - let epoch = executed_payload - .as_payload() - .message - .slot - .epoch(T::EthSpec::slots_per_epoch()); - let block_root = executed_payload.payload.message.beacon_block_root; - - let pending_components = - self.update_or_insert_pending_components(block_root, |pending_components| { - pending_components.merge_payload(executed_payload); - Ok(()) - })?; - - let num_expected_columns = self.get_num_expected_columns(epoch); - - pending_components.span.in_scope(|| { - debug!( - component = "payload", - status = pending_components.status_str(num_expected_columns), - "Component added to data availability checker" - ); - }); - - self.check_availability_and_cache_components( - block_root, - pending_components, - num_expected_columns, - ) - } - fn get_num_expected_columns(&self, epoch: Epoch) -> usize { self.custody_context .num_of_data_columns_to_sample(epoch, &self.spec) @@ -571,7 +417,7 @@ impl DataAvailabilityCheckerInner { /// maintain the cache pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { - // Collect keys of pending payloads from a previous epoch to cutoff + // Collect keys of pending blocks from a previous epoch to cutoff let mut write_lock = self.critical.write(); let mut keys_to_remove = vec![]; for (key, value) in write_lock.iter() { @@ -590,31 +436,111 @@ impl DataAvailabilityCheckerInner { } /// Number of pending component entries in memory in the cache. - pub fn payload_cache_size(&self) -> usize { + pub fn block_cache_size(&self) -> usize { self.critical.read().len() } } #[cfg(test)] -mod test { +mod pending_components_tests { + use super::*; + use types::MinimalEthSpec; + + type E = MinimalEthSpec; + + #[test] + fn test_empty_pending_components() { + let block_root = Hash256::random(); + let components = PendingComponents::::empty(block_root); + + assert_eq!(components.block_root, block_root); + assert!(components.block.is_none()); + assert!(components.verified_data_columns.is_empty()); + assert!(!components.reconstruction_started); + assert!(components.epoch().is_none()); + } + + #[test] + fn test_get_cached_data_columns_indices_empty() { + let block_root = Hash256::random(); + let components = PendingComponents::::empty(block_root); + + let indices = components.get_cached_data_columns_indices(); + assert!(indices.is_empty()); + } + + #[test] + fn test_status_str_no_block() { + let block_root = Hash256::random(); + let components = PendingComponents::::empty(block_root); + + let status = components.status_str(10); + assert_eq!(status, "block no data_columns 0/10"); + } + + #[test] + fn test_num_blobs_expected_no_block() { + let block_root = Hash256::random(); + let components = PendingComponents::::empty(block_root); + + let result = components.num_blobs_expected(); + assert!(result.is_err()); + // Error should be AvailabilityCheckError::Unexpected + assert!(matches!( + result.unwrap_err(), + AvailabilityCheckError::Unexpected(_) + )); + } + + #[test] + fn test_make_available_no_block_returns_none() { + let block_root = Hash256::random(); + let components = PendingComponents::::empty(block_root); + + // Without a block, make_available should return Ok(None) + let result = components.make_available(10); + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + } +} + +#[cfg(test)] +mod data_availability_checker_tests { use super::*; - use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; - use crate::test_utils::generate_data_column_indices_rand_order; + use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; + use crate::test_utils::{ + generate_data_column_indices_rand_order, test_spec, NumBlobs, + generate_rand_block_and_data_columns, + }; use crate::{ - block_verification_types::AsBlock, custody_context::NodeCustodyType, - test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType}, + test_utils::{BeaconChainHarness, DiskHarnessType}, }; use logging::create_test_tracing_subscriber; - use std::collections::HashSet; - use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend}; + use store::{HotColdDB, StoreConfig, database::interface::BeaconNodeBackend}; use tempfile::{TempDir, tempdir}; - use types::MinimalEthSpec; + use types::{ForkName, MinimalEthSpec, Slot}; use types::new_non_zero_usize; + use rand::SeedableRng; + use rand::rngs::StdRng; + + type E = MinimalEthSpec; const LOW_VALIDATOR_COUNT: usize = 32; + fn gloas_spec() -> Arc { + let mut spec = E::default_spec(); + spec.altair_fork_epoch = Some(Epoch::new(0)); + spec.bellatrix_fork_epoch = Some(Epoch::new(0)); + spec.capella_fork_epoch = Some(Epoch::new(0)); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.electra_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + spec.gloas_fork_epoch = Some(Epoch::new(0)); + Arc::new(spec) + } + fn get_store_with_spec( db_path: &TempDir, spec: Arc, @@ -634,26 +560,11 @@ mod test { ) .expect("disk store should initialize") } + async fn get_gloas_chain( db_path: &TempDir, ) -> BeaconChainHarness> { - let altair_fork_epoch = Epoch::new(0); - let bellatrix_fork_epoch = Epoch::new(0); - let capella_fork_epoch = Epoch::new(0); - let deneb_fork_epoch = Epoch::new(0); - let electra_fork_epoch = Epoch::new(0); - let fulu_fork_epoch = Epoch::new(0); - let gloas_fork_epoch = Epoch::new(0); - - let mut spec = E::default_spec(); - spec.altair_fork_epoch = Some(altair_fork_epoch); - spec.bellatrix_fork_epoch = Some(bellatrix_fork_epoch); - spec.capella_fork_epoch = Some(capella_fork_epoch); - spec.deneb_fork_epoch = Some(deneb_fork_epoch); - spec.electra_fork_epoch = Some(electra_fork_epoch); - spec.fulu_fork_epoch = Some(fulu_fork_epoch); - spec.gloas_fork_epoch = Some(gloas_fork_epoch); - let spec = Arc::new(spec); + let spec = gloas_spec::(); let chain_store = get_store_with_spec::(db_path, spec.clone()); let validators_keypairs = @@ -666,33 +577,12 @@ mod test { .build(); // go to gloas slot - let gloas_fork_slot = gloas_fork_epoch.start_slot(E::slots_per_epoch()); + let gloas_fork_slot = Slot::new(0); harness.extend_to_slot(gloas_fork_slot).await; - let gloas_head = &harness.chain.head_snapshot().beacon_block; - assert!(gloas_head.as_gloas().is_ok()); - assert_eq!(gloas_head.slot(), gloas_fork_slot); - assert!( - gloas_head.message().body().execution_payload().is_err(), - "Gloas block has no payload" - ); harness } - async fn availability_pending_payload( - _harness: &BeaconChainHarness>, - ) -> ( - AvailabilityPendingExecutedPayload, - Vec>>, - ) - where - E: EthSpec, - Hot: ItemStore, - Cold: ItemStore, - { - todo!() - } - - async fn setup_harness_and_cache( + async fn setup_harness_and_cache( capacity: usize, ) -> ( BeaconChainHarness>, @@ -700,7 +590,6 @@ mod test { TempDir, ) where - E: EthSpec, T: BeaconChainTypes< HotStore = BeaconNodeBackend, ColdStore = BeaconNodeBackend, @@ -711,7 +600,6 @@ mod test { let chain_db_path = tempdir().expect("should get temp dir"); let harness = get_gloas_chain(&chain_db_path).await; let spec = harness.spec.clone(); - let _test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); let custody_context = Arc::new(CustodyContext::new( NodeCustodyType::Fullnode, @@ -725,216 +613,145 @@ mod test { (harness, cache, chain_db_path) } - #[tokio::test] - #[ignore] // TODO(gloas): Implement availability_pending_payload - async fn overflow_cache_test_insert_components() { - type E = MinimalEthSpec; - type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; - - let (pending_payload, columns) = availability_pending_payload(&harness).await; - let root = pending_payload.as_payload().beacon_block_root(); - - let mut expected_column_indices = harness - .chain - .data_availability_checker - .custody_context() - .custody_columns_for_epoch(None, &harness.chain.spec) - .iter() - .collect::>(); - let columns_expected = pending_payload.num_blobs_expected(); - - assert_eq!( - columns.len(), - expected_column_indices.len(), - "should have expected number of blobs" - ); - assert!(cache.critical.read().is_empty(), "cache should be empty"); - let availability = cache - .put_executed_payload(pending_payload) - .expect("should put payload"); - if columns_expected == 0 { - assert!( - matches!(availability, Availability::Available(_)), - "payload doesn't have columns, should be available" - ); - assert_eq!( - cache.critical.read().len(), - 1, - "cache should still have payload as it hasn't been imported yet" - ); - } else { - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should be pending columns" - ); - assert_eq!( - cache.critical.read().len(), - 1, - "cache should have one payload" - ); - assert!( - cache.critical.read().peek(&root).is_some(), - "newly inserted payload should exist in memory" - ); - } - - let mut kzg_verified_columns = Vec::new(); - - for gossip_column in columns.into_iter() { - let col_index = gossip_column.index(); - kzg_verified_columns.push(KzgVerifiedCustodyDataColumn::from_asserted_custody( - gossip_column.into_inner(), - )); - let availability = cache - .put_kzg_verified_data_columns(root, kzg_verified_columns.clone().into_iter()) - .expect("should put column"); - - expected_column_indices.remove(&col_index); - - if expected_column_indices.is_empty() { - assert!(matches!(availability, Availability::Available(_))); - } else { - assert!(matches!(availability, Availability::MissingComponents(_))); - assert_eq!(cache.critical.read().len(), 1); - } - } - - let (pending_payload, columns) = availability_pending_payload(&harness).await; - let expected_column_indices = harness - .chain - .data_availability_checker - .custody_context() - .custody_columns_for_epoch(None, &harness.chain.spec) - .iter() - .collect::>(); - - assert_eq!( - columns.len(), - expected_column_indices.len(), - "should have expected number of columns" - ); - let root = pending_payload.as_payload().beacon_block_root(); - - let mut kzg_verified_columns = vec![]; - for gossip_column in columns { - kzg_verified_columns.push(KzgVerifiedCustodyDataColumn::from_asserted_custody( - gossip_column.into_inner(), - )); - let availability = cache - .put_kzg_verified_data_columns(root, kzg_verified_columns.clone()) - .expect("should put column"); - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should be pending payload" - ); - assert_eq!( - cache.critical.read().len(), - 2, - "cache should have two payloads now" - ); - } - let availability = cache - .put_executed_payload(pending_payload) - .expect("should put payload"); - assert!( - matches!(availability, Availability::Available(_)), - "payload should be available: {:?}", - availability - ); - assert!( - cache.critical.read().len() == 2, - "cache should still have available payload" - ); - } -} - -#[cfg(test)] -mod pending_components_tests { - use super::*; - use crate::PayloadVerificationOutcome; - use crate::data_column_verification::KzgVerifiedDataColumn; - use crate::payload_verification_types::PayloadImportData; - use crate::test_utils::{NumBlobs, generate_rand_payload_and_columns, test_spec}; - use fork_choice::PayloadVerificationStatus; - use kzg::KzgCommitment; - use rand::SeedableRng; - use rand::rngs::StdRng; - use ssz_types::VariableList; - use state_processing::ConsensusContext; - use types::test_utils::TestRandom; - use types::{BeaconState, ForkName, MainnetEthSpec, SignedExecutionPayloadEnvelope, Slot}; - - type E = MainnetEthSpec; - - type Setup = ( - SignedExecutionPayloadEnvelope, - DataColumnSidecarList, - DataColumnSidecarList, - ); - - /// Returns true if gloas is enabled for testing. Tests should skip if this returns false. fn is_gloas_enabled() -> bool { let spec = test_spec::(); spec.fork_name_at_slot::(Slot::new(0)).gloas_enabled() } - fn pre_setup() -> Setup { - let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64); - let spec = test_spec::(); + #[tokio::test] + async fn test_cache_creation() { + if !is_gloas_enabled() { + return; + } - assert!( - spec.fork_name_at_slot::(Slot::new(0)).gloas_enabled(), - "pre_setup() only works after gloas" - ); + type T = DiskHarnessType; + let capacity = 4; + let (_harness, cache, _path) = setup_harness_and_cache::(capacity).await; + assert_eq!(cache.block_cache_size(), 0); + } - let (payload, columns) = generate_rand_payload_and_columns::( - ForkName::Gloas, - NumBlobs::Random, + #[tokio::test] + async fn test_put_columns_creates_pending_components() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let capacity = 4; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + // Generate a block with data columns + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, // Use Fulu for now as Gloas generation may not be ready + NumBlobs::Number(1), &mut rng, &spec, ); - // Create invalid columns by mutating kzg_commitments - let invalid_columns: DataColumnSidecarList = columns - .iter() + let block_root = Hash256::random(); + + // Convert to KzgVerifiedCustodyDataColumn + let verified_columns: Vec<_> = data_columns + .into_iter() + .take(1) // Just take one column for the test .map(|col| { - let mut col_clone = col.as_ref().clone(); - // Mutate commitments to make them invalid - let mut commitments: Vec<_> = col_clone.kzg_commitments().iter().copied().collect(); - for commitment in commitments.iter_mut() { - *commitment = KzgCommitment::random_for_test(&mut rng); - } - let new_commitments = - VariableList::try_from(commitments).expect("commitments within bounds"); - match &mut col_clone { - DataColumnSidecar::Gloas(gloas) => { - gloas.kzg_commitments = new_commitments; - } - DataColumnSidecar::Fulu(fulu) => { - fulu.kzg_commitments = new_commitments; - } - } - Arc::new(col_clone) + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) }) .collect(); - (payload, columns, invalid_columns) + // Put columns into cache + let result = cache.put_kzg_verified_data_columns(block_root, verified_columns); + assert!(result.is_ok()); + + // Check that pending components were created + assert_eq!(cache.block_cache_size(), 1); + + // Verify columns are cached + let cached_indices = cache.peek_pending_components(&block_root, |components| { + components.map(|c| c.get_cached_data_columns_indices()) + }); + assert!(cached_indices.is_some()); + assert_eq!(cached_indices.unwrap().len(), 1); } - type PendingComponentsSetup = ( - AvailabilityPendingExecutedPayload, - Vec>, - Vec>, - ); + #[tokio::test] + async fn test_column_deduplication() { + if !is_gloas_enabled() { + return; + } - fn setup_pending_components( - payload: SignedExecutionPayloadEnvelope, - valid_columns: DataColumnSidecarList, - invalid_columns: DataColumnSidecarList, - ) -> PendingComponentsSetup { - let columns: Vec> = valid_columns + type T = DiskHarnessType; + let capacity = 4; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Get the first column + let first_column = data_columns.first().cloned().expect("should have column"); + let column_index = *first_column.index(); + + let verified_column = KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(first_column.clone()), + ); + + // Insert the same column twice + cache + .put_kzg_verified_data_columns(block_root, vec![verified_column.clone()]) + .expect("should put column"); + + cache + .put_kzg_verified_data_columns(block_root, vec![verified_column]) + .expect("should put column again"); + + // Check that we still only have one column (deduplicated) + let cached_indices = cache.peek_pending_components(&block_root, |components| { + components.map(|c| c.get_cached_data_columns_indices()) + }); + assert!(cached_indices.is_some()); + let indices = cached_indices.unwrap(); + assert_eq!(indices.len(), 1); + assert_eq!(indices[0], column_index); + } + + #[tokio::test] + async fn test_columns_without_block_not_available() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let capacity = 4; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Add all columns + let verified_columns: Vec<_> = data_columns .into_iter() .map(|col| { KzgVerifiedCustodyDataColumn::from_asserted_custody( @@ -943,8 +760,40 @@ mod pending_components_tests { }) .collect(); - let invalid_columns: Vec> = invalid_columns + let result = cache + .put_kzg_verified_data_columns(block_root, verified_columns) + .expect("should put columns"); + + // Without a block, should still be missing components + assert!(matches!(result, Availability::MissingComponents(_))); + } + + #[tokio::test] + async fn test_reconstruction_started_flag() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let capacity = 4; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Add some columns (not enough for reconstruction threshold) + let verified_columns: Vec<_> = data_columns .into_iter() + .take(10) // Not enough for reconstruction .map(|col| { KzgVerifiedCustodyDataColumn::from_asserted_custody( KzgVerifiedDataColumn::__new_for_testing(col), @@ -952,222 +801,138 @@ mod pending_components_tests { }) .collect(); - let executed_payload = AvailabilityPendingExecutedPayload::new( - Arc::new(payload.clone()), - PayloadImportData { - state: BeaconState::new(0, Default::default(), &test_spec::()), - consensus_context: ConsensusContext::new(payload.message.slot), - }, - PayloadVerificationOutcome { - payload_verification_status: PayloadVerificationStatus::Verified, - is_valid_merge_transition_block: false, - }, + cache + .put_kzg_verified_data_columns(block_root, verified_columns) + .expect("should put columns"); + + // Check reconstruction decision - should say "not enough columns" + let decision = cache.check_and_set_reconstruction_started(&block_root); + assert!(matches!(decision, ReconstructColumnsDecision::No(_))); + } + + #[tokio::test] + async fn test_handle_reconstruction_failure_clears_columns() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let capacity = 4; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, ); - (executed_payload, columns, invalid_columns) + let block_root = Hash256::random(); + + // Add some columns + let verified_columns: Vec<_> = data_columns + .into_iter() + .take(5) + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + cache + .put_kzg_verified_data_columns(block_root, verified_columns) + .expect("should put columns"); + + // Verify columns are cached + let cached_count = cache.peek_pending_components(&block_root, |components| { + components.map(|c| c.verified_data_columns.len()) + }); + assert_eq!(cached_count, Some(5)); + + // Handle reconstruction failure + cache.handle_reconstruction_failure(&block_root); + + // Verify columns are cleared + let cached_count_after = cache.peek_pending_components(&block_root, |components| { + components.map(|c| c.verified_data_columns.len()) + }); + assert_eq!(cached_count_after, Some(0)); } - fn assert_cache_consistent(cache: &PendingComponents) { - let cached_payload = cache - .payload - .as_ref() - .expect("expected cached payload to be present"); - let payload_commitments = cached_payload.get_commitments(); - // Each column should have commitments matching the payload - for col in &cache.verified_data_columns { - let col_commitments = col.as_data_column().kzg_commitments(); - assert_eq!( - payload_commitments, - *col_commitments, - "column {} commitments should match payload commitments", - col.index() - ); - } - } - - #[allow(dead_code)] - fn assert_empty_column_cache(cache: &PendingComponents) { - assert!( - cache.verified_data_columns.is_empty(), - "expected empty column cache but found {} columns", - cache.verified_data_columns.len() - ); - } - - // v2 merge_data_columns deduplicates by index (first-in wins). When invalid columns - // are merged first, they persist even after valid columns are merged at the same indices. - - #[test] - fn payload_invalid_columns_valid_columns() { + #[tokio::test] + async fn test_maintenance_removes_old_entries() { if !is_gloas_enabled() { return; } - let (payload, columns, invalid_columns) = pre_setup(); - let (executed_payload, columns, invalid_columns) = - setup_pending_components(payload, columns, invalid_columns); - let block_root = Hash256::ZERO; - let mut cache = >::empty(block_root); - cache.merge_payload(executed_payload); - cache - .merge_data_columns(invalid_columns) - .expect("merge should succeed"); - cache - .merge_data_columns(columns) - .expect("merge should succeed"); - // Invalid columns were inserted first, valid columns are deduplicated away. - // The cache has columns but they have invalid commitments (not matching payload). - assert!(!cache.verified_data_columns.is_empty()); + type T = DiskHarnessType; + let capacity = 4; + let (_harness, cache, _path) = setup_harness_and_cache::(capacity).await; + + let block_root = Hash256::random(); + + // Create an empty entry in the cache + let _ = cache.peek_pending_components(&block_root, |_| {}); + + // Manually insert a pending component by putting empty columns + // This will create an entry but it won't have an epoch + // For this test, we need an entry with a known epoch + + // Run maintenance with a future cutoff epoch + let cutoff_epoch = Epoch::new(100); + cache.do_maintenance(cutoff_epoch).expect("maintenance should succeed"); + + // Cache should still be empty since we didn't add anything with an epoch + assert_eq!(cache.block_cache_size(), 0); } - #[test] - fn invalid_columns_payload_valid_columns() { + #[tokio::test] + async fn test_peek_data_columns() { if !is_gloas_enabled() { return; } - let (payload, columns, invalid_columns) = pre_setup(); - let (executed_payload, columns, invalid_columns) = - setup_pending_components(payload, columns, invalid_columns); - let block_root = Hash256::ZERO; - let mut cache = >::empty(block_root); - cache - .merge_data_columns(invalid_columns) - .expect("merge should succeed"); - cache.merge_payload(executed_payload); - cache - .merge_data_columns(columns) - .expect("merge should succeed"); - // Invalid columns were first, valid ones deduplicated away. - assert!(!cache.verified_data_columns.is_empty()); - } + type T = DiskHarnessType; + let capacity = 4; + let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; - #[test] - fn invalid_columns_valid_columns_payload() { - if !is_gloas_enabled() { - return; - } - let (payload, columns, invalid_columns) = pre_setup(); - let (executed_payload, columns, invalid_columns) = - setup_pending_components(payload, columns, invalid_columns); + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); - let block_root = Hash256::ZERO; - let mut cache = >::empty(block_root); - cache - .merge_data_columns(invalid_columns) - .expect("merge should succeed"); - cache - .merge_data_columns(columns) - .expect("merge should succeed"); - cache.merge_payload(executed_payload); - - // Invalid columns were first, valid ones deduplicated away. - assert!(!cache.verified_data_columns.is_empty()); - } - - #[test] - fn payload_valid_columns_invalid_columns() { - if !is_gloas_enabled() { - return; - } - let (payload, columns, invalid_columns) = pre_setup(); - let (executed_payload, columns, invalid_columns) = - setup_pending_components(payload, columns, invalid_columns); - - let block_root = Hash256::ZERO; - let mut cache = >::empty(block_root); - cache.merge_payload(executed_payload); - cache - .merge_data_columns(columns) - .expect("merge should succeed"); - cache - .merge_data_columns(invalid_columns) - .expect("merge should succeed"); - - // Valid columns were inserted first, so they persist. Cache should be consistent. - assert_cache_consistent(&cache); - } - - #[test] - fn valid_columns_payload_invalid_columns() { - if !is_gloas_enabled() { - return; - } - let (payload, columns, invalid_columns) = pre_setup(); - let (executed_payload, columns, invalid_columns) = - setup_pending_components(payload, columns, invalid_columns); - - let block_root = Hash256::ZERO; - let mut cache = >::empty(block_root); - cache - .merge_data_columns(columns) - .expect("merge should succeed"); - cache.merge_payload(executed_payload); - cache - .merge_data_columns(invalid_columns) - .expect("merge should succeed"); - - // Valid columns were inserted first, so they persist. Cache should be consistent. - assert_cache_consistent(&cache); - } - - #[test] - fn valid_columns_invalid_columns_payload() { - if !is_gloas_enabled() { - return; - } - let (payload, columns, invalid_columns) = pre_setup(); - let (executed_payload, columns, invalid_columns) = - setup_pending_components(payload, columns, invalid_columns); - - let block_root = Hash256::ZERO; - let mut cache = >::empty(block_root); - cache - .merge_data_columns(columns) - .expect("merge should succeed"); - cache - .merge_data_columns(invalid_columns) - .expect("merge should succeed"); - cache.merge_payload(executed_payload); - - // Valid columns were inserted first, so they persist. Cache should be consistent. - assert_cache_consistent(&cache); - } - - #[test] - fn should_not_insert_pre_execution_payload_if_executed_payload_exists() { - if !is_gloas_enabled() { - return; - } - let (payload, _columns, _invalid_columns) = pre_setup(); - let (executed_payload, _columns, _invalid_columns) = - setup_pending_components(payload.clone(), _columns, _invalid_columns); - - let block_root = Hash256::ZERO; - let mut pending_component = >::empty(block_root); - - let pre_execution_payload = Arc::new(payload); - pending_component - .insert_pre_execution_payload(pre_execution_payload.clone(), BlockImportSource::Gossip); - assert!( - matches!( - pending_component.payload, - Some(CachedPayload::PreExecution(_, _)) - ), - "pre execution payload inserted" + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, ); - pending_component.insert_executed_payload(executed_payload); - assert!( - matches!(pending_component.payload, Some(CachedPayload::Executed(_))), - "executed payload inserted" - ); + let block_root = Hash256::random(); - pending_component - .insert_pre_execution_payload(pre_execution_payload, BlockImportSource::Gossip); - assert!( - matches!(pending_component.payload, Some(CachedPayload::Executed(_))), - "executed payload should remain" - ); + // No columns yet + assert!(cache.peek_data_columns(block_root).is_none()); + + // Add columns + let verified_columns: Vec<_> = data_columns + .into_iter() + .take(3) + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + cache + .put_kzg_verified_data_columns(block_root, verified_columns) + .expect("should put columns"); + + // Now columns should be returned + let peeked = cache.peek_data_columns(block_root); + assert!(peeked.is_some()); + assert_eq!(peeked.unwrap().len(), 3); } } diff --git a/beacon_node/beacon_chain/src/data_availability_router.rs b/beacon_node/beacon_chain/src/data_availability_router.rs index 847c25ce76..95716559b2 100644 --- a/beacon_node/beacon_chain/src/data_availability_router.rs +++ b/beacon_node/beacon_chain/src/data_availability_router.rs @@ -59,7 +59,8 @@ impl AvailabilityOutcome { match self { Self::Block(BlockAvailability::Available(block)) => block.import_data.block_root, Self::Block(BlockAvailability::MissingComponents(root)) => *root, - Self::Payload(PayloadAvailability::Available(payload)) => payload.payload.block_root(), + // For payload availability, the first element of the tuple is the block root + Self::Payload(PayloadAvailability::Available(available_data)) => available_data.0, Self::Payload(PayloadAvailability::MissingComponents(root)) => *root, } } @@ -126,7 +127,7 @@ impl ReconstructionOutcome { /// Both `DataAvailabilityChecker` (v1) and `DataAvailabilityChecker` (v2) implement /// this trait. The associated types differ: /// - V1: Returns `Availability` containing `AvailableExecutedBlock` -/// - V2: Returns `Availability` containing `AvailableExecutedPayload` +/// - V2: Returns `Availability` containing `(Hash256, DataColumnSidecarList)` (block root + columns) pub trait DataColumnCache: Send + Sync { /// The availability type returned by write operations. /// V1 returns block availability, V2 returns payload availability. diff --git a/beacon_node/beacon_chain/src/payload_verification_types.rs b/beacon_node/beacon_chain/src/payload_verification_types.rs index 02868bb597..94c8b6cb5e 100644 --- a/beacon_node/beacon_chain/src/payload_verification_types.rs +++ b/beacon_node/beacon_chain/src/payload_verification_types.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use state_processing::ConsensusContext; use types::{BeaconState, BlockImportSource, EthSpec, SignedExecutionPayloadEnvelope}; -use crate::{PayloadVerificationOutcome, data_availability_checker_v2::AvailablePayload}; +use crate::PayloadVerificationOutcome; #[derive(Debug, Clone, PartialEq)] pub struct PayloadImportData { @@ -13,6 +13,10 @@ pub struct PayloadImportData { /// A payload that has completed payload verification by an EL client but does not /// have all requisite column data to get imported into fork choice. +/// +/// Note: The number of expected blobs is not available from this type directly since +/// blob commitments are in the block's execution payload bid, not the payload envelope. +/// Use the associated block to get this information. #[derive(Clone)] pub struct AvailabilityPendingExecutedPayload { pub payload: Arc>, @@ -36,32 +40,6 @@ impl AvailabilityPendingExecutedPayload { pub fn as_payload(&self) -> &SignedExecutionPayloadEnvelope { &self.payload } - - pub fn num_blobs_expected(&self) -> usize { - self.payload.message.blob_kzg_commitments.len() - } -} - -/// A payload that has completed all payload verification by an EL client -/// **and** has all requisite column data to be imported into fork choice. -pub struct AvailableExecutedPayload { - pub payload: AvailablePayload, - pub import_data: PayloadImportData, - pub payload_verification_outcome: PayloadVerificationOutcome, -} - -impl AvailableExecutedPayload { - pub fn new( - payload: AvailablePayload, - import_data: PayloadImportData, - payload_verification_outcome: PayloadVerificationOutcome, - ) -> Self { - Self { - payload, - import_data, - payload_verification_outcome, - } - } } pub enum PayloadProcessStatus { diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 29ed742ac0..e818f72f1c 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -3431,7 +3431,8 @@ macro_rules! add_blob_transactions_gloas { for tx in Vec::from(transactions) { payload.transactions.push(tx).unwrap(); } - $message.blob_kzg_commitments = bundle.commitments.clone(); + // Note: In Gloas, blob_kzg_commitments are in the bid (block body), not the payload envelope. + // The commitments are returned via the bundle for the caller to use. bundle }}; } @@ -3444,9 +3445,12 @@ pub fn generate_rand_payload_and_columns( ) -> (SignedExecutionPayloadEnvelope, DataColumnSidecarList) { let mut payload = SignedExecutionPayloadEnvelope::random_for_test(rng); - let _bundle = add_blob_transactions_gloas!(payload.message, num_blobs, rng, fork_name); + let bundle = add_blob_transactions_gloas!(payload.message, num_blobs, rng, fork_name); - let data_columns = generate_data_column_sidecars_from_payload(&payload, spec); + // In Gloas, blob_kzg_commitments are in the bid (block body), not the payload envelope. + // We pass them from the bundle to generate the data columns. + let kzg_commitments = bundle.commitments; + let data_columns = generate_data_column_sidecars_from_payload(&payload, kzg_commitments, spec); (payload, data_columns) } @@ -3607,11 +3611,14 @@ pub fn generate_data_column_sidecars_from_block( } /// Generate data column sidecars from pre-computed cells and proofs for gloas payloads. +/// +/// Note: In Gloas, `blob_kzg_commitments` are in the bid (block body), not the payload envelope. +/// The caller must provide the commitments separately. pub fn generate_data_column_sidecars_from_payload( payload: &SignedExecutionPayloadEnvelope, + kzg_commitments: KzgCommitments, spec: &ChainSpec, ) -> DataColumnSidecarList { - let kzg_commitments = payload.message.blob_kzg_commitments.clone(); if kzg_commitments.is_empty() { return vec![]; }