diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 37ba718111..97992a7106 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -57,13 +57,6 @@ impl BeaconChain { ); } - self.data_availability_checker - .pending_payload_cache() - .put_pre_executed_payload_envelope( - unverified_envelope.envelope_cloned(), - envelope_source, - )?; - let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_REQUESTS); @@ -95,11 +88,6 @@ impl BeaconChain { // If the envelope fails execution for whatever reason (e.g. engine offline), // and we keep it in the cache, then the node will NOT perform lookup and // reprocess this envelope until the envelope is evicted from DA checker, causing the - // chain to get stuck temporarily if the envelope is canonical. Therefore we remove - // it from the cache if execution fails. - self.data_availability_checker - .pending_payload_cache() - .remove_pre_executed_payload_envelope(&block_root); })?; // Record the time it took to wait for execution layer verification. diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs index 44dfa0c895..4f6c0f81d2 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -19,22 +19,22 @@ //! DataColumnSidecarList //! | //! | -> Perform data column verification against `SignedExecutionPayloadBid` -//! │ │ +//! │ │ //! │ ▼ //! | -> KzgVerifiedCustodyDataColumn -//! //! -//! SignedExecutionPayloadEnvelope -//! │ -//! | -> CachedPayloadEnvelope::PreExecution -//! │ │ -//! │ ▼ +//! +//! SignedExecutionPayloadEnvelope +//! │ +//! | -> CachedPayloadEnvelope::PreExecution +//! │ │ +//! │ ▼ //! | -> AvailabilityPendingExecutedEnvelope -//! │ │ -//! │ ▼ -//! │ -> CachedPayloadEnvelope::Executed -//! │ │ -//! │ ▼ +//! │ │ +//! │ ▼ +//! │ -> CachedPayloadEnvelope::Executed +//! │ │ +//! │ ▼ //! | -> AvailableExecutedEnvelope (all columns present, payload executed against the EL, ready to import) use crate::data_availability_checker::{AvailabilityCheckError, MissingCellsError}; @@ -46,6 +46,7 @@ use kzg::Kzg; use lru::LruCache; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use slot_clock::SlotClock; +use std::collections::HashMap; use std::fmt; use std::fmt::Debug; use std::num::NonZeroUsize; @@ -53,9 +54,8 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tracing::{Span, debug, error, instrument, trace}; use types::{ - BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, - EthSpec, Hash256, PartialDataColumnSidecarRef, SignedExecutionPayloadBid, - SignedExecutionPayloadEnvelope, Slot, + ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, + PartialDataColumnSidecarRef, Slot, }; mod pending_column; @@ -90,16 +90,6 @@ pub enum Availability { Available(Box>), } -pub enum PayloadEnvelopeProcessingStatus { - /// Envelope is not in any pre-import cache. Envelope may be in the data-base or in the fork-choice. - Unknown, - /// Envelope is currently processing but not yet validated. - NotValidated(Arc>, BlockImportSource), - /// Envelope is fully valid, but not yet imported. It's cached in the da_checker while awaiting - /// missing envelope components. - ExecutionValidated(Arc>), -} - impl Debug for Availability { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -190,6 +180,8 @@ impl PendingPayloadCache { &'_ self, data_column: &'a DataColumnSidecar, ) -> Result>, MissingCellsError> { + // TODO(gloas): implement cell-level missing check + Ok(None) } /// Insert an executed payload envelope into the cache and performs an availability check @@ -200,7 +192,7 @@ impl PendingPayloadCache { let epoch = executed_envelope.envelope.epoch(); let beacon_block_root = executed_envelope.envelope.beacon_block_root(); let pending_components = - self.update_or_insert_pending_components(beacon_block_root, |pending_components| { + self.get_pending_components(beacon_block_root, |pending_components| { pending_components.insert_executed_payload_envelope(executed_envelope); Ok(()) })?; @@ -218,70 +210,14 @@ impl PendingPayloadCache { self.check_availability(beacon_block_root, pending_components, num_expected_columns) } - /// Insert a pre executed payload envelope in the cache - pub fn put_pre_executed_payload_envelope( - &self, - envelope: Arc>, - source: BlockImportSource, - ) -> Result<(), AvailabilityCheckError> { - let epoch = envelope.epoch(); - let beacon_block_root = envelope.beacon_block_root(); - let pending_components = - self.update_or_insert_pending_components(beacon_block_root, |pending_components| { - pending_components.insert_pre_executed_payload_envelope(envelope, source); - Ok(()) - })?; - - let num_expected_columns = self.get_num_expected_columns(epoch); - - pending_components.span.in_scope(|| { - debug!( - component = "pre executed payload envelope", - status = pending_components.status_str(num_expected_columns), - "Component added to data availability checker" - ); + /// Initialize pending components for a block. Called when the beacon block (containing the + /// bid) arrives. Sets up the slot and expected blob count so that subsequent column insertions + /// know how many cells to expect per column. + pub fn init_pending_block(&self, block_root: Hash256, slot: Slot, num_blobs_expected: usize) { + let mut write_lock = self.availability_cache.write(); + write_lock.get_or_insert_mut(block_root, || { + PendingComponents::empty(block_root, slot, num_blobs_expected, self.spec.clone()) }); - - Ok(()) - } - - /// Removes a pre-executed envelope from the cache. - /// This does NOT remove an existing executed envelope. - pub fn remove_pre_executed_payload_envelope(&self, block_root: &Hash256) { - if let Some(PayloadEnvelopeProcessingStatus::NotValidated(_, _)) = - self.get_envelope_processing_status(block_root) - { - // If the envelope is execution invalid, this status is permanent and idempotent to this - // block_root. We drop its components (e.g. columns) because they will never be useful. - self.availability_cache.write().pop(block_root); - } - } - - /// Insert an execution payload bid into the cache. - pub fn put_bid( - &self, - block_root: Hash256, - bid: Arc>, - ) -> Result, AvailabilityCheckError> { - let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); - - let pending_components = - self.update_or_insert_pending_components(block_root, |pending_components| { - pending_components.insert_bid(bid); - Ok(()) - })?; - - let num_expected_columns = self.get_num_expected_columns(epoch); - - pending_components.span.in_scope(|| { - debug!( - component = "bid", - status = pending_components.status_str(num_expected_columns), - "Component added to data availability checker" - ); - }); - - self.check_availability(block_root, pending_components, num_expected_columns) } /// Perform KZG verification on RPC custody columns and insert them into the cache. @@ -347,10 +283,9 @@ impl PendingPayloadCache { return Ok(Availability::MissingComponents(block_root)); }; - let pending_components = self - .update_or_insert_pending_components(block_root, |pending_components| { - pending_components.merge_data_columns(kzg_verified_data_columns) - })?; + let pending_components = self.get_pending_components(block_root, |pending_components| { + pending_components.merge_data_columns(kzg_verified_data_columns) + })?; let num_expected_columns = self.get_num_expected_columns(epoch); @@ -488,12 +423,14 @@ impl PendingPayloadCache { } } - /// Updates or inserts a new `PendingComponents` if it doesn't exist, and then apply the - /// `update_fn` while holding the write lock. + /// Gets existing `PendingComponents` and applies the `update_fn` while holding the write lock. /// /// Once the update is complete, the write lock is downgraded and a read guard with a /// reference of the updated `PendingComponents` is returned. - fn update_or_insert_pending_components( + /// + /// Returns an error if no pending components exist for the given block root (the block must + /// be initialized via `init_pending_block` first). + fn get_pending_components( &self, block_root: Hash256, update_fn: F, @@ -504,9 +441,11 @@ impl PendingPayloadCache { let mut write_lock = self.availability_cache.write(); { - let pending_components = write_lock.get_or_insert_mut(block_root, || { - PendingComponents::empty(block_root, self.spec.clone()) - }); + let pending_components = write_lock.get_mut(&block_root).ok_or_else(|| { + AvailabilityCheckError::Unexpected( + "pending components not initialized for block".to_string(), + ) + })?; update_fn(pending_components)? } @@ -527,6 +466,7 @@ impl PendingPayloadCache { } /// Check whether data column reconstruction should be attempted. + /// TODO(gloas): rethink reconstruction for the cell model fn check_and_set_reconstruction_started( &self, block_root: &Hash256, @@ -563,7 +503,7 @@ impl PendingPayloadCache { /// status so that we can attempt to retrieve columns from peers again. fn handle_reconstruction_failure(&self, block_root: &Hash256) { if let Some(pending_components_mut) = self.availability_cache.write().get_mut(block_root) { - pending_components_mut.verified_data_columns = vec![]; + pending_components_mut.verified_data_columns = HashMap::new(); pending_components_mut.reconstruction_started = false; } } @@ -578,9 +518,7 @@ impl PendingPayloadCache { let mut write_lock = self.availability_cache.write(); let mut keys_to_remove = vec![]; for (key, value) in write_lock.iter() { - if let Some(epoch) = value.epoch() - && epoch < cutoff_epoch - { + if value.epoch() < cutoff_epoch { keys_to_remove.push(*key); } } @@ -703,7 +641,7 @@ mod data_availability_checker_tests { use tempfile::{TempDir, tempdir}; use types::{ ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, FullPayload, - MinimalEthSpec, SignedBeaconBlock, Slot, + MinimalEthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; type E = MinimalEthSpec; @@ -807,150 +745,15 @@ mod data_availability_checker_tests { // once the Gloas harness can produce KZG-valid columns. These wrappers add KZG verification // and custody column filtering on top of `put_kzg_verified_custody_data_columns`. - #[tokio::test] - async fn test_put_columns_creates_pending_components() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (_block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - - let verified_columns: Vec<_> = data_columns - .into_iter() - .take(1) // Just take one column for the test - .map(|col| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::__new_for_testing(col), - ) - }) - .collect(); - - // Put columns into cache - let result = cache.put_kzg_verified_custody_data_columns(block_root, verified_columns); - assert!(result.is_ok()); - - // Check that pending components were created - assert_eq!(cache.block_cache_size(), 1); - - // Verify columns are cached - let cached_indices = cache.peek_pending_components(&block_root, |components| { - components.map(|c| c.get_cached_data_columns_indices()) - }); - assert!(cached_indices.is_some()); - assert_eq!(cached_indices.unwrap().len(), 1); - } - - #[tokio::test] - async fn test_column_deduplication() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (_block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - - // Get the first column - let first_column = data_columns.first().cloned().expect("should have column"); - let column_index = *first_column.index(); - - let verified_column = KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::__new_for_testing(first_column.clone()), - ); - - // Insert the same column twice - cache - .put_kzg_verified_custody_data_columns(block_root, vec![verified_column.clone()]) - .expect("should put column"); - - cache - .put_kzg_verified_custody_data_columns(block_root, vec![verified_column]) - .expect("should put column again"); - - // Check that we still only have one column (deduplicated) - let cached_indices = cache.peek_pending_components(&block_root, |components| { - components.map(|c| c.get_cached_data_columns_indices()) - }); - assert!(cached_indices.is_some()); - let indices = cached_indices.unwrap(); - assert_eq!(indices.len(), 1); - assert_eq!(indices[0], column_index); - } - - #[tokio::test] - async fn test_columns_without_block_not_available() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (_block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - - // Add all columns - let verified_columns: Vec<_> = data_columns - .into_iter() - .map(|col| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::__new_for_testing(col), - ) - }) - .collect(); - - let result = cache - .put_kzg_verified_custody_data_columns(block_root, verified_columns) - .expect("should put columns"); - - // Without a bid, should still be missing components - assert!(matches!(result, Availability::MissingComponents(_))); - } - - /// Helper to create a test bid with the given block root and kzg commitments from a block. - fn make_test_bid( - block: &SignedBeaconBlock>, - ) -> Arc> { - let bid = block + fn num_blobs_in_block(block: &SignedBeaconBlock>) -> usize { + block .message() .body() .signed_execution_payload_bid() .expect("gloas block should have bid") - .clone(); - Arc::new(bid) + .message + .blob_kzg_commitments + .len() } fn make_test_signed_envelope(block_root: Hash256) -> Arc> { @@ -978,6 +781,135 @@ mod data_availability_checker_tests { } } + #[tokio::test] + async fn test_put_columns_creates_pending_components() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); + + let verified_columns: Vec<_> = data_columns + .into_iter() + .take(1) + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + let result = cache.put_kzg_verified_custody_data_columns(block_root, verified_columns); + assert!(result.is_ok()); + + assert_eq!(cache.block_cache_size(), 1); + + let cached_indices = cache.peek_pending_components(&block_root, |components| { + components.map(|c| c.get_cached_data_columns_indices()) + }); + assert!(cached_indices.is_some()); + assert_eq!(cached_indices.unwrap().len(), 1); + } + + #[tokio::test] + async fn test_column_deduplication() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); + + let first_column = data_columns.first().cloned().expect("should have column"); + let column_index = *first_column.index(); + + let verified_column = KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(first_column.clone()), + ); + + cache + .put_kzg_verified_custody_data_columns(block_root, vec![verified_column.clone()]) + .expect("should put column"); + + cache + .put_kzg_verified_custody_data_columns(block_root, vec![verified_column]) + .expect("should put column again"); + + let cached_indices = cache.peek_pending_components(&block_root, |components| { + components.map(|c| c.get_cached_data_columns_indices()) + }); + assert!(cached_indices.is_some()); + let indices = cached_indices.unwrap(); + assert_eq!(indices.len(), 1); + assert_eq!(indices[0], column_index); + } + + #[tokio::test] + async fn test_columns_without_envelope_not_available() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); + + let verified_columns: Vec<_> = data_columns + .into_iter() + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + let result = cache + .put_kzg_verified_custody_data_columns(block_root, verified_columns) + .expect("should put columns"); + + // Without an executed envelope, should still be missing components + assert!(matches!(result, Availability::MissingComponents(_))); + } + #[tokio::test] async fn test_full_availability_flow() { if !is_gloas_enabled() { @@ -998,13 +930,7 @@ mod data_availability_checker_tests { ); let block_root = Hash256::random(); - let bid = make_test_bid(&block); - - cache.put_bid(block_root, bid).expect("should put bid"); - assert!(matches!( - cache.put_bid(block_root, make_test_bid(&block)), - Ok(Availability::MissingComponents(_)) - )); + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); let verified_columns: Vec<_> = data_columns .into_iter() @@ -1021,21 +947,6 @@ mod data_availability_checker_tests { assert!(matches!(result, Availability::MissingComponents(_))); - // Insert pre-executed envelope first - cache - .put_pre_executed_payload_envelope( - make_test_signed_envelope(block_root), - BlockImportSource::Gossip, - ) - .expect("should put pre-executed envelope"); - - let status = cache.get_envelope_processing_status(&block_root); - assert!(matches!( - status, - Some(PayloadEnvelopeProcessingStatus::NotValidated(..)) - )); - - // Upgrade to executed envelope (after EL validation) let executed_envelope = make_test_executed_envelope(block_root); let result = cache .put_executed_payload_envelope(executed_envelope) @@ -1049,152 +960,7 @@ mod data_availability_checker_tests { } #[tokio::test] - async fn test_zero_blob_bid_immediately_available() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - // Generate a block with 0 blobs — bid will have empty commitments - let (block, _data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(0), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - let bid = make_test_bid(&block); - - // Insert bid (no blobs expected) - cache.put_bid(block_root, bid).expect("should put bid"); - - // Insert executed envelope — should become available immediately (no columns needed) - let executed_envelope = make_test_executed_envelope(block_root); - let result = cache - .put_executed_payload_envelope(executed_envelope) - .expect("should put executed envelope"); - - assert!( - matches!(result, Availability::Available(_)), - "zero-blob bid should be immediately available, got {:?}", - result - ); - } - - #[tokio::test] - async fn test_columns_arrive_before_bid() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - - // Columns arrive before bid - let verified_columns: Vec<_> = data_columns - .into_iter() - .map(|col| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::__new_for_testing(col), - ) - }) - .collect(); - - let result = cache - .put_kzg_verified_custody_data_columns(block_root, verified_columns) - .expect("should put columns"); - assert!(matches!(result, Availability::MissingComponents(_))); - - let bid = make_test_bid(&block); - let result = cache.put_bid(block_root, bid).expect("should put bid"); - assert!(matches!(result, Availability::MissingComponents(_))); - - let executed_envelope = make_test_executed_envelope(block_root); - let result = cache - .put_executed_payload_envelope(executed_envelope) - .expect("should put executed envelope"); - - assert!( - matches!(result, Availability::Available(_)), - "expected Available after all components inserted, got {:?}", - result - ); - } - - #[tokio::test] - async fn test_pre_executed_envelope_not_available() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - - // Insert bid + all columns - cache - .put_bid(block_root, make_test_bid(&block)) - .expect("should put bid"); - - let verified_columns: Vec<_> = data_columns - .into_iter() - .map(|col| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::__new_for_testing(col), - ) - }) - .collect(); - cache - .put_kzg_verified_custody_data_columns(block_root, verified_columns) - .expect("should put columns"); - - // Insert pre-executed envelope (not yet validated by EL) - cache - .put_pre_executed_payload_envelope( - make_test_signed_envelope(block_root), - BlockImportSource::Gossip, - ) - .expect("should put pre-executed envelope"); - - // Should NOT be available — envelope not executed yet - let status = cache.get_envelope_processing_status(&block_root); - assert!(matches!( - status, - Some(PayloadEnvelopeProcessingStatus::NotValidated(..)) - )); - } - - #[tokio::test] - async fn test_remove_pre_executed_envelope() { + async fn test_zero_blob_immediately_available() { if !is_gloas_enabled() { return; } @@ -1203,93 +969,18 @@ mod data_availability_checker_tests { let (_harness, cache, _path) = setup_harness_and_cache::().await; let block_root = Hash256::random(); + cache.init_pending_block(block_root, Slot::new(0), 0); - // Insert pre-executed envelope - cache - .put_pre_executed_payload_envelope( - make_test_signed_envelope(block_root), - BlockImportSource::Gossip, - ) - .expect("should put pre-executed envelope"); - - // Verify it's there - assert!(cache.get_envelope_processing_status(&block_root).is_some()); - - // Remove it - cache.remove_pre_executed_payload_envelope(&block_root); - - // Should be gone - let status = cache.get_envelope_processing_status(&block_root); - assert!(status.is_none()); - } - - #[tokio::test] - async fn test_remove_pre_executed_does_not_remove_executed() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (_harness, cache, _path) = setup_harness_and_cache::().await; - - let block_root = Hash256::random(); - - // Insert executed envelope let executed_envelope = make_test_executed_envelope(block_root); - cache + let result = cache .put_executed_payload_envelope(executed_envelope) .expect("should put executed envelope"); - // Try to remove as pre-executed — should be a no-op - cache.remove_pre_executed_payload_envelope(&block_root); - - // Should still be there as executed - let status = cache.get_envelope_processing_status(&block_root); - assert!(matches!( - status, - Some(PayloadEnvelopeProcessingStatus::ExecutionValidated(..)) - )); - } - - #[tokio::test] - async fn test_reconstruction_started_flag() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (_block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, + assert!( + matches!(result, Availability::Available(_)), + "zero-blob block should be immediately available, got {:?}", + result ); - - let block_root = Hash256::random(); - - // Add some columns (not enough for reconstruction threshold) - let verified_columns: Vec<_> = data_columns - .into_iter() - .take(10) // Not enough for reconstruction - .map(|col| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::__new_for_testing(col), - ) - }) - .collect(); - - cache - .put_kzg_verified_custody_data_columns(block_root, verified_columns) - .expect("should put columns"); - - // Check reconstruction decision - should say "not enough columns" - let decision = cache.check_and_set_reconstruction_started(&block_root); - assert!(matches!(decision, ReconstructColumnsDecision::No(_))); } #[tokio::test] @@ -1304,7 +995,7 @@ mod data_availability_checker_tests { let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); - let (_block, data_columns) = generate_rand_block_and_data_columns::( + let (block, data_columns) = generate_rand_block_and_data_columns::( ForkName::Gloas, NumBlobs::Number(1), &mut rng, @@ -1312,8 +1003,8 @@ mod data_availability_checker_tests { ); let block_root = Hash256::random(); + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); - // Add some columns let verified_columns: Vec<_> = data_columns .into_iter() .take(5) @@ -1328,16 +1019,13 @@ mod data_availability_checker_tests { .put_kzg_verified_custody_data_columns(block_root, verified_columns) .expect("should put columns"); - // Verify columns are cached let cached_count = cache.peek_pending_components(&block_root, |components| { components.map(|c| c.verified_data_columns.len()) }); assert_eq!(cached_count, Some(5)); - // Handle reconstruction failure cache.handle_reconstruction_failure(&block_root); - // Verify columns are cleared let cached_count_after = cache.peek_pending_components(&block_root, |components| { components.map(|c| c.verified_data_columns.len()) }); @@ -1353,13 +1041,11 @@ mod data_availability_checker_tests { type T = DiskHarnessType; let (_harness, cache, _path) = setup_harness_and_cache::().await; - // Run maintenance with a future cutoff epoch let cutoff_epoch = Epoch::new(100); cache .do_maintenance(cutoff_epoch) .expect("maintenance should succeed"); - // Cache should still be empty since we didn't add anything with an epoch assert_eq!(cache.block_cache_size(), 0); } @@ -1375,7 +1061,7 @@ mod data_availability_checker_tests { let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); - let (_block, data_columns) = generate_rand_block_and_data_columns::( + let (block, data_columns) = generate_rand_block_and_data_columns::( ForkName::Gloas, NumBlobs::Number(1), &mut rng, @@ -1384,10 +1070,10 @@ mod data_availability_checker_tests { let block_root = Hash256::random(); - // No columns yet assert!(cache.get_data_columns(block_root).is_none()); - // Add columns + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); + let verified_columns: Vec<_> = data_columns .into_iter() .take(3) @@ -1402,7 +1088,6 @@ mod data_availability_checker_tests { .put_kzg_verified_custody_data_columns(block_root, verified_columns) .expect("should put columns"); - // Now columns should be returned let peeked = cache.get_data_columns(block_root); assert!(peeked.is_some()); assert_eq!(peeked.unwrap().len(), 3); @@ -1420,18 +1105,20 @@ mod data_availability_checker_tests { let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); - let (_block, data_columns) = generate_rand_block_and_data_columns::( + let (block, data_columns) = generate_rand_block_and_data_columns::( ForkName::Gloas, NumBlobs::Number(1), &mut rng, &spec, ); - // LRU capacity is 32 (OVERFLOW_LRU_CAPACITY_NON_ZERO). Insert 33 entries. + let num_blobs = num_blobs_in_block(&block); + let mut roots = Vec::new(); for _ in 0..33 { let block_root = Hash256::random(); roots.push(block_root); + cache.init_pending_block(block_root, Slot::new(0), num_blobs); let col = data_columns.first().cloned().expect("should have column"); let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody( KzgVerifiedDataColumn::__new_for_testing(col), @@ -1466,11 +1153,7 @@ mod data_availability_checker_tests { ); let block_root = Hash256::random(); - - // Insert bid (gives the entry an epoch via the bid's slot) - cache - .put_bid(block_root, make_test_bid(&block)) - .expect("should put bid"); + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); let col = data_columns.first().cloned().expect("should have column"); let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody( @@ -1482,7 +1165,7 @@ mod data_availability_checker_tests { assert_eq!(cache.block_cache_size(), 1); - // Maintenance with cutoff in the future should prune (bid slot=0 → epoch=0 < cutoff=100) + // slot=0 → epoch=0 < cutoff=100, should prune cache .do_maintenance(Epoch::new(100)) .expect("maintenance should succeed"); @@ -1490,58 +1173,6 @@ mod data_availability_checker_tests { assert_eq!(cache.block_cache_size(), 0); } - #[tokio::test] - async fn test_double_reconstruction_prevented() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (_block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - - // Insert all columns so reconstruction threshold is met - let verified_columns: Vec<_> = data_columns - .into_iter() - .map(|col| { - KzgVerifiedCustodyDataColumn::from_asserted_custody( - KzgVerifiedDataColumn::__new_for_testing(col), - ) - }) - .collect(); - - cache - .put_kzg_verified_custody_data_columns(block_root, verified_columns) - .expect("should put columns"); - - // Manually set reconstruction_started via check_and_set - // For fullnode, sampling == all columns, so this returns No("all sampling columns received") - // But we can set the flag manually to test the guard - cache - .availability_cache - .write() - .get_mut(&block_root) - .expect("should exist") - .reconstruction_started = true; - - let decision = cache.check_and_set_reconstruction_started(&block_root); - assert!( - matches!(decision, ReconstructColumnsDecision::No(reason) if reason == "already started"), - "second reconstruction attempt should be prevented" - ); - } - #[tokio::test] async fn test_partial_columns_missing_components() { if !is_gloas_enabled() { @@ -1562,11 +1193,7 @@ mod data_availability_checker_tests { ); let block_root = Hash256::random(); - - // Insert bid and executed envelope - cache - .put_bid(block_root, make_test_bid(&block)) - .expect("should put bid"); + cache.init_pending_block(block_root, Slot::new(0), num_blobs_in_block(&block)); let executed_envelope = make_test_executed_envelope(block_root); cache diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs index 66cb8b6334..4289f17634 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs @@ -3,6 +3,7 @@ use ssz_types::VariableList; use std::sync::Arc; use types::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarGloas, EthSpec, Hash256, Slot}; +#[derive(Clone)] pub struct PendingColumn { cells: Vec, KzgProof)>>, } diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs index 3b09c10fe3..067a22701f 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs @@ -9,10 +9,7 @@ use std::collections::HashMap; use std::sync::Arc; use tracing::{Span, debug, debug_span}; use types::Slot; -use types::{ - ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, - SignedExecutionPayloadEnvelope, -}; +use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Hash256}; /// This represents the components of a payload pending data availability. ///