diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 13ecfe42ca..0367b8da82 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -4,6 +4,7 @@ mod latency; mod notifier; use crate::cli::ValidatorClient; +use crate::duties_service::SelectionProofConfig; pub use config::Config; use initialized_validators::InitializedValidators; use metrics::set_gauge; @@ -74,6 +75,17 @@ const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4; const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger"; +/// Compute attestation selection proofs this many slots before they are required. +/// +/// At start-up selection proofs will be computed with less lookahead out of necessity. +const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8; + +/// The attestation selection proof lookahead for those running with the --distributed flag. +const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1; + +/// Fraction of a slot at which selection proof signing should happen (2 means half way). +const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; + #[derive(Clone)] pub struct ProductionValidatorClient { context: RuntimeContext, @@ -444,11 +456,30 @@ impl ProductionValidatorClient { validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true); } + // Define a config to be passed to fill_in_selection_proofs. + // When running with --distributed, the config enables selections_endpoint and parallel_sign. + // Other DVT applications, e.g., Anchor, can pass in different configs to suit different needs. 
+ let selection_proof_config = if config.distributed { + SelectionProofConfig { + lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT, + computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM, + selections_endpoint: true, + parallel_sign: true, + } + } else { + SelectionProofConfig { + lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD, + computation_offset: Duration::from_secs(6), + selections_endpoint: false, + parallel_sign: false, + } + }; + let duties_context = context.service_context("duties".into()); let duties_service = Arc::new(DutiesService { attesters: <_>::default(), proposers: <_>::default(), - sync_duties: SyncDutiesMap::new(config.distributed), + sync_duties: SyncDutiesMap::new(selection_proof_config.clone()), slot_clock: slot_clock.clone(), beacon_nodes: beacon_nodes.clone(), validator_store: validator_store.clone(), @@ -456,7 +487,7 @@ impl ProductionValidatorClient { spec: context.eth2_config.spec.clone(), context: duties_context, enable_high_validator_count_metrics: config.enable_high_validator_count_metrics, - distributed: config.distributed, + selection_proof_config, disable_attesting: config.disable_attesting, }); diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index a5d582381e..24999ba0c3 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -37,17 +37,6 @@ use validator_store::{Error as ValidatorStoreError, ValidatorStore}; /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. const HISTORICAL_DUTIES_EPOCHS: u64 = 2; -/// Compute attestation selection proofs this many slots before they are required. -/// -/// At start-up selection proofs will be computed with less lookahead out of necessity. -const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8; - -/// The attestation selection proof lookahead for those running with the --distributed flag. 
-const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1; - -/// Fraction of a slot at which selection proof signing should happen (2 means half way). -const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; - /// Minimum number of validators for which we auto-enable per-validator metrics. /// For validators greater than this value, we need to manually set the `enable-per-validator-metrics` /// flag in the cli to enable collection of per validator metrics. @@ -126,6 +115,7 @@ pub struct SubscriptionSlots { duty_slot: Slot, } +#[derive(Clone)] pub struct SelectionProofConfig { pub lookahead_slot: u64, pub computation_offset: Duration, // The seconds to compute the selection proof before a slot @@ -140,10 +130,10 @@ async fn make_selection_proof( duty: &AttesterData, validator_store: &ValidatorStore, spec: &ChainSpec, - config: &SelectionProofConfig, + duties_service: &DutiesService, beacon_nodes: &Arc>, ) -> Result, Error> { - let selection_proof = if config.selections_endpoint { + let selection_proof = if duties_service.selection_proof_config.selections_endpoint { let beacon_committee_selection = BeaconCommitteeSelection { validator_index: duty.validator_index, slot: duty.slot, @@ -285,10 +275,10 @@ pub struct DutiesService { pub context: RuntimeContext, /// The current chain spec. pub spec: Arc, - //// Whether we permit large validator counts in the metrics. + /// Whether we permit large validator counts in the metrics. pub enable_high_validator_count_metrics: bool, - /// If this validator is running in distributed mode. - pub distributed: bool, + /// Pass the config for distributed or non-distributed mode. + pub selection_proof_config: SelectionProofConfig, pub disable_attesting: bool, } @@ -990,24 +980,9 @@ async fn poll_beacon_attesters_for_epoch( // Spawn the background task to compute selection proofs. let subservice = duties_service.clone(); - // Define a config to be pass to fill_in_selection_proofs. 
- // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) - // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. - let config = SelectionProofConfig { - lookahead_slot: if duties_service.distributed { - SELECTION_PROOF_SLOT_LOOKAHEAD_DVT - } else { - SELECTION_PROOF_SLOT_LOOKAHEAD - }, - computation_offset: duties_service.slot_clock.slot_duration() - / SELECTION_PROOF_SCHEDULE_DENOM, - selections_endpoint: duties_service.distributed, - parallel_sign: duties_service.distributed, - }; - duties_service.context.executor.spawn( async move { - fill_in_selection_proofs(subservice, new_duties, dependent_root, config).await; + fill_in_selection_proofs(subservice, new_duties, dependent_root).await; }, "duties_service_selection_proofs_background", ); @@ -1171,7 +1146,6 @@ async fn fill_in_selection_proofs( duties_service: Arc>, duties: Vec, dependent_root: Hash256, - config: SelectionProofConfig, ) { // Sort duties by slot in a BTreeMap. 
let mut duties_by_slot: BTreeMap> = BTreeMap::new(); @@ -1186,17 +1160,32 @@ async fn fill_in_selection_proofs( while !duties_by_slot.is_empty() { if let Some(duration) = slot_clock.duration_to_next_slot() { - sleep(duration.saturating_sub(config.computation_offset)).await; + sleep( + duration.saturating_sub( + duties_service + .sync_duties + .selection_proof_config + .computation_offset, + ), + ) + .await; let Some(current_slot) = slot_clock.now() else { continue; }; - let selection_lookahead = config.lookahead_slot; + let selection_lookahead = duties_service + .sync_duties + .selection_proof_config + .lookahead_slot; let lookahead_slot = current_slot + selection_lookahead; - let relevant_duties = if config.selections_endpoint { + let relevant_duties = if duties_service + .sync_duties + .selection_proof_config + .parallel_sign + { // Remove old slot duties and only keep current duties in distributed mode duties_by_slot .remove(&lookahead_slot) @@ -1222,7 +1211,11 @@ async fn fill_in_selection_proofs( // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties, // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case - if config.parallel_sign { + if duties_service + .sync_duties + .selection_proof_config + .parallel_sign + { let mut duty_and_proof_results = relevant_duties .into_values() .flatten() @@ -1231,7 +1224,7 @@ async fn fill_in_selection_proofs( &duty, &duties_service.validator_store, &duties_service.spec, - &config, + &duties_service, &duties_service.beacon_nodes, ) .await?; @@ -1259,7 +1252,7 @@ async fn fill_in_selection_proofs( &duty, &duties_service.validator_store, &duties_service.spec, - &config, + &duties_service, &duties_service.beacon_nodes, ) .await?; diff --git 
a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index d85212c3e6..a2ad36e0a6 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -10,7 +10,6 @@ use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; use tracing::{debug, error, info, warn}; use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId}; use validator_store::Error as ValidatorStoreError; @@ -36,7 +35,7 @@ pub struct SyncDutiesMap { /// Map from sync committee period to duties for members of that sync committee. committees: RwLock>, /// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute. - distributed: bool, + pub selection_proof_config: SelectionProofConfig, _phantom: PhantomData, } @@ -86,10 +85,10 @@ pub struct SlotDuties { } impl SyncDutiesMap { - pub fn new(distributed: bool) -> Self { + pub fn new(selection_proof_config: SelectionProofConfig) -> Self { Self { committees: RwLock::new(HashMap::new()), - distributed, + selection_proof_config, _phantom: PhantomData, } } @@ -109,7 +108,7 @@ impl SyncDutiesMap { /// Number of slots in advance to compute selection proofs fn aggregation_pre_compute_slots(&self) -> u64 { - if self.distributed { + if self.selection_proof_config.parallel_sign { AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED } else { E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS @@ -354,14 +353,6 @@ pub async fn poll_sync_committee_duties( async move { // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. 
- let config = SelectionProofConfig { - lookahead_slot: sub_duties_service - .sync_duties - .aggregation_pre_compute_slots(), // Use the current behaviour defined in the method - computation_offset: Duration::from_secs(0), - selections_endpoint: sub_duties_service.sync_duties.distributed, - parallel_sign: sub_duties_service.sync_duties.distributed, - }; fill_in_aggregation_proofs( sub_duties_service, @@ -369,7 +360,6 @@ pub async fn poll_sync_committee_duties( current_sync_committee_period, current_slot, current_pre_compute_slot, - config, ) .await }, @@ -408,22 +398,12 @@ pub async fn poll_sync_committee_duties( let sub_duties_service = duties_service.clone(); duties_service.context.executor.spawn( async move { - let config = SelectionProofConfig { - lookahead_slot: sub_duties_service - .sync_duties - .aggregation_pre_compute_slots(), - computation_offset: Duration::from_secs(0), - selections_endpoint: sub_duties_service.sync_duties.distributed, - parallel_sign: sub_duties_service.sync_duties.distributed, - }; - fill_in_aggregation_proofs( sub_duties_service, &new_pre_compute_duties, next_sync_committee_period, current_slot, pre_compute_slot, - config, ) .await }, @@ -528,7 +508,6 @@ pub async fn make_sync_selection_proof( duty: &SyncDuty, proof_slot: Slot, subnet_id: SyncSubnetId, - config: &SelectionProofConfig, ) -> Option { let sync_selection_proof = duties_service .validator_store @@ -557,7 +536,11 @@ pub async fn make_sync_selection_proof( }; // In distributed mode when we want to call the selections endpoint - if config.selections_endpoint { + if duties_service + .sync_duties + .selection_proof_config + .selections_endpoint + { debug!( "validator_index" = duty.validator_index, "slot" = %proof_slot, @@ -627,14 +610,16 @@ pub async fn fill_in_aggregation_proofs( sync_committee_period: u64, current_slot: Slot, pre_compute_slot: Slot, - config: SelectionProofConfig, ) { // Generate selection proofs for each validator at each slot, one slot at a time. 
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) { // For distributed mode - if config.parallel_sign { + if duties_service + .sync_duties + .selection_proof_config + .parallel_sign + { let mut futures_unordered = FuturesUnordered::new(); - let config_ref = &config; for (_, duty) in pre_compute_duties { let subnet_ids = match duty.subnet_ids::() { @@ -655,14 +640,9 @@ pub async fn fill_in_aggregation_proofs( for &subnet_id in &subnet_ids { let duties_service = duties_service.clone(); futures_unordered.push(async move { - let result = make_sync_selection_proof( - &duties_service, - duty, - proof_slot, - subnet_id, - config_ref, - ) - .await; + let result = + make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id) + .await; result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof)) }); @@ -740,19 +720,13 @@ pub async fn fill_in_aggregation_proofs( // Create futures to produce proofs. let duties_service_ref = &duties_service; - let config_ref = &config; let futures = subnet_ids.iter().map(|subnet_id| async move { // Construct proof for prior slot. let proof_slot = slot - 1; - let proof = make_sync_selection_proof( - duties_service_ref, - duty, - proof_slot, - *subnet_id, - config_ref, - ) - .await; + let proof = + make_sync_selection_proof(duties_service_ref, duty, proof_slot, *subnet_id) + .await; match proof { Some(proof) => match proof.is_aggregator::() {