From d34a91ac07fcef1f41e09563b68d033a92296bab Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Tue, 29 Apr 2025 16:28:42 +0800 Subject: [PATCH] address comments --- validator_client/src/lib.rs | 27 +++++++++++++-- .../validator_services/src/duties_service.rs | 33 +++++-------------- .../validator_services/src/sync.rs | 18 ++-------- 3 files changed, 34 insertions(+), 44 deletions(-) diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index c1a3217753..5cb1c362ba 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -86,6 +86,11 @@ const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1; /// Fraction of a slot at which selection proof signing should happen (2 means half way). const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; +/// Number of epochs in advance to compute selection proofs when not in `distributed` mode. +pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2; +/// Number of slots in advance to compute selection proofs when in `distributed` mode. +pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1; + #[derive(Clone)] pub struct ProductionValidatorClient { context: RuntimeContext, @@ -459,7 +464,7 @@ impl ProductionValidatorClient { // Define a config to be pass to fill_in_selection_proofs. // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. - let selection_proof_config = if config.distributed { + let attestation_selection_proof_config = if config.distributed { SelectionProofConfig { lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT, computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM, @@ -475,11 +480,27 @@ impl ProductionValidatorClient { } }; + let sync_selection_proof_config = if config.distributed { + SelectionProofConfig { + lookahead_slot: AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED, + computation_offset: Duration::default(), + selections_endpoint: true, + parallel_sign: true, + } + } else { + SelectionProofConfig { + lookahead_slot: E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS, + computation_offset: Duration::default(), + selections_endpoint: false, + parallel_sign: false, + } + }; + let duties_context = context.service_context("duties".into()); let duties_service = Arc::new(DutiesService { attesters: <_>::default(), proposers: <_>::default(), - sync_duties: SyncDutiesMap::new(selection_proof_config.clone()), + sync_duties: SyncDutiesMap::new(sync_selection_proof_config), slot_clock: slot_clock.clone(), beacon_nodes: beacon_nodes.clone(), validator_store: validator_store.clone(), @@ -487,7 +508,7 @@ impl ProductionValidatorClient { spec: context.eth2_config.spec.clone(), context: duties_context, enable_high_validator_count_metrics: config.enable_high_validator_count_metrics, - selection_proof_config, + selection_proof_config: attestation_selection_proof_config, disable_attesting: config.disable_attesting, }); diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 24999ba0c3..27f0817b0d 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -115,7 +115,6 @@ pub struct SubscriptionSlots { duty_slot: Slot, } -#[derive(Clone)] pub struct SelectionProofConfig { pub lookahead_slot: u64, pub computation_offset: Duration, // The seconds to compute the selection proof before a slot @@ -130,10 +129,10 @@ async fn make_selection_proof( duty: &AttesterData, validator_store: &ValidatorStore, spec: &ChainSpec, - duties_service: &DutiesService, beacon_nodes: &Arc>, + config: &SelectionProofConfig, ) -> Result, Error> { - let selection_proof = if duties_service.selection_proof_config.selections_endpoint { + let selection_proof = if config.selections_endpoint { let beacon_committee_selection = BeaconCommitteeSelection { validator_index: duty.validator_index, slot: duty.slot, @@ -1161,12 +1160,7 @@ async fn fill_in_selection_proofs( while !duties_by_slot.is_empty() { if let Some(duration) = slot_clock.duration_to_next_slot() { sleep( - duration.saturating_sub( - duties_service - .sync_duties - .selection_proof_config - .computation_offset, - ), + duration.saturating_sub(duties_service.selection_proof_config.computation_offset), ) .await; @@ -1174,18 +1168,11 @@ async fn fill_in_selection_proofs( continue; }; - let selection_lookahead = duties_service - .sync_duties - .selection_proof_config - .lookahead_slot; + let selection_lookahead = duties_service.selection_proof_config.lookahead_slot; let lookahead_slot = current_slot + selection_lookahead; - let relevant_duties = if duties_service - .sync_duties - .selection_proof_config - .parallel_sign - { + let relevant_duties = if duties_service.selection_proof_config.parallel_sign { // Remove old slot duties and only keep current duties in distributed mode duties_by_slot .remove(&lookahead_slot) @@ -1211,11 +1198,7 @@ async fn fill_in_selection_proofs( // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties, // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case - if duties_service - .sync_duties - .selection_proof_config - .parallel_sign - { + if duties_service.selection_proof_config.parallel_sign { let mut duty_and_proof_results = relevant_duties .into_values() .flatten() @@ -1224,8 +1207,8 @@ async fn fill_in_selection_proofs( &duty, &duties_service.validator_store, &duties_service.spec, - &duties_service, &duties_service.beacon_nodes, + &duties_service.selection_proof_config, ) .await?; Ok((duty, opt_selection_proof)) @@ -1252,8 +1235,8 @@ async fn fill_in_selection_proofs( &duty, &duties_service.validator_store, &duties_service.spec, - &duties_service, &duties_service.beacon_nodes, + &duties_service.selection_proof_config, ) .await?; Ok((duty, opt_selection_proof)) diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index 15252f4d92..0df51adb32 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -14,11 +14,6 @@ use tracing::{debug, error, info, warn}; use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId}; use validator_store::Error as ValidatorStoreError; -/// Number of epochs in advance to compute selection proofs when not in `distributed` mode. -pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2; -/// Number of slots in advance to compute selection proofs when in `distributed` mode. -pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1; - /// Top-level data-structure containing sync duty information. /// /// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained @@ -106,15 +101,6 @@ impl SyncDutiesMap { }) } - /// Number of slots in advance to compute selection proofs - fn aggregation_pre_compute_slots(&self) -> u64 { - if self.selection_proof_config.parallel_sign { - AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED - } else { - E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS - } - } - /// Prepare for pre-computation of selection proofs for `committee_period`. /// /// Return the slot up to which proofs should be pre-computed, as well as a vec of @@ -130,7 +116,7 @@ impl SyncDutiesMap { current_slot, first_slot_of_period::(committee_period, spec), ); - let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots(); + let pre_compute_lookahead_slots = self.selection_proof_config.lookahead_slot; let pre_compute_slot = std::cmp::min( current_slot + pre_compute_lookahead_slots, last_slot_of_period::(committee_period, spec), @@ -382,7 +368,7 @@ pub async fn poll_sync_committee_duties( } // Pre-compute aggregator selection proofs for the next period. - let aggregate_pre_compute_lookahead_slots = sync_duties.aggregation_pre_compute_slots(); + let aggregate_pre_compute_lookahead_slots = sync_duties.selection_proof_config.lookahead_slot; if (current_slot + aggregate_pre_compute_lookahead_slots) .epoch(E::slots_per_epoch()) .sync_committee_period(spec)?