From eaca4dfa479951d4bb5ea0dc255d0c93b663e317 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 7 May 2025 15:36:05 +1000 Subject: [PATCH] Grinding through cases --- .../doppelganger_service/src/lib.rs | 79 ++++++++++++------- .../lighthouse_validator_store/src/lib.rs | 36 ++++++--- validator_client/validator_store/src/lib.rs | 5 +- 3 files changed, 79 insertions(+), 41 deletions(-) diff --git a/validator_client/doppelganger_service/src/lib.rs b/validator_client/doppelganger_service/src/lib.rs index f7f57cbc03..668dd73ab2 100644 --- a/validator_client/doppelganger_service/src/lib.rs +++ b/validator_client/doppelganger_service/src/lib.rs @@ -223,7 +223,7 @@ impl DoppelgangerService { // Define the `get_index` function as one that uses the validator store. let get_index = move |pubkey| { let inner_store = validator_store.clone(); - async move { inner_store.clone().get_validator_index(&pubkey).await } + async move { inner_store.clone().validator_index(&pubkey).await } }; // Define the `get_liveness` function as one that queries the beacon node API. @@ -726,7 +726,7 @@ mod test { self } - pub fn assert_all_enabled(self) -> Self { + pub async fn assert_all_enabled(self) -> Self { /* * 1. Ensure all validators have the correct status. */ @@ -744,7 +744,11 @@ mod test { let pubkey_to_index = self.pubkey_to_index_map(); let generated_map = self .doppelganger - .compute_detection_indices_map(&|pubkey| pubkey_to_index.get(&pubkey).copied()); + .compute_detection_indices_map(&|pubkey| { + let inner_map = pubkey_to_index.clone(); + async move { inner_map.get(&pubkey).copied() } + }) + .await; assert!( generated_map.is_empty(), "there should be no indices for detection if all validators are enabled" @@ -753,7 +757,7 @@ mod test { self } - pub fn assert_all_disabled(self) -> Self { + pub async fn assert_all_disabled(self) -> Self { /* * 1. Ensure all validators have the correct status. */ @@ -771,7 +775,11 @@ mod test { let pubkey_to_index = self.pubkey_to_index_map(); let generated_map = self .doppelganger - .compute_detection_indices_map(&|pubkey| pubkey_to_index.get(&pubkey).copied()); + .compute_detection_indices_map(&|pubkey| { + let inner_map = pubkey_to_index.clone(); + async move { inner_map.get(&pubkey).copied() } + }) + .await; assert_eq!( pubkey_to_index.len(), @@ -841,14 +849,15 @@ mod test { } } - #[test] - fn enabled_in_genesis_epoch() { + #[tokio::test] + async fn enabled_in_genesis_epoch() { for slot in genesis_epoch().slot_iter(E::slots_per_epoch()) { TestBuilder::default() .build() .set_slot(slot) .register_all_in_doppelganger_protection_if_enabled() .assert_all_enabled() + .await .assert_all_states(&DoppelgangerState { next_check_epoch: genesis_epoch() + 1, remaining_epochs: 0, @@ -856,8 +865,8 @@ mod test { } } - #[test] - fn disabled_after_genesis_epoch() { + #[tokio::test] + async fn disabled_after_genesis_epoch() { let epoch = genesis_epoch() + 1; for slot in epoch.slot_iter(E::slots_per_epoch()) { @@ -866,6 +875,7 @@ mod test { .set_slot(slot) .register_all_in_doppelganger_protection_if_enabled() .assert_all_disabled() + .await .assert_all_states(&DoppelgangerState { next_check_epoch: epoch + 1, remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, @@ -938,9 +948,12 @@ mod test { // Create a simulated validator store that can resolve pubkeys to indices. let pubkey_to_index = self.pubkey_to_index_map(); - let get_index = |pubkey| pubkey_to_index.get(&pubkey).copied(); + let get_index = |pubkey| { + let inner_map = pubkey_to_index.clone(); + async move { inner_map.get(&pubkey).copied() } + }; - block_on(self.doppelganger.detect_doppelgangers::( + block_on(self.doppelganger.detect_doppelgangers::( slot, &get_index, &get_liveness, @@ -958,8 +971,8 @@ mod test { } } - #[test] - fn detect_at_genesis() { + #[tokio::test] + async fn detect_at_genesis() { let epoch = genesis_epoch(); let slot = epoch.start_slot(E::slots_per_epoch()); @@ -969,6 +982,7 @@ mod test { .register_all_in_doppelganger_protection_if_enabled() // All validators should have signing enabled since it's the genesis epoch. .assert_all_enabled() + .await .simulate_detect_doppelgangers( slot, ShouldShutdown::No, @@ -984,7 +998,7 @@ mod test { .assert_all_enabled(); } - fn detect_after_genesis_test(mutate_responses: F) + async fn detect_after_genesis_test(mutate_responses: F) where F: Fn(&mut LivenessResponses), { @@ -999,6 +1013,7 @@ mod test { .set_slot(starting_slot) .register_all_in_doppelganger_protection_if_enabled() .assert_all_disabled() + .await // First, simulate a check where there are no doppelgangers. .simulate_detect_doppelgangers( checking_slot, @@ -1014,6 +1029,7 @@ mod test { ) // All validators should be disabled since they started after genesis. .assert_all_disabled() + .await // Now, simulate a check where we apply `mutate_responses` which *must* create some // doppelgangers. .simulate_detect_doppelgangers( @@ -1033,6 +1049,7 @@ mod test { ) // All validators should still be disabled. .assert_all_disabled() + .await // The states of all validators should be jammed with `u64:MAX`. .assert_all_states(&DoppelgangerState { next_check_epoch: starting_epoch + 1, @@ -1040,18 +1057,20 @@ mod test { }); } - #[test] - fn detect_after_genesis_with_current_epoch_doppelganger() { + #[tokio::test] + async fn detect_after_genesis_with_current_epoch_doppelganger() { detect_after_genesis_test(|liveness_responses| { liveness_responses.current_epoch_responses[0].is_live = true }) + .await } - #[test] - fn detect_after_genesis_with_previous_epoch_doppelganger() { + #[tokio::test] + async fn detect_after_genesis_with_previous_epoch_doppelganger() { detect_after_genesis_test(|liveness_responses| { liveness_responses.previous_epoch_responses[0].is_live = true }) + .await } #[test] @@ -1065,8 +1084,8 @@ mod test { .register_all_in_doppelganger_protection_if_enabled(); } - #[test] - fn detect_doppelganger_in_starting_epoch() { + #[tokio::test] + async fn detect_doppelganger_in_starting_epoch() { let epoch = genesis_epoch() + 1; let slot = epoch.start_slot(E::slots_per_epoch()); @@ -1075,6 +1094,7 @@ mod test { .set_slot(slot) .register_all_in_doppelganger_protection_if_enabled() .assert_all_disabled() + .await // First, simulate a check where there is a doppelganger in the starting epoch. // // This should *not* cause a shutdown since we don't declare a doppelganger in the @@ -1096,14 +1116,15 @@ mod test { }, ) .assert_all_disabled() + .await .assert_all_states(&DoppelgangerState { next_check_epoch: epoch + 1, remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, }); } - #[test] - fn no_doppelgangers_for_adequate_time() { + #[tokio::test] + async fn no_doppelgangers_for_adequate_time() { let initial_epoch = genesis_epoch() + 42; let initial_slot = initial_epoch.start_slot(E::slots_per_epoch()); let activation_slot = @@ -1113,7 +1134,8 @@ mod test { .build() .set_slot(initial_slot) .register_all_in_doppelganger_protection_if_enabled() - .assert_all_disabled(); + .assert_all_disabled() + .await; for slot in initial_slot.as_u64()..=activation_slot.as_u64() { let slot = Slot::new(slot); @@ -1159,22 +1181,23 @@ mod test { scenario = scenario.assert_all_states(&expected_state); scenario = if slot < activation_slot { - scenario.assert_all_disabled() + scenario.assert_all_disabled().await } else { - scenario.assert_all_enabled() + scenario.assert_all_enabled().await }; } scenario .assert_all_enabled() + .await .assert_all_states(&DoppelgangerState { next_check_epoch: activation_slot.epoch(E::slots_per_epoch()), remaining_epochs: 0, }); } - #[test] - fn time_skips_forward_no_doppelgangers() { + #[tokio::test] + async fn time_skips_forward_no_doppelgangers() { let initial_epoch = genesis_epoch() + 1; let initial_slot = initial_epoch.start_slot(E::slots_per_epoch()); let skipped_forward_epoch = initial_epoch + 42; @@ -1185,6 +1208,7 @@ mod test { .set_slot(initial_slot) .register_all_in_doppelganger_protection_if_enabled() .assert_all_disabled() + .await // First, simulate a check in the initialization epoch. .simulate_detect_doppelgangers( initial_slot, @@ -1197,6 +1221,7 @@ mod test { }, ) .assert_all_disabled() + .await .assert_all_states(&DoppelgangerState { next_check_epoch: initial_epoch + 1, remaining_epochs: DEFAULT_REMAINING_DETECTION_EPOCHS, diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index b5104f6971..c253312f09 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -113,9 +113,9 @@ impl LighthouseValidatorStore { /// duplicate validators operating on the network at the same time. /// /// This function has no effect if doppelganger protection is disabled. - pub fn register_all_in_doppelganger_protection_if_enabled(&self) -> Result<(), String> { + pub async fn register_all_in_doppelganger_protection_if_enabled(&self) -> Result<(), String> { if let Some(doppelganger_service) = &self.doppelganger_service { - for pubkey in self.validators.read().iter_voting_pubkeys() { + for pubkey in self.validators.read().await.iter_voting_pubkeys() { doppelganger_service.register_new_validator( *pubkey, &self.slot_clock, @@ -137,9 +137,10 @@ impl LighthouseValidatorStore { } /// Indicates if the `voting_public_key` exists in self and is enabled. - pub fn has_validator(&self, voting_public_key: &PublicKeyBytes) -> bool { + pub async fn has_validator(&self, voting_public_key: &PublicKeyBytes) -> bool { self.validators .read() + .await .validator(voting_public_key) .is_some() } @@ -183,8 +184,6 @@ impl LighthouseValidatorStore { /// - Adding the validator definition to the YAML file, saving it to the filesystem. /// - Enabling the validator with the slashing protection database. /// - If `enable == true`, starting to perform duties for the validator. - // FIXME: ignore this clippy lint until the validator store is refactored to use async locks - #[allow(clippy::await_holding_lock)] pub async fn add_validator( &self, validator_def: ValidatorDefinition, @@ -205,6 +204,7 @@ impl LighthouseValidatorStore { self.validators .write() + .await .add_definition_replace_disabled(validator_def.clone()) .await .map_err(|e| format!("Unable to add definition: {:?}", e))?; @@ -214,12 +214,13 @@ impl LighthouseValidatorStore { /// Returns doppelganger statuses for all enabled validators. #[allow(clippy::needless_collect)] // Collect is required to avoid holding a lock. - pub fn doppelganger_statuses(&self) -> Vec { + pub async fn doppelganger_statuses(&self) -> Vec { // Collect all the pubkeys first to avoid interleaving locks on `self.validators` and // `self.doppelganger_service`. let pubkeys = self .validators .read() + .await .iter_voting_pubkeys() .cloned() .collect::>(); @@ -242,13 +243,14 @@ impl LighthouseValidatorStore { /// Returns a `SigningMethod` for `validator_pubkey` *only if* that validator is considered safe /// by doppelganger protection. - fn doppelganger_checked_signing_method( + async fn doppelganger_checked_signing_method( &self, validator_pubkey: PublicKeyBytes, ) -> Result, Error> { if self.doppelganger_protection_allows_signing(validator_pubkey) { self.validators .read() + .await .signing_method(&validator_pubkey) .ok_or(Error::UnknownPubkey(validator_pubkey)) } else { @@ -459,7 +461,9 @@ impl LighthouseValidatorStore { let signing_context = self.signing_context(Domain::BeaconProposer, signing_epoch); let domain_hash = signing_context.domain_hash(&self.spec); - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signing_method = self + .doppelganger_checked_signing_method(validator_pubkey) + .await?; // Check for slashing conditions. let slashing_status = if signing_method @@ -566,8 +570,8 @@ impl ValidatorStore for LighthouseValidatorS /// /// - Unknown. /// - Known, but with an unknown index. - fn validator_index(&self, pubkey: &PublicKeyBytes) -> impl Future> { - async { self.validators.read().await.get_index(pubkey) } + async fn validator_index(&self, pubkey: &PublicKeyBytes) -> Option { + self.validators.read().await.get_index(pubkey) } /// Returns all voting pubkeys for all enabled validators. @@ -706,7 +710,9 @@ impl ValidatorStore for LighthouseValidatorS validator_pubkey: PublicKeyBytes, signing_epoch: Epoch, ) -> Result { - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signing_method = self + .doppelganger_checked_signing_method(validator_pubkey) + .await?; let signing_context = self.signing_context(Domain::Randao, signing_epoch); let signature = signing_method @@ -761,7 +767,9 @@ impl ValidatorStore for LighthouseValidatorS } // Get the signing method and check doppelganger protection. - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signing_method = self + .doppelganger_checked_signing_method(validator_pubkey) + .await?; // Checking for slashing conditions. let signing_epoch = attestation.data().target.epoch; @@ -882,7 +890,9 @@ impl ValidatorStore for LighthouseValidatorS let message = AggregateAndProof::from_attestation(aggregator_index, aggregate, selection_proof); - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + let signing_method = self + .doppelganger_checked_signing_method(validator_pubkey) + .await?; let signature = signing_method .get_signature::>( SignableMessage::SignedAggregateAndProof(message.to_ref()), diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 626679d8b0..68772c5e9a 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -46,7 +46,10 @@ pub trait ValidatorStore: Send + Sync { /// /// - Unknown. /// - Known, but with an unknown index. - fn validator_index(&self, pubkey: &PublicKeyBytes) -> impl Future> + Sync; + fn validator_index( + &self, + pubkey: &PublicKeyBytes, + ) -> impl Future> + Send + Sync; /// Returns all voting pubkeys for all enabled validators. ///