mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-25 16:58:28 +00:00
Use rayon to speed up batch KZG verification (#7921)
Addresses #7866. Use Rayon to speed up batch KZG verification during range / backfill sync. While I was analysing the traces, I also discovered a bug that resulted in only the first 128 columns in a chain segment batch being verified. This PR fixes it, so we might actually observe slower range sync due to more cells being KZG verified. I've also updated the handling of batch KZG failure to only find the first invalid KZG column when verification fails as this gets very expensive during range/backfill sync.
This commit is contained in:
@@ -377,7 +377,7 @@ where
|
||||
.store
|
||||
.get_hot_state(&self.justified_state_root, update_cache)
|
||||
.map_err(Error::FailedToReadState)?
|
||||
.ok_or_else(|| Error::MissingState(self.justified_state_root))?;
|
||||
.ok_or(Error::MissingState(self.justified_state_root))?;
|
||||
|
||||
self.justified_balances = JustifiedBalances::from_justified_state(&state)?;
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ mod state_lru_cache;
|
||||
|
||||
use crate::data_column_verification::{
|
||||
CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn,
|
||||
KzgVerifiedDataColumn, verify_kzg_for_data_column_list_with_scoring,
|
||||
KzgVerifiedDataColumn, verify_kzg_for_data_column_list,
|
||||
};
|
||||
use crate::metrics::{
|
||||
KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES,
|
||||
@@ -378,7 +378,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
}
|
||||
if self.data_columns_required_for_block(&block) {
|
||||
return if let Some(data_column_list) = data_columns.as_ref() {
|
||||
verify_kzg_for_data_column_list_with_scoring(
|
||||
verify_kzg_for_data_column_list(
|
||||
data_column_list
|
||||
.iter()
|
||||
.map(|custody_column| custody_column.as_data_column()),
|
||||
@@ -449,7 +449,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
// verify kzg for all data columns at once
|
||||
if !all_data_columns.is_empty() {
|
||||
// Attributes fault to the specific peer that sent an invalid column
|
||||
verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)
|
||||
verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg)
|
||||
.map_err(AvailabilityCheckError::InvalidColumn)?;
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, Hash256};
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
InvalidBlobs(KzgError),
|
||||
InvalidColumn(Vec<(ColumnIndex, KzgError)>),
|
||||
InvalidColumn((Option<ColumnIndex>, KzgError)),
|
||||
ReconstructColumnsError(KzgError),
|
||||
KzgCommitmentMismatch {
|
||||
blob_commitment: KzgCommitment,
|
||||
|
||||
@@ -263,7 +263,10 @@ pub struct KzgVerifiedDataColumn<E: EthSpec> {
|
||||
}
|
||||
|
||||
impl<E: EthSpec> KzgVerifiedDataColumn<E> {
|
||||
pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
|
||||
pub fn new(
|
||||
data_column: Arc<DataColumnSidecar<E>>,
|
||||
kzg: &Kzg,
|
||||
) -> Result<Self, (Option<ColumnIndex>, KzgError)> {
|
||||
verify_kzg_for_data_column(data_column, kzg)
|
||||
}
|
||||
|
||||
@@ -278,22 +281,11 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
|
||||
Self { data: data_column }
|
||||
}
|
||||
|
||||
pub fn from_batch(
|
||||
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
|
||||
kzg: &Kzg,
|
||||
) -> Result<Vec<Self>, KzgError> {
|
||||
verify_kzg_for_data_column_list(data_columns.iter(), kzg)?;
|
||||
Ok(data_columns
|
||||
.into_iter()
|
||||
.map(|column| Self { data: column })
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub fn from_batch_with_scoring(
|
||||
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
|
||||
kzg: &Kzg,
|
||||
) -> Result<Vec<Self>, Vec<(ColumnIndex, KzgError)>> {
|
||||
verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?;
|
||||
) -> Result<Vec<Self>, (Option<ColumnIndex>, KzgError)> {
|
||||
verify_kzg_for_data_column_list(data_columns.iter(), kzg)?;
|
||||
Ok(data_columns
|
||||
.into_iter()
|
||||
.map(|column| Self { data: column })
|
||||
@@ -367,7 +359,10 @@ impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
|
||||
}
|
||||
|
||||
/// Verify a column already marked as custody column
|
||||
pub fn new(data_column: CustodyDataColumn<E>, kzg: &Kzg) -> Result<Self, KzgError> {
|
||||
pub fn new(
|
||||
data_column: CustodyDataColumn<E>,
|
||||
kzg: &Kzg,
|
||||
) -> Result<Self, (Option<ColumnIndex>, KzgError)> {
|
||||
verify_kzg_for_data_column(data_column.clone_arc(), kzg)?;
|
||||
Ok(Self {
|
||||
data: data_column.data,
|
||||
@@ -418,22 +413,21 @@ impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
|
||||
pub fn verify_kzg_for_data_column<E: EthSpec>(
|
||||
data_column: Arc<DataColumnSidecar<E>>,
|
||||
kzg: &Kzg,
|
||||
) -> Result<KzgVerifiedDataColumn<E>, KzgError> {
|
||||
) -> Result<KzgVerifiedDataColumn<E>, (Option<ColumnIndex>, KzgError)> {
|
||||
let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES);
|
||||
validate_data_columns(kzg, iter::once(&data_column))?;
|
||||
Ok(KzgVerifiedDataColumn { data: data_column })
|
||||
}
|
||||
|
||||
/// Complete kzg verification for a list of `DataColumnSidecar`s.
|
||||
/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification.
|
||||
/// Returns an error for the first `DataColumnSidecar`s that fails kzg verification.
|
||||
///
|
||||
/// Note: This function should be preferred over calling `verify_kzg_for_data_column`
|
||||
/// in a loop since this function kzg verifies a list of data columns more efficiently.
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>(
|
||||
data_column_iter: I,
|
||||
kzg: &'a Kzg,
|
||||
) -> Result<(), KzgError>
|
||||
) -> Result<(), (Option<ColumnIndex>, KzgError)>
|
||||
where
|
||||
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
|
||||
{
|
||||
@@ -442,38 +436,6 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Complete kzg verification for a list of `DataColumnSidecar`s.
|
||||
///
|
||||
/// If there's at least one invalid column, it re-verifies all columns individually to identify the
|
||||
/// first column that is invalid. This is necessary to attribute fault to the specific peer that
|
||||
/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it
|
||||
/// will be quickly banned.
|
||||
pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>(
|
||||
data_column_iter: I,
|
||||
kzg: &'a Kzg,
|
||||
) -> Result<(), Vec<(ColumnIndex, KzgError)>>
|
||||
where
|
||||
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
|
||||
{
|
||||
if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Find all columns that are invalid and identify by index. If we hit this condition there
|
||||
// should be at least one invalid column
|
||||
let errors = data_column_iter
|
||||
.filter_map(|data_column| {
|
||||
if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) {
|
||||
Some((data_column.index, e))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Err(errors)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrategy>(
|
||||
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
|
||||
@@ -509,7 +471,7 @@ pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes, O: Observati
|
||||
verify_proposer_and_signature(&data_column, &parent_block, chain)?;
|
||||
let kzg = &chain.kzg;
|
||||
let kzg_verified_data_column = verify_kzg_for_data_column(data_column.clone(), kzg)
|
||||
.map_err(GossipDataColumnError::InvalidKzgProof)?;
|
||||
.map_err(|(_, e)| GossipDataColumnError::InvalidKzgProof(e))?;
|
||||
|
||||
chain
|
||||
.observed_slashable
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use kzg::{
|
||||
Blob as KzgBlob, Bytes48, CELLS_PER_EXT_BLOB, Cell as KzgCell, CellRef as KzgCellRef,
|
||||
CellsAndKzgProofs, Error as KzgError, Kzg,
|
||||
Blob as KzgBlob, Bytes48, Cell as KzgCell, CellRef as KzgCellRef, CellsAndKzgProofs,
|
||||
Error as KzgError, Kzg,
|
||||
};
|
||||
use rayon::prelude::*;
|
||||
use ssz_types::{FixedVector, VariableList};
|
||||
@@ -45,38 +45,11 @@ pub fn validate_blob<E: EthSpec>(
|
||||
kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof)
|
||||
}
|
||||
|
||||
/// Validates a list of blobs along with their corresponding KZG commitments and
|
||||
/// cell proofs for the extended blobs.
|
||||
pub fn validate_blobs_and_cell_proofs<E: EthSpec>(
|
||||
kzg: &Kzg,
|
||||
blobs: Vec<&Blob<E>>,
|
||||
cell_proofs: &[KzgProof],
|
||||
kzg_commitments: &KzgCommitments<E>,
|
||||
) -> Result<(), KzgError> {
|
||||
let cells = compute_cells::<E>(&blobs, kzg)?;
|
||||
let cell_refs = cells.iter().map(|cell| cell.as_ref()).collect::<Vec<_>>();
|
||||
let cell_indices = (0..blobs.len())
|
||||
.flat_map(|_| 0..CELLS_PER_EXT_BLOB as u64)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let proofs = cell_proofs
|
||||
.iter()
|
||||
.map(|&proof| Bytes48::from(proof))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let commitments = kzg_commitments
|
||||
.iter()
|
||||
.flat_map(|&commitment| std::iter::repeat_n(Bytes48::from(commitment), CELLS_PER_EXT_BLOB))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
kzg.verify_cell_proof_batch(&cell_refs, &proofs, cell_indices, &commitments)
|
||||
}
|
||||
|
||||
/// Validate a batch of `DataColumnSidecar`.
|
||||
pub fn validate_data_columns<'a, E: EthSpec, I>(
|
||||
kzg: &Kzg,
|
||||
data_column_iter: I,
|
||||
) -> Result<(), KzgError>
|
||||
) -> Result<(), (Option<u64>, KzgError)>
|
||||
where
|
||||
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
|
||||
{
|
||||
@@ -88,8 +61,12 @@ where
|
||||
for data_column in data_column_iter {
|
||||
let col_index = data_column.index;
|
||||
|
||||
if data_column.column.is_empty() {
|
||||
return Err((Some(col_index), KzgError::KzgVerificationFailed));
|
||||
}
|
||||
|
||||
for cell in &data_column.column {
|
||||
cells.push(ssz_cell_to_crypto_cell::<E>(cell)?);
|
||||
cells.push(ssz_cell_to_crypto_cell::<E>(cell).map_err(|e| (Some(col_index), e))?);
|
||||
column_indices.push(col_index);
|
||||
}
|
||||
|
||||
@@ -100,6 +77,19 @@ where
|
||||
for &commitment in &data_column.kzg_commitments {
|
||||
commitments.push(Bytes48::from(commitment));
|
||||
}
|
||||
|
||||
let expected_len = column_indices.len();
|
||||
|
||||
// We make this check at each iteration so that the error is attributable to a specific column
|
||||
if cells.len() != expected_len
|
||||
|| proofs.len() != expected_len
|
||||
|| commitments.len() != expected_len
|
||||
{
|
||||
return Err((
|
||||
Some(col_index),
|
||||
KzgError::InconsistentArrayLength("Invalid data column".to_string()),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments)
|
||||
@@ -418,7 +408,7 @@ pub fn reconstruct_data_columns<E: EthSpec>(
|
||||
mod test {
|
||||
use crate::kzg_utils::{
|
||||
blobs_to_data_column_sidecars, reconstruct_blobs, reconstruct_data_columns,
|
||||
validate_blobs_and_cell_proofs,
|
||||
validate_data_columns,
|
||||
};
|
||||
use bls::Signature;
|
||||
use eth2::types::BlobsBundle;
|
||||
@@ -442,21 +432,20 @@ mod test {
|
||||
test_build_data_columns(&kzg, &spec);
|
||||
test_reconstruct_data_columns(&kzg, &spec);
|
||||
test_reconstruct_blobs_from_data_columns(&kzg, &spec);
|
||||
test_verify_blob_and_cell_proofs(&kzg);
|
||||
test_validate_data_columns(&kzg, &spec);
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
fn test_verify_blob_and_cell_proofs(kzg: &Kzg) {
|
||||
let (blobs_bundle, _) = generate_blobs::<E>(3, ForkName::Fulu).unwrap();
|
||||
let BlobsBundle {
|
||||
blobs,
|
||||
commitments,
|
||||
proofs,
|
||||
} = blobs_bundle;
|
||||
|
||||
let result =
|
||||
validate_blobs_and_cell_proofs::<E>(kzg, blobs.iter().collect(), &proofs, &commitments);
|
||||
fn test_validate_data_columns(kzg: &Kzg, spec: &ChainSpec) {
|
||||
let num_of_blobs = 6;
|
||||
let (signed_block, blobs, proofs) =
|
||||
create_test_fulu_block_and_blobs::<E>(num_of_blobs, spec);
|
||||
let blob_refs = blobs.iter().collect::<Vec<_>>();
|
||||
let column_sidecars =
|
||||
blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec)
|
||||
.unwrap();
|
||||
|
||||
let result = validate_data_columns::<E, _>(kzg, column_sidecars.iter());
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user