diff --git a/validator_client/doppelganger_service/src/lib.rs b/validator_client/doppelganger_service/src/lib.rs index 668dd73ab2..348890705f 100644 --- a/validator_client/doppelganger_service/src/lib.rs +++ b/validator_client/doppelganger_service/src/lib.rs @@ -883,8 +883,8 @@ mod test { } } - #[test] - fn unregistered_validator() { + #[tokio::test] + async fn unregistered_validator() { // Non-genesis epoch let epoch = genesis_epoch() + 2; @@ -995,7 +995,7 @@ mod test { }, ) // All validators should be enabled. - .assert_all_enabled(); + .assert_all_enabled().await; } async fn detect_after_genesis_test(mutate_responses: F) @@ -1073,8 +1073,8 @@ mod test { .await } - #[test] - fn register_prior_to_genesis() { + #[tokio::test] + async fn register_prior_to_genesis() { let prior_to_genesis = GENESIS_TIME.checked_sub(SLOT_DURATION).unwrap(); TestBuilder::default() @@ -1244,8 +1244,8 @@ mod test { }); } - #[test] - fn time_skips_forward_with_doppelgangers() { + #[tokio::test] + async fn time_skips_forward_with_doppelgangers() { let initial_epoch = genesis_epoch() + 1; let initial_slot = initial_epoch.start_slot(E::slots_per_epoch()); let skipped_forward_epoch = initial_epoch + 42; @@ -1256,6 +1256,7 @@ mod test { .set_slot(initial_slot) .register_all_in_doppelganger_protection_if_enabled() .assert_all_disabled() + .await // First, simulate a check in the initialization epoch. .simulate_detect_doppelgangers( initial_slot, @@ -1268,6 +1269,7 @@ mod test { }, ) .assert_all_disabled() + .await .assert_all_states(&DoppelgangerState { next_check_epoch: initial_epoch + 1, remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, @@ -1294,8 +1296,8 @@ mod test { }); } - #[test] - fn time_skips_backward() { + #[tokio::test] + async fn time_skips_backward() { let initial_epoch = genesis_epoch() + 42; let initial_slot = initial_epoch.start_slot(E::slots_per_epoch()); let skipped_backward_epoch = initial_epoch - 12; @@ -1306,6 +1308,7 @@ mod test { .set_slot(initial_slot) .register_all_in_doppelganger_protection_if_enabled() .assert_all_disabled() + .await // First, simulate a check in the initialization epoch. .simulate_detect_doppelgangers( initial_slot, @@ -1318,6 +1321,7 @@ mod test { }, ) .assert_all_disabled() + .await .assert_all_states(&DoppelgangerState { next_check_epoch: initial_epoch + 1, remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, @@ -1334,6 +1338,7 @@ mod test { }, ) .assert_all_disabled() + .await // When time skips backward we should *not* allow doppelganger advancement. .assert_all_states(&DoppelgangerState { next_check_epoch: initial_epoch + 1, @@ -1341,8 +1346,8 @@ mod test { }); } - #[test] - fn staggered_entry() { + #[tokio::test] + async fn staggered_entry() { let early_epoch = genesis_epoch() + 42; let early_slot = early_epoch.start_slot(E::slots_per_epoch()); let early_activation_slot = @@ -1363,7 +1368,8 @@ mod test { .register_validators(&early_validators) .set_slot(late_slot) .register_validators(&late_validators) - .assert_all_disabled(); + .assert_all_disabled() + .await; for slot in early_slot.as_u64()..=late_activation_slot.as_u64() { let slot = Slot::new(slot); @@ -1401,6 +1407,6 @@ mod test { } } - scenario.assert_all_enabled(); + scenario.assert_all_enabled().await; } } diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index c253312f09..ef3364358b 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -2,7 +2,6 @@ use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition} use doppelganger_service::DoppelgangerService; use initialized_validators::InitializedValidators; use logging::crit; -use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use signing_method::Error as SigningError; use signing_method::{SignableMessage, SigningContext, SigningMethod}; @@ -14,7 +13,7 @@ use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use task_executor::TaskExecutor; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tracing::{error, info, warn}; use types::{ graffiti::GraffitiString, AbstractExecPayload, Address, AggregateAndProof, Attestation, @@ -264,12 +263,13 @@ impl LighthouseValidatorStore { /// ## Warning /// /// This method should only be used for signing non-slashable messages. - fn doppelganger_bypassed_signing_method( + async fn doppelganger_bypassed_signing_method( &self, validator_pubkey: PublicKeyBytes, ) -> Result, Error> { self.validators .read() + .await .signing_method(&validator_pubkey) .ok_or(Error::UnknownPubkey(validator_pubkey)) } @@ -313,9 +313,10 @@ impl LighthouseValidatorStore { /// Returns the suggested_fee_recipient from `validator_definitions.yml` if any. /// This has been pulled into a private function so the read lock is dropped easily - fn suggested_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option
{ + async fn suggested_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option
{ self.validators .read() + .await .suggested_fee_recipient(validator_pubkey) } @@ -325,8 +326,8 @@ impl LighthouseValidatorStore { /// 1. validator_definitions.yml /// 2. process level gas limit /// 3. `DEFAULT_GAS_LIMIT` - pub fn get_gas_limit(&self, validator_pubkey: &PublicKeyBytes) -> u64 { - self.get_gas_limit_defaulting(self.validators.read().gas_limit(validator_pubkey)) + pub async fn get_gas_limit(&self, validator_pubkey: &PublicKeyBytes) -> u64 { + self.get_gas_limit_defaulting(self.validators.read().await.gas_limit(validator_pubkey)) } fn get_gas_limit_defaulting(&self, gas_limit: Option) -> u64 { @@ -347,11 +348,17 @@ impl LighthouseValidatorStore { /// /// This function is currently only used in tests because in prod it is translated and combined /// with other flags into a builder boost factor (see `determine_builder_boost_factor`). - pub fn get_builder_proposals_testing_only(&self, validator_pubkey: &PublicKeyBytes) -> bool { + pub async fn get_builder_proposals_testing_only( + &self, + validator_pubkey: &PublicKeyBytes, + ) -> bool { // If there is a `suggested_fee_recipient` in the validator definitions yaml // file, use that value. self.get_builder_proposals_defaulting( - self.validators.read().builder_proposals(validator_pubkey), + self.validators + .read() + .await + .builder_proposals(validator_pubkey), ) } @@ -368,12 +375,13 @@ impl LighthouseValidatorStore { /// /// This function is currently only used in tests because in prod it is translated and combined /// with other flags into a builder boost factor (see `determine_builder_boost_factor`). - pub fn get_builder_boost_factor_testing_only( + pub async fn get_builder_boost_factor_testing_only( &self, validator_pubkey: &PublicKeyBytes, ) -> Option { self.validators .read() + .await .builder_boost_factor(validator_pubkey) .or(self.builder_boost_factor) } @@ -386,12 +394,13 @@ impl LighthouseValidatorStore { /// /// This function is currently only used in tests because in prod it is translated and combined /// with other flags into a builder boost factor (see `determine_builder_boost_factor`). - pub fn get_prefer_builder_proposals_testing_only( + pub async fn get_prefer_builder_proposals_testing_only( &self, validator_pubkey: &PublicKeyBytes, ) -> bool { self.validators .read() + .await .prefer_builder_proposals(validator_pubkey) .unwrap_or(self.prefer_builder_proposals) } @@ -537,7 +546,9 @@ impl LighthouseValidatorStore { ) -> Result { let signing_epoch = voluntary_exit.epoch; let signing_context = self.signing_context(Domain::VoluntaryExit, signing_epoch); - let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + let signing_method = self + .doppelganger_bypassed_signing_method(validator_pubkey) + .await?; let signature = signing_method .get_signature::>( @@ -584,7 +595,7 @@ impl ValidatorStore for LighthouseValidatorS /// - `DoppelgangerStatus::ignored`: returns all the pubkeys from `only_safe` *plus* those still /// undergoing protection. This is useful for collecting duties or other non-signing tasks. #[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock. - fn voting_pubkeys(&self, filter_func: F) -> I + async fn voting_pubkeys(&self, filter_func: F) -> I where I: FromIterator, F: Fn(DoppelgangerStatus) -> Option, @@ -594,6 +605,7 @@ impl ValidatorStore for LighthouseValidatorS let pubkeys = self .validators .read() + .await .iter_voting_pubkeys() .cloned() .collect::>(); @@ -626,22 +638,22 @@ impl ValidatorStore for LighthouseValidatorS }) } - fn num_voting_validators(&self) -> usize { - self.validators.read().num_enabled() + async fn num_voting_validators(&self) -> usize { + self.validators.read().await.num_enabled() } - fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option { - self.validators.read().graffiti(validator_pubkey) + async fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option { + self.validators.read().await.graffiti(validator_pubkey) } /// Returns the fee recipient for the given public key. The priority order for fetching /// the fee recipient is: /// 1. validator_definitions.yml /// 2. process level fee recipient - fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option
{ + async fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option
{ // If there is a `suggested_fee_recipient` in the validator definitions yaml // file, use that value. - self.get_fee_recipient_defaulting(self.suggested_fee_recipient(validator_pubkey)) + self.get_fee_recipient_defaulting(self.suggested_fee_recipient(validator_pubkey).await) } /// Translate the per validator and per process `builder_proposals`, `builder_boost_factor` and @@ -656,29 +668,30 @@ impl ValidatorStore for LighthouseValidatorS /// - If `builder_proposals` is set to false, set boost factor to 0 to indicate a preference for /// local payloads. /// - Else return `None` to indicate no preference between builder and local payloads. - fn determine_builder_boost_factor(&self, validator_pubkey: &PublicKeyBytes) -> Option { + async fn determine_builder_boost_factor( + &self, + validator_pubkey: &PublicKeyBytes, + ) -> Option { let validator_prefer_builder_proposals = self .validators .read() + .await .prefer_builder_proposals(validator_pubkey); if matches!(validator_prefer_builder_proposals, Some(true)) { return Some(u64::MAX); } - let factor = self - .validators - .read() + let validators = self.validators.read().await; + let factor = validators .builder_boost_factor(validator_pubkey) .or_else(|| { - if matches!( - self.validators.read().builder_proposals(validator_pubkey), - Some(false) - ) { + if matches!(validators.builder_proposals(validator_pubkey), Some(false)) { return Some(0); } None }); + drop(validators); factor .or_else(|| { @@ -727,9 +740,10 @@ impl ValidatorStore for LighthouseValidatorS Ok(signature) } - fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64) { + async fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64) { self.initialized_validators() .write() + .await .set_index(validator_pubkey, index); } @@ -851,8 +865,9 @@ impl ValidatorStore for LighthouseValidatorS let domain_hash = self.spec.get_builder_domain(); let signing_root = validator_registration_data.signing_root(domain_hash); - let signing_method = - self.doppelganger_bypassed_signing_method(validator_registration_data.pubkey)?; + let signing_method = self + .doppelganger_bypassed_signing_method(validator_registration_data.pubkey) + .await?; let signature = signing_method .get_signature_from_root::>( SignableMessage::ValidatorRegistration(&validator_registration_data), @@ -929,7 +944,9 @@ impl ValidatorStore for LighthouseValidatorS // // As long as we disallow `SignedAggregateAndProof` then these selection proofs will never // be published on the network. - let signing_method = self.doppelganger_bypassed_signing_method(validator_pubkey)?; + let signing_method = self + .doppelganger_bypassed_signing_method(validator_pubkey) + .await?; let signature = signing_method .get_signature::>( @@ -961,7 +978,9 @@ impl ValidatorStore for LighthouseValidatorS self.signing_context(Domain::SyncCommitteeSelectionProof, signing_epoch); // Bypass `with_validator_signing_method`: sync committee messages are not slashable. - let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; + let signing_method = self + .doppelganger_bypassed_signing_method(*validator_pubkey) + .await?; validator_metrics::inc_counter_vec( &validator_metrics::SIGNED_SYNC_SELECTION_PROOFS_TOTAL, @@ -997,7 +1016,9 @@ impl ValidatorStore for LighthouseValidatorS let signing_context = self.signing_context(Domain::SyncCommittee, signing_epoch); // Bypass `with_validator_signing_method`: sync committee messages are not slashable. - let signing_method = self.doppelganger_bypassed_signing_method(*validator_pubkey)?; + let signing_method = self + .doppelganger_bypassed_signing_method(*validator_pubkey) + .await?; let signature = signing_method .get_signature::>( @@ -1036,7 +1057,9 @@ impl ValidatorStore for LighthouseValidatorS let signing_context = self.signing_context(Domain::ContributionAndProof, signing_epoch); // Bypass `with_validator_signing_method`: sync committee messages are not slashable. - let signing_method = self.doppelganger_bypassed_signing_method(aggregator_pubkey)?; + let signing_method = self + .doppelganger_bypassed_signing_method(aggregator_pubkey) + .await?; let message = ContributionAndProof { aggregator_index, @@ -1067,10 +1090,10 @@ impl ValidatorStore for LighthouseValidatorS /// This function will only do actual pruning periodically, so it should usually be /// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning /// runs. - fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) { + async fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool) { // Attempt to prune every SLASHING_PROTECTION_HISTORY_EPOCHs, with a tolerance for // missing the epoch that aligns exactly. - let mut last_prune = self.slashing_protection_last_prune.lock(); + let mut last_prune = self.slashing_protection_last_prune.lock().await; if current_epoch / SLASHING_PROTECTION_HISTORY_EPOCHS <= *last_prune / SLASHING_PROTECTION_HISTORY_EPOCHS { @@ -1093,7 +1116,7 @@ impl ValidatorStore for LighthouseValidatorS let new_min_target_epoch = current_epoch.saturating_sub(SLASHING_PROTECTION_HISTORY_EPOCHS); let new_min_slot = new_min_target_epoch.start_slot(E::slots_per_epoch()); - let all_pubkeys: Vec<_> = self.voting_pubkeys(DoppelgangerStatus::ignored); + let all_pubkeys: Vec<_> = self.voting_pubkeys(DoppelgangerStatus::ignored).await; if let Err(e) = self .slashing_protection @@ -1125,9 +1148,10 @@ impl ValidatorStore for LighthouseValidatorS /// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`. /// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`, /// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`. - fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option { + async fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option { self.validators .read() + .await .validator(pubkey) .map(|validator| ProposalData { validator_index: validator.get_index(), diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index c1e96a2808..24885a098c 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -163,7 +163,7 @@ impl AttestationService AttestationService Result<(), String> { + async fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; let duration_to_next_slot = self .slot_clock @@ -197,16 +197,16 @@ impl AttestationService> = self - .duties_service - .attesters(slot) - .into_iter() - .fold(HashMap::new(), |mut map, duty_and_proof| { - map.entry(duty_and_proof.duty.committee_index) - .or_default() - .push(duty_and_proof); - map - }); + let duties_by_committee_index: HashMap> = + self.duties_service.attesters(slot).await.into_iter().fold( + HashMap::new(), + |mut map, duty_and_proof| { + map.entry(duty_and_proof.duty.committee_index) + .or_default() + .push(duty_and_proof); + map + }, + ); // For each committee index for this slot: // @@ -704,11 +704,12 @@ impl AttestationService BlockService { } for validator_pubkey in proposers { - let builder_boost_factor = self - .validator_store - .determine_builder_boost_factor(&validator_pubkey); let service = self.clone(); self.inner.executor.spawn( async move { + let builder_boost_factor = service + .validator_store + .determine_builder_boost_factor(&validator_pubkey).await; let result = service .publish_block(slot, validator_pubkey, builder_boost_factor) .await; @@ -448,13 +448,16 @@ impl BlockService { let graffiti = determine_graffiti( &validator_pubkey, self.graffiti_file.clone(), - self.validator_store.graffiti(&validator_pubkey), + self.validator_store.graffiti(&validator_pubkey).await, self.graffiti, ); let randao_reveal_ref = &randao_reveal; let self_ref = &self; - let proposer_index = self.validator_store.validator_index(&validator_pubkey); + let proposer_index = self + .validator_store + .validator_index(&validator_pubkey) + .await; let proposer_fallback = ProposerFallback { beacon_nodes: self.beacon_nodes.clone(), proposer_nodes: self.proposer_nodes.clone(), diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index b4d9bae273..5505f1b452 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -341,16 +341,17 @@ pub struct DutiesService { impl DutiesService { /// Returns the total number of validators known to the duties service. - pub fn total_validator_count(&self) -> usize { - self.validator_store.num_voting_validators() + pub async fn total_validator_count(&self) -> usize { + self.validator_store.num_voting_validators().await } /// Returns the total number of validators that should propose in the given epoch. - pub fn proposer_count(&self, epoch: Epoch) -> usize { + pub async fn proposer_count(&self, epoch: Epoch) -> usize { // Only collect validators that are considered safe in terms of doppelganger protection. let signing_pubkeys: HashSet<_> = self .validator_store - .voting_pubkeys(DoppelgangerStatus::only_safe); + .voting_pubkeys(DoppelgangerStatus::only_safe) + .await; self.proposers .read() @@ -364,11 +365,12 @@ impl DutiesService { } /// Returns the total number of validators that should attest in the given epoch. - pub fn attester_count(&self, epoch: Epoch) -> usize { + pub async fn attester_count(&self, epoch: Epoch) -> usize { // Only collect validators that are considered safe in terms of doppelganger protection. let signing_pubkeys: HashSet<_> = self .validator_store - .voting_pubkeys(DoppelgangerStatus::only_safe); + .voting_pubkeys(DoppelgangerStatus::only_safe) + .await; self.attesters .read() .iter() @@ -379,9 +381,10 @@ impl DutiesService { } /// Returns the total number of validators that are in a doppelganger detection period. - pub fn doppelganger_detecting_count(&self) -> usize { + pub async fn doppelganger_detecting_count(&self) -> usize { self.validator_store .voting_pubkeys::, _>(DoppelgangerStatus::only_unsafe) + .await .len() } @@ -389,13 +392,14 @@ impl DutiesService { /// /// It is possible that multiple validators have an identical proposal slot, however that is /// likely the result of heavy forking (lol) or inconsistent beacon node connections. - pub fn block_proposers(&self, slot: Slot) -> HashSet { + pub async fn block_proposers(&self, slot: Slot) -> HashSet { let epoch = slot.epoch(S::E::slots_per_epoch()); // Only collect validators that are considered safe in terms of doppelganger protection. let signing_pubkeys: HashSet<_> = self .validator_store - .voting_pubkeys(DoppelgangerStatus::only_safe); + .voting_pubkeys(DoppelgangerStatus::only_safe) + .await; self.proposers .read() @@ -414,13 +418,14 @@ impl DutiesService { } /// Returns all `ValidatorDuty` for the given `slot`. - pub fn attesters(&self, slot: Slot) -> Vec { + pub async fn attesters(&self, slot: Slot) -> Vec { let epoch = slot.epoch(S::E::slots_per_epoch()); // Only collect validators that are considered safe in terms of doppelganger protection. let signing_pubkeys: HashSet<_> = self .validator_store - .voting_pubkeys(DoppelgangerStatus::only_safe); + .voting_pubkeys(DoppelgangerStatus::only_safe) + .await; self.attesters .read() @@ -436,9 +441,9 @@ impl DutiesService { } /// Returns `true` if we should collect per validator metrics and `false` otherwise. - pub fn per_validator_metrics(&self) -> bool { + pub async fn per_validator_metrics(&self) -> bool { self.enable_high_validator_count_metrics - || self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT + || self.total_validator_count().await <= VALIDATOR_METRICS_MIN_COUNT } } @@ -586,13 +591,15 @@ async fn poll_validator_indices( // collect those indices. let all_pubkeys: Vec<_> = duties_service .validator_store - .voting_pubkeys(DoppelgangerStatus::ignored); + .voting_pubkeys(DoppelgangerStatus::ignored) + .await; for pubkey in all_pubkeys { // This is on its own line to avoid some weirdness with locks and if statements. let is_known = duties_service .validator_store .validator_index(&pubkey) + .await .is_some(); if !is_known { @@ -634,6 +641,7 @@ async fn poll_validator_indices( let fee_recipient = duties_service .validator_store .get_fee_recipient(&pubkey) + .await .map(|fr| fr.to_string()) .unwrap_or_else(|| { "Fee recipient for validator not set in validator_definitions.yml \ @@ -713,13 +721,18 @@ async fn poll_beacon_attesters = duties_service .validator_store - .voting_pubkeys(DoppelgangerStatus::ignored); + .voting_pubkeys(DoppelgangerStatus::ignored) + .await; let local_indices = { let mut local_indices = Vec::with_capacity(local_pubkeys.len()); for &pubkey in &local_pubkeys { - if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) { + if let Some(validator_index) = duties_service + .validator_store + .validator_index(&pubkey) + .await + { local_indices.push(validator_index) } } @@ -906,7 +919,7 @@ async fn poll_beacon_attesters_for_epoch>(); + let mut indices_to_request = vec![]; + for pubkey in &validators_to_update { + if let Some(validator_index) = duties_service.validator_store.validator_index(pubkey).await + { + if !initial_indices_to_request.contains(&validator_index) { + indices_to_request.push(validator_index); + } + } + } // Filter the initial duties by their relevance so that we don't hit the warning below about // overwriting duties. There was previously a bug here. @@ -1041,29 +1058,39 @@ async fn poll_beacon_attesters_for_epoch( +async fn get_uninitialized_validators( duties_service: &Arc>, epoch: &Epoch, local_pubkeys: &HashSet, ) -> Vec { - let attesters = duties_service.attesters.read(); - local_pubkeys - .iter() - .filter(|pubkey| { - attesters - .get(pubkey) - .is_none_or(|duties| !duties.contains_key(epoch)) - }) - .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) - .collect::>() + // Allocate a temporary vec to prevent holding two locks simultaneously (deadlock risk if we use + // a different lock order in another function). + let uninitialized_pubkeys = { + let attesters = duties_service.attesters.read(); + local_pubkeys + .iter() + .filter(|pubkey| { + attesters + .get(pubkey) + .is_none_or(|duties| !duties.contains_key(epoch)) + }) + .collect::>() + }; + let mut uninitialized_indices = Vec::with_capacity(uninitialized_pubkeys.len()); + for pubkey in uninitialized_pubkeys { + if let Some(index) = duties_service.validator_store.validator_index(pubkey).await { + uninitialized_indices.push(index); + } + } + uninitialized_indices } -fn update_per_validator_duty_metrics( +async fn update_per_validator_duty_metrics( duties_service: &Arc>, epoch: Epoch, current_slot: Slot, ) { - if duties_service.per_validator_metrics() { + if duties_service.per_validator_metrics().await { let attesters = duties_service.attesters.read(); attesters.values().for_each(|attester_duties_by_epoch| { if let Some((_, duty_and_proof)) = attester_duties_by_epoch.get(&epoch) { @@ -1309,7 +1336,7 @@ async fn poll_beacon_proposers( // Notify the block proposal service for any proposals that we have in our cache. // // See the function-level documentation for more information. - let initial_block_proposers = duties_service.block_proposers(current_slot); + let initial_block_proposers = duties_service.block_proposers(current_slot).await; notify_block_production_service::( current_slot, &initial_block_proposers, @@ -1324,7 +1351,8 @@ async fn poll_beacon_proposers( // doppelganger finishes. let local_pubkeys: HashSet<_> = duties_service .validator_store - .voting_pubkeys(DoppelgangerStatus::ignored); + .voting_pubkeys(DoppelgangerStatus::ignored) + .await; // Only download duties and push out additional block production events if we have some // validators. @@ -1387,6 +1415,7 @@ async fn poll_beacon_proposers( // which were not included in the initial notification to the `BlockService`. let additional_block_producers = duties_service .block_proposers(current_slot) + .await .difference(&initial_block_proposers) .copied() .collect::>(); diff --git a/validator_client/validator_services/src/preparation_service.rs b/validator_client/validator_services/src/preparation_service.rs index b59e3266dc..a533a5b101 100644 --- a/validator_client/validator_services/src/preparation_service.rs +++ b/validator_client/validator_services/src/preparation_service.rs @@ -302,21 +302,22 @@ impl PreparationService(&self, map_fn: G) -> Vec + async fn collect_proposal_data(&self, map_fn: G) -> Vec where G: Fn(PublicKeyBytes, ProposalData) -> Option, { let all_pubkeys: Vec<_> = self .validator_store - .voting_pubkeys(DoppelgangerStatus::ignored); + .voting_pubkeys(DoppelgangerStatus::ignored) + .await; - all_pubkeys - .into_iter() - .filter_map(|pubkey| { - let proposal_data = self.validator_store.proposal_data(&pubkey)?; - map_fn(pubkey, proposal_data) - }) - .collect() + let mut proposal_data = Vec::with_capacity(all_pubkeys.len()); + for pubkey in all_pubkeys { + if let Some(proposal_data) = self.validator_store.proposal_data(&pubkey).await? { + proposal_data.push(map_fn(pubkey, proposal_data)); + } + } + proposal_data } async fn publish_preparation_data( diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index c13b70db80..2335ebdaea 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -308,13 +308,18 @@ pub async fn poll_sync_committee_duties = duties_service .validator_store - .voting_pubkeys(DoppelgangerStatus::ignored); + .voting_pubkeys(DoppelgangerStatus::ignored) + .await; let local_indices = { let mut local_indices = Vec::with_capacity(local_pubkeys.len()); for &pubkey in &local_pubkeys { - if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) { + if let Some(validator_index) = duties_service + .validator_store + .validator_index(&pubkey) + .await + { local_indices.push(validator_index) } } diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 68772c5e9a..cbcbfda1c4 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -60,23 +60,30 @@ pub trait ValidatorStore: Send + Sync { /// protection and are safe-enough to sign messages. /// - `DoppelgangerStatus::ignored`: returns all the pubkeys from `only_safe` *plus* those still /// undergoing protection. This is useful for collecting duties or other non-signing tasks. - fn voting_pubkeys(&self, filter_func: F) -> I + fn voting_pubkeys(&self, filter_func: F) -> impl Future + Send + Sync where I: FromIterator, - F: Fn(DoppelgangerStatus) -> Option; + F: Fn(DoppelgangerStatus) -> Option + Send + Sync; /// Check if the `validator_pubkey` is permitted by the doppleganger protection to sign /// messages. fn doppelganger_protection_allows_signing(&self, validator_pubkey: PublicKeyBytes) -> bool; - fn num_voting_validators(&self) -> usize; - fn graffiti(&self, validator_pubkey: &PublicKeyBytes) -> Option; + fn num_voting_validators(&self) -> impl Future + Send + Sync; + + fn graffiti( + &self, + validator_pubkey: &PublicKeyBytes, + ) -> impl Future> + Send + Sync; /// Returns the fee recipient for the given public key. The priority order for fetching /// the fee recipient is: /// 1. validator_definitions.yml /// 2. process level fee recipient - fn get_fee_recipient(&self, validator_pubkey: &PublicKeyBytes) -> Option
; + fn get_fee_recipient( + &self, + validator_pubkey: &PublicKeyBytes, + ) -> impl Future>; /// Translate the `builder_proposals`, `builder_boost_factor` and /// `prefer_builder_proposals` to a boost factor, if available. @@ -86,7 +93,10 @@ pub trait ValidatorStore: Send + Sync { /// - If `builder_proposals` is set to false, set boost factor to 0 to indicate a preference for /// local payloads. /// - Else return `None` to indicate no preference between builder and local payloads. - fn determine_builder_boost_factor(&self, validator_pubkey: &PublicKeyBytes) -> Option; + fn determine_builder_boost_factor( + &self, + validator_pubkey: &PublicKeyBytes, + ) -> impl Future> + Send + Sync; fn randao_reveal( &self, @@ -94,7 +104,11 @@ pub trait ValidatorStore: Send + Sync { signing_epoch: Epoch, ) -> impl Future>> + Send; - fn set_validator_index(&self, validator_pubkey: &PublicKeyBytes, index: u64); + fn set_validator_index( + &self, + validator_pubkey: &PublicKeyBytes, + index: u64, + ) -> impl Future + Send + Sync; fn sign_block( &self, @@ -165,12 +179,19 @@ pub trait ValidatorStore: Send + Sync { /// This function will only do actual pruning periodically, so it should usually be /// cheap to call. The `first_run` flag can be used to print a more verbose message when pruning /// runs. - fn prune_slashing_protection_db(&self, current_epoch: Epoch, first_run: bool); + fn prune_slashing_protection_db( + &self, + current_epoch: Epoch, + first_run: bool, + ) -> impl Future + Send + Sync; /// Returns `ProposalData` for the provided `pubkey` if it exists in `InitializedValidators`. /// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`, /// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`. - fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option; + fn proposal_data( + &self, + pubkey: &PublicKeyBytes, + ) -> impl Future> + Send + Sync; } #[derive(Clone, Debug, PartialEq)]