From a134d43446f776fe2a84f420854afbff76ca93d8 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 29 Aug 2025 10:59:40 +1000 Subject: [PATCH] Use `rayon` to speed up batch KZG verification (#7921) Addresses #7866. Use Rayon to speed up batch KZG verification during range / backfill sync. While I was analysing the traces, I also discovered a bug that resulted in only the first 128 columns in a chain segment batch being verified. This PR fixes it, so we might actually observe slower range sync due to more cells being KZG verified. I've also updated the handling of batch KZG failure to only find the first invalid KZG column when verification fails as this gets very expensive during range/backfill sync. --- Cargo.lock | 2 + .../src/beacon_fork_choice_store.rs | 2 +- .../src/data_availability_checker.rs | 6 +- .../src/data_availability_checker/error.rs | 2 +- .../src/data_column_verification.rs | 66 +++---------- beacon_node/beacon_chain/src/kzg_utils.rs | 75 +++++++-------- .../network/src/sync/block_lookups/mod.rs | 19 ++-- crypto/kzg/Cargo.toml | 2 + crypto/kzg/src/lib.rs | 93 +++++++++++++++---- .../cases/kzg_verify_cell_kzg_proof_batch.rs | 2 +- 10 files changed, 140 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14f8d5cbaa..1bd65e1721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5013,9 +5013,11 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "hex", + "rayon", "rust_eth_kzg", "serde", "serde_json", + "tracing", "tree_hash", ] diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 2c05df3c7f..440388661c 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -377,7 +377,7 @@ where .store .get_hot_state(&self.justified_state_root, update_cache) .map_err(Error::FailedToReadState)? - .ok_or_else(|| Error::MissingState(self.justified_state_root))?; + .ok_or(Error::MissingState(self.justified_state_root))?; self.justified_balances = JustifiedBalances::from_justified_state(&state)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index ad01eb477b..2ebf765a4e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -29,7 +29,7 @@ mod state_lru_cache; use crate::data_column_verification::{ CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, - KzgVerifiedDataColumn, verify_kzg_for_data_column_list_with_scoring, + KzgVerifiedDataColumn, verify_kzg_for_data_column_list, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -378,7 +378,7 @@ impl DataAvailabilityChecker { } if self.data_columns_required_for_block(&block) { return if let Some(data_column_list) = data_columns.as_ref() { - verify_kzg_for_data_column_list_with_scoring( + verify_kzg_for_data_column_list( data_column_list .iter() .map(|custody_column| custody_column.as_data_column()), @@ -449,7 +449,7 @@ impl DataAvailabilityChecker { // verify kzg for all data columns at once if !all_data_columns.is_empty() { // Attributes fault to the specific peer that sent an invalid column - verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg) + verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index d091d6fefb..c9efb7a414 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, Hash256}; #[derive(Debug)] pub enum Error { InvalidBlobs(KzgError), - InvalidColumn(Vec<(ColumnIndex, KzgError)>), + InvalidColumn((Option, KzgError)), ReconstructColumnsError(KzgError), KzgCommitmentMismatch { blob_commitment: KzgCommitment, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 873627abea..fb88db1300 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -263,7 +263,10 @@ pub struct KzgVerifiedDataColumn { } impl KzgVerifiedDataColumn { - pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + pub fn new( + data_column: Arc>, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column, kzg) } @@ -278,22 +281,11 @@ impl KzgVerifiedDataColumn { Self { data: data_column } } - pub fn from_batch( - data_columns: Vec>>, - kzg: &Kzg, - ) -> Result, KzgError> { - verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; - Ok(data_columns - .into_iter() - .map(|column| Self { data: column }) - .collect()) - } - pub fn from_batch_with_scoring( data_columns: Vec>>, kzg: &Kzg, - ) -> Result, Vec<(ColumnIndex, KzgError)>> { - verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?; + ) -> Result, (Option, KzgError)> { + verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; Ok(data_columns .into_iter() .map(|column| Self { data: column }) @@ -367,7 +359,10 @@ impl KzgVerifiedCustodyDataColumn { } /// Verify a column already marked as custody column - pub fn new(data_column: CustodyDataColumn, kzg: &Kzg) -> Result { + pub fn new( + data_column: CustodyDataColumn, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column.clone_arc(), kzg)?; Ok(Self { data: data_column.data, @@ -418,22 +413,21 @@ impl KzgVerifiedCustodyDataColumn { pub fn verify_kzg_for_data_column( data_column: Arc>, kzg: &Kzg, -) -> Result, KzgError> { +) -> Result, (Option, KzgError)> { let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); validate_data_columns(kzg, iter::once(&data_column))?; Ok(KzgVerifiedDataColumn { data: data_column }) } /// Complete kzg verification for a list of `DataColumnSidecar`s. -/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification. +/// Returns an error for the first `DataColumnSidecar`s that fails kzg verification. /// /// Note: This function should be preferred over calling `verify_kzg_for_data_column` /// in a loop since this function kzg verifies a list of data columns more efficiently. -#[instrument(skip_all, level = "debug")] pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( data_column_iter: I, kzg: &'a Kzg, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -442,38 +436,6 @@ where Ok(()) } -/// Complete kzg verification for a list of `DataColumnSidecar`s. -/// -/// If there's at least one invalid column, it re-verifies all columns individually to identify the -/// first column that is invalid. This is necessary to attribute fault to the specific peer that -/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it -/// will be quickly banned. -pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>( - data_column_iter: I, - kzg: &'a Kzg, -) -> Result<(), Vec<(ColumnIndex, KzgError)>> -where - I: Iterator>> + Clone, -{ - if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() { - return Ok(()); - }; - - // Find all columns that are invalid and identify by index. If we hit this condition there - // should be at least one invalid column - let errors = data_column_iter - .filter_map(|data_column| { - if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) { - Some((data_column.index, e)) - } else { - None - } - }) - .collect::>(); - - Err(errors) -} - #[instrument(skip_all, level = "debug")] pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, @@ -509,7 +471,7 @@ pub fn validate_data_column_sidecar_for_gossip( kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof) } -/// Validates a list of blobs along with their corresponding KZG commitments and -/// cell proofs for the extended blobs. -pub fn validate_blobs_and_cell_proofs( - kzg: &Kzg, - blobs: Vec<&Blob>, - cell_proofs: &[KzgProof], - kzg_commitments: &KzgCommitments, -) -> Result<(), KzgError> { - let cells = compute_cells::(&blobs, kzg)?; - let cell_refs = cells.iter().map(|cell| cell.as_ref()).collect::>(); - let cell_indices = (0..blobs.len()) - .flat_map(|_| 0..CELLS_PER_EXT_BLOB as u64) - .collect::>(); - - let proofs = cell_proofs - .iter() - .map(|&proof| Bytes48::from(proof)) - .collect::>(); - - let commitments = kzg_commitments - .iter() - .flat_map(|&commitment| std::iter::repeat_n(Bytes48::from(commitment), CELLS_PER_EXT_BLOB)) - .collect::>(); - - kzg.verify_cell_proof_batch(&cell_refs, &proofs, cell_indices, &commitments) -} - /// Validate a batch of `DataColumnSidecar`. pub fn validate_data_columns<'a, E: EthSpec, I>( kzg: &Kzg, data_column_iter: I, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -88,8 +61,12 @@ where for data_column in data_column_iter { let col_index = data_column.index; + if data_column.column.is_empty() { + return Err((Some(col_index), KzgError::KzgVerificationFailed)); + } + for cell in &data_column.column { - cells.push(ssz_cell_to_crypto_cell::(cell)?); + cells.push(ssz_cell_to_crypto_cell::(cell).map_err(|e| (Some(col_index), e))?); column_indices.push(col_index); } @@ -100,6 +77,19 @@ where for &commitment in &data_column.kzg_commitments { commitments.push(Bytes48::from(commitment)); } + + let expected_len = column_indices.len(); + + // We make this check at each iteration so that the error is attributable to a specific column + if cells.len() != expected_len + || proofs.len() != expected_len + || commitments.len() != expected_len + { + return Err(( + Some(col_index), + KzgError::InconsistentArrayLength("Invalid data column".to_string()), + )); + } } kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) @@ -418,7 +408,7 @@ pub fn reconstruct_data_columns( mod test { use crate::kzg_utils::{ blobs_to_data_column_sidecars, reconstruct_blobs, reconstruct_data_columns, - validate_blobs_and_cell_proofs, + validate_data_columns, }; use bls::Signature; use eth2::types::BlobsBundle; @@ -442,21 +432,20 @@ mod test { test_build_data_columns(&kzg, &spec); test_reconstruct_data_columns(&kzg, &spec); test_reconstruct_blobs_from_data_columns(&kzg, &spec); - test_verify_blob_and_cell_proofs(&kzg); + test_validate_data_columns(&kzg, &spec); } #[track_caller] - fn test_verify_blob_and_cell_proofs(kzg: &Kzg) { - let (blobs_bundle, _) = generate_blobs::(3, ForkName::Fulu).unwrap(); - let BlobsBundle { - blobs, - commitments, - proofs, - } = blobs_bundle; - - let result = - validate_blobs_and_cell_proofs::(kzg, blobs.iter().collect(), &proofs, &commitments); + fn test_validate_data_columns(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 6; + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); + let column_sidecars = + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); + let result = validate_data_columns::(kzg, column_sidecars.iter()); assert!(result.is_ok()); } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f8f8d8a9a5..e9f24697ac 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -36,7 +36,6 @@ use beacon_chain::data_availability_checker::{ use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; pub use common::RequestState; use fnv::FnvHashMap; -use itertools::Itertools; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; @@ -653,15 +652,15 @@ impl BlockLookups { // but future errors may follow the same pattern. Generalize this // pattern with https://github.com/sigp/lighthouse/pull/6321 BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn(errors), - ) => errors - .iter() - // Collect all peers that sent a column that was invalid. Must - // run .unique as a single peer can send multiple invalid - // columns. Penalize once to avoid insta-bans - .flat_map(|(index, _)| peer_group.of_index((*index) as usize)) - .unique() - .collect(), + AvailabilityCheckError::InvalidColumn((index_opt, _)), + ) => { + match index_opt { + Some(index) => peer_group.of_index(index as usize).collect(), + // If no index supplied this is an un-attributable fault. In practice + // this should never happen. + None => vec![], + } + } _ => peer_group.all().collect(), }; for peer in peers_to_penalize { diff --git a/crypto/kzg/Cargo.toml b/crypto/kzg/Cargo.toml index bfe0f19cd0..432fcc1792 100644 --- a/crypto/kzg/Cargo.toml +++ b/crypto/kzg/Cargo.toml @@ -14,9 +14,11 @@ ethereum_serde_utils = { workspace = true } ethereum_ssz = { workspace = true } ethereum_ssz_derive = { workspace = true } hex = { workspace = true } +rayon = { workspace = true } rust_eth_kzg = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tracing = { workspace = true } tree_hash = { workspace = true } [dev-dependencies] diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index ddaddd1ada..1b8d46100f 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -3,6 +3,7 @@ mod kzg_proof; pub mod trusted_setup; use rust_eth_kzg::{CellIndex, DASContext}; +use std::collections::HashMap; use std::fmt::Debug; pub use crate::{ @@ -17,10 +18,12 @@ pub use c_kzg::{ }; use crate::trusted_setup::load_trusted_setup; +use rayon::prelude::*; pub use rust_eth_kzg::{ constants::{BYTES_PER_CELL, CELLS_PER_EXT_BLOB}, Cell, CellIndex as CellID, CellRef, TrustedSetup as PeerDASTrustedSetup, }; +use tracing::instrument; /// Disables the fixed-base multi-scalar multiplication optimization for computing /// cell KZG proofs, because `rust-eth-kzg` already handles the precomputation. @@ -229,31 +232,85 @@ impl Kzg { } /// Verifies a batch of cell-proof-commitment triplets. + #[instrument(skip_all, level = "debug", fields(cells = cells.len()))] pub fn verify_cell_proof_batch( &self, cells: &[CellRef<'_>], kzg_proofs: &[Bytes48], - columns: Vec, + indices: Vec, kzg_commitments: &[Bytes48], - ) -> Result<(), Error> { - let proofs: Vec<_> = kzg_proofs.iter().map(|proof| proof.as_ref()).collect(); - let commitments: Vec<_> = kzg_commitments - .iter() - .map(|commitment| commitment.as_ref()) - .collect(); - let verification_result = self.context().verify_cell_kzg_proof_batch( - commitments.to_vec(), - &columns, - cells.to_vec(), - proofs.to_vec(), - ); + ) -> Result<(), (Option, Error)> { + let mut column_groups: HashMap> = HashMap::new(); - // Modify the result so it matches roughly what the previous method was doing. - match verification_result { - Ok(_) => Ok(()), - Err(e) if e.is_proof_invalid() => Err(Error::KzgVerificationFailed), - Err(e) => Err(Error::PeerDASKZG(e)), + let expected_len = cells.len(); + + // This check is already made in `validate_data_columns`. However we add it here so that ef consensus spec tests pass + // and to avoid any potential footguns in the future. Note that by catching the error here and not in `validate_data_columns` + // the error becomes non-attributable. + if kzg_proofs.len() != expected_len + || indices.len() != expected_len + || kzg_commitments.len() != expected_len + { + return Err(( + None, + Error::InconsistentArrayLength("Invalid data column".to_string()), + )); } + + for (((cell, proof), &index), commitment) in cells + .iter() + .zip(kzg_proofs.iter()) + .zip(indices.iter()) + .zip(kzg_commitments.iter()) + { + column_groups + .entry(index) + .or_default() + .push((cell, *proof, *commitment)); + } + + column_groups + .into_par_iter() + .map(|(column_index, column_data)| { + let mut cells = Vec::new(); + let mut proofs = Vec::new(); + let mut commitments = Vec::new(); + + for (cell, proof, commitment) in &column_data { + cells.push(*cell); + proofs.push(proof.as_ref()); + commitments.push(commitment.as_ref()); + } + + // Create per-chunk tracing span for visualizing parallel processing. + // This is safe from span explosion as we have at most 128 chunks, + // i.e. the number of column indices. + let _span = tracing::debug_span!( + "verify_cell_proof_chunk", + cells = cells.len(), + column_index, + verification_result = tracing::field::Empty, + ) + .entered(); + + let verification_result = self.context().verify_cell_kzg_proof_batch( + commitments, + &vec![column_index; cells.len()], // All column_data here is from the same index + cells, + proofs, + ); + + match verification_result { + Ok(_) => Ok(()), + Err(e) if e.is_proof_invalid() => { + Err((Some(column_index), Error::KzgVerificationFailed)) + } + Err(e) => Err((Some(column_index), Error::PeerDASKZG(e))), + } + }) + .collect::, (Option, Error)>>()?; + + Ok(()) } pub fn recover_cells_and_compute_kzg_proofs( diff --git a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs index e3edc0df0a..7973af861f 100644 --- a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs +++ b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs @@ -53,7 +53,7 @@ impl Case for KZGVerifyCellKZGProofBatch { let kzg = get_kzg(); match kzg.verify_cell_proof_batch(&cells, &proofs, cell_indices, &commitments) { Ok(_) => Ok(true), - Err(KzgError::KzgVerificationFailed) => Ok(false), + Err((_, KzgError::KzgVerificationFailed)) => Ok(false), Err(e) => Err(Error::InternalError(format!( "Failed to validate cells: {:?}", e