From 5472cb85008b256512810b2163043e4276fc4486 Mon Sep 17 00:00:00 2001 From: Daniel Knopik <107140945+dknopik@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:35:14 +0200 Subject: [PATCH] Batch verify KZG proofs for getBlobsV2 (#7582) --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- .../src/data_availability_checker.rs | 16 +++- .../src/data_column_verification.rs | 11 +++ .../fetch_blobs/fetch_blobs_beacon_adapter.rs | 34 ++++++-- .../beacon_chain/src/fetch_blobs/mod.rs | 80 +++++++++---------- .../beacon_chain/src/fetch_blobs/tests.rs | 24 +++--- .../src/observed_data_sidecars.rs | 4 + .../src/network_beacon_processor/mod.rs | 2 +- 8 files changed, 107 insertions(+), 66 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 50efb367a8..ef741f7b5b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3699,7 +3699,7 @@ impl BeaconChain { data_columns.iter().map(|c| c.as_data_column()), )?; self.data_availability_checker - .put_gossip_verified_data_columns(block_root, data_columns)? + .put_kzg_verified_custody_data_columns(block_root, data_columns)? } }; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 91ff5fb644..1bc95c22ac 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -234,8 +234,9 @@ impl DataAvailabilityChecker { custody_columns: DataColumnSidecarList, ) -> Result, AvailabilityCheckError> { // Attributes fault to the specific peer that sent an invalid column - let kzg_verified_columns = KzgVerifiedDataColumn::from_batch(custody_columns, &self.kzg) - .map_err(AvailabilityCheckError::InvalidColumn)?; + let kzg_verified_columns = + KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg) + .map_err(AvailabilityCheckError::InvalidColumn)?; let verified_custody_columns = kzg_verified_columns .into_iter() @@ -285,6 +286,17 @@ impl DataAvailabilityChecker { .put_kzg_verified_data_columns(block_root, custody_columns) } + pub fn put_kzg_verified_custody_data_columns< + I: IntoIterator>, + >( + &self, + block_root: Hash256, + custody_columns: I, + ) -> Result, AvailabilityCheckError> { + self.availability_cache + .put_kzg_verified_data_columns(block_root, custody_columns) + } + /// Check if we have all the blobs for a block. Returns `Availability` which has information /// about whether all components have been received or more are required. pub fn put_pending_executed_block( diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 609e5bd796..3009522bf6 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -274,6 +274,17 @@ impl KzgVerifiedDataColumn { pub fn from_batch( data_columns: Vec>>, kzg: &Kzg, + ) -> Result, KzgError> { + verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; + Ok(data_columns + .into_iter() + .map(|column| Self { data: column }) + .collect()) + } + + pub fn from_batch_with_scoring( + data_columns: Vec>>, + kzg: &Kzg, ) -> Result, Vec<(ColumnIndex, KzgError)>> { verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?; Ok(data_columns diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index 2959f47376..4a7a5aeea2 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -1,15 +1,17 @@ use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; -use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use crate::data_column_verification::KzgVerifiedDataColumn; use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError}; +use crate::observed_block_producers::ProposalKey; use crate::observed_data_sidecars::DoNotObserve; use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes}; use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; -use kzg::Kzg; +use kzg::{Error as KzgError, Kzg}; #[cfg(test)] use mockall::automock; +use std::collections::HashSet; use std::sync::Arc; use task_executor::TaskExecutor; -use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Hash256, Slot}; +use types::{BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Hash256, Slot}; /// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic. pub(crate) struct FetchBlobsBeaconAdapter { @@ -75,12 +77,28 @@ impl FetchBlobsBeaconAdapter { GossipVerifiedBlob::::new(blob.clone(), blob.index, &self.chain) } - pub(crate) fn verify_data_column_for_gossip( + pub(crate) fn verify_data_columns_kzg( &self, - data_column: Arc>, - ) -> Result, GossipDataColumnError> { - let index = data_column.index; - GossipVerifiedDataColumn::::new(data_column, index, &self.chain) + data_columns: Vec>>, + ) -> Result>, KzgError> { + KzgVerifiedDataColumn::from_batch(data_columns, &self.chain.kzg) + } + + pub(crate) fn known_for_proposal( + &self, + proposal_key: ProposalKey, + ) -> Option> { + self.chain + .observed_column_sidecars + .read() + .known_for_proposal(&proposal_key) + .cloned() + } + + pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option> { + self.chain + .data_availability_checker + .cached_data_column_indexes(block_root) } pub(crate) async fn process_engine_blobs( diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index 74dc680a9a..e02405ddba 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -13,10 +13,12 @@ mod fetch_blobs_beacon_adapter; mod tests; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; -use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use crate::block_verification_types::AsBlock; +use crate::data_column_verification::KzgVerifiedCustodyDataColumn; #[cfg_attr(test, double)] use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::kzg_utils::blobs_to_data_column_sidecars; +use crate::observed_block_producers::ProposalKey; use crate::observed_data_sidecars::DoNotObserve; use crate::{ metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, @@ -46,7 +48,7 @@ use types::{ pub enum EngineGetBlobsOutput { Blobs(Vec>), /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. - CustodyColumns(Vec>), + CustodyColumns(Vec>), } #[derive(Debug)] @@ -59,7 +61,7 @@ pub enum FetchEngineBlobError { ExecutionLayerMissing, InternalError(String), GossipBlob(GossipBlobError), - GossipDataColumn(GossipDataColumnError), + KzgError(kzg::Error), RequestFailed(ExecutionLayerError), RuntimeShutdown, TokioJoin(tokio::task::JoinError), @@ -293,6 +295,7 @@ async fn fetch_and_process_blobs_v2( let chain_adapter = Arc::new(chain_adapter); let custody_columns_to_import = compute_custody_columns_to_import( &chain_adapter, + block_root, block.clone(), blobs, proofs, @@ -326,11 +329,12 @@ async fn fetch_and_process_blobs_v2( /// Offload the data column computation to a blocking task to avoid holding up the async runtime. async fn compute_custody_columns_to_import( chain_adapter: &Arc>, + block_root: Hash256, block: Arc>>, blobs: Vec>, proofs: Vec>, custody_columns_indices: HashSet, -) -> Result>, FetchEngineBlobError> { +) -> Result>, FetchEngineBlobError> { let kzg = chain_adapter.kzg().clone(); let spec = chain_adapter.spec().clone(); let chain_adapter_cloned = chain_adapter.clone(); @@ -353,57 +357,47 @@ async fn compute_custody_columns_to_import( // This filtering ensures we only import and publish the custody columns. // `DataAvailabilityChecker` requires a strict match on custody columns count to // consider a block available. - let custody_columns = data_columns_result + let mut custody_columns = data_columns_result .map(|mut data_columns| { data_columns.retain(|col| custody_columns_indices.contains(&col.index)); data_columns }) .map_err(FetchEngineBlobError::DataColumnSidecarError)?; - // Gossip verify data columns before publishing. This prevents blobs with invalid + // Only consider columns that are not already observed on gossip. + if let Some(observed_columns) = chain_adapter_cloned.known_for_proposal( + ProposalKey::new(block.message().proposer_index(), block.slot()), + ) { + custody_columns.retain(|col| !observed_columns.contains(&col.index)); + if custody_columns.is_empty() { + return Ok(vec![]); + } + } + + // Only consider columns that are not already known to data availability. + if let Some(known_columns) = + chain_adapter_cloned.cached_data_column_indexes(&block_root) + { + custody_columns.retain(|col| !known_columns.contains(&col.index)); + if custody_columns.is_empty() { + return Ok(vec![]); + } + } + + // KZG verify data columns before publishing. This prevents blobs with invalid // KZG proofs from the EL making it into the data availability checker. We do not // immediately add these blobs to the observed blobs/columns cache because we want // to allow blobs/columns to arrive on gossip and be accepted (and propagated) while // we are waiting to publish. Just before publishing we will observe the blobs/columns // and only proceed with publishing if they are not yet seen. - // TODO(das): we may want to just perform kzg proof verification here, since the - // `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary - // to verify them. - let columns_to_import_and_publish = custody_columns - .into_iter() - .filter_map(|col| { - match chain_adapter_cloned.verify_data_column_for_gossip(col) { - Ok(verified) => Some(Ok(verified)), - Err(e) => match e { - // Ignore already seen data columns - GossipDataColumnError::PriorKnown { .. } - | GossipDataColumnError::PriorKnownUnpublished => None, - GossipDataColumnError::BeaconChainError(_) - | GossipDataColumnError::ProposalSignatureInvalid - | GossipDataColumnError::UnknownValidator(_) - | GossipDataColumnError::IsNotLaterThanParent { .. } - | GossipDataColumnError::InvalidKzgProof(_) - | GossipDataColumnError::InvalidSubnetId { .. } - | GossipDataColumnError::FutureSlot { .. } - | GossipDataColumnError::PastFinalizedSlot { .. } - | GossipDataColumnError::PubkeyCacheTimeout - | GossipDataColumnError::ProposerIndexMismatch { .. } - | GossipDataColumnError::ParentUnknown { .. } - | GossipDataColumnError::NotFinalizedDescendant { .. } - | GossipDataColumnError::InvalidInclusionProof - | GossipDataColumnError::InvalidColumnIndex(_) - | GossipDataColumnError::UnexpectedDataColumn - | GossipDataColumnError::InconsistentCommitmentsLength { .. } - | GossipDataColumnError::InconsistentProofsLength { .. } => { - Some(Err(e)) - } - }, - } - }) - .collect::, _>>() - .map_err(FetchEngineBlobError::GossipDataColumn)?; + let verified = chain_adapter_cloned + .verify_data_columns_kzg(custody_columns) + .map_err(FetchEngineBlobError::KzgError)?; - Ok(columns_to_import_and_publish) + Ok(verified + .into_iter() + .map(KzgVerifiedCustodyDataColumn::from_asserted_custody) + .collect()) }, "compute_custody_columns_to_import", ) diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 4556948ffc..3178020c75 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -1,4 +1,4 @@ -use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use crate::data_column_verification::KzgVerifiedDataColumn; use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter; use crate::fetch_blobs::{ fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError, @@ -156,14 +156,8 @@ mod get_blobs_v2 { mock_fork_choice_contains_block(&mut mock_adapter, vec![]); // All data columns already seen on gossip mock_adapter - .expect_verify_data_column_for_gossip() - .returning(|c| { - Err(GossipDataColumnError::PriorKnown { - proposer: c.block_proposer_index(), - slot: c.slot(), - index: c.index, - }) - }); + .expect_known_for_proposal() + .returning(|_| Some(hashset![0, 1, 2])); // No blobs should be processed mock_adapter.expect_process_engine_blobs().times(0); @@ -198,9 +192,17 @@ mod get_blobs_v2 { // All blobs returned, fork choice doesn't contain block mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs)); mock_fork_choice_contains_block(&mut mock_adapter, vec![]); + mock_adapter.expect_known_for_proposal().returning(|_| None); mock_adapter - .expect_verify_data_column_for_gossip() - .returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c))); + .expect_cached_data_column_indexes() + .returning(|_| None); + mock_adapter + .expect_verify_data_columns_kzg() + .returning(|c| { + Ok(c.into_iter() + .map(KzgVerifiedDataColumn::__new_for_testing) + .collect()) + }); mock_process_engine_blobs_result( &mut mock_adapter, Ok(AvailabilityProcessingStatus::Imported(block_root)), diff --git a/beacon_node/beacon_chain/src/observed_data_sidecars.rs b/beacon_node/beacon_chain/src/observed_data_sidecars.rs index d3bda09712..a2141a1697 100644 --- a/beacon_node/beacon_chain/src/observed_data_sidecars.rs +++ b/beacon_node/beacon_chain/src/observed_data_sidecars.rs @@ -124,6 +124,10 @@ impl ObservedDataSidecars { Ok(is_known) } + pub fn known_for_proposal(&self, proposal_key: &ProposalKey) -> Option<&HashSet> { + self.items.get(proposal_key) + } + fn sanitize_data_sidecar(&self, data_sidecar: &T) -> Result<(), Error> { if data_sidecar.index() >= T::max_num_of_items(&self.spec, data_sidecar.slot()) as u64 { return Err(Error::InvalidDataIndex(data_sidecar.index())); diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index df9b656051..0b89058ba9 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -853,7 +853,7 @@ impl NetworkBeaconProcessor { } EngineGetBlobsOutput::CustodyColumns(columns) => { self_cloned.publish_data_columns_gradually( - columns.into_iter().map(|c| c.clone_data_column()).collect(), + columns.into_iter().map(|c| c.clone_arc()).collect(), block_root, ); }