diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 55acca5fc2..3df04c84f3 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -20,7 +20,7 @@ use futures::{ stream::{self, FuturesUnordered}, StreamExt, }; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockWriteGuard}; use safe_arith::{ArithError, SafeArith}; use slot_clock::SlotClock; use std::cmp::min; @@ -1091,6 +1091,72 @@ async fn post_validator_duties_attester( .map_err(|e| Error::FailedToDownloadAttesters(e.to_string())) } +fn process_duty_and_proof( + attesters: &mut RwLockWriteGuard, + result: Result<(AttesterData, Option), Error>, + dependent_root: Hash256, + current_slot: Slot, +) -> bool { + let (duty, selection_proof) = match result { + Ok(duty_and_proof) => duty_and_proof, + Err(Error::FailedToProduceSelectionProof(ValidatorStoreError::UnknownPubkey(pubkey))) => { + // A pubkey can be missing when a validator was recently removed via the API. + warn!( + info = "A validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for duty and proof" + ); + // Do not abort the entire batch for a single failure. + return true; + } + Err(e) => { + error!( + error = ?e, + msg = "may impair attestation duties", + "Failed to produce duty and proof" + ); + return true; + } + }; + + let attester_map = attesters.entry(duty.pubkey).or_default(); + let epoch = duty.slot.epoch(E::slots_per_epoch()); + match attester_map.entry(epoch) { + hash_map::Entry::Occupied(mut entry) => { + // No need to update duties for which no proof was computed. + let Some(selection_proof) = selection_proof else { + return true; + }; + + let (existing_dependent_root, existing_duty) = entry.get_mut(); + + if *existing_dependent_root == dependent_root { + // Replace existing proof. + existing_duty.selection_proof = Some(selection_proof); + true + } else { + // Our selection proofs are no longer relevant due to a reorg, abandon this entire background process. + debug!( + reason = "re-org", + "Stopping selection proof background task" + ); + false + } + } + + hash_map::Entry::Vacant(entry) => { + // This probably shouldn't happen, but we have enough info to fill in the entry so we may as well. + let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot); + let duty_and_proof = DutyAndProof { + duty, + selection_proof, + subscription_slots, + }; + entry.insert((dependent_root, duty_and_proof)); + true + } + } +} /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. /// /// Duties are computed in batches each slot. If a re-org is detected then the process will @@ -1169,68 +1235,13 @@ async fn fill_in_selection_proofs( while let Some(result) = duty_and_proof_results.next().await { let mut attesters = duties_service.attesters.write(); - let (duty, selection_proof) = match result { - Ok(duty_and_proof) => duty_and_proof, - Err(Error::FailedToProduceSelectionProof( - ValidatorStoreError::UnknownPubkey(pubkey), - )) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - Err(e) => { - error!( - error = ?e, - msg = "may impair attestation duties", - "Failed to produce duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - }; - - let attester_map = attesters.entry(duty.pubkey).or_default(); - let epoch = duty.slot.epoch(E::slots_per_epoch()); - match attester_map.entry(epoch) { - hash_map::Entry::Occupied(mut entry) => { - // No need to update duties for which no proof was computed. - let Some(selection_proof) = selection_proof else { - continue; - }; - - let (existing_dependent_root, existing_duty) = entry.get_mut(); - - if *existing_dependent_root == dependent_root { - // Replace existing proof. - existing_duty.selection_proof = Some(selection_proof); - } else { - // Our selection proofs are no longer relevant due to a reorg, abandon - // this entire background process. - debug!( - reason = "re-org", - "Stopping selection proof background task" - ); - return; - } - } - hash_map::Entry::Vacant(entry) => { - // This probably shouldn't happen, but we have enough info to fill in the - // entry so we may as well. - let subscription_slots = - SubscriptionSlots::new(duty.slot, current_slot); - let duty_and_proof = DutyAndProof { - duty, - selection_proof, - subscription_slots, - }; - entry.insert((dependent_root, duty_and_proof)); - } + if !process_duty_and_proof::( + &mut attesters, + result, + dependent_root, + current_slot, + ) { + return; } } } else { @@ -1252,68 +1263,13 @@ async fn fill_in_selection_proofs( // Add to attesters store. let mut attesters = duties_service.attesters.write(); for result in duty_and_proof_results { - let (duty, selection_proof) = match result { - Ok(duty_and_proof) => duty_and_proof, - Err(Error::FailedToProduceSelectionProof( - ValidatorStoreError::UnknownPubkey(pubkey), - )) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - Err(e) => { - error!( - error = ?e, - msg = "may impair attestation duties", - "Failed to produce duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - }; - - let attester_map = attesters.entry(duty.pubkey).or_default(); - let epoch = duty.slot.epoch(E::slots_per_epoch()); - match attester_map.entry(epoch) { - hash_map::Entry::Occupied(mut entry) => { - // No need to update duties for which no proof was computed. - let Some(selection_proof) = selection_proof else { - continue; - }; - - let (existing_dependent_root, existing_duty) = entry.get_mut(); - - if *existing_dependent_root == dependent_root { - // Replace existing proof. - existing_duty.selection_proof = Some(selection_proof); - } else { - // Our selection proofs are no longer relevant due to a reorg, abandon - // this entire background process. - debug!( - reason = "re-org", - "Stopping selection proof background task" - ); - return; - } - } - hash_map::Entry::Vacant(entry) => { - // This probably shouldn't happen, but we have enough info to fill in the - // entry so we may as well. - let subscription_slots = - SubscriptionSlots::new(duty.slot, current_slot); - let duty_and_proof = DutyAndProof { - duty, - selection_proof, - subscription_slots, - }; - entry.insert((dependent_root, duty_and_proof)); - } + if !process_duty_and_proof::( + &mut attesters, + result, + dependent_root, + current_slot, + ) { + return; } } drop(attesters);