From d7f5e24ede5f85c9f40fb4a4ce415792212a3f88 Mon Sep 17 00:00:00 2001 From: Daniel Knopik Date: Wed, 29 Apr 2026 13:01:32 +0200 Subject: [PATCH] nuke router --- beacon_node/beacon_chain/src/beacon_chain.rs | 349 ++++++++------ .../beacon_chain/src/block_verification.rs | 2 +- beacon_node/beacon_chain/src/builder.rs | 11 +- .../src/data_availability_checker.rs | 12 +- .../src/data_availability_router.rs | 448 ------------------ .../src/data_column_verification.rs | 19 +- .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 5 +- beacon_node/beacon_chain/src/lib.rs | 1 - beacon_node/beacon_chain/src/metrics.rs | 2 +- .../payload_envelope_verification/import.rs | 34 +- .../src/pending_payload_cache/mod.rs | 5 +- beacon_node/beacon_chain/src/test_utils.rs | 12 +- .../beacon_chain/tests/block_verification.rs | 51 +- beacon_node/beacon_chain/tests/store_tests.rs | 2 +- beacon_node/client/src/builder.rs | 9 +- .../gossip_methods.rs | 1 - .../src/network_beacon_processor/mod.rs | 7 +- .../src/network_beacon_processor/tests.rs | 2 +- .../network/src/sync/network_context.rs | 9 +- beacon_node/network/src/sync/tests/lookups.rs | 5 +- 20 files changed, 259 insertions(+), 727 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/data_availability_router.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6281688b31..ac74875398 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,9 +23,7 @@ use crate::data_availability_checker::{ DataColumnReconstructionResult as DataColumnReconstructionResultV1, }; -use crate::data_availability_router::{ - AvailabilityOutcome, DataAvailabilityRouter, ReconstructionOutcome, -}; +use crate::data_availability_checker::DataAvailabilityChecker; use crate::data_column_verification::{ GossipDataColumnError, GossipPartialDataColumnError, GossipVerifiedDataColumn, GossipVerifiedPartialDataColumnHeader, KzgVerifiedCustodyPartialDataColumn, @@ -70,6 +68,7 @@ use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBid #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; use crate::pending_payload_cache::DataColumnReconstructionResult as DataColumnReconstructionResultV2; +use crate::pending_payload_cache::{Availability as PayloadAvailability, PendingPayloadCache}; use crate::pending_payload_envelopes::PendingPayloadEnvelopes; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::persist_custody_context; @@ -503,9 +502,10 @@ pub struct BeaconChain { pub validator_monitor: RwLock>, /// The slot at which blocks are downloaded back to. pub genesis_backfill_slot: Slot, - /// Provides a KZG verification and temporary storage for blocks and blobs as - /// they are collected and combined. - pub data_availability_checker: Arc>, + /// Provides KZG verification and temporary storage for pre-Gloas blocks and blobs. + pub data_availability_checker: Arc>, + /// Provides KZG verification and temporary storage for post-Gloas payload envelopes. + pub pending_payload_cache: Arc>, /// The KZG trusted setup used by this chain. pub kzg: Arc, /// RNG instance used by the chain. Currently used for shuffling column sidecars in block publishing. @@ -1183,10 +1183,12 @@ impl BeaconChain { indices: &[ColumnIndex], fork_name: ForkName, ) -> Result, Error> { - let all_cached_columns_opt = self - .data_availability_checker - .get_data_columns(block_root, fork_name) - .or_else(|| self.early_attester_cache.get_data_columns(block_root)); + let all_cached_columns_opt = if fork_name.gloas_enabled() { + self.pending_payload_cache.get_data_columns(block_root) + } else { + self.data_availability_checker.get_data_columns(block_root) + } + .or_else(|| self.early_attester_cache.get_data_columns(block_root)); if let Some(mut all_cached_columns) = all_cached_columns_opt { all_cached_columns.retain(|col| indices.contains(col.index())); @@ -2420,11 +2422,7 @@ impl BeaconChain { let _timer = metrics::start_timer( &metrics::PARTIAL_DATA_COLUMN_SIDECAR_HEADER_GOSSIP_VERIFICATION_TIMES, ); - let Some(assembler) = self - .data_availability_checker - .pending_block_cache() - .partial_assembler() - else { + let Some(assembler) = self.data_availability_checker.partial_assembler() else { return Err(GossipPartialDataColumnError::PartialColumnsDisabled); }; if let Some(cached_header) = assembler.get_header(&block_root) { @@ -3377,11 +3375,7 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } - let Some(assembler) = self - .data_availability_checker - .pending_block_cache() - .partial_assembler() - else { + let Some(assembler) = self.data_availability_checker.partial_assembler() else { // Partial messages are apparently not activated return Ok(None); }; @@ -3417,16 +3411,29 @@ impl BeaconChain { .map(|column| column.as_data_column()), ); - let availability = self - .data_availability_checker - .put_kzg_verified_custody_data_columns( - block_root, - slot, - merge_result.full_columns.clone(), - )?; - - self.process_availability(slot, availability, || Ok(())) - .await? + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self + .pending_payload_cache + .put_kzg_verified_custody_data_columns( + block_root, + merge_result.full_columns.clone(), + )?; + self.process_payload_availability(slot, availability, || Ok(())) + .await? + } else { + let availability = self + .data_availability_checker + .put_kzg_verified_custody_data_columns( + block_root, + merge_result.full_columns.clone(), + )?; + self.process_availability(slot, availability, || Ok(())) + .await? + } } else { AvailabilityProcessingStatus::MissingComponents(slot, block_root) }; @@ -3540,10 +3547,18 @@ impl BeaconChain { if let Some(event_handler) = self.event_handler.as_ref() && event_handler.has_data_column_sidecar_subscribers() { - let imported_data_columns = self - .data_availability_checker - .cached_data_column_indexes(block_root, slot) - .unwrap_or_default(); + let imported_data_columns = if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + self.pending_payload_cache + .cached_data_column_indexes(block_root) + } else { + self.data_availability_checker + .cached_data_column_indexes(block_root) + } + .unwrap_or_default(); let new_data_columns = data_columns_iter.filter(|b| !imported_data_columns.contains(b.index())); @@ -3636,80 +3651,73 @@ impl BeaconChain { return Ok(None); } - let data_availability_checker = self.data_availability_checker.clone(); + let is_gloas = self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled(); - let result = self - .task_executor - .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { - data_availability_checker.reconstruct_data_columns(&block_root, slot) - }) - .await - .map_err(|_| BeaconChainError::RuntimeShutdown)??; + if is_gloas { + let pending_payload_cache = self.pending_payload_cache.clone(); + let result = self + .task_executor + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + pending_payload_cache.reconstruct_data_columns(&block_root) + }) + .await + .map_err(|_| BeaconChainError::RuntimeShutdown)??; - match result { - ReconstructionOutcome::Block(data_column_reconstruction_result) => { - match data_column_reconstruction_result { - DataColumnReconstructionResultV1::Success(( - availability, - data_columns_to_publish, - )) => { - let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { - // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. - return Ok(None); - }; + match result { + DataColumnReconstructionResultV2::Success(( + availability, + data_columns_to_publish, + )) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + return Ok(None); + }; - self.process_availability( - slot, - AvailabilityOutcome::Block(availability), - || Ok(()), - ) + self.process_payload_availability(slot, availability, || Ok(())) .await - .map(|availability_processing_status| { - Some((availability_processing_status, data_columns_to_publish)) - }) - } - DataColumnReconstructionResultV1::NotStarted(reason) - | DataColumnReconstructionResultV1::RecoveredColumnsNotImported(reason) => { - // We use metric here because logging this would be *very* noisy. - metrics::inc_counter_vec( - &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, - &[reason], - ); - Ok(None) - } + .map(|status| Some((status, data_columns_to_publish))) + } + DataColumnReconstructionResultV2::NotStarted(reason) + | DataColumnReconstructionResultV2::RecoveredColumnsNotImported(reason) => { + metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) } } - // TODO(gloas) handle data column reconstruction for gloas. - ReconstructionOutcome::Payload(data_column_reconstruction_result) => { - match data_column_reconstruction_result { - DataColumnReconstructionResultV2::Success(( - availability, - data_columns_to_publish, - )) => { - let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { - // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. - return Ok(None); - }; + } else { + let pending_block_cache = self.data_availability_checker.clone(); + let result = self + .task_executor + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + pending_block_cache.reconstruct_data_columns(&block_root) + }) + .await + .map_err(|_| BeaconChainError::RuntimeShutdown)??; - self.process_availability( - slot, - AvailabilityOutcome::Payload(availability), - || Ok(()), - ) + match result { + DataColumnReconstructionResultV1::Success(( + availability, + data_columns_to_publish, + )) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + return Ok(None); + }; + + self.process_availability(slot, availability, || Ok(())) .await - .map(|availability_processing_status| { - Some((availability_processing_status, data_columns_to_publish)) - }) - } - DataColumnReconstructionResultV2::NotStarted(reason) - | DataColumnReconstructionResultV2::RecoveredColumnsNotImported(reason) => { - // We use metric here because logging this would be *very* noisy. - metrics::inc_counter_vec( - &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, - &[reason], - ); - Ok(None) - } + .map(|status| Some((status, data_columns_to_publish))) + } + DataColumnReconstructionResultV1::NotStarted(reason) + | DataColumnReconstructionResultV1::RecoveredColumnsNotImported(reason) => { + metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) } } } @@ -3912,8 +3920,7 @@ impl BeaconChain { block: AvailabilityPendingExecutedBlock, ) -> Result { let slot = block.block.slot(); - let availability = - AvailabilityOutcome::Block(self.data_availability_checker.put_executed_block(block)?); + let availability = self.data_availability_checker.put_executed_block(block)?; self.process_availability(slot, availability, || Ok(())) .await } @@ -3928,10 +3935,9 @@ impl BeaconChain { if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(blob.signed_block_header()); } - let availability = AvailabilityOutcome::Block( - self.data_availability_checker - .put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?, - ); + let availability = self + .data_availability_checker + .put_gossip_verified_blobs(blob.block_root(), std::iter::once(blob))?; self.process_availability(slot, availability, || Ok(())) .await @@ -3958,12 +3964,23 @@ impl BeaconChain { } } - let availability = self - .data_availability_checker - .put_gossip_verified_data_columns(block_root, slot, data_columns)?; - - self.process_availability(slot, availability, publish_fn) - .await + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self + .pending_payload_cache + .put_gossip_verified_data_columns(block_root, slot, data_columns)?; + self.process_payload_availability(slot, availability, publish_fn) + .await + } else { + let availability = self + .data_availability_checker + .put_gossip_verified_data_columns(block_root, slot, data_columns)?; + self.process_availability(slot, availability, publish_fn) + .await + } } fn check_blob_header_signature_and_slashability<'a>( @@ -4008,10 +4025,9 @@ impl BeaconChain { block_root, blobs.iter().flatten().map(Arc::as_ref), )?; - let availability = AvailabilityOutcome::Block( - self.data_availability_checker - .put_rpc_blobs(block_root, blobs)?, - ); + let availability = self + .data_availability_checker + .put_rpc_blobs(block_root, blobs)?; self.process_availability(slot, availability, || Ok(())) .await @@ -4023,7 +4039,7 @@ impl BeaconChain { block_root: Hash256, engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { - let availability = match engine_get_blobs_output { + match engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { self.check_blob_header_signature_and_slashability( block_root, @@ -4033,7 +4049,8 @@ impl BeaconChain { .data_availability_checker .put_kzg_verified_blobs(block_root, blobs)?; - AvailabilityOutcome::Block(availability) + self.process_availability(slot, availability, || Ok(())) + .await } EngineGetBlobsOutput::CustodyColumns(data_columns) => { // TODO(gloas) verify that this check is no longer relevant for gloas @@ -4046,13 +4063,25 @@ impl BeaconChain { _ => None, }), )?; - self.data_availability_checker - .put_kzg_verified_custody_data_columns(block_root, slot, data_columns)? + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self + .pending_payload_cache + .put_kzg_verified_custody_data_columns(block_root, data_columns)?; + self.process_payload_availability(slot, availability, || Ok(())) + .await + } else { + let availability = self + .data_availability_checker + .put_kzg_verified_custody_data_columns(block_root, data_columns)?; + self.process_availability(slot, availability, || Ok(())) + .await + } } - }; - - self.process_availability(slot, availability, || Ok(())) - .await + } } /// Checks if the provided columns can make any cached blocks available, and imports immediately @@ -4072,16 +4101,27 @@ impl BeaconChain { }), )?; - // This slot value is purely informative for the consumers of - // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. - let availability = self.data_availability_checker.put_rpc_custody_columns( - block_root, - slot, - custody_columns, - )?; - - self.process_availability(slot, availability, || Ok(())) - .await + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self.pending_payload_cache.put_rpc_custody_columns( + block_root, + slot, + custody_columns, + )?; + self.process_payload_availability(slot, availability, || Ok(())) + .await + } else { + let availability = self.data_availability_checker.put_rpc_custody_columns( + block_root, + slot, + custody_columns, + )?; + self.process_availability(slot, availability, || Ok(())) + .await + } } fn check_data_column_sidecar_header_signature_and_slashability<'a>( @@ -4124,25 +4164,36 @@ impl BeaconChain { async fn process_availability( self: &Arc, slot: Slot, - availability: AvailabilityOutcome, + availability: BlockAvailability, publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { match availability { - AvailabilityOutcome::Block(availability) => { - match availability { - BlockAvailability::Available(block) => { - publish_fn()?; - // Block is fully available, import into fork choice - self.import_available_block(block).await - } - BlockAvailability::MissingComponents(block_root) => Ok( - AvailabilityProcessingStatus::MissingComponents(slot, block_root), - ), - } + BlockAvailability::Available(block) => { + publish_fn()?; + self.import_available_block(block).await } - AvailabilityOutcome::Payload(_) => { - Err(BlockError::InternalError("Received a payload envelope availability outcome variant when a block variant was expected".to_string())) - }, + BlockAvailability::MissingComponents(block_root) => Ok( + AvailabilityProcessingStatus::MissingComponents(slot, block_root), + ), + } + } + + async fn process_payload_availability( + self: &Arc, + slot: Slot, + availability: PayloadAvailability, + publish_fn: impl FnOnce() -> Result<(), BlockError>, + ) -> Result { + match availability { + PayloadAvailability::Available(available_envelope) => { + publish_fn()?; + self.import_available_execution_payload_envelope(available_envelope) + .await + .map_err(|e| BlockError::InternalError(e.to_string())) + } + PayloadAvailability::MissingComponents(block_root) => Ok( + AvailabilityProcessingStatus::MissingComponents(slot, block_root), + ), } } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 688824d35e..4b02d77f1a 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1203,7 +1203,7 @@ impl SignatureVerifiedBlock { block, AvailableBlockData::NoData, // TODO(gloas) shouldnt matter which da checker we pass? - chain.data_availability_checker.pending_block_cache(), + &chain.data_availability_checker, chain.spec.clone(), ) .map_err(BlockError::AvailabilityCheck)?, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 3f658f0d11..d3a1d851ea 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -6,7 +6,6 @@ use crate::beacon_chain::{ use crate::beacon_proposer_cache::BeaconProposerCache; use crate::custody_context::NodeCustodyType; use crate::data_availability_checker::DataAvailabilityChecker; -use crate::data_availability_router::DataAvailabilityRouter; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sidecars_gloas}; @@ -1011,11 +1010,8 @@ where .map_err(|e| format!("Error initializing DataAvailabilityCheckerV2: {:?}", e))?, ); - let data_availability_checker = Arc::new(DataAvailabilityRouter::new( - da_checker_v1, - da_checker_v2, - self.spec.clone(), - )); + let pending_block_cache = da_checker_v1; + let pending_payload_cache = da_checker_v2; let beacon_chain = BeaconChain { spec: self.spec.clone(), @@ -1088,7 +1084,8 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, - data_availability_checker, + data_availability_checker: pending_block_cache, + pending_payload_cache, kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), gossip_verified_payload_bid_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 9fc95f0171..b4abbb5290 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -690,16 +690,8 @@ pub fn start_availability_cache_maintenance_service( ) { // this cache only needs to be maintained if deneb is configured if chain.spec.deneb_fork_epoch.is_some() { - let overflow_cache = chain - .data_availability_checker - .pending_block_cache() - .availability_cache - .clone(); - let partial_assembler = chain - .data_availability_checker - .pending_block_cache() - .partial_assembler - .clone(); + let overflow_cache = chain.data_availability_checker.availability_cache.clone(); + let partial_assembler = chain.data_availability_checker.partial_assembler.clone(); executor.spawn( async move { availability_cache_maintenance_service(chain, overflow_cache, partial_assembler) diff --git a/beacon_node/beacon_chain/src/data_availability_router.rs b/beacon_node/beacon_chain/src/data_availability_router.rs deleted file mode 100644 index 3b46771146..0000000000 --- a/beacon_node/beacon_chain/src/data_availability_router.rs +++ /dev/null @@ -1,448 +0,0 @@ -//! Abstraction layer for data availability operations across different DA checkers. -//! -//! This module provides a unified interface for availability operations that are shared -//! between the legacy `DataAvailabilityChecker` (for blocks) and -//! `DataAvailabilityCache` (for payload envelopes after Gloas). -//! -//! ## Design -//! -//! - **Unified operations**: Shared column operations dispatched to v1 or v2 -//! - **Fork-aware routing**: `DataAvailabilityRouter` dispatches to v1 or v2 based on slot -//! - **Processing**: `BeaconChain::process_availability_outcome()` handles both result types -//! -//! After Gloas is fully activated and v1 is deprecated, this can be deleted and we can -//! use the Gloas DA checker directly. - -use crate::BeaconChainTypes; -use crate::BlockProcessStatus; -use crate::blob_verification::{GossipVerifiedBlob, KzgVerifiedBlob}; -use crate::block_verification_types::AvailabilityPendingExecutedBlock; -use crate::custody_context::CustodyContext; -use crate::data_availability_checker::{ - Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, - DataAvailabilityChecker, DataAvailabilityCheckerMetrics as BlockMetrics, - DataColumnReconstructionResult as BlockReconstructionResult, MissingCellsError, -}; -use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; -use crate::observed_data_sidecars::ObservationStrategy; -use crate::pending_payload_cache::{ - Availability as PayloadAvailability, DataAvailabilityCheckerMetrics as PayloadMetrics, - DataColumnReconstructionResult as PayloadReconstructionResult, PendingPayloadCache, -}; -use std::sync::Arc; -use types::data::{BlobIdentifier, FixedBlobSidecarList}; -use types::{ - BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, - DataColumnSidecarList, Epoch, EthSpec, ForkName, Hash256, PartialDataColumnSidecarRef, - SignedBeaconBlock, Slot, -}; - -/// Unified result from operations that can come from either DA checker. -/// -/// This enum allows callers to handle availability from both v1 (blocks) and v2 (payloads) -/// through a single type, with downstream processing handled by `BeaconChain::process_availability_outcome()`. -#[derive(Debug)] -pub enum AvailabilityOutcome { - /// Block became available (pre-Gloas, from v1 checker) - Block(BlockAvailability), - /// Payload became available (post-Gloas, from v2 checker) - Payload(PayloadAvailability), -} - -impl AvailabilityOutcome { - /// Returns `true` if data is fully available and ready for import. - pub fn is_available(&self) -> bool { - match self { - Self::Block(BlockAvailability::Available(_)) => true, - Self::Block(BlockAvailability::MissingComponents(_)) => false, - Self::Payload(PayloadAvailability::Available(_)) => true, - Self::Payload(PayloadAvailability::MissingComponents(_)) => false, - } - } - - /// Returns the block root, regardless of availability status. - pub fn block_root(&self) -> Hash256 { - match self { - Self::Block(BlockAvailability::Available(block)) => block.import_data.block_root, - Self::Block(BlockAvailability::MissingComponents(root)) => *root, - Self::Payload(PayloadAvailability::Available(available_data)) => { - available_data.envelope.message().beacon_block_root - } - Self::Payload(PayloadAvailability::MissingComponents(root)) => *root, - } - } - - /// Converts to the inner block availability if this is a block outcome. - pub fn into_block(self) -> Option> { - match self { - Self::Block(avail) => Some(avail), - Self::Payload(_) => None, - } - } - - /// Converts to the inner payload availability if this is a payload outcome. - pub fn into_payload(self) -> Option> { - match self { - Self::Block(_) => None, - Self::Payload(avail) => Some(avail), - } - } -} - -/// Unified result from reconstruction operations. -#[derive(Debug)] -pub enum ReconstructionOutcome { - /// Block reconstruction result (pre-Gloas) - Block(BlockReconstructionResult), - /// Payload reconstruction result (post-Gloas) - Payload(PayloadReconstructionResult), -} - -impl ReconstructionOutcome { - /// Returns the reconstructed columns if successful, regardless of type. - pub fn reconstructed_columns(&self) -> Option<&DataColumnSidecarList> { - match self { - Self::Block(BlockReconstructionResult::Success((_, cols))) => Some(cols), - Self::Payload(PayloadReconstructionResult::Success((_, cols))) => Some(cols), - _ => None, - } - } - - /// Returns true if reconstruction was successful. - pub fn is_success(&self) -> bool { - matches!( - self, - Self::Block(BlockReconstructionResult::Success(_)) - | Self::Payload(PayloadReconstructionResult::Success(_)) - ) - } - - /// Returns the reason if reconstruction was not started or columns not imported. - pub fn reason(&self) -> Option<&'static str> { - match self { - Self::Block(BlockReconstructionResult::NotStarted(r)) => Some(r), - Self::Block(BlockReconstructionResult::RecoveredColumnsNotImported(r)) => Some(r), - Self::Payload(PayloadReconstructionResult::NotStarted(r)) => Some(r), - Self::Payload(PayloadReconstructionResult::RecoveredColumnsNotImported(r)) => Some(r), - _ => None, - } - } -} - -/// Router that directs data availability checker operations to the appropriate version based on fork. -/// -/// This wraps both the legacy (v1) and Gloas (v2) DA checkers, providing unified operations -/// that dispatch to the correct checker based on fork. -/// -/// After Gloas is fully activated and v1 is deprecated, this router can be deleted and -/// we can use the V2 DA checker directly. -pub struct DataAvailabilityRouter { - /// Legacy DA checker for pre-Gloas blocks - pending_block_cache: Arc>, - /// Gloas DA checker for payload envelopes - pending_payload_cache: Arc>, - spec: Arc, -} - -impl DataAvailabilityRouter { - pub fn new( - pending_block_cache: Arc>, - pending_payload_cache: Arc>, - spec: Arc, - ) -> Self { - Self { - pending_block_cache, - pending_payload_cache, - spec, - } - } - - /// Returns true if the given slot is in the Gloas fork or later. - fn is_gloas(&self, slot: Slot) -> bool { - self.spec - .fork_name_at_slot::(slot) - .gloas_enabled() - } - - // ── Shared methods (dispatched to v1 or v2 based on fork) ── - - /// Returns the custody context (same for both checkers). - pub fn custody_context(&self) -> &Arc> { - // Both checkers share the same custody context - self.pending_block_cache.custody_context() - } - - /// Query data columns from the appropriate checker based on fork. - pub fn get_data_columns( - &self, - block_root: Hash256, - fork_name: ForkName, - ) -> Option> { - if fork_name.gloas_enabled() { - self.pending_payload_cache.get_data_columns(block_root) - } else { - self.pending_block_cache.get_data_columns(block_root) - } - } - - pub fn missing_cells_for_column_sidecar<'a>( - &'_ self, - slot: Slot, - data_column: &'a DataColumnSidecar, - ) -> Result>, MissingCellsError> { - if self.is_gloas(slot) { - self.pending_payload_cache - .missing_cells_for_column_sidecar(data_column) - } else { - self.pending_block_cache - .missing_cells_for_column_sidecar(data_column) - } - } - - /// Get cached column indexes from the appropriate checker based on slot. - pub fn cached_data_column_indexes( - &self, - block_root: &Hash256, - slot: Slot, - ) -> Option> { - if self.is_gloas(slot) { - self.pending_payload_cache - .cached_data_column_indexes(block_root) - } else { - self.pending_block_cache - .cached_data_column_indexes(block_root) - } - } - - /// Insert RPC custody columns, routing to the correct checker based on slot. - pub fn put_rpc_custody_columns( - &self, - block_root: Hash256, - slot: Slot, - custody_columns: DataColumnSidecarList, - ) -> Result, AvailabilityCheckError> { - if self.is_gloas(slot) { - self.pending_payload_cache - .put_rpc_custody_columns(block_root, slot, custody_columns) - .map(AvailabilityOutcome::Payload) - } else { - self.pending_block_cache - .put_rpc_custody_columns(block_root, slot, custody_columns) - .map(AvailabilityOutcome::Block) - } - } - - /// Insert gossip-verified data columns, routing to the correct checker based on slot. - pub fn put_gossip_verified_data_columns( - &self, - block_root: Hash256, - slot: Slot, - data_columns: Vec>, - ) -> Result, AvailabilityCheckError> { - if self.is_gloas(slot) { - self.pending_payload_cache - .put_gossip_verified_data_columns(block_root, slot, data_columns) - .map(AvailabilityOutcome::Payload) - } else { - self.pending_block_cache - .put_gossip_verified_data_columns(block_root, slot, data_columns) - .map(AvailabilityOutcome::Block) - } - } - - /// Insert KZG-verified custody data columns, routing to the correct checker based on slot. - pub fn put_kzg_verified_custody_data_columns( - &self, - block_root: Hash256, - slot: Slot, - custody_columns: Vec>, - ) -> Result, AvailabilityCheckError> { - if self.is_gloas(slot) { - self.pending_payload_cache - .put_kzg_verified_custody_data_columns(block_root, custody_columns) - .map(AvailabilityOutcome::Payload) - } else { - self.pending_block_cache - .put_kzg_verified_custody_data_columns(block_root, custody_columns) - .map(AvailabilityOutcome::Block) - } - } - - /// Attempt to reconstruct missing data columns, routing to the correct checker based on slot. - pub fn reconstruct_data_columns( - &self, - block_root: &Hash256, - slot: Slot, - ) -> Result, AvailabilityCheckError> { - if self.is_gloas(slot) { - self.pending_payload_cache - .reconstruct_data_columns(block_root) - .map(ReconstructionOutcome::Payload) - } else { - self.pending_block_cache - .reconstruct_data_columns(block_root) - .map(ReconstructionOutcome::Block) - } - } - - // ── V1-only methods (blobs, blocks, boundary queries) ── - - /// Returns the data availability boundary epoch (v1). - pub fn data_availability_boundary(&self) -> Option { - self.pending_block_cache.data_availability_boundary() - } - - /// Returns whether a DA check is required for the given epoch (v1). - pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool { - self.pending_block_cache.da_check_required_for_epoch(epoch) - } - - /// Returns whether blobs are required for the given epoch (v1). - pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool { - self.pending_block_cache.blobs_required_for_epoch(epoch) - } - - /// Returns whether data columns are required for the given epoch (v1). - pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool { - self.pending_block_cache - .data_columns_required_for_epoch(epoch) - } - - /// Verifies KZG commitments for a single available block (v1). - pub fn verify_kzg_for_available_block( - &self, - available_block: &AvailableBlock, - ) -> Result<(), AvailabilityCheckError> { - self.pending_block_cache - .verify_kzg_for_available_block(available_block) - } - - /// Batch verifies KZG commitments for multiple available blocks (v1). - pub fn batch_verify_kzg_for_available_blocks( - &self, - available_blocks: &[AvailableBlock], - ) -> Result<(), AvailabilityCheckError> { - self.pending_block_cache - .batch_verify_kzg_for_available_blocks(available_blocks) - } - - /// Get a blob from the availability cache (v1). - pub fn get_blob( - &self, - blob_id: &BlobIdentifier, - ) -> Result>>, AvailabilityCheckError> { - self.pending_block_cache.get_blob(blob_id) - } - - /// Returns the cached blob indexes for a given block root (v1). - pub fn cached_blob_indexes(&self, block_root: &Hash256) -> Option> { - self.pending_block_cache.cached_blob_indexes(block_root) - } - - /// Returns the cached block for a given block root (v1). - pub fn get_cached_block(&self, block_root: &Hash256) -> Option> { - self.pending_block_cache.get_cached_block(block_root) - } - - /// Inserts a pre-execution block into the cache. - pub fn put_pre_execution_block( - &self, - block_root: Hash256, - block: Arc>, - source: BlockImportSource, - ) -> Result<(), AvailabilityCheckError> { - if let ForkName::Gloas = block.fork_name_unchecked() { - self.pending_payload_cache - .init_pending_block(block_root, block); - Ok(()) - } else { - self.pending_block_cache - .put_pre_execution_block(block_root, block, source) - } - } - - /// Insert an executed block and check availability (v1). - pub fn put_executed_block( - &self, - executed_block: AvailabilityPendingExecutedBlock, - ) -> Result, AvailabilityCheckError> { - self.pending_block_cache.put_executed_block(executed_block) - } - - /// Removes a pre-execution block from the cache on execution error (v1). - pub fn remove_block_on_execution_error(&self, block_root: &Hash256) { - self.pending_block_cache - .remove_block_on_execution_error(block_root) - } - - /// Insert blobs received via RPC and check availability (v1). - pub fn put_rpc_blobs( - &self, - block_root: Hash256, - blobs: FixedBlobSidecarList, - ) -> Result, AvailabilityCheckError> { - self.pending_block_cache.put_rpc_blobs(block_root, blobs) - } - - /// Insert KZG-verified blobs and check availability (v1). - pub fn put_kzg_verified_blobs>>( - &self, - block_root: Hash256, - blobs: I, - ) -> Result, AvailabilityCheckError> { - self.pending_block_cache - .put_kzg_verified_blobs(block_root, blobs) - } - - /// Insert gossip-verified blobs into the v1 checker. - pub fn put_gossip_verified_blobs< - I: IntoIterator>, - O: ObservationStrategy, - >( - &self, - block_root: Hash256, - blobs: I, - ) -> Result, AvailabilityCheckError> { - self.pending_block_cache - .put_gossip_verified_blobs(block_root, blobs) - } - - // ── Metrics ── - - pub fn metrics(&self) -> DataAvailabilityRouterMetrics { - DataAvailabilityRouterMetrics { - block: self.pending_block_cache.metrics(), - payload: self.pending_payload_cache.metrics(), - } - } - - // ── Direct access ── - - /// Direct access to the block-level DA checker (pre-Gloas). - /// Used for block availability checks, range sync, and blob verification. - pub fn pending_block_cache(&self) -> &Arc> { - &self.pending_block_cache - } - - /// Direct access to the envelope-level DA checker (Gloas). - /// Used for payload envelope availability checks and column verification. - pub fn pending_payload_cache(&self) -> &Arc> { - &self.pending_payload_cache - } -} - -pub struct DataAvailabilityRouterMetrics { - pub block: BlockMetrics, - pub payload: PayloadMetrics, -} - -pub fn start_availability_cache_maintenance_service( - executor: task_executor::TaskExecutor, - chain: Arc>, -) { - crate::data_availability_checker::start_availability_cache_maintenance_service( - executor.clone(), - chain.clone(), - ); - crate::pending_payload_cache::start_availability_cache_maintenance_service(executor, chain); -} diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 24911cdc19..b420965024 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -331,7 +331,6 @@ impl GossipVerifiedDataColumn column_sidecar: Arc>, chain: &BeaconChain, ) -> Result { - let slot = column_sidecar.slot(); verify_data_column_sidecar(&column_sidecar, &chain.spec)?; // Check if the data column is already in the DA checker cache. This happens when data columns @@ -343,7 +342,7 @@ impl GossipVerifiedDataColumn match chain .data_availability_checker - .missing_cells_for_column_sidecar(slot, &column_sidecar) + .missing_cells_for_column_sidecar(&column_sidecar) { Ok(Some(_)) => Ok(Self { block_root: column_sidecar.block_root(), @@ -541,11 +540,7 @@ impl GossipVerifiedPartialDataColumnHeader { let header = Arc::new(header); // Cache the valid header - let Some(assembler) = chain - .data_availability_checker - .pending_block_cache() - .partial_assembler() - else { + let Some(assembler) = chain.data_availability_checker.partial_assembler() else { return Err(GossipPartialDataColumnError::PartialColumnsDisabled); }; let newly_cached = assembler.init(group_id, header.clone()); @@ -929,7 +924,7 @@ pub fn validate_data_column_sidecar_for_gossip_fulu { GossipDataColumnError::MismatchesCachedColumn @@ -1003,11 +998,7 @@ pub fn validate_partial_data_column_sidecar_for_gossip( } } } else { - let Some(assembler) = chain - .data_availability_checker - .pending_block_cache() - .partial_assembler() - else { + let Some(assembler) = chain.data_availability_checker.partial_assembler() else { return PartialColumnVerificationResult::Err( GossipPartialDataColumnError::PartialColumnsDisabled, ); @@ -1064,7 +1055,6 @@ pub fn validate_partial_data_column_sidecar_for_gossip( let column = Arc::from(column); let cells_to_kzg_verify = match chain .data_availability_checker - .pending_block_cache() .missing_cells_for_partial_column_sidecar(&column) { Ok(Some(cells_to_kzg_verify)) => cells_to_kzg_verify, @@ -1625,7 +1615,6 @@ mod test { harness .chain .data_availability_checker - .pending_block_cache() .partial_assembler() .unwrap() .init(block_root, header.clone()); diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index aaefb5cd3e..abfcc8508f 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -39,7 +39,6 @@ impl FetchBlobsBeaconAdapter { pub(crate) fn partial_assembler(&self) -> Option>> { self.chain .data_availability_checker - .pending_block_cache() .partial_assembler() .cloned() } @@ -122,12 +121,12 @@ impl FetchBlobsBeaconAdapter { pub(crate) fn cached_data_column_indexes( &self, - slot: Slot, + _slot: Slot, block_root: &Hash256, ) -> Option> { self.chain .data_availability_checker - .cached_data_column_indexes(block_root, slot) + .cached_data_column_indexes(block_root) } pub(crate) async fn process_engine_blobs( diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 12f3a86956..804268a613 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -18,7 +18,6 @@ pub mod canonical_head; pub mod chain_config; pub mod custody_context; pub mod data_availability_checker; -pub mod data_availability_router; pub mod data_column_verification; mod early_attester_cache; pub mod envelope_times_cache; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ef3b1995c3..9739038b3a 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -2149,7 +2149,7 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { set_gauge_by_usize( &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, - da_checker_metrics.block.block_cache_size, + da_checker_metrics.block_cache_size, ); if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index a0466e2eb5..15cd0ee3b4 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -17,7 +17,6 @@ use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, block_verification_types::AvailableBlockData, - data_availability_router::AvailabilityOutcome, metrics, payload_envelope_verification::{ AvailabilityPendingExecutedEnvelope, ExecutionPendingEnvelope, @@ -153,41 +152,30 @@ impl BeaconChain { async fn process_payload_envelope_availability( self: &Arc, slot: Slot, - availability: AvailabilityOutcome, + availability: PayloadAvailability, publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, ) -> Result { match availability { - AvailabilityOutcome::Block(_) => { - Err(EnvelopeError::InternalError("Received a block availability outcome variant when a payload envelope variant was expected".to_string())) + PayloadAvailability::Available(available_envelope) => { + publish_fn()?; + self.import_available_execution_payload_envelope(available_envelope) + .await } - AvailabilityOutcome::Payload(availability) => match availability { - PayloadAvailability::Available(available_envelope) => { - publish_fn()?; - - // Payload envelope is fully available - self.import_available_execution_payload_envelope(available_envelope) - .await - } - PayloadAvailability::MissingComponents(block_root) => Ok( - AvailabilityProcessingStatus::MissingComponents(slot, block_root), - ), - }, + PayloadAvailability::MissingComponents(block_root) => Ok( + AvailabilityProcessingStatus::MissingComponents(slot, block_root), + ), } } - /// Checks if the payload envelope is available, and imports immediately if so, otherwise caches the envelope - /// in the data availability checker. #[instrument(skip_all)] async fn check_envelope_availability_and_import( self: &Arc, envelope: AvailabilityPendingExecutedEnvelope, ) -> Result { let slot = envelope.envelope.slot(); - let availability = AvailabilityOutcome::Payload( - self.data_availability_checker - .pending_payload_cache() - .put_executed_payload_envelope(envelope)?, - ); + let availability = self + .pending_payload_cache + .put_executed_payload_envelope(envelope)?; self.process_payload_envelope_availability(slot, availability, || Ok(())) .await } diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs index 976bfe8c67..2337697fab 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -556,10 +556,7 @@ pub fn start_availability_cache_maintenance_service( chain: Arc>, ) { if chain.spec.gloas_fork_epoch.is_some() { - let da_checker = chain - .data_availability_checker - .pending_payload_cache() - .clone(); + let da_checker = chain.pending_payload_cache.clone(); executor.spawn( async move { availability_cache_maintenance_service(chain, da_checker).await }, "availability_cache_service", diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 1ba6fa64d6..c3db26e95c 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2828,7 +2828,7 @@ where return RangeSyncBlock::new( block, AvailableBlockData::NoData, - self.chain.data_availability_checker.pending_block_cache(), + &self.chain.data_availability_checker, self.chain.spec.clone(), ) .unwrap(); @@ -2847,7 +2847,7 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.pending_block_cache(), + &self.chain.data_availability_checker, self.chain.spec.clone(), ) .unwrap() @@ -2862,7 +2862,7 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.pending_block_cache(), + &self.chain.data_availability_checker, self.chain.spec.clone(), ) .unwrap() @@ -2891,14 +2891,14 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.pending_block_cache(), + &self.chain.data_availability_checker, self.chain.spec.clone(), )? } else { RangeSyncBlock::new( block, AvailableBlockData::NoData, - self.chain.data_availability_checker.pending_block_cache(), + &self.chain.data_availability_checker, self.chain.spec.clone(), )? } @@ -2918,7 +2918,7 @@ where RangeSyncBlock::new( block, block_data, - self.chain.data_availability_checker.pending_block_cache(), + &self.chain.data_availability_checker, self.chain.spec.clone(), )? }) diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 5e27985558..b2db85713f 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -165,7 +165,7 @@ where RangeSyncBlock::new( block, block_data, - chain.data_availability_checker.pending_block_cache(), + &chain.data_availability_checker, chain.spec.clone(), ) .unwrap() @@ -180,7 +180,7 @@ where RangeSyncBlock::new( block, block_data, - chain.data_availability_checker.pending_block_cache(), + &chain.data_availability_checker, chain.spec.clone(), ) .unwrap() @@ -188,7 +188,7 @@ where None => RangeSyncBlock::new( block, AvailableBlockData::NoData, - chain.data_availability_checker.pending_block_cache(), + &chain.data_availability_checker, chain.spec.clone(), ) .unwrap(), @@ -462,10 +462,7 @@ async fn chain_segment_non_linear_parent_roots() { blocks[3] = RangeSyncBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), blocks[3].block_data().clone(), - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.spec.clone(), ) .unwrap(); @@ -505,10 +502,7 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RangeSyncBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), blocks[3].block_data().clone(), - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.spec.clone(), ) .unwrap(); @@ -538,10 +532,7 @@ async fn chain_segment_non_linear_slots() { blocks[3] = RangeSyncBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), blocks[3].block_data().clone(), - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.chain.spec.clone(), ) .unwrap(); @@ -1723,10 +1714,7 @@ async fn add_base_block_to_altair_chain() { let base_range_sync_block = RangeSyncBlock::new( Arc::new(base_block.clone()), AvailableBlockData::NoData, - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.spec.clone(), ) .unwrap(); @@ -1757,7 +1745,7 @@ async fn add_base_block_to_altair_chain() { RangeSyncBlock::new( Arc::new(base_block), AvailableBlockData::NoData, - harness.chain.data_availability_checker.v1(), + &harness.chain.pending_block_cache, harness.spec.clone() ) .unwrap() @@ -1902,7 +1890,7 @@ async fn add_altair_block_to_base_chain() { RangeSyncBlock::new( Arc::new(altair_block), AvailableBlockData::NoData, - harness.chain.data_availability_checker.v1(), + &harness.chain.pending_block_cache, harness.spec.clone() ) .unwrap() @@ -1970,10 +1958,7 @@ async fn import_duplicate_block_unrealized_justification() { let range_sync_block = RangeSyncBlock::new( block.clone(), AvailableBlockData::NoData, - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.spec.clone(), ) .unwrap(); @@ -2107,10 +2092,7 @@ async fn range_sync_block_construction_fails_with_wrong_blob_count() { let result = RangeSyncBlock::new( Arc::new(block), block_data, - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.chain.spec.clone(), ); @@ -2188,10 +2170,7 @@ async fn range_sync_block_rejects_missing_custody_columns() { let result = RangeSyncBlock::new( Arc::new(block), block_data, - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.chain.spec.clone(), ); @@ -2267,7 +2246,6 @@ async fn rpc_block_allows_construction_past_da_boundary() { // Now verify the block is past the DA boundary let da_boundary = harness .chain - .data_availability_checker .data_availability_boundary() .expect("DA boundary should be set"); assert!( @@ -2282,10 +2260,7 @@ async fn rpc_block_allows_construction_past_da_boundary() { let result = RangeSyncBlock::new( Arc::new(block), AvailableBlockData::NoData, - harness - .chain - .data_availability_checker - .pending_block_cache(), + &harness.chain.data_availability_checker, harness.chain.spec.clone(), ); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 3040f91342..86adf50995 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3300,7 +3300,7 @@ async fn weak_subjectivity_sync_test( AvailableBlock::new( Arc::new(corrupt_block), data, - beacon_chain.data_availability_checker.pending_block_cache(), + &beacon_chain.data_availability_checker, Arc::new(spec), ) .expect("available block") diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 6955e8e252..7699d25816 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,8 +5,9 @@ use crate::compute_light_client_updates::{ use crate::config::{ClientGenesis, Config as ClientConfig}; use crate::notifier::spawn_notifier; use beacon_chain::attestation_simulator::start_attestation_simulator_service; -use beacon_chain::data_availability_router::start_availability_cache_maintenance_service; +use beacon_chain::data_availability_checker::start_availability_cache_maintenance_service as start_block_cache_maintenance_service; use beacon_chain::graffiti_calculator::start_engine_version_cache_refresh_service; +use beacon_chain::pending_payload_cache::start_availability_cache_maintenance_service as start_payload_cache_maintenance_service; use beacon_chain::proposer_prep_service::start_proposer_prep_service; use beacon_chain::schema_change::migrate_schema; use beacon_chain::{ @@ -782,7 +783,11 @@ where } start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone()); - start_availability_cache_maintenance_service( + start_block_cache_maintenance_service( + runtime_context.executor.clone(), + beacon_chain.clone(), + ); + start_payload_cache_maintenance_service( runtime_context.executor.clone(), beacon_chain.clone(), ); diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 4d1f522cf0..c83fd9244d 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1328,7 +1328,6 @@ impl NetworkBeaconProcessor { && self .chain .data_availability_checker - .pending_block_cache() .partial_assembler() .is_some_and(|a| !a.is_complete(block_root, verified_data_column.index())) { diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 41867907b2..23515d5901 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -999,12 +999,7 @@ impl NetworkBeaconProcessor { // Publish partial columns without eager send // TODO(gloas): implement - if let Some(assembler) = self - .chain - .data_availability_checker - .pending_block_cache() - .partial_assembler() - { + if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() { let columns = assembler.get_partials_and_mark_as_local_fetched(block_root, &header); if !columns.is_empty() { debug!(block = %block_root, "Publishing all partials after getBlobs"); diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 76c6ba812d..0ff9200737 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -1196,7 +1196,7 @@ async fn accept_processed_gossip_data_columns_without_import() { let block_root = rig.next_block.canonical_root(); rig.chain - .data_availability_checker + .pending_block_cache .put_gossip_verified_data_columns(block_root, rig.next_block.slot(), verified_data_columns) .expect("should put data columns into availability cache"); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 87addbfd8b..ed28099b2e 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -778,10 +778,7 @@ impl SyncNetworkContext { let range_req = entry.get_mut(); if let Some(blocks_result) = range_req.responses( - self.chain - .data_availability_checker - .pending_block_cache() - .clone(), + self.chain.data_availability_checker.clone(), self.chain.spec.clone(), ) { if let Err(CouplingError::DataColumnPeerFailure { @@ -1085,14 +1082,14 @@ impl SyncNetworkContext { pub fn custody_lookup_request( &mut self, lookup_id: SingleLookupId, - slot: Slot, + _slot: Slot, block_root: Hash256, lookup_peers: Arc>>, ) -> Result { let custody_indexes_imported = self .chain .data_availability_checker - .cached_data_column_indexes(&block_root, slot) + .cached_data_column_indexes(&block_root) .unwrap_or_default(); let current_epoch = self.chain.epoch().map_err(|e| { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 9a3f0a5311..a26996ec5e 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1110,10 +1110,7 @@ impl TestRig { let range_sync_block = RangeSyncBlock::new( block, block_data, - self.harness - .chain - .data_availability_checker - .pending_block_cache(), + &self.harness.chain.data_availability_checker, self.harness.chain.spec.clone(), ) .unwrap();