Remove KZG verification on blobs fetched from the EL (#7771)

Continuation of #7713, addresses comment about skipping KZG verification on EL fetched blobs:

https://github.com/sigp/lighthouse/pull/7713#discussion_r2198542501
This commit is contained in:
Jimmy Chen
2025-07-25 16:49:50 +10:00
committed by GitHub
parent 6a52454647
commit 2aae08a8aa
7 changed files with 119 additions and 99 deletions

View File

@@ -3654,7 +3654,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
EngineGetBlobsOutput::Blobs(blobs) => { EngineGetBlobsOutput::Blobs(blobs) => {
self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?; self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?;
self.data_availability_checker self.data_availability_checker
.put_gossip_verified_blobs(block_root, blobs)? .put_kzg_verified_blobs(block_root, blobs)?
} }
EngineGetBlobsOutput::CustodyColumns(data_columns) => { EngineGetBlobsOutput::CustodyColumns(data_columns) => {
self.check_columns_for_slashability( self.check_columns_for_slashability(

View File

@@ -9,7 +9,7 @@ use crate::block_verification::{
BlockSlashInfo, BlockSlashInfo,
}; };
use crate::kzg_utils::{validate_blob, validate_blobs}; use crate::kzg_utils::{validate_blob, validate_blobs};
use crate::observed_data_sidecars::{DoNotObserve, ObservationStrategy, Observe}; use crate::observed_data_sidecars::{ObservationStrategy, Observe};
use crate::{metrics, BeaconChainError}; use crate::{metrics, BeaconChainError};
use kzg::{Error as KzgError, Kzg, KzgCommitment}; use kzg::{Error as KzgError, Kzg, KzgCommitment};
use ssz_derive::{Decode, Encode}; use ssz_derive::{Decode, Encode};
@@ -304,6 +304,14 @@ impl<E: EthSpec> KzgVerifiedBlob<E> {
seen_timestamp: Duration::from_secs(0), seen_timestamp: Duration::from_secs(0),
} }
} }
/// Mark a blob as KZG verified. Caller must ONLY use this on blob sidecars constructed
/// from EL blobs.
pub fn from_execution_verified(blob: Arc<BlobSidecar<E>>, seen_timestamp: Duration) -> Self {
Self {
blob,
seen_timestamp,
}
}
} }
/// Complete kzg verification for a `BlobSidecar`. /// Complete kzg verification for a `BlobSidecar`.
@@ -594,21 +602,7 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrat
}) })
} }
impl<T: BeaconChainTypes> GossipVerifiedBlob<T, DoNotObserve> { pub fn observe_gossip_blob<T: BeaconChainTypes>(
pub fn observe(
self,
chain: &BeaconChain<T>,
) -> Result<GossipVerifiedBlob<T, Observe>, GossipBlobError> {
observe_gossip_blob(&self.blob.blob, chain)?;
Ok(GossipVerifiedBlob {
block_root: self.block_root,
blob: self.blob,
_phantom: PhantomData,
})
}
}
fn observe_gossip_blob<T: BeaconChainTypes>(
blob_sidecar: &BlobSidecar<T::EthSpec>, blob_sidecar: &BlobSidecar<T::EthSpec>,
chain: &BeaconChain<T>, chain: &BeaconChain<T>,
) -> Result<(), GossipBlobError> { ) -> Result<(), GossipBlobError> {

View File

@@ -1,4 +1,6 @@
use crate::blob_verification::{verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlobList}; use crate::blob_verification::{
verify_kzg_for_blob_list, GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList,
};
use crate::block_verification_types::{ use crate::block_verification_types::{
AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock,
}; };
@@ -264,6 +266,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner())) .put_kzg_verified_blobs(block_root, blobs.into_iter().map(|b| b.into_inner()))
} }
pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
&self,
block_root: Hash256,
blobs: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache
.put_kzg_verified_blobs(block_root, blobs)
}
/// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also
/// have a block cached, return the `Availability` variant triggering block import. /// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the data column sidecar. /// Otherwise cache the data column sidecar.

View File

@@ -1,7 +1,5 @@
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError}; use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
use crate::observed_block_producers::ProposalKey; use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve;
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes}; use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2}; use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
use kzg::Kzg; use kzg::Kzg;
@@ -10,7 +8,7 @@ use mockall::automock;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use task_executor::TaskExecutor; use task_executor::TaskExecutor;
use types::{BlobSidecar, ChainSpec, ColumnIndex, Hash256, Slot}; use types::{ChainSpec, ColumnIndex, Hash256, Slot};
/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic. /// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> { pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
@@ -69,11 +67,17 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
.map_err(FetchEngineBlobError::RequestFailed) .map_err(FetchEngineBlobError::RequestFailed)
} }
pub(crate) fn verify_blob_for_gossip( pub(crate) fn blobs_known_for_proposal(
&self, &self,
blob: &Arc<BlobSidecar<T::EthSpec>>, proposer: u64,
) -> Result<GossipVerifiedBlob<T, DoNotObserve>, GossipBlobError> { slot: Slot,
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain) ) -> Option<HashSet<u64>> {
let proposer_key = ProposalKey::new(proposer, slot);
self.chain
.observed_blob_sidecars
.read()
.known_for_proposal(&proposer_key)
.cloned()
} }
pub(crate) fn data_column_known_for_proposal( pub(crate) fn data_column_known_for_proposal(
@@ -87,6 +91,12 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
.cloned() .cloned()
} }
pub(crate) fn cached_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.chain
.data_availability_checker
.cached_blob_indexes(block_root)
}
pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> { pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.chain self.chain
.data_availability_checker .data_availability_checker

View File

@@ -12,14 +12,14 @@ mod fetch_blobs_beacon_adapter;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::blob_verification::{GossipBlobError, KzgVerifiedBlob};
use crate::block_verification_types::AsBlock; use crate::block_verification_types::AsBlock;
use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn};
#[cfg_attr(test, double)] #[cfg_attr(test, double)]
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter; use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_block_producers::ProposalKey; use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve; use crate::validator_monitor::timestamp_now;
use crate::{ use crate::{
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
BlockError, BlockError,
@@ -34,11 +34,11 @@ use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_h
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, warn}; use tracing::{debug, warn};
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; use types::blob_sidecar::BlobSidecarError;
use types::data_column_sidecar::DataColumnSidecarError; use types::data_column_sidecar::DataColumnSidecarError;
use types::{ use types::{
BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, EthSpec, FullPayload, Hash256, BeaconStateError, Blob, BlobSidecar, ColumnIndex, EthSpec, FullPayload, Hash256, KzgProofs,
KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
}; };
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the /// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the
@@ -46,7 +46,7 @@ use types::{
/// be published immediately. /// be published immediately.
#[derive(Debug)] #[derive(Debug)]
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> { pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>), Blobs(Vec<KzgVerifiedBlob<T::EthSpec>>),
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`. /// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
CustodyColumns(Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>), CustodyColumns(Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>),
} }
@@ -186,46 +186,47 @@ async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
.signed_block_header_and_kzg_commitments_proof() .signed_block_header_and_kzg_commitments_proof()
.map_err(FetchEngineBlobError::BeaconStateError)?; .map_err(FetchEngineBlobError::BeaconStateError)?;
let fixed_blob_sidecar_list = build_blob_sidecars( let mut blob_sidecar_list = build_blob_sidecars(
&block, &block,
response, response,
signed_block_header, signed_block_header,
&kzg_commitments_proof, &kzg_commitments_proof,
chain_adapter.spec(),
)?; )?;
// Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from if let Some(observed_blobs) =
// the EL making it into the data availability checker. We do not immediately add these chain_adapter.blobs_known_for_proposal(block.message().proposer_index(), block.slot())
// blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip {
// and be accepted (and propagated) while we are waiting to publish. Just before publishing blob_sidecar_list.retain(|blob| !observed_blobs.contains(&blob.blob_index()));
// we will observe the blobs/columns and only proceed with publishing if they are not yet seen. if blob_sidecar_list.is_empty() {
let blobs_to_import_and_publish = fixed_blob_sidecar_list debug!(
.into_iter() info = "blobs have already been seen on gossip",
.filter_map(|opt_blob| { "Ignoring EL blobs response"
let blob = opt_blob.as_ref()?; );
match chain_adapter.verify_blob_for_gossip(blob) {
Ok(verified) => Some(Ok(verified)),
// Ignore already seen blobs.
Err(GossipBlobError::RepeatBlob { .. }) => None,
Err(e) => Some(Err(e)),
}
})
.collect::<Result<Vec<_>, _>>()
.map_err(FetchEngineBlobError::GossipBlob)?;
if blobs_to_import_and_publish.is_empty() {
return Ok(None); return Ok(None);
} }
}
publish_fn(EngineGetBlobsOutput::Blobs( if let Some(known_blobs) = chain_adapter.cached_blob_indexes(&block_root) {
blobs_to_import_and_publish.clone(), blob_sidecar_list.retain(|blob| !known_blobs.contains(&blob.blob_index()));
)); if blob_sidecar_list.is_empty() {
debug!(
info = "blobs have already been imported into data availability checker",
"Ignoring EL blobs response"
);
return Ok(None);
}
}
// Up until this point we have not observed the blobs in the gossip cache, which allows them to
// arrive independently while this function is running. In `publish_fn` we will observe them
// and then publish any blobs that had not already been observed.
publish_fn(EngineGetBlobsOutput::Blobs(blob_sidecar_list.clone()));
let availability_processing_status = chain_adapter let availability_processing_status = chain_adapter
.process_engine_blobs( .process_engine_blobs(
block.slot(), block.slot(),
block_root, block_root,
EngineGetBlobsOutput::Blobs(blobs_to_import_and_publish), EngineGetBlobsOutput::Blobs(blob_sidecar_list),
) )
.await?; .await?;
@@ -408,37 +409,28 @@ fn build_blob_sidecars<E: EthSpec>(
response: Vec<Option<BlobAndProofV1<E>>>, response: Vec<Option<BlobAndProofV1<E>>>,
signed_block_header: SignedBeaconBlockHeader, signed_block_header: SignedBeaconBlockHeader,
kzg_commitments_inclusion_proof: &FixedVector<Hash256, E::KzgCommitmentsInclusionProofDepth>, kzg_commitments_inclusion_proof: &FixedVector<Hash256, E::KzgCommitmentsInclusionProofDepth>,
spec: &ChainSpec, ) -> Result<Vec<KzgVerifiedBlob<E>>, FetchEngineBlobError> {
) -> Result<FixedBlobSidecarList<E>, FetchEngineBlobError> { let mut sidecars = vec![];
let epoch = block.epoch();
let mut fixed_blob_sidecar_list =
FixedBlobSidecarList::default(spec.max_blobs_per_block(epoch) as usize);
for (index, blob_and_proof) in response for (index, blob_and_proof) in response
.into_iter() .into_iter()
.enumerate() .enumerate()
.filter_map(|(i, opt_blob)| Some((i, opt_blob?))) .filter_map(|(index, opt_blob)| Some((index, opt_blob?)))
{ {
match BlobSidecar::new_with_existing_proof( let blob_sidecar = BlobSidecar::new_with_existing_proof(
index, index,
blob_and_proof.blob, blob_and_proof.blob,
block, block,
signed_block_header.clone(), signed_block_header.clone(),
kzg_commitments_inclusion_proof, kzg_commitments_inclusion_proof,
blob_and_proof.proof, blob_and_proof.proof,
) { )
Ok(blob) => { .map_err(FetchEngineBlobError::BlobSidecarError)?;
if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(index) {
*blob_mut = Some(Arc::new(blob)); sidecars.push(KzgVerifiedBlob::from_execution_verified(
} else { Arc::new(blob_sidecar),
return Err(FetchEngineBlobError::InternalError(format!( timestamp_now(),
"Blobs from EL contains blob with invalid index {index}" ));
)));
} }
}
Err(e) => { Ok(sidecars)
return Err(FetchEngineBlobError::BlobSidecarError(e));
}
}
}
Ok(fixed_blob_sidecar_list)
} }

View File

@@ -250,8 +250,8 @@ mod get_blobs_v2 {
mod get_blobs_v1 { mod get_blobs_v1 {
use super::*; use super::*;
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_verification_types::AsBlock; use crate::block_verification_types::AsBlock;
use std::collections::HashSet;
const ELECTRA_FORK: ForkName = ForkName::Electra; const ELECTRA_FORK: ForkName = ForkName::Electra;
@@ -325,10 +325,13 @@ mod get_blobs_v1 {
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
// AND block is not imported into fork choice // AND block is not imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]); mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// AND all blobs returned are valid // AND all blobs have not yet been seen
mock_adapter mock_adapter
.expect_verify_blob_for_gossip() .expect_cached_blob_indexes()
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); .returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(|_, _| None);
// Returned blobs should be processed // Returned blobs should be processed
mock_process_engine_blobs_result( mock_process_engine_blobs_result(
&mut mock_adapter, &mut mock_adapter,
@@ -408,17 +411,22 @@ mod get_blobs_v1 {
// **GIVEN**: // **GIVEN**:
// All blobs returned // All blobs returned
let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>(); let blob_and_proof_opts = blobs_and_proofs.into_iter().map(Some).collect::<Vec<_>>();
let all_blob_indices = blob_and_proof_opts
.iter()
.enumerate()
.map(|(i, _)| i as u64)
.collect::<HashSet<_>>();
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
// block not yet imported into fork choice // block not yet imported into fork choice
mock_fork_choice_contains_block(&mut mock_adapter, vec![]); mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All blobs already seen on gossip // All blobs already seen on gossip
mock_adapter.expect_verify_blob_for_gossip().returning(|b| { mock_adapter
Err(GossipBlobError::RepeatBlob { .expect_cached_blob_indexes()
proposer: b.block_proposer_index(), .returning(|_| None);
slot: b.slot(), mock_adapter
index: b.index, .expect_blobs_known_for_proposal()
}) .returning(move |_, _| Some(all_blob_indices.clone()));
});
// **WHEN**: Trigger `fetch_blobs` on the block // **WHEN**: Trigger `fetch_blobs` on the block
let custody_columns = hashset![0, 1, 2]; let custody_columns = hashset![0, 1, 2];
@@ -454,8 +462,11 @@ mod get_blobs_v1 {
mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts); mock_get_blobs_v1_response(&mut mock_adapter, blob_and_proof_opts);
mock_fork_choice_contains_block(&mut mock_adapter, vec![]); mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
mock_adapter mock_adapter
.expect_verify_blob_for_gossip() .expect_cached_blob_indexes()
.returning(|b| Ok(GossipVerifiedBlob::__assumed_valid(b.clone()))); .returning(|_| None);
mock_adapter
.expect_blobs_known_for_proposal()
.returning(|_, _| None);
mock_process_engine_blobs_result( mock_process_engine_blobs_result(
&mut mock_adapter, &mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)), Ok(AvailabilityProcessingStatus::Imported(block_root)),

View File

@@ -1,12 +1,11 @@
use crate::sync::manager::BlockProcessType; use crate::sync::manager::BlockProcessType;
use crate::{service::NetworkMessage, sync::manager::SyncMessage}; use crate::{service::NetworkMessage, sync::manager::SyncMessage};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::blob_verification::{observe_gossip_blob, GossipBlobError};
use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError};
use beacon_chain::fetch_blobs::{ use beacon_chain::fetch_blobs::{
fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError, fetch_and_process_engine_blobs, EngineGetBlobsOutput, FetchEngineBlobError,
}; };
use beacon_chain::observed_data_sidecars::DoNotObserve;
use beacon_chain::{ use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, NotifyExecutionLayer,
}; };
@@ -760,7 +759,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
if publish_blobs { if publish_blobs {
match blobs_or_data_column { match blobs_or_data_column {
EngineGetBlobsOutput::Blobs(blobs) => { EngineGetBlobsOutput::Blobs(blobs) => {
self_cloned.publish_blobs_gradually(blobs, block_root); self_cloned.publish_blobs_gradually(
blobs.into_iter().map(|b| b.to_blob()).collect(),
block_root,
);
} }
EngineGetBlobsOutput::CustodyColumns(columns) => { EngineGetBlobsOutput::CustodyColumns(columns) => {
self_cloned.publish_data_columns_gradually( self_cloned.publish_data_columns_gradually(
@@ -903,7 +905,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
/// publisher exists for a blob, it will eventually get published here. /// publisher exists for a blob, it will eventually get published here.
fn publish_blobs_gradually( fn publish_blobs_gradually(
self: &Arc<Self>, self: &Arc<Self>,
mut blobs: Vec<GossipVerifiedBlob<T, DoNotObserve>>, mut blobs: Vec<Arc<BlobSidecar<T::EthSpec>>>,
block_root: Hash256, block_root: Hash256,
) { ) {
let self_clone = self.clone(); let self_clone = self.clone();
@@ -934,8 +936,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
while blobs_iter.peek().is_some() { while blobs_iter.peek().is_some() {
let batch = blobs_iter.by_ref().take(batch_size); let batch = blobs_iter.by_ref().take(batch_size);
let publishable = batch let publishable = batch
.filter_map(|unobserved| match unobserved.observe(&chain) { .filter_map(|blob| match observe_gossip_blob(&blob, &chain) {
Ok(observed) => Some(observed.clone_blob()), Ok(()) => Some(blob),
Err(GossipBlobError::RepeatBlob { .. }) => None, Err(GossipBlobError::RepeatBlob { .. }) => None,
Err(e) => { Err(e) => {
warn!( warn!(