From 64031b6cbb7d20d90bc72064b6778f840bbb1e4a Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 1 Dec 2025 11:19:49 +1100 Subject: [PATCH 1/7] Add tracing spans to validator client duty cycles (#8482) Co-Authored-By: Jimmy Chen Co-Authored-By: Jimmy Chen --- .../src/attestation_service.rs | 28 ++++++++++++++++- .../validator_services/src/block_service.rs | 11 ++++++- .../src/sync_committee_service.rs | 30 ++++++++++++++++--- 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index da6e8f3588..a6ce67fae9 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -8,7 +8,7 @@ use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, sleep_until}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; use tree_hash::TreeHash; use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; @@ -243,6 +243,11 @@ impl AttestationService AttestationService AttestationService AttestationService, Vec<_>) = join_all(signing_futures) + .instrument(info_span!( + "sign_attestations", + count = validator_duties.len() + )) .await .into_iter() .flatten() @@ -487,6 +498,10 @@ impl AttestationService(single_attestations, fork_name) .await }) + .instrument(info_span!( + "publish_attestations", + count = attestations.len() + )) .await { Ok(()) => info!( @@ -523,6 +538,7 @@ impl AttestationService AttestationService AttestationService AttestationService { diff --git a/validator_client/validator_services/src/block_service.rs b/validator_client/validator_services/src/block_service.rs index c111b1f22e..5ffabd22ec 100644 --- 
a/validator_client/validator_services/src/block_service.rs +++ b/validator_client/validator_services/src/block_service.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use std::time::Duration; use task_executor::TaskExecutor; use tokio::sync::mpsc; -use tracing::{debug, error, info, trace, warn}; +use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; use types::{BlockType, ChainSpec, EthSpec, Graffiti, PublicKeyBytes, Slot}; use validator_store::{Error as ValidatorStoreError, SignedBlock, UnsignedBlock, ValidatorStore}; @@ -320,6 +320,7 @@ impl BlockService { } #[allow(clippy::too_many_arguments)] + #[instrument(skip_all, fields(%slot, ?validator_pubkey))] async fn sign_and_publish_block( &self, proposer_fallback: ProposerFallback, @@ -333,6 +334,7 @@ impl BlockService { let res = self .validator_store .sign_block(*validator_pubkey, unsigned_block, slot) + .instrument(info_span!("sign_block")) .await; let signed_block = match res { @@ -389,6 +391,11 @@ impl BlockService { Ok(()) } + #[instrument( + name = "block_proposal_duty_cycle", + skip_all, + fields(%slot, ?validator_pubkey) + )] async fn publish_block( self, slot: Slot, @@ -483,6 +490,7 @@ impl BlockService { Ok(()) } + #[instrument(skip_all)] async fn publish_signed_block_contents( &self, signed_block: &SignedBlock, @@ -518,6 +526,7 @@ impl BlockService { Ok::<_, BlockError>(()) } + #[instrument(skip_all, fields(%slot))] async fn get_validator_block( beacon_node: &BeaconNodeHttpClient, slot: Slot, diff --git a/validator_client/validator_services/src/sync_committee_service.rs b/validator_client/validator_services/src/sync_committee_service.rs index 02f9f24c8a..5f6b1cb710 100644 --- a/validator_client/validator_services/src/sync_committee_service.rs +++ b/validator_client/validator_services/src/sync_committee_service.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, 
sleep_until}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; use types::{ ChainSpec, EthSpec, Hash256, PublicKeyBytes, Slot, SyncCommitteeSubscription, SyncContributionData, SyncDuty, SyncSelectionProof, SyncSubnetId, @@ -208,7 +208,8 @@ impl SyncCommitteeService SyncCommitteeService SyncCommitteeService SyncCommitteeService SyncCommitteeService SyncCommitteeService SyncCommitteeService SyncCommitteeService SyncCommitteeService SyncCommitteeService Date: Mon, 1 Dec 2025 13:56:50 +0800 Subject: [PATCH 2/7] Refactor get_validator_blocks_v3 fallback (#8186) #7727 introduced a bug in the logging, where as long as the node failed the SSZ `get_validator_blocks_v3` endpoint, it would log as `Beacon node does not support...`. However, the failure can be due to other reasons, such as a timeout error, as found by @jimmygchen: `WARN Beacon node does not support SSZ in block production, falling back to JSON slot: 5283379, error: HttpClient(url: https://ho-h-bn-cowl.spesi.io:15052/, kind: timeout, detail: operation timed out` This PR makes the error log more generic, so there is less confusion. Additionally, suggested by @michaelsproul, this PR refactors the `get_validator_blocks_v3` calls by trying all beacon nodes using the SSZ endpoint first, and if all beacon nodes fail the SSZ endpoint, only then fall back to JSON. It changes the logic from: "SSZ -> JSON for the primary beacon node, followed by SSZ -> JSON for the second beacon node and so on" to "SSZ for all beacon nodes -> JSON for all beacon nodes" This has the advantage that if the primary beacon node is having issues and fails the SSZ, we avoid retrying the primary beacon node again on JSON (as it could be that the primary beacon node fails again); rather, we switch to the second beacon node.
Co-Authored-By: Tan Chee Keong Co-Authored-By: chonghe <44791194+chong-he@users.noreply.github.com> --- .../validator_services/src/block_service.rs | 155 ++++++++---------- 1 file changed, 68 insertions(+), 87 deletions(-) diff --git a/validator_client/validator_services/src/block_service.rs b/validator_client/validator_services/src/block_service.rs index 5ffabd22ec..8ec53d3f40 100644 --- a/validator_client/validator_services/src/block_service.rs +++ b/validator_client/validator_services/src/block_service.rs @@ -1,5 +1,4 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, Error as FallbackError, Errors}; -use bls::SignatureBytes; use eth2::{BeaconNodeHttpClient, StatusCode}; use graffiti_file::{GraffitiFile, determine_graffiti}; use logging::crit; @@ -298,7 +297,7 @@ impl BlockService { self.inner.executor.spawn( async move { let result = service - .publish_block(slot, validator_pubkey, builder_boost_factor) + .get_validator_block_and_publish_block(slot, validator_pubkey, builder_boost_factor) .await; match result { @@ -396,7 +395,7 @@ impl BlockService { skip_all, fields(%slot, ?validator_pubkey) )] - async fn publish_block( + async fn get_validator_block_and_publish_block( self, slot: Slot, validator_pubkey: PublicKeyBytes, @@ -449,33 +448,80 @@ impl BlockService { info!(slot = slot.as_u64(), "Requesting unsigned block"); - // Request block from first responsive beacon node. + // Request an SSZ block from all beacon nodes in order, returning on the first successful response. + // If all nodes fail, run a second pass falling back to JSON. // - // Try the proposer nodes last, since it's likely that they don't have a + // Proposer nodes will always be tried last during each pass since it's likely that they don't have a // great view of attestations on the network. 
- let unsigned_block = proposer_fallback + let ssz_block_response = proposer_fallback .request_proposers_last(|beacon_node| async move { let _get_timer = validator_metrics::start_timer_vec( &validator_metrics::BLOCK_SERVICE_TIMES, &[validator_metrics::BEACON_BLOCK_HTTP_GET], ); - Self::get_validator_block( - &beacon_node, - slot, - randao_reveal_ref, - graffiti, - proposer_index, - builder_boost_factor, - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - }) + beacon_node + .get_validator_blocks_v3_ssz::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + builder_boost_factor, + ) + .await }) - .await?; + .await; + + let block_response = match ssz_block_response { + Ok((ssz_block_response, _metadata)) => ssz_block_response, + Err(e) => { + warn!( + slot = slot.as_u64(), + error = %e, + "SSZ block production failed, falling back to JSON" + ); + + proposer_fallback + .request_proposers_last(|beacon_node| async move { + let _get_timer = validator_metrics::start_timer_vec( + &validator_metrics::BLOCK_SERVICE_TIMES, + &[validator_metrics::BEACON_BLOCK_HTTP_GET], + ); + let (json_block_response, _metadata) = beacon_node + .get_validator_blocks_v3::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + builder_boost_factor, + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })?; + + Ok(json_block_response.data) + }) + .await + .map_err(BlockError::from)? 
+ } + }; + + let (block_proposer, unsigned_block) = match block_response { + eth2::types::ProduceBlockV3Response::Full(block) => { + (block.block().proposer_index(), UnsignedBlock::Full(block)) + } + eth2::types::ProduceBlockV3Response::Blinded(block) => { + (block.proposer_index(), UnsignedBlock::Blinded(block)) + } + }; + + info!(slot = slot.as_u64(), "Received unsigned block"); + if proposer_index != Some(block_proposer) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged".to_string(), + )); + } self_ref .sign_and_publish_block( @@ -525,71 +571,6 @@ impl BlockService { } Ok::<_, BlockError>(()) } - - #[instrument(skip_all, fields(%slot))] - async fn get_validator_block( - beacon_node: &BeaconNodeHttpClient, - slot: Slot, - randao_reveal_ref: &SignatureBytes, - graffiti: Option, - proposer_index: Option, - builder_boost_factor: Option, - ) -> Result, BlockError> { - let block_response = match beacon_node - .get_validator_blocks_v3_ssz::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - builder_boost_factor, - ) - .await - { - Ok((ssz_block_response, _)) => ssz_block_response, - Err(e) => { - warn!( - slot = slot.as_u64(), - error = %e, - "Beacon node does not support SSZ in block production, falling back to JSON" - ); - - let (json_block_response, _) = beacon_node - .get_validator_blocks_v3::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - builder_boost_factor, - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })?; - - // Extract ProduceBlockV3Response (data field of the struct ForkVersionedResponse) - json_block_response.data - } - }; - - let (block_proposer, unsigned_block) = match block_response { - eth2::types::ProduceBlockV3Response::Full(block) => { - (block.block().proposer_index(), UnsignedBlock::Full(block)) - } - eth2::types::ProduceBlockV3Response::Blinded(block) => { - (block.proposer_index(), 
UnsignedBlock::Blinded(block)) - } - }; - - info!(slot = slot.as_u64(), "Received unsigned block"); - if proposer_index != Some(block_proposer) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged".to_string(), - )); - } - - Ok::<_, BlockError>(unsigned_block) - } } /// Wrapper for values we want to log about a block we signed, for easy extraction from the possible From f42b14ac589b29013cbb971bb8fa733bd13a6537 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 2 Dec 2025 09:09:08 +1100 Subject: [PATCH 3/7] Update local testnet scripts for the fulu fork (#8489) * Remove `fulu-devnet-3` testing on CI * Delete `scripts/local_testnet/network_params_das.yaml` and consolidate it into the main `network_params.yaml` file we use on CI * Delete enclave before building image, so it doesn't cause slow image building. Co-Authored-By: Jimmy Chen --- .github/workflows/local-testnet.yml | 2 +- scripts/local_testnet/README.md | 2 +- scripts/local_testnet/network_params.yaml | 32 +++++++++++---- scripts/local_testnet/network_params_das.yaml | 41 ------------------- scripts/local_testnet/start_local_testnet.sh | 10 ++--- .../tests/checkpoint-sync-config-devnet.yaml | 24 ----------- 6 files changed, 32 insertions(+), 79 deletions(-) delete mode 100644 scripts/local_testnet/network_params_das.yaml delete mode 100644 scripts/tests/checkpoint-sync-config-devnet.yaml diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index c129c0ec95..9992273e0a 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -179,7 +179,7 @@ jobs: continue-on-error: true strategy: matrix: - network: [sepolia, devnet] + network: [sepolia] steps: - uses: actions/checkout@v5 diff --git a/scripts/local_testnet/README.md b/scripts/local_testnet/README.md index 9d9844c4c4..6260f91019 100644 --- a/scripts/local_testnet/README.md +++ b/scripts/local_testnet/README.md @@ -21,7 +21,7 @@ cd 
./scripts/local_testnet ``` It will build a Lighthouse docker image from the root of the directory and will take an approximately 12 minutes to complete. Once built, the testing will be started automatically. You will see a list of services running and "Started!" at the end. -You can also select your own Lighthouse docker image to use by specifying it in `network_params.yml` under the `cl_image` key. +You can also select your own Lighthouse docker image to use by specifying it in `network_params.yaml` under the `cl_image` key. Full configuration reference for Kurtosis is specified [here](https://github.com/ethpandaops/ethereum-package?tab=readme-ov-file#configuration). To view all running services: diff --git a/scripts/local_testnet/network_params.yaml b/scripts/local_testnet/network_params.yaml index cdfacbced4..a048674e63 100644 --- a/scripts/local_testnet/network_params.yaml +++ b/scripts/local_testnet/network_params.yaml @@ -1,19 +1,37 @@ # Full configuration reference [here](https://github.com/ethpandaops/ethereum-package?tab=readme-ov-file#configuration). 
participants: - - el_type: geth - el_image: ethereum/client-go:latest - cl_type: lighthouse + - cl_type: lighthouse cl_image: lighthouse:local + el_type: geth + el_image: ethereum/client-go:latest + supernode: true cl_extra_params: - --target-peers=3 - count: 4 + count: 2 + - cl_type: lighthouse + cl_image: lighthouse:local + el_type: geth + el_image: ethereum/client-go:latest + supernode: false + cl_extra_params: + - --target-peers=3 + count: 2 network_params: - electra_fork_epoch: 0 - seconds_per_slot: 3 -global_log_level: debug + fulu_fork_epoch: 0 + seconds_per_slot: 6 snooper_enabled: false +global_log_level: debug additional_services: - dora - spamoor - prometheus_grafana - tempo +spamoor_params: + image: ethpandaops/spamoor:master + spammers: + - scenario: eoatx + config: + throughput: 200 + - scenario: blobs + config: + throughput: 20 \ No newline at end of file diff --git a/scripts/local_testnet/network_params_das.yaml b/scripts/local_testnet/network_params_das.yaml deleted file mode 100644 index e3bc513153..0000000000 --- a/scripts/local_testnet/network_params_das.yaml +++ /dev/null @@ -1,41 +0,0 @@ -participants: - - cl_type: lighthouse - cl_image: lighthouse:local - el_type: geth - el_image: ethpandaops/geth:master - supernode: true - cl_extra_params: - # Note: useful for testing range sync (only produce block if the node is in sync to prevent forking) - - --sync-tolerance-epochs=0 - - --target-peers=3 - count: 2 - - cl_type: lighthouse - cl_image: lighthouse:local - el_type: geth - el_image: ethpandaops/geth:master - supernode: false - cl_extra_params: - # Note: useful for testing range sync (only produce block if the node is in sync to prevent forking) - - --sync-tolerance-epochs=0 - - --target-peers=3 - count: 2 -network_params: - electra_fork_epoch: 0 - fulu_fork_epoch: 1 - seconds_per_slot: 6 -snooper_enabled: false -global_log_level: debug -additional_services: - - dora - - spamoor - - prometheus_grafana - - tempo -spamoor_params: - image: 
ethpandaops/spamoor:master - spammers: - - scenario: eoatx - config: - throughput: 200 - - scenario: blobs - config: - throughput: 20 \ No newline at end of file diff --git a/scripts/local_testnet/start_local_testnet.sh b/scripts/local_testnet/start_local_testnet.sh index 442e6fd98d..8d8b33526d 100755 --- a/scripts/local_testnet/start_local_testnet.sh +++ b/scripts/local_testnet/start_local_testnet.sh @@ -78,6 +78,11 @@ if [ "$RUN_ASSERTOOR_TESTS" = true ]; then echo "Assertoor has been added to $NETWORK_PARAMS_FILE." fi +if [ "$KEEP_ENCLAVE" = false ]; then + # Stop local testnet + kurtosis enclave rm -f $ENCLAVE_NAME 2>/dev/null || true +fi + if [ "$BUILD_IMAGE" = true ]; then echo "Building Lighthouse Docker image." ROOT_DIR="$SCRIPT_DIR/../.." @@ -86,11 +91,6 @@ else echo "Not rebuilding Lighthouse Docker image." fi -if [ "$KEEP_ENCLAVE" = false ]; then - # Stop local testnet - kurtosis enclave rm -f $ENCLAVE_NAME 2>/dev/null || true -fi - kurtosis run --enclave $ENCLAVE_NAME github.com/ethpandaops/ethereum-package@$ETHEREUM_PKG_VERSION --args-file $NETWORK_PARAMS_FILE echo "Started!" diff --git a/scripts/tests/checkpoint-sync-config-devnet.yaml b/scripts/tests/checkpoint-sync-config-devnet.yaml deleted file mode 100644 index 2392011ed3..0000000000 --- a/scripts/tests/checkpoint-sync-config-devnet.yaml +++ /dev/null @@ -1,24 +0,0 @@ -# Kurtosis config file to checkpoint sync to a running devnet supported by ethPandaOps and `ethereum-package`. 
-participants: - - cl_type: lighthouse - cl_image: lighthouse:local - el_type: geth - el_image: ethpandaops/geth:master - cl_extra_params: - - --disable-backfill-rate-limiting - supernode: true - - cl_type: lighthouse - cl_image: lighthouse:local - el_type: geth - el_image: ethpandaops/geth:master - cl_extra_params: - - --disable-backfill-rate-limiting - supernode: false - -checkpoint_sync_enabled: true -checkpoint_sync_url: "https://checkpoint-sync.fusaka-devnet-3.ethpandaops.io" - -global_log_level: debug - -network_params: - network: fusaka-devnet-3 From 4fbe5174915a7d98998e83512ed1f1bacdf433b2 Mon Sep 17 00:00:00 2001 From: 0xMushow <105550256+0xMushow@users.noreply.github.com> Date: Tue, 2 Dec 2025 04:06:29 +0100 Subject: [PATCH 4/7] Fix data columns sorting when reconstructing blobs (#8510) Closes https://github.com/sigp/lighthouse/issues/8509 Co-Authored-By: Antoine James --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/beacon_chain/src/kzg_utils.rs | 36 +++++++++++++++++--- beacon_node/http_api/src/block_id.rs | 2 +- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 494346e7ff..00c5ab415c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1248,7 +1248,7 @@ impl BeaconChain { let num_required_columns = T::EthSpec::number_of_columns() / 2; let reconstruction_possible = columns.len() >= num_required_columns; if reconstruction_possible { - reconstruct_blobs(&self.kzg, &columns, None, &block, &self.spec) + reconstruct_blobs(&self.kzg, columns, None, &block, &self.spec) .map(Some) .map_err(Error::FailedToReconstructBlobs) } else { diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index 200774ebe4..334124419b 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -308,12 
+308,14 @@ pub(crate) fn build_data_column_sidecars( /// and it will be slow if the node needs to reconstruct the blobs pub fn reconstruct_blobs( kzg: &Kzg, - data_columns: &[Arc>], + mut data_columns: Vec>>, blob_indices_opt: Option>, signed_block: &SignedBlindedBeaconBlock, spec: &ChainSpec, ) -> Result, String> { - // The data columns are from the database, so we assume their correctness. + // Sort data columns by index to ensure ascending order for KZG operations + data_columns.sort_unstable_by_key(|dc| dc.index); + let first_data_column = data_columns .first() .ok_or("data_columns should have at least one element".to_string())?; @@ -331,7 +333,7 @@ pub fn reconstruct_blobs( .map(|row_index| { let mut cells: Vec = vec![]; let mut cell_ids: Vec = vec![]; - for data_column in data_columns { + for data_column in &data_columns { let cell = data_column .column .get(row_index) @@ -463,6 +465,7 @@ mod test { test_reconstruct_data_columns(&kzg, &spec); test_reconstruct_data_columns_unordered(&kzg, &spec); test_reconstruct_blobs_from_data_columns(&kzg, &spec); + test_reconstruct_blobs_from_data_columns_unordered(&kzg, &spec); test_validate_data_columns(&kzg, &spec); } @@ -595,7 +598,7 @@ mod test { let blob_indices = vec![1, 2]; let reconstructed_blobs = reconstruct_blobs( kzg, - &column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2], + column_sidecars[0..column_sidecars.len() / 2].to_vec(), Some(blob_indices.clone()), &signed_blinded_block, spec, @@ -613,6 +616,31 @@ mod test { } } + #[track_caller] + fn test_reconstruct_blobs_from_data_columns_unordered(kzg: &Kzg, spec: &ChainSpec) { + let num_of_blobs = 2; + let (signed_block, blobs, proofs) = + create_test_fulu_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); + let column_sidecars = + blobs_to_data_column_sidecars(&blob_refs, proofs.to_vec(), &signed_block, kzg, spec) + .unwrap(); + + // Test reconstruction with columns in reverse order (non-ascending) + let mut 
subset_columns: Vec<_> = column_sidecars.iter().as_slice()[0..column_sidecars.len() / 2].to_vec(); + subset_columns.reverse(); // This would fail without proper sorting in reconstruct_blobs + + let signed_blinded_block = signed_block.into(); + let reconstructed_blobs = + reconstruct_blobs(kzg, subset_columns, None, &signed_blinded_block, spec).unwrap(); + + for (i, original_blob) in blobs.iter().enumerate() { + let reconstructed_blob = &reconstructed_blobs.get(i).unwrap().blob; + assert_eq!(reconstructed_blob, original_blob, "{i}"); + } + } + fn get_kzg() -> Kzg { Kzg::new_from_trusted_setup(&get_trusted_setup()).expect("should create kzg") } diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 778067c32b..e088005f20 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -474,7 +474,7 @@ impl BlockId { ) .collect::, _>>()?; - reconstruct_blobs(&chain.kzg, &data_columns, blob_indices, block, &chain.spec).map_err( + reconstruct_blobs(&chain.kzg, data_columns, blob_indices, block, &chain.spec).map_err( |e| { warp_utils::reject::custom_server_error(format!( "Error reconstructing data columns: {e:?}" From 7ef9501ff66f0edb88b7680b4a24d65d3309b363 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Tue, 2 Dec 2025 16:17:13 +1100 Subject: [PATCH 5/7] Instrument attestation signing. (#8508) We noticed attestation signing taking 2+ seconds on some of our hoodi nodes and the current traces don't provide enough details. This PR adds a few more spans to the `attestation_duty_cycle` code path in the VC.
Before: image After: image Co-Authored-By: Jimmy Chen --- Cargo.lock | 1 + .../lighthouse_validator_store/src/lib.rs | 4 +- validator_client/signing_method/Cargo.toml | 1 + validator_client/signing_method/src/lib.rs | 2 + .../src/slashing_database.rs | 2 + .../src/attestation_service.rs | 133 +++++++++--------- 6 files changed, 77 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3730f132b..7ddcad7239 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8336,6 +8336,7 @@ dependencies = [ "reqwest", "serde", "task_executor", + "tracing", "types", "url", "validator_metrics", diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index d10fecb32e..dc8fb07b65 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -15,7 +15,7 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use task_executor::TaskExecutor; -use tracing::{error, info, warn}; +use tracing::{error, info, instrument, warn}; use types::{ AbstractExecPayload, Address, AggregateAndProof, Attestation, BeaconBlock, BlindedPayload, ChainSpec, ContributionAndProof, Domain, Epoch, EthSpec, Fork, Graffiti, Hash256, @@ -242,6 +242,7 @@ impl LighthouseValidatorStore { /// Returns a `SigningMethod` for `validator_pubkey` *only if* that validator is considered safe /// by doppelganger protection. 
+ #[instrument(skip_all, level = "debug")] fn doppelganger_checked_signing_method( &self, validator_pubkey: PublicKeyBytes, @@ -745,6 +746,7 @@ impl ValidatorStore for LighthouseValidatorS } } + #[instrument(skip_all)] async fn sign_attestation( &self, validator_pubkey: PublicKeyBytes, diff --git a/validator_client/signing_method/Cargo.toml b/validator_client/signing_method/Cargo.toml index 3e1a48142f..2defd25caa 100644 --- a/validator_client/signing_method/Cargo.toml +++ b/validator_client/signing_method/Cargo.toml @@ -12,6 +12,7 @@ parking_lot = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } task_executor = { workspace = true } +tracing = { workspace = true } types = { workspace = true } url = { workspace = true } validator_metrics = { workspace = true } diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index c535415b1e..7e0f2c02f7 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -10,6 +10,7 @@ use reqwest::{Client, header::ACCEPT}; use std::path::PathBuf; use std::sync::Arc; use task_executor::TaskExecutor; +use tracing::instrument; use types::*; use url::Url; use web3signer::{ForkInfo, MessageType, SigningRequest, SigningResponse}; @@ -131,6 +132,7 @@ impl SigningMethod { } /// Return the signature of `signable_message`, with respect to the `signing_context`. 
+ #[instrument(skip_all, level = "debug")] pub async fn get_signature>( &self, signable_message: SignableMessage<'_, E, Payload>, diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index ce32299a51..00677212a3 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -11,6 +11,7 @@ use rusqlite::{OptionalExtension, Transaction, TransactionBehavior, params}; use std::fs::File; use std::path::Path; use std::time::Duration; +use tracing::instrument; use types::{AttestationData, BeaconBlockHeader, Epoch, Hash256, PublicKeyBytes, SignedRoot, Slot}; type Pool = r2d2::Pool; @@ -639,6 +640,7 @@ impl SlashingDatabase { /// to prevent concurrent checks and inserts from resulting in slashable data being inserted. /// /// This is the safe, externally-callable interface for checking attestations. + #[instrument(skip_all, level = "debug")] pub fn check_and_insert_attestation( &self, validator_pubkey: &PublicKeyBytes, diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index a6ce67fae9..8211fb11f3 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -8,7 +8,7 @@ use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, sleep_until}; -use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; +use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn}; use tree_hash::TreeHash; use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; @@ -369,79 +369,82 @@ impl AttestationService(attestation_data, &self.chain_spec) { - 
crit!( - validator = ?duty.pubkey, - duty_slot = %duty.slot, - attestation_slot = %attestation_data.slot, - duty_index = duty.committee_index, - attestation_index = attestation_data.index, - "Inconsistent validator duties during signing" - ); - return None; - } - - let mut attestation = match Attestation::empty_for_signing( - duty.committee_index, - duty.committee_length as usize, - attestation_data.slot, - attestation_data.beacon_block_root, - attestation_data.source, - attestation_data.target, - &self.chain_spec, - ) { - Ok(attestation) => attestation, - Err(err) => { + // Ensure that the attestation matches the duties. + if !duty.match_attestation_data::(attestation_data, &self.chain_spec) { crit!( validator = ?duty.pubkey, - ?duty, - ?err, - "Invalid validator duties during signing" + duty_slot = %duty.slot, + attestation_slot = %attestation_data.slot, + duty_index = duty.committee_index, + attestation_index = attestation_data.index, + "Inconsistent validator duties during signing" ); return None; } - }; - match self - .validator_store - .sign_attestation( - duty.pubkey, - duty.validator_committee_index as usize, - &mut attestation, - current_epoch, - ) - .await - { - Ok(()) => Some((attestation, duty.validator_index)), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. 
- warn!( - info = "a validator may have recently been removed from this VC", - pubkey = ?pubkey, - validator = ?duty.pubkey, - committee_index = committee_index, - slot = slot.as_u64(), - "Missing pubkey for attestation" - ); - None - } - Err(e) => { - crit!( - error = ?e, - validator = ?duty.pubkey, - committee_index, - slot = slot.as_u64(), - "Failed to sign attestation" - ); - None + let mut attestation = match Attestation::empty_for_signing( + duty.committee_index, + duty.committee_length as usize, + attestation_data.slot, + attestation_data.beacon_block_root, + attestation_data.source, + attestation_data.target, + &self.chain_spec, + ) { + Ok(attestation) => attestation, + Err(err) => { + crit!( + validator = ?duty.pubkey, + ?duty, + ?err, + "Invalid validator duties during signing" + ); + return None; + } + }; + + match self + .validator_store + .sign_attestation( + duty.pubkey, + duty.validator_committee_index as usize, + &mut attestation, + current_epoch, + ) + .await + { + Ok(()) => Some((attestation, duty.validator_index)), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + warn!( + info = "a validator may have recently been removed from this VC", + pubkey = ?pubkey, + validator = ?duty.pubkey, + committee_index = committee_index, + slot = slot.as_u64(), + "Missing pubkey for attestation" + ); + None + } + Err(e) => { + crit!( + error = ?e, + validator = ?duty.pubkey, + committee_index, + slot = slot.as_u64(), + "Failed to sign attestation" + ); + None + } } } + .instrument(Span::current()) }); // Execute all the futures in parallel, collecting any successful results. 
From 0bccc7090c9dc27b57e860f7ab9aaeb83eb305e1 Mon Sep 17 00:00:00 2001 From: chonghe <44791194+chong-he@users.noreply.github.com> Date: Wed, 3 Dec 2025 09:45:47 +0800 Subject: [PATCH 6/7] Always use committee index 0 when getting attestation data (#8171) * #8046 Split the function `publish_attestations_and_aggregates` into `publish_attestations` and `handle_aggregates`, so that for attestations, only 1 task is spawned. Co-Authored-By: Tan Chee Keong Co-Authored-By: chonghe <44791194+chong-he@users.noreply.github.com> Co-Authored-By: Michael Sproul Co-Authored-By: Michael Sproul --- .../src/attestation_service.rs | 240 +++++++++--------- 1 file changed, 126 insertions(+), 114 deletions(-) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 8211fb11f3..b2b8bc81e2 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -180,8 +180,9 @@ impl AttestationService Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let duration_to_next_slot = self @@ -189,6 +190,53 @@ impl AttestationService = self.duties_service.attesters(slot).into_iter().collect(); + let attestation_service = self.clone(); + + let attestation_data_handle = self + .inner + .executor + .spawn_handle( + async move { + let attestation_data = attestation_service + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }) + .await + .map_err(|e| e.to_string())?; + + attestation_service + .sign_and_publish_attestations( + slot, + &attestation_duties, + 
attestation_data.clone(), + ) + .await + .map_err(|e| { + crit!( + error = format!("{:?}", e), + slot = slot.as_u64(), + "Error during attestation routine" + ); + e + })?; + Ok::(attestation_data) + }, + "unaggregated attestation production", + ) + .ok_or("Failed to spawn attestation data task")?; + // If a validator needs to publish an aggregate attestation, they must do so at 2/3 // through the slot. This delay triggers at this time let aggregate_production_instant = Instant::now() @@ -196,7 +244,7 @@ impl AttestationService> = self + let aggregate_duties_by_committee_index: HashMap> = self .duties_service .attesters(slot) .into_iter() @@ -207,24 +255,45 @@ impl AttestationService data, + Ok(Some(Err(err))) => { + error!(?err, "Attestation production failed"); + return; + } + Ok(None) | Err(_) => { + info!("Aborting attestation production due to shutdown"); + return; + } + }; + + // For each committee index for this slot: + // Create and publish `SignedAggregateAndProof` for all aggregating validators. + aggregate_duties_by_committee_index.into_iter().for_each( + |(committee_index, validator_duties)| { + let attestation_service = attestation_service_clone.clone(); + let attestation_data = attestation_data.clone(); + executor.spawn_ignoring_error( + attestation_service.handle_aggregates( + slot, + committee_index, + validator_duties, + aggregate_production_instant, + attestation_data, + ), + "aggregate publish", + ); + }, + ) + }, + "attestation and aggregate publish", + ); // Schedule pruning of the slashing protection database once all unaggregated // attestations have (hopefully) been signed, i.e. 
at the same time as aggregate @@ -234,114 +303,76 @@ impl AttestationService, aggregate_production_instant: Instant, + attestation_data: AttestationData, ) -> Result<(), ()> { - let attestations_timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS], - ); - - // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have + // There's no need to produce `SignedAggregateAndProof` if we do not have // any validators for the given `slot` and `committee_index`. if validator_duties.is_empty() { return Ok(()); } - // Step 1. - // - // Download, sign and publish an `Attestation` for each validator. - let attestation_opt = self - .produce_and_publish_attestations(slot, committee_index, &validator_duties) + // Wait until the `aggregate_production_instant` (2/3rds + // of the way through the slot). As verified in the + // `delay_triggers_when_in_the_past` test, this code will still run + // even if the instant has already elapsed. + sleep_until(aggregate_production_instant).await; + + // Start the metrics timer *after* we've done the delay. + let _aggregates_timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::AGGREGATES], + ); + + // Download, sign and publish a `SignedAggregateAndProof` for each + // validator that is elected to aggregate for this `slot` and + // `committee_index`. + self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties) .await .map_err(move |e| { crit!( error = format!("{:?}", e), committee_index, slot = slot.as_u64(), - "Error during attestation routine" + "Error during aggregate attestation routine" ) })?; - drop(attestations_timer); - - // Step 2. - // - // If an attestation was produced, make an aggregate.
- if let Some(attestation_data) = attestation_opt { - // First, wait until the `aggregation_production_instant` (2/3rds - // of the way though the slot). As verified in the - // `delay_triggers_when_in_the_past` test, this code will still run - // even if the instant has already elapsed. - sleep_until(aggregate_production_instant).await; - - // Start the metrics timer *after* we've done the delay. - let _aggregates_timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::AGGREGATES], - ); - - // Then download, sign and publish a `SignedAggregateAndProof` for each - // validator that is elected to aggregate for this `slot` and - // `committee_index`. - self.produce_and_publish_aggregates( - &attestation_data, - committee_index, - &validator_duties, - ) - .await - .map_err(move |e| { - crit!( - error = format!("{:?}", e), - committee_index, - slot = slot.as_u64(), - "Error during attestation routine" - ) - })?; - } - Ok(()) } - /// Performs the first step of the attesting process: downloading `Attestation` objects, - /// signing them and returning them to the validator. + /// Performs the main steps of the attesting process: signing and publishing to the BN. /// - /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting + /// https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/validator.md#attesting /// /// ## Detail /// /// The given `validator_duties` should already be filtered to only contain those that match - /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. - /// - /// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each - /// validator and the list of individually-signed `Attestation` objects is returned to the BN. - #[instrument(skip_all, fields(%slot, %committee_index))] - async fn produce_and_publish_attestations( + /// `slot`. 
Critical errors will be logged if this is not the case. + #[instrument(skip_all, fields(%slot, %attestation_data.beacon_block_root))] + async fn sign_and_publish_attestations( &self, slot: Slot, - committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], - ) -> Result, String> { + attestation_data: AttestationData, + ) -> Result<(), String> { + let _attestations_timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS], + ); + if validator_duties.is_empty() { - return Ok(None); + return Ok(()); } let current_epoch = self @@ -350,23 +381,6 @@ impl AttestationService AttestationService AttestationService AttestationService AttestationService Date: Wed, 3 Dec 2025 15:06:28 +1100 Subject: [PATCH 7/7] Move deposit contract artifacts to /target (#8518) Alternative to: - https://github.com/sigp/lighthouse/pull/8488 Refactors deposit_contract crate to comply with Rust build conventions by placing generated artifacts in the build output directory. Co-Authored-By: Michael Sproul --- common/deposit_contract/build.rs | 9 ++++----- common/deposit_contract/src/lib.rs | 22 ++++++++++++++++------ 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/common/deposit_contract/build.rs b/common/deposit_contract/build.rs index cae1d480c8..2061d13c24 100644 --- a/common/deposit_contract/build.rs +++ b/common/deposit_contract/build.rs @@ -153,14 +153,13 @@ fn verify_checksum(bytes: &[u8], expected_checksum: &str) { /// Returns the directory that will be used to store the deposit contract ABI. 
fn abi_dir() -> PathBuf { - let base = env::var("CARGO_MANIFEST_DIR") - .expect("should know manifest dir") + let base = env::var("OUT_DIR") + .expect("should know out dir") .parse::() - .expect("should parse manifest dir as path") - .join("contracts"); + .expect("should parse out dir as path"); std::fs::create_dir_all(base.clone()) - .expect("should be able to create abi directory in manifest"); + .expect("should be able to create abi directory in out dir"); base } diff --git a/common/deposit_contract/src/lib.rs b/common/deposit_contract/src/lib.rs index 12c3bdaa89..e5f11bb89c 100644 --- a/common/deposit_contract/src/lib.rs +++ b/common/deposit_contract/src/lib.rs @@ -44,15 +44,25 @@ impl From for Error { pub const CONTRACT_DEPLOY_GAS: usize = 4_000_000; pub const DEPOSIT_GAS: usize = 400_000; -pub const ABI: &[u8] = include_bytes!("../contracts/v0.12.1_validator_registration.json"); -pub const BYTECODE: &[u8] = include_bytes!("../contracts/v0.12.1_validator_registration.bytecode"); +pub const ABI: &[u8] = include_bytes!(concat!( + env!("OUT_DIR"), + "/v0.12.1_validator_registration.json" +)); +pub const BYTECODE: &[u8] = include_bytes!(concat!( + env!("OUT_DIR"), + "/v0.12.1_validator_registration.bytecode" +)); pub const DEPOSIT_DATA_LEN: usize = 420; // lol pub mod testnet { - pub const ABI: &[u8] = - include_bytes!("../contracts/v0.12.1_testnet_validator_registration.json"); - pub const BYTECODE: &[u8] = - include_bytes!("../contracts/v0.12.1_testnet_validator_registration.bytecode"); + pub const ABI: &[u8] = include_bytes!(concat!( + env!("OUT_DIR"), + "/v0.12.1_testnet_validator_registration.json" + )); + pub const BYTECODE: &[u8] = include_bytes!(concat!( + env!("OUT_DIR"), + "/v0.12.1_testnet_validator_registration.bytecode" + )); } pub fn encode_eth1_tx_data(deposit_data: &DepositData) -> Result, Error> {