From bbb074692c1b2bbef2edd3bbe30ce6d581b52185 Mon Sep 17 00:00:00 2001 From: Mac L Date: Thu, 29 Jan 2026 10:30:22 +0400 Subject: [PATCH 1/3] Disable `sqlite` by default (#8708) Compiling types with the `sqlite` feature more than doubles the total compile time. Most downstream users don't need `sqlite` compatibility for `Slot` and `Epoch` types. Disable the `sqlite` feature by default. Co-Authored-By: Mac L --- consensus/types/Cargo.toml | 2 +- validator_client/slashing_protection/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index 78c6f871cb..feea855c84 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -8,7 +8,7 @@ authors = [ edition = { workspace = true } [features] -default = ["sqlite", "legacy-arith"] +default = ["legacy-arith"] # Allow saturating arithmetic on slots and epochs. Enabled by default, but deprecated. legacy-arith = [] sqlite = ["dep:rusqlite"] diff --git a/validator_client/slashing_protection/Cargo.toml b/validator_client/slashing_protection/Cargo.toml index b80da6c786..86df6d01fe 100644 --- a/validator_client/slashing_protection/Cargo.toml +++ b/validator_client/slashing_protection/Cargo.toml @@ -23,7 +23,7 @@ serde = { workspace = true } serde_json = { workspace = true } tempfile = { workspace = true } tracing = { workspace = true } -types = { workspace = true } +types = { workspace = true, features = ["sqlite"] } [dev-dependencies] rayon = { workspace = true } From 119dc565a119f263bc5cf6fea72ac8d92b54c8d7 Mon Sep 17 00:00:00 2001 From: chonghe <44791194+chong-he@users.noreply.github.com> Date: Thu, 29 Jan 2026 14:44:17 +0800 Subject: [PATCH 2/3] Call beacon_committee_selections only once per epoch (#8699) * #8381 Add another if in the if/else branch for selections_endpoint case, and leaving the current code for non-selections_endpoint case mostly untouched. Now the if/else branch in `fill_in_selection_proofs` has 3 branches: - 1 for DVT with selections_endpoint (added in this PR) - 1 for DVT without selections_endpoint (Anchor) (untouched) - Non-DVT (untouched) Tested and it's working, also thanks very much to @KaloyanTanev for helping to test and confirmed that the calling has reduced and aggregated attestations are still working Co-Authored-By: Tan Chee Keong --- .../validator_services/src/duties_service.rs | 221 ++++++++++++------ 1 file changed, 149 insertions(+), 72 deletions(-) diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index c2378181ef..f467db92a1 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -17,7 +17,7 @@ use eth2::types::{ }; use futures::{ StreamExt, - stream::{self, FuturesUnordered}, + stream::{self, FuturesUnordered, TryStreamExt}, }; use parking_lot::{RwLock, RwLockWriteGuard}; use safe_arith::{ArithError, SafeArith}; @@ -141,71 +141,15 @@ impl Default for SelectionProofConfig { /// Create a selection proof for `duty`. /// /// Return `Ok(None)` if the attesting validator is not an aggregator. -async fn make_selection_proof( +async fn make_selection_proof( duty: &AttesterData, validator_store: &S, spec: &ChainSpec, - beacon_nodes: &Arc>, - config: &SelectionProofConfig, ) -> Result, Error> { - let selection_proof = if config.selections_endpoint { - let beacon_committee_selection = BeaconCommitteeSelection { - validator_index: duty.validator_index, - slot: duty.slot, - // This is partial selection proof - selection_proof: validator_store - .produce_selection_proof(duty.pubkey, duty.slot) - .await - .map_err(Error::FailedToProduceSelectionProof)? - .into(), - }; - // Call the endpoint /eth/v1/validator/beacon_committee_selections - // by sending the BeaconCommitteeSelection that contains partial selection proof - // The middleware should return BeaconCommitteeSelection that contains full selection proof - let middleware_response = beacon_nodes - .first_success(|beacon_node| { - let selection_data = beacon_committee_selection.clone(); - debug!( - "validator_index" = duty.validator_index, - "slot" = %duty.slot, - "partial selection proof" = ?beacon_committee_selection.selection_proof, - "Sending selection to middleware" - ); - async move { - beacon_node - .post_validator_beacon_committee_selections(&[selection_data]) - .await - } - }) - .await; - - let response_data = middleware_response - .map_err(|e| { - Error::FailedToProduceSelectionProof(ValidatorStoreError::Middleware(e.to_string())) - })? - .data - .pop() - .ok_or_else(|| { - Error::FailedToProduceSelectionProof(ValidatorStoreError::Middleware(format!( - "attestation selection proof - empty response for validator {}", - duty.validator_index - ))) - })?; - - debug!( - "validator_index" = response_data.validator_index, - "slot" = %response_data.slot, - // The selection proof from middleware response will be a full selection proof - "full selection proof" = ?response_data.selection_proof, - "Received selection from middleware" - ); - SelectionProof::from(response_data.selection_proof) - } else { - validator_store - .produce_selection_proof(duty.pubkey, duty.slot) - .await - .map_err(Error::FailedToProduceSelectionProof)? - }; + let selection_proof = validator_store + .produce_selection_proof(duty.pubkey, duty.slot) + .await + .map_err(Error::FailedToProduceSelectionProof)?; selection_proof .is_aggregator(duty.committee_length as usize, spec) @@ -221,6 +165,69 @@ async fn make_selection_proof( }) } +/// Create a Vec for every epoch +/// so that when calling the selections_endpoint later, it calls once per epoch with duties of all slots in that epoch +async fn make_beacon_committee_selection( + duties_service: &Arc>, + duties: &[AttesterData], +) -> Result, Error> { + // collect the BeaconCommitteeSelection in duties + let beacon_committee_selections = duties + .iter() + .map(|duty| { + let validator_store = &duties_service.validator_store; + async move { + let partial_selection_proof = validator_store + .produce_selection_proof(duty.pubkey, duty.slot) + .await + .map_err(Error::FailedToProduceSelectionProof)?; + Ok::>(BeaconCommitteeSelection { + validator_index: duty.validator_index, + slot: duty.slot, + selection_proof: partial_selection_proof.into(), + }) + } + }) + .collect::>() + .try_collect::>() + .await?; + + let epoch = duties + .first() + .map(|attester_data| attester_data.slot.epoch(S::E::slots_per_epoch())) + .unwrap_or_default(); + + debug!( + %epoch, + count = beacon_committee_selections.len(), + "Sending selections to middleware" + ); + + let selections = duties_service + .beacon_nodes + .first_success(|beacon_node| { + let selections = beacon_committee_selections.clone(); + async move { + beacon_node + .post_validator_beacon_committee_selections(&selections) + .await + } + }) + .await + .map_err(|e| { + Error::FailedToProduceSelectionProof(ValidatorStoreError::Middleware(e.to_string())) + })? + .data; + + debug!( + %epoch, + count = beacon_committee_selections.len(), + "Received selections from middleware" + ); + + Ok(selections) +} + impl DutyAndProof { /// Create a new `DutyAndProof` with the selection proof waiting to be filled in. pub fn new_without_selection_proof(duty: AttesterData, current_slot: Slot) -> Self { @@ -1287,14 +1294,21 @@ async fn fill_in_selection_proofs> = BTreeMap::new(); - for duty in duties { - duties_by_slot.entry(duty.slot).or_default().push(duty); + for duty in &duties { + duties_by_slot + .entry(duty.slot) + .or_default() + .push(duty.clone()); } // At halfway through each slot when nothing else is likely to be getting signed, sign a batch // of selection proofs and insert them into the duties service `attesters` map. let slot_clock = &duties_service.slot_clock; + // Create a HashMap for BeaconCommitteeSelection to match the duty later for distributed case involving middleware + let mut selection_hashmap = HashMap::new(); + let mut call_selection_endpoint = false; + while !duties_by_slot.is_empty() { if let Some(duration) = slot_clock.duration_to_next_slot() { sleep( @@ -1333,10 +1347,77 @@ async fn fill_in_selection_proofs selections, + Err(e) => { + error!( + error = ?e, + "Failed to fetch selection proofs" + ); + // If calling the endpoint fails, change to true so that it will retry the next slot + call_selection_endpoint = true; + continue; + } + }; + + for selection in &selections { + // This is a full_selection_proof returned by middleware + let selection_proof = + SelectionProof::from(selection.selection_proof.clone()); + selection_hashmap + .insert((selection.validator_index, selection.slot), selection_proof); + } + // Once we have the selection_proof, we don't call the selections_endpoint again + call_selection_endpoint = false; + } + + for duty in relevant_duties.into_values().flatten() { + let key = (duty.validator_index, duty.slot); + + let result = if let Some(selection_proof) = selection_hashmap.remove(&key) { + match selection_proof + .is_aggregator(duty.committee_length as usize, &duties_service.spec) + .map_err(Error::::InvalidModulo) + { + // Aggregator, return the result + Ok(true) => Ok((duty, Some(selection_proof))), + // Not an aggregator, do nothing and continue + Ok(false) => continue, + Err(_) => return, + } + } else { + Err(Error::FailedToProduceSelectionProof( + ValidatorStoreError::Middleware(format!( + "Missing selection proof for validator {} slot {}", + duty.validator_index, duty.slot + )), + )) + }; + + let mut attesters = duties_service.attesters.write(); + // if process_duty_and_proof returns false, exit the loop + if !process_duty_and_proof::( + &mut attesters, + result, + dependent_root, + current_slot, + ) { + return; + } + } + } + // For distributed case that uses parallel_sign + else if duties_service.selection_proof_config.parallel_sign { let mut duty_and_proof_results = relevant_duties .into_values() .flatten() @@ -1345,8 +1426,6 @@ async fn fill_in_selection_proofs Date: Thu, 29 Jan 2026 18:39:05 +1100 Subject: [PATCH 3/3] Emit `NewHead` SSE event earlier in block import (#8718) Co-Authored-By: Jimmy Chen --- beacon_node/beacon_chain/src/beacon_chain.rs | 51 +++++++++++++++++++ .../beacon_chain/src/canonical_head.rs | 38 +------------- beacon_node/beacon_chain/tests/events.rs | 42 +++++++++++++++ beacon_node/http_api/tests/tests.rs | 14 ++++- 4 files changed, 107 insertions(+), 38 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 148a4f8fcd..df0db42a66 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -77,6 +77,7 @@ use bls::{PublicKey, PublicKeyBytes, Signature}; use eth2::beacon_response::ForkVersionedResponse; use eth2::types::{ EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes, + SseHead, }; use execution_layer::{ BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer, @@ -3863,6 +3864,9 @@ impl BeaconChain { } }; + // Read the cached head prior to taking the fork choice lock to avoid potential deadlocks. + let old_head_slot = self.canonical_head.cached_head().head_slot(); + // Take an upgradable read lock on fork choice so we can check if this block has already // been imported. We don't want to repeat work importing a block that is already imported. let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock(); @@ -3918,6 +3922,9 @@ impl BeaconChain { // This block became the head, add it to the early attester cache. Ok(new_head_root) if new_head_root == block_root => { if let Some(proto_block) = fork_choice.get_block(&block_root) { + let new_head_is_optimistic = + proto_block.execution_status.is_optimistic_or_invalid(); + if let Err(e) = self.early_attester_cache.add_head_block( block_root, &signed_block, @@ -3937,6 +3944,50 @@ impl BeaconChain { attestable_timestamp, ) } + + // Register a server-sent-event for a new head. + if let Some(event_handler) = self + .event_handler + .as_ref() + .filter(|handler| handler.has_head_subscribers()) + { + let head_slot = state.slot(); + let state_root = block.state_root(); + let is_epoch_transition = state.current_epoch() + > old_head_slot.epoch(T::EthSpec::slots_per_epoch()); + + let dependent_root = state.attester_shuffling_decision_root( + self.genesis_block_root, + RelativeEpoch::Next, + ); + let prev_dependent_root = state.attester_shuffling_decision_root( + self.genesis_block_root, + RelativeEpoch::Current, + ); + + match (dependent_root, prev_dependent_root) { + ( + Ok(current_duty_dependent_root), + Ok(previous_duty_dependent_root), + ) => { + event_handler.register(EventKind::Head(SseHead { + slot: head_slot, + block: block_root, + state: state_root, + current_duty_dependent_root, + previous_duty_dependent_root, + epoch_transition: is_epoch_transition, + execution_optimistic: new_head_is_optimistic, + })); + } + (Err(e), _) | (_, Err(e)) => { + warn!( + error = ?e, + "Unable to find dependent roots, cannot register head event" + ); + } + } + } } else { warn!(?block_root, "Early attester block missing"); } diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index f18fdb25f6..417d7f4e2f 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -41,7 +41,7 @@ use crate::{ metrics, validator_monitor::get_slot_delay_ms, }; -use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead}; +use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseLateHead}; use fork_choice::{ ExecutionStatus, ForkChoiceStore, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock, ResetPayloadStatuses, @@ -824,15 +824,8 @@ impl BeaconChain { .slot() .epoch(T::EthSpec::slots_per_epoch()); - // These fields are used for server-sent events. - let state_root = new_snapshot.beacon_state_root(); + // This field is used for server-sent events. let head_slot = new_snapshot.beacon_state.slot(); - let dependent_root = new_snapshot - .beacon_state - .attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Next); - let prev_dependent_root = new_snapshot - .beacon_state - .attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Current); match BlockShufflingIds::try_from_head( new_snapshot.beacon_block_root, @@ -870,33 +863,6 @@ impl BeaconChain { self.op_pool.prune_attestations(self.epoch()?); } - // Register server-sent-events for a new head. - if let Some(event_handler) = self - .event_handler - .as_ref() - .filter(|handler| handler.has_head_subscribers()) - { - match (dependent_root, prev_dependent_root) { - (Ok(current_duty_dependent_root), Ok(previous_duty_dependent_root)) => { - event_handler.register(EventKind::Head(SseHead { - slot: head_slot, - block: new_snapshot.beacon_block_root, - state: state_root, - current_duty_dependent_root, - previous_duty_dependent_root, - epoch_transition: is_epoch_transition, - execution_optimistic: new_head_is_optimistic, - })); - } - (Err(e), _) | (_, Err(e)) => { - warn!( - error = ?e, - "Unable to find dependent roots, cannot register head event" - ); - } - } - } - // Register a server-sent-event for a reorg (if necessary). if let Some(depth) = reorg_distance && let Some(event_handler) = self diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index 315da0862c..92727ffd76 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -213,3 +213,45 @@ async fn data_column_sidecar_event_on_process_rpc_columns() { EventKind::DataColumnSidecar(expected_sse_data_column) ); } + +/// Verifies that a head event is emitted when a block is imported and becomes the head. +#[tokio::test] +async fn head_event_on_block_import() { + let spec = Arc::new(test_spec::()); + let harness = BeaconChainHarness::builder(E::default()) + .spec(spec.clone()) + .deterministic_keypairs(8) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + // Subscribe to head events before importing the block + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut head_event_receiver = event_handler.subscribe_head(); + + // Build and process a block that will become the new head + let head_state = harness.get_current_state(); + let target_slot = head_state.slot() + 1; + harness.advance_slot(); + let ((signed_block, blobs), _) = harness.make_block(head_state, target_slot).await; + + let block_root = signed_block.canonical_root(); + let state_root = signed_block.message().state_root(); + + harness + .process_block(target_slot, block_root, (signed_block, blobs)) + .await + .unwrap(); + + // Verify the head event was emitted with correct data + let head_event = head_event_receiver.try_recv().unwrap(); + if let EventKind::Head(sse_head) = head_event { + assert_eq!(sse_head.slot, target_slot); + assert_eq!(sse_head.block, block_root); + assert_eq!(sse_head.state, state_root); + // execution_optimistic should be false since we're using mock execution layer + assert!(!sse_head.execution_optimistic); + } else { + panic!("Expected Head event, got {:?}", head_event); + } +} diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 8a79541363..a763db6421 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -6539,8 +6539,13 @@ impl ApiTester { block_events.as_slice(), &[ expected_gossip, - expected_block, + // SSE `Head`` event is now emitted before `Block` event, because we only emit the block event + // after it's persisted to the database. We could consider changing this later, but + // we might have to serve http API requests for blocks from early_attester_cache + // before they're persisted to the database. + // https://github.com/sigp/lighthouse/pull/8718#issuecomment-3815593310 expected_head, + expected_block, expected_finalized ] ); @@ -6797,7 +6802,12 @@ impl ApiTester { .unwrap(); let block_events = poll_events(&mut events_future, 2, Duration::from_millis(10000)).await; - assert_eq!(block_events.as_slice(), &[expected_block, expected_head]); + // SSE `Head`` event is now emitted before `Block` event, because we only emit the block event + // after it's persisted to the database. We could consider changing this later, but + // we might have to serve http API requests for blocks from early_attester_cache + // before they're persisted to the database. + // https://github.com/sigp/lighthouse/pull/8718#issuecomment-3815593310 + assert_eq!(block_events.as_slice(), &[expected_head, expected_block]); self }