From bd8cfa35f4a07379182551838df2ac114f3ec52e Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:36:36 +0200 Subject: [PATCH] Refine Gloas data column availability --- beacon_node/beacon_chain/src/beacon_chain.rs | 86 ++- .../src/data_column_verification.rs | 308 ++++++++-- beacon_node/beacon_chain/src/kzg_utils.rs | 51 ++ .../payload_envelope_verification/import.rs | 6 +- .../src/pending_payload_cache/mod.rs | 575 +++++------------- .../pending_components.rs | 96 +-- .../gossip_methods.rs | 58 +- .../src/network_beacon_processor/tests.rs | 10 +- beacon_node/network/src/sync/manager.rs | 4 +- 9 files changed, 604 insertions(+), 590 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d276fbc15e..8456bbbc02 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -27,7 +27,7 @@ use crate::data_availability_checker::DataAvailabilityChecker; use crate::data_column_verification::{ GossipDataColumnError, GossipPartialDataColumnError, GossipVerifiedDataColumn, GossipVerifiedPartialDataColumnHeader, KzgVerifiedCustodyPartialDataColumn, - KzgVerifiedPartialDataColumn, PartialColumnVerificationResult, + KzgVerifiedPartialDataColumn, PartialColumnVerificationResult, load_gloas_payload_bid, validate_partial_data_column_sidecar_for_gossip, }; use crate::early_attester_cache::EarlyAttesterCache; @@ -71,7 +71,7 @@ use crate::payload_envelope_verification::EnvelopeError; use crate::pending_payload_cache::PendingPayloadCache; use crate::pending_payload_cache::{ Availability as PayloadAvailability, - DataColumnReconstructionResult as DataColumnReconstructionResultGloas, + DataColumnReconstructionResult as DataColumnReconstructionResultGloas, PendingPayloadBid, }; use crate::pending_payload_envelopes::PendingPayloadEnvelopes; use crate::persisted_beacon_chain::PersistedBeaconChain; @@ -3317,12 +3317,19 @@ impl BeaconChain { .into()); }; - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its samples again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) + let is_gloas = self + .spec + .fork_name_at_slot::(slot) + .gloas_enabled(); + + // Before Gloas, if this block has already been imported to fork choice it must have been + // available, so we don't need to process its samples again. In Gloas the beacon block is + // imported before the payload envelope and data columns, so this check does not apply. + if !is_gloas + && self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) { return Err(BlockError::DuplicateFullyImported(block_root).into()); } @@ -3419,10 +3426,16 @@ impl BeaconChain { .fork_name_at_slot::(slot) .gloas_enabled() { + let Some(bid) = + load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)? + else { + return Err(EnvelopeError::BlockRootUnknown { block_root }.into()); + }; let availability = self .pending_payload_cache .put_kzg_verified_custody_data_columns( block_root, + bid, merge_result.full_columns.clone(), ) .map_err(EnvelopeError::from)?; @@ -3631,8 +3644,9 @@ impl BeaconChain { custody_columns.iter().map(|column| column.as_ref()), ); - self.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) - .await + Ok(self + .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) + .await?) } pub async fn reconstruct_data_columns( @@ -3663,11 +3677,16 @@ impl BeaconChain { .gloas_enabled(); if is_gloas { + let Some(bid) = + load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)? + else { + return Err(EnvelopeError::BlockRootUnknown { block_root }.into()); + }; let pending_payload_cache = self.pending_payload_cache.clone(); let result = self .task_executor .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { - pending_payload_cache.reconstruct_data_columns(&block_root) + pending_payload_cache.reconstruct_data_columns(&block_root, bid) }) .await .map_err(|_| EnvelopeError::from(BeaconChainError::RuntimeShutdown))? @@ -3806,6 +3825,15 @@ impl BeaconChain { &chain, notify_execution_layer, )?; + + let block = execution_pending.block.block_cloned(); + if block.fork_name_unchecked().gloas_enabled() { + let bid = PendingPayloadBid::from_block(block.as_ref())?; + chain + .pending_payload_cache + .init_pending_bid(block_root, bid); + } + publish_fn()?; // Record the time it took to complete consensus verification. @@ -3979,9 +4007,16 @@ impl BeaconChain { .fork_name_at_slot::(slot) .gloas_enabled() { + let Some(bid) = + load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)? + else { + return Ok(AvailabilityProcessingStatus::MissingComponents( + slot, block_root, + )); + }; let availability = self .pending_payload_cache - .put_gossip_verified_data_columns(block_root, slot, data_columns) + .put_gossip_verified_data_columns(block_root, bid, data_columns) .map_err(EnvelopeError::from)?; Ok(self .process_payload_availability(slot, availability, publish_fn) @@ -4085,9 +4120,16 @@ impl BeaconChain { .fork_name_at_slot::(slot) .gloas_enabled() { + let Some(bid) = + load_gloas_payload_bid(block_root, self).map_err(EnvelopeError::from)? + else { + return Ok(AvailabilityProcessingStatus::MissingComponents( + slot, block_root, + )); + }; let availability = self .pending_payload_cache - .put_kzg_verified_custody_data_columns(block_root, data_columns) + .put_kzg_verified_custody_data_columns(block_root, bid, data_columns) .map_err(EnvelopeError::from)?; Ok(self .process_payload_availability(slot, availability, || Ok(())) @@ -4112,7 +4154,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, custody_columns: DataColumnSidecarList, - ) -> Result { + ) -> Result { // TODO(gloas) ensure that this check is no longer relevant post gloas self.check_data_column_sidecar_header_signature_and_slashability( block_root, @@ -4127,13 +4169,23 @@ impl BeaconChain { .fork_name_at_slot::(slot) .gloas_enabled() { + let Some(bid) = load_gloas_payload_bid(block_root, self).map_err(BlockError::from)? + else { + return Ok(AvailabilityProcessingStatus::MissingComponents( + slot, block_root, + )); + }; let availability = self .pending_payload_cache - .put_rpc_custody_columns(block_root, slot, custody_columns) - .map_err(EnvelopeError::from)?; + .put_rpc_custody_columns(block_root, bid, custody_columns) + .map_err(BlockError::from)?; Ok(self .process_payload_availability(slot, availability, || Ok(())) - .await?) + .await + .map_err(|error| match error { + EnvelopeError::BlockError(error) => error, + error => BlockError::InternalError(error.to_string()), + })?) } else { let availability = self .data_availability_checker diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index b420965024..e342a05798 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -3,11 +3,13 @@ use crate::block_verification::{ }; use crate::data_availability_checker::MissingCellsError; use crate::kzg_utils::{ - reconstruct_data_columns, validate_full_data_columns, validate_partial_data_columns, + reconstruct_data_columns, validate_full_data_columns, + validate_full_data_columns_with_commitments, validate_partial_data_columns, }; use crate::observed_data_sidecars::{ Error as ObservedDataSidecarsError, ObservationKey, ObservationStrategy, Observe, }; +use crate::pending_payload_cache::PendingPayloadBid; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics}; use educe::Educe; use fork_choice::ProtoBlock; @@ -20,6 +22,7 @@ use std::iter; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; +use store::DatabaseBlock; use tracing::{debug, instrument}; use tree_hash::TreeHash; use types::data::{ @@ -27,8 +30,9 @@ use types::data::{ PartialDataColumnSidecarError, }; use types::{ - BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId, - EthSpec, Hash256, PartialDataColumnSidecarRef, SignedBeaconBlockHeader, Slot, + BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, + DataColumnSubnetId, EthSpec, Hash256, KzgCommitment, PartialDataColumnSidecarRef, + SignedBeaconBlockHeader, Slot, }; /// An error occurred while validating a gossip data column. @@ -131,6 +135,24 @@ pub enum GossipDataColumnError { parent_root: Hash256, slot: Slot, }, + /// The block referenced by the data column is unknown. + /// + /// ## Peer scoring + /// + /// We cannot process the column without the referenced block, the peer isn't necessarily faulty. + BlockRootUnknown { + block_root: Hash256, + slot: Slot, + }, + /// The data column slot does not match its referenced block slot. + /// + /// ## Peer scoring + /// + /// The column sidecar is invalid and the peer is faulty. + BlockSlotMismatch { + block_slot: Slot, + data_column_slot: Slot, + }, /// The column conflicts with finalization, no need to propagate. /// /// ## Peer scoring @@ -307,21 +329,32 @@ impl GossipVerifiedDataColumn let header = c.signed_block_header.clone(); // We only process slashing info if the gossip verification failed // since we do not process the data column any further in that case. - validate_data_column_sidecar_for_gossip_fulu::( - column_sidecar, + validate_data_column_sidecar_for_gossip_fulu::(c, subnet_id, chain).map_err( + |e| { + process_block_slash_info::<_, GossipDataColumnError>( + chain, + BlockSlashInfo::from_early_error_data_column(header, e), + ) + }, + )?; + } + DataColumnSidecar::Gloas(data_column_gloas) => { + validate_data_column_sidecar_for_gossip_gloas::( + data_column_gloas, subnet_id, chain, - ) - .map_err(|e| { - process_block_slash_info::<_, GossipDataColumnError>( - chain, - BlockSlashInfo::from_early_error_data_column(header, e), - ) - }) + )?; } - // TODO(gloas) support gloas data column variant - DataColumnSidecar::Gloas(_) => Err(GossipDataColumnError::InvalidVariant), } + + Ok(GossipVerifiedDataColumn { + block_root: column_sidecar.block_root(), + data_column: KzgVerifiedDataColumn { + data: column_sidecar, + seen_timestamp: chain.slot_clock.now_duration().unwrap_or_default(), + }, + _phantom: PhantomData, + }) } /// Create a `GossipVerifiedDataColumn` from `DataColumnSidecar` for block production ONLY. @@ -331,7 +364,28 @@ impl GossipVerifiedDataColumn column_sidecar: Arc>, chain: &BeaconChain, ) -> Result { - verify_data_column_sidecar(&column_sidecar, &chain.spec)?; + match column_sidecar.as_ref() { + DataColumnSidecar::Fulu(data_column_fulu) => { + verify_data_column_sidecar_with_commitments_len( + &column_sidecar, + data_column_fulu.kzg_commitments.len(), + &chain.spec, + )?; + } + DataColumnSidecar::Gloas(_) => { + let kzg_commitments = load_gloas_payload_bid(column_sidecar.block_root(), chain)? + .ok_or(GossipDataColumnError::BlockRootUnknown { + block_root: column_sidecar.block_root(), + slot: column_sidecar.slot(), + })? + .blob_kzg_commitments; + verify_data_column_sidecar_with_commitments_len( + &column_sidecar, + kzg_commitments.len(), + &chain.spec, + )?; + } + } // Check if the data column is already in the DA checker cache. This happens when data columns // are made available through the `engine_getBlobs` method. If it exists in the cache, we know @@ -340,28 +394,19 @@ impl GossipVerifiedDataColumn // In this case, we should accept it for gossip propagation. verify_is_unknown_sidecar(chain, &column_sidecar)?; - match chain - .data_availability_checker - .missing_cells_for_column_sidecar(&column_sidecar) - { - Ok(Some(_)) => Ok(Self { + match missing_cells_for_column_sidecar(chain, &column_sidecar)? { + Some(_) => Ok(Self { block_root: column_sidecar.block_root(), data_column: KzgVerifiedDataColumn::from_execution_verified(column_sidecar), _phantom: Default::default(), }), - Ok(None) => { + None => { // Observe this data column so we don't process it again. if O::observe() { observe_gossip_data_column(&column_sidecar, chain)?; } Err(GossipDataColumnError::PriorKnownUnpublished) } - Err(MissingCellsError::MismatchesCachedColumn) => { - Err(GossipDataColumnError::MismatchesCachedColumn) - } - Err(MissingCellsError::UnexpectedError(_)) => { - todo!("handle unexpected error") - } } } @@ -440,6 +485,22 @@ impl KzgVerifiedDataColumn { .collect()) } + pub fn from_batch_with_scoring_and_commitments( + data_columns: Vec>>, + kzg_commitments: &[KzgCommitment], + kzg: &Kzg, + ) -> Result, (Option, KzgError)> { + let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_BATCH_TIMES); + validate_full_data_columns_with_commitments(kzg, data_columns.iter(), kzg_commitments)?; + Ok(data_columns + .into_iter() + .map(|column| Self { + data: column, + seen_timestamp: timestamp_now(), + }) + .collect()) + } + pub fn to_data_column(self) -> Arc> { self.data } @@ -854,6 +915,26 @@ pub fn verify_kzg_for_data_column( }) } +#[instrument(skip_all, level = "debug")] +pub fn verify_kzg_for_data_column_with_commitments( + data_column: Arc>, + cells_to_verify: PartialDataColumnSidecarRef, + kzg_commitments: &[KzgCommitment], + kzg: &Kzg, + seen_timestamp: Duration, +) -> Result, (Option, KzgError)> { + let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); + validate_partial_data_columns( + kzg, + iter::once((*data_column.index(), cells_to_verify)), + kzg_commitments, + )?; + Ok(KzgVerifiedDataColumn { + data: data_column, + seen_timestamp, + }) +} + /// Complete kzg verification for a `VerifiablePartialDataColumn`. /// /// Returns an error if the kzg verification check fails. @@ -901,16 +982,18 @@ where level = "debug" )] pub fn validate_data_column_sidecar_for_gossip_fulu( - data_column: Arc>, + data_column_fulu: &DataColumnSidecarFulu, subnet: DataColumnSubnetId, chain: &BeaconChain, -) -> Result, GossipDataColumnError> { - let DataColumnSidecar::Fulu(data_column_fulu) = data_column.as_ref() else { - return Err(GossipDataColumnError::InvalidVariant); - }; - +) -> Result<(), GossipDataColumnError> { + let data_column = Arc::new(DataColumnSidecar::Fulu(data_column_fulu.clone())); let column_slot = data_column.slot(); - verify_data_column_sidecar(&data_column, &chain.spec)?; + + verify_data_column_sidecar_with_commitments_len( + &data_column, + data_column_fulu.kzg_commitments.len(), + &chain.spec, + )?; verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; verify_sidecar_not_from_future_slot(chain, column_slot)?; verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; @@ -947,13 +1030,12 @@ pub fn validate_data_column_sidecar_for_gossip_fulu( + data_column_gloas: &DataColumnSidecarGloas, + subnet: DataColumnSubnetId, + chain: &BeaconChain, +) -> Result<(), GossipDataColumnError> { + let data_column = Arc::new(DataColumnSidecar::Gloas(data_column_gloas.clone())); + let column_slot = data_column.slot(); + + if *data_column.index() >= T::EthSpec::number_of_columns() as u64 { + return Err(GossipDataColumnError::InvalidColumnIndex( + *data_column.index(), + )); + } + + if !chain + .spec + .fork_name_at_slot::(column_slot) + .gloas_enabled() + { + return Err(GossipDataColumnError::InvalidVariant); + } + + verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; + verify_sidecar_not_from_future_slot(chain, column_slot)?; + verify_slot_greater_than_latest_finalized_slot(chain, column_slot)?; + verify_is_unknown_sidecar(chain, &data_column)?; + + let kzg_commitments = load_gloas_payload_bid(data_column.block_root(), chain)? + .ok_or(GossipDataColumnError::BlockRootUnknown { + block_root: data_column.block_root(), + slot: column_slot, + })? + .blob_kzg_commitments; + verify_data_column_sidecar_with_commitments_len( + &data_column, + kzg_commitments.len(), + &chain.spec, + )?; + + let Some(cells_to_kzg_verify) = missing_cells_for_column_sidecar(chain, &data_column)? else { + // Observe this data column so we don't process it again. + if O::observe() { + observe_gossip_data_column(&data_column, chain)?; + } + return Err(GossipDataColumnError::PriorKnownUnpublished); + }; + + let kzg = &chain.kzg; + let seen_timestamp = chain.slot_clock.now_duration().unwrap_or_default(); + verify_kzg_for_data_column_with_commitments( + data_column.clone(), + cells_to_kzg_verify, + kzg_commitments.as_ref(), + kzg, + seen_timestamp, + ) + .map_err(|(_, e)| GossipDataColumnError::InvalidKzgProof(e))?; + + if O::observe() { + observe_gossip_data_column(&data_column, chain)?; + } + + Ok(()) } #[instrument(skip_all, level = "debug")] @@ -1109,9 +1260,9 @@ pub enum PartialColumnVerificationResult { Err(GossipPartialDataColumnError), } -/// Verify if the data column sidecar is valid. -fn verify_data_column_sidecar( +fn verify_data_column_sidecar_with_commitments_len( data_column: &DataColumnSidecar, + commitments_len: usize, spec: &ChainSpec, ) -> Result<(), GossipDataColumnError> { if *data_column.index() >= E::number_of_columns() as u64 { @@ -1120,12 +1271,6 @@ fn verify_data_column_sidecar( )); } - // TODO(gloas): implement Gloas verification that takes kzg_commitments from block as parameter - let commitments_len = match data_column { - DataColumnSidecar::Fulu(dc) => dc.kzg_commitments.len(), - DataColumnSidecar::Gloas(_) => return Err(GossipDataColumnError::InvalidVariant), - }; - if commitments_len == 0 { return Err(GossipDataColumnError::UnexpectedDataColumn); } @@ -1158,6 +1303,63 @@ fn verify_data_column_sidecar( Ok(()) } +pub(crate) fn load_gloas_payload_bid( + block_root: Hash256, + chain: &BeaconChain, +) -> Result>, BeaconChainError> { + let bid = if let Some(bid) = chain.pending_payload_cache.get_bid(&block_root) { + bid + } else if let Some(block) = chain.early_attester_cache.get_block(block_root) { + PendingPayloadBid::from_block(block.as_ref()).map_err(BeaconChainError::BeaconStateError)? + } else { + match chain + .store + .try_get_full_block(&block_root) + .map_err(BeaconChainError::DBError)? + { + Some(DatabaseBlock::Full(block)) => { + PendingPayloadBid::from_block(&block).map_err(BeaconChainError::BeaconStateError)? + } + Some(DatabaseBlock::Blinded(block)) => { + PendingPayloadBid::from_block(&block).map_err(BeaconChainError::BeaconStateError)? + } + None => { + return Ok(None); + } + } + }; + + chain + .pending_payload_cache + .init_pending_bid(block_root, bid.clone()); + + Ok(Some(bid)) +} + +fn missing_cells_for_column_sidecar<'a, T: BeaconChainTypes>( + chain: &'_ BeaconChain, + data_column: &'a DataColumnSidecar, +) -> Result>, GossipDataColumnError> { + let result = if chain + .spec + .fork_name_at_slot::(data_column.slot()) + .gloas_enabled() + { + chain + .pending_payload_cache + .missing_cells_for_column_sidecar(data_column) + } else { + chain + .data_availability_checker + .missing_cells_for_column_sidecar(data_column) + }; + + result.map_err(|err| match err { + MissingCellsError::MismatchesCachedColumn => GossipDataColumnError::MismatchesCachedColumn, + MissingCellsError::UnexpectedError(_) => todo!("handle unexpected error"), + }) +} + /// Verify that `column_sidecar` is not yet known, i.e. this is the first time `column_sidecar` has been received for the tuple: /// `(block_header.slot, block_header.proposer_index, column_sidecar.index)` fn verify_is_unknown_sidecar( @@ -1441,7 +1643,7 @@ mod test { let verify_fn = |column_sidecar: DataColumnSidecar| { let col_index = *column_sidecar.index(); validate_data_column_sidecar_for_gossip_fulu::<_, Observe>( - column_sidecar.into(), + column_sidecar.as_fulu().unwrap(), DataColumnSubnetId::from_column_index(col_index, &harness.spec), &harness.chain, ) diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index b05a896777..5ad1cc115d 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -111,6 +111,57 @@ pub fn validate_full_data_columns<'a, E: EthSpec>( kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) } +/// Validate a batch of full `DataColumnSidecar`s against commitments supplied out-of-band. +/// +/// Gloas sidecars do not carry commitments. Their commitments come from the block's +/// `ExecutionPayloadBid`. +pub fn validate_full_data_columns_with_commitments<'a, E: EthSpec>( + kzg: &Kzg, + data_column_iter: impl Iterator>>, + kzg_commitments: &[KzgCommitment], +) -> Result<(), (Option, KzgError)> { + let mut cells = Vec::new(); + let mut proofs = Vec::new(); + let mut column_indices = Vec::new(); + let mut commitments = Vec::new(); + + for data_column in data_column_iter { + let col_index = *data_column.index(); + + if data_column.column().is_empty() { + return Err((Some(col_index), KzgError::KzgVerificationFailed)); + } + + for cell in data_column.column() { + cells.push(ssz_cell_to_crypto_cell::(cell).map_err(|e| (Some(col_index), e))?); + column_indices.push(col_index); + } + + for &proof in data_column.kzg_proofs() { + proofs.push(proof.0); + } + + for &commitment in kzg_commitments { + commitments.push(commitment.0); + } + + let expected_len = column_indices.len(); + + // We make this check at each iteration so that the error is attributable to a specific column. + if cells.len() != expected_len + || proofs.len() != expected_len + || commitments.len() != expected_len + { + return Err(( + Some(col_index), + KzgError::InconsistentArrayLength("Invalid data column".to_string()), + )); + } + } + + kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) +} + /// Validate a batch of partial `VerifiablePartialDataColumn`s. /// /// Partial columns may have missing cells, indicated by a bitmap. We only verify present cells. diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index beabe0e76c..8b7c7eb4d7 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -12,6 +12,7 @@ use super::{ AvailableEnvelope, AvailableExecutedEnvelope, EnvelopeError, gossip_verified_envelope::GossipVerifiedEnvelope, }; +use crate::data_column_verification::load_gloas_payload_bid; use crate::pending_payload_cache::Availability as PayloadAvailability; use crate::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, @@ -168,9 +169,12 @@ impl BeaconChain { envelope: AvailabilityPendingExecutedEnvelope, ) -> Result { let slot = envelope.envelope.slot(); + let block_root = envelope.block_root; + let bid = load_gloas_payload_bid(block_root, self)? + .ok_or(EnvelopeError::BlockRootUnknown { block_root })?; let availability = self .pending_payload_cache - .put_executed_payload_envelope(envelope)?; + .put_executed_payload_envelope(bid, envelope)?; self.process_payload_envelope_availability(slot, availability, || Ok(())) .await } diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs index b740212c9d..21a86f05cc 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -55,13 +55,12 @@ use task_executor::TaskExecutor; use tracing::{Span, debug, error, instrument, trace}; use types::{ ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, - PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, + PartialDataColumnSidecarRef, }; mod pending_column; mod pending_components; -use crate::block_verification_types::AsBlock; use crate::data_column_verification::{ GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, }; @@ -69,6 +68,7 @@ use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, }; use crate::observed_data_sidecars::ObservationStrategy; +pub(crate) use pending_components::PendingPayloadBid; use pending_components::{PendingComponents, ReconstructColumnsDecision}; use types::new_non_zero_usize; @@ -153,19 +153,7 @@ impl PendingPayloadCache { block_root: Hash256, ) -> Option> { self.peek_pending_components(&block_root, |components| { - components.map(|c| { - c.verified_data_columns - .iter() - .filter_map(|(col_idx, col)| { - col.try_to_sidecar( - *col_idx, - c.block.slot(), - block_root, - c.num_blobs_expected(), - ) - }) - .collect() - }) + components.map(|c| c.get_cached_data_columns(block_root)) }) } @@ -177,6 +165,13 @@ impl PendingPayloadCache { }) } + /// Return the cached Gloas payload bid metadata for `block_root`, if present. + pub fn get_bid(&self, block_root: &Hash256) -> Option> { + self.peek_pending_components(block_root, |components| { + components.map(|components| components.bid.clone()) + }) + } + /// Filter out cells that are already cached for the given column sidecar. /// Returns the cells that still need KZG verification, or `None` if all cells are cached. #[instrument(skip_all, level = "trace")] @@ -206,12 +201,13 @@ impl PendingPayloadCache { /// Insert an executed payload envelope into the cache and performs an availability check pub fn put_executed_payload_envelope( &self, + bid: PendingPayloadBid, executed_envelope: AvailabilityPendingExecutedEnvelope, ) -> Result, AvailabilityCheckError> { let epoch = executed_envelope.envelope.epoch(); let beacon_block_root = executed_envelope.envelope.beacon_block_root(); let pending_components = - self.get_pending_components(beacon_block_root, |pending_components| { + self.get_pending_components(beacon_block_root, bid, |pending_components| { pending_components.insert_executed_payload_envelope(executed_envelope); Ok(()) })?; @@ -229,16 +225,10 @@ impl PendingPayloadCache { self.check_availability(beacon_block_root, pending_components, num_expected_columns) } - /// Initialize pending components for a block. Called when the beacon block (containing the - /// bid) arrives. Sets up the slot and expected blob count so that subsequent column insertions - /// know how many cells to expect per column. - pub fn init_pending_block( - &self, - block_root: Hash256, - block: Arc>, - ) { + /// Initialize pending components for a block's Gloas bid. + pub fn init_pending_bid(&self, block_root: Hash256, bid: PendingPayloadBid) { let mut write_lock = self.availability_cache.write(); - write_lock.get_or_insert_mut(block_root, || PendingComponents::empty(block_root, block)); + write_lock.get_or_insert_mut(block_root, || PendingComponents::empty(block_root, bid)); } /// Perform KZG verification on RPC custody columns and insert them into the cache. @@ -247,14 +237,17 @@ impl PendingPayloadCache { pub fn put_rpc_custody_columns( &self, block_root: Hash256, - slot: Slot, + bid: PendingPayloadBid, custody_columns: DataColumnSidecarList, ) -> Result, AvailabilityCheckError> { - let kzg_verified_columns = - KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn)?; + let kzg_verified_columns = KzgVerifiedDataColumn::from_batch_with_scoring_and_commitments( + custody_columns, + bid.blob_kzg_commitments.as_ref(), + &self.kzg, + ) + .map_err(AvailabilityCheckError::InvalidColumn)?; - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let epoch = bid.slot.epoch(T::EthSpec::slots_per_epoch()); let sampling_columns = self .custody_context .sampling_columns_for_epoch(epoch, &self.spec); @@ -264,7 +257,7 @@ impl PendingPayloadCache { .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) .collect::>(); - self.put_kzg_verified_custody_data_columns(block_root, verified_custody_columns) + self.put_kzg_verified_custody_data_columns(block_root, bid, verified_custody_columns) } /// Perform KZG verification on gossip verified custody columns and insert them into the cache. @@ -273,10 +266,10 @@ impl PendingPayloadCache { pub fn put_gossip_verified_data_columns( &self, block_root: Hash256, - slot: Slot, + bid: PendingPayloadBid, data_columns: Vec>, ) -> Result, AvailabilityCheckError> { - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let epoch = bid.slot.epoch(T::EthSpec::slots_per_epoch()); let sampling_columns = self .custody_context .sampling_columns_for_epoch(epoch, &self.spec); @@ -286,7 +279,7 @@ impl PendingPayloadCache { .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) .collect::>(); - self.put_kzg_verified_custody_data_columns(block_root, custody_columns) + self.put_kzg_verified_custody_data_columns(block_root, bid, custody_columns) } /// Insert KZG verified columns into the cache. @@ -294,11 +287,13 @@ impl PendingPayloadCache { pub fn put_kzg_verified_custody_data_columns( &self, block_root: Hash256, + bid: PendingPayloadBid, kzg_verified_data_columns: Vec>, ) -> Result, AvailabilityCheckError> { - let pending_components = self.get_pending_components(block_root, |pending_components| { - pending_components.merge_data_columns(kzg_verified_data_columns) - })?; + let pending_components = + self.get_pending_components(block_root, bid, |pending_components| { + pending_components.merge_data_columns(kzg_verified_data_columns) + })?; let num_expected_columns = self.get_num_expected_columns(pending_components.epoch()); @@ -317,6 +312,7 @@ impl PendingPayloadCache { pub fn reconstruct_data_columns( &self, block_root: &Hash256, + bid: PendingPayloadBid, ) -> Result, AvailabilityCheckError> { let verified_data_columns = match self.check_and_set_reconstruction_started(block_root) { ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns, @@ -324,6 +320,10 @@ impl PendingPayloadCache { return Ok(DataColumnReconstructionResult::NotStarted(reason)); } }; + let existing_column_indices = verified_data_columns + .iter() + .map(|data_column| *data_column.index()) + .collect::>(); metrics::inc_counter(&KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS); let timer = metrics::start_timer(&metrics::DATA_AVAILABILITY_RECONSTRUCTION_TIME); @@ -344,12 +344,6 @@ impl PendingPayloadCache { AvailabilityCheckError::ReconstructColumnsError(e) })?; - let Some(existing_column_indices) = self.cached_data_column_indexes(block_root) else { - return Err(AvailabilityCheckError::Unexpected( - "block no longer exists in the data availability checker".to_string(), - )); - }; - let Some(slot) = all_data_columns.first().map(|d| d.as_data_column().slot()) else { return Ok(DataColumnReconstructionResult::RecoveredColumnsNotImported( "No new columns to import and publish", @@ -383,6 +377,7 @@ impl PendingPayloadCache { self.put_kzg_verified_custody_data_columns( *block_root, + bid, data_columns_to_import_and_publish.clone(), ) .map(|availability| { @@ -436,16 +431,15 @@ impl PendingPayloadCache { } } - /// Gets existing `PendingComponents` and applies the `update_fn` while holding the write lock. + /// Gets or creates `PendingComponents` and applies the `update_fn` while holding the write lock. /// /// Once the update is complete, the write lock is downgraded and a read guard with a /// reference of the updated `PendingComponents` is returned. /// - /// Returns an error if no pending components exist for the given block root (the block must - /// be initialized via `init_pending_block` first). fn get_pending_components( &self, block_root: Hash256, + bid: PendingPayloadBid, update_fn: F, ) -> Result>, AvailabilityCheckError> where @@ -454,11 +448,8 @@ impl PendingPayloadCache { let mut write_lock = self.availability_cache.write(); { - let pending_components = write_lock.get_mut(&block_root).ok_or_else(|| { - AvailabilityCheckError::Unexpected( - "pending components not initialized for block".to_string(), - ) - })?; + let pending_components = write_lock + .get_or_insert_mut(block_root, || PendingComponents::empty(block_root, bid)); update_fn(pending_components)? } @@ -634,7 +625,6 @@ mod data_availability_checker_tests { use crate::block_verification::PayloadVerificationOutcome; use crate::test_utils::{ NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns, - test_spec, }; use crate::{ custody_context::NodeCustodyType, @@ -652,8 +642,10 @@ mod data_availability_checker_tests { }; type E = MinimalEthSpec; + type T = DiskHarnessType; const LOW_VALIDATOR_COUNT: usize = 32; + const RNG_SEED: u64 = 0xDEADBEEF; fn gloas_spec() -> Arc { let mut spec = E::default_spec(); @@ -703,18 +695,7 @@ mod data_availability_checker_tests { .build() } - async fn setup_harness_and_cache() -> ( - BeaconChainHarness>, - Arc>, - TempDir, - ) - where - T: BeaconChainTypes< - HotStore = BeaconNodeBackend, - ColdStore = BeaconNodeBackend, - EthSpec = E, - >, - { + async fn setup() -> (BeaconChainHarness, Arc>, TempDir) { create_test_tracing_subscriber(); let chain_db_path = tempdir().expect("should get temp dir"); let harness = get_gloas_chain::(&chain_db_path).await; @@ -732,22 +713,6 @@ mod data_availability_checker_tests { (harness, cache, chain_db_path) } - fn is_gloas_enabled() -> bool { - let spec = test_spec::(); - spec.fork_name_at_slot::(Slot::new(0)).gloas_enabled() - } - - #[tokio::test] - async fn test_cache_creation() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (_harness, cache, _path) = setup_harness_and_cache::().await; - assert_eq!(cache.block_cache_size(), 0); - } - fn make_test_signed_envelope(block_root: Hash256) -> Arc> { Arc::new(SignedExecutionPayloadEnvelope { message: ExecutionPayloadEnvelope { @@ -771,311 +736,132 @@ mod data_availability_checker_tests { } } + fn init_block( + cache: &PendingPayloadCache, + spec: &ChainSpec, + num_blobs: NumBlobs, + seed: u64, + ) -> (PendingPayloadBid, Hash256, DataColumnSidecarList) { + let mut rng = StdRng::seed_from_u64(seed); + let (block, data_columns) = + generate_rand_block_and_data_columns::(ForkName::Gloas, num_blobs, &mut rng, spec); + let block_root = block.canonical_root(); + let bid = PendingPayloadBid::from_block(&block).expect("should get payload bid"); + cache.init_pending_bid(block_root, bid.clone()); + (bid, block_root, data_columns) + } + #[tokio::test] - async fn test_put_columns_creates_pending_components() { - if !is_gloas_enabled() { - return; + async fn caches_and_deduplicates_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + let column = data_columns.first().cloned().expect("should have column"); + let column_index = *column.index(); + + for _ in 0..2 { + cache + .put_rpc_custody_columns(block_root, bid.clone(), vec![column.clone()]) + .expect("should put column"); } - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, + assert_eq!( + cache.cached_data_column_indexes(&block_root), + Some(vec![column_index]) + ); + assert_eq!( + cache.get_data_columns(block_root).map(|cols| cols.len()), + Some(1) ); - let slot = block.slot(); - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); - - let columns: DataColumnSidecarList = data_columns.into_iter().take(1).collect(); - - let result = cache.put_rpc_custody_columns(block_root, slot, columns); - assert!(result.is_ok(), "put_rpc_custody_columns failed: {result:?}"); - assert_eq!(cache.block_cache_size(), 1); - - let cached_indices = cache.peek_pending_components(&block_root, |components| { - components.map(|c| c.get_cached_data_columns_indices()) - }); - assert!(cached_indices.is_some()); - assert_eq!(cached_indices.unwrap().len(), 1); } #[tokio::test] - async fn test_column_deduplication() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); - - let first_column = data_columns.first().cloned().expect("should have column"); - let column_index = *first_column.index(); - - cache - .put_rpc_custody_columns(block_root, slot, vec![first_column.clone()]) - .expect("should put column"); - - cache - .put_rpc_custody_columns(block_root, slot, vec![first_column]) - .expect("should put column again"); - - let cached_indices = cache.peek_pending_components(&block_root, |components| { - components.map(|c| c.get_cached_data_columns_indices()) - }); - assert!(cached_indices.is_some()); - let indices = cached_indices.unwrap(); - assert_eq!(indices.len(), 1); - assert_eq!(indices[0], column_index); - } - - #[tokio::test] - async fn test_columns_without_envelope_not_available() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); + async fn requires_columns_and_executed_envelope() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); let result = cache - .put_rpc_custody_columns(block_root, slot, data_columns) + .put_rpc_custody_columns(block_root, bid, data_columns) .expect("should put columns"); + assert!(matches!(result, Availability::MissingComponents(_))); - // Without an executed envelope, should still be missing components + let result = cache + .put_executed_payload_envelope(bid, make_test_executed_envelope(block_root)) + .expect("should put executed envelope"); + let Availability::Available(envelope) = result else { + panic!("expected available envelope"); + }; + assert_eq!(envelope.block_root, block_root); + assert_eq!(envelope.envelope.columns.len(), E::number_of_columns()); + } + + #[tokio::test] + async fn zero_blob_envelope_is_available_without_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, _columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(0), RNG_SEED); + + let result = cache + .put_executed_payload_envelope(bid, make_test_executed_envelope(block_root)) + .expect("should put executed envelope"); + let Availability::Available(envelope) = result else { + panic!("zero-blob block should be available"); + }; + assert!(envelope.envelope.columns.is_empty()); + } + + #[tokio::test] + async fn partial_columns_wait_for_missing_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + + cache + .put_executed_payload_envelope(bid.clone(), make_test_executed_envelope(block_root)) + .expect("should put executed envelope"); + + let columns = data_columns.into_iter().take(1).collect(); + let result = cache + .put_rpc_custody_columns(block_root, bid, columns) + .expect("should put columns"); assert!(matches!(result, Availability::MissingComponents(_))); } #[tokio::test] - async fn test_full_availability_flow() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); - - let result = cache - .put_rpc_custody_columns(block_root, slot, data_columns) - .expect("should put columns"); - - assert!(matches!(result, Availability::MissingComponents(_))); - - let executed_envelope = make_test_executed_envelope(block_root); - let result = cache - .put_executed_payload_envelope(executed_envelope) - .expect("should put executed envelope"); - - assert!( - matches!(result, Availability::Available(_)), - "expected Available, got {:?}", - result - ); - } - - #[tokio::test] - async fn test_zero_blob_immediately_available() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, _) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(0), - &mut rng, - &spec, - ); - - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); - - let executed_envelope = make_test_executed_envelope(block_root); - let result = cache - .put_executed_payload_envelope(executed_envelope) - .expect("should put executed envelope"); - - assert!( - matches!(result, Availability::Available(_)), - "zero-blob block should be immediately available, got {:?}", - result - ); - } - - #[tokio::test] - async fn test_handle_reconstruction_failure_clears_columns() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); - - let columns: DataColumnSidecarList = data_columns.into_iter().take(5).collect(); + async fn reconstruction_failure_clears_columns() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + let columns = data_columns.into_iter().take(5).collect(); cache - .put_rpc_custody_columns(block_root, slot, columns) + .put_rpc_custody_columns(block_root, bid, columns) .expect("should put columns"); - - let cached_count = cache.peek_pending_components(&block_root, |components| { - components.map(|c| c.verified_data_columns.len()) - }); - assert_eq!(cached_count, Some(5)); + assert_eq!( + cache + .cached_data_column_indexes(&block_root) + .map(|indices| indices.len()), + Some(5) + ); cache.handle_reconstruction_failure(&block_root); - - let cached_count_after = cache.peek_pending_components(&block_root, |components| { - components.map(|c| c.verified_data_columns.len()) - }); - assert_eq!(cached_count_after, Some(0)); + assert_eq!(cache.cached_data_column_indexes(&block_root), Some(vec![])); } #[tokio::test] - async fn test_maintenance_removes_old_entries() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (_harness, cache, _path) = setup_harness_and_cache::().await; - - let cutoff_epoch = Epoch::new(100); - cache - .do_maintenance(cutoff_epoch) - .expect("maintenance should succeed"); - - assert_eq!(cache.block_cache_size(), 0); - } - - #[tokio::test] - async fn test_peek_data_columns() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block_root = Hash256::random(); - - assert!(cache.get_data_columns(block_root).is_none()); - - cache.init_pending_block(block_root, Arc::new(block)); - - let columns: DataColumnSidecarList = data_columns.into_iter().take(3).collect(); - - cache - .put_rpc_custody_columns(block_root, slot, columns) - .expect("should put columns"); - - let peeked = cache.get_data_columns(block_root); - assert!(peeked.is_some()); - assert_eq!(peeked.unwrap().len(), 3); - } - - #[tokio::test] - async fn test_lru_eviction() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block = Arc::new(block); - let first_column = data_columns.first().cloned().expect("should have column"); - + async fn lru_eviction_keeps_cache_bounded() { + let (harness, cache, _path) = setup().await; let mut roots = Vec::new(); - for _ in 0..33 { - let block_root = Hash256::random(); + + for i in 0..33 { + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED + i); + let column = data_columns.first().cloned().expect("should have column"); roots.push(block_root); - cache.init_pending_block(block_root, block.clone()); cache - .put_rpc_custody_columns(block_root, slot, vec![first_column.clone()]) + .put_rpc_custody_columns(block_root, bid, vec![column]) .expect("should put columns"); } @@ -1085,79 +871,20 @@ mod data_availability_checker_tests { } #[tokio::test] - async fn test_maintenance_prunes_old_entries() { - if !is_gloas_enabled() { - return; - } + async fn maintenance_prunes_old_entries() { + let (harness, cache, _path) = setup().await; + let (bid, block_root, data_columns) = + init_block(&cache, &harness.spec, NumBlobs::Number(1), RNG_SEED); + let column = data_columns.first().cloned().expect("should have column"); - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); - - let col = data_columns.first().cloned().expect("should have column"); cache - .put_rpc_custody_columns(block_root, slot, vec![col]) + .put_rpc_custody_columns(block_root, bid, vec![column]) .expect("should put columns"); assert_eq!(cache.block_cache_size(), 1); - - // slot=0 → epoch=0 < cutoff=100, should prune cache - .do_maintenance(Epoch::new(100)) + .do_maintenance(Epoch::new(1)) .expect("maintenance should succeed"); - assert_eq!(cache.block_cache_size(), 0); } - - #[tokio::test] - async fn test_partial_columns_missing_components() { - if !is_gloas_enabled() { - return; - } - - type T = DiskHarnessType; - let (harness, cache, _path) = setup_harness_and_cache::().await; - - let mut rng = StdRng::seed_from_u64(0xDEADBEEF); - let spec = harness.spec.clone(); - - let (block, data_columns) = generate_rand_block_and_data_columns::( - ForkName::Gloas, - NumBlobs::Number(1), - &mut rng, - &spec, - ); - let slot = block.slot(); - let block_root = Hash256::random(); - cache.init_pending_block(block_root, Arc::new(block)); - - let executed_envelope = make_test_executed_envelope(block_root); - cache - .put_executed_payload_envelope(executed_envelope) - .expect("should put executed envelope"); - - // Insert only 1 column (need 128 for fullnode) - let columns: DataColumnSidecarList = data_columns.into_iter().take(1).collect(); - - let result = cache - .put_rpc_custody_columns(block_root, slot, columns) - .expect("should put columns"); - - assert!( - matches!(result, Availability::MissingComponents(_)), - "partial columns should not trigger availability" - ); - } } diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs index 027fd06982..779470e3ce 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs @@ -9,14 +9,35 @@ use std::collections::HashMap; use std::sync::Arc; use tracing::{Span, debug, debug_span}; use types::DataColumnSidecar; -use types::{ColumnIndex, Epoch, EthSpec, Hash256, SignedBeaconBlock}; +use types::{ + AbstractExecPayload, BeaconStateError, ColumnIndex, Epoch, EthSpec, Hash256, KzgCommitments, + SignedBeaconBlock, Slot, +}; + +#[derive(Clone)] +pub struct PendingPayloadBid { + pub slot: Slot, + pub blob_kzg_commitments: KzgCommitments, +} + +impl PendingPayloadBid { + pub fn from_block>( + block: &SignedBeaconBlock, + ) -> Result { + let signed_bid = block.message().body().signed_execution_payload_bid()?; + Ok(Self { + slot: block.slot(), + blob_kzg_commitments: signed_bid.message.blob_kzg_commitments.clone(), + }) + } +} /// This represents the components of a payload pending data availability. /// /// The columns are all gossip and kzg verified. /// The payload is considered "available" when all required columns are received. pub struct PendingComponents { - pub block: Arc>, + pub bid: PendingPayloadBid, /// a cached post executed payload envelope pub envelope: Option>, pub verified_data_columns: HashMap>, @@ -26,7 +47,7 @@ pub struct PendingComponents { impl PendingComponents { pub fn num_blobs_expected(&self) -> usize { - self.block.num_expected_blobs() + self.bid.blob_kzg_commitments.len() } /// Returns the completed custody columns @@ -36,7 +57,7 @@ impl PendingComponents { .filter_map(|(col_idx, col)| { col.try_to_sidecar( *col_idx, - self.block.slot(), + self.bid.slot, block_root, self.num_blobs_expected(), ) @@ -138,7 +159,7 @@ impl PendingComponents { .filter_map(|(col_idx, col)| { col.try_to_sidecar( *col_idx, - self.block.slot(), + self.bid.slot, block_hash, self.num_blobs_expected(), ) @@ -167,11 +188,11 @@ impl PendingComponents { } /// Returns an empty `PendingComponents` object with the given block root. - pub fn empty(block_root: Hash256, block: Arc>) -> Self { + pub fn empty(block_root: Hash256, bid: PendingPayloadBid) -> Self { let span = debug_span!(parent: None, "lh_pending_components", %block_root); let _guard = span.clone().entered(); Self { - block, + bid, envelope: None, verified_data_columns: HashMap::new(), reconstruction_started: false, @@ -181,7 +202,7 @@ impl PendingComponents { /// Returns the epoch of the bid or first data column, if available. pub fn epoch(&self) -> Epoch { - self.block.slot().epoch(E::slots_per_epoch()) + self.bid.slot.epoch(E::slots_per_epoch()) } pub fn status_str(&self, num_expected_columns: usize) -> String { @@ -202,62 +223,3 @@ pub(crate) enum ReconstructColumnsDecision { Yes(Vec>>), No(&'static str), } - -/* -#[cfg(test)] -mod pending_components_tests { - use crate::test_utils::test_spec; - - use super::*; - use types::MinimalEthSpec; - - type E = MinimalEthSpec; - - #[test] - fn test_get_cached_data_columns_indices_empty() { - let spec = Arc::new(test_spec::()); - let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root, spec); - - let indices = components.get_cached_data_columns_indices(); - assert!(indices.is_empty()); - } - - #[test] - fn test_status_str_no_bid() { - let spec = Arc::new(test_spec::()); - let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root, spec); - - let status = components.status_str(10); - assert_eq!(status, "data_columns 0/10"); - } - - #[test] - fn test_num_blobs_expected_no_bid() { - let spec = Arc::new(test_spec::()); - let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root, spec); - - let result = components.num_blobs_expected(); - assert!(result.is_err()); - // Error should be AvailabilityCheckError::Unexpected - assert!(matches!( - result.unwrap_err(), - AvailabilityCheckError::Unexpected(_) - )); - } - - #[test] - fn test_make_available_no_bid_returns_none() { - let spec = Arc::new(test_spec::()); - let block_root = Hash256::random(); - let components = PendingComponents::::empty(block_root, spec); - - // Without a bid, make_available should return Ok(None) - let result = components.make_available(10); - assert!(result.is_ok()); - assert!(result.unwrap().is_none()); - } -} -*/ diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 5e291bd833..6bc3c03b8b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -698,15 +698,6 @@ impl NetworkBeaconProcessor { } Err(err) => { match err { - GossipDataColumnError::InvalidVariant => { - // TODO(gloas) we should probably penalize the peer here - debug!( - %slot, - %block_root, - %index, - "Invalid gossip data column variant." - ) - } GossipDataColumnError::PriorKnownUnpublished => { debug!( %slot, @@ -732,6 +723,25 @@ impl NetworkBeaconProcessor { column_sidecar, )); } + GossipDataColumnError::BlockRootUnknown { + block_root: unknown_block_root, + .. + } => { + debug!( + action = "requesting block", + %unknown_block_root, + "Unknown block root for column" + ); + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + unknown_block_root, + )); + self.propagate_validation_result( + message_id, + peer_id, + MessageAcceptance::Ignore, + ); + } GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::BeaconChainError(_) => { crit!( @@ -739,10 +749,12 @@ impl NetworkBeaconProcessor { "Internal error when verifying column sidecar" ) } - GossipDataColumnError::ProposalSignatureInvalid + GossipDataColumnError::InvalidVariant + | GossipDataColumnError::ProposalSignatureInvalid | GossipDataColumnError::UnknownValidator(_) | GossipDataColumnError::ProposerIndexMismatch { .. } | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::BlockSlotMismatch { .. } | GossipDataColumnError::InvalidSubnetId { .. } | GossipDataColumnError::InvalidInclusionProof | GossipDataColumnError::InvalidKzgProof { .. } @@ -904,14 +916,6 @@ impl NetworkBeaconProcessor { ) { match err { GossipPartialDataColumnError::GossipDataColumnError(err) => match err { - GossipDataColumnError::InvalidVariant => { - // TODO(gloas) we should probably penalize the peer here - debug!( - %block_root, - %index, - "Invalid gossip partial data column variant." - ) - } GossipDataColumnError::PriorKnownUnpublished => { debug!( %block_root, @@ -933,6 +937,20 @@ impl NetworkBeaconProcessor { slot, }); } + GossipDataColumnError::BlockRootUnknown { + block_root: unknown_block_root, + .. + } => { + debug!( + action = "requesting block", + %unknown_block_root, + "Unknown block root for partial column" + ); + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + unknown_block_root, + )); + } GossipDataColumnError::PubkeyCacheTimeout | GossipDataColumnError::BeaconChainError(_) => { crit!( @@ -940,10 +958,12 @@ impl NetworkBeaconProcessor { "Internal error when verifying partial column sidecar" ) } - GossipDataColumnError::ProposalSignatureInvalid + GossipDataColumnError::InvalidVariant + | GossipDataColumnError::ProposalSignatureInvalid | GossipDataColumnError::UnknownValidator(_) | GossipDataColumnError::ProposerIndexMismatch { .. } | GossipDataColumnError::IsNotLaterThanParent { .. } + | GossipDataColumnError::BlockSlotMismatch { .. } | GossipDataColumnError::InvalidSubnetId { .. } | GossipDataColumnError::InvalidInclusionProof | GossipDataColumnError::InvalidKzgProof { .. } diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index c4e7f8f8d1..21745e12db 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -10,7 +10,7 @@ use crate::{ }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; -use beacon_chain::data_column_verification::validate_data_column_sidecar_for_gossip_fulu; +use beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::kzg_utils::blobs_to_data_column_sidecars; use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::test_utils::{ @@ -1185,12 +1185,8 @@ async fn accept_processed_gossip_data_columns_without_import() { .map(|data_column| { let subnet_id = DataColumnSubnetId::from_column_index(*data_column.index(), &rig.chain.spec); - validate_data_column_sidecar_for_gossip_fulu::<_, DoNotObserve>( - data_column, - subnet_id, - &rig.chain, - ) - .expect("should be valid data column") + GossipVerifiedDataColumn::<_, DoNotObserve>::new(data_column, subnet_id, &rig.chain) + .expect("should be valid data column") }) .collect(); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index fb31e92262..1b45ea7052 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -906,9 +906,9 @@ impl SyncManager { }), ); } - // TODO(gloas) support gloas data column variant DataColumnSidecar::Gloas(_) => { - error!("Gloas variant not yet supported") + debug!(%block_root, "Received unknown block data column message"); + self.handle_unknown_block_root(peer_id, block_root); } } }