WIP async locks

This commit is contained in:
Michael Sproul
2025-05-05 15:36:08 +10:00
parent 50c7b2b8e9
commit bd57ddf4c7

View File

@@ -221,7 +221,10 @@ impl DoppelgangerService {
V: ValidatorStore<E = E> + Send + Sync + 'static, V: ValidatorStore<E = E> + Send + Sync + 'static,
{ {
// Define the `get_index` function as one that uses the validator store. // Define the `get_index` function as one that uses the validator store.
let get_index = move |pubkey| validator_store.validator_index(&pubkey); let get_index = move |pubkey| {
let inner_store = validator_store.clone();
async move { inner_store.clone().get_validator_index(&pubkey).await }
};
// Define the `get_liveness` function as one that queries the beacon node API. // Define the `get_liveness` function as one that queries the beacon node API.
let get_liveness = move |current_epoch, validator_indices| { let get_liveness = move |current_epoch, validator_indices| {
@@ -263,7 +266,7 @@ impl DoppelgangerService {
if let Some(slot) = slot_clock.now() { if let Some(slot) = slot_clock.now() {
if let Err(e) = service if let Err(e) = service
.detect_doppelgangers::<E, _, _, _, _>( .detect_doppelgangers::<E, _, _, _, _, _>(
slot, slot,
&get_index, &get_index,
&get_liveness, &get_liveness,
@@ -355,7 +358,7 @@ impl DoppelgangerService {
/// This function is relatively complex when it comes to generic parameters. This is to allow /// This function is relatively complex when it comes to generic parameters. This is to allow
/// for simple unit testing. Using these generics, we can test the `DoppelgangerService` without /// for simple unit testing. Using these generics, we can test the `DoppelgangerService` without
/// needing a BN API or a `ValidatorStore`. /// needing a BN API or a `ValidatorStore`.
async fn detect_doppelgangers<E, I, L, F, S>( async fn detect_doppelgangers<E, I, L, F, G, S>(
&self, &self,
request_slot: Slot, request_slot: Slot,
get_index: &I, get_index: &I,
@@ -364,13 +367,14 @@ impl DoppelgangerService {
) -> Result<(), String> ) -> Result<(), String>
where where
E: EthSpec, E: EthSpec,
I: Fn(PublicKeyBytes) -> Option<u64>, I: Fn(PublicKeyBytes) -> G,
L: Fn(Epoch, Vec<u64>) -> F, L: Fn(Epoch, Vec<u64>) -> F,
F: Future<Output = LivenessResponses>, F: Future<Output = LivenessResponses>,
G: Future<Output = Option<u64>>,
S: FnMut(), S: FnMut(),
{ {
// Get all validators with active doppelganger protection. // Get all validators with active doppelganger protection.
let indices_map = self.compute_detection_indices_map(get_index); let indices_map = self.compute_detection_indices_map(get_index).await;
if indices_map.is_empty() { if indices_map.is_empty() {
// Nothing to do. // Nothing to do.
@@ -397,9 +401,13 @@ impl DoppelgangerService {
/// further doppelganger checks. /// further doppelganger checks.
/// ///
/// Any validator with an unknown index will be omitted from these results. /// Any validator with an unknown index will be omitted from these results.
fn compute_detection_indices_map<F>(&self, get_index: &F) -> HashMap<u64, PublicKeyBytes> async fn compute_detection_indices_map<F, G>(
&self,
get_index: &F,
) -> HashMap<u64, PublicKeyBytes>
where where
F: Fn(PublicKeyBytes) -> Option<u64>, F: Fn(PublicKeyBytes) -> G,
G: Future<Output = Option<u64>>,
{ {
let detection_pubkeys = self let detection_pubkeys = self
.doppelganger_states .doppelganger_states
@@ -421,7 +429,7 @@ impl DoppelgangerService {
// any other locks. That is why this is a separate loop to the one that generates // any other locks. That is why this is a separate loop to the one that generates
// `detection_pubkeys`. // `detection_pubkeys`.
for pubkey in detection_pubkeys { for pubkey in detection_pubkeys {
if let Some(index) = get_index(pubkey) { if let Some(index) = get_index(pubkey).await {
indices_map.insert(index, pubkey); indices_map.insert(index, pubkey);
} }
} }