Offloading KZG Proof Computation from the beacon node (#7117)

Addresses #7108

- Add EL integration for `getPayloadV5` and `getBlobsV2`
- Offload proof computation and use proofs from EL RPC APIs
This commit is contained in:
Jimmy Chen
2025-04-08 17:37:16 +10:00
committed by GitHub
parent e924264e17
commit 759b0612b3
31 changed files with 721 additions and 476 deletions

View File

@@ -7,34 +7,52 @@
//! on P2P gossip to the network. From PeerDAS onwards, together with the increase in blob count,
//! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity
//! supernodes.
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_data_sidecars::DoNotObserve;
use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError};
use execution_layer::json_structures::BlobAndProofV1;
use crate::{
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
BlockError,
};
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
use execution_layer::Error as ExecutionLayerError;
use metrics::{inc_counter, TryExt};
use ssz_types::FixedVector;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::oneshot;
use tracing::{debug, error};
use tracing::debug;
use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarError;
use types::{
BeaconStateError, BlobSidecar, ChainSpec, DataColumnSidecar, DataColumnSidecarList, EthSpec,
FullPayload, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader,
BeaconStateError, Blob, BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecarList, EthSpec,
FullPayload, Hash256, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, VersionedHash,
};
/// Blobs or data column to be published to the gossip network.
pub enum BlobsOrDataColumns<T: BeaconChainTypes> {
/// Gossip-verified blob sidecars, published when PeerDAS is not enabled for the
/// block's epoch (the `get_blobs_v1` path).
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
/// Data column sidecars computed locally from EL blobs, published when PeerDAS is
/// enabled (the `get_blobs_v2` path).
DataColumns(DataColumnSidecarList<T::EthSpec>),
}
/// Result from engine get blobs to be passed onto `DataAvailabilityChecker`.
///
/// The blobs are retrieved from a trusted EL and columns are computed locally, therefore they are
/// considered valid without requiring extra validation.
pub enum EngineGetBlobsOutput<E: EthSpec> {
/// The full set of blob sidecars for a block, built from an `engine_getBlobsV1`
/// response (pre-PeerDAS).
Blobs(FixedBlobSidecarList<E>),
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
CustodyColumns(DataColumnSidecarList<E>),
}
#[derive(Debug)]
pub enum FetchEngineBlobError {
BeaconStateError(BeaconStateError),
BeaconChainError(BeaconChainError),
BlobProcessingError(BlockError),
BlobSidecarError(BlobSidecarError),
DataColumnSidecarError(DataColumnSidecarError),
ExecutionLayerMissing,
InternalError(String),
GossipBlob(GossipBlobError),
@@ -48,6 +66,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
custody_columns: HashSet<ColumnIndex>,
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
let versioned_hashes = if let Some(kzg_commitments) = block
@@ -66,8 +85,34 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
return Ok(None);
};
let num_expected_blobs = versioned_hashes.len();
debug!(
num_expected_blobs = versioned_hashes.len(),
"Fetching blobs from the EL"
);
if chain.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
fetch_and_process_blobs_v2(
chain,
block_root,
block,
versioned_hashes,
custody_columns,
publish_fn,
)
.await
} else {
fetch_and_process_blobs_v1(chain, block_root, block, versioned_hashes, publish_fn).await
}
}
async fn fetch_and_process_blobs_v1<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
versioned_hashes: Vec<VersionedHash>,
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + Sized,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
let num_expected_blobs = versioned_hashes.len();
let execution_layer = chain
.execution_layer
.as_ref()
@@ -76,7 +121,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
debug!(num_expected_blobs, "Fetching blobs from the EL");
let response = execution_layer
.get_blobs(versioned_hashes)
.get_blobs_v1(versioned_hashes)
.await
.inspect_err(|_| {
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
@@ -125,59 +170,9 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
.collect::<Result<Vec<_>, _>>()
.map_err(FetchEngineBlobError::GossipBlob)?;
let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
let data_columns_receiver_opt = if peer_das_enabled {
// Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns.
if num_fetched_blobs != num_expected_blobs {
debug!(
info = "Unable to compute data columns",
num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL"
);
return Ok(None);
}
if chain
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
// Avoid computing columns if block has already been imported.
debug!(
info = "block has already been imported",
"Ignoring EL blobs response"
);
return Ok(None);
}
if chain
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
// Avoid computing columns if block has already been imported.
debug!(
info = "block has already been imported",
"Ignoring EL blobs response"
);
return Ok(None);
}
let data_columns_receiver = spawn_compute_and_publish_data_columns_task(
&chain,
block.clone(),
fixed_blob_sidecar_list.clone(),
publish_fn,
);
Some(data_columns_receiver)
} else {
if !blobs_to_import_and_publish.is_empty() {
publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish));
}
None
};
if !blobs_to_import_and_publish.is_empty() {
publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish));
}
debug!(num_fetched_blobs, "Processing engine blobs");
@@ -185,8 +180,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
.process_engine_blobs(
block.slot(),
block_root,
fixed_blob_sidecar_list.clone(),
data_columns_receiver_opt,
EngineGetBlobsOutput::Blobs(fixed_blob_sidecar_list.clone()),
)
.await
.map_err(FetchEngineBlobError::BlobProcessingError)?;
@@ -194,67 +188,140 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
Ok(Some(availability_processing_status))
}
/// Spawn a blocking task here for long computation tasks, so it doesn't block processing, and it
/// allows blobs / data columns to propagate without waiting for processing.
///
/// An `mpsc::Sender` is then used to send the produced data columns to the `beacon_chain` for it
/// to be persisted, **after** the block is made attestable.
///
/// The reason for doing this is to make the block available and attestable as soon as possible,
/// while maintaining the invariant that block and data columns are persisted atomically.
fn spawn_compute_and_publish_data_columns_task<T: BeaconChainTypes>(
/// Fetch blobs and their cell proofs from the EL via `engine_getBlobsV2` (PeerDAS path),
/// compute this node's custody data columns from them, publish the columns to gossip, and
/// hand them to the `DataAvailabilityChecker` via `process_engine_blobs`.
///
/// Returns `Ok(None)` when the EL response is incomplete (a partial blob set is useless for
/// column computation) or when the block has already been imported; otherwise returns the
/// availability processing status.
async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec>>,
versioned_hashes: Vec<VersionedHash>,
custody_columns_indices: HashSet<ColumnIndex>,
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
let num_expected_blobs = versioned_hashes.len();
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;
metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
debug!(num_expected_blobs, "Fetching blobs from the EL");
let response = execution_layer
.get_blobs_v2(versioned_hashes)
.await
.inspect_err(|_| {
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
})
.map_err(FetchEngineBlobError::RequestFailed)?;
// Unzip the (possibly sparse) response into parallel blob/proof lists, dropping
// entries for which the EL returned no blob-and-proof pair.
let (blobs, proofs): (Vec<_>, Vec<_>) = response
.into_iter()
.filter_map(|blob_and_proof_opt| {
blob_and_proof_opt.map(|blob_and_proof| {
let BlobAndProofV2 { blob, proofs } = blob_and_proof;
(blob, proofs)
})
})
.unzip();
let num_fetched_blobs = blobs.len();
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);
// Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns.
if num_fetched_blobs != num_expected_blobs {
debug!(
info = "Unable to compute data columns",
num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL"
);
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
return Ok(None);
} else {
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
}
// Skip the (expensive) column computation if fork choice already knows the block —
// gossip columns may have arrived and completed the import while we awaited the EL.
if chain
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
// Avoid computing columns if block has already been imported.
debug!(
info = "block has already been imported",
"Ignoring EL blobs response"
);
return Ok(None);
}
// Build all column sidecars from the blobs/proofs on a blocking task, publish the
// custody subset to gossip, and get that subset back for import.
let custody_columns = compute_and_publish_data_columns(
&chain,
block.clone(),
blobs,
proofs,
custody_columns_indices,
publish_fn,
)
.await?;
debug!(num_fetched_blobs, "Processing engine blobs");
let availability_processing_status = chain
.process_engine_blobs(
block.slot(),
block_root,
EngineGetBlobsOutput::CustodyColumns(custody_columns),
)
.await
.map_err(FetchEngineBlobError::BlobProcessingError)?;
Ok(Some(availability_processing_status))
}
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
async fn compute_and_publish_data_columns<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: FixedBlobSidecarList<T::EthSpec>,
blobs: Vec<Blob<T::EthSpec>>,
proofs: Vec<KzgProofs<T::EthSpec>>,
custody_columns_indices: HashSet<ColumnIndex>,
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
) -> oneshot::Receiver<Vec<Arc<DataColumnSidecar<T::EthSpec>>>> {
) -> Result<DataColumnSidecarList<T::EthSpec>, FetchEngineBlobError> {
let chain_cloned = chain.clone();
let (data_columns_sender, data_columns_receiver) = oneshot::channel();
chain
.spawn_blocking_handle(
move || {
let mut timer = metrics::start_timer_vec(
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
&[&blobs.len().to_string()],
);
chain.task_executor.spawn_blocking(
move || {
let mut timer = metrics::start_timer_vec(
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
&[&blobs.len().to_string()],
);
let blob_refs = blobs
.iter()
.filter_map(|b| b.as_ref().map(|b| &b.blob))
.collect::<Vec<_>>();
let data_columns_result = blobs_to_data_column_sidecars(
&blob_refs,
&block,
&chain_cloned.kzg,
&chain_cloned.spec,
)
.discard_timer_on_break(&mut timer);
drop(timer);
let blob_refs = blobs.iter().collect::<Vec<_>>();
let cell_proofs = proofs.into_iter().flatten().collect();
let data_columns_result = blobs_to_data_column_sidecars(
&blob_refs,
cell_proofs,
&block,
&chain_cloned.kzg,
&chain_cloned.spec,
)
.discard_timer_on_break(&mut timer);
drop(timer);
let all_data_columns = match data_columns_result {
Ok(d) => d,
Err(e) => {
error!(
error = ?e,
"Failed to build data column sidecars from blobs"
);
return;
}
};
// This filtering ensures we only import and publish the custody columns.
// `DataAvailabilityChecker` requires a strict match on custody columns count to
// consider a block available.
let custody_columns = data_columns_result
.map(|mut data_columns| {
data_columns.retain(|col| custody_columns_indices.contains(&col.index));
data_columns
})
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;
if data_columns_sender.send(all_data_columns.clone()).is_err() {
// Data column receiver have been dropped - block may have already been imported.
// This race condition exists because gossip columns may arrive and trigger block
// import during the computation. Here we just drop the computed columns.
debug!("Failed to send computed data columns");
return;
};
publish_fn(BlobsOrDataColumns::DataColumns(all_data_columns));
},
"compute_and_publish_data_columns",
);
data_columns_receiver
publish_fn(BlobsOrDataColumns::DataColumns(custody_columns.clone()));
Ok(custody_columns)
},
"compute_and_publish_data_columns",
)
.await
.map_err(FetchEngineBlobError::BeaconChainError)
.and_then(|r| r)
}
fn build_blob_sidecars<E: EthSpec>(