From 285b7ebae910cc72af8427dc68d7ac0b58f68ffe Mon Sep 17 00:00:00 2001 From: Eitan Seri- Levi Date: Thu, 19 Mar 2026 08:28:45 -0700 Subject: [PATCH] Refactor --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 +- .../beacon_chain/src/block_verification.rs | 1 - beacon_node/beacon_chain/src/builder.rs | 1 - .../src/data_availability_checker_v2/mod.rs | 625 ++++++++++++++++-- .../pending_components.rs | 9 +- .../src/data_availability_router.rs | 43 +- beacon_node/beacon_chain/src/metrics.rs | 11 +- .../payload_envelope_verification/import.rs | 2 +- beacon_node/client/src/builder.rs | 7 +- common/eth2/src/types.rs | 5 - 10 files changed, 628 insertions(+), 79 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1250252434..8f2c7caaa7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -22,7 +22,6 @@ use crate::data_availability_checker::{ Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, AvailableBlockData, DataColumnReconstructionResult, }; -use crate::data_availability_checker_v2::Availability as PayloadAvailability; use crate::data_availability_router::{ AvailabilityOutcome, DataAvailabilityRouter, ReconstructionOutcome, }; @@ -3799,7 +3798,7 @@ impl BeaconChain { } } AvailabilityOutcome::Payload(_) => { - return Err(BlockError::InternalError("Received a payload envelope availability outcome variant when a block variant was expected".to_string())) + Err(BlockError::InternalError("Received a payload envelope availability outcome variant when a block variant was expected".to_string())) }, } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index de817e35fb..244b06f475 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -60,7 +60,6 @@ use crate::execution_payload::{ }; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::observed_block_producers::SeenBlock; -use crate::payload_envelope_verification::EnvelopeError; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 5a97bea063..a60cc614e8 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -994,7 +994,6 @@ where let da_checker_v2 = Arc::new( DataAvailabilityCheckerV2::new( - slot_clock.clone(), self.kzg.clone(), custody_context.clone(), self.spec.clone(), diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs index fe03f2d0f4..87d0dfdd5e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/mod.rs @@ -1,3 +1,42 @@ +//! This module builds out the data availability cache for Gloas. When a beacon block is received +//! over gossip/p2p we insert its payload into this cache, keyed by block root. As soon as the bid +//! is received we can begin using it to verify data columns. +//! +//! When a payload envelope is received over gossip/p2p we first insert it as a pre-executed envelope. A separate +//! thread eventually executes the payload envelope against the EL. Assuming the payload is executed succesfully +//! the envelope is updated in the cache from `PreExecuted` -> `Executed`. Once all required custody columns +//! have been kzg verified and the envelope has been executed we can import the envelope into fork choice and store it to disk. +//! +//! Note that the block must have arrived before the envelope for the envelope to pass upstream verification checks and reach this cache. +//! However data columns can potentially arrive before the block. +//! +//! +//! SignedBeaconBlock +//! | +//! | -> SignedExecutionPayloadBid +//! +//! +//! DataColumnSidecarList +//! | +//! | -> Perform data column verification against `SignedExecutionPayloadBid` +//! │ │ +//! │ ▼ +//! | -> KzgVerifiedCustodyDataColumn +//! +//! +//! SignedExecutionPayloadEnvelope +//! │ +//! | -> CachedPayloadEnvelope::PreExecution +//! │ │ +//! │ ▼ +//! | -> AvailabilityPendingExecutedEnvelope +//! │ │ +//! │ ▼ +//! │ -> CachedPayloadEnvelope::Executed +//! │ │ +//! │ ▼ +//! | -> AvailableExecutedEnvelope (all columns present, payload executed against the EL, ready to import) + use crate::data_availability_checker::AvailabilityCheckError; use crate::payload_envelope_verification::{ AvailabilityPendingExecutedEnvelope, AvailableExecutedEnvelope, @@ -66,7 +105,9 @@ impl Debug for Availability { write!(f, "MissingComponents({})", block_root) } // TODO(gloas) fix success case - Self::Available(data) => todo!(), + Self::Available(envelope) => { + write!(f, "Available({:?})", envelope.import_data.block_root) + } } } } @@ -99,7 +140,6 @@ pub struct DataAvailabilityChecker { impl DataAvailabilityChecker { pub fn new( - _slot_clock: T::SlotClock, kzg: Arc, custody_context: Arc>, spec: Arc, @@ -174,7 +214,6 @@ impl DataAvailabilityChecker { }) } - /// Insert an executed payload envelope into the cache and performs an availability check pub fn put_executed_payload_envelope( &self, @@ -416,16 +455,19 @@ impl DataAvailabilityChecker { "Reconstructed columns" ); - self.put_kzg_verified_custody_data_columns(*block_root, data_columns_to_import_and_publish.clone()) - .map(|availability| { - DataColumnReconstructionResult::Success(( - availability, - data_columns_to_import_and_publish - .into_iter() - .map(|d| d.clone_arc()) - .collect::>(), - )) - }) + self.put_kzg_verified_custody_data_columns( + *block_root, + data_columns_to_import_and_publish.clone(), + ) + .map(|availability| { + DataColumnReconstructionResult::Success(( + availability, + data_columns_to_import_and_publish + .into_iter() + .map(|d| d.clone_arc()) + .collect::>(), + )) + }) } // ── Metrics ── @@ -664,7 +706,9 @@ async fn availability_cache_maintenance_service( mod data_availability_checker_tests { use super::*; + use crate::block_verification::PayloadVerificationOutcome; use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; + use crate::payload_envelope_verification::EnvelopeImportData; use crate::test_utils::{ NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns, test_spec, @@ -673,12 +717,16 @@ mod data_availability_checker_tests { custody_context::NodeCustodyType, test_utils::{BeaconChainHarness, DiskHarnessType}, }; + use fork_choice::PayloadVerificationStatus; use logging::create_test_tracing_subscriber; use rand::SeedableRng; use rand::rngs::StdRng; use store::{HotColdDB, StoreConfig, database::interface::BeaconNodeBackend}; use tempfile::{TempDir, tempdir}; - use types::{ForkName, MinimalEthSpec, Slot}; + use types::{ + BeaconState, ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, + FullPayload, MinimalEthSpec, SignedBeaconBlock, Slot, + }; type E = MinimalEthSpec; @@ -732,9 +780,7 @@ mod data_availability_checker_tests { .build() } - async fn setup_harness_and_cache( - capacity: usize, - ) -> ( + async fn setup_harness_and_cache() -> ( BeaconChainHarness>, Arc>, TempDir, @@ -756,17 +802,15 @@ mod data_availability_checker_tests { &spec, )); - todo!() - // let cache = Arc::new( - // DataAvailabilityChecker::::new( - // harness.chain.slot_clock.clone().into(), - // harness.chain.kzg.clone().unwrap(), - // custody_context, - // spec.clone(), - // ) - // .expect("should create cache"), - // ); - // (harness, cache, chain_db_path) + let cache = Arc::new( + DataAvailabilityChecker::::new( + harness.chain.kzg.clone(), + custody_context, + spec.clone(), + ) + .expect("should create cache"), + ); + (harness, cache, chain_db_path) } fn is_gloas_enabled() -> bool { @@ -781,8 +825,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (_harness, cache, _path) = setup_harness_and_cache::(capacity).await; + let (_harness, cache, _path) = setup_harness_and_cache::().await; assert_eq!(cache.block_cache_size(), 0); } @@ -793,8 +836,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + let (harness, cache, _path) = setup_harness_and_cache::().await; let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); @@ -840,8 +882,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + let (harness, cache, _path) = setup_harness_and_cache::().await; let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); @@ -889,8 +930,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + let (harness, cache, _path) = setup_harness_and_cache::().await; let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); @@ -922,6 +962,319 @@ mod data_availability_checker_tests { assert!(matches!(result, Availability::MissingComponents(_))); } + /// Helper to create a test bid with the given block root and kzg commitments from a block. + fn make_test_bid( + block: &SignedBeaconBlock>, + ) -> Arc> { + let bid = block + .message() + .body() + .signed_execution_payload_bid() + .expect("gloas block should have bid") + .clone(); + Arc::new(bid) + } + + fn make_test_signed_envelope(block_root: Hash256) -> Arc> { + Arc::new(SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas::default(), + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root: block_root, + slot: Slot::new(0), + state_root: Hash256::ZERO, + }, + signature: bls::Signature::infinity().expect("should create infinity sig"), + }) + } + + fn make_test_executed_envelope(block_root: Hash256) -> AvailabilityPendingExecutedEnvelope { + AvailabilityPendingExecutedEnvelope { + envelope: make_test_signed_envelope(block_root), + import_data: EnvelopeImportData { + block_root, + post_state: Box::new(BeaconState::new(0, Default::default(), &gloas_spec::())), + }, + payload_verification_outcome: PayloadVerificationOutcome { + payload_verification_status: PayloadVerificationStatus::Verified, + }, + } + } + + #[tokio::test] + async fn test_full_availability_flow() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + let bid = make_test_bid(&block); + + cache.put_bid(block_root, bid).expect("should put bid"); + assert!(matches!( + cache.put_bid(block_root, make_test_bid(&block)), + Ok(Availability::MissingComponents(_)) + )); + + let verified_columns: Vec<_> = data_columns + .into_iter() + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + let result = cache + .put_kzg_verified_custody_data_columns(block_root, verified_columns) + .expect("should put columns"); + + assert!(matches!(result, Availability::MissingComponents(_))); + + // Insert pre-executed envelope first + cache + .put_pre_executed_payload_envelope( + make_test_signed_envelope(block_root), + BlockImportSource::Gossip, + ) + .expect("should put pre-executed envelope"); + + let status = cache.get_envelope_processing_status(&block_root); + assert!(matches!( + status, + Some(PayloadEnvelopeProcessingStatus::NotValidated(..)) + )); + + // Upgrade to executed envelope (after EL validation) + let executed_envelope = make_test_executed_envelope(block_root); + let result = cache + .put_executed_payload_envelope(executed_envelope) + .expect("should put executed envelope"); + + assert!( + matches!(result, Availability::Available(_)), + "expected Available, got {:?}", + result + ); + } + + #[tokio::test] + async fn test_zero_blob_bid_immediately_available() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + // Generate a block with 0 blobs — bid will have empty commitments + let (block, _data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(0), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + let bid = make_test_bid(&block); + + // Insert bid (no blobs expected) + cache.put_bid(block_root, bid).expect("should put bid"); + + // Insert executed envelope — should become available immediately (no columns needed) + let executed_envelope = make_test_executed_envelope(block_root); + let result = cache + .put_executed_payload_envelope(executed_envelope) + .expect("should put executed envelope"); + + assert!( + matches!(result, Availability::Available(_)), + "zero-blob bid should be immediately available, got {:?}", + result + ); + } + + #[tokio::test] + async fn test_columns_arrive_before_bid() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Columns arrive before bid + let verified_columns: Vec<_> = data_columns + .into_iter() + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + let result = cache + .put_kzg_verified_custody_data_columns(block_root, verified_columns) + .expect("should put columns"); + assert!(matches!(result, Availability::MissingComponents(_))); + + let bid = make_test_bid(&block); + let result = cache.put_bid(block_root, bid).expect("should put bid"); + assert!(matches!(result, Availability::MissingComponents(_))); + + let executed_envelope = make_test_executed_envelope(block_root); + let result = cache + .put_executed_payload_envelope(executed_envelope) + .expect("should put executed envelope"); + + assert!( + matches!(result, Availability::Available(_)), + "expected Available after all components inserted, got {:?}", + result + ); + } + + #[tokio::test] + async fn test_pre_executed_envelope_not_available() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Insert bid + all columns + cache + .put_bid(block_root, make_test_bid(&block)) + .expect("should put bid"); + + let verified_columns: Vec<_> = data_columns + .into_iter() + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + cache + .put_kzg_verified_custody_data_columns(block_root, verified_columns) + .expect("should put columns"); + + // Insert pre-executed envelope (not yet validated by EL) + cache + .put_pre_executed_payload_envelope( + make_test_signed_envelope(block_root), + BlockImportSource::Gossip, + ) + .expect("should put pre-executed envelope"); + + // Should NOT be available — envelope not executed yet + let status = cache.get_envelope_processing_status(&block_root); + assert!(matches!( + status, + Some(PayloadEnvelopeProcessingStatus::NotValidated(..)) + )); + } + + #[tokio::test] + async fn test_remove_pre_executed_envelope() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (_harness, cache, _path) = setup_harness_and_cache::().await; + + let block_root = Hash256::random(); + + // Insert pre-executed envelope + cache + .put_pre_executed_payload_envelope( + make_test_signed_envelope(block_root), + BlockImportSource::Gossip, + ) + .expect("should put pre-executed envelope"); + + // Verify it's there + assert!(cache.get_envelope_processing_status(&block_root).is_some()); + + // Remove it + cache.remove_pre_executed_payload_envelope(&block_root); + + // Should be gone + let status = cache.get_envelope_processing_status(&block_root); + assert!(status.is_none()); + } + + #[tokio::test] + async fn test_remove_pre_executed_does_not_remove_executed() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (_harness, cache, _path) = setup_harness_and_cache::().await; + + let block_root = Hash256::random(); + + // Insert executed envelope + let executed_envelope = make_test_executed_envelope(block_root); + cache + .put_executed_payload_envelope(executed_envelope) + .expect("should put executed envelope"); + + // Try to remove as pre-executed — should be a no-op + cache.remove_pre_executed_payload_envelope(&block_root); + + // Should still be there as executed + let status = cache.get_envelope_processing_status(&block_root); + assert!(matches!( + status, + Some(PayloadEnvelopeProcessingStatus::ExecutionValidated(..)) + )); + } + #[tokio::test] async fn test_reconstruction_started_flag() { if !is_gloas_enabled() { @@ -929,8 +1282,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + let (harness, cache, _path) = setup_harness_and_cache::().await; let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); @@ -971,8 +1323,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + let (harness, cache, _path) = setup_harness_and_cache::().await; let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); @@ -1024,10 +1375,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (_harness, cache, _path) = setup_harness_and_cache::(capacity).await; - - let block_root = Hash256::random(); + let (_harness, cache, _path) = setup_harness_and_cache::().await; // Run maintenance with a future cutoff epoch let cutoff_epoch = Epoch::new(100); @@ -1046,8 +1394,7 @@ mod data_availability_checker_tests { } type T = DiskHarnessType; - let capacity = 4; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; + let (harness, cache, _path) = setup_harness_and_cache::().await; let mut rng = StdRng::seed_from_u64(0xDEADBEEF); let spec = harness.spec.clone(); @@ -1084,4 +1431,190 @@ mod data_availability_checker_tests { assert!(peeked.is_some()); assert_eq!(peeked.unwrap().len(), 3); } + + #[tokio::test] + async fn test_lru_eviction() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + // LRU capacity is 32 (OVERFLOW_LRU_CAPACITY_NON_ZERO). Insert 33 entries. + let mut roots = Vec::new(); + for _ in 0..33 { + let block_root = Hash256::random(); + roots.push(block_root); + let col = data_columns.first().cloned().expect("should have column"); + let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + )]; + cache + .put_kzg_verified_custody_data_columns(block_root, verified) + .expect("should put columns"); + } + + assert_eq!(cache.block_cache_size(), 32); + assert!(cache.get_data_columns(roots[0]).is_none()); + assert!(cache.get_data_columns(*roots.last().unwrap()).is_some()); + } + + #[tokio::test] + async fn test_maintenance_prunes_old_entries() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Insert bid (gives the entry an epoch via the bid's slot) + cache + .put_bid(block_root, make_test_bid(&block)) + .expect("should put bid"); + + let col = data_columns.first().cloned().expect("should have column"); + let verified = vec![KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + )]; + cache + .put_kzg_verified_custody_data_columns(block_root, verified) + .expect("should put columns"); + + assert_eq!(cache.block_cache_size(), 1); + + // Maintenance with cutoff in the future should prune (bid slot=0 → epoch=0 < cutoff=100) + cache + .do_maintenance(Epoch::new(100)) + .expect("maintenance should succeed"); + + assert_eq!(cache.block_cache_size(), 0); + } + + #[tokio::test] + async fn test_double_reconstruction_prevented() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (_block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Insert all columns so reconstruction threshold is met + let verified_columns: Vec<_> = data_columns + .into_iter() + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + cache + .put_kzg_verified_custody_data_columns(block_root, verified_columns) + .expect("should put columns"); + + // Manually set reconstruction_started via check_and_set + // For fullnode, sampling == all columns, so this returns No("all sampling columns received") + // But we can set the flag manually to test the guard + cache + .availability_cache + .write() + .get_mut(&block_root) + .expect("should exist") + .reconstruction_started = true; + + let decision = cache.check_and_set_reconstruction_started(&block_root); + assert!( + matches!(decision, ReconstructColumnsDecision::No(reason) if reason == "already started"), + "second reconstruction attempt should be prevented" + ); + } + + #[tokio::test] + async fn test_partial_columns_missing_components() { + if !is_gloas_enabled() { + return; + } + + type T = DiskHarnessType; + let (harness, cache, _path) = setup_harness_and_cache::().await; + + let mut rng = StdRng::seed_from_u64(0xDEADBEEF); + let spec = harness.spec.clone(); + + let (block, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Gloas, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + + let block_root = Hash256::random(); + + // Insert bid and executed envelope + cache + .put_bid(block_root, make_test_bid(&block)) + .expect("should put bid"); + + let executed_envelope = make_test_executed_envelope(block_root); + cache + .put_executed_payload_envelope(executed_envelope) + .expect("should put executed envelope"); + + // Insert only 1 column (need 128 for fullnode) + let verified_columns: Vec<_> = data_columns + .into_iter() + .take(1) + .map(|col| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::__new_for_testing(col), + ) + }) + .collect(); + + let result = cache + .put_kzg_verified_custody_data_columns(block_root, verified_columns) + .expect("should put columns"); + + assert!( + matches!(result, Availability::MissingComponents(_)), + "partial columns should not trigger availability" + ); + } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components.rs index f6d4cc0321..3f9d7e54d0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/pending_components.rs @@ -8,8 +8,8 @@ use std::sync::Arc; use tracing::{Span, debug, debug_span}; use types::BlockImportSource; use types::{ - ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, - SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, + ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedExecutionPayloadBid, + SignedExecutionPayloadEnvelope, }; pub enum CachedPayloadEnvelope { @@ -22,9 +22,6 @@ pub enum CachedPayloadEnvelope { /// The columns are all gossip and kzg verified. /// The payload is considered "available" when all required columns are received. pub struct PendingComponents { - /// The block root is stored for tracing context in the span. - #[allow(dead_code)] - pub block_root: Hash256, /// The execution payload bid containing blob_kzg_commitments. pub bid: Option>>, /// a cached pre or post executed payload envelope @@ -182,7 +179,6 @@ impl PendingComponents { let span = debug_span!(parent: None, "lh_pending_components", %block_root); let _guard = span.clone().entered(); Self { - block_root, bid: None, envelope: None, verified_data_columns: vec![], @@ -240,7 +236,6 @@ mod pending_components_tests { let block_root = Hash256::random(); let components = PendingComponents::::empty(block_root, spec); - assert_eq!(components.block_root, block_root); assert!(components.bid.is_none()); assert!(components.verified_data_columns.is_empty()); assert!(!components.reconstruction_started); diff --git a/beacon_node/beacon_chain/src/data_availability_router.rs b/beacon_node/beacon_chain/src/data_availability_router.rs index 78de0d8935..656fce22ff 100644 --- a/beacon_node/beacon_chain/src/data_availability_router.rs +++ b/beacon_node/beacon_chain/src/data_availability_router.rs @@ -20,10 +20,12 @@ use crate::block_verification_types::AvailabilityPendingExecutedBlock; use crate::custody_context::CustodyContext; use crate::data_availability_checker::{ Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, - DataAvailabilityChecker, DataColumnReconstructionResult as BlockReconstructionResult, + DataAvailabilityChecker, DataAvailabilityCheckerMetrics as BlockMetrics, + DataColumnReconstructionResult as BlockReconstructionResult, }; use crate::data_availability_checker_v2::{ Availability as PayloadAvailability, DataAvailabilityChecker as DataAvailabilityCheckerV2, + DataAvailabilityCheckerMetrics as PayloadMetrics, DataColumnReconstructionResult as PayloadReconstructionResult, }; use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; @@ -387,17 +389,44 @@ impl DataAvailabilityRouter { self.v1.put_gossip_verified_blobs(block_root, blobs) } - /// Direct access to v1 checker for block execution/availability checks. - /// - /// Use this for operations that are specific to the legacy DA checker, + // ── Metrics ── + + pub fn metrics(&self) -> DataAvailabilityRouterMetrics { + DataAvailabilityRouterMetrics { + block: self.v1.metrics(), + payload: self.v2.metrics(), + } + } + + // ── Direct access ── + + /// Direct access to the block-level DA checker (pre-Gloas). + /// Used for block availability checks, range sync, and blob verification. pub fn v1(&self) -> &Arc> { &self.v1 } - /// Direct access to v2 checker for payload availability checks. - /// - /// Use this for operations that are specific to the Gloas DA checker, + /// Direct access to the envelope-level DA checker (Gloas). + /// Used for payload envelope availability checks and column verification. pub fn v2(&self) -> &Arc> { &self.v2 } } + +pub struct DataAvailabilityRouterMetrics { + pub block: BlockMetrics, + pub payload: PayloadMetrics, +} + +pub fn start_availability_cache_maintenance_service( + executor: task_executor::TaskExecutor, + chain: Arc>, +) { + crate::data_availability_checker::start_availability_cache_maintenance_service( + executor.clone(), + chain.clone(), + ); + crate::data_availability_checker_v2::start_availability_cache_maintenance_service( + executor, chain, + ); +} diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 89421637a9..a6d9fef59c 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1897,6 +1897,12 @@ pub static DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "data_availability_payload_cache_size", + "Number of entries in the data availability payload envelope cache.", + ) +}); pub static DATA_AVAILABILITY_RECONSTRUCTION_TIME: LazyLock> = LazyLock::new(|| { try_create_histogram( @@ -1999,12 +2005,11 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { beacon_chain.store.state_cache_len(), ); - // TODO(gloas) configure v2 metrics - let da_checker_metrics = beacon_chain.data_availability_checker.v1().metrics(); + let da_checker_metrics = beacon_chain.data_availability_checker.metrics(); set_gauge_by_usize( &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, - da_checker_metrics.block_cache_size, + da_checker_metrics.block.block_cache_size, ); if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 4f660362c6..84fffb9d3b 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -173,7 +173,7 @@ impl BeaconChain { ) -> Result { match availability { AvailabilityOutcome::Block(_) => { - return Err(EnvelopeError::InternalError("Received a block availability outcome variant when a payload envelope variant was expected".to_string())) + Err(EnvelopeError::InternalError("Received a block availability outcome variant when a payload envelope variant was expected".to_string())) } AvailabilityOutcome::Payload(availability) => match availability { PayloadAvailability::Available(available_envelope) => { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 75d0455ac3..a3ab6f80d4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,8 +5,7 @@ use crate::compute_light_client_updates::{ use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use beacon_chain::attestation_simulator::start_attestation_simulator_service; -use beacon_chain::data_availability_checker::start_availability_cache_maintenance_service; -use beacon_chain::data_availability_checker_v2::start_availability_cache_maintenance_service as start_availability_cache_maintenance_service_v2; +use beacon_chain::data_availability_router::start_availability_cache_maintenance_service; use beacon_chain::graffiti_calculator::start_engine_version_cache_refresh_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; @@ -787,10 +786,6 @@ where runtime_context.executor.clone(), beacon_chain.clone(), ); - start_availability_cache_maintenance_service_v2( - runtime_context.executor.clone(), - beacon_chain.clone(), - ); start_engine_version_cache_refresh_service( beacon_chain.as_ref(), runtime_context.executor.clone(), diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 4977332ccc..94dff95bc6 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1867,11 +1867,6 @@ pub type SignedBlockContentsTuple = ( Option<(KzgProofs, BlobsList)>, ); -pub type SignedPayloadEnvelopeContentsTuple = ( - Arc>, - Option<(KzgProofs, BlobsList)>, -); - fn parse_required_header( headers: &HeaderMap, header_name: &str,