From 76125fa0fac11225288c1b147343454c18bb19f5 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Fri, 11 Apr 2025 11:39:45 +0800 Subject: [PATCH] Process each result --- .../validator_services/src/duties_service.rs | 78 +++++++++++++++++-- 1 file changed, 72 insertions(+), 6 deletions(-) diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index b0a8c7a95f..6e0c1e05a2 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -139,10 +139,10 @@ async fn make_selection_proof( duty: &AttesterData, validator_store: &ValidatorStore, spec: &ChainSpec, - distributed: bool, + config: &SelectionProofConfig, beacon_nodes: &Arc>, ) -> Result, Error> { - let selection_proof = if distributed { + let selection_proof = if config.selections_endpoint { let beacon_committee_selection = BeaconCommitteeSelection { validator_index: duty.validator_index, slot: duty.slot, @@ -1149,7 +1149,7 @@ async fn fill_in_selection_proofs( // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case let duty_and_proof_results = if config.parallel_sign { - let futures = relevant_duties + let mut futures = relevant_duties .into_values() .flatten() .map(|duty| async { @@ -1157,7 +1157,7 @@ async fn fill_in_selection_proofs( &duty, &duties_service.validator_store, &duties_service.spec, - config.selections_endpoint, + &config, &duties_service.beacon_nodes, ) .await?; @@ -1165,7 +1165,73 @@ async fn fill_in_selection_proofs( }) .collect::>(); - futures.collect::>().await + while let Some(result) = futures.next().await { + let mut attesters = duties_service.attesters.write(); + let (duty, selection_proof) = match result { + Ok(duty_and_proof) => duty_and_proof, + Err(Error::FailedToProduceSelectionProof( + ValidatorStoreError::UnknownPubkey(pubkey), + )) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for duty and proof" + ); + // Do not abort the entire batch for a single failure. + continue; + } + Err(e) => { + error!( + error = ?e, + msg = "may impair attestation duties", + "Failed to produce duty and proof" + ); + // Do not abort the entire batch for a single failure. + continue; + } + }; + + let attester_map = attesters.entry(duty.pubkey).or_default(); + let epoch = duty.slot.epoch(E::slots_per_epoch()); + match attester_map.entry(epoch) { + hash_map::Entry::Occupied(mut entry) => { + // No need to update duties for which no proof was computed. + let Some(selection_proof) = selection_proof else { + continue; + }; + + let (existing_dependent_root, existing_duty) = entry.get_mut(); + + if *existing_dependent_root == dependent_root { + // Replace existing proof. + existing_duty.selection_proof = Some(selection_proof); + } else { + // Our selection proofs are no longer relevant due to a reorg, abandon + // this entire background process. + debug!( + reason = "re-org", + "Stopping selection proof background task" + ); + return; + } + } + hash_map::Entry::Vacant(entry) => { + // This probably shouldn't happen, but we have enough info to fill in the + // entry so we may as well. + let subscription_slots = + SubscriptionSlots::new(duty.slot, current_slot); + let duty_and_proof = DutyAndProof { + duty, + selection_proof, + subscription_slots, + }; + entry.insert((dependent_root, duty_and_proof)); + } + } + } + Vec::new() } else { stream::iter(relevant_duties.into_values().flatten()) .then(|duty| async { @@ -1173,7 +1239,7 @@ async fn fill_in_selection_proofs( &duty, &duties_service.validator_store, &duties_service.spec, - config.selections_endpoint, // non-distributed case + &config, &duties_service.beacon_nodes, ) .await?;