diff --git a/Cargo.lock b/Cargo.lock index 14f8d5cbaa..1bd65e1721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5013,9 +5013,11 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "hex", + "rayon", "rust_eth_kzg", "serde", "serde_json", + "tracing", "tree_hash", ] diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 2c05df3c7f..440388661c 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -377,7 +377,7 @@ where .store .get_hot_state(&self.justified_state_root, update_cache) .map_err(Error::FailedToReadState)? - .ok_or_else(|| Error::MissingState(self.justified_state_root))?; + .ok_or(Error::MissingState(self.justified_state_root))?; self.justified_balances = JustifiedBalances::from_justified_state(&state)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index ad01eb477b..2ebf765a4e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -29,7 +29,7 @@ mod state_lru_cache; use crate::data_column_verification::{ CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, - KzgVerifiedDataColumn, verify_kzg_for_data_column_list_with_scoring, + KzgVerifiedDataColumn, verify_kzg_for_data_column_list, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -378,7 +378,7 @@ impl DataAvailabilityChecker { } if self.data_columns_required_for_block(&block) { return if let Some(data_column_list) = data_columns.as_ref() { - verify_kzg_for_data_column_list_with_scoring( + verify_kzg_for_data_column_list( data_column_list .iter() .map(|custody_column| custody_column.as_data_column()), @@ -449,7 +449,7 @@ impl DataAvailabilityChecker { // verify kzg for all data columns at once if !all_data_columns.is_empty() { // Attributes fault to the specific peer that sent an invalid column - verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg) + verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index d091d6fefb..c9efb7a414 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, Hash256}; #[derive(Debug)] pub enum Error { InvalidBlobs(KzgError), - InvalidColumn(Vec<(ColumnIndex, KzgError)>), + InvalidColumn((Option, KzgError)), ReconstructColumnsError(KzgError), KzgCommitmentMismatch { blob_commitment: KzgCommitment, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 873627abea..fb88db1300 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -263,7 +263,10 @@ pub struct KzgVerifiedDataColumn { } impl KzgVerifiedDataColumn { - pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + pub fn new( + data_column: Arc>, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column, kzg) } @@ -278,22 +281,11 @@ impl KzgVerifiedDataColumn { Self { data: data_column } } - pub fn from_batch( - data_columns: Vec>>, - kzg: &Kzg, - ) -> Result, KzgError> { - verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; - Ok(data_columns - .into_iter() - .map(|column| Self { data: column }) - .collect()) - } - pub fn from_batch_with_scoring( data_columns: Vec>>, kzg: &Kzg, - ) -> Result, Vec<(ColumnIndex, KzgError)>> { - verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?; + ) -> Result, (Option, KzgError)> { + verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; Ok(data_columns .into_iter() .map(|column| Self { data: column }) @@ -367,7 +359,10 @@ impl KzgVerifiedCustodyDataColumn { } /// Verify a column already marked as custody column - pub fn new(data_column: CustodyDataColumn, kzg: &Kzg) -> Result { + pub fn new( + data_column: CustodyDataColumn, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column.clone_arc(), kzg)?; Ok(Self { data: data_column.data, @@ -418,22 +413,21 @@ impl KzgVerifiedCustodyDataColumn { pub fn verify_kzg_for_data_column( data_column: Arc>, kzg: &Kzg, -) -> Result, KzgError> { +) -> Result, (Option, KzgError)> { let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); validate_data_columns(kzg, iter::once(&data_column))?; Ok(KzgVerifiedDataColumn { data: data_column }) } /// Complete kzg verification for a list of `DataColumnSidecar`s. -/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification. +/// Returns an error for the first `DataColumnSidecar`s that fails kzg verification. /// /// Note: This function should be preferred over calling `verify_kzg_for_data_column` /// in a loop since this function kzg verifies a list of data columns more efficiently. -#[instrument(skip_all, level = "debug")] pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( data_column_iter: I, kzg: &'a Kzg, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -442,38 +436,6 @@ where Ok(()) } -/// Complete kzg verification for a list of `DataColumnSidecar`s. -/// -/// If there's at least one invalid column, it re-verifies all columns individually to identify the -/// first column that is invalid. This is necessary to attribute fault to the specific peer that -/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it -/// will be quickly banned. -pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>( - data_column_iter: I, - kzg: &'a Kzg, -) -> Result<(), Vec<(ColumnIndex, KzgError)>> -where - I: Iterator>> + Clone, -{ - if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() { - return Ok(()); - }; - - // Find all columns that are invalid and identify by index. If we hit this condition there - // should be at least one invalid column - let errors = data_column_iter - .filter_map(|data_column| { - if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) { - Some((data_column.index, e)) - } else { - None - } - }) - .collect::>(); - - Err(errors) -} - #[instrument(skip_all, level = "debug")] pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, @@ -509,7 +471,7 @@ pub fn validate_data_column_sidecar_for_gossip( kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof) } -/// Validates a list of blobs along with their corresponding KZG commitments and -/// cell proofs for the extended blobs. -pub fn validate_blobs_and_cell_proofs( - kzg: &Kzg, - blobs: Vec<&Blob>, - cell_proofs: &[KzgProof], - kzg_commitments: &KzgCommitments, -) -> Result<(), KzgError> { - let cells = compute_cells::(&blobs, kzg)?; - let cell_refs = cells.iter().map(|cell| cell.as_ref()).collect::>(); - let cell_indices = (0..blobs.len()) - .flat_map(|_| 0..CELLS_PER_EXT_BLOB as u64) - .collect::>(); - - let proofs = cell_proofs - .iter() - .map(|&proof| Bytes48::from(proof)) - .collect::>(); - - let commitments = kzg_commitments - .iter() - .flat_map(|&commitment| std::iter::repeat_n(Bytes48::from(commitment), CELLS_PER_EXT_BLOB)) - .collect::>(); - - kzg.verify_cell_proof_batch(&cell_refs, &proofs, cell_indices, &commitments) -} - /// Validate a batch of `DataColumnSidecar`. pub fn validate_data_columns<'a, E: EthSpec, I>( kzg: &Kzg, data_column_iter: I, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -88,8 +61,12 @@ where for data_column in data_column_iter { let col_index = data_column.index; + if data_column.column.is_empty() { + return Err((Some(col_index), KzgError::KzgVerificationFailed)); + } + for cell in &data_column.column { - cells.push(ssz_cell_to_crypto_cell::(cell)?); + cells.push(ssz_cell_to_crypto_cell::(cell).map_err(|e| (Some(col_index), e))?); column_indices.push(col_index); } @@ -100,6 +77,19 @@ where for &commitment in &data_column.kzg_commitments { commitments.push(Bytes48::from(commitment)); } + + let expected_len = column_indices.len(); + + // We make this check at each iteration so that the error is attributable to a specific column + if cells.len() != expected_len + || proofs.len() != expected_len + || commitments.len() != expected_len + { + return Err(( + Some(col_index), + KzgError::InconsistentArrayLength("Invalid data column".to_string()), + )); + } } kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) @@ -418,7 +408,7 @@ pub fn reconstruct_data_columns( mod test { use crate::kzg_utils::{ blobs_to_data_column_sidecars, reconstruct_blobs, reconstruct_data_columns, - validate_blobs_and_cell_proofs, + validate_data_columns, }; use bls::Signature; use eth2::types::BlobsBundle; @@ -442,21 +432,20 @@ mod test { test_build_data_columns(&kzg, &spec); test_reconstruct_data_columns(&kzg, &spec); test_reconstruct_blobs_from_data_columns(&kzg, &spec); - test_verify_blob_and_cell_proofs(&kzg); + test_validate_data_columns(&kzg, &spec); } #[track_caller] - fn test_verify_blob_and_cell_proofs(kzg: &Kzg) { - let (blobs_bundle, _) = generate_blobs::(3, ForkName::Fulu).unwrap(); - let BlobsBundle { - blobs, - commitments, - proofs, - } = blobs_bundle; - - let result = - validate_blobs_and_cell_proofs::(kzg, blobs.iter().collect(), &proofs, &commitments); + fn test_validate_data_columns(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 6; + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); + let column_sidecars = + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); + let result = validate_data_columns::(kzg, column_sidecars.iter()); assert!(result.is_ok()); } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f8f8d8a9a5..e9f24697ac 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -36,7 +36,6 @@ use beacon_chain::data_availability_checker::{ use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; pub use common::RequestState; use fnv::FnvHashMap; -use itertools::Itertools; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; @@ -653,15 +652,15 @@ impl BlockLookups { // but future errors may follow the same pattern. Generalize this // pattern with https://github.com/sigp/lighthouse/pull/6321 BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn(errors), - ) => errors - .iter() - // Collect all peers that sent a column that was invalid. Must - // run .unique as a single peer can send multiple invalid - // columns. Penalize once to avoid insta-bans - .flat_map(|(index, _)| peer_group.of_index((*index) as usize)) - .unique() - .collect(), + AvailabilityCheckError::InvalidColumn((index_opt, _)), + ) => { + match index_opt { + Some(index) => peer_group.of_index(index as usize).collect(), + // If no index supplied this is an un-attributable fault. In practice + // this should never happen. + None => vec![], + } + } _ => peer_group.all().collect(), }; for peer in peers_to_penalize { diff --git a/crypto/kzg/Cargo.toml b/crypto/kzg/Cargo.toml index bfe0f19cd0..432fcc1792 100644 --- a/crypto/kzg/Cargo.toml +++ b/crypto/kzg/Cargo.toml @@ -14,9 +14,11 @@ ethereum_serde_utils = { workspace = true } ethereum_ssz = { workspace = true } ethereum_ssz_derive = { workspace = true } hex = { workspace = true } +rayon = { workspace = true } rust_eth_kzg = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tracing = { workspace = true } tree_hash = { workspace = true } [dev-dependencies] diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index ddaddd1ada..1b8d46100f 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -3,6 +3,7 @@ mod kzg_proof; pub mod trusted_setup; use rust_eth_kzg::{CellIndex, DASContext}; +use std::collections::HashMap; use std::fmt::Debug; pub use crate::{ @@ -17,10 +18,12 @@ pub use c_kzg::{ }; use crate::trusted_setup::load_trusted_setup; +use rayon::prelude::*; pub use rust_eth_kzg::{ constants::{BYTES_PER_CELL, CELLS_PER_EXT_BLOB}, Cell, CellIndex as CellID, CellRef, TrustedSetup as PeerDASTrustedSetup, }; +use tracing::instrument; /// Disables the fixed-base multi-scalar multiplication optimization for computing /// cell KZG proofs, because `rust-eth-kzg` already handles the precomputation. @@ -229,31 +232,85 @@ impl Kzg { } /// Verifies a batch of cell-proof-commitment triplets. + #[instrument(skip_all, level = "debug", fields(cells = cells.len()))] pub fn verify_cell_proof_batch( &self, cells: &[CellRef<'_>], kzg_proofs: &[Bytes48], - columns: Vec, + indices: Vec, kzg_commitments: &[Bytes48], - ) -> Result<(), Error> { - let proofs: Vec<_> = kzg_proofs.iter().map(|proof| proof.as_ref()).collect(); - let commitments: Vec<_> = kzg_commitments - .iter() - .map(|commitment| commitment.as_ref()) - .collect(); - let verification_result = self.context().verify_cell_kzg_proof_batch( - commitments.to_vec(), - &columns, - cells.to_vec(), - proofs.to_vec(), - ); + ) -> Result<(), (Option, Error)> { + let mut column_groups: HashMap> = HashMap::new(); - // Modify the result so it matches roughly what the previous method was doing. - match verification_result { - Ok(_) => Ok(()), - Err(e) if e.is_proof_invalid() => Err(Error::KzgVerificationFailed), - Err(e) => Err(Error::PeerDASKZG(e)), + let expected_len = cells.len(); + + // This check is already made in `validate_data_columns`. However we add it here so that ef consensus spec tests pass + // and to avoid any potential footguns in the future. Note that by catching the error here and not in `validate_data_columns` + // the error becomes non-attributable. + if kzg_proofs.len() != expected_len + || indices.len() != expected_len + || kzg_commitments.len() != expected_len + { + return Err(( + None, + Error::InconsistentArrayLength("Invalid data column".to_string()), + )); } + + for (((cell, proof), &index), commitment) in cells + .iter() + .zip(kzg_proofs.iter()) + .zip(indices.iter()) + .zip(kzg_commitments.iter()) + { + column_groups + .entry(index) + .or_default() + .push((cell, *proof, *commitment)); + } + + column_groups + .into_par_iter() + .map(|(column_index, column_data)| { + let mut cells = Vec::new(); + let mut proofs = Vec::new(); + let mut commitments = Vec::new(); + + for (cell, proof, commitment) in &column_data { + cells.push(*cell); + proofs.push(proof.as_ref()); + commitments.push(commitment.as_ref()); + } + + // Create per-chunk tracing span for visualizing parallel processing. + // This is safe from span explosion as we have at most 128 chunks, + // i.e. the number of column indices. + let _span = tracing::debug_span!( + "verify_cell_proof_chunk", + cells = cells.len(), + column_index, + verification_result = tracing::field::Empty, + ) + .entered(); + + let verification_result = self.context().verify_cell_kzg_proof_batch( + commitments, + &vec![column_index; cells.len()], // All column_data here is from the same index + cells, + proofs, + ); + + match verification_result { + Ok(_) => Ok(()), + Err(e) if e.is_proof_invalid() => { + Err((Some(column_index), Error::KzgVerificationFailed)) + } + Err(e) => Err((Some(column_index), Error::PeerDASKZG(e))), + } + }) + .collect::, (Option, Error)>>()?; + + Ok(()) } pub fn recover_cells_and_compute_kzg_proofs( diff --git a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs index e3edc0df0a..7973af861f 100644 --- a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs +++ b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs @@ -53,7 +53,7 @@ impl Case for KZGVerifyCellKZGProofBatch { let kzg = get_kzg(); match kzg.verify_cell_proof_batch(&cells, &proofs, cell_indices, &commitments) { Ok(_) => Ok(true), - Err(KzgError::KzgVerificationFailed) => Ok(false), + Err((_, KzgError::KzgVerificationFailed)) => Ok(false), Err(e) => Err(Error::InternalError(format!( "Failed to validate cells: {:?}", e