From 8901c7417d4c306e036cea34482fbc06b8d9b4af Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 27 Aug 2025 11:32:17 +1000 Subject: [PATCH 1/9] Notify lookup after gossip data column processing resulted in an import (#7940) When gossip data column processing completes and results in a block import, sync is currently not notified of the successful import. This is inconsistent with how blob processing and block processing both notify sync. This fix ensures lookup sync receives block import notifications when blocks become available through gossip data column. --- .../gossip_methods.rs | 13 ++- .../src/network_beacon_processor/tests.rs | 104 +++++++++++++++++- 2 files changed, 113 insertions(+), 4 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 7d26b42c33..a53e76402e 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1032,7 +1032,7 @@ impl NetworkBeaconProcessor { .await; register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column"); - match result { + match &result { Ok(availability) => match availability { AvailabilityProcessingStatus::Imported(block_root) => { info!( @@ -1058,6 +1058,7 @@ impl NetworkBeaconProcessor { // another column arrives it either completes availability or pushes // reconstruction back a bit. let cloned_self = Arc::clone(self); + let block_root = *block_root; let send_result = self.beacon_processor_send.try_send(WorkEvent { drop_during_sync: false, work: Work::Reprocess(ReprocessQueueMessage::DelayColumnReconstruction( @@ -1106,6 +1107,16 @@ impl NetworkBeaconProcessor { ); } } + + // If a block is in the da_checker, sync maybe awaiting for an event when block is finally + // imported. A block can become imported both after processing a block or data column. 
If a + // importing a block results in `Imported`, notify. Do not notify of data column errors. + if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) { + self.send_sync_message(SyncMessage::GossipBlockProcessResult { + block_root, + imported: true, + }); + } } /// Process the beacon block received from the gossip network and: diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 557f9a2914..916548eb9d 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -70,7 +70,7 @@ struct TestRig { beacon_processor_tx: BeaconProcessorSend, work_journal_rx: mpsc::Receiver<&'static str>, network_rx: mpsc::UnboundedReceiver>, - _sync_rx: mpsc::UnboundedReceiver>, + sync_rx: mpsc::UnboundedReceiver>, duplicate_cache: DuplicateCache, network_beacon_processor: Arc>, _harness: BeaconChainHarness, @@ -202,7 +202,7 @@ impl TestRig { beacon_processor_rx, } = BeaconProcessorChannels::new(&beacon_processor_config); - let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); + let (sync_tx, sync_rx) = mpsc::unbounded_channel(); // Default metadata let meta_data = if spec.is_peer_das_scheduled() { @@ -310,7 +310,7 @@ impl TestRig { beacon_processor_tx, work_journal_rx, network_rx, - _sync_rx, + sync_rx, duplicate_cache, network_beacon_processor, _harness: harness, @@ -677,6 +677,45 @@ impl TestRig { Some(events) } } + + /// Listen for sync messages and collect them for a specified duration or until reaching a count. + /// + /// Returns None if no messages were received, or Some(Vec) containing the received messages. 
+ pub async fn receive_sync_messages_with_timeout( + &mut self, + timeout: Duration, + count: Option, + ) -> Option>> { + let mut events = vec![]; + + let timeout_future = tokio::time::sleep(timeout); + tokio::pin!(timeout_future); + + loop { + // Break if we've received the requested count of messages + if let Some(target_count) = count + && events.len() >= target_count + { + break; + } + + tokio::select! { + _ = &mut timeout_future => break, + maybe_msg = self.sync_rx.recv() => { + match maybe_msg { + Some(msg) => events.push(msg), + None => break, // Channel closed + } + } + } + } + + if events.is_empty() { + None + } else { + Some(events) + } + } } fn junk_peer_id() -> PeerId { @@ -1365,3 +1404,62 @@ async fn test_blobs_by_range() { } assert_eq!(blob_count, actual_count); } + +/// Ensure that data column processing that results in block import sends a sync notification +#[tokio::test] +async fn test_data_column_import_notifies_sync() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + } + + let mut rig = TestRig::new(SMALL_CHAIN).await; + let block_root = rig.next_block.canonical_root(); + + // Enqueue the block first to prepare for data column processing + rig.enqueue_gossip_block(); + rig.assert_event_journal_completes(&[WorkType::GossipBlock]) + .await; + rig.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Enqueue data columns which should trigger block import when complete + let num_data_columns = rig.next_data_columns.as_ref().map(|c| c.len()).unwrap_or(0); + if num_data_columns > 0 { + for i in 0..num_data_columns { + rig.enqueue_gossip_data_columns(i); + rig.assert_event_journal_completes(&[WorkType::GossipDataColumnSidecar]) + .await; + } + + // Verify block import succeeded + assert_eq!( + rig.head_root(), + block_root, + "block should be imported and become head" + ); + + // Check that sync was notified of the successful import + let sync_messages = rig + 
.receive_sync_messages_with_timeout(Duration::from_millis(100), Some(1)) + .await + .expect("should receive sync message"); + + // Verify we received the expected GossipBlockProcessResult message + assert_eq!( + sync_messages.len(), + 1, + "should receive exactly one sync message" + ); + match &sync_messages[0] { + SyncMessage::GossipBlockProcessResult { + block_root: msg_block_root, + imported, + } => { + assert_eq!(*msg_block_root, block_root, "block root should match"); + assert!(*imported, "block should be marked as imported"); + } + other => panic!("expected GossipBlockProcessResult, got {:?}", other), + } + } +} From 2b33fe662039a322182ef2aa625d00a35b362575 Mon Sep 17 00:00:00 2001 From: Barnabas Busa Date: Wed, 27 Aug 2025 05:59:21 +0200 Subject: [PATCH 2/9] Update to spec v1.6.0-alpha.5 (#7910) - https://github.com/ethereum/consensus-specs/pull/4508 --- consensus/types/presets/minimal/deneb.yaml | 8 ++++---- consensus/types/src/eth_spec.rs | 8 ++++---- testing/ef_tests/Makefile | 2 +- testing/ef_tests/check_all_files_accessed.py | 2 ++ 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/consensus/types/presets/minimal/deneb.yaml b/consensus/types/presets/minimal/deneb.yaml index c101de3162..3096c605ab 100644 --- a/consensus/types/presets/minimal/deneb.yaml +++ b/consensus/types/presets/minimal/deneb.yaml @@ -4,7 +4,7 @@ # --------------------------------------------------------------- # `uint64(4096)` FIELD_ELEMENTS_PER_BLOB: 4096 -# [customized] -MAX_BLOB_COMMITMENTS_PER_BLOCK: 32 -# [customized] `floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) + 1 + ceillog2(MAX_BLOB_COMMITMENTS_PER_BLOCK)` = 4 + 1 + 5 = 10 -KZG_COMMITMENT_INCLUSION_PROOF_DEPTH: 10 +# `uint64(4096)` +MAX_BLOB_COMMITMENTS_PER_BLOCK: 4096 +# `floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) + 1 + ceillog2(MAX_BLOB_COMMITMENTS_PER_BLOCK)` = 4 + 1 + 12 = 17 +KZG_COMMITMENT_INCLUSION_PROOF_DEPTH: 17 diff --git 
a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index e9a291f6c7..e001cf0e4e 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -3,8 +3,8 @@ use crate::*; use safe_arith::SafeArith; use serde::{Deserialize, Serialize}; use ssz_types::typenum::{ - U0, U1, U2, U4, U8, U10, U16, U17, U32, U64, U128, U256, U512, U625, U1024, U2048, U4096, - U8192, U65536, U131072, U262144, U1048576, U16777216, U33554432, U134217728, U1073741824, + U0, U1, U2, U4, U8, U16, U17, U32, U64, U128, U256, U512, U625, U1024, U2048, U4096, U8192, + U65536, U131072, U262144, U1048576, U16777216, U33554432, U134217728, U1073741824, U1099511627776, UInt, bit::B0, }; use std::fmt::{self, Debug}; @@ -490,8 +490,8 @@ impl EthSpec for MinimalEthSpec { type MaxWithdrawalsPerPayload = U4; type FieldElementsPerBlob = U4096; type BytesPerBlob = U131072; - type MaxBlobCommitmentsPerBlock = U32; - type KzgCommitmentInclusionProofDepth = U10; + type MaxBlobCommitmentsPerBlock = U4096; + type KzgCommitmentInclusionProofDepth = U17; type PendingPartialWithdrawalsLimit = U64; type PendingConsolidationsLimit = U64; type FieldElementsPerCell = U64; diff --git a/testing/ef_tests/Makefile b/testing/ef_tests/Makefile index eb3631aa91..0c6fd50dfd 100644 --- a/testing/ef_tests/Makefile +++ b/testing/ef_tests/Makefile @@ -1,6 +1,6 @@ # To download/extract nightly tests, run: # CONSENSUS_SPECS_TEST_VERSION=nightly make -CONSENSUS_SPECS_TEST_VERSION ?= v1.6.0-alpha.4 +CONSENSUS_SPECS_TEST_VERSION ?= v1.6.0-alpha.5 REPO_NAME := consensus-spec-tests OUTPUT_DIR := ./$(REPO_NAME) diff --git a/testing/ef_tests/check_all_files_accessed.py b/testing/ef_tests/check_all_files_accessed.py index 9c168b6fa2..821287ce25 100755 --- a/testing/ef_tests/check_all_files_accessed.py +++ b/testing/ef_tests/check_all_files_accessed.py @@ -57,6 +57,8 @@ excluded_paths = [ # Ignore full epoch tests for now (just test the sub-transitions). 
"tests/.*/.*/epoch_processing/.*/pre_epoch.ssz_snappy", "tests/.*/.*/epoch_processing/.*/post_epoch.ssz_snappy", + # Ignore gloas tests for now + "tests/.*/gloas/.*", ] From ccf03e1c88094554032588d3c4ada098467d7cd0 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Tue, 26 Aug 2025 22:00:34 -0700 Subject: [PATCH 3/9] Fix data columns by range returning all columns (#7942) N/A In https://github.com/sigp/lighthouse/pull/7897 , we seem to have modified data columns by range to return all the columns we have for the requested epoch disregarding what columns the peer requested. --- .../network_beacon_processor/rpc_methods.rs | 9 ++- .../src/network_beacon_processor/tests.rs | 72 ++++++++++++++++++- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 85e4f04641..9ddba86b81 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -1200,8 +1200,15 @@ impl NetworkBeaconProcessor { .chain .custody_columns_for_epoch(Some(request_start_epoch)); + let indices_to_retrieve = req + .columns + .iter() + .copied() + .filter(|c| available_columns.contains(c)) + .collect::>(); + for root in block_roots { - for index in available_columns { + for index in &indices_to_retrieve { match self.chain.get_data_column(&root, index) { Ok(Some(data_column_sidecar)) => { // Due to skip slots, data columns could be out of the range, we ensure they diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 916548eb9d..2027a525e6 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -21,7 +21,9 @@ use beacon_processor::{work_reprocessing_queue::*, *}; use gossipsub::MessageAcceptance; use 
itertools::Itertools; use lighthouse_network::rpc::InboundRequestId; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, MetaDataV3, +}; use lighthouse_network::{ Client, MessageId, NetworkConfig, NetworkGlobals, PeerId, Response, discv5::enr::{self, CombinedKey}, @@ -30,6 +32,7 @@ use lighthouse_network::{ }; use matches::assert_matches; use slot_clock::SlotClock; +use std::collections::HashSet; use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; @@ -432,6 +435,20 @@ impl TestRig { .unwrap(); } + pub fn enqueue_data_columns_by_range_request(&self, count: u64, columns: Vec) { + self.network_beacon_processor + .send_data_columns_by_range_request( + PeerId::random(), + InboundRequestId::new_unchecked(42, 24), + DataColumnsByRangeRequest { + start_slot: 0, + count, + columns, + }, + ) + .unwrap(); + } + pub fn enqueue_backfill_batch(&self) { self.network_beacon_processor .send_chain_segment( @@ -1463,3 +1480,56 @@ async fn test_data_column_import_notifies_sync() { } } } + +#[tokio::test] +async fn test_data_columns_by_range_request_only_returns_requested_columns() { + if test_spec::().fulu_fork_epoch.is_none() { + return; + }; + + let mut rig = TestRig::new(64).await; + let slot_count = 4; + + let all_custody_columns = rig + .chain + .sampling_columns_for_epoch(rig.chain.epoch().unwrap()); + let available_columns: Vec = all_custody_columns.to_vec(); + + let requested_columns = vec![available_columns[0], available_columns[2]]; + + rig.enqueue_data_columns_by_range_request(slot_count, requested_columns.clone()); + + let mut received_columns = Vec::new(); + + while let Some(next) = rig.network_rx.recv().await { + if let NetworkMessage::SendResponse { + peer_id: _, + response: Response::DataColumnsByRange(data_column), + inbound_request_id: _, + } = next + { + if let Some(column) = data_column { + received_columns.push(column.index); + } else 
{ + break; + } + } else { + panic!("unexpected message {:?}", next); + } + } + + for received_index in &received_columns { + assert!( + requested_columns.contains(received_index), + "Received column index {} was not in requested columns {:?}", + received_index, + requested_columns + ); + } + + let unique_received: HashSet<_> = received_columns.into_iter().collect(); + assert!( + !unique_received.is_empty(), + "Should have received at least some data columns" + ); +} From d235f2c69708c91debec645cd70eac4f98d67dea Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 27 Aug 2025 16:52:14 +1000 Subject: [PATCH 4/9] Delete `RuntimeVariableList::from_vec` (#7930) This method is a footgun because it truncates the list. It is the source of a recent bug: - https://github.com/sigp/lighthouse/pull/7927 - Delete uses of `RuntimeVariableList::from_vec` and replace them with `::new` which does validation and can fail. - Propagate errors where possible, unwrap in tests and use `expect` for obviously-safe uses (in `chain_spec.rs`). 
--- Cargo.lock | 1 + .../lighthouse_network/src/rpc/codec.rs | 5 +- .../lighthouse_network/src/rpc/methods.rs | 28 +++++----- .../lighthouse_network/tests/rpc_tests.rs | 13 +++-- .../network/src/sync/network_context.rs | 15 +++++- .../network_context/requests/blobs_by_root.rs | 2 +- .../requests/blocks_by_root.rs | 3 +- .../requests/data_columns_by_root.rs | 4 +- beacon_node/network/src/sync/tests/lookups.rs | 2 +- beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/errors.rs | 7 +++ beacon_node/store/src/hot_cold_store.rs | 2 +- consensus/types/src/blob_sidecar.rs | 6 ++- consensus/types/src/chain_spec.rs | 9 ++-- consensus/types/src/runtime_var_list.rs | 51 +++++++++---------- 15 files changed, 89 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c637a1847..5e89dc2a90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8978,6 +8978,7 @@ dependencies = [ "safe_arith", "serde", "smallvec", + "ssz_types", "state_processing", "strum", "superstruct", diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 91f9960d47..acb0188456 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -1089,11 +1089,11 @@ mod tests { } fn bbroot_request_v1(fork_name: ForkName, spec: &ChainSpec) -> BlocksByRootRequest { - BlocksByRootRequest::new_v1(vec![Hash256::zero()], &fork_context(fork_name, spec)) + BlocksByRootRequest::new_v1(vec![Hash256::zero()], &fork_context(fork_name, spec)).unwrap() } fn bbroot_request_v2(fork_name: ForkName, spec: &ChainSpec) -> BlocksByRootRequest { - BlocksByRootRequest::new(vec![Hash256::zero()], &fork_context(fork_name, spec)) + BlocksByRootRequest::new(vec![Hash256::zero()], &fork_context(fork_name, spec)).unwrap() } fn blbroot_request(fork_name: ForkName, spec: &ChainSpec) -> BlobsByRootRequest { @@ -1104,6 +1104,7 @@ mod tests { }], &fork_context(fork_name, spec), ) + .unwrap() } fn ping_message() -> Ping { 
diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 39078e8d9e..9319973e59 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -481,20 +481,22 @@ pub struct BlocksByRootRequest { } impl BlocksByRootRequest { - pub fn new(block_roots: Vec, fork_context: &ForkContext) -> Self { + pub fn new(block_roots: Vec, fork_context: &ForkContext) -> Result { let max_request_blocks = fork_context .spec .max_request_blocks(fork_context.current_fork_name()); - let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks); - Self::V2(BlocksByRootRequestV2 { block_roots }) + let block_roots = RuntimeVariableList::new(block_roots, max_request_blocks) + .map_err(|e| format!("BlocksByRootRequestV2 too many roots: {e:?}"))?; + Ok(Self::V2(BlocksByRootRequestV2 { block_roots })) } - pub fn new_v1(block_roots: Vec, fork_context: &ForkContext) -> Self { + pub fn new_v1(block_roots: Vec, fork_context: &ForkContext) -> Result { let max_request_blocks = fork_context .spec .max_request_blocks(fork_context.current_fork_name()); - let block_roots = RuntimeVariableList::from_vec(block_roots, max_request_blocks); - Self::V1(BlocksByRootRequestV1 { block_roots }) + let block_roots = RuntimeVariableList::new(block_roots, max_request_blocks) + .map_err(|e| format!("BlocksByRootRequestV1 too many roots: {e:?}"))?; + Ok(Self::V1(BlocksByRootRequestV1 { block_roots })) } } @@ -506,12 +508,13 @@ pub struct BlobsByRootRequest { } impl BlobsByRootRequest { - pub fn new(blob_ids: Vec, fork_context: &ForkContext) -> Self { + pub fn new(blob_ids: Vec, fork_context: &ForkContext) -> Result { let max_request_blob_sidecars = fork_context .spec .max_request_blob_sidecars(fork_context.current_fork_name()); - let blob_ids = RuntimeVariableList::from_vec(blob_ids, max_request_blob_sidecars); - Self { blob_ids } + let blob_ids = 
RuntimeVariableList::new(blob_ids, max_request_blob_sidecars) + .map_err(|e| format!("BlobsByRootRequestV1 too many blob IDs: {e:?}"))?; + Ok(Self { blob_ids }) } } @@ -526,9 +529,10 @@ impl DataColumnsByRootRequest { pub fn new( data_column_ids: Vec>, max_request_blocks: usize, - ) -> Self { - let data_column_ids = RuntimeVariableList::from_vec(data_column_ids, max_request_blocks); - Self { data_column_ids } + ) -> Result { + let data_column_ids = RuntimeVariableList::new(data_column_ids, max_request_blocks) + .map_err(|_| "DataColumnsByRootRequest too many column IDs")?; + Ok(Self { data_column_ids }) } pub fn max_requested(&self) -> usize { diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index 098d7efadb..e37f4131a7 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -831,7 +831,7 @@ fn test_tcp_blocks_by_root_chunked_rpc() { // BlocksByRoot Request let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 { - block_roots: RuntimeVariableList::from_vec( + block_roots: RuntimeVariableList::new( vec![ Hash256::zero(), Hash256::zero(), @@ -841,7 +841,8 @@ fn test_tcp_blocks_by_root_chunked_rpc() { Hash256::zero(), ], spec.max_request_blocks(current_fork_name), - ), + ) + .unwrap(), })); // BlocksByRoot Response @@ -991,7 +992,8 @@ fn test_tcp_columns_by_root_chunked_rpc() { max_request_blocks ], max_request_blocks, - ); + ) + .unwrap(); let req_bytes = req.data_column_ids.as_ssz_bytes(); let req_decoded = DataColumnsByRootRequest { data_column_ids: >>::from_ssz_bytes( @@ -1281,7 +1283,7 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { // BlocksByRoot Request let rpc_request = RequestType::BlocksByRoot(BlocksByRootRequest::V2(BlocksByRootRequestV2 { - block_roots: RuntimeVariableList::from_vec( + block_roots: RuntimeVariableList::new( vec![ Hash256::zero(), Hash256::zero(), @@ 
-1295,7 +1297,8 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { Hash256::zero(), ], spec.max_request_blocks(current_fork), - ), + ) + .unwrap(), })); // BlocksByRoot Response diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1f4a14b4bf..07462a01fe 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -865,10 +865,15 @@ impl SyncNetworkContext { // - RPCError(request_id): handled by `Self::on_single_block_response` // - Disconnect(peer_id) handled by `Self::peer_disconnected``which converts it to a // ` RPCError(request_id)`event handled by the above method + let network_request = RequestType::BlocksByRoot( + request + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); self.network_send .send(NetworkMessage::SendRequest { peer_id, - request: RequestType::BlocksByRoot(request.into_request(&self.fork_context)), + request: network_request, app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), }) .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; @@ -959,10 +964,16 @@ impl SyncNetworkContext { }; // Lookup sync event safety: Refer to `Self::block_lookup_request` `network_send.send` call + let network_request = RequestType::BlobsByRoot( + request + .clone() + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); self.network_send .send(NetworkMessage::SendRequest { peer_id, - request: RequestType::BlobsByRoot(request.clone().into_request(&self.fork_context)), + request: network_request, app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), }) .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; diff --git a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs index 
0d176e2d8c..39886d814e 100644 --- a/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blobs_by_root.rs @@ -11,7 +11,7 @@ pub struct BlobsByRootSingleBlockRequest { } impl BlobsByRootSingleBlockRequest { - pub fn into_request(self, spec: &ForkContext) -> BlobsByRootRequest { + pub fn into_request(self, spec: &ForkContext) -> Result { BlobsByRootRequest::new( self.indices .into_iter() diff --git a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs index 6d7eabf909..8cb7f53ac5 100644 --- a/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/blocks_by_root.rs @@ -9,7 +9,8 @@ use super::{ActiveRequestItems, LookupVerifyError}; pub struct BlocksByRootSingleRequest(pub Hash256); impl BlocksByRootSingleRequest { - pub fn into_request(self, fork_context: &ForkContext) -> BlocksByRootRequest { + pub fn into_request(self, fork_context: &ForkContext) -> Result { + // This should always succeed (single block root), but we return a `Result` for safety. 
BlocksByRootRequest::new(vec![self.0], fork_context) } } diff --git a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs index 253e8940b2..34df801eaa 100644 --- a/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs +++ b/beacon_node/network/src/sync/network_context/requests/data_columns_by_root.rs @@ -21,13 +21,13 @@ impl DataColumnsByRootSingleBlockRequest { ) -> Result, &'static str> { let columns = VariableList::new(self.indices) .map_err(|_| "Number of indices exceeds total number of columns")?; - Ok(DataColumnsByRootRequest::new( + DataColumnsByRootRequest::new( vec![DataColumnsByRootIdentifier { block_root: self.block_root, columns, }], spec.max_request_blocks(fork_name), - )) + ) } } diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 4180025096..b5bc10851d 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -2303,7 +2303,7 @@ mod deneb_only { block, self.unknown_parent_blobs .take() - .map(|vec| RuntimeVariableList::from_vec(vec, max_len)), + .map(|vec| RuntimeVariableList::new(vec, max_len).unwrap()), ) .unwrap(); self.rig.parent_block_processed( diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 13df83efab..61a8474a73 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -25,6 +25,7 @@ redb = { version = "2.1.3", optional = true } safe_arith = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } +ssz_types = { workspace = true } state_processing = { workspace = true } strum = { workspace = true } superstruct = { workspace = true } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 51b4bfef83..f62647ae54 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ 
-69,6 +69,7 @@ pub enum Error { CacheBuildError(EpochCacheError), RandaoMixOutOfBounds, MilhouseError(milhouse::Error), + SszTypesError(ssz_types::Error), Compression(std::io::Error), FinalizedStateDecreasingSlot, FinalizedStateUnaligned, @@ -161,6 +162,12 @@ impl From for Error { } } +impl From for Error { + fn from(e: ssz_types::Error) -> Self { + Self::SszTypesError(e) + } +} + impl From for Error { fn from(e: hdiff::Error) -> Self { Self::Hdiff(e) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 8116596aa0..7b390b39f3 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2478,7 +2478,7 @@ impl, Cold: ItemStore> HotColdDB .first() .map(|blob| self.spec.max_blobs_per_block(blob.epoch())) { - let blobs = BlobSidecarList::from_vec(blobs, max_blobs_per_block as usize); + let blobs = BlobSidecarList::new(blobs, max_blobs_per_block as usize)?; self.block_cache .lock() .put_blobs(*block_root, blobs.clone()); diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index d65ad9a3e0..2e8c257897 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -84,6 +84,7 @@ pub enum BlobSidecarError { MissingKzgCommitment, BeaconState(BeaconStateError), MerkleTree(MerkleTreeError), + SszTypes(ssz_types::Error), ArithError(ArithError), } @@ -283,10 +284,11 @@ impl BlobSidecar { let blob_sidecar = BlobSidecar::new(i, blob, block, *kzg_proof)?; blob_sidecars.push(Arc::new(blob_sidecar)); } - Ok(RuntimeVariableList::from_vec( + RuntimeVariableList::new( blob_sidecars, spec.max_blobs_per_block(block.epoch()) as usize, - )) + ) + .map_err(BlobSidecarError::SszTypes) } } diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 709b4f28fe..a1005d904a 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -2001,10 +2001,11 @@ const fn 
default_min_epochs_for_data_column_sidecars_requests() -> u64 { fn max_blocks_by_root_request_common(max_request_blocks: u64) -> usize { let max_request_blocks = max_request_blocks as usize; - RuntimeVariableList::::from_vec( + RuntimeVariableList::::new( vec![Hash256::zero(); max_request_blocks], max_request_blocks, ) + .expect("creating a RuntimeVariableList of size `max_request_blocks` should succeed") .as_ssz_bytes() .len() } @@ -2016,10 +2017,11 @@ fn max_blobs_by_root_request_common(max_request_blob_sidecars: u64) -> usize { index: 0, }; - RuntimeVariableList::::from_vec( + RuntimeVariableList::::new( vec![empty_blob_identifier; max_request_blob_sidecars], max_request_blob_sidecars, ) + .expect("creating a RuntimeVariableList of size `max_request_blob_sidecars` should succeed") .as_ssz_bytes() .len() } @@ -2032,10 +2034,11 @@ fn max_data_columns_by_root_request_common(max_request_blocks: u64) columns: VariableList::from(vec![0; E::number_of_columns()]), }; - RuntimeVariableList::>::from_vec( + RuntimeVariableList::>::new( vec![empty_data_columns_by_root_id; max_request_blocks], max_request_blocks, ) + .expect("creating a RuntimeVariableList of size `max_request_blocks` should succeed") .as_ssz_bytes() .len() } diff --git a/consensus/types/src/runtime_var_list.rs b/consensus/types/src/runtime_var_list.rs index 2a8899e203..dcb98538b7 100644 --- a/consensus/types/src/runtime_var_list.rs +++ b/consensus/types/src/runtime_var_list.rs @@ -23,15 +23,15 @@ use tree_hash::{Hash256, MerkleHasher, PackedEncoding, TreeHash, TreeHashType}; /// let base: Vec = vec![1, 2, 3, 4]; /// /// // Create a `RuntimeVariableList` from a `Vec` that has the expected length. 
-/// let exact: RuntimeVariableList<_> = RuntimeVariableList::from_vec(base.clone(), 4); +/// let exact: RuntimeVariableList<_> = RuntimeVariableList::new(base.clone(), 4).unwrap(); /// assert_eq!(&exact[..], &[1, 2, 3, 4]); /// -/// // Create a `RuntimeVariableList` from a `Vec` that is too long and the `Vec` is truncated. -/// let short: RuntimeVariableList<_> = RuntimeVariableList::from_vec(base.clone(), 3); -/// assert_eq!(&short[..], &[1, 2, 3]); +/// // Create a `RuntimeVariableList` from a `Vec` that is too long you'll get an error. +/// let err = RuntimeVariableList::new(base.clone(), 3).unwrap_err(); +/// assert_eq!(err, ssz_types::Error::OutOfBounds { i: 4, len: 3 }); /// /// // Create a `RuntimeVariableList` from a `Vec` that is shorter than the maximum. -/// let mut long: RuntimeVariableList<_> = RuntimeVariableList::from_vec(base, 5); +/// let mut long: RuntimeVariableList<_> = RuntimeVariableList::new(base, 5).unwrap(); /// assert_eq!(&long[..], &[1, 2, 3, 4]); /// /// // Push a value to if it does not exceed the maximum @@ -65,12 +65,6 @@ impl RuntimeVariableList { } } - pub fn from_vec(mut vec: Vec, max_len: usize) -> Self { - vec.truncate(max_len); - - Self { vec, max_len } - } - /// Create an empty list with the given `max_len`. 
pub fn empty(max_len: usize) -> Self { Self { @@ -231,14 +225,13 @@ where { // first parse out a Vec using the Vec impl you already have let vec: Vec = Vec::context_deserialize(deserializer, context.0)?; - if vec.len() > context.1 { - return Err(DeError::custom(format!( - "RuntimeVariableList lengh {} exceeds max_len {}", - vec.len(), - context.1 - ))); - } - Ok(RuntimeVariableList::from_vec(vec, context.1)) + let vec_len = vec.len(); + RuntimeVariableList::new(vec, context.1).map_err(|e| { + DeError::custom(format!( + "RuntimeVariableList length {} exceeds max_len {}: {e:?}", + vec_len, context.1, + )) + }) } } @@ -323,7 +316,8 @@ mod test { fn indexing() { let vec = vec![1, 2]; - let mut fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec.clone(), 8192); + let mut fixed: RuntimeVariableList = + RuntimeVariableList::new(vec.clone(), 8192).unwrap(); assert_eq!(fixed[0], 1); assert_eq!(&fixed[0..1], &vec[0..1]); @@ -335,24 +329,25 @@ mod test { #[test] fn length() { + // Too long. 
let vec = vec![42; 5]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec.clone(), 4); - assert_eq!(&fixed[..], &vec[0..4]); + let err = RuntimeVariableList::::new(vec.clone(), 4).unwrap_err(); + assert_eq!(err, Error::OutOfBounds { i: 5, len: 4 }); let vec = vec![42; 3]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec.clone(), 4); + let fixed: RuntimeVariableList = RuntimeVariableList::new(vec.clone(), 4).unwrap(); assert_eq!(&fixed[0..3], &vec[..]); assert_eq!(&fixed[..], &vec![42, 42, 42][..]); let vec = vec![]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec, 4); + let fixed: RuntimeVariableList = RuntimeVariableList::new(vec, 4).unwrap(); assert_eq!(&fixed[..], &[] as &[u64]); } #[test] fn deref() { let vec = vec![0, 2, 4, 6]; - let fixed: RuntimeVariableList = RuntimeVariableList::from_vec(vec, 4); + let fixed: RuntimeVariableList = RuntimeVariableList::new(vec, 4).unwrap(); assert_eq!(fixed.first(), Some(&0)); assert_eq!(fixed.get(3), Some(&6)); @@ -361,7 +356,7 @@ mod test { #[test] fn encode() { - let vec: RuntimeVariableList = RuntimeVariableList::from_vec(vec![0; 2], 2); + let vec: RuntimeVariableList = RuntimeVariableList::new(vec![0; 2], 2).unwrap(); assert_eq!(vec.as_ssz_bytes(), vec![0, 0, 0, 0]); assert_eq!( as Encode>::ssz_fixed_len(), 4); } @@ -378,7 +373,7 @@ mod test { #[test] fn u16_len_8() { - round_trip::(RuntimeVariableList::from_vec(vec![42; 8], 8)); - round_trip::(RuntimeVariableList::from_vec(vec![0; 8], 8)); + round_trip::(RuntimeVariableList::new(vec![42; 8], 8).unwrap()); + round_trip::(RuntimeVariableList::new(vec![0; 8], 8).unwrap()); } } From 746da7ffd5c2bdc63e8b9ea0190c9be2393e0395 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 28 Aug 2025 12:48:43 +1000 Subject: [PATCH 5/9] Fix doppelganger protection script (#7959) Previously `kurtosis service inspect` gives us output like this - flags in separate lines ``` CMD: lighthouse beacon_node --debug-level=debug 
--datadir=/data/lighthouse/beacon-data --listen-address=0.0.0.0 --port=9000 --http --http-address=0.0.0.0 --http-port=4000 --disable-packet-filter --execution-endpoints=http://172.16.0.8:8551 --jwt-secrets=/jwt/jwtsecret --suggested-fee-recipient=0x8943545177806ED17B9F23F0a21ee5948eCaa776 --disable-enr-auto-update --enr-address=172.16.0.11 ``` In the latest version this has been updated to a single line ``` CMD: exec lighthouse beacon_node --debug-level=debug --datadir=/data/lighthouse/beacon-data --listen-address=0.0.0.0 --port=9000 --http --http-address=0.0.0.0 --http-port=4000 --disable-packet-filter --execution-endpoints=http://172.16.0.12:8551 --jwt-secrets=/jwt/jwtsecret --suggested-fee-recipient=0x8943545177806ED17B9F23F0a21ee5948eCaa776 --disable-enr-auto-update --enr-address=172.16.0.18 --enr-tcp-port=9000 --enr-udp-port=9000 --enr-quic-port=9001 --quic-port=9001 --metrics --metrics-address=0.0.0.0 --metrics-allow-origin=* --metrics-port=5054 --enable-private-discovery --testnet-dir=/network-configs --boot-nodes=enr:-N24QPYP7bj0aqoM2dXsP5hnosW27U6PTYJt1kYFhNkwIvlFQhGJ1om7f4zcHhVJwvUL7wCsVbDJbP_l-TF8X3q4pVEDh2F0dG5ldHOIAAAwAAAAAACGY2xpZW500YpMaWdodGhvdXNlhTcuMS4whGV0aDKQqFs_bWAAADj__________4JpZIJ2NIJpcISsEAAPhHF1aWOCIymJc2VjcDI1NmsxoQK_z4HQylgsOal74Jek9D_EhY0vcDX5AcLHnPD7iOeEdYhzeW5jbmV0cwCDdGNwgiMog3VkcIIjKA --target-peers=3 ``` and it broke our script. This PR updates the extraction logic. 
--- scripts/tests/doppelganger_protection.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/tests/doppelganger_protection.sh b/scripts/tests/doppelganger_protection.sh index 86c9705ee4..9009d49d58 100755 --- a/scripts/tests/doppelganger_protection.sh +++ b/scripts/tests/doppelganger_protection.sh @@ -60,7 +60,7 @@ DELAY=$(( $SECONDS_PER_SLOT * 32 + $GENESIS_DELAY + $MIN_GENESIS_TIME - $CURRENT sleep $DELAY # Use BN2 for the next validator client -bn_2_url=$(kurtosis service inspect $ENCLAVE_NAME cl-2-lighthouse-geth | grep 'enr-address' | cut -d'=' -f2) +bn_2_url=$(kurtosis service inspect $ENCLAVE_NAME cl-2-lighthouse-geth | grep -oP '(?<=--enr-address=)[^ ]+') bn_2_port=4000 if [[ "$BEHAVIOR" == "failure" ]]; then From c13fb2fb46e5f982d5f68d4f82ee4ee65b49b137 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 28 Aug 2025 13:31:29 +1000 Subject: [PATCH 6/9] Instrument `publish_block` code path (#7945) Instrument `publish_block` code path and log dropped data columns when publishing. 
Example spans (running the devnet from my laptop, so the numbers aren't great) image image --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 1 + beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/src/publish_blocks.rs | 28 +++++++++++++++++--- beacon_node/lighthouse_tracing/src/lib.rs | 3 +++ 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e89dc2a90..14f8d5cbaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4284,6 +4284,7 @@ dependencies = [ "health_metrics", "hex", "lighthouse_network", + "lighthouse_tracing", "lighthouse_version", "logging", "lru", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d5a2929301..4358e4a872 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3059,6 +3059,7 @@ impl BeaconChain { /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was /// imported or errors. 
+ #[instrument(skip_all, level = "debug")] pub async fn process_gossip_data_columns( self: &Arc, data_columns: Vec>, diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 781a4cfa44..2061df3762 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -20,6 +20,7 @@ futures = { workspace = true } health_metrics = { workspace = true } hex = { workspace = true } lighthouse_network = { workspace = true } +lighthouse_tracing = { workspace = true } lighthouse_version = { workspace = true } logging = { workspace = true } lru = { workspace = true } diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 6377639ccd..f797e3f300 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -16,6 +16,7 @@ use eth2::types::{ use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse}; use futures::TryFutureExt; use lighthouse_network::PubsubMessage; +use lighthouse_tracing::SPAN_PUBLISH_BLOCK; use network::NetworkMessage; use rand::prelude::SliceRandom; use slot_clock::SlotClock; @@ -24,7 +25,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; -use tracing::{debug, error, info, warn}; +use tracing::{Span, debug, debug_span, error, info, instrument, warn}; use tree_hash::TreeHash; use types::{ AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource, @@ -75,6 +76,12 @@ impl ProvenancedBlock> /// Handles a request from the HTTP API for full blocks. 
#[allow(clippy::too_many_arguments)] +#[instrument( + name = SPAN_PUBLISH_BLOCK, + level = "info", + skip_all, + fields(?block_root, ?validation_level, provenance = tracing::field::Empty) +)] pub async fn publish_block>( block_root: Option, provenanced_block: ProvenancedBlock, @@ -96,6 +103,9 @@ pub async fn publish_block>( } else { "builder" }; + let current_span = Span::current(); + current_span.record("provenance", provenance); + let block = unverified_block.inner_block(); debug!(slot = %block.slot(), "Signed block received in HTTP API"); @@ -133,8 +143,12 @@ pub async fn publish_block>( let slot = block.message().slot(); let sender_clone = network_tx.clone(); - let build_sidecar_task_handle = - spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs)?; + let build_sidecar_task_handle = spawn_build_data_sidecar_task( + chain.clone(), + block.clone(), + unverified_blobs, + current_span.clone(), + )?; // Gossip verify the block and blobs/data columns separately. let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); @@ -347,6 +361,7 @@ fn spawn_build_data_sidecar_task( chain: Arc>, block: Arc>>, proofs_and_blobs: UnverifiedBlobs, + current_span: Span, ) -> Result>, Rejection> { chain .clone() @@ -356,6 +371,7 @@ fn spawn_build_data_sidecar_task( let Some((kzg_proofs, blobs)) = proofs_and_blobs else { return Ok((vec![], vec![])); }; + let _guard = debug_span!(parent: current_span, "build_data_sidecars").entered(); let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); if !peer_das_enabled { @@ -532,7 +548,11 @@ fn publish_column_sidecars( .saturating_sub(malicious_withhold_count); // Randomize columns before dropping the last malicious_withhold_count items data_column_sidecars.shuffle(&mut **chain.rng.lock()); - data_column_sidecars.truncate(columns_to_keep); + let dropped_indices = data_column_sidecars + .drain(columns_to_keep..) 
+ .map(|d| d.index) + .collect::>(); + debug!(indices = ?dropped_indices, "Dropping data columns from publishing"); } let pubsub_messages = data_column_sidecars .into_iter() diff --git a/beacon_node/lighthouse_tracing/src/lib.rs b/beacon_node/lighthouse_tracing/src/lib.rs index a69428d5bd..ffbad1364c 100644 --- a/beacon_node/lighthouse_tracing/src/lib.rs +++ b/beacon_node/lighthouse_tracing/src/lib.rs @@ -3,6 +3,9 @@ //! TODO: These span identifiers will be used to implement selective tracing export (to be implemented), //! where only the listed root spans and their descendants will be exported to the tracing backend. +/// Root span name for publish_block +pub const SPAN_PUBLISH_BLOCK: &str = "publish_block"; + /// Data Availability checker span identifiers pub const SPAN_PENDING_COMPONENTS: &str = "pending_components"; From b6792d85d2082d88112319912d3a9cdf328a3efa Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Wed, 27 Aug 2025 20:31:31 -0700 Subject: [PATCH 7/9] Reduce backfill batch buffer size (#7958) N/A Currently, backfill is allowed to create up to 20 pending batches which is unnecessarily high imo. Forward sync also allows a max of 5 batches to be buffered at a time. This PR reduces the batch size to match with forward sync. Having a high number of batches is a little annoying with peerdas because we try to create and send 20 requests (even though we are processing them in a rate limited manner). Requests with peerdas are a lot heavier as we distribute requests across multiple peers, leading to a lot of requests that may keep getting retried. This could take resources away from processing at head. 
--- beacon_node/network/src/sync/backfill_sync/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index ae9ac2e770..2f5eb3f689 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -40,7 +40,7 @@ use types::{ColumnIndex, Epoch, EthSpec}; pub const BACKFILL_EPOCHS_PER_BATCH: u64 = 1; /// The maximum number of batches to queue before requesting more. -const BACKFILL_BATCH_BUFFER_SIZE: u8 = 20; +const BACKFILL_BATCH_BUFFER_SIZE: u8 = 5; /// The number of times to retry a batch before it is considered failed. const MAX_BATCH_DOWNLOAD_ATTEMPTS: u8 = 10; From a134d43446f776fe2a84f420854afbff76ca93d8 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 29 Aug 2025 10:59:40 +1000 Subject: [PATCH 8/9] Use `rayon` to speed up batch KZG verification (#7921) Addresses #7866. Use Rayon to speed up batch KZG verification during range / backfill sync. While I was analysing the traces, I also discovered a bug that resulted in only the first 128 columns in a chain segment batch being verified. This PR fixes it, so we might actually observe slower range sync due to more cells being KZG verified. I've also updated the handling of batch KZG failure to only find the first invalid KZG column when verification fails as this gets very expensive during range/backfill sync. 
--- Cargo.lock | 2 + .../src/beacon_fork_choice_store.rs | 2 +- .../src/data_availability_checker.rs | 6 +- .../src/data_availability_checker/error.rs | 2 +- .../src/data_column_verification.rs | 66 +++---------- beacon_node/beacon_chain/src/kzg_utils.rs | 75 +++++++-------- .../network/src/sync/block_lookups/mod.rs | 19 ++-- crypto/kzg/Cargo.toml | 2 + crypto/kzg/src/lib.rs | 93 +++++++++++++++---- .../cases/kzg_verify_cell_kzg_proof_batch.rs | 2 +- 10 files changed, 140 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14f8d5cbaa..1bd65e1721 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5013,9 +5013,11 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "hex", + "rayon", "rust_eth_kzg", "serde", "serde_json", + "tracing", "tree_hash", ] diff --git a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs index 2c05df3c7f..440388661c 100644 --- a/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs +++ b/beacon_node/beacon_chain/src/beacon_fork_choice_store.rs @@ -377,7 +377,7 @@ where .store .get_hot_state(&self.justified_state_root, update_cache) .map_err(Error::FailedToReadState)? 
- .ok_or_else(|| Error::MissingState(self.justified_state_root))?; + .ok_or(Error::MissingState(self.justified_state_root))?; self.justified_balances = JustifiedBalances::from_justified_state(&state)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index ad01eb477b..2ebf765a4e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -29,7 +29,7 @@ mod state_lru_cache; use crate::data_column_verification::{ CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, - KzgVerifiedDataColumn, verify_kzg_for_data_column_list_with_scoring, + KzgVerifiedDataColumn, verify_kzg_for_data_column_list, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -378,7 +378,7 @@ impl DataAvailabilityChecker { } if self.data_columns_required_for_block(&block) { return if let Some(data_column_list) = data_columns.as_ref() { - verify_kzg_for_data_column_list_with_scoring( + verify_kzg_for_data_column_list( data_column_list .iter() .map(|custody_column| custody_column.as_data_column()), @@ -449,7 +449,7 @@ impl DataAvailabilityChecker { // verify kzg for all data columns at once if !all_data_columns.is_empty() { // Attributes fault to the specific peer that sent an invalid column - verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg) + verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index d091d6fefb..c9efb7a414 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -4,7 +4,7 @@ use types::{BeaconStateError, ColumnIndex, 
Hash256}; #[derive(Debug)] pub enum Error { InvalidBlobs(KzgError), - InvalidColumn(Vec<(ColumnIndex, KzgError)>), + InvalidColumn((Option, KzgError)), ReconstructColumnsError(KzgError), KzgCommitmentMismatch { blob_commitment: KzgCommitment, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 873627abea..fb88db1300 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -263,7 +263,10 @@ pub struct KzgVerifiedDataColumn { } impl KzgVerifiedDataColumn { - pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + pub fn new( + data_column: Arc>, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column, kzg) } @@ -278,22 +281,11 @@ impl KzgVerifiedDataColumn { Self { data: data_column } } - pub fn from_batch( - data_columns: Vec>>, - kzg: &Kzg, - ) -> Result, KzgError> { - verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; - Ok(data_columns - .into_iter() - .map(|column| Self { data: column }) - .collect()) - } - pub fn from_batch_with_scoring( data_columns: Vec>>, kzg: &Kzg, - ) -> Result, Vec<(ColumnIndex, KzgError)>> { - verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?; + ) -> Result, (Option, KzgError)> { + verify_kzg_for_data_column_list(data_columns.iter(), kzg)?; Ok(data_columns .into_iter() .map(|column| Self { data: column }) @@ -367,7 +359,10 @@ impl KzgVerifiedCustodyDataColumn { } /// Verify a column already marked as custody column - pub fn new(data_column: CustodyDataColumn, kzg: &Kzg) -> Result { + pub fn new( + data_column: CustodyDataColumn, + kzg: &Kzg, + ) -> Result, KzgError)> { verify_kzg_for_data_column(data_column.clone_arc(), kzg)?; Ok(Self { data: data_column.data, @@ -418,22 +413,21 @@ impl KzgVerifiedCustodyDataColumn { pub fn verify_kzg_for_data_column( data_column: Arc>, kzg: &Kzg, -) -> Result, KzgError> { +) -> Result, 
(Option, KzgError)> { let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); validate_data_columns(kzg, iter::once(&data_column))?; Ok(KzgVerifiedDataColumn { data: data_column }) } /// Complete kzg verification for a list of `DataColumnSidecar`s. -/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification. +/// Returns an error for the first `DataColumnSidecar`s that fails kzg verification. /// /// Note: This function should be preferred over calling `verify_kzg_for_data_column` /// in a loop since this function kzg verifies a list of data columns more efficiently. -#[instrument(skip_all, level = "debug")] pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( data_column_iter: I, kzg: &'a Kzg, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -442,38 +436,6 @@ where Ok(()) } -/// Complete kzg verification for a list of `DataColumnSidecar`s. -/// -/// If there's at least one invalid column, it re-verifies all columns individually to identify the -/// first column that is invalid. This is necessary to attribute fault to the specific peer that -/// sent bad data. The re-verification cost should not be significant. If a peer sends invalid data it -/// will be quickly banned. -pub fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>( - data_column_iter: I, - kzg: &'a Kzg, -) -> Result<(), Vec<(ColumnIndex, KzgError)>> -where - I: Iterator>> + Clone, -{ - if verify_kzg_for_data_column_list(data_column_iter.clone(), kzg).is_ok() { - return Ok(()); - }; - - // Find all columns that are invalid and identify by index. 
If we hit this condition there - // should be at least one invalid column - let errors = data_column_iter - .filter_map(|data_column| { - if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) { - Some((data_column.index, e)) - } else { - None - } - }) - .collect::>(); - - Err(errors) -} - #[instrument(skip_all, level = "debug")] pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, @@ -509,7 +471,7 @@ pub fn validate_data_column_sidecar_for_gossip( kzg.verify_blob_kzg_proof(&kzg_blob, kzg_commitment, kzg_proof) } -/// Validates a list of blobs along with their corresponding KZG commitments and -/// cell proofs for the extended blobs. -pub fn validate_blobs_and_cell_proofs( - kzg: &Kzg, - blobs: Vec<&Blob>, - cell_proofs: &[KzgProof], - kzg_commitments: &KzgCommitments, -) -> Result<(), KzgError> { - let cells = compute_cells::(&blobs, kzg)?; - let cell_refs = cells.iter().map(|cell| cell.as_ref()).collect::>(); - let cell_indices = (0..blobs.len()) - .flat_map(|_| 0..CELLS_PER_EXT_BLOB as u64) - .collect::>(); - - let proofs = cell_proofs - .iter() - .map(|&proof| Bytes48::from(proof)) - .collect::>(); - - let commitments = kzg_commitments - .iter() - .flat_map(|&commitment| std::iter::repeat_n(Bytes48::from(commitment), CELLS_PER_EXT_BLOB)) - .collect::>(); - - kzg.verify_cell_proof_batch(&cell_refs, &proofs, cell_indices, &commitments) -} - /// Validate a batch of `DataColumnSidecar`. 
pub fn validate_data_columns<'a, E: EthSpec, I>( kzg: &Kzg, data_column_iter: I, -) -> Result<(), KzgError> +) -> Result<(), (Option, KzgError)> where I: Iterator>> + Clone, { @@ -88,8 +61,12 @@ where for data_column in data_column_iter { let col_index = data_column.index; + if data_column.column.is_empty() { + return Err((Some(col_index), KzgError::KzgVerificationFailed)); + } + for cell in &data_column.column { - cells.push(ssz_cell_to_crypto_cell::(cell)?); + cells.push(ssz_cell_to_crypto_cell::(cell).map_err(|e| (Some(col_index), e))?); column_indices.push(col_index); } @@ -100,6 +77,19 @@ where for &commitment in &data_column.kzg_commitments { commitments.push(Bytes48::from(commitment)); } + + let expected_len = column_indices.len(); + + // We make this check at each iteration so that the error is attributable to a specific column + if cells.len() != expected_len + || proofs.len() != expected_len + || commitments.len() != expected_len + { + return Err(( + Some(col_index), + KzgError::InconsistentArrayLength("Invalid data column".to_string()), + )); + } } kzg.verify_cell_proof_batch(&cells, &proofs, column_indices, &commitments) @@ -418,7 +408,7 @@ pub fn reconstruct_data_columns( mod test { use crate::kzg_utils::{ blobs_to_data_column_sidecars, reconstruct_blobs, reconstruct_data_columns, - validate_blobs_and_cell_proofs, + validate_data_columns, }; use bls::Signature; use eth2::types::BlobsBundle; @@ -442,21 +432,20 @@ mod test { test_build_data_columns(&kzg, &spec); test_reconstruct_data_columns(&kzg, &spec); test_reconstruct_blobs_from_data_columns(&kzg, &spec); - test_verify_blob_and_cell_proofs(&kzg); + test_validate_data_columns(&kzg, &spec); } #[track_caller] - fn test_verify_blob_and_cell_proofs(kzg: &Kzg) { - let (blobs_bundle, _) = generate_blobs::(3, ForkName::Fulu).unwrap(); - let BlobsBundle { - blobs, - commitments, - proofs, - } = blobs_bundle; - - let result = - validate_blobs_and_cell_proofs::(kzg, blobs.iter().collect(), &proofs, 
&commitments); + fn test_validate_data_columns(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 6; + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); + let column_sidecars = + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); + let result = validate_data_columns::(kzg, column_sidecars.iter()); assert!(result.is_ok()); } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f8f8d8a9a5..e9f24697ac 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -36,7 +36,6 @@ use beacon_chain::data_availability_checker::{ use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; pub use common::RequestState; use fnv::FnvHashMap; -use itertools::Itertools; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; @@ -653,15 +652,15 @@ impl BlockLookups { // but future errors may follow the same pattern. Generalize this // pattern with https://github.com/sigp/lighthouse/pull/6321 BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn(errors), - ) => errors - .iter() - // Collect all peers that sent a column that was invalid. Must - // run .unique as a single peer can send multiple invalid - // columns. Penalize once to avoid insta-bans - .flat_map(|(index, _)| peer_group.of_index((*index) as usize)) - .unique() - .collect(), + AvailabilityCheckError::InvalidColumn((index_opt, _)), + ) => { + match index_opt { + Some(index) => peer_group.of_index(index as usize).collect(), + // If no index supplied this is an un-attributable fault. In practice + // this should never happen. 
+ None => vec![], + } + } _ => peer_group.all().collect(), }; for peer in peers_to_penalize { diff --git a/crypto/kzg/Cargo.toml b/crypto/kzg/Cargo.toml index bfe0f19cd0..432fcc1792 100644 --- a/crypto/kzg/Cargo.toml +++ b/crypto/kzg/Cargo.toml @@ -14,9 +14,11 @@ ethereum_serde_utils = { workspace = true } ethereum_ssz = { workspace = true } ethereum_ssz_derive = { workspace = true } hex = { workspace = true } +rayon = { workspace = true } rust_eth_kzg = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +tracing = { workspace = true } tree_hash = { workspace = true } [dev-dependencies] diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index ddaddd1ada..1b8d46100f 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -3,6 +3,7 @@ mod kzg_proof; pub mod trusted_setup; use rust_eth_kzg::{CellIndex, DASContext}; +use std::collections::HashMap; use std::fmt::Debug; pub use crate::{ @@ -17,10 +18,12 @@ pub use c_kzg::{ }; use crate::trusted_setup::load_trusted_setup; +use rayon::prelude::*; pub use rust_eth_kzg::{ constants::{BYTES_PER_CELL, CELLS_PER_EXT_BLOB}, Cell, CellIndex as CellID, CellRef, TrustedSetup as PeerDASTrustedSetup, }; +use tracing::instrument; /// Disables the fixed-base multi-scalar multiplication optimization for computing /// cell KZG proofs, because `rust-eth-kzg` already handles the precomputation. @@ -229,31 +232,85 @@ impl Kzg { } /// Verifies a batch of cell-proof-commitment triplets. 
+ #[instrument(skip_all, level = "debug", fields(cells = cells.len()))] pub fn verify_cell_proof_batch( &self, cells: &[CellRef<'_>], kzg_proofs: &[Bytes48], - columns: Vec, + indices: Vec, kzg_commitments: &[Bytes48], - ) -> Result<(), Error> { - let proofs: Vec<_> = kzg_proofs.iter().map(|proof| proof.as_ref()).collect(); - let commitments: Vec<_> = kzg_commitments - .iter() - .map(|commitment| commitment.as_ref()) - .collect(); - let verification_result = self.context().verify_cell_kzg_proof_batch( - commitments.to_vec(), - &columns, - cells.to_vec(), - proofs.to_vec(), - ); + ) -> Result<(), (Option, Error)> { + let mut column_groups: HashMap> = HashMap::new(); - // Modify the result so it matches roughly what the previous method was doing. - match verification_result { - Ok(_) => Ok(()), - Err(e) if e.is_proof_invalid() => Err(Error::KzgVerificationFailed), - Err(e) => Err(Error::PeerDASKZG(e)), + let expected_len = cells.len(); + + // This check is already made in `validate_data_columns`. However we add it here so that ef consensus spec tests pass + // and to avoid any potential footguns in the future. Note that by catching the error here and not in `validate_data_columns` + // the error becomes non-attributable. 
+ if kzg_proofs.len() != expected_len + || indices.len() != expected_len + || kzg_commitments.len() != expected_len + { + return Err(( + None, + Error::InconsistentArrayLength("Invalid data column".to_string()), + )); } + + for (((cell, proof), &index), commitment) in cells + .iter() + .zip(kzg_proofs.iter()) + .zip(indices.iter()) + .zip(kzg_commitments.iter()) + { + column_groups + .entry(index) + .or_default() + .push((cell, *proof, *commitment)); + } + + column_groups + .into_par_iter() + .map(|(column_index, column_data)| { + let mut cells = Vec::new(); + let mut proofs = Vec::new(); + let mut commitments = Vec::new(); + + for (cell, proof, commitment) in &column_data { + cells.push(*cell); + proofs.push(proof.as_ref()); + commitments.push(commitment.as_ref()); + } + + // Create per-chunk tracing span for visualizing parallel processing. + // This is safe from span explosion as we have at most 128 chunks, + // i.e. the number of column indices. + let _span = tracing::debug_span!( + "verify_cell_proof_chunk", + cells = cells.len(), + column_index, + verification_result = tracing::field::Empty, + ) + .entered(); + + let verification_result = self.context().verify_cell_kzg_proof_batch( + commitments, + &vec![column_index; cells.len()], // All column_data here is from the same index + cells, + proofs, + ); + + match verification_result { + Ok(_) => Ok(()), + Err(e) if e.is_proof_invalid() => { + Err((Some(column_index), Error::KzgVerificationFailed)) + } + Err(e) => Err((Some(column_index), Error::PeerDASKZG(e))), + } + }) + .collect::, (Option, Error)>>()?; + + Ok(()) } pub fn recover_cells_and_compute_kzg_proofs( diff --git a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs index e3edc0df0a..7973af861f 100644 --- a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs +++ b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs @@ -53,7 +53,7 @@ impl Case for 
KZGVerifyCellKZGProofBatch { let kzg = get_kzg(); match kzg.verify_cell_proof_batch(&cells, &proofs, cell_indices, &commitments) { Ok(_) => Ok(true), - Err(KzgError::KzgVerificationFailed) => Ok(false), + Err((_, KzgError::KzgVerificationFailed)) => Ok(false), Err(e) => Err(Error::InternalError(format!( "Failed to validate cells: {:?}", e From 438fb65d450019f5a7a51343dca52f0b579f712c Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 29 Aug 2025 13:01:40 +1000 Subject: [PATCH 9/9] Avoid serving validator endpoints while the node is far behind syncing head (#7962) A performance issue was discovered when devnet-3 was under non-finality - some of the lighthouse nodes are "stuck" with syncing because of handling proposer duties HTTP requests. These validator requests are higher priority than Status processing, and if they are taking a long time to process, the node won't be able to progress. What's worse is - under a long period of non-finality, the proposer duties calculation function tries to do state advance for a large number of [slots](https://github.com/sigp/lighthouse/blob/d545ddcbc7d97b24b5c15012d1a5f9a1dae90b2a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs#L183) here, causing the node to spend all its CPU time on a task that doesn't really help, e.g. the computed duties aren't useful if the node is 20000 slots behind. To solve this issue, we use the `not_while_syncing` filter to prevent serving these requests until the node is synced. This should allow the node to focus on sync under non-finality situations. 
--- beacon_node/http_api/src/lib.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index b0b4f9df56..515c262b19 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -457,7 +457,7 @@ pub fn serve( move |network_globals: Arc>, chain: Arc>| async move { match *network_globals.sync_state.read() { - SyncState::SyncingFinalized { .. } => { + SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. } => { let head_slot = chain.canonical_head.cached_head().head_slot(); let current_slot = @@ -479,9 +479,7 @@ pub fn serve( ))) } } - SyncState::SyncingHead { .. } - | SyncState::SyncTransition - | SyncState::BackFillSyncing { .. } => Ok(()), + SyncState::SyncTransition | SyncState::BackFillSyncing { .. } => Ok(()), SyncState::Synced => Ok(()), SyncState::Stalled => Ok(()), }