diff --git a/Cargo.lock b/Cargo.lock index 7c637a1847..1bd65e1721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4284,6 +4284,7 @@ dependencies = [ "health_metrics", "hex", "lighthouse_network", + "lighthouse_tracing", "lighthouse_version", "logging", "lru", @@ -5012,9 +5013,11 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "hex", + "rayon", "rust_eth_kzg", "serde", "serde_json", + "tracing", "tree_hash", ] @@ -8978,6 +8981,7 @@ dependencies = [ "safe_arith", "serde", "smallvec", + "ssz_types", "state_processing", "strum", "superstruct", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6002dc6524..3a511dacd4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3059,6 +3059,7 @@ impl BeaconChain { /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was /// imported or errors. + #[instrument(skip_all, level = "debug")] pub async fn process_gossip_data_columns( self: &Arc, data_columns: Vec>, diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 2c05df3c7f..440388661c 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -377,7 +377,7 @@ where .store .get_hot_state(&self.justified_state_root, update_cache) .map_err(Error::FailedToReadState)? 
- .ok_or_else(|| Error::MissingState(self.justified_state_root))?; + .ok_or(Error::MissingState(self.justified_state_root))?; self.justified_balances = JustifiedBalances::from_justified_state(&state)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index ad01eb477b..2ebf765a4e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -29,7 +29,7 @@ mod state_lru_cache; use crate::data_column_verification::{ CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, - KzgVerifiedDataColumn, verify_kzg_for_data_column_list_with_scoring, + KzgVerifiedDataColumn, verify_kzg_for_data_column_list, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -378,7 +378,7 @@ impl DataAvailabilityChecker { } if self.data_columns_required_for_block(&block) { return if let Some(data_column_list) = data_columns.as_ref() { - verify_kzg_for_data_column_list_with_scoring( + verify_kzg_for_data_column_list( data_column_list .iter() .map(|custody_column| custody_column.as_data_column()), @@ -449,7 +449,7 @@ impl DataAvailabilityChecker { // verify kzg for all data columns at once if !all_data_columns.is_empty() { // Attributes fault to the specific peer that sent an invalid column - verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg) + verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index d091d6fefb..c9efb7a414 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, 
Hash256}; #[derive(Debug)] pub enum Error { InvalidBlobs(KzgError), - InvalidColumn(Vec<(ColumnIndex, KzgError)>), + InvalidColumn((Option, KzgError)), ReconstructColumnsError(KzgError), KzgCommitmentMismatch { blob_commitment: KzgCommitment, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 873627abea..fb88db1300 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -263,7 +263,10 @@ pub struct KzgVerifiedDataColumn { } impl KzgVerifiedDataColumn { - pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + pub fn new( + data_column: Arc>, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column, kzg) } @@ -278,22 +281,11 @@ impl KzgVerifiedDataColumn { Self { data: data_column } } - pub fn from_batch( - data_columns: Vec>>, - kzg: &Kzg, - ) -> Result, KzgError> { - verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; - Ok(data_columns - .into_iter() - .map(|column| Self { data: column }) - .collect()) - } - pub fn from_batch_with_scoring( data_columns: Vec>>, kzg: &Kzg, - ) -> Result, Vec<(ColumnIndex, KzgError)>> { - verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?; + ) -> Result, (Option, KzgError)> { + verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; Ok(data_columns .into_iter() .map(|column| Self { data: column }) @@ -367,7 +359,10 @@ impl KzgVerifiedCustodyDataColumn { } /// Verify a column already marked as custody column - pub fn new(data_column: CustodyDataColumn, kzg: &Kzg) -> Result { + pub fn new( + data_column: CustodyDataColumn, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column.clone_arc(), kzg)?; Ok(Self { data: data_column.data, @@ -418,22 +413,21 @@ impl KzgVerifiedCustodyDataColumn { pub fn verify_kzg_for_data_column( data_column: Arc>, kzg: &Kzg, -) -> Result, KzgError> { +) -> Result, 
(Option, KzgError)> { let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); validate_data_columns(kzg, iter::once(&data_column))?; Ok(KzgVerifiedDataColumn { data: data_column }) } /// Complete kzg verification for a list of `DataColumnSidecar`s. -/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification. +/// Returns an error for the first `DataColumnSidecar` that fails kzg verification. /// /// Note: This function should be preferred over calling `verify_kzg_for_data_column` /// in a loop since this function kzg verifies a list of data columns more efficiently. -#[instrument(skip_all, level = "debug")] pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( data_column_iter: I, kzg: &'a Kzg, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -442,38 +436,6 @@ where Ok(()) } -/// Complete kzg verification for a list of `DataColumnSidecar`s. -/// -/// If there's at least one invalid column, it re-verifies all columns individually to identify the -/// first column that is invalid. This is necessary to attribute fault to the specific peer that -/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it -/// will be quickly banned. -pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>( - data_column_iter: I, - kzg: &'a Kzg, -) -> Result<(), Vec<(ColumnIndex, KzgError)>> -where - I: Iterator>> + Clone, -{ - if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() { - return Ok(()); - }; - - // Find all columns that are invalid and identify by index. 
If we hit this condition there - // should be at least one invalid column - let errors = data_column_iter - .filter_map(|data_column| { - if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) { - Some((data_column.index, e)) - } else { - None - } - }) - .collect::>(); - - Err(errors) -} - #[instrument(skip_all, level = "debug")] pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, @@ -509,7 +471,7 @@ pub fn validate_data_column_sidecar_for_gossip( kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof) } -/// Validates a list of blobs along with their corresponding KZG commitments and -/// cell proofs for the extended blobs. -pub fn validate_blobs_and_cell_proofs( - kzg: &Kzg, - blobs: Vec<&Blob>, - cell_proofs: &[KzgProof], - kzg_commitments: &KzgCommitments, -) -> Result<(), KzgError> { - let cells = compute_cells::(&blobs, kzg)?; - let cell_refs = cells.iter().map(|cell| cell.as_ref()).collect::>(); - let cell_indices = (0..blobs.len()) - .flat_map(|_| 0..CELLS_PER_EXT_BLOB as u64) - .collect::>(); - - let proofs = cell_proofs - .iter() - .map(|&proof| Bytes48::from(proof)) - .collect::>(); - - let commitments = kzg_commitments - .iter() - .flat_map(|&commitment| std::iter::repeat_n(Bytes48::from(commitment), CELLS_PER_EXT_BLOB)) - .collect::>(); - - kzg.verify_cell_proof_batch(&cell_refs, &proofs, cell_indices, &commitments) -} - /// Validate a batch of `DataColumnSidecar`. 
pub fn validate_data_columns<'a, E: EthSpec, I>( kzg: &Kzg, data_column_iter: I, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -88,8 +61,12 @@ where for data_column in data_column_iter { let col_index = data_column.index; + if data_column.column.is_empty() { + return Err((Some(col_index), KzgError::KzgVerificationFailed)); + } + for cell in &data_column.column { - cells.push(ssz_cell_to_crypto_cell::(cell)?); + cells.push(ssz_cell_to_crypto_cell::(cell).map_err(|e| (Some(col_index), e))?); column_indices.push(col_index); } @@ -100,6 +77,19 @@ where for &commitment in &data_column.kzg_commitments { commitments.push(Bytes48::from(commitment)); } + + let expected_len = column_indices.len(); + + // We make this check at each iteration so that the error is attributable to a specific column + if cells.len() != expected_len + || proofs.len() != expected_len + || commitments.len() != expected_len + { + return Err(( + Some(col_index), + KzgError::InconsistentArrayLength("Invalid data column".to_string()), + )); + } } kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) @@ -418,7 +408,7 @@ pub fn reconstruct_data_columns( mod test { use crate::kzg_utils::{ blobs_to_data_column_sidecars, reconstruct_blobs, reconstruct_data_columns, - validate_blobs_and_cell_proofs, + validate_data_columns, }; use bls::Signature; use eth2::types::BlobsBundle; @@ -442,21 +432,20 @@ mod test { test_build_data_columns(&kzg, &spec); test_reconstruct_data_columns(&kzg, &spec); test_reconstruct_blobs_from_data_columns(&kzg, &spec); - test_verify_blob_and_cell_proofs(&kzg); + test_validate_data_columns(&kzg, &spec); } #[track_caller] - fn test_verify_blob_and_cell_proofs(kzg: &Kzg) { - let (blobs_bundle, _) = generate_blobs::(3, ForkName::Fulu).unwrap(); - let BlobsBundle { - blobs, - commitments, - proofs, - } = blobs_bundle; - - let result = - validate_blobs_and_cell_proofs::(kzg, blobs.iter().collect(), &proofs, 
&commitments); + fn test_validate_data_columns(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 6; + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); + let column_sidecars = + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); + let result = validate_data_columns::(kzg, column_sidecars.iter()); assert!(result.is_ok()); } diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 781a4cfa44..2061df3762 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -20,6 +20,7 @@ futures = { workspace = true } health_metrics = { workspace = true } hex = { workspace = true } lighthouse_network = { workspace = true } +lighthouse_tracing = { workspace = true } lighthouse_version = { workspace = true } logging = { workspace = true } lru = { workspace = true } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b0b4f9df56..515c262b19 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -457,7 +457,7 @@ pub fn serve( move |network_globals: Arc>, chain: Arc>| async move { match *network_globals.sync_state.read() { - SyncState::SyncingFinalized { .. } => { + SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } => { let head_slot = chain.canonical_head.cached_head().head_slot(); let current_slot = @@ -479,9 +479,7 @@ pub fn serve( ))) } } - SyncState::SyncingHead { .. } - | SyncState::SyncTransition - | SyncState::BackFillSyncing { .. } => Ok(()), + SyncState::SyncTransition | SyncState::BackFillSyncing { .. 
} => Ok(()), SyncState::Synced => Ok(()), SyncState::Stalled => Ok(()), } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 6377639ccd..f797e3f300 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -16,6 +16,7 @@ use eth2::types::{ use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse}; use futures::TryFutureExt; use lighthouse_network::PubsubMessage; +use lighthouse_tracing::SPAN_PUBLISH_BLOCK; use network::NetworkMessage; use rand::prelude::SliceRandom; use slot_clock::SlotClock; @@ -24,7 +25,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, error, info, warn}; +use tracing::{Span, debug, debug_span, error, info, instrument, warn}; use tree_hash::TreeHash; use types::{ AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource, @@ -75,6 +76,12 @@ impl ProvenancedBlock> /// Handles a request from the HTTP API for full blocks. 
#[allow(clippy::too_many_arguments)] +#[instrument( + name = SPAN_PUBLISH_BLOCK, + level = "info", + skip_all, + fields(?block_root, ?validation_level, provenance = tracing::field::Empty) +)] pub async fn publish_block>( block_root: Option, provenanced_block: ProvenancedBlock, @@ -96,6 +103,9 @@ pub async fn publish_block>( } else { "builder" }; + let current_span = Span::current(); + current_span.record("provenance", provenance); + let block = unverified_block.inner_block(); debug!(slot = %block.slot(), "Signed block received in HTTP API"); @@ -133,8 +143,12 @@ pub async fn publish_block>( let slot = block.message().slot(); let sender_clone = network_tx.clone(); - let build_sidecar_task_handle = - spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; + let build_sidecar_task_handle = spawn_build_data_sidecar_task( + chain.clone(), + block.clone(), + unverified_blobs, + current_span.clone(), + )?; // Gossip verify the block and blobs/data columns separately. let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); @@ -347,6 +361,7 @@ fn spawn_build_data_sidecar_task( chain: Arc>, block: Arc>>, proofs_and_blobs: UnverifiedBlobs, + current_span: Span, ) -> Result>, Rejection> { chain .clone() @@ -356,6 +371,7 @@ fn spawn_build_data_sidecar_task( let Some((kzg_proofs, blobs)) = proofs_and_blobs else { return Ok((vec![], vec![])); }; + let _guard = debug_span!(parent: current_span, "build_data_sidecars").entered(); let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); if !peer_das_enabled { @@ -532,7 +548,11 @@ fn publish_column_sidecars( .saturating_sub(malicious_withhold_count); // Randomize columns before dropping the last malicious_withhold_count items data_column_sidecars.shuffle(&mut **chain.rng.lock()); - data_column_sidecars.truncate(columns_to_keep); + let dropped_indices = data_column_sidecars + .drain(columns_to_keep..) 
+ .map(|d| d.index) + .collect::>(); + debug!(indices = ?dropped_indices, "Dropping data columns from publishing"); } let pubsub_messages = data_column_sidecars .into_iter() diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 91f9960d47..acb0188456 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -1089,11 +1089,11 @@ mod tests { } fn bbroot_request_v1(fork_name: ForkName, spec: &ChainSpec) -> BlocksByRootRequest { - BlocksByRootRequest::new_v1(vec![Hash256::zero()], &fork_context(fork_name, spec)) + BlocksByRootRequest::new_v1(vec![Hash256::zero()], &fork_context(fork_name, spec)).unwrap() } fn bbroot_request_v2(fork_name: ForkName, spec: &ChainSpec) -> BlocksByRootRequest { - BlocksByRootRequest::new(vec![Hash256::zero()], &fork_context(fork_name, spec)) + BlocksByRootRequest::new(vec![Hash256::zero()], &fork_context(fork_name, spec)).unwrap() } fn blbroot_request(fork_name: ForkName, spec: &ChainSpec) -> BlobsByRootRequest { @@ -1104,6 +1104,7 @@ mod tests { }], &fork_context(fork_name, spec), ) + .unwrap() } fn ping_message() -> Ping { diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 39078e8d9e..9319973e59 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -481,20 +481,22 @@ pub struct BlocksByRootRequest { } impl BlocksByRootRequest { - pub fn new(block_roots: Vec, fork_context: &ForkContext) -> Self { + pub fn new(block_roots: Vec, fork_context: &ForkContext) -> Result { let max_request_blocks = fork_context .spec .max_request_blocks(fork_context.current_fork_name()); - let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks); - Self::V2(BlocksByRootRequestV2 { block_roots }) + let block_roots = RuntimeVariableList::new(block_roots, max_request_blocks) + 
.map_err(|e| format!("BlocksByRootRequestV2 too many roots: {e:?}"))?; + Ok(Self::V2(BlocksByRootRequestV2 { block_roots })) } - pub fn new_v1(block_roots: Vec, fork_context: &ForkContext) -> Self { + pub fn new_v1(block_roots: Vec, fork_context: &ForkContext) -> Result { let max_request_blocks = fork_context .spec .max_request_blocks(fork_context.current_fork_name()); - let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks); - Self::V1(BlocksByRootRequestV1 { block_roots }) + let block_roots = RuntimeVariableList::new(block_roots, max_request_blocks) + .map_err(|e| format!("BlocksByRootRequestV1 too many roots: {e:?}"))?; + Ok(Self::V1(BlocksByRootRequestV1 { block_roots })) } } @@ -506,12 +508,13 @@ pub struct BlobsByRootRequest { } impl BlobsByRootRequest { - pub fn new(blob_ids: Vec, fork_context: &ForkContext) -> Self { + pub fn new(blob_ids: Vec, fork_context: &ForkContext) -> Result { let max_request_blob_sidecars = fork_context .spec .max_request_blob_sidecars(fork_context.current_fork_name()); - let blob_ids = RuntimeVariableList::from_vec(blob_ids, max_request_blob_sidecars); - Self { blob_ids } + let blob_ids = RuntimeVariableList::new(blob_ids, max_request_blob_sidecars) + .map_err(|e| format!("BlobsByRootRequestV1 too many blob IDs: {e:?}"))?; + Ok(Self { blob_ids }) } } @@ -526,9 +529,10 @@ impl DataColumnsByRootRequest { pub fn new( data_column_ids: Vec>, max_request_blocks: usize, - ) -> Self { - let data_column_ids = RuntimeVariableList::from_vec(data_column_ids, max_request_blocks); - Self { data_column_ids } + ) -> Result { + let data_column_ids = RuntimeVariableList::new(data_column_ids, max_request_blocks) + .map_err(|_| "DataColumnsByRootRequest too many column IDs")?; + Ok(Self { data_column_ids }) } pub fn max_requested(&self) -> usize { diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 098d7efadb..e37f4131a7 100644 --- 
a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -831,7 +831,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() { // BlocksByRoot Request let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 { - block_roots: RuntimeVariableList::from_vec( + block_roots: RuntimeVariableList::new( vec![ Hash256::zero(), Hash256::zero(), @@ -841,7 +841,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() { Hash256::zero(), ], spec.max_request_blocks(current_fork_name), - ), + ) + .unwrap(), })); // BlocksByRoot Response @@ -991,7 +992,8 @@ fn test_tcp_columns_by_root_chunked_rpc() { max_request_blocks ], max_request_blocks, - ); + ) + .unwrap(); let req_bytes = req.data_column_ids.as_ssz_bytes(); let req_decoded = DataColumnsByRootRequest { data_column_ids: >>::from_ssz_bytes( @@ -1281,7 +1283,7 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { // BlocksByRoot Request let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 { - block_roots: RuntimeVariableList::from_vec( + block_roots: RuntimeVariableList::new( vec![ Hash256::zero(), Hash256::zero(), @@ -1295,7 +1297,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { Hash256::zero(), ], spec.max_request_blocks(current_fork), - ), + ) + .unwrap(), })); // BlocksByRoot Response diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs index a69428d5bd..ffbad1364c 100644 --- a/beacon_node/lighthouse_tracing/src/lib.rs +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -3,6 +3,9 @@ //! TODO: These span identifiers will be used to implement selective tracing export (to be implemented), //! where only the listed root spans and their descendants will be exported to the tracing backend. 
+/// Root span name for publish_block +pub const SPAN_PUBLISH_BLOCK: &str = "publish_block"; + /// Data Availability checker span identifiers pub const SPAN_PENDING_COMPONENTS: &str = "pending_components"; diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 7d26b42c33..a53e76402e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1032,7 +1032,7 @@ impl NetworkBeaconProcessor { .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column"); - match result { + match &result { Ok(availability) => match availability { AvailabilityProcessingStatus::Imported(block_root) => { info!( @@ -1058,6 +1058,7 @@ // another column arrives it either completes availability or pushes // reconstruction back a bit. let cloned_self = Arc::clone(self); + let block_root = *block_root; let send_result = self.beacon_processor_send.try_send(WorkEvent { drop_during_sync: false, work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( @@ -1106,6 +1107,16 @@ ); } } + + // If a block is in the da_checker, sync may be awaiting an event when the block is finally + // imported. A block can become imported both after processing a block or data column. If + // importing a block results in `Imported`, notify. Do not notify of data column errors. 
+ if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: true, + }); + } } /// Process the beacon block received from the gossip network and: diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 85e4f04641..9ddba86b81 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -1200,8 +1200,15 @@ impl NetworkBeaconProcessor { .chain .custody_columns_for_epoch(Some(request_start_epoch)); + let indices_to_retrieve = req + .columns + .iter() + .copied() + .filter(|c| available_columns.contains(c)) + .collect::>(); + for root in block_roots { - for index in available_columns { + for index in &indices_to_retrieve { match self.chain.get_data_column(&root, index) { Ok(Some(data_column_sidecar)) => { // Due to skip slots, data columns could be out of the range, we ensure they diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 557f9a2914..2027a525e6 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -21,7 +21,9 @@ use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use itertools::Itertools; use lighthouse_network::rpc::InboundRequestId; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, MetaDataV3, +}; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, discv5::enr::{self, CombinedKey}, @@ -30,6 +32,7 @@ use lighthouse_network::{ }; use matches::assert_matches; use slot_clock::SlotClock; +use std::collections::HashSet; use 
std::iter::Iterator; use std::sync::Arc; use std::time::Duration; @@ -70,7 +73,7 @@ struct TestRig { beacon_processor_tx: BeaconProcessorSend, work_journal_rx: mpsc::Receiver<&'static str>, network_rx: mpsc::UnboundedReceiver>, - _sync_rx: mpsc::UnboundedReceiver>, + sync_rx: mpsc::UnboundedReceiver>, duplicate_cache: DuplicateCache, network_beacon_processor: Arc>, _harness: BeaconChainHarness, @@ -202,7 +205,7 @@ impl TestRig { beacon_processor_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); + let (sync_tx, sync_rx) = mpsc::unbounded_channel(); // Default metadata let meta_data = if spec.is_peer_das_scheduled() { @@ -310,7 +313,7 @@ impl TestRig { beacon_processor_tx, work_journal_rx, network_rx, - _sync_rx, + sync_rx, duplicate_cache, network_beacon_processor, _harness: harness, @@ -432,6 +435,20 @@ impl TestRig { .unwrap(); } + pub fn enqueue_data_columns_by_range_request(&self, count: u64, columns: Vec) { + self.network_beacon_processor + .send_data_columns_by_range_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + DataColumnsByRangeRequest { + start_slot: 0, + count, + columns, + }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self) { self.network_beacon_processor .send_chain_segment( @@ -677,6 +694,45 @@ impl TestRig { Some(events) } } + + /// Listen for sync messages and collect them for a specified duration or until reaching a count. + /// + /// Returns None if no messages were received, or Some(Vec) containing the received messages. + pub async fn receive_sync_messages_with_timeout( + &mut self, + timeout: Duration, + count: Option, + ) -> Option>> { + let mut events = vec![]; + + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + + loop { + // Break if we've received the requested count of messages + if let Some(target_count) = count + && events.len() >= target_count + { + break; + } + + tokio::select! 
{ + _ = &mut timeout_future => break, + maybe_msg = self.sync_rx.recv() => { + match maybe_msg { + Some(msg) => events.push(msg), + None => break, // Channel closed + } + } + } + } + + if events.is_empty() { + None + } else { + Some(events) + } + } } fn junk_peer_id() -> PeerId { @@ -1365,3 +1421,115 @@ async fn test_blobs_by_range() { } assert_eq!(blob_count, actual_count); } + +/// Ensure that data column processing that results in block import sends a sync notification +#[tokio::test] +async fn test_data_column_import_notifies_sync() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + } + + let mut rig = TestRig::new(SMALL_CHAIN).await; + let block_root = rig.next_block.canonical_root(); + + // Enqueue the block first to prepare for data column processing + rig.enqueue_gossip_block(); + rig.assert_event_journal_completes(&[WorkType::GossipBlock]) + .await; + rig.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Enqueue data columns which should trigger block import when complete + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + if num_data_columns > 0 { + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + + // Verify block import succeeded + assert_eq!( + rig.head_root(), + block_root, + "block should be imported and become head" + ); + + // Check that sync was notified of the successful import + let sync_messages = rig + .receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Verify we received the expected GossipBlockProcessResult message + assert_eq!( + sync_messages.len(), + 1, + "should receive exactly one sync message" + ); + match &sync_messages[0] { + SyncMessage::GossipBlockProcessResult { + block_root: msg_block_root, + imported, + } => { + 
assert_eq!(*msg_block_root, block_root, "block root should match"); + assert!(*imported, "block should be marked as imported"); + } + other => panic!("expected GossipBlockProcessResult, got {:?}", other), + } + } +} + +#[tokio::test] +async fn test_data_columns_by_range_request_only_returns_requested_columns() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + let slot_count = 4; + + let all_custody_columns = rig + .chain + .sampling_columns_for_epoch(rig.chain.epoch().unwrap()); + let available_columns: Vec = all_custody_columns.to_vec(); + + let requested_columns = vec![available_columns[0], available_columns[2]]; + + rig.enqueue_data_columns_by_range_request(slot_count, requested_columns.clone()); + + let mut received_columns = Vec::new(); + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::DataColumnsByRange(data_column), + inbound_request_id: _, + } = next + { + if let Some(column) = data_column { + received_columns.push(column.index); + } else { + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + + for received_index in &received_columns { + assert!( + requested_columns.contains(received_index), + "Received column index {} was not in requested columns {:?}", + received_index, + requested_columns + ); + } + + let unique_received: HashSet<_> = received_columns.into_iter().collect(); + assert!( + !unique_received.is_empty(), + "Should have received at least some data columns" + ); +} diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index ae9ac2e770..2f5eb3f689 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -40,7 +40,7 @@ use types::{ColumnIndex, Epoch, EthSpec}; pub const BACKFILL_EPOCHS_PER_BATCH: u64 = 1; /// The maximum number of batches to queue before 
requesting more. -const BACKFILL_BATCH_BUFFER_SIZE: u8 = 20; +const BACKFILL_BATCH_BUFFER_SIZE: u8 = 5; /// The number of times to retry a batch before it is considered failed. const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 10; diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f8f8d8a9a5..e9f24697ac 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -36,7 +36,6 @@ use beacon_chain::data_availability_checker::{ use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; pub use common::RequestState; use fnv::FnvHashMap; -use itertools::Itertools; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; @@ -653,15 +652,15 @@ impl BlockLookups { // but future errors may follow the same pattern. Generalize this // pattern with https://github.com/sigp/lighthouse/pull/6321 BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn(errors), - ) => errors - .iter() - // Collect all peers that sent a column that was invalid. Must - // run .unique as a single peer can send multiple invalid - // columns. Penalize once to avoid insta-bans - .flat_map(|(index, _)| peer_group.of_index((*index) as usize)) - .unique() - .collect(), + AvailabilityCheckError::InvalidColumn((index_opt, _)), + ) => { + match index_opt { + Some(index) => peer_group.of_index(index as usize).collect(), + // If no index supplied this is an un-attributable fault. In practice + // this should never happen. 
+ None => vec![], + } + } _ => peer_group.all().collect(), }; for peer in peers_to_penalize { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1f4a14b4bf..07462a01fe 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -865,10 +865,15 @@ impl SyncNetworkContext { // - RPCError(request_id): handled by `Self::on_single_block_response` // - Disconnect(peer_id) handled by `Self::peer_disconnected``which converts it to a // ` RPCError(request_id)`event handled by the above method + let network_request = RequestType::BlocksByRoot( + request + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); self.network_send .send(NetworkMessage::SendRequest { peer_id, - request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)), + request: network_request, app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), }) .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; @@ -959,10 +964,16 @@ impl SyncNetworkContext { }; // Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call + let network_request = RequestType::BlobsByRoot( + request + .clone() + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); self.network_send .send(NetworkMessage::SendRequest { peer_id, - request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)), + request: network_request, app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), }) .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs index 0d176e2d8c..39886d814e 100644 --- 
a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs @@ -11,7 +11,7 @@ pub struct BlobsByRootSingleBlockRequest { } impl BlobsByRootSingleBlockRequest { - pub fn into_request(self, spec: &ForkContext) -> BlobsByRootRequest { + pub fn into_request(self, spec: &ForkContext) -> Result { BlobsByRootRequest::new( self.indices .into_iter() diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs index 6d7eabf909..8cb7f53ac5 100644 --- a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs @@ -9,7 +9,8 @@ use super::{ActiveRequestItems, LookupVerifyError}; pub struct BlocksByRootSingleRequest(pub Hash256); impl BlocksByRootSingleRequest { - pub fn into_request(self, fork_context: &ForkContext) -> BlocksByRootRequest { + pub fn into_request(self, fork_context: &ForkContext) -> Result { + // This should always succeed (single block root), but we return a `Result` for safety. 
BlocksByRootRequest::new(vec![self.0], fork_context) } } diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index 253e8940b2..34df801eaa 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -21,13 +21,13 @@ impl DataColumnsByRootSingleBlockRequest { ) -> Result, &'static str> { let columns = VariableList::new(self.indices) .map_err(|_| "Number of indices exceeds total number of columns")?; - Ok(DataColumnsByRootRequest::new( + DataColumnsByRootRequest::new( vec![DataColumnsByRootIdentifier { block_root: self.block_root, columns, }], spec.max_request_blocks(fork_name), - )) + ) } } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 4180025096..b5bc10851d 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -2303,7 +2303,7 @@ mod deneb_only { block, self.unknown_parent_blobs .take() - .map(|vec| RuntimeVariableList::from_vec(vec, max_len)), + .map(|vec| RuntimeVariableList::new(vec, max_len).unwrap()), ) .unwrap(); self.rig.parent_block_processed( diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 13df83efab..61a8474a73 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -25,6 +25,7 @@ redb = { version = "2.1.3", optional = true } safe_arith = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } +ssz_types = { workspace = true } state_processing = { workspace = true } strum = { workspace = true } superstruct = { workspace = true } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 51b4bfef83..f62647ae54 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ 
-69,6 +69,7 @@ pub enum Error { CacheBuildError(EpochCacheError), RandaoMixOutOfBounds, MilhouseError(milhouse::Error), + SszTypesError(ssz_types::Error), Compression(std::io::Error), FinalizedStateDecreasingSlot, FinalizedStateUnaligned, @@ -161,6 +162,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ssz_types::Error) -> Self { + Self::SszTypesError(e) + } +} + impl From for Error { fn from(e: hdiff::Error) -> Self { Self::Hdiff(e) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 8116596aa0..7b390b39f3 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2478,7 +2478,7 @@ impl, Cold: ItemStore> HotColdDB .first() .map(|blob| self.spec.max_blobs_per_block(blob.epoch())) { - let blobs = BlobSidecarList::from_vec(blobs, max_blobs_per_block as usize); + let blobs = BlobSidecarList::new(blobs, max_blobs_per_block as usize)?; self.block_cache .lock() .put_blobs(*block_root, blobs.clone()); diff --git a/consensus/types/presets/minimal/deneb.yaml b/consensus/types/presets/minimal/deneb.yaml index c101de3162..3096c605ab 100644 --- a/consensus/types/presets/minimal/deneb.yaml +++ b/consensus/types/presets/minimal/deneb.yaml @@ -4,7 +4,7 @@ # --------------------------------------------------------------- # `uint64(4096)` FIELD_ELEMENTS_PER_BLOB: 4096 -# [customized] -MAX_BLOB_COMMITMENTS_PER_BLOCK: 32 -# [customized] `floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) + 1 + ceillog2(MAX_BLOB_COMMITMENTS_PER_BLOCK)` = 4 + 1 + 5 = 10 -KZG_COMMITMENT_INCLUSION_PROOF_DEPTH: 10 +# `uint64(4096)` +MAX_BLOB_COMMITMENTS_PER_BLOCK: 4096 +# `floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) + 1 + ceillog2(MAX_BLOB_COMMITMENTS_PER_BLOCK)` = 4 + 1 + 12 = 17 +KZG_COMMITMENT_INCLUSION_PROOF_DEPTH: 17 diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index d65ad9a3e0..2e8c257897 100644 
--- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -84,6 +84,7 @@ pub enum BlobSidecarError { MissingKzgCommitment, BeaconState(BeaconStateError), MerkleTree(MerkleTreeError), + SszTypes(ssz_types::Error), ArithError(ArithError), } @@ -283,10 +284,11 @@ impl BlobSidecar { let blob_sidecar = BlobSidecar::new(i, blob, block, *kzg_proof)?; blob_sidecars.push(Arc::new(blob_sidecar)); } - Ok(RuntimeVariableList::from_vec( + RuntimeVariableList::new( blob_sidecars, spec.max_blobs_per_block(block.epoch()) as usize, - )) + ) + .map_err(BlobSidecarError::SszTypes) } } diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 03fee0fda7..92cccf2b6f 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -2034,10 +2034,11 @@ const fn default_min_epochs_for_data_column_sidecars_requests() -> u64 { fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize { let max_request_blocks = max_request_blocks as usize; - RuntimeVariableList::::from_vec( + RuntimeVariableList::::new( vec![Hash256::zero(); max_request_blocks], max_request_blocks, ) + .expect("creating a RuntimeVariableList of size `max_request_blocks` should succeed") .as_ssz_bytes() .len() } @@ -2049,10 +2050,11 @@ fn max_blobs_by_root_request_common(max_request_blob_sidecars: u64) -> usize { index: 0, }; - RuntimeVariableList::::from_vec( + RuntimeVariableList::::new( vec![empty_blob_identifier; max_request_blob_sidecars], max_request_blob_sidecars, ) + .expect("creating a RuntimeVariableList of size `max_request_blob_sidecars` should succeed") .as_ssz_bytes() .len() } @@ -2065,10 +2067,11 @@ fn max_data_columns_by_root_request_common(max_request_blocks: u64) columns: VariableList::from(vec![0; E::number_of_columns()]), }; - RuntimeVariableList::>::from_vec( + RuntimeVariableList::>::new( vec![empty_data_columns_by_root_id; max_request_blocks], max_request_blocks, ) + .expect("creating a RuntimeVariableList 
of size `max_request_blocks` should succeed") .as_ssz_bytes() .len() } diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 03bfffd8fc..f4c7abefe7 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -3,8 +3,8 @@ use crate::*; use safe_arith::SafeArith; use serde::{Deserialize, Serialize}; use ssz_types::typenum::{ - U0, U1, U2, U4, U8, U10, U16, U17, U32, U64, U128, U256, U512, U625, U1024, U2048, U4096, - U8192, U65536, U131072, U262144, U1048576, U16777216, U33554432, U134217728, U1073741824, + U0, U1, U2, U4, U8, U16, U17, U32, U64, U128, U256, U512, U625, U1024, U2048, U4096, U8192, + U65536, U131072, U262144, U1048576, U16777216, U33554432, U134217728, U1073741824, U1099511627776, UInt, bit::B0, }; use std::fmt::{self, Debug}; @@ -517,8 +517,8 @@ impl EthSpec for MinimalEthSpec { type MaxWithdrawalsPerPayload = U4; type FieldElementsPerBlob = U4096; type BytesPerBlob = U131072; - type MaxBlobCommitmentsPerBlock = U32; - type KzgCommitmentInclusionProofDepth = U10; + type MaxBlobCommitmentsPerBlock = U4096; + type KzgCommitmentInclusionProofDepth = U17; type PendingPartialWithdrawalsLimit = U64; type PendingConsolidationsLimit = U64; type FieldElementsPerCell = U64; diff --git a/consensus/types/src/runtime_var_list.rs b/consensus/types/src/runtime_var_list.rs index 2a8899e203..dcb98538b7 100644 --- a/consensus/types/src/runtime_var_list.rs +++ b/consensus/types/src/runtime_var_list.rs @@ -23,15 +23,15 @@ use tree_hash::{Hash256, MerkleHasher, PackedEncoding, TreeHash, TreeHashType}; /// let base: Vec = vec![1, 2, 3, 4]; /// /// // Create a `RuntimeVariableList` from a `Vec` that has the expected length. 
-/// let exact: RuntimeVariableList<_> = RuntimeVariableList::from_vec(base.clone(), 4); +/// let exact: RuntimeVariableList<_> = RuntimeVariableList::new(base.clone(), 4).unwrap(); /// assert_eq!(&exact[..], &[1, 2, 3, 4]); /// -/// // Create a `RuntimeVariableList` from a `Vec` that is too long and the `Vec` is truncated. -/// let short: RuntimeVariableList<_> = RuntimeVariableList::from_vec(base.clone(), 3); -/// assert_eq!(&short[..], &[1, 2, 3]); +/// // Create a `RuntimeVariableList` from a `Vec` that is too long you'll get an error. +/// let err = RuntimeVariableList::new(base.clone(), 3).unwrap_err(); +/// assert_eq!(err, ssz_types::Error::OutOfBounds { i: 4, len: 3 }); /// /// // Create a `RuntimeVariableList` from a `Vec` that is shorter than the maximum. -/// let mut long: RuntimeVariableList<_> = RuntimeVariableList::from_vec(base, 5); +/// let mut long: RuntimeVariableList<_> = RuntimeVariableList::new(base, 5).unwrap(); /// assert_eq!(&long[..], &[1, 2, 3, 4]); /// /// // Push a value to if it does not exceed the maximum @@ -65,12 +65,6 @@ impl RuntimeVariableList { } } - pub fn from_vec(mut vec: Vec, max_len: usize) -> Self { - vec.truncate(max_len); - - Self { vec, max_len } - } - /// Create an empty list with the given `max_len`. 
pub fn empty(max_len: usize) -> Self { Self { @@ -231,14 +225,13 @@ where { // first parse out a Vec using the Vec impl you already have let vec: Vec = Vec::context_deserialize(deserializer, context.0)?; - if vec.len() > context.1 { - return Err(DeError::custom(format!( - "RuntimeVariableList lengh {} exceeds max_len {}", - vec.len(), - context.1 - ))); - } - Ok(RuntimeVariableList::from_vec(vec, context.1)) + let vec_len = vec.len(); + RuntimeVariableList::new(vec, context.1).map_err(|e| { + DeError::custom(format!( + "RuntimeVariableList length {} exceeds max_len {}: {e:?}", + vec_len, context.1, + )) + }) } } @@ -323,7 +316,8 @@ mod test { fn indexing() { let vec = vec![1, 2]; - let mut fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec.clone(), 8192); + let mut fixed: RuntimeVariableList = + RuntimeVariableList::new(vec.clone(), 8192).unwrap(); assert_eq!(fixed[0], 1); assert_eq!(&fixed[0..1], &vec[0..1]); @@ -335,24 +329,25 @@ mod test { #[test] fn length() { + // Too long. 
let vec = vec![42; 5]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec.clone(), 4); - assert_eq!(&fixed[..], &vec[0..4]); + let err = RuntimeVariableList::::new(vec.clone(), 4).unwrap_err(); + assert_eq!(err, Error::OutOfBounds { i: 5, len: 4 }); let vec = vec![42; 3]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec.clone(), 4); + let fixed: RuntimeVariableList = RuntimeVariableList::new(vec.clone(), 4).unwrap(); assert_eq!(&fixed[0..3], &vec[..]); assert_eq!(&fixed[..], &vec![42, 42, 42][..]); let vec = vec![]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec, 4); + let fixed: RuntimeVariableList = RuntimeVariableList::new(vec, 4).unwrap(); assert_eq!(&fixed[..], &[] as &[u64]); } #[test] fn deref() { let vec = vec![0, 2, 4, 6]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec, 4); + let fixed: RuntimeVariableList = RuntimeVariableList::new(vec, 4).unwrap(); assert_eq!(fixed.first(), Some(&0)); assert_eq!(fixed.get(3), Some(&6)); @@ -361,7 +356,7 @@ mod test { #[test] fn encode() { - let vec: RuntimeVariableList = RuntimeVariableList::from_vec(vec![0; 2], 2); + let vec: RuntimeVariableList = RuntimeVariableList::new(vec![0; 2], 2).unwrap(); assert_eq!(vec.as_ssz_bytes(), vec![0, 0, 0, 0]); assert_eq!( as Encode>::ssz_fixed_len(), 4); } @@ -378,7 +373,7 @@ mod test { #[test] fn u16_len_8() { - round_trip::(RuntimeVariableList::from_vec(vec![42; 8], 8)); - round_trip::(RuntimeVariableList::from_vec(vec![0; 8], 8)); + round_trip::(RuntimeVariableList::new(vec![42; 8], 8).unwrap()); + round_trip::(RuntimeVariableList::new(vec![0; 8], 8).unwrap()); } } diff --git a/crypto/kzg/Cargo.toml b/crypto/kzg/Cargo.toml index bfe0f19cd0..432fcc1792 100644 --- a/crypto/kzg/Cargo.toml +++ b/crypto/kzg/Cargo.toml @@ -14,9 +14,11 @@ ethereum_serde_utils = { workspace = true } ethereum_ssz = { workspace = true } ethereum_ssz_derive = { workspace = true } hex = { workspace = true } +rayon = { workspace = 
true } rust_eth_kzg = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tracing = { workspace = true } tree_hash = { workspace = true } [dev-dependencies] diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index ddaddd1ada..1b8d46100f 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -3,6 +3,7 @@ mod kzg_proof; pub mod trusted_setup; use rust_eth_kzg::{CellIndex, DASContext}; +use std::collections::HashMap; use std::fmt::Debug; pub use crate::{ @@ -17,10 +18,12 @@ pub use c_kzg::{ }; use crate::trusted_setup::load_trusted_setup; +use rayon::prelude::*; pub use rust_eth_kzg::{ constants::{BYTES_PER_CELL, CELLS_PER_EXT_BLOB}, Cell, CellIndex as CellID, CellRef, TrustedSetup as PeerDASTrustedSetup, }; +use tracing::instrument; /// Disables the fixed-base multi-scalar multiplication optimization for computing /// cell KZG proofs, because `rust-eth-kzg` already handles the precomputation. @@ -229,31 +232,85 @@ impl Kzg { } /// Verifies a batch of cell-proof-commitment triplets. + #[instrument(skip_all, level = "debug", fields(cells = cells.len()))] pub fn verify_cell_proof_batch( &self, cells: &[CellRef<'_>], kzg_proofs: &[Bytes48], - columns: Vec, + indices: Vec, kzg_commitments: &[Bytes48], - ) -> Result<(), Error> { - let proofs: Vec<_> = kzg_proofs.iter().map(|proof| proof.as_ref()).collect(); - let commitments: Vec<_> = kzg_commitments - .iter() - .map(|commitment| commitment.as_ref()) - .collect(); - let verification_result = self.context().verify_cell_kzg_proof_batch( - commitments.to_vec(), - &columns, - cells.to_vec(), - proofs.to_vec(), - ); + ) -> Result<(), (Option, Error)> { + let mut column_groups: HashMap> = HashMap::new(); - // Modify the result so it matches roughly what the previous method was doing. 
- match verification_result { - Ok(_) => Ok(()), - Err(e) if e.is_proof_invalid() => Err(Error::KzgVerificationFailed), - Err(e) => Err(Error::PeerDASKZG(e)), + let expected_len = cells.len(); + + // This check is already made in `validate_data_columns`. However we add it here so that ef consensus spec tests pass + // and to avoid any potential footguns in the future. Note that by catching the error here and not in `validate_data_columns` + // the error becomes non-attributable. + if kzg_proofs.len() != expected_len + || indices.len() != expected_len + || kzg_commitments.len() != expected_len + { + return Err(( + None, + Error::InconsistentArrayLength("Invalid data column".to_string()), + )); } + + for (((cell, proof), &index), commitment) in cells + .iter() + .zip(kzg_proofs.iter()) + .zip(indices.iter()) + .zip(kzg_commitments.iter()) + { + column_groups + .entry(index) + .or_default() + .push((cell, *proof, *commitment)); + } + + column_groups + .into_par_iter() + .map(|(column_index, column_data)| { + let mut cells = Vec::new(); + let mut proofs = Vec::new(); + let mut commitments = Vec::new(); + + for (cell, proof, commitment) in &column_data { + cells.push(*cell); + proofs.push(proof.as_ref()); + commitments.push(commitment.as_ref()); + } + + // Create per-chunk tracing span for visualizing parallel processing. + // This is safe from span explosion as we have at most 128 chunks, + // i.e. the number of column indices. 
+ let _span = tracing::debug_span!( + "verify_cell_proof_chunk", + cells = cells.len(), + column_index, + verification_result = tracing::field::Empty, + ) + .entered(); + + let verification_result = self.context().verify_cell_kzg_proof_batch( + commitments, + &vec![column_index; cells.len()], // All column_data here is from the same index + cells, + proofs, + ); + + match verification_result { + Ok(_) => Ok(()), + Err(e) if e.is_proof_invalid() => { + Err((Some(column_index), Error::KzgVerificationFailed)) + } + Err(e) => Err((Some(column_index), Error::PeerDASKZG(e))), + } + }) + .collect::, (Option, Error)>>()?; + + Ok(()) } pub fn recover_cells_and_compute_kzg_proofs( diff --git a/scripts/tests/doppelganger_protection.sh b/scripts/tests/doppelganger_protection.sh index 86c9705ee4..9009d49d58 100755 --- a/scripts/tests/doppelganger_protection.sh +++ b/scripts/tests/doppelganger_protection.sh @@ -60,7 +60,7 @@ DELAY=$(( $SECONDS_PER_SLOT * 32 + $GENESIS_DELAY + $MIN_GENESIS_TIME - $CURRENT sleep $DELAY # Use BN2 for the next validator client -bn_2_url=$(kurtosis service inspect $ENCLAVE_NAME cl-2-lighthouse-geth | grep 'enr-address' | cut -d'=' -f2) +bn_2_url=$(kurtosis service inspect $ENCLAVE_NAME cl-2-lighthouse-geth | grep -oP '(?<=--enr-address=)[^ ]+') bn_2_port=4000 if [[ "$BEHAVIOR" == "failure" ]]; then diff --git a/testing/ef_tests/Makefile b/testing/ef_tests/Makefile index eb3631aa91..0c6fd50dfd 100644 --- a/testing/ef_tests/Makefile +++ b/testing/ef_tests/Makefile @@ -1,6 +1,6 @@ # To download/extract nightly tests, run: # CONSENSUS_SPECS_TEST_VERSION=nightly make -CONSENSUS_SPECS_TEST_VERSION ?= v1.6.0-alpha.4 +CONSENSUS_SPECS_TEST_VERSION ?= v1.6.0-alpha.5 REPO_NAME := consensus-spec-tests OUTPUT_DIR := ./$(REPO_NAME) diff --git a/testing/ef_tests/check_all_files_accessed.py b/testing/ef_tests/check_all_files_accessed.py index 9c168b6fa2..821287ce25 100755 --- a/testing/ef_tests/check_all_files_accessed.py +++ 
b/testing/ef_tests/check_all_files_accessed.py @@ -57,6 +57,8 @@ excluded_paths = [ # Ignore full epoch tests for now (just test the sub-transitions). "tests/.*/.*/epoch_processing/.*/pre_epoch.ssz_snappy", "tests/.*/.*/epoch_processing/.*/post_epoch.ssz_snappy", + # Ignore gloas tests for now + "tests/.*/gloas/.*", ] diff --git a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs index e3edc0df0a..7973af861f 100644 --- a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs +++ b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs @@ -53,7 +53,7 @@ impl Case for KZGVerifyCellKZGProofBatch { let kzg = get_kzg(); match kzg.verify_cell_proof_batch(&cells, &proofs, cell_indices, &commitments) { Ok(_) => Ok(true), - Err(KzgError::KzgVerificationFailed) => Ok(false), + Err((_, KzgError::KzgVerificationFailed)) => Ok(false), Err(e) => Err(Error::InternalError(format!( "Failed to validate cells: {:?}", e