Batch verify KZG proofs for getBlobsV2 (#7582)

This commit is contained in:
Daniel Knopik
2025-06-12 16:35:14 +02:00
committed by GitHub
parent 9803d69d80
commit 5472cb8500
8 changed files with 107 additions and 66 deletions

View File

@@ -1,15 +1,17 @@
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::data_column_verification::KzgVerifiedDataColumn;
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve;
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
use kzg::Kzg;
use kzg::{Error as KzgError, Kzg};
#[cfg(test)]
use mockall::automock;
use std::collections::HashSet;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Hash256, Slot};
use types::{BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Hash256, Slot};
/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
@@ -75,12 +77,28 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
}
pub(crate) fn verify_data_column_for_gossip(
pub(crate) fn verify_data_columns_kzg(
&self,
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
) -> Result<GossipVerifiedDataColumn<T, DoNotObserve>, GossipDataColumnError> {
let index = data_column.index;
GossipVerifiedDataColumn::<T, DoNotObserve>::new(data_column, index, &self.chain)
data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
) -> Result<Vec<KzgVerifiedDataColumn<T::EthSpec>>, KzgError> {
KzgVerifiedDataColumn::from_batch(data_columns, &self.chain.kzg)
}
pub(crate) fn known_for_proposal(
&self,
proposal_key: ProposalKey,
) -> Option<HashSet<ColumnIndex>> {
self.chain
.observed_column_sidecars
.read()
.known_for_proposal(&proposal_key)
.cloned()
}
pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.chain
.data_availability_checker
.cached_data_column_indexes(block_root)
}
pub(crate) async fn process_engine_blobs(

View File

@@ -13,10 +13,12 @@ mod fetch_blobs_beacon_adapter;
mod tests;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::block_verification_types::AsBlock;
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
#[cfg_attr(test, double)]
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve;
use crate::{
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
@@ -46,7 +48,7 @@ use types::{
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
CustodyColumns(Vec<GossipVerifiedDataColumn<T, DoNotObserve>>),
CustodyColumns(Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>),
}
#[derive(Debug)]
@@ -59,7 +61,7 @@ pub enum FetchEngineBlobError {
ExecutionLayerMissing,
InternalError(String),
GossipBlob(GossipBlobError),
GossipDataColumn(GossipDataColumnError),
KzgError(kzg::Error),
RequestFailed(ExecutionLayerError),
RuntimeShutdown,
TokioJoin(tokio::task::JoinError),
@@ -293,6 +295,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
let chain_adapter = Arc::new(chain_adapter);
let custody_columns_to_import = compute_custody_columns_to_import(
&chain_adapter,
block_root,
block.clone(),
blobs,
proofs,
@@ -326,11 +329,12 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: Vec<Blob<T::EthSpec>>,
proofs: Vec<KzgProofs<T::EthSpec>>,
custody_columns_indices: HashSet<ColumnIndex>,
) -> Result<Vec<GossipVerifiedDataColumn<T, DoNotObserve>>, FetchEngineBlobError> {
) -> Result<Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>, FetchEngineBlobError> {
let kzg = chain_adapter.kzg().clone();
let spec = chain_adapter.spec().clone();
let chain_adapter_cloned = chain_adapter.clone();
@@ -353,57 +357,47 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
// This filtering ensures we only import and publish the custody columns.
// `DataAvailabilityChecker` requires a strict match on custody columns count to
// consider a block available.
let custody_columns = data_columns_result
let mut custody_columns = data_columns_result
.map(|mut data_columns| {
data_columns.retain(|col| custody_columns_indices.contains(&col.index));
data_columns
})
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;
// Gossip verify data columns before publishing. This prevents blobs with invalid
// Only consider columns that are not already observed on gossip.
if let Some(observed_columns) = chain_adapter_cloned.known_for_proposal(
ProposalKey::new(block.message().proposer_index(), block.slot()),
) {
custody_columns.retain(|col| !observed_columns.contains(&col.index));
if custody_columns.is_empty() {
return Ok(vec![]);
}
}
// Only consider columns that are not already known to data availability.
if let Some(known_columns) =
chain_adapter_cloned.cached_data_column_indexes(&block_root)
{
custody_columns.retain(|col| !known_columns.contains(&col.index));
if custody_columns.is_empty() {
return Ok(vec![]);
}
}
// KZG verify data columns before publishing. This prevents blobs with invalid
// KZG proofs from the EL making it into the data availability checker. We do not
// immediately add these blobs to the observed blobs/columns cache because we want
// to allow blobs/columns to arrive on gossip and be accepted (and propagated) while
// we are waiting to publish. Just before publishing we will observe the blobs/columns
// and only proceed with publishing if they are not yet seen.
// TODO(das): we may want to just perform kzg proof verification here, since the
// `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary
// to verify them.
let columns_to_import_and_publish = custody_columns
.into_iter()
.filter_map(|col| {
match chain_adapter_cloned.verify_data_column_for_gossip(col) {
Ok(verified) => Some(Ok(verified)),
Err(e) => match e {
// Ignore already seen data columns
GossipDataColumnError::PriorKnown { .. }
| GossipDataColumnError::PriorKnownUnpublished => None,
GossipDataColumnError::BeaconChainError(_)
| GossipDataColumnError::ProposalSignatureInvalid
| GossipDataColumnError::UnknownValidator(_)
| GossipDataColumnError::IsNotLaterThanParent { .. }
| GossipDataColumnError::InvalidKzgProof(_)
| GossipDataColumnError::InvalidSubnetId { .. }
| GossipDataColumnError::FutureSlot { .. }
| GossipDataColumnError::PastFinalizedSlot { .. }
| GossipDataColumnError::PubkeyCacheTimeout
| GossipDataColumnError::ProposerIndexMismatch { .. }
| GossipDataColumnError::ParentUnknown { .. }
| GossipDataColumnError::NotFinalizedDescendant { .. }
| GossipDataColumnError::InvalidInclusionProof
| GossipDataColumnError::InvalidColumnIndex(_)
| GossipDataColumnError::UnexpectedDataColumn
| GossipDataColumnError::InconsistentCommitmentsLength { .. }
| GossipDataColumnError::InconsistentProofsLength { .. } => {
Some(Err(e))
}
},
}
})
.collect::<Result<Vec<_>, _>>()
.map_err(FetchEngineBlobError::GossipDataColumn)?;
let verified = chain_adapter_cloned
.verify_data_columns_kzg(custody_columns)
.map_err(FetchEngineBlobError::KzgError)?;
Ok(columns_to_import_and_publish)
Ok(verified
.into_iter()
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
.collect())
},
"compute_custody_columns_to_import",
)

View File

@@ -1,4 +1,4 @@
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::data_column_verification::KzgVerifiedDataColumn;
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
use crate::fetch_blobs::{
fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError,
@@ -156,14 +156,8 @@ mod get_blobs_v2 {
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All data columns already seen on gossip
mock_adapter
.expect_verify_data_column_for_gossip()
.returning(|c| {
Err(GossipDataColumnError::PriorKnown {
proposer: c.block_proposer_index(),
slot: c.slot(),
index: c.index,
})
});
.expect_known_for_proposal()
.returning(|_| Some(hashset![0, 1, 2]));
// No blobs should be processed
mock_adapter.expect_process_engine_blobs().times(0);
@@ -198,9 +192,17 @@ mod get_blobs_v2 {
// All blobs returned, fork choice doesn't contain block
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
mock_adapter.expect_known_for_proposal().returning(|_| None);
mock_adapter
.expect_verify_data_column_for_gossip()
.returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
.expect_cached_data_column_indexes()
.returning(|_| None);
mock_adapter
.expect_verify_data_columns_kzg()
.returning(|c| {
Ok(c.into_iter()
.map(KzgVerifiedDataColumn::__new_for_testing)
.collect())
});
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)),