diff --git a/beacon_node/beacon_chain/benches/benches.rs b/beacon_node/beacon_chain/benches/benches.rs index e71a19d8c1..de0bd05a1f 100644 --- a/beacon_node/beacon_chain/benches/benches.rs +++ b/beacon_node/beacon_chain/benches/benches.rs @@ -53,6 +53,13 @@ fn all_benches(c: &mut Criterion) { ) .unwrap(); + let kzg_commitments = signed_block + .message() + .body() + .blob_kzg_commitments() + .unwrap() + .clone(); + let spec = spec.clone(); c.bench_function(&format!("reconstruct_{}", blob_count), |b| { @@ -60,6 +67,7 @@ fn all_benches(c: &mut Criterion) { black_box(reconstruct_data_columns( &kzg, column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2].to_vec(), + &kzg_commitments, spec.as_ref(), )) }) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f618cf6321..af8cd477d6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -19,9 +19,11 @@ pub use crate::canonical_head::CanonicalHead; use crate::chain_config::ChainConfig; use crate::custody_context::CustodyContextSsz; use crate::data_availability_checker::{ - Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData, - DataAvailabilityChecker, DataColumnReconstructionResult, + Availability as BlockAvailability, AvailabilityCheckError, AvailableBlock, AvailableBlockData, + DataColumnReconstructionResult as DataColumnReconstructionResultV1, }; + +use crate::data_availability_checker::DataAvailabilityChecker; use crate::data_column_verification::{ GossipDataColumnError, GossipPartialDataColumnError, GossipVerifiedDataColumn, GossipVerifiedPartialDataColumnHeader, KzgVerifiedCustodyPartialDataColumn, @@ -36,7 +38,6 @@ use crate::execution_payload::{NotifyExecutionLayer, PreparePayloadHandle, get_e use crate::fetch_blobs::EngineGetBlobsOutput; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx}; use crate::graffiti_calculator::{GraffitiCalculator, 
GraffitiSettings}; -use crate::kzg_utils::reconstruct_blobs; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, }; @@ -65,6 +66,11 @@ use crate::payload_attestation_verification::VerifiedPayloadAttestationMessage; use crate::payload_bid_verification::payload_bid_cache::GossipVerifiedPayloadBidCache; #[cfg(not(test))] use crate::payload_envelope_streamer::{EnvelopeRequestSource, launch_payload_envelope_stream}; +use crate::pending_payload_cache::PendingPayloadCache; +use crate::pending_payload_cache::{ + Availability as PayloadAvailability, + DataColumnReconstructionResult as DataColumnReconstructionResultGloas, +}; use crate::pending_payload_envelopes::PendingPayloadEnvelopes; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::persist_custody_context; @@ -498,9 +504,10 @@ pub struct BeaconChain { pub validator_monitor: RwLock>, /// The slot at which blocks are downloaded back to. pub genesis_backfill_slot: Slot, - /// Provides a KZG verification and temporary storage for blocks and blobs as - /// they are collected and combined. + /// Provides KZG verification and temporary storage for pre-Gloas blocks and blobs. pub data_availability_checker: Arc>, + /// Provides KZG verification and temporary storage for post-Gloas payload envelopes. + pub pending_payload_cache: Arc>, /// The KZG trusted setup used by this chain. pub kzg: Arc, /// RNG instance used by the chain. Currently used for shuffling column sidecars in block publishing. 
@@ -1180,6 +1187,7 @@ impl BeaconChain { let all_cached_columns_opt = self .data_availability_checker .get_data_columns(block_root) + .or_else(|| self.pending_payload_cache.get_data_columns(block_root)) .or_else(|| self.early_attester_cache.get_data_columns(block_root)); if let Some(mut all_cached_columns) = all_cached_columns_opt { @@ -1198,6 +1206,24 @@ impl BeaconChain { } } + pub fn cached_data_column_indexes( + &self, + block_root: &Hash256, + slot: Slot, + ) -> Option> { + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + self.pending_payload_cache + .cached_data_column_indexes(block_root) + } else { + self.data_availability_checker + .cached_data_column_indexes(block_root) + } + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -1286,45 +1312,6 @@ impl BeaconChain { .map_err(Error::from) } - /// Returns the blobs at the given root, if any. - /// - /// Uses the `block.epoch()` to determine whether to retrieve blobs or columns from the store. - /// - /// If at least 50% of columns are retrieved, blobs will be reconstructed and returned, - /// otherwise an error `InsufficientColumnsToReconstructBlobs` is returned. - /// - /// ## Errors - /// May return a database error. - pub fn get_or_reconstruct_blobs( - &self, - block_root: &Hash256, - ) -> Result>, Error> { - let Some(block) = self.store.get_blinded_block(block_root)? else { - return Ok(None); - }; - - if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { - let fork_name = self.spec.fork_name_at_epoch(block.epoch()); - if let Some(columns) = self.store.get_data_columns(block_root, fork_name)? 
{ - let num_required_columns = T::EthSpec::number_of_columns() / 2; - let reconstruction_possible = columns.len() >= num_required_columns; - if reconstruction_possible { - reconstruct_blobs(&self.kzg, columns, None, &block, &self.spec) - .map(Some) - .map_err(Error::FailedToReconstructBlobs) - } else { - Err(Error::InsufficientColumnsToReconstructBlobs { - columns_found: columns.len(), - }) - } - } else { - Ok(None) - } - } else { - Ok(self.get_blobs(block_root)?.blobs()) - } - } - /// Returns the data columns at the given root, if any. /// /// ## Errors @@ -3306,13 +3293,7 @@ impl BeaconChain { )); }; - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its samples again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { + if self.is_block_data_imported(block_root, slot) { return Err(BlockError::DuplicateFullyImported(block_root)); } @@ -3357,12 +3338,7 @@ impl BeaconChain { return Ok(None); }; - // If this block has already been imported to forkchoice it must have been available - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { + if self.is_block_data_imported(block_root, slot) { return Err(BlockError::DuplicateFullyImported(block_root)); } @@ -3401,15 +3377,28 @@ impl BeaconChain { .map(|column| column.as_data_column()), ); - let availability = self - .data_availability_checker - .put_kzg_verified_custody_data_columns( - block_root, - merge_result.full_columns.clone(), - )?; - - self.process_availability(slot, availability, || Ok(())) - .await? + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self + .pending_payload_cache + .put_kzg_verified_custody_data_columns(block_root, &merge_result.full_columns) + .map_err(BlockError::from)?; + self.process_payload_envelope_availability(slot, availability, || Ok(())) + .await? 
+ } else { + let availability = self + .data_availability_checker + .put_kzg_verified_custody_data_columns( + block_root, + merge_result.full_columns.clone(), + ) + .map_err(BlockError::from)?; + self.process_availability(slot, availability, || Ok(())) + .await? + } } else { AvailabilityProcessingStatus::MissingComponents(slot, block_root) }; @@ -3426,13 +3415,7 @@ impl BeaconChain { block_root: Hash256, blobs: FixedBlobSidecarList, ) -> Result { - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its blobs again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { + if self.is_block_data_imported(block_root, slot) { return Err(BlockError::DuplicateFullyImported(block_root)); } @@ -3521,9 +3504,12 @@ impl BeaconChain { if let Some(event_handler) = self.event_handler.as_ref() && event_handler.has_data_column_sidecar_subscribers() { + let mut data_columns_iter = data_columns_iter.peekable(); + let Some(slot) = data_columns_iter.peek().map(|col| col.slot()) else { + return; + }; let imported_data_columns = self - .data_availability_checker - .cached_data_column_indexes(block_root) + .cached_data_column_indexes(block_root, slot) .unwrap_or_default(); let new_data_columns = data_columns_iter.filter(|b| !imported_data_columns.contains(b.index())); @@ -3554,15 +3540,7 @@ impl BeaconChain { )); }; - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its columns again. - // TODO(gloas) the block will be available in fork choice for gloas. This does not indicate availability - // anymore. 
- if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { + if self.is_block_data_imported(block_root, slot) { return Err(BlockError::DuplicateFullyImported(block_root)); } @@ -3596,6 +3574,7 @@ impl BeaconChain { pub async fn reconstruct_data_columns( self: &Arc, + slot: Slot, block_root: Hash256, ) -> Result< Option<( @@ -3604,48 +3583,84 @@ impl BeaconChain { )>, BlockError, > { - // As of now we only reconstruct data columns on supernodes, so if the block is already - // available on a supernode, there's no need to reconstruct as the node must already have - // all columns. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { + // As of now we only reconstruct data columns on supernodes, so if all availability data + // for the block is already imported, there's nothing left to reconstruct. + if self.is_block_data_imported(block_root, slot) { return Ok(None); } - let data_availability_checker = self.data_availability_checker.clone(); + let is_gloas = self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled(); - let result = self - .task_executor - .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { - data_availability_checker.reconstruct_data_columns(&block_root) - }) - .await - .map_err(|_| BeaconChainError::RuntimeShutdown)??; + if is_gloas { + let pending_payload_cache = self.pending_payload_cache.clone(); + let result = self + .task_executor + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + pending_payload_cache.reconstruct_data_columns(&block_root) + }) + .await + .map_err(|_| BlockError::from(BeaconChainError::RuntimeShutdown))? 
+ .map_err(BlockError::from)?; - match result { - DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => { - let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { - // This should be unreachable because empty result would return `RecoveredColumnsNotImported` instead of success. - return Ok(None); - }; + match result { + DataColumnReconstructionResultGloas::Success(( + availability, + data_columns_to_publish, + )) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + return Ok(None); + }; - self.process_availability(slot, availability, || Ok(())) - .await - .map(|availability_processing_status| { - Some((availability_processing_status, data_columns_to_publish)) - }) + Ok(self + .process_payload_envelope_availability(slot, availability, || Ok(())) + .await + .map(|status| Some((status, data_columns_to_publish)))?) + } + DataColumnReconstructionResultGloas::NotStarted(reason) + | DataColumnReconstructionResultGloas::RecoveredColumnsNotImported(reason) => { + metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) + } } - DataColumnReconstructionResult::NotStarted(reason) - | DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => { - // We use metric here because logging this would be *very* noisy. - metrics::inc_counter_vec( - &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, - &[reason], - ); - Ok(None) + } else { + let data_availability_checker = self.data_availability_checker.clone(); + let result = self + .task_executor + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + data_availability_checker.reconstruct_data_columns(&block_root) + }) + .await + .map_err(|_| BlockError::from(BeaconChainError::RuntimeShutdown))? 
+ .map_err(BlockError::from)?; + + match result { + DataColumnReconstructionResultV1::Success(( + availability, + data_columns_to_publish, + )) => { + let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else { + return Ok(None); + }; + + Ok(self + .process_availability(slot, availability, || Ok(())) + .await + .map(|status| Some((status, data_columns_to_publish)))?) + } + DataColumnReconstructionResultV1::NotStarted(reason) + | DataColumnReconstructionResultV1::RecoveredColumnsNotImported(reason) => { + metrics::inc_counter_vec( + &metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL, + &[reason], + ); + Ok(None) + } } } } @@ -3659,6 +3674,32 @@ impl BeaconChain { } } + /// Returns true when no further availability data for `block_root` should be processed. + /// + /// Pre-Gloas: + /// - true once the block is fully imported into fork choice. + /// + /// Gloas: + /// - true only once the payload envelope and required data columns are fully imported. + /// The beacon block itself may already be present in fork choice before this is true. + fn is_block_data_imported(&self, block_root: Hash256, slot: Slot) -> bool { + let is_gloas = self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled(); + + let fork_choice = self.canonical_head.fork_choice_read_lock(); + if !fork_choice.contains_block(&block_root) { + return false; + } + + if !is_gloas { + return true; + } + + fork_choice.is_payload_received(&block_root) + } + /// Returns `Ok(block_root)` if the given `unverified_block` was successfully verified and /// imported into the chain. /// @@ -3723,6 +3764,19 @@ impl BeaconChain { &chain, notify_execution_layer, )?; + + let block = execution_pending.block.block_cloned(); + if block.fork_name_unchecked().gloas_enabled() { + let bid = Arc::new( + block + .message() + .body() + .signed_execution_payload_bid()? 
+ .clone(), + ); + chain.pending_payload_cache.insert_bid(block_root, bid); + } + publish_fn()?; // Record the time it took to complete consensus verification. @@ -3891,12 +3945,25 @@ impl BeaconChain { } } - let availability = self - .data_availability_checker - .put_gossip_verified_data_columns(block_root, slot, data_columns)?; - - self.process_availability(slot, availability, publish_fn) - .await + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self + .pending_payload_cache + .put_gossip_verified_data_columns(block_root, data_columns)?; + Ok(self + .process_payload_envelope_availability(slot, availability, publish_fn) + .await?) + } else { + let availability = self + .data_availability_checker + .put_gossip_verified_data_columns(block_root, slot, data_columns)?; + Ok(self + .process_availability(slot, availability, publish_fn) + .await?) + } } fn check_blob_header_signature_and_slashability<'a>( @@ -3943,7 +4010,8 @@ impl BeaconChain { )?; let availability = self .data_availability_checker - .put_rpc_blobs(block_root, blobs)?; + .put_rpc_blobs(block_root, blobs) + .map_err(BlockError::from)?; self.process_availability(slot, availability, || Ok(())) .await @@ -3955,14 +4023,20 @@ impl BeaconChain { block_root: Hash256, engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { - let availability = match engine_get_blobs_output { + match engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { self.check_blob_header_signature_and_slashability( block_root, blobs.iter().map(|b| b.as_blob()), )?; - self.data_availability_checker - .put_kzg_verified_blobs(block_root, blobs)? + let availability = self + .data_availability_checker + .put_kzg_verified_blobs(block_root, blobs) + .map_err(BlockError::from)?; + + Ok(self + .process_availability(slot, availability, || Ok(())) + .await?) 
} EngineGetBlobsOutput::CustodyColumns(data_columns) => { // TODO(gloas) verify that this check is no longer relevant for gloas @@ -3975,13 +4049,29 @@ impl BeaconChain { _ => None, }), )?; - self.data_availability_checker - .put_kzg_verified_custody_data_columns(block_root, data_columns)? + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self + .pending_payload_cache + .put_kzg_verified_custody_data_columns(block_root, &data_columns) + .map_err(BlockError::from)?; + Ok(self + .process_payload_envelope_availability(slot, availability, || Ok(())) + .await?) + } else { + let availability = self + .data_availability_checker + .put_kzg_verified_custody_data_columns(block_root, data_columns) + .map_err(BlockError::from)?; + Ok(self + .process_availability(slot, availability, || Ok(())) + .await?) + } } - }; - - self.process_availability(slot, availability, || Ok(())) - .await + } } /// Checks if the provided columns can make any cached blocks available, and imports immediately @@ -4001,16 +4091,27 @@ impl BeaconChain { }), )?; - // This slot value is purely informative for the consumers of - // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. - let availability = self.data_availability_checker.put_rpc_custody_columns( - block_root, - slot, - custody_columns, - )?; - - self.process_availability(slot, availability, || Ok(())) - .await + if self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled() + { + let availability = self + .pending_payload_cache + .put_rpc_custody_columns(block_root, custody_columns) + .map_err(BlockError::from)?; + Ok(self + .process_payload_envelope_availability(slot, availability, || Ok(())) + .await?) + } else { + let availability = self + .data_availability_checker + .put_rpc_custody_columns(block_root, slot, custody_columns) + .map_err(BlockError::from)?; + Ok(self + .process_availability(slot, availability, || Ok(())) + .await?) 
+ } } fn check_data_column_sidecar_header_signature_and_slashability<'a>( @@ -4053,16 +4154,33 @@ impl BeaconChain { async fn process_availability( self: &Arc, slot: Slot, - availability: Availability, + availability: BlockAvailability, publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { match availability { - Availability::Available(block) => { + BlockAvailability::Available(block) => { publish_fn()?; - // Block is fully available, import into fork choice self.import_available_block(block).await } - Availability::MissingComponents(block_root) => Ok( + BlockAvailability::MissingComponents(block_root) => Ok( + AvailabilityProcessingStatus::MissingComponents(slot, block_root), + ), + } + } + + pub(crate) async fn process_payload_envelope_availability( + self: &Arc, + slot: Slot, + availability: PayloadAvailability, + publish_fn: impl FnOnce() -> Result<(), BlockError>, + ) -> Result { + match availability { + PayloadAvailability::Available(available_envelope) => { + publish_fn()?; + self.import_available_execution_payload_envelope(available_envelope) + .await + } + PayloadAvailability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), ), } @@ -7572,7 +7690,7 @@ impl BeaconChain { ) } - pub(crate) fn get_blobs_or_columns_store_op( + pub fn get_blobs_or_columns_store_op( &self, block_root: Hash256, block_slot: Slot, diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 9a43147233..24f971f736 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -286,6 +286,10 @@ pub enum BlockError { /// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob. /// https://github.com/sigp/lighthouse/issues/4546 AvailabilityCheck(AvailabilityCheckError), + /// The payload envelope's block root is unknown. 
+ EnvelopeBlockRootUnknown(Hash256), + /// Optimistic sync is not supported for Gloas payload envelopes. + OptimisticSyncNotSupported { block_root: Hash256 }, /// A Blob with a slot after PeerDAS is received and is not required to be imported. /// This can happen because we stay subscribed to the blob subnet after 2 epochs, as we could /// still receive valid blobs from a Deneb epoch after PeerDAS is activated. @@ -624,7 +628,8 @@ pub fn signature_verify_chain_segment( consensus_context, }); } - + // TODO(gloas) When implementing range and backfill sync for gloas + // we need a batch verify kzg function in the new da checker as well. chain .data_availability_checker .batch_verify_kzg_for_available_blocks(&available_blocks)?; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 13dac087ef..e668bef7c0 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -12,6 +12,7 @@ use crate::kzg_utils::{build_data_column_sidecars_fulu, build_data_column_sideca use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::observed_data_sidecars::ObservedDataSidecars; +use crate::pending_payload_cache::PendingPayloadCache; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_custody::load_custody_context; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; @@ -987,6 +988,7 @@ where ) }; debug!(?custody_context, "Loaded persisted custody context"); + let custody_context = Arc::new(custody_context); let beacon_chain = BeaconChain { spec: self.spec.clone(), @@ -1062,14 +1064,22 @@ where data_availability_checker: Arc::new( DataAvailabilityChecker::new( complete_blob_backfill, - slot_clock, + slot_clock.clone(), self.kzg.clone(), - Arc::new(custody_context), - self.spec, + custody_context.clone(), + self.spec.clone(), enable_partial_columns, ) .map_err(|e| format!("Error initializing 
DataAvailabilityChecker: {:?}", e))?, ), + pending_payload_cache: Arc::new( + PendingPayloadCache::new( + self.kzg.clone(), + custody_context.clone(), + self.spec.clone(), + ) + .map_err(|e| format!("Error initializing PendingPayloadCache: {:?}", e))?, + ), kzg: self.kzg.clone(), rng: Arc::new(Mutex::new(rng)), gossip_verified_payload_bid_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 0e6515ebbd..b3ab2e6975 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -988,6 +988,25 @@ impl BeaconChain { .start_slot(T::EthSpec::slots_per_epoch()), ); + // Prune the Gloas pending-payload cache. Anything older than the data-availability + // boundary cannot still be in flight; finalised entries are also safe to drop. + if self.spec.gloas_fork_epoch.is_some() { + let finalized_epoch = new_view.finalized_checkpoint.epoch; + let current_epoch = new_snapshot + .beacon_state + .slot() + .epoch(T::EthSpec::slots_per_epoch()); + if let Some(min_epochs_for_blobs) = self + .spec + .min_epoch_data_availability_boundary(current_epoch) + { + let cutoff_epoch = std::cmp::max(finalized_epoch + 1, min_epochs_for_blobs); + if let Err(e) = self.pending_payload_cache.do_maintenance(cutoff_epoch) { + error!(error = ?e, "Failed to prune pending payload cache on finalization"); + } + } + } + if let Some(event_handler) = self.event_handler.as_ref() && event_handler.has_finalized_subscribers() { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index f0fa9c7794..cfd8ee7d34 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -33,6 +33,7 @@ use crate::data_column_verification::{ GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, 
verify_kzg_for_data_column_list, }; +use crate::kzg_utils::validate_data_columns_with_commitments; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, }; @@ -490,8 +491,7 @@ impl DataAvailabilityChecker { AvailableBlockData::Blobs(blobs) => verify_kzg_for_blob_list(blobs.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidBlobs), AvailableBlockData::DataColumns(columns) => { - verify_kzg_for_data_column_list(columns.iter(), &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn) + verify_columns_against_block(&self.kzg, available_block.block(), columns) } } } @@ -504,13 +504,17 @@ impl DataAvailabilityChecker { available_blocks: &[AvailableBlock], ) -> Result<(), AvailabilityCheckError> { let mut all_blobs = Vec::new(); - let mut all_data_columns = Vec::new(); for available_block in available_blocks { - match available_block.data().to_owned() { + match available_block.data() { AvailableBlockData::NoData => {} - AvailableBlockData::Blobs(blobs) => all_blobs.extend(blobs), - AvailableBlockData::DataColumns(columns) => all_data_columns.extend(columns), + AvailableBlockData::Blobs(blobs) => all_blobs.extend(blobs.iter().cloned()), + AvailableBlockData::DataColumns(columns) => { + // Each block has its own commitments. For Gloas they live in the bid; for + // Fulu they live inline on the column. Verify per block and let the helper + // pick the right path. 
+ verify_columns_against_block(&self.kzg, available_block.block(), columns)?; + } } } @@ -519,11 +523,6 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidBlobs)?; } - if !all_data_columns.is_empty() { - verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn)?; - } - Ok(()) } @@ -605,9 +604,21 @@ impl DataAvailabilityChecker { metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); + let columns: Vec<_> = verified_data_columns + .into_iter() + .map(|c| c.into_inner()) + .collect(); + // Fulu columns carry their commitments; reconstruction needs the count to drive the + // per-blob recovery loop. + let kzg_commitments = columns + .first() + .and_then(|c| c.kzg_commitments().ok().cloned()) + .ok_or(AvailabilityCheckError::InvalidVariant)?; + let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( &self.kzg, - &verified_data_columns, + columns, + &kzg_commitments, &self.spec, ) .map_err(|e| { @@ -676,6 +687,35 @@ impl DataAvailabilityChecker { } } +/// Verify a batch of data columns belonging to a single block, picking the right commitment +/// source for the block's fork (Fulu: inline on column; Gloas: from the embedded payload bid). 
+fn verify_columns_against_block( + kzg: &Kzg, + block: &SignedBeaconBlock, + columns: &[Arc>], +) -> Result<(), AvailabilityCheckError> { + if columns.is_empty() { + return Ok(()); + } + if block.fork_name_unchecked().gloas_enabled() { + let commitments = block + .message() + .body() + .signed_execution_payload_bid() + .map(|bid| bid.message.blob_kzg_commitments.clone()) + .map_err(|_| { + AvailabilityCheckError::Unexpected( + "Gloas block missing signed_execution_payload_bid".to_string(), + ) + })?; + validate_data_columns_with_commitments(kzg, columns.iter(), commitments.as_ref()) + .map_err(AvailabilityCheckError::InvalidColumn) + } else { + verify_kzg_for_data_column_list(columns.iter(), kzg) + .map_err(AvailabilityCheckError::InvalidColumn) + } +} + /// Helper struct to group data availability checker metrics. pub struct DataAvailabilityCheckerMetrics { pub block_cache_size: usize, @@ -874,10 +914,13 @@ impl AvailableBlock { match &block_data { AvailableBlockData::NoData => { - if columns_required { - return Err(AvailabilityCheckError::MissingCustodyColumns); - } else if blobs_required { - return Err(AvailabilityCheckError::MissingBlobs); + // For Gloas, DA is checked for the PayloadEnvelope, not for the block. 
+ if !block.fork_name_unchecked().gloas_enabled() { + if columns_required { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } else if blobs_required { + return Err(AvailabilityCheckError::MissingBlobs); + } } } AvailableBlockData::Blobs(blobs) => { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index af3cb72c03..ab69a62985 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -4,6 +4,7 @@ use types::{BeaconStateError, ColumnIndex, Hash256}; #[derive(Debug)] pub enum Error { InvalidBlobs(KzgError), + MissingBid(Hash256), InvalidColumn((Option, KzgError)), ReconstructColumnsError(KzgError), KzgCommitmentMismatch { @@ -23,6 +24,7 @@ pub enum Error { RebuildingStateCaches(BeaconStateError), SlotClockError, InvalidAvailableBlockData, + InvalidVariant, } #[derive(PartialEq, Eq)] @@ -38,6 +40,7 @@ impl Error { match self { Error::SszTypes(_) | Error::MissingBlobs + | Error::MissingBid(_) | Error::MissingCustodyColumns | Error::StoreError(_) | Error::DecodeError(_) @@ -46,7 +49,8 @@ impl Error { | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) | Error::SlotClockError - | Error::InvalidAvailableBlockData => ErrorCategory::Internal, + | Error::InvalidAvailableBlockData + | Error::InvalidVariant => ErrorCategory::Internal, Error::InvalidBlobs { .. } | Error::InvalidColumn { .. } | Error::ReconstructColumnsError { .. 
} diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 7d1bba2de9..3034e196b9 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -109,7 +109,7 @@ impl PendingComponents { .unwrap_or(false) } - /// Returns the indices of cached custody columns + /// Returns the indices of cached sampling columns pub fn get_cached_data_columns_indices(&self) -> Vec { self.verified_data_columns .iter() diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 8ea3c792f4..71562b376b 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -3,7 +3,8 @@ use crate::block_verification::{ }; use crate::data_availability_checker::MissingCellsError; use crate::kzg_utils::{ - reconstruct_data_columns, validate_full_data_columns, validate_partial_data_columns, + reconstruct_data_columns, validate_data_columns_with_commitments, validate_full_data_columns, + validate_partial_data_columns, }; use crate::observed_data_sidecars::{ Error as ObservedDataSidecarsError, ObservationKey, ObservationStrategy, Observe, @@ -20,6 +21,7 @@ use std::iter; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; +use store::DatabaseBlock; use tracing::{debug, instrument}; use tree_hash::TreeHash; use types::data::{ @@ -27,13 +29,16 @@ use types::data::{ PartialDataColumnSidecarError, }; use types::{ - BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId, - EthSpec, Hash256, PartialDataColumnSidecarRef, SignedBeaconBlockHeader, Slot, + BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, + KzgCommitment, 
PartialDataColumnSidecarRef, SignedBeaconBlockHeader, SignedExecutionPayloadBid, + Slot, }; /// An error occurred while validating a gossip data column. #[derive(Debug)] pub enum GossipDataColumnError { + /// Internal logic error: the column sidecar variant does not match the expected fork. + /// This is not a peer fault and should not be used to penalize peers. InvalidVariant, /// There was an error whilst processing the data column. It is not known if it is /// valid or invalid. @@ -82,10 +87,7 @@ pub enum GossipDataColumnError { /// ## Peer scoring /// /// The column is invalid or the peer is faulty. - InvalidSubnetId { - received: u64, - expected: u64, - }, + InvalidSubnetId { received: u64, expected: u64 }, /// The column sidecar is from a slot that is later than the current slot (with respect to the /// gossip clock disparity). /// @@ -118,18 +120,27 @@ pub enum GossipDataColumnError { /// ## Peer scoring /// /// The column is invalid and the peer is faulty. - ProposerIndexMismatch { - sidecar: usize, - local: usize, - }, + ProposerIndexMismatch { sidecar: usize, local: usize }, /// The provided columns's parent block is unknown. /// /// ## Peer scoring /// /// We cannot process the columns without validating its parent, the peer isn't necessarily faulty. - ParentUnknown { - parent_root: Hash256, - slot: Slot, + ParentUnknown { parent_root: Hash256, slot: Slot }, + /// The block referenced by the data column is unknown. + /// + /// ## Peer scoring + /// + /// We cannot process the column without the referenced block, the peer isn't necessarily faulty. + BlockRootUnknown { block_root: Hash256, slot: Slot }, + /// The data column slot does not match its referenced block slot. + /// + /// ## Peer scoring + /// + /// The column sidecar is invalid and the peer is faulty. + BlockSlotMismatch { + block_slot: Slot, + data_column_slot: Slot, }, /// The column conflicts with finalization, no need to propagate. 
/// @@ -137,9 +148,7 @@ pub enum GossipDataColumnError { /// /// It's unclear if this column is valid, but it conflicts with finality and shouldn't be /// imported. - NotFinalizedDescendant { - block_parent_root: Hash256, - }, + NotFinalizedDescendant { block_parent_root: Hash256 }, /// Invalid kzg commitment inclusion proof /// /// ## Peer scoring @@ -187,10 +196,7 @@ pub enum GossipDataColumnError { /// ## Peer scoring /// /// The column sidecar is invalid and the peer is faulty - InconsistentProofsLength { - cells_len: usize, - proofs_len: usize, - }, + InconsistentProofsLength { cells_len: usize, proofs_len: usize }, /// The number of KZG commitments exceeds the maximum number of blobs allowed for the fork. The /// sidecar is invalid. /// @@ -200,6 +206,12 @@ pub enum GossipDataColumnError { max_blobs_per_block: usize, commitments_len: usize, }, + + /// An internal error occurred. + /// + /// ## Peer scoring + /// This is an internal issue, the peer isn't at fault. + InternalError(String), } impl From for GossipDataColumnError { @@ -302,26 +314,35 @@ impl GossipVerifiedDataColumn subnet_id: DataColumnSubnetId, chain: &BeaconChain, ) -> Result { - match column_sidecar.as_ref() { - DataColumnSidecar::Fulu(c) => { - let header = c.signed_block_header.clone(); + let data_column = match column_sidecar.as_ref() { + DataColumnSidecar::Fulu(column_sidecar_fulu) => { + let header = &column_sidecar_fulu.signed_block_header; // We only process slashing info if the gossip verification failed // since we do not process the data column any further in that case. validate_data_column_sidecar_for_gossip_fulu::( - column_sidecar, + column_sidecar.clone(), subnet_id, chain, ) .map_err(|e| { process_block_slash_info::<_, GossipDataColumnError>( chain, - BlockSlashInfo::from_early_error_data_column(header, e), + BlockSlashInfo::from_early_error_data_column(header.clone(), e), ) - }) + })? 
} - // TODO(gloas) support gloas data column variant - DataColumnSidecar::Gloas(_) => Err(GossipDataColumnError::InvalidVariant), - } + DataColumnSidecar::Gloas(_) => validate_data_column_sidecar_for_gossip_gloas::( + column_sidecar.clone(), + subnet_id, + chain, + )?, + }; + + Ok(GossipVerifiedDataColumn { + block_root: column_sidecar.block_root(), + data_column, + _phantom: PhantomData, + }) } /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for block production ONLY. @@ -331,7 +352,28 @@ impl GossipVerifiedDataColumn column_sidecar: Arc>, chain: &BeaconChain, ) -> Result { - verify_data_column_sidecar(&column_sidecar, &chain.spec)?; + match column_sidecar.as_ref() { + DataColumnSidecar::Fulu(data_column_fulu) => { + verify_data_column_sidecar_with_commitments_len( + &column_sidecar, + data_column_fulu.kzg_commitments.len(), + &chain.spec, + )?; + } + DataColumnSidecar::Gloas(_) => { + let bid = load_gloas_payload_bid(column_sidecar.block_root(), chain)?.ok_or( + GossipDataColumnError::BlockRootUnknown { + block_root: column_sidecar.block_root(), + slot: column_sidecar.slot(), + }, + )?; + verify_data_column_sidecar_with_commitments_len( + &column_sidecar, + bid.message.blob_kzg_commitments.len(), + &chain.spec, + )?; + } + } // Check if the data column is already in the DA checker cache. This happens when data columns // are made available through the `engine_getBlobs` method. If it exists in the cache, we know @@ -340,28 +382,20 @@ impl GossipVerifiedDataColumn // In this case, we should accept it for gossip propagation. verify_is_unknown_sidecar(chain, &column_sidecar)?; - match chain - .data_availability_checker - .missing_cells_for_column_sidecar(&column_sidecar) - { - Ok(Some(_)) => Ok(Self { + // Check if this column contains any cells not already in the cache. If all cells are + // already cached, reject as `PriorKnownUnpublished` to avoid redundant processing. + match missing_cells_for_column_sidecar(chain, &column_sidecar)? 
{ + Some(_) => Ok(Self { block_root: column_sidecar.block_root(), data_column: KzgVerifiedDataColumn::from_execution_verified(column_sidecar), _phantom: Default::default(), }), - Ok(None) => { - // Observe this data column so we don't process it again. + None => { if O::observe() { observe_gossip_data_column(&column_sidecar, chain)?; } Err(GossipDataColumnError::PriorKnownUnpublished) } - Err(MissingCellsError::MismatchesCachedColumn) => { - Err(GossipDataColumnError::MismatchesCachedColumn) - } - Err(MissingCellsError::UnexpectedError(_)) => { - todo!("handle unexpected error") - } } } @@ -430,12 +464,30 @@ impl KzgVerifiedDataColumn { data_columns: Vec>>, kzg: &Kzg, ) -> Result, (Option, KzgError)> { + let seen_timestamp = timestamp_now(); verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; Ok(data_columns .into_iter() .map(|column| Self { data: column, - seen_timestamp: timestamp_now(), + seen_timestamp, + }) + .collect()) + } + + pub fn from_batch_with_scoring_and_commitments( + data_columns: Vec>>, + kzg_commitments: &[KzgCommitment], + kzg: &Kzg, + ) -> Result, (Option, KzgError)> { + let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_BATCH_TIMES); + let seen_timestamp = timestamp_now(); + validate_data_columns_with_commitments(kzg, data_columns.iter(), kzg_commitments)?; + Ok(data_columns + .into_iter() + .map(|column| Self { + data: column, + seen_timestamp, }) .collect()) } @@ -635,17 +687,12 @@ impl KzgVerifiedCustodyDataColumn { pub fn reconstruct_columns( kzg: &Kzg, - partial_set_of_columns: &[Self], + partial_set_of_columns: Vec>>, + kzg_commitments: &[KzgCommitment], spec: &ChainSpec, ) -> Result>, KzgError> { - let all_data_columns = reconstruct_data_columns( - kzg, - partial_set_of_columns - .iter() - .map(|d| d.clone_arc()) - .collect::>(), - spec, - )?; + let all_data_columns = + reconstruct_data_columns(kzg, partial_set_of_columns, kzg_commitments, spec)?; let seen_timestamp = timestamp_now(); @@ -860,6 +907,26 @@ 
pub fn verify_kzg_for_data_column( }) } +#[instrument(skip_all, level = "debug")] +pub fn verify_kzg_for_data_column_with_commitments( + data_column: Arc>, + cells_to_verify: PartialDataColumnSidecarRef, + kzg_commitments: &[KzgCommitment], + kzg: &Kzg, + seen_timestamp: Duration, +) -> Result, (Option, KzgError)> { + let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); + validate_partial_data_columns( + kzg, + iter::once((*data_column.index(), cells_to_verify)), + kzg_commitments, + )?; + Ok(KzgVerifiedDataColumn { + data: data_column, + seen_timestamp, + }) +} + /// Complete kzg verification for a `VerifiablePartialDataColumn`. /// /// Returns an error if the kzg verification check fails. @@ -910,13 +977,18 @@ pub fn validate_data_column_sidecar_for_gossip_fulu>, subnet: DataColumnSubnetId, chain: &BeaconChain, -) -> Result, GossipDataColumnError> { +) -> Result, GossipDataColumnError> { let DataColumnSidecar::Fulu(data_column_fulu) = data_column.as_ref() else { return Err(GossipDataColumnError::InvalidVariant); }; let column_slot = data_column.slot(); - verify_data_column_sidecar(&data_column, &chain.spec)?; + + verify_data_column_sidecar_with_commitments_len( + &data_column, + data_column_fulu.kzg_commitments.len(), + &chain.spec, + )?; verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; verify_sidecar_not_from_future_slot(chain, column_slot)?; verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; @@ -935,7 +1007,10 @@ pub fn validate_data_column_sidecar_for_gossip_fulu { GossipDataColumnError::MismatchesCachedColumn } - MissingCellsError::UnexpectedError(_) => todo!("handle unexpected error"), + MissingCellsError::UnexpectedError(e) => GossipDataColumnError::InternalError(format!( + "An unexpected error occurred while validating fulu data columns. {:?}", + e + )), })? else { // Observe this data column so we don't process it again. 
@@ -945,7 +1020,7 @@ pub fn validate_data_column_sidecar_for_gossip_fulu( + data_column: Arc>, + subnet: DataColumnSubnetId, + chain: &BeaconChain, +) -> Result, GossipDataColumnError> { + let DataColumnSidecar::Gloas(_) = data_column.as_ref() else { + return Err(GossipDataColumnError::InvalidVariant); + }; + + let column_slot = data_column.slot(); + + if *data_column.index() >= T::EthSpec::number_of_columns() as u64 { + return Err(GossipDataColumnError::InvalidColumnIndex( + *data_column.index(), + )); + } + verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; + verify_sidecar_not_from_future_slot(chain, column_slot)?; + verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; + verify_is_unknown_sidecar(chain, &data_column)?; + + let bid = load_gloas_payload_bid(data_column.block_root(), chain)?.ok_or( + GossipDataColumnError::BlockRootUnknown { + block_root: data_column.block_root(), + slot: column_slot, + }, + )?; + if bid.message.slot != column_slot { + return Err(GossipDataColumnError::BlockSlotMismatch { + block_slot: bid.message.slot, + data_column_slot: column_slot, + }); + } + let kzg_commitments = &bid.message.blob_kzg_commitments; + verify_data_column_sidecar_with_commitments_len( + &data_column, + kzg_commitments.len(), + &chain.spec, + )?; + + let Some(cells_to_kzg_verify) = missing_cells_for_column_sidecar(chain, &data_column)? else { + // Observe this data column so we don't process it again. 
+ if O::observe() { + observe_gossip_data_column(&data_column, chain)?; + } + return Err(GossipDataColumnError::PriorKnownUnpublished); + }; + + let kzg = &chain.kzg; + let seen_timestamp = chain.slot_clock.now_duration().unwrap_or_default(); + let kzg_verified = verify_kzg_for_data_column_with_commitments( + data_column.clone(), + cells_to_kzg_verify, + kzg_commitments.as_ref(), + kzg, + seen_timestamp, + ) + .map_err(|(_, e)| GossipDataColumnError::InvalidKzgProof(e))?; + + if O::observe() { + observe_gossip_data_column(&data_column, chain)?; + } + + Ok(kzg_verified) } #[instrument(skip_all, level = "debug")] @@ -1115,9 +1259,9 @@ pub enum PartialColumnVerificationResult { Err(GossipPartialDataColumnError), } -/// Verify if the data column sidecar is valid. -fn verify_data_column_sidecar( +fn verify_data_column_sidecar_with_commitments_len( data_column: &DataColumnSidecar, + commitments_len: usize, spec: &ChainSpec, ) -> Result<(), GossipDataColumnError> { if *data_column.index() >= E::number_of_columns() as u64 { @@ -1126,12 +1270,6 @@ fn verify_data_column_sidecar( )); } - // TODO(gloas): implement Gloas verification that takes kzg_commitments from block as parameter - let commitments_len = match data_column { - DataColumnSidecar::Fulu(dc) => dc.kzg_commitments.len(), - DataColumnSidecar::Gloas(_) => return Err(GossipDataColumnError::InvalidVariant), - }; - if commitments_len == 0 { return Err(GossipDataColumnError::UnexpectedDataColumn); } @@ -1164,6 +1302,93 @@ fn verify_data_column_sidecar( Ok(()) } +/// Loads the Gloas payload bid for `block_root` from the `pending_payload_cache`, the +/// `early_attester_cache`, or the on-disk store (in that order). +/// +/// TODO(gloas): the store fallback is a synchronous disk read and several callers run inside +/// `async` gossip / RPC validation paths. Move the disk path off the async runtime (e.g. 
behind +/// `spawn_blocking`) — or restructure callers to fetch the bid before entering async — once the +/// gossip pipeline is reworked for Gloas. The cache and early-attester paths are short +/// in-memory locks and acceptable as-is. +pub(crate) fn load_gloas_payload_bid( + block_root: Hash256, + chain: &BeaconChain, +) -> Result>>, BeaconChainError> { + if let Some(bid) = chain.pending_payload_cache.get_bid(&block_root) { + return Ok(Some(bid)); + } + + let bid = if let Some(block) = chain.early_attester_cache.get_block(block_root) { + Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .map_err(BeaconChainError::BeaconStateError)? + .clone(), + ) + } else { + match chain + .store + .try_get_full_block(&block_root) + .map_err(BeaconChainError::DBError)? + { + Some(DatabaseBlock::Full(block)) => Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .map_err(BeaconChainError::BeaconStateError)? + .clone(), + ), + Some(DatabaseBlock::Blinded(block)) => Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .map_err(BeaconChainError::BeaconStateError)? 
+ .clone(), + ), + None => { + return Ok(None); + } + } + }; + + chain + .pending_payload_cache + .insert_bid(block_root, bid.clone()); + + Ok(Some(bid)) +} + +fn missing_cells_for_column_sidecar<'a, T: BeaconChainTypes>( + chain: &'_ BeaconChain, + data_column: &'a DataColumnSidecar, +) -> Result>, GossipDataColumnError> { + let result = if chain + .spec + .fork_name_at_slot::(data_column.slot()) + .gloas_enabled() + { + chain + .pending_payload_cache + .missing_cells_for_column_sidecar(data_column) + } else { + chain + .data_availability_checker + .missing_cells_for_column_sidecar(data_column) + }; + + result.map_err(|err| match err { + MissingCellsError::MismatchesCachedColumn => GossipDataColumnError::MismatchesCachedColumn, + MissingCellsError::UnexpectedError(e) => GossipDataColumnError::InternalError(format!( + "An unexpected error occurred while calculating missing partial cells {:?}", + e + )), + }) +} + /// Verify that `column_sidecar` is not yet known, i.e. this is the first time `column_sidecar` has been received for the tuple: /// `(block_header.slot, block_header.proposer_index, column_sidecar.index)` fn verify_is_unknown_sidecar( @@ -1187,10 +1412,15 @@ fn verify_is_unknown_sidecar( } fn verify_column_inclusion_proof( - data_column: &DataColumnSidecarFulu, + data_column: &DataColumnSidecar, ) -> Result<(), GossipDataColumnError> { let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_INCLUSION_PROOF_VERIFICATION); - if !data_column.verify_inclusion_proof() { + + let DataColumnSidecar::Fulu(data_column_fulu) = data_column else { + return Err(GossipDataColumnError::InvalidVariant); + }; + + if !data_column_fulu.verify_inclusion_proof() { return Err(GossipDataColumnError::InvalidInclusionProof); } @@ -1447,7 +1677,7 @@ mod test { let verify_fn = |column_sidecar: DataColumnSidecar| { let col_index = *column_sidecar.index(); validate_data_column_sidecar_for_gossip_fulu::<_, Observe>( - column_sidecar.into(), + Arc::new(column_sidecar), 
DataColumnSubnetId::from_column_index(col_index, &harness.spec), &harness.chain, ) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index c94fb036f8..f5ba647fce 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -119,10 +119,12 @@ impl FetchBlobsBeaconAdapter { .cached_blob_indexes(block_root) } - pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { - self.chain - .data_availability_checker - .cached_data_column_indexes(block_root) + pub(crate) fn cached_data_column_indexes( + &self, + block_root: &Hash256, + slot: Slot, + ) -> Option> { + self.chain.cached_data_column_indexes(block_root, slot) } pub(crate) async fn process_engine_blobs( diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index f7b4b8a29e..351e35666a 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -445,7 +445,7 @@ async fn compute_custody_columns_to_import( // Only consider columns that are not already known to data availability. 
if let Some(known_columns) = - chain_adapter_cloned.cached_data_column_indexes(&block_root) + chain_adapter_cloned.cached_data_column_indexes(&block_root, header.slot()) { custody_columns.retain(|col| !known_columns.contains(&col.index())); if custody_columns.is_empty() { diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index ef282a3eaa..37d40f3a27 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -199,7 +199,7 @@ mod get_blobs_v2 { .returning(|_| None); mock_adapter .expect_cached_data_column_indexes() - .returning(|_| None); + .returning(|_, _| None); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index b05a896777..bc803efe93 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -111,6 +111,57 @@ pub fn validate_full_data_columns<'a, E: EthSpec>( kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) } +/// Validate a batch of full `DataColumnSidecar`s against commitments supplied out-of-band. +/// +/// Gloas sidecars do not carry commitments. Their commitments come from the block's +/// `ExecutionPayloadBid`. 
+pub fn validate_data_columns_with_commitments<'a, E: EthSpec>( + kzg: &Kzg, + data_column_iter: impl Iterator>>, + kzg_commitments: &[KzgCommitment], +) -> Result<(), (Option, KzgError)> { + let mut cells = Vec::new(); + let mut proofs = Vec::new(); + let mut column_indices = Vec::new(); + let mut commitments = Vec::new(); + + for data_column in data_column_iter { + let col_index = *data_column.index(); + + if data_column.column().is_empty() { + return Err((Some(col_index), KzgError::KzgVerificationFailed)); + } + + for cell in data_column.column() { + cells.push(ssz_cell_to_crypto_cell::(cell).map_err(|e| (Some(col_index), e))?); + column_indices.push(col_index); + } + + for &proof in data_column.kzg_proofs() { + proofs.push(proof.0); + } + + for &commitment in kzg_commitments { + commitments.push(commitment.0); + } + + let expected_len = column_indices.len(); + + // We make this check at each iteration so that the error is attributable to a specific column. + if cells.len() != expected_len + || proofs.len() != expected_len + || commitments.len() != expected_len + { + return Err(( + Some(col_index), + KzgError::InconsistentArrayLength("Invalid data column".to_string()), + )); + } + } + + kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) +} + /// Validate a batch of partial `VerifiablePartialDataColumn`s. /// /// Partial columns may have missing cells, indicated by a bitmap. We only verify present cells. 
@@ -618,19 +669,17 @@ pub fn reconstruct_blobs( // Sort data columns by index to ensure ascending order for KZG operations data_columns.sort_unstable_by_key(|dc| *dc.index()); - let first_data_column = data_columns - .first() - .ok_or("data_columns should have at least one element".to_string())?; + if data_columns.is_empty() { + return Err("data_columns should have at least one element".to_string()); + } let blob_indices: Vec = match blob_indices_opt { Some(indices) => indices.into_iter().map(|i| i as usize).collect(), None => { - // TODO(gloas): support blob reconstruction for Gloas - // https://github.com/sigp/lighthouse/issues/7413 - let num_of_blobs = first_data_column - .kzg_commitments() - .map_err(|_| "Gloas blob reconstruction not yet supported".to_string())? - .len(); + let num_of_blobs = signed_block + .message() + .blob_kzg_commitments_len() + .ok_or_else(|| "Block does not have blob KZG commitments".to_string())?; (0..num_of_blobs).collect() } }; @@ -689,9 +738,14 @@ pub fn reconstruct_blobs( } /// Reconstruct all data columns from a subset of data column sidecars (requires at least 50%). +/// +/// `kzg_commitments` are the commitments for the underlying blobs. For Fulu they live in the +/// column itself; for Gloas they live in the bid. We take them as a parameter so this function +/// works for both forks (mirroring `validate_data_columns_with_commitments`). 
pub fn reconstruct_data_columns( kzg: &Kzg, mut data_columns: Vec>>, + kzg_commitments: &[KzgCommitment], spec: &ChainSpec, ) -> Result, KzgError> { // Sort data columns by index to ensure ascending order for KZG operations @@ -703,16 +757,7 @@ pub fn reconstruct_data_columns( "data_columns should have at least one element".to_string(), ))?; - // TODO(gloas): support data column reconstruction for Gloas - // https://github.com/sigp/lighthouse/issues/7413 - let num_of_blobs = first_data_column - .kzg_commitments() - .map_err(|_| { - KzgError::InconsistentArrayLength( - "Gloas data column reconstruction not yet supported".to_string(), - ) - })? - .len(); + let num_of_blobs = kzg_commitments.len(); let blob_cells_and_proofs_vec = (0..num_of_blobs) .into_par_iter() @@ -757,8 +802,9 @@ pub fn reconstruct_data_columns( #[cfg(test)] mod test { use crate::kzg_utils::{ - blobs_to_data_column_sidecars, blobs_to_data_column_sidecars_gloas, reconstruct_blobs, - reconstruct_data_columns, validate_full_data_columns, + blob_to_kzg_commitment, blobs_to_data_column_sidecars, blobs_to_data_column_sidecars_gloas, + reconstruct_blobs, reconstruct_data_columns, validate_data_columns_with_commitments, + validate_full_data_columns, }; use bls::Signature; use eth2::types::BlobsBundle; @@ -787,9 +833,13 @@ mod test { test_reconstruct_blobs_from_data_columns_unordered(&kzg, &fulu_spec); test_validate_data_columns(&kzg, &fulu_spec); + test_validate_data_columns_with_commitments(&kzg, &fulu_spec); + let gloas_spec = ForkName::Gloas.make_genesis_spec(E::default_spec()); test_build_data_columns_gloas(&kzg, &gloas_spec); test_build_data_columns_gloas_empty(&kzg, &gloas_spec); + test_reconstruct_data_columns_gloas(&kzg, &gloas_spec); + test_validate_data_columns_with_commitments_gloas(&kzg, &gloas_spec); } #[track_caller] @@ -806,6 +856,63 @@ mod test { assert!(result.is_ok()); } + #[track_caller] + fn test_validate_data_columns_with_commitments(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 
2; + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); + let column_sidecars = + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); + + let commitments = signed_block + .message() + .body() + .blob_kzg_commitments() + .unwrap(); + + let result = + validate_data_columns_with_commitments(kzg, column_sidecars.iter(), commitments); + assert!(result.is_ok()); + + // Verify that wrong commitments cause a failure + let bad_commitments = vec![KzgCommitment::empty_for_testing(); num_of_blobs]; + let result = + validate_data_columns_with_commitments(kzg, column_sidecars.iter(), &bad_commitments); + assert!(result.is_err()); + } + + #[track_caller] + fn test_validate_data_columns_with_commitments_gloas(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 2; + let (blobs, _proofs) = create_test_gloas_blobs::(num_of_blobs); + let blob_refs: Vec<_> = blobs.iter().collect(); + let column_sidecars = blobs_to_data_column_sidecars_gloas::( + &blob_refs, + Hash256::random(), + Slot::new(0), + kzg, + spec, + ) + .unwrap(); + + let commitments: Vec = blobs + .iter() + .map(|blob| blob_to_kzg_commitment::(kzg, blob).unwrap()) + .collect(); + + let result = + validate_data_columns_with_commitments(kzg, column_sidecars.iter(), &commitments); + assert!(result.is_ok()); + + // Verify that wrong commitments cause a failure + let bad_commitments = vec![KzgCommitment::empty_for_testing(); num_of_blobs]; + let result = + validate_data_columns_with_commitments(kzg, column_sidecars.iter(), &bad_commitments); + assert!(result.is_err()); + } + #[track_caller] fn test_build_data_columns_empty(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 0; @@ -918,11 +1025,18 @@ mod test { let column_sidecars = blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) .unwrap(); + let commitments = signed_block + .message() + .body() + 
.blob_kzg_commitments() + .unwrap() + .clone(); // Now reconstruct let reconstructed_columns = reconstruct_data_columns( kzg, column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2].to_vec(), + &commitments, spec, ) .unwrap(); @@ -942,12 +1056,49 @@ mod test { let column_sidecars = blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) .unwrap(); + let commitments = signed_block + .message() + .body() + .blob_kzg_commitments() + .unwrap() + .clone(); // Test reconstruction with columns in reverse order (non-ascending) let mut subset_columns: Vec<_> = column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2].to_vec(); subset_columns.reverse(); // This would fail without proper sorting in reconstruct_data_columns - let reconstructed_columns = reconstruct_data_columns(kzg, subset_columns, spec).unwrap(); + let reconstructed_columns = + reconstruct_data_columns(kzg, subset_columns, &commitments, spec).unwrap(); + + for i in 0..E::number_of_columns() { + assert_eq!(reconstructed_columns.get(i), column_sidecars.get(i), "{i}"); + } + } + + /// Reconstruct a full Gloas column set from a 50% subset and assert the recovered sidecars + /// match the originals. Gloas columns don't carry commitments (they come from the bid), so + /// this test supplies placeholder commitments of the matching length instead.
+ #[track_caller] + fn test_reconstruct_data_columns_gloas(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 2; + let (blobs, _proofs) = create_test_gloas_blobs::(num_of_blobs); + let blob_refs: Vec<_> = blobs.iter().collect(); + let column_sidecars = blobs_to_data_column_sidecars_gloas::( + &blob_refs, + Hash256::random(), + Slot::new(0), + kzg, + spec, + ) + .unwrap(); + + let commitments = + KzgCommitments::::new(vec![KzgCommitment::empty_for_testing(); num_of_blobs]) + .unwrap(); + + let subset = column_sidecars[..column_sidecars.len() / 2].to_vec(); + let reconstructed_columns = + reconstruct_data_columns(kzg, subset, &commitments, spec).unwrap(); for i in 0..E::number_of_columns() { assert_eq!(reconstructed_columns.get(i), column_sidecars.get(i), "{i}"); diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index d70fc1b3ec..804268a613 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -48,6 +48,7 @@ pub mod payload_attestation_verification; pub mod payload_bid_verification; pub mod payload_envelope_streamer; pub mod payload_envelope_verification; +pub mod pending_payload_cache; pub mod pending_payload_envelopes; pub mod persisted_beacon_chain; pub mod persisted_custody; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 43c3337bc9..df1b005820 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -2043,6 +2043,12 @@ pub static DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE: LazyLock> = LazyLock::new(|| { + try_create_int_gauge( + "pending_payload_cache_size", + "Number of entries in the pending payload availability cache.", + ) +}); pub static DATA_AVAILABILITY_RECONSTRUCTION_TIME: LazyLock> = LazyLock::new(|| { try_create_histogram( @@ -2150,6 +2156,10 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { &DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE, 
da_checker_metrics.block_cache_size, ); + set_gauge_by_usize( + &PENDING_PAYLOAD_CACHE_SIZE, + beacon_chain.pending_payload_cache.cache_size(), + ); if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() { set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size); diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs index 4b8e7347cc..b678bdbaea 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/execution_pending_envelope.rs @@ -1,23 +1,22 @@ -use std::sync::Arc; - +use bls::Hash256; use slot_clock::SlotClock; use state_processing::{VerifySignatures, envelope_processing::verify_execution_payload_envelope}; -use types::EthSpec; +use std::sync::Arc; +use types::{EthSpec, SignedExecutionPayloadEnvelope}; use crate::{ BeaconChain, BeaconChainError, BeaconChainTypes, NotifyExecutionLayer, PayloadVerificationOutcome, block_verification::PayloadVerificationHandle, payload_envelope_verification::{ - EnvelopeError, EnvelopeImportData, MaybeAvailableEnvelope, - gossip_verified_envelope::GossipVerifiedEnvelope, load_snapshot_from_state_root, - payload_notifier::PayloadNotifier, + EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, + load_snapshot_from_state_root, payload_notifier::PayloadNotifier, }, }; pub struct ExecutionPendingEnvelope { - pub signed_envelope: MaybeAvailableEnvelope, - pub import_data: EnvelopeImportData, + pub signed_envelope: Arc>, + pub block_root: Hash256, pub payload_verification_handle: PayloadVerificationHandle, } @@ -29,7 +28,6 @@ impl GossipVerifiedEnvelope { ) -> Result, EnvelopeError> { let signed_envelope = self.signed_envelope; let envelope = &signed_envelope.message; - let payload = &envelope.payload; // Define a future that will verify the 
execution payload with an execution engine. // @@ -87,14 +85,8 @@ impl GossipVerifiedEnvelope { )?; Ok(ExecutionPendingEnvelope { - signed_envelope: MaybeAvailableEnvelope::AvailabilityPending { - block_hash: payload.block_hash, - envelope: signed_envelope, - }, - import_data: EnvelopeImportData { - block_root, - _phantom: Default::default(), - }, + signed_envelope, + block_root, payload_verification_handle, }) } diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index b40e8337fb..73ddb43273 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -9,13 +9,18 @@ use tracing::{debug, error, info, info_span, instrument, warn}; use types::{BlockImportSource, Hash256, SignedExecutionPayloadEnvelope}; use super::{ - AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, EnvelopeImportData, - ExecutedEnvelope, gossip_verified_envelope::GossipVerifiedEnvelope, + AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, + gossip_verified_envelope::GossipVerifiedEnvelope, }; use crate::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, - NotifyExecutionLayer, block_verification_types::AvailableBlockData, metrics, - payload_envelope_verification::ExecutionPendingEnvelope, validator_monitor::get_slot_delay_ms, + AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, + NotifyExecutionLayer, + block_verification_types::AvailableBlockData, + metrics, + payload_envelope_verification::{ + AvailabilityPendingExecutedEnvelope, ExecutionPendingEnvelope, + }, + validator_monitor::get_slot_delay_ms, }; const ENVELOPE_METRICS_CACHE_SLOT_LIMIT: u32 = 64; @@ -28,13 +33,13 @@ impl BeaconChain { /// /// Returns an `Err` if the given payload envelope was invalid, or an error was encountered during /// verification. 
- #[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))] + #[instrument(skip_all, fields(block_root = ?block_root, envelope_source = %envelope_source))] pub async fn process_execution_payload_envelope( self: &Arc, block_root: Hash256, unverified_envelope: GossipVerifiedEnvelope, notify_execution_layer: NotifyExecutionLayer, - block_source: BlockImportSource, + envelope_source: BlockImportSource, publish_fn: impl FnOnce() -> Result<(), EnvelopeError>, ) -> Result { let block_slot = unverified_envelope.signed_envelope.slot(); @@ -50,7 +55,7 @@ impl BeaconChain { ); } - // TODO(gloas) insert the pre-executed envelope into some type of cache. + // TODO(gloas) insert the pre-executed envelope into some type of cache? let _full_timer = metrics::start_timer(&metrics::ENVELOPE_PROCESSING_TIMES); @@ -79,12 +84,11 @@ impl BeaconChain { let executed_envelope = chain .into_executed_payload_envelope(execution_pending) .await - .inspect_err(|_| { - // TODO(gloas) If the envelope fails execution for whatever reason (e.g. engine offline), - // and we keep it in the cache, then the node will NOT perform lookup and - // reprocess this block until the block is evicted from DA checker, causing the - // chain to get stuck temporarily if the block is canonical. Therefore we remove - // it from the cache if execution fails. + .map_err(|error| match error { + BlockError::ExecutionPayloadError(error) => { + EnvelopeError::ExecutionPayloadError(error) + } + error => EnvelopeError::ImportError(error), })?; // Record the time it took to wait for execution layer verification. 
@@ -94,15 +98,9 @@ impl BeaconChain { .set_time_executed(block_root, block_slot, timestamp); } - match executed_envelope { - ExecutedEnvelope::Available(envelope) => { - self.import_available_execution_payload_envelope(Box::new(envelope)) - .await - } - ExecutedEnvelope::AvailabilityPending() => Err(EnvelopeError::InternalError( - "Pending payload envelope not yet implemented".to_owned(), - )), - } + self.check_envelope_availability_and_import(executed_envelope) + .await + .map_err(EnvelopeError::ImportError) }; // Verify and import the payload envelope. @@ -112,7 +110,7 @@ impl BeaconChain { info!( ?block_root, %block_slot, - source = %block_source, + source = %envelope_source, "Execution payload envelope imported" ); @@ -138,6 +136,14 @@ impl BeaconChain { } Err(EnvelopeError::BeaconChainError(e)) } + Err(EnvelopeError::ImportError(BlockError::BeaconChainError(e))) => { + if matches!(e.as_ref(), BeaconChainError::TokioJoin(_)) { + debug!(error = ?e, "Envelope processing cancelled"); + } else { + warn!(error = ?e, "Execution payload envelope rejected"); + } + Err(EnvelopeError::ImportError(BlockError::BeaconChainError(e))) + } Err(other) => { warn!( reason = other.to_string(), @@ -148,6 +154,19 @@ impl BeaconChain { } } + #[instrument(skip_all)] + async fn check_envelope_availability_and_import( + self: &Arc, + envelope: AvailabilityPendingExecutedEnvelope, + ) -> Result { + let slot = envelope.envelope.slot(); + let availability = self + .pending_payload_cache + .put_executed_payload_envelope(envelope)?; + self.process_payload_envelope_availability(slot, availability, || Ok(())) + .await + } + /// Accepts a fully-verified payload envelope and awaits on its payload verification handle to /// get a fully `ExecutedEnvelope`. 
/// @@ -156,10 +175,10 @@ impl BeaconChain { async fn into_executed_payload_envelope( self: Arc, pending_envelope: ExecutionPendingEnvelope, - ) -> Result, EnvelopeError> { + ) -> Result, BlockError> { let ExecutionPendingEnvelope { signed_envelope, - import_data, + block_root, payload_verification_handle, } = pending_envelope; @@ -173,16 +192,13 @@ impl BeaconChain { .payload_verification_status .is_optimistic() { - return Err(EnvelopeError::OptimisticSyncNotSupported { - block_root: import_data.block_root, - }); + return Err(BlockError::OptimisticSyncNotSupported { block_root }); } - Ok(ExecutedEnvelope::new( + Ok(AvailabilityPendingExecutedEnvelope::new( signed_envelope, - import_data, + block_root, payload_verification_outcome, - self.spec.clone(), )) } @@ -190,18 +206,13 @@ impl BeaconChain { pub async fn import_available_execution_payload_envelope( self: &Arc, envelope: Box>, - ) -> Result { + ) -> Result { let AvailableExecutedEnvelope { envelope, - import_data, + block_root, payload_verification_outcome, } = *envelope; - let EnvelopeImportData { - block_root, - _phantom, - } = import_data; - let block_root = { let chain = self.clone(); self.spawn_blocking_handle( @@ -232,13 +243,13 @@ impl BeaconChain { signed_envelope: AvailableEnvelope, block_root: Hash256, payload_verification_status: PayloadVerificationStatus, - ) -> Result { + ) -> Result { // Everything in this initial section is on the hot path for processing the envelope. // Take an upgradable read lock on fork choice so we can check if this block has already // been imported. We don't want to repeat work importing a block that is already imported. 
let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); if !fork_choice_reader.contains_block(&block_root) { - return Err(EnvelopeError::BlockRootUnknown { block_root }); + return Err(BlockError::EnvelopeBlockRootUnknown(block_root)); } // TODO(gloas) add defensive check to see if payload envelope is already in fork choice @@ -253,7 +264,7 @@ impl BeaconChain { // node which can be eligible for head. fork_choice .on_valid_payload_envelope_received(block_root) - .map_err(|e| EnvelopeError::InternalError(format!("{e:?}")))?; + .map_err(|e| BlockError::InternalError(format!("{e:?}")))?; // TODO(gloas) emit SSE event if the payload became the new head payload diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs index b153a3cd6a..a1e4e34eb6 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/mod.rs @@ -18,14 +18,13 @@ //! //! 
``` -use std::marker::PhantomData; +use state_processing::envelope_processing::EnvelopeProcessingError; use std::sync::Arc; - -use state_processing::{BlockProcessingError, envelope_processing::EnvelopeProcessingError}; use store::Error as DBError; +use strum::AsRefStr; use tracing::instrument; use types::{ - BeaconState, BeaconStateError, ChainSpec, DataColumnSidecarList, EthSpec, ExecutionBlockHash, + BeaconState, BeaconStateError, DataColumnSidecarList, EthSpec, ExecutionBlockHash, ExecutionPayloadEnvelope, Hash256, SignedExecutionPayloadEnvelope, Slot, }; @@ -41,39 +40,18 @@ mod payload_notifier; pub use execution_pending_envelope::ExecutionPendingEnvelope; -// TODO(gloas): could remove this type completely, or remove the generic -#[derive(PartialEq)] -pub struct EnvelopeImportData { - pub block_root: Hash256, - _phantom: PhantomData, -} - #[derive(Debug)] -#[allow(dead_code)] pub struct AvailableEnvelope { - execution_block_hash: ExecutionBlockHash, envelope: Arc>, - columns: DataColumnSidecarList, - /// Timestamp at which this envelope first became available (UNIX timestamp, time since 1970). - columns_available_timestamp: Option, - pub spec: Arc, + pub columns: DataColumnSidecarList, } impl AvailableEnvelope { pub fn new( - execution_block_hash: ExecutionBlockHash, envelope: Arc>, columns: DataColumnSidecarList, - columns_available_timestamp: Option, - spec: Arc, ) -> Self { - Self { - execution_block_hash, - envelope, - columns, - columns_available_timestamp, - spec, - } + Self { envelope, columns } } pub fn message(&self) -> &ExecutionPayloadEnvelope { @@ -94,14 +72,6 @@ impl AvailableEnvelope { } } -pub enum MaybeAvailableEnvelope { - Available(AvailableEnvelope), - AvailabilityPending { - block_hash: ExecutionBlockHash, - envelope: Arc>, - }, -} - /// This snapshot is to be used for verifying a payload envelope. 
#[derive(Debug, Clone)] pub struct EnvelopeProcessingSnapshot { @@ -111,46 +81,25 @@ pub struct EnvelopeProcessingSnapshot { pub beacon_block_root: Hash256, } -/// A payload envelope that has gone through processing checks and execution by an EL client. -/// This envelope hasn't necessarily completed data availability checks. -/// -/// -/// It contains 2 variants: -/// 1. `Available`: This envelope has been executed and also contains all data to consider it -/// fully available. -/// 2. `AvailabilityPending`: This envelope hasn't received all required blobs to consider it -/// fully available. -#[allow(dead_code)] -pub enum ExecutedEnvelope { - Available(AvailableExecutedEnvelope), - // TODO(gloas): check data column availability via DA checker - AvailabilityPending(), +/// A payload envelope that has completed all envelope processing checks, verification +/// by an EL client but does not have all requisite columns to get imported into +/// fork choice. +pub struct AvailabilityPendingExecutedEnvelope { + pub envelope: Arc>, + pub block_root: Hash256, + pub payload_verification_outcome: PayloadVerificationOutcome, } -impl ExecutedEnvelope { +impl AvailabilityPendingExecutedEnvelope { pub fn new( - envelope: MaybeAvailableEnvelope, - import_data: EnvelopeImportData, + envelope: Arc>, + block_root: Hash256, payload_verification_outcome: PayloadVerificationOutcome, - spec: Arc, ) -> Self { - match envelope { - MaybeAvailableEnvelope::Available(available_envelope) => { - Self::Available(AvailableExecutedEnvelope::new( - available_envelope, - import_data, - payload_verification_outcome, - )) - } - // TODO(gloas): check data column availability via DA checker - MaybeAvailableEnvelope::AvailabilityPending { - block_hash, - envelope, - } => Self::Available(AvailableExecutedEnvelope::new( - AvailableEnvelope::new(block_hash, envelope, vec![], None, spec), - import_data, - payload_verification_outcome, - )), + Self { + envelope, + block_root, + payload_verification_outcome, } 
} } @@ -159,25 +108,25 @@ impl ExecutedEnvelope { /// by an EL client **and** has all requisite blob data to be imported into fork choice. pub struct AvailableExecutedEnvelope { pub envelope: AvailableEnvelope, - pub import_data: EnvelopeImportData, + pub block_root: Hash256, pub payload_verification_outcome: PayloadVerificationOutcome, } impl AvailableExecutedEnvelope { pub fn new( envelope: AvailableEnvelope, - import_data: EnvelopeImportData, + block_root: Hash256, payload_verification_outcome: PayloadVerificationOutcome, ) -> Self { Self { envelope, - import_data, + block_root, payload_verification_outcome, } } } -#[derive(Debug)] +#[derive(Debug, AsRefStr)] pub enum EnvelopeError { /// The envelope's block root is unknown. BlockRootUnknown { block_root: Hash256 }, @@ -205,22 +154,16 @@ pub enum EnvelopeError { payload_slot: Slot, latest_finalized_slot: Slot, }, - /// Optimistic sync is not supported for Gloas payload envelopes. - OptimisticSyncNotSupported { block_root: Hash256 }, /// Some Beacon Chain Error BeaconChainError(Arc), /// Some Beacon State error BeaconStateError(BeaconStateError), - /// Some BlockProcessingError (for electra operations) - BlockProcessingError(BlockProcessingError), /// Some EnvelopeProcessingError EnvelopeProcessingError(EnvelopeProcessingError), /// Error verifying the execution payload ExecutionPayloadError(ExecutionPayloadError), - /// An error from block-level checks reused during envelope import - BlockError(BlockError), - /// Internal error - InternalError(String), + /// An error from importing the envelope. 
+ ImportError(BlockError), } impl std::fmt::Display for EnvelopeError { @@ -253,13 +196,6 @@ impl From for EnvelopeError { } } -impl From for EnvelopeError { - fn from(e: BlockError) -> Self { - EnvelopeError::BlockError(e) - } -} - -/// Pull errors up from EnvelopeProcessingError to EnvelopeError impl From for EnvelopeError { fn from(e: EnvelopeProcessingError) -> Self { match e { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs index eb5e13b0cc..0bbe32525a 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/payload_notifier.rs @@ -31,7 +31,8 @@ impl PayloadNotifier { match notify_execution_layer { NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => { - let new_payload_request = Self::build_new_payload_request(&envelope, &block)?; + let new_payload_request = Self::build_new_payload_request(&envelope, &block) + .map_err(EnvelopeError::ImportError)?; // TODO(gloas): check and test RLP block hash calculation post-Gloas if let Err(e) = new_payload_request.perform_optimistic_sync_verifications() { warn!( diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs new file mode 100644 index 0000000000..2100a5fe9f --- /dev/null +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -0,0 +1,781 @@ +//! This module builds out the data availability cache for Gloas. When a beacon block is received +//! over gossip/p2p we insert its bid into this cache, keyed by block root. As soon as the bid +//! is received we can begin using it to verify data columns. +//! +//! When a payload envelope is received and executed against the EL, it is inserted into this cache. +//! 
Once all required custody columns have been kzg verified and the envelope has been executed we can +//! import the envelope into fork choice and store it to disk. +//! +//! Note that the block must have arrived before the envelope or data columns can reach this cache. +//! Data columns require the bid (from the block) for verification. Columns that arrive before +//! the block are rejected with `BlockRootUnknown`. + +use crate::data_availability_checker::{AvailabilityCheckError, MissingCellsError}; +use crate::payload_envelope_verification::{ + AvailabilityPendingExecutedEnvelope, AvailableExecutedEnvelope, +}; +use crate::{BeaconChainTypes, CustodyContext, metrics}; +use kzg::Kzg; +use lru::LruCache; +use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::collections::HashMap; +use std::fmt; +use std::fmt::Debug; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tracing::{Span, debug, error, instrument}; +use types::{ + ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, + PartialDataColumnSidecarRef, +}; + +mod pending_column; +mod pending_components; + +use crate::data_column_verification::{ + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, +}; +use crate::metrics::{ + KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, +}; +use crate::observed_data_sidecars::ObservationStrategy; +use pending_components::{PendingComponents, ReconstructColumnsDecision}; +use types::SignedExecutionPayloadBid; +use types::new_non_zero_usize; + +/// The LRU Cache stores `PendingComponents`, which store the block root, the execution payload bid, and its associated column data. +/// The execution payload bid stores the kzg commitments which we use to verify against incoming column data. +/// Setting this to 32 keeps memory usage reasonable. 
+/// +/// `PendingComponents` are now never removed from the cache manually and are only removed via LRU +/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time. +const AVAILABILITY_CACHE_CAPACITY: NonZeroUsize = new_non_zero_usize(32); + +/// This type is returned after adding a bid / column to the `DataAvailabilityChecker`. +/// +/// Indicates if the payloads data is fully `Available` or if we need more columns. +pub enum Availability { + MissingComponents(Hash256), + Available(Box>), +} + +impl Debug for Availability { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::MissingComponents(block_root) => { + write!(f, "MissingComponents({})", block_root) + } + Self::Available(envelope) => { + write!(f, "Available({:?})", envelope.block_root) + } + } + } +} + +pub type AvailabilityAndReconstructedColumns = (Availability, DataColumnSidecarList); + +#[derive(Debug)] +pub enum DataColumnReconstructionResult { + Success(AvailabilityAndReconstructedColumns), + NotStarted(&'static str), + RecoveredColumnsNotImported(&'static str), +} + +/// Cache to hold data columns for payloads pending data availability. +/// +/// In Gloas, beacon blocks can be immediately imported into fork choice. The execution payload +/// bid contains the payloads kzg commitments. This cache tracks data columns for payloads until all +/// required columns are received. +/// +/// Usually data becomes available on its slot within a second of receiving its first component +/// over gossip. However, data may never become available if a malicious proposer does not +/// publish its data, or there are network issues. Components are only removed via LRU eviction. 
+pub struct PendingPayloadCache { + /// Contains all the data we keep in memory, protected by an RwLock + availability_cache: RwLock>>, + kzg: Arc, + custody_context: Arc>, + spec: Arc, +} + +impl PendingPayloadCache { + pub fn new( + kzg: Arc, + custody_context: Arc>, + spec: Arc, + ) -> Result { + Ok(Self { + availability_cache: RwLock::new(LruCache::new(AVAILABILITY_CACHE_CAPACITY)), + kzg, + custody_context, + spec, + }) + } + + pub fn custody_context(&self) -> &Arc> { + &self.custody_context + } + + /// Returns all cached data columns for the given block root, if any. + #[instrument(skip_all, level = "trace")] + pub fn get_data_columns( + &self, + block_root: Hash256, + ) -> Option> { + self.peek_pending_components(&block_root, |components| { + components.map(|c| c.get_cached_data_columns()) + }) + } + + /// Returns the indices of cached data columns for the given block root. + #[instrument(skip_all, level = "trace")] + pub fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { + self.peek_pending_components(block_root, |components| { + components.map(|components| components.get_cached_data_columns_indices()) + }) + } + + /// Return the cached Gloas payload bid for `block_root`, if present. + pub fn get_bid( + &self, + block_root: &Hash256, + ) -> Option>> { + self.peek_pending_components(block_root, |components| { + components.map(|components| components.bid.clone()) + }) + } + + /// Filter out cells that are already cached for the given column sidecar. + /// Returns the cells that still need KZG verification, or `None` if all cells are cached. 
+ #[instrument(skip_all, level = "trace")] + pub fn missing_cells_for_column_sidecar<'a>( + &'_ self, + data_column: &'a DataColumnSidecar, + ) -> Result>, MissingCellsError> { + let block_root = data_column.block_root(); + let column_index = *data_column.index(); + + self.peek_pending_components(&block_root, |components| { + let Some(cached) = components.and_then(|c| c.verified_data_columns.get(&column_index)) + else { + return data_column.try_filter_to_partial_ref(|_, _, _| Ok(true)); + }; + + data_column.try_filter_to_partial_ref(|cell_idx, cell, proof| { + match cached.cell_matches(cell_idx, cell, proof) { + None => Ok(true), + Some(true) => Ok(false), + Some(false) => Err(MissingCellsError::MismatchesCachedColumn), + } + }) + }) + } + + /// Insert an executed payload envelope into the cache and performs an availability check + pub fn put_executed_payload_envelope( + &self, + executed_envelope: AvailabilityPendingExecutedEnvelope, + ) -> Result, AvailabilityCheckError> { + let epoch = executed_envelope.envelope.epoch(); + let beacon_block_root = executed_envelope.envelope.beacon_block_root(); + let bid = self + .get_bid(&beacon_block_root) + .ok_or(AvailabilityCheckError::MissingBid(beacon_block_root))?; + + let pending_components = + self.update_pending_components(beacon_block_root, bid, |pending_components| { + pending_components.insert_executed_payload_envelope(executed_envelope); + })?; + + let num_expected_columns = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + + pending_components.span.in_scope(|| { + debug!( + component = "executed envelope", + status = pending_components.status_str(num_expected_columns), + "Component added to data availability checker" + ); + }); + + self.check_availability(beacon_block_root, pending_components, num_expected_columns) + } + + /// Inserts a bid into the pending payload cache. + /// This will silently drop the bid if a bid for this block root already exists in the cache. 
+ pub fn insert_bid(&self, block_root: Hash256, bid: Arc>) { + let mut write_lock = self.availability_cache.write(); + write_lock.get_or_insert_mut(block_root, || PendingComponents::new(block_root, bid)); + } + + /// Perform KZG verification on RPC custody columns and insert them into the cache. + /// After insertion check if the envelope becomes available. + #[instrument(skip_all, level = "trace")] + pub fn put_rpc_custody_columns( + &self, + block_root: Hash256, + custody_columns: DataColumnSidecarList, + ) -> Result, AvailabilityCheckError> { + let bid = self + .get_bid(&block_root) + .ok_or(AvailabilityCheckError::MissingBid(block_root))?; + let kzg_verified_columns = KzgVerifiedDataColumn::from_batch_with_scoring_and_commitments( + custody_columns, + bid.message.blob_kzg_commitments.as_ref(), + &self.kzg, + ) + .map_err(AvailabilityCheckError::InvalidColumn)?; + + let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let verified_custody_columns = kzg_verified_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) + .collect::>(); + + self.put_kzg_verified_custody_data_columns(block_root, &verified_custody_columns) + } + + /// Perform KZG verification on gossip verified custody columns and insert them into the cache. 
+ /// After insertion check if the envelope becomes available + #[instrument(skip_all, level = "trace")] + pub fn put_gossip_verified_data_columns( + &self, + block_root: Hash256, + data_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + let bid = self + .get_bid(&block_root) + .ok_or(AvailabilityCheckError::MissingBid(block_root))?; + let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); + let sampling_columns = self + .custody_context + .sampling_columns_for_epoch(epoch, &self.spec); + let custody_columns = data_columns + .into_iter() + .filter(|col| sampling_columns.contains(&col.index())) + .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) + .collect::>(); + + self.put_kzg_verified_custody_data_columns(block_root, &custody_columns) + } + + /// Insert KZG verified columns into the cache. + /// After insertion check if the envelope becomes available. + pub fn put_kzg_verified_custody_data_columns( + &self, + block_root: Hash256, + kzg_verified_data_columns: &[KzgVerifiedCustodyDataColumn], + ) -> Result, AvailabilityCheckError> { + let bid = self + .get_bid(&block_root) + .ok_or(AvailabilityCheckError::MissingBid(block_root))?; + + let pending_components = + self.update_pending_components(block_root, bid.clone(), |pending_components| { + pending_components.merge_data_columns(kzg_verified_data_columns) + })?; + + let epoch = bid.message.slot.epoch(T::EthSpec::slots_per_epoch()); + + let num_expected_columns = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + + pending_components.span.in_scope(|| { + debug!( + component = "data_columns", + status = pending_components.status_str(num_expected_columns), + "Component added to data availability checker" + ); + }); + + self.check_availability(block_root, pending_components, num_expected_columns) + } + + #[instrument(skip_all, level = "debug")] + pub fn reconstruct_data_columns( + &self, + block_root: &Hash256, + ) -> Result, 
AvailabilityCheckError> { + let bid = self + .get_bid(block_root) + .ok_or(AvailabilityCheckError::MissingBid(*block_root))?; + + let verified_data_columns = match self.check_and_set_reconstruction_started(block_root) { + ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, + ReconstructColumnsDecision::No(reason) => { + return Ok(DataColumnReconstructionResult::NotStarted(reason)); + } + }; + let existing_column_indices = verified_data_columns + .iter() + .map(|data_column| *data_column.index()) + .collect::>(); + + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); + let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); + + let all_data_columns = KzgVerifiedCustodyDataColumn::reconstruct_columns( + &self.kzg, + verified_data_columns, + bid.message.blob_kzg_commitments.as_ref(), + &self.spec, + ) + .map_err(|e| { + error!( + ?block_root, + error = ?e, + "Error reconstructing data columns" + ); + self.handle_reconstruction_failure(block_root); + metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES); + AvailabilityCheckError::ReconstructColumnsError(e) + })?; + + let slot = bid.message.slot; + let columns_to_sample = self + .custody_context() + .sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()), &self.spec); + + let data_columns_to_import_and_publish = all_data_columns + .into_iter() + .filter(|d| { + columns_to_sample.contains(&d.index()) + && !existing_column_indices.contains(&d.index()) + }) + .collect::>(); + + metrics::stop_timer(timer); + metrics::inc_counter_by( + &metrics::DATA_AVAILABILITY_RECONSTRUCTED_COLUMNS, + data_columns_to_import_and_publish.len() as u64, + ); + + debug!( + count = data_columns_to_import_and_publish.len(), + ?block_root, + %slot, + "Reconstructed columns" + ); + + self.put_kzg_verified_custody_data_columns(*block_root, &data_columns_to_import_and_publish) + .map(|availability| { + DataColumnReconstructionResult::Success(( + availability, 
+ data_columns_to_import_and_publish + .into_iter() + .map(|d| d.clone_arc()) + .collect::>(), + )) + }) + } + + // ── Metrics ── + + /// Number of pending component entries in memory in the cache. + pub fn cache_size(&self) -> usize { + self.availability_cache.read().len() + } + + // ── Internal helpers ── + + fn check_availability( + &self, + block_root: Hash256, + pending_components: MappedRwLockReadGuard<'_, PendingComponents>, + num_expected_columns: usize, + ) -> Result, AvailabilityCheckError> { + if let Some(available_envelope) = pending_components.make_available(num_expected_columns)? { + // Explicitly drop read lock before acquiring write lock + drop(pending_components); + if let Some(components) = self.availability_cache.write().get_mut(&block_root) { + // Clean up span now that data is available + components.span = Span::none(); + } + + // We never remove the pending components manually to avoid race conditions. + // Components are only removed via LRU eviction as finality advances. + Ok(Availability::Available(Box::new(available_envelope))) + } else { + Ok(Availability::MissingComponents(block_root)) + } + } + + /// Gets or creates `PendingComponents` and applies the `update_fn` while holding the write lock. + /// + /// Once the update is complete, the write lock is downgraded and a read guard with a + /// reference of the updated `PendingComponents` is returned. 
+ fn update_pending_components( + &self, + block_root: Hash256, + bid: Arc>, + update_fn: F, + ) -> Result>, AvailabilityCheckError> + where + F: FnOnce(&mut PendingComponents), + { + let mut write_lock = self.availability_cache.write(); + + { + let pending_components = write_lock + .get_or_insert_mut(block_root, || PendingComponents::new(block_root, bid)); + update_fn(pending_components) + } + + RwLockReadGuard::try_map(RwLockWriteGuard::downgrade(write_lock), |cache| { + cache.peek(&block_root) + }) + .map_err(|_| { + AvailabilityCheckError::Unexpected("pending components should exist".to_string()) + }) + } + + fn peek_pending_components>) -> R>( + &self, + block_root: &Hash256, + f: F, + ) -> R { + f(self.availability_cache.read().peek(block_root)) + } + + /// Check whether data column reconstruction should be attempted. + /// TODO(gloas): rethink reconstruction for the cell model + fn check_and_set_reconstruction_started( + &self, + block_root: &Hash256, + ) -> ReconstructColumnsDecision { + let mut write_lock = self.availability_cache.write(); + let Some(pending_components) = write_lock.get_mut(block_root) else { + return ReconstructColumnsDecision::No("block already imported"); + }; + + let epoch = pending_components.bid.epoch(); + + let total_column_count = T::EthSpec::number_of_columns(); + let sampling_column_count = self + .custody_context + .num_of_data_columns_to_sample(epoch, &self.spec); + + if pending_components.reconstruction_started { + return ReconstructColumnsDecision::No("already started"); + } + let received_column_count = pending_components.num_completed_columns(); + if received_column_count >= sampling_column_count { + return ReconstructColumnsDecision::No("all sampling columns received"); + } + if received_column_count < total_column_count / 2 { + return ReconstructColumnsDecision::No("not enough columns"); + } + + pending_components.reconstruction_started = true; + 
ReconstructColumnsDecision::Yes(pending_components.get_cached_data_columns()) + } + + /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. + /// In this case, we remove all data columns in `PendingComponents`, reset reconstruction + /// status so that we can attempt to retrieve columns from peers again. + fn handle_reconstruction_failure(&self, block_root: &Hash256) { + if let Some(pending_components_mut) = self.availability_cache.write().get_mut(block_root) { + pending_components_mut.verified_data_columns = HashMap::new(); + pending_components_mut.reconstruction_started = false; + } + } + + /// Maintain the cache by removing entries older than the cutoff epoch. + pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { + let mut write_lock = self.availability_cache.write(); + let mut keys_to_remove = vec![]; + for (key, value) in write_lock.iter() { + if value.bid.epoch() < cutoff_epoch { + keys_to_remove.push(*key); + } + } + for key in keys_to_remove { + write_lock.pop(&key); + } + + Ok(()) + } +} + +#[cfg(test)] +mod data_availability_checker_tests { + use super::*; + + use crate::block_verification::PayloadVerificationOutcome; + use crate::custody_context::NodeCustodyType; + use crate::test_utils::{ + DiskHarnessType, NumBlobs, generate_data_column_indices_rand_order, + generate_rand_block_and_data_columns, get_kzg, + }; + use fork_choice::PayloadVerificationStatus; + use logging::create_test_tracing_subscriber; + use types::test_utils::test_unstructured; + use types::{ + ExecutionPayloadEnvelope, ExecutionPayloadGloas, ExecutionRequests, ForkName, + MinimalEthSpec, SignedExecutionPayloadEnvelope, + }; + + type E = MinimalEthSpec; + type T = DiskHarnessType; + + const NUM_BLOBS: usize = 1; + + /// Stand up a cache + a 1-blob Gloas block for the given custody type. The bid is registered + /// in the cache; `custody` is pre-filtered to the sampling subset. 
+ fn setup(node_custody: NodeCustodyType) -> Setup { + setup_with(node_custody, NumBlobs::Number(NUM_BLOBS)) + } + + fn setup_zero_blob(node_custody: NodeCustodyType) -> Setup { + setup_with(node_custody, NumBlobs::Number(0)) + } + + fn setup_with(node_custody: NodeCustodyType, num_blobs: NumBlobs) -> Setup { + create_test_tracing_subscriber(); + let spec = Arc::new(ForkName::Gloas.make_genesis_spec(E::default_spec())); + let kzg = get_kzg(&spec); + let custody_context = Arc::new(CustodyContext::::new( + node_custody, + generate_data_column_indices_rand_order::(), + &spec, + )); + let cache = Arc::new( + PendingPayloadCache::::new(kzg, custody_context, spec.clone()) + .expect("create cache"), + ); + + let mut u = test_unstructured(); + let (block, columns) = + generate_rand_block_and_data_columns::(ForkName::Gloas, num_blobs, &mut u, &spec) + .expect("generate test block"); + let block_root = block.canonical_root(); + let bid = Arc::new( + block + .message() + .body() + .signed_execution_payload_bid() + .expect("Gloas block has bid") + .clone(), + ); + cache.insert_bid(block_root, bid.clone()); + + let epoch = bid.message.slot.epoch(E::slots_per_epoch()); + let sampling = cache + .custody_context() + .sampling_columns_for_epoch(epoch, &cache.spec); + let custody = columns + .into_iter() + .filter(|c| sampling.contains(c.index())) + .collect(); + + Setup { + cache, + block_root, + custody, + } + } + + struct Setup { + cache: Arc>, + block_root: Hash256, + custody: DataColumnSidecarList, + } + + impl Setup { + fn put_envelope(&self) -> Availability { + self.cache + .put_executed_payload_envelope(executed_envelope(self.block_root)) + .expect("put envelope") + } + + fn put_columns(&self, columns: DataColumnSidecarList) -> Availability { + self.cache + .put_rpc_custody_columns(self.block_root, columns) + .expect("put columns") + } + + fn reconstruct(&self) -> Result, AvailabilityCheckError> { + self.cache.reconstruct_data_columns(&self.block_root) + } + + fn 
cached_indexes(&self) -> Vec { + self.cache + .cached_data_column_indexes(&self.block_root) + .expect("entry") + } + } + + /// Hand-rolled executed envelope with bypassed verification; the cache only inspects + /// `beacon_block_root` and the verification outcome, never the signature or payload. + fn executed_envelope(block_root: Hash256) -> AvailabilityPendingExecutedEnvelope { + AvailabilityPendingExecutedEnvelope { + envelope: Arc::new(SignedExecutionPayloadEnvelope { + message: ExecutionPayloadEnvelope { + payload: ExecutionPayloadGloas::default(), + execution_requests: ExecutionRequests::default(), + builder_index: 0, + beacon_block_root: block_root, + parent_beacon_block_root: Hash256::random(), + }, + signature: bls::Signature::infinity().expect("infinity sig"), + }), + block_root, + payload_verification_outcome: PayloadVerificationOutcome { + payload_verification_status: PayloadVerificationStatus::Verified, + }, + } + } + + #[track_caller] + fn assert_missing(availability: Availability) { + assert!( + matches!(availability, Availability::MissingComponents(_)), + "expected MissingComponents, got {availability:?}", + ); + } + + #[track_caller] + fn assert_available(availability: Availability) -> Box> { + match availability { + Availability::Available(env) => env, + other => panic!("expected Available, got {other:?}"), + } + } + + // ─── Tier 1: real-path availability flows ─────────────────────────────── + + /// Envelope first → MissingComponents. Then all sampling columns → Available. + #[tokio::test] + async fn availability_arrives_envelope_first() { + let s = setup(NodeCustodyType::Fullnode); + assert_missing(s.put_envelope()); + let envelope = assert_available(s.put_columns(s.custody.clone())); + assert_eq!(envelope.block_root, s.block_root); + assert_eq!(envelope.envelope.columns.len(), s.custody.len()); + } + + /// Columns first → MissingComponents. Then envelope → Available. 
+ #[tokio::test] + async fn availability_arrives_columns_first() { + let s = setup(NodeCustodyType::Fullnode); + assert_missing(s.put_columns(s.custody.clone())); + let envelope = assert_available(s.put_envelope()); + assert_eq!(envelope.block_root, s.block_root); + assert_eq!(envelope.envelope.columns.len(), s.custody.len()); + } + + /// N-1 columns + envelope is still MissingComponents; the Nth column flips to Available. + /// Guards the strict count comparison in `make_available`. + #[tokio::test] + async fn partial_columns_then_complete() { + let mut s = setup(NodeCustodyType::Fullnode); + assert!(s.custody.len() >= 2, "needs at least 2 sampling columns"); + let last = s.custody.pop().expect("non-empty custody"); + + s.put_envelope(); + assert_missing(s.put_columns(s.custody.clone())); + assert_available(s.put_columns(vec![last])); + } + + /// Zero-blob block + envelope → Available. Guards the `num_blobs_expected == 0` early-return + /// in `make_available`. + #[tokio::test] + async fn zero_blob_envelope_immediately_available() { + let s = setup_zero_blob(NodeCustodyType::Fullnode); + let envelope = assert_available(s.put_envelope()); + assert!(envelope.envelope.columns.is_empty()); + } + + /// Receiving the same column twice keeps a single cache entry. Guards `PendingColumn::insert` + /// staying only-if-empty under repeated arrivals. + #[tokio::test] + async fn dedups_repeated_column_inserts() { + let s = setup(NodeCustodyType::Fullnode); + let column = s.custody.first().cloned().expect("sampling column"); + let column_index = *column.index(); + s.put_columns(vec![column.clone()]); + s.put_columns(vec![column]); + + assert_eq!(s.cached_indexes(), vec![column_index]); + assert_eq!( + s.cache.get_data_columns(s.block_root).map(|c| c.len()), + Some(1), + ); + } + + // ─── Tier 2: reconstruction state machine ─────────────────────────────── + // + // Reconstruction only triggers when `total/2 ≤ received < sampling_count`. 
Fullnode's small + // sampling count never satisfies this, so these tests use `Supernode`. + + /// Fewer than `number_of_columns / 2` columns received → reconstruction is `NotStarted`. + #[tokio::test] + async fn reconstruction_below_threshold_is_not_started() { + let s = setup(NodeCustodyType::Supernode); + let half = E::number_of_columns() / 2; + s.put_columns(s.custody.iter().take(half - 1).cloned().collect()); + assert!(matches!( + s.reconstruct().expect("reconstruct call"), + DataColumnReconstructionResult::NotStarted("not enough columns") + )); + } + + /// All sampling columns received → reconstruction unnecessary, returns `NotStarted`. + #[tokio::test] + async fn reconstruction_already_complete_is_not_started() { + let s = setup(NodeCustodyType::Supernode); + s.put_columns(s.custody.clone()); + assert!(matches!( + s.reconstruct().expect("reconstruct call"), + DataColumnReconstructionResult::NotStarted("all sampling columns received") + )); + } + + /// Envelope + 50% of sampling columns → reconstruction recovers the rest, the entry flips + /// to `Available`, and the cache holds every sampling column. + #[tokio::test] + async fn reconstruction_success_fills_missing_columns() { + let s = setup(NodeCustodyType::Supernode); + s.put_envelope(); + let sampling_count = s.custody.len(); + let half = sampling_count / 2; + s.put_columns(s.custody.iter().take(half).cloned().collect()); + assert_eq!(s.cached_indexes().len(), half); + + let result = s.reconstruct().expect("reconstruction must succeed"); + let (availability, _recovered) = match result { + DataColumnReconstructionResult::Success(inner) => inner, + other => panic!("expected Success, got {other:?}"), + }; + assert_available(availability); + assert_eq!(s.cached_indexes().len(), sampling_count); + } + + // ─── Tier 3: invariants ───────────────────────────────────────────────── + + /// `get_data_columns` and `cached_data_column_indexes` must agree on which columns are + /// complete. 
Drift between these two would corrupt the DB on import. + #[tokio::test] + async fn cached_columns_match_completed_indexes() { + let mut s = setup(NodeCustodyType::Fullnode); + let last = s.custody.pop().expect("non-empty custody"); + + let assert_lengths_match = |s: &Setup| { + let indexes_len = s.cached_indexes().len(); + let sidecars_len = s.cache.get_data_columns(s.block_root).expect("entry").len(); + assert_eq!(indexes_len, sidecars_len); + }; + + s.put_columns(s.custody.clone()); + assert_lengths_match(&s); + + s.put_columns(vec![last]); + assert_lengths_match(&s); + } +} diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs new file mode 100644 index 0000000000..890c17ba67 --- /dev/null +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs @@ -0,0 +1,63 @@ +use kzg::KzgProof; +use ssz_types::VariableList; +use std::sync::Arc; +use types::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarGloas, EthSpec, Hash256, Slot}; + +#[derive(Clone)] +pub struct PendingColumn { + cells: Vec, KzgProof)>>, +} + +impl PendingColumn { + /// Allocate a `PendingColumn` whose `cells` vec has space for `blob_count` entries, all + /// initialised to `None`. Required so that `insert(idx, ...)` can write into `cells[idx]`. + pub fn new_with_capacity(blob_count: usize) -> Self { + Self { + cells: vec![None; blob_count], + } + } + + pub fn insert(&mut self, index: usize, cell: &Cell, proof: &KzgProof) { + if let Some(existing_cell) = self.cells.get_mut(index) + && existing_cell.is_none() + { + *existing_cell = Some((cell.clone(), *proof)); + } + } + + pub fn cell_matches(&self, index: usize, cell: &Cell, proof: &KzgProof) -> Option { + self.cells + .get(index)? + .as_ref() + .map(|(c, p)| c == cell && p == proof) + } + + /// Returns a full `DataColumnSidecar` if all cells are present, or `None` if any are missing. 
+ pub fn to_full_sidecar( + &self, + index: ColumnIndex, + slot: Slot, + beacon_block_root: Hash256, + ) -> Option>> { + let mut column = Vec::with_capacity(self.cells.len()); + let mut kzg_proofs = Vec::with_capacity(self.cells.len()); + + for cell in self.cells.iter() { + let (cell, proof) = cell.as_ref()?; + // TODO(gloas): we likely want to go and arc all cells. This will help us from requiring a clone + // in PendingColumn::insert + column.push(cell.clone()); + kzg_proofs.push(*proof); + } + + // TODO(gloas): this hard-codes the Gloas sidecar variant. Pass the fork in once + // post-Gloas variants are introduced (or move construction to a fork-aware helper). + Some(Arc::new(DataColumnSidecar::Gloas(DataColumnSidecarGloas { + index, + column: VariableList::try_from(column).ok()?, + kzg_proofs: VariableList::try_from(kzg_proofs).ok()?, + slot, + beacon_block_root, + }))) + } +} diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs new file mode 100644 index 0000000000..e7b9009577 --- /dev/null +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs @@ -0,0 +1,180 @@ +use crate::data_availability_checker::AvailabilityCheckError; +use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope; +use crate::payload_envelope_verification::AvailableEnvelope; +use crate::payload_envelope_verification::AvailableExecutedEnvelope; +use crate::pending_payload_cache::pending_column::PendingColumn; +use std::cmp::Ordering; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::{Span, debug, debug_span}; +use types::DataColumnSidecar; +use types::{ColumnIndex, EthSpec, Hash256, SignedExecutionPayloadBid}; + +/// This represents the components of a payload pending data availability. +/// +/// The columns are all gossip and kzg verified. 
+/// The payload is considered "available" when all required columns are received. +pub struct PendingComponents { + pub block_root: Hash256, + pub bid: Arc>, + /// a cached post executed payload envelope + pub envelope: Option>, + /// A column entry in this map may only have some cells filled in (i.e. a partial data column) + pub verified_data_columns: HashMap>, + pub reconstruction_started: bool, + pub(crate) span: Span, +} + +impl PendingComponents { + pub fn num_blobs_expected(&self) -> usize { + self.bid.message.blob_kzg_commitments.len() + } + + /// Returns columns that have all cells present. + pub fn get_cached_data_columns(&self) -> Vec>> { + let slot = self.bid.message.slot; + let block_root = self.block_root; + self.verified_data_columns + .iter() + .filter_map(|(col_idx, col)| col.to_full_sidecar(*col_idx, slot, block_root)) + .collect() + } + + /// Returns the indices of columns that have all cells present. + pub fn get_cached_data_columns_indices(&self) -> Vec { + let slot = self.bid.message.slot; + let block_root = self.block_root; + self.verified_data_columns + .iter() + .filter_map(|(col_idx, col)| { + col.to_full_sidecar(*col_idx, slot, block_root) + .map(|_| *col_idx) + }) + .collect() + } + + /// Merges a given set of data columns into the cache. + pub(crate) fn merge_data_columns( + &mut self, + kzg_verified_data_columns: &[KzgVerifiedCustodyDataColumn], + ) { + let num_blobs_expected = self.num_blobs_expected(); + for data_column in kzg_verified_data_columns { + let data_column = data_column.as_data_column(); + // The Vec-backed `PendingColumn` keys cells by index, so we have to allocate up to + // `num_blobs_expected` entries before inserting; otherwise `cells.get_mut(idx)` returns + // None and the insert is a no-op. 
+ let col = self + .verified_data_columns + .entry(*data_column.index()) + .or_insert_with(|| PendingColumn::new_with_capacity(num_blobs_expected)); + for (cell_idx, (cell, proof)) in data_column + .column() + .iter() + .zip(data_column.kzg_proofs().iter()) + .enumerate() + { + col.insert(cell_idx, cell, proof); + } + } + } + + // TODO(gloas): merge partial columns + + /// Inserts an executed payload envelope into the cache. + pub fn insert_executed_payload_envelope( + &mut self, + envelope: AvailabilityPendingExecutedEnvelope, + ) { + self.envelope = Some(envelope); + } + + pub fn num_completed_columns(&self) -> usize { + self.get_cached_data_columns().len() + } + + /// Returns `Some` if the envelope and all required data columns have been received. + pub fn make_available( + &self, + num_expected_columns: usize, + ) -> Result>, AvailabilityCheckError> { + // Check if the payload has been received and executed + let Some(envelope) = &self.envelope else { + return Ok(None); + }; + + let AvailabilityPendingExecutedEnvelope { + envelope, + block_root, + payload_verification_outcome, + } = envelope; + + let columns = if self.num_blobs_expected() == 0 { + self.span.in_scope(|| { + debug!("Bid has no blobs, data is available"); + }); + vec![] + } else { + let columns = self.get_cached_data_columns(); + match columns.len().cmp(&num_expected_columns) { + Ordering::Greater => { + return Err(AvailabilityCheckError::Unexpected(format!( + "too many columns: got {} expected {num_expected_columns}", + columns.len() + ))); + } + Ordering::Equal => { + self.span.in_scope(|| { + debug!("All data columns received, data is available"); + }); + columns + } + Ordering::Less => { + // Not enough data columns received yet + return Ok(None); + } + } + }; + + let available_envelope = AvailableEnvelope::new(envelope.clone(), columns); + + Ok(Some(AvailableExecutedEnvelope { + envelope: available_envelope, + block_root: *block_root, + payload_verification_outcome: 
payload_verification_outcome.clone(), + })) + } + + /// Constructs a fresh `PendingComponents` with no envelope and no columns yet. + pub fn new(block_root: Hash256, bid: Arc>) -> Self { + let span = debug_span!(parent: None, "lh_pending_components", %block_root); + let _guard = span.clone().entered(); + Self { + block_root, + bid, + envelope: None, + verified_data_columns: HashMap::new(), + reconstruction_started: false, + span, + } + } + + pub fn status_str(&self, num_expected_columns: usize) -> String { + format!( + "envelope {}, data_columns {}/{}", + self.envelope.is_some(), + self.num_completed_columns(), + num_expected_columns + ) + } +} + +// This enum is only used internally within the crate in the reconstruction function to improve +// readability, so it's OK to not box the variant value, and it shouldn't impact memory much with +// the current usage, as it's deconstructed immediately. +#[allow(clippy::large_enum_variant)] +pub(crate) enum ReconstructColumnsDecision { + Yes(Vec>>), + No(&'static str), +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 4378da8405..8e9cc61208 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2851,11 +2851,42 @@ where .await .expect("newPayload should succeed"); - // Store the envelope. + // Store the envelope and the data columns derived from the block. + // + // Production stores columns inside `import_available_execution_payload_envelope` after + // the cache is satisfied. The harness sidesteps that flow but must still persist columns + // or the `DataColumnMissing` invariant fires for any block with `num_expected_blobs > 0`. 
+ let block = self + .chain + .store + .get_blinded_block(&block_root) + .expect("should read block from store") + .expect("block should exist in store"); + let mut ops = vec![]; + let block_with_full_payload = self + .chain + .store + .make_full_block(&block_root, block.clone()) + .expect("should reconstruct full block"); + let columns = + generate_data_column_sidecars_from_block(&block_with_full_payload, &self.spec); + if !columns.is_empty() + && let Some(store_op) = self.chain.get_blobs_or_columns_store_op( + block_root, + block.slot(), + AvailableBlockData::DataColumns(columns), + ) + { + ops.push(store_op); + } + ops.push(store::StoreOp::PutPayloadEnvelope( + block_root, + std::sync::Arc::new(signed_envelope), + )); self.chain .store - .put_payload_envelope(&block_root, &signed_envelope) - .expect("should store envelope"); + .do_atomically_with_block_and_blobs_cache(ops) + .expect("should persist envelope and columns"); // Update fork choice so it knows the payload was received. self.chain @@ -2876,11 +2907,10 @@ where block: Arc>, ) -> RangeSyncBlock { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - let has_blobs = block - .message() - .body() - .blob_kzg_commitments() - .is_ok_and(|c| !c.is_empty()); + // For Gloas, kzg commitments live in the bid (`signed_execution_payload_bid`), so the + // body's `blob_kzg_commitments()` accessor returns Err. `num_expected_blobs` already + // handles both shapes. + let has_blobs = block.num_expected_blobs() > 0; if !has_blobs { return RangeSyncBlock::new( block, @@ -3782,7 +3812,26 @@ pub fn generate_rand_block_and_blobs( SignedBeaconBlock::Fulu(SignedBeaconBlockFulu { ref mut message, .. }) => add_blob_transactions!(message, FullPayloadFulu, num_blobs, u, fork_name), - // TODO(EIP-7732) Add `SignedBeaconBlock::Gloas` variant + SignedBeaconBlock::Gloas(SignedBeaconBlockGloas { + ref mut message, .. + }) => { + // For Gloas, commitments are in the bid, not directly in the body. 
+ // BlobSidecars cannot be created for Gloas because there's no merkle proof + // from the block body to the commitments. Return early with empty blob_sidecars. + let num_blobs = match num_blobs { + NumBlobs::Random => u.int_in_range(DEFAULT_MIN_BLOBS..=DEFAULT_MAX_BLOBS)?, + NumBlobs::Number(n) => n, + NumBlobs::None => 0, + }; + let (bundle, _transactions) = + execution_layer::test_utils::generate_blobs::(num_blobs, fork_name).unwrap(); + message + .body + .signed_execution_payload_bid + .message + .blob_kzg_commitments = bundle.commitments.clone(); + return Ok((block, blob_sidecars)); + } _ => return Ok((block, blob_sidecars)), }; diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 6646fe0b1e..533ef61219 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -323,18 +323,34 @@ fn update_data_column_signed_header( ) { for old_custody_column_sidecar in data_columns.as_mut_slice() { let old_column_sidecar = old_custody_column_sidecar.as_data_column(); - let new_column_sidecar = Arc::new(DataColumnSidecar::Fulu(DataColumnSidecarFulu { - index: *old_column_sidecar.index(), - column: old_column_sidecar.column().clone(), - kzg_commitments: old_column_sidecar.kzg_commitments().unwrap().clone(), - kzg_proofs: old_column_sidecar.kzg_proofs().clone(), - signed_block_header: signed_block.signed_block_header(), - kzg_commitments_inclusion_proof: signed_block - .message() - .body() - .kzg_commitments_merkle_proof() - .unwrap(), - })); + let new_column_sidecar = match old_column_sidecar.as_ref() { + DataColumnSidecar::Fulu(_) => { + Arc::new(DataColumnSidecar::Fulu(DataColumnSidecarFulu { + index: *old_column_sidecar.index(), + column: old_column_sidecar.column().clone(), + kzg_commitments: old_column_sidecar.kzg_commitments().unwrap().clone(), + kzg_proofs: old_column_sidecar.kzg_proofs().clone(), + signed_block_header: 
signed_block.signed_block_header(), + kzg_commitments_inclusion_proof: signed_block + .message() + .body() + .kzg_commitments_merkle_proof() + .unwrap(), + })) + } + // Gloas columns reference the block by `beacon_block_root` instead of holding the + // block header inline, so updating the parent root just means re-keying the column to + // the new canonical root. + DataColumnSidecar::Gloas(g) => { + Arc::new(DataColumnSidecar::Gloas(types::DataColumnSidecarGloas { + index: g.index, + column: g.column.clone(), + kzg_proofs: g.kzg_proofs.clone(), + slot: g.slot, + beacon_block_root: signed_block.canonical_root(), + })) + } + }; *old_custody_column_sidecar = CustodyDataColumn::from_asserted_custody(new_column_sidecar); } } @@ -1150,8 +1166,13 @@ async fn block_gossip_verification() { ) .await .expect("should import valid gossip verified block"); + if let Some(data_sidecars) = blobs_opt { + verify_and_process_gossip_data_sidecars(&harness, data_sidecars).await; + } // Post-Gloas, store the execution payload envelope so that subsequent blocks can look up - // the parent envelope. + // the parent envelope. This must run after gossip column processing because marking the + // payload as received in fork choice causes the gossip column path's + // `is_block_data_imported` gate to reject otherwise-valid columns as duplicates. if let Some(ref envelope) = snapshot.execution_envelope { harness .chain @@ -1165,9 +1186,6 @@ async fn block_gossip_verification() { .on_valid_payload_envelope_received(snapshot.beacon_block_root) .expect("should update fork choice with envelope"); } - if let Some(data_sidecars) = blobs_opt { - verify_and_process_gossip_data_sidecars(&harness, data_sidecars).await; - } } // Recompute the head to ensure we cache the latest view of fork choice. 
@@ -2246,7 +2264,6 @@ async fn rpc_block_allows_construction_past_da_boundary() { // Now verify the block is past the DA boundary let da_boundary = harness .chain - .data_availability_checker .data_availability_boundary() .expect("DA boundary should be set"); assert!( diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index cd0e700109..29d0e38b93 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -11,7 +11,8 @@ use std::sync::Arc; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, EthSpec, - MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedRoot, Slot, + MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedExecutionPayloadBid, + SignedRoot, Slot, }; type E = MinimalEthSpec; @@ -84,6 +85,15 @@ async fn data_column_sidecar_event_on_process_gossip_data_column() { let epoch = slot.epoch(E::slots_per_epoch()); random_sidecar.slot = slot; random_sidecar.index = harness.chain.sampling_columns_for_epoch(epoch)[0]; + + // For gloas, the bid must be known, e.g. 
in the pending payload cache + let mut bid = SignedExecutionPayloadBid::::empty(); + bid.message.slot = Slot::new(10); + harness + .chain + .pending_payload_cache + .insert_bid(random_sidecar.beacon_block_root, Arc::new(bid)); + DataColumnSidecar::Gloas(random_sidecar) } else { let mut random_sidecar = DataColumnSidecarFulu::arbitrary(&mut u).unwrap(); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index cfdd54857a..0ff9f6841d 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -23,6 +23,7 @@ use beacon_chain::{ }, custody_context::NodeCustodyType, historical_blocks::HistoricalBlockError, + kzg_utils::reconstruct_blobs, migrate::MigratorConfig, }; use bls::{Keypair, Signature, SignatureBytes}; @@ -68,6 +69,43 @@ static KEYPAIRS: LazyLock> = type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; +/// Retrieve or reconstruct blobs for a given block root. This uses the block's epoch to determine +/// whether to retrieve blobs directly or reconstruct them from columns. +/// +/// Returns `None` for Gloas blocks (which have no blob sidecar representation). +fn get_or_reconstruct_blobs( + chain: &BeaconChain, + block_root: &Hash256, +) -> Result>, BeaconChainError> { + let Some(block) = chain.store.get_blinded_block(block_root)? else { + return Ok(None); + }; + + if block.fork_name_unchecked().gloas_enabled() { + return Ok(None); + } + + if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + let fork_name = chain.spec.fork_name_at_epoch(block.epoch()); + if let Some(columns) = chain.store.get_data_columns(block_root, fork_name)? 
{ + let num_required_columns = T::EthSpec::number_of_columns() / 2; + if columns.len() >= num_required_columns { + reconstruct_blobs(&chain.kzg, columns, None, &block, &chain.spec) + .map(Some) + .map_err(BeaconChainError::FailedToReconstructBlobs) + } else { + Err(BeaconChainError::InsufficientColumnsToReconstructBlobs { + columns_found: columns.len(), + }) + } + } else { + Ok(None) + } + } else { + Ok(chain.get_blobs(block_root)?.blobs()) + } +} + fn get_store(db_path: &TempDir) -> Arc, BeaconNodeBackend>> { let store_config = StoreConfig { prune_payloads: false, @@ -2835,10 +2873,7 @@ async fn reproduction_unaligned_checkpoint_sync_pruned_payload() { .is_ok() ); - let wss_blobs_opt = harness - .chain - .get_or_reconstruct_blobs(&wss_block_root) - .unwrap(); + let wss_blobs_opt = get_or_reconstruct_blobs(&harness.chain, &wss_block_root).unwrap(); let wss_state = full_store .get_state(&wss_state_root, Some(checkpoint_slot), CACHE_STATE_IN_TESTS) @@ -2971,10 +3006,7 @@ async fn weak_subjectivity_sync_test( .state_root_at_slot(checkpoint_slot) .unwrap() .unwrap(); - let wss_blobs_opt = harness - .chain - .get_or_reconstruct_blobs(&wss_block_root) - .unwrap(); + let wss_blobs_opt = get_or_reconstruct_blobs(&harness.chain, &wss_block_root).unwrap(); let wss_state = full_store .get_state(&wss_state_root, Some(checkpoint_slot), CACHE_STATE_IN_TESTS) .unwrap() @@ -3063,6 +3095,29 @@ async fn weak_subjectivity_sync_test( let beacon_chain = Arc::new(beacon_chain); let wss_block_root = wss_block.canonical_root(); + + // For Gloas, blobs aren't a standalone shape — the WSS data is the column sidecar set, which + // `get_or_reconstruct_blobs` returns `None` for. Copy the WSS block's columns straight from + // the source store so that the destination has them after checkpoint sync, matching what + // network-driven WSS would produce in production. 
+ if wss_block.fork_name_unchecked().gloas_enabled() + && let Ok(Some(source_columns)) = harness + .chain + .store + .get_data_columns(&wss_block_root, ForkName::Gloas) + && !source_columns.is_empty() + && let Some(store_op) = beacon_chain.get_blobs_or_columns_store_op( + wss_block_root, + wss_block.slot(), + beacon_chain::block_verification_types::AvailableBlockData::DataColumns(source_columns), + ) + { + beacon_chain + .store + .do_atomically_with_block_and_blobs_cache(vec![store_op]) + .unwrap(); + } + let store_wss_block = harness .chain .get_block(&wss_block_root) @@ -3070,9 +3125,7 @@ async fn weak_subjectivity_sync_test( .unwrap() .unwrap(); // This test may break in the future if we no longer store the full checkpoint data columns. - let store_wss_blobs_opt = beacon_chain - .get_or_reconstruct_blobs(&wss_block_root) - .unwrap(); + let store_wss_blobs_opt = get_or_reconstruct_blobs(&beacon_chain, &wss_block_root).unwrap(); assert_eq!(store_wss_block, wss_block); // TODO(fulu): Remove this condition once #6760 (PeerDAS checkpoint sync) is merged. @@ -3130,12 +3183,43 @@ async fn weak_subjectivity_sync_test( .await .unwrap(); - // Store the envelope and apply it to fork choice. + // Store the envelope, its columns, and apply to fork choice. if let Some(envelope) = &snapshot.execution_envelope { + // Persist data columns for Gloas blocks. This mirrors what production does in + // `import_available_execution_payload_envelope` and what the harness now does in + // `process_envelope` — the WSS forward-sync loop bypasses both, so do it directly. 
+ let mut ops = vec![]; + let columns_block = beacon_chain + .store + .get_blinded_block(&block_root) + .unwrap() + .and_then(|b| beacon_chain.store.make_full_block(&block_root, b).ok()); + if let Some(full_block) = columns_block { + let columns = beacon_chain::test_utils::generate_data_column_sidecars_from_block( + &full_block, + &beacon_chain.spec, + ); + if !columns.is_empty() + && let Some(store_op) = beacon_chain.get_blobs_or_columns_store_op( + block_root, + full_block.slot(), + beacon_chain::block_verification_types::AvailableBlockData::DataColumns( + columns, + ), + ) + { + ops.push(store_op); + } + } + ops.push(store::StoreOp::PutPayloadEnvelope( + block_root, + std::sync::Arc::new(envelope.as_ref().clone()), + )); beacon_chain .store - .put_payload_envelope(&block_root, envelope) + .do_atomically_with_block_and_blobs_cache(ops) .unwrap(); + // Update fork choice so head selection accounts for Full payload status. beacon_chain .canonical_head diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index 65e1a83840..2e7fe693d6 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -7,6 +7,7 @@ use crate::version::{ execution_optimistic_finalized_beacon_response, }; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use beacon_chain::payload_envelope_verification::EnvelopeError; use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer}; use bytes::Bytes; use eth2::types as api_types; @@ -148,7 +149,7 @@ pub async fn publish_execution_payload_envelope( PubsubMessage::ExecutionPayload(Box::new(envelope_for_gossip)), ) .map_err(|_| { - beacon_chain::payload_envelope_verification::EnvelopeError::BeaconChainError(Arc::new( + EnvelopeError::BeaconChainError(Arc::new( beacon_chain::BeaconChainError::UnableToPublish, )) }) @@ -272,7 
+273,7 @@ fn build_gloas_data_columns( let index = *col.index(); match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) { Ok(verified) => Some(verified), - Err(GossipDataColumnError::PriorKnownUnpublished) => None, + Err(GossipDataColumnError::PriorKnown { .. }) => None, Err(e) => { warn!( %slot, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 644ade956a..e96c86b17f 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -246,7 +246,7 @@ pub async fn publish_block>( if let Err(e) = Box::pin(chain.process_gossip_data_columns(sampling_columns, publish_fn)).await { - let msg = format!("Invalid data column: {e}"); + let msg = format!("Invalid data column: {e:?}"); return if let BroadcastValidation::Gossip = validation_level { Err(warp_utils::reject::broadcast_without_import(msg)) } else { diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 184bfffc9a..b47f8e946a 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -382,6 +382,7 @@ pub async fn proposer_boost_re_org_weight_misprediction() { /// - `num_empty_votes`: percentage of comm of attestations for the parent block /// - `num_head_votes`: number of attestations for the head block /// - `should_re_org`: whether the proposer should build on the parent rather than the head +#[allow(clippy::large_stack_frames)] pub async fn proposer_boost_re_org_test( ReOrgTest { head_slot, diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index b09dc95db4..4b34d7bfc0 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -1,5 +1,5 @@ use beacon_chain::{ - AvailabilityProcessingStatus, BlockError, attestation_verification::Error as AttnError, + AvailabilityProcessingStatus, attestation_verification::Error as 
AttnError, light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, sync_committee_verification::Error as SyncCommitteeError, @@ -733,7 +733,7 @@ pub fn register_sync_committee_error(error: &SyncCommitteeError) { } pub(crate) fn register_process_result_metrics( - result: &std::result::Result, + result: &std::result::Result>, source: BlockSource, block_component: &'static str, ) { diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 57871a2009..d34668b138 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -698,15 +698,6 @@ impl NetworkBeaconProcessor { } Err(err) => { match err { - GossipDataColumnError::InvalidVariant => { - // TODO(gloas) we should probably penalize the peer here - debug!( - %slot, - %block_root, - %index, - "Invalid gossip data column variant." - ) - } GossipDataColumnError::PriorKnownUnpublished => { debug!( %slot, @@ -732,7 +723,27 @@ impl NetworkBeaconProcessor { column_sidecar, )); } - GossipDataColumnError::PubkeyCacheTimeout + GossipDataColumnError::BlockRootUnknown { + block_root: unknown_block_root, + .. + } => { + debug!( + action = "ignoring", + %unknown_block_root, + "Unknown block root for column" + ); + // TODO(gloas): wire this into proper lookup sync. Sending + // `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that + // mixes column processing with the attestation lookup path and is not + // the right primitive for Gloas column lookups. 
+ self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } + GossipDataColumnError::InvalidVariant + | GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::BeaconChainError(_) => { crit!( error = ?err, @@ -743,6 +754,7 @@ impl NetworkBeaconProcessor { | GossipDataColumnError::UnknownValidator(_) | GossipDataColumnError::ProposerIndexMismatch { .. } | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::BlockSlotMismatch { .. } | GossipDataColumnError::InvalidSubnetId { .. } | GossipDataColumnError::InvalidInclusionProof | GossipDataColumnError::InvalidKzgProof { .. } @@ -803,6 +815,19 @@ impl NetworkBeaconProcessor { MessageAcceptance::Ignore, ); } + GossipDataColumnError::InternalError(err) => { + error!( + error = ?err, + %block_root, + %index, + "Internal error while processing data columns" + ); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } } } } @@ -904,14 +929,6 @@ impl NetworkBeaconProcessor { ) { match err { GossipPartialDataColumnError::GossipDataColumnError(err) => match err { - GossipDataColumnError::InvalidVariant => { - // TODO(gloas) we should probably penalize the peer here - debug!( - %block_root, - %index, - "Invalid gossip partial data column variant." - ) - } GossipDataColumnError::PriorKnownUnpublished => { debug!( %block_root, @@ -933,6 +950,24 @@ impl NetworkBeaconProcessor { slot, }); } + GossipDataColumnError::BlockRootUnknown { + block_root: unknown_block_root, + .. + } => { + debug!( + action = "requesting block", + %unknown_block_root, + "Unknown block root for partial column" + ); + // TODO(gloas): wire this into proper lookup sync. Sending + // `UnknownBlockHashFromAttestation` here is a Fulu-shaped fallback that + // mixes column processing with the attestation lookup path and is not + // the right primitive for Gloas column lookups. 
+ self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + unknown_block_root, + )); + } GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::BeaconChainError(_) => { crit!( @@ -940,10 +975,12 @@ impl NetworkBeaconProcessor { "Internal error when verifying partial column sidecar" ) } - GossipDataColumnError::ProposalSignatureInvalid + GossipDataColumnError::InvalidVariant + | GossipDataColumnError::ProposalSignatureInvalid | GossipDataColumnError::UnknownValidator(_) | GossipDataColumnError::ProposerIndexMismatch { .. } | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::BlockSlotMismatch { .. } | GossipDataColumnError::InvalidSubnetId { .. } | GossipDataColumnError::InvalidInclusionProof | GossipDataColumnError::InvalidKzgProof { .. } @@ -993,6 +1030,14 @@ impl NetworkBeaconProcessor { "gossip_partial_data_column_high", ); } + GossipDataColumnError::InternalError(err) => { + error!( + error = ?err, + %block_root, + %index, + "Internal error while handling partial data column verification" + ); + } }, GossipPartialDataColumnError::MissingHeader => { metrics::inc_counter( @@ -1052,7 +1097,7 @@ impl NetworkBeaconProcessor { "gossip_partial_data_column_low", ); } - GossipPartialDataColumnError::InternalError(_) => { + GossipPartialDataColumnError::InternalError(err) => { error!( error = ?err, %block_root, @@ -1323,6 +1368,7 @@ impl NetworkBeaconProcessor { let data_column_slot = verified_data_column.slot(); let data_column_index = verified_data_column.index(); + // TODO(gloas): implement partial messages if let DataColumnSidecar::Fulu(col) = verified_data_column.as_data_column() && self .chain @@ -1353,7 +1399,7 @@ impl NetworkBeaconProcessor { .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column"); - match &result { + match result { Ok(availability) => match availability { AvailabilityProcessingStatus::Imported(block_root) => { debug!( @@ -1366,6 +1412,14 @@ impl 
NetworkBeaconProcessor { &metrics::BEACON_BLOB_DELAY_FULL_VERIFICATION, processing_start_time.elapsed().as_millis() as i64, ); + + // If a block is in the da_checker, sync maybe awaiting for an event when block is finally + // imported. A block can become imported both after processing a block or data column. If + // importing a block results in `Imported`, notify. Do not notify of data column errors. + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: true, + }); } AvailabilityProcessingStatus::MissingComponents(slot, block_root) => { trace!( @@ -1375,7 +1429,7 @@ impl NetworkBeaconProcessor { "Processed data column, waiting for other components" ); - self.check_reconstruction_trigger(*slot, block_root).await; + self.check_reconstruction_trigger(slot, &block_root).await; } }, Err(BlockError::DuplicateFullyImported(_)) => { @@ -1399,16 +1453,6 @@ impl NetworkBeaconProcessor { ); } } - - // If a block is in the da_checker, sync maybe awaiting for an event when block is finally - // imported. A block can become imported both after processing a block or data column. If a - // importing a block results in `Imported`, notify. Do not notify of data column errors. - if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { - self.send_sync_message(SyncMessage::GossipBlockProcessResult { - block_root, - imported: true, - }); - } } /// Process a gossip-verified partial data column by merging it in the assembler @@ -1575,7 +1619,7 @@ impl NetworkBeaconProcessor { slot, process_fn: Box::pin(async move { cloned_self - .attempt_data_column_reconstruction(block_root) + .attempt_data_column_reconstruction(slot, block_root) .await; }), }, @@ -1827,7 +1871,10 @@ impl NetworkBeaconProcessor { return None; } // BlobNotRequired is unreachable. 
Only constructed in `process_gossip_blob` - Err(e @ BlockError::InternalError(_)) | Err(e @ BlockError::BlobNotRequired(_)) => { + Err(e @ BlockError::InternalError(_)) + | Err(e @ BlockError::BlobNotRequired(_)) + | Err(e @ BlockError::EnvelopeBlockRootUnknown(_)) + | Err(e @ BlockError::OptimisticSyncNotSupported { .. }) => { error!(error = %e, "Internal block gossip validation error"); return None; } @@ -3814,8 +3861,7 @@ impl NetworkBeaconProcessor { | EnvelopeError::UnknownValidator { .. } | EnvelopeError::IncorrectBlockProposer { .. } | EnvelopeError::ExecutionPayloadError(_) - | EnvelopeError::EnvelopeProcessingError(_) - | EnvelopeError::BlockError(_) => { + | EnvelopeError::EnvelopeProcessingError(_) => { self.propagate_validation_result( message_id, peer_id, @@ -3895,11 +3941,9 @@ impl NetworkBeaconProcessor { } EnvelopeError::PriorToFinalization { .. } - | EnvelopeError::OptimisticSyncNotSupported { .. } | EnvelopeError::BeaconChainError(_) | EnvelopeError::BeaconStateError(_) - | EnvelopeError::BlockProcessingError(_) - | EnvelopeError::InternalError(_) => { + | EnvelopeError::ImportError(_) => { self.propagate_validation_result( message_id, peer_id, diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 6a3ccbcd65..7bf969db10 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1014,6 +1014,7 @@ impl NetworkBeaconProcessor { } // Publish partial columns without eager send + // TODO(gloas): implement publish partial columns without eager send if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() { let columns = assembler.get_partials_and_mark_as_local_fetched(block_root, &header); if !columns.is_empty() { @@ -1034,8 +1035,8 @@ impl NetworkBeaconProcessor { /// Attempts to reconstruct all data columns if the conditions checked in /// 
[`DataAvailabilityCheckerInner::check_and_set_reconstruction_started`] are satisfied. #[instrument(level = "debug", skip_all, fields(?block_root))] - async fn attempt_data_column_reconstruction(self: &Arc, block_root: Hash256) { - let result = self.chain.reconstruct_data_columns(block_root).await; + async fn attempt_data_column_reconstruction(self: &Arc, slot: Slot, block_root: Hash256) { + let result = self.chain.reconstruct_data_columns(slot, block_root).await; match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8f89b66948..988a68c9dd 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -731,6 +731,8 @@ impl NetworkBeaconProcessor { .map(|block| block.into_available_block()) .collect::>(); + // TODO(gloas) when implementing backfill sync for gloas + // we need a batch verify kzg function in the new da checker match self .chain .data_availability_checker diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index f13815f7b6..18d34b40b3 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -10,7 +10,7 @@ use crate::{ }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; -use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::test_utils::{ @@ -1195,12 +1195,8 @@ async fn accept_processed_gossip_data_columns_without_import() { 
.map(|data_column| { let subnet_id = DataColumnSubnetId::from_column_index(*data_column.index(), &rig.chain.spec); - validate_data_column_sidecar_for_gossip_fulu::<_, DoNotObserve>( - data_column, - subnet_id, - &rig.chain, - ) - .expect("should be valid data column") + GossipVerifiedDataColumn::<_, DoNotObserve>::new(data_column, subnet_id, &rig.chain) + .expect("should be valid data column") }) .collect(); diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index f5c0fdb4e5..bb43396473 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -548,13 +548,19 @@ mod tests { #[test] fn no_blobs_into_responses() { + let spec = Arc::new(test_spec::()); + let mut u = types::test_utils::test_unstructured(); let blocks = (0..4) .map(|_| { - generate_rand_block_and_blobs::(ForkName::Base, NumBlobs::None, &mut u) - .unwrap() - .0 - .into() + generate_rand_block_and_blobs::( + spec.fork_name_at_epoch(Epoch::new(0)), + NumBlobs::None, + &mut u, + ) + .unwrap() + .0 + .into() }) .collect::>>>(); @@ -565,7 +571,6 @@ mod tests { // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); - let spec = Arc::new(test_spec::()); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); // Assert response is finished and RpcBlocks can be constructed diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 734295ac1d..347b018a93 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -181,7 +181,9 @@ pub enum SyncMessage { result: BlockProcessingResult, }, - /// A block from gossip has completed processing, + /// A gossip-received component has completed processing and the block may now be imported. + /// In Fulu this is sent after block or blob processing. 
In Gloas this is also sent after + /// data column or payload envelope processing triggers availability. GossipBlockProcessResult { block_root: Hash256, imported: bool }, } @@ -905,9 +907,13 @@ impl SyncManager { }), ); } - // TODO(gloas) support gloas data column variant DataColumnSidecar::Gloas(_) => { - error!("Gloas variant not yet supported") + // TODO(gloas): proper lookup sync for Gloas. Routing into + // `handle_unknown_block_root` here mixes column processing with the + // single-block-lookup path; the Gloas column-arrives-before-block + // case wants its own queue/wakeup. + debug!(%block_root, "Received unknown block data column message"); + self.handle_unknown_block_root(peer_id, block_root); } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index b1ba87c75d..465e23998b 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1085,10 +1085,22 @@ impl SyncNetworkContext { block_root: Hash256, lookup_peers: Arc>>, ) -> Result { + let slot = self + .chain + .canonical_head + .fork_choice_read_lock() + .get_block(&block_root) + .map(|block| block.slot) + .or_else(|| self.chain.slot().ok()) + .ok_or_else(|| { + RpcRequestSendError::InternalError(format!( + "Unable to determine slot for block {block_root:?}" + )) + })?; + let custody_indexes_imported = self .chain - .data_availability_checker - .cached_data_column_indexes(&block_root) + .cached_data_column_indexes(&block_root, slot) .unwrap_or_default(); let current_epoch = self.chain.epoch().map_err(|e| { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index d27c92c21a..c1b2793491 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -2087,8 +2087,7 @@ async fn too_many_processing_failures(depth: usize) { r.build_chain_and_trigger_last_block(depth).await; // 
Simulate that a peer always returns empty r.simulate( - SimulateConfig::new() - .with_process_result(|| BlockProcessingResult::Err(BlockError::BlockSlotLimitReached)), + SimulateConfig::new().with_process_result(|| BlockError::BlockSlotLimitReached.into()), ) .await; // We register multiple penalties, the lookup fails and sync does not progress @@ -2156,9 +2155,10 @@ async fn test_single_block_lookup_duplicate_response() { let mut r = TestRig::default(); r.build_chain_and_trigger_last_block(1).await; // Send a DuplicateFullyImported response, the lookup should complete successfully - r.simulate(SimulateConfig::new().with_process_result(|| { - BlockProcessingResult::Err(BlockError::DuplicateFullyImported(Hash256::ZERO)) - })) + r.simulate( + SimulateConfig::new() + .with_process_result(|| BlockError::DuplicateFullyImported(Hash256::ZERO).into()), + ) .await; // The block was not actually imported r.assert_head_slot(0); diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 593aa27915..a60859585c 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -1514,6 +1514,14 @@ where } } + /// Returns whether the execution payload for a block has been received. + /// + /// Returns `false` for unknown blocks and pre-Gloas nodes. + pub fn is_payload_received(&self, block_root: &Hash256) -> bool { + self.proto_array.is_payload_received(block_root) + && self.is_finalized_checkpoint_or_descendant(*block_root) + } + /// Returns whether the proposer should extend the execution payload chain of the given block. 
pub fn should_extend_payload(&self, block_root: &Hash256) -> Result> { let proposer_boost_root = self.fc_store.proposer_boost_root(); diff --git a/consensus/types/src/block/signed_beacon_block.rs b/consensus/types/src/block/signed_beacon_block.rs index 76bb9a09db..11ac17dece 100644 --- a/consensus/types/src/block/signed_beacon_block.rs +++ b/consensus/types/src/block/signed_beacon_block.rs @@ -351,6 +351,12 @@ impl> SignedBeaconBlock self.message() .body() .blob_kzg_commitments() + .or_else(|_| { + self.message() + .body() + .signed_execution_payload_bid() + .map(|bid| &bid.message.blob_kzg_commitments) + }) .map(|c| c.len()) .unwrap_or(0) } diff --git a/consensus/types/src/execution/signed_execution_payload_bid.rs b/consensus/types/src/execution/signed_execution_payload_bid.rs index 3d4f45a267..2ad6dcea1a 100644 --- a/consensus/types/src/execution/signed_execution_payload_bid.rs +++ b/consensus/types/src/execution/signed_execution_payload_bid.rs @@ -23,6 +23,14 @@ pub struct SignedExecutionPayloadBid { } impl SignedExecutionPayloadBid { + pub fn epoch(&self) -> crate::Epoch { + self.message.slot.epoch(E::slots_per_epoch()) + } + + pub fn slot(&self) -> crate::Slot { + self.message.slot + } + pub fn empty() -> Self { Self { message: ExecutionPayloadBid::default(),