Remove duplication with a function

This commit is contained in:
Tan Chee Keong
2025-04-15 10:40:48 +08:00
parent 44bd5f13f6
commit b506fa5369

View File

@@ -20,7 +20,7 @@ use futures::{
stream::{self, FuturesUnordered},
StreamExt,
};
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockWriteGuard};
use safe_arith::{ArithError, SafeArith};
use slot_clock::SlotClock;
use std::cmp::min;
@@ -1091,6 +1091,72 @@ async fn post_validator_duties_attester<T: SlotClock + 'static, E: EthSpec>(
.map_err(|e| Error::FailedToDownloadAttesters(e.to_string()))
}
fn process_duty_and_proof<E: EthSpec>(
attesters: &mut RwLockWriteGuard<AttesterMap>,
result: Result<(AttesterData, Option<SelectionProof>), Error>,
dependent_root: Hash256,
current_slot: Slot,
) -> bool {
let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof(ValidatorStoreError::UnknownPubkey(pubkey))) => {
// A pubkey can be missing when a validator was recently removed via the API.
warn!(
info = "A validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for duty and proof"
);
// Do not abort the entire batch for a single failure.
return true;
}
Err(e) => {
error!(
error = ?e,
msg = "may impair attestation duties",
"Failed to produce duty and proof"
);
return true;
}
};
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = selection_proof else {
return true;
};
let (existing_dependent_root, existing_duty) = entry.get_mut();
if *existing_dependent_root == dependent_root {
// Replace existing proof.
existing_duty.selection_proof = Some(selection_proof);
true
} else {
// Our selection proofs are no longer relevant due to a reorg, abandon this entire background process.
debug!(
reason = "re-org",
"Stopping selection proof background task"
);
false
}
}
hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the entry so we may as well.
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof));
true
}
}
}
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
@@ -1169,68 +1235,13 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
while let Some(result) = duty_and_proof_results.next().await {
let mut attesters = duties_service.attesters.write();
let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof(
ValidatorStoreError::UnknownPubkey(pubkey),
)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
info = "a validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
}
Err(e) => {
error!(
error = ?e,
msg = "may impair attestation duties",
"Failed to produce duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
}
};
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = selection_proof else {
continue;
};
let (existing_dependent_root, existing_duty) = entry.get_mut();
if *existing_dependent_root == dependent_root {
// Replace existing proof.
existing_duty.selection_proof = Some(selection_proof);
} else {
// Our selection proofs are no longer relevant due to a reorg, abandon
// this entire background process.
debug!(
reason = "re-org",
"Stopping selection proof background task"
);
return;
}
}
hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the
// entry so we may as well.
let subscription_slots =
SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof));
}
if !process_duty_and_proof::<E>(
&mut attesters,
result,
dependent_root,
current_slot,
) {
return;
}
}
} else {
@@ -1252,68 +1263,13 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// Add to attesters store.
let mut attesters = duties_service.attesters.write();
for result in duty_and_proof_results {
let (duty, selection_proof) = match result {
Ok(duty_and_proof) => duty_and_proof,
Err(Error::FailedToProduceSelectionProof(
ValidatorStoreError::UnknownPubkey(pubkey),
)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
info = "a validator may have recently been removed from this VC",
?pubkey,
"Missing pubkey for duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
}
Err(e) => {
error!(
error = ?e,
msg = "may impair attestation duties",
"Failed to produce duty and proof"
);
// Do not abort the entire batch for a single failure.
continue;
}
};
let attester_map = attesters.entry(duty.pubkey).or_default();
let epoch = duty.slot.epoch(E::slots_per_epoch());
match attester_map.entry(epoch) {
hash_map::Entry::Occupied(mut entry) => {
// No need to update duties for which no proof was computed.
let Some(selection_proof) = selection_proof else {
continue;
};
let (existing_dependent_root, existing_duty) = entry.get_mut();
if *existing_dependent_root == dependent_root {
// Replace existing proof.
existing_duty.selection_proof = Some(selection_proof);
} else {
// Our selection proofs are no longer relevant due to a reorg, abandon
// this entire background process.
debug!(
reason = "re-org",
"Stopping selection proof background task"
);
return;
}
}
hash_map::Entry::Vacant(entry) => {
// This probably shouldn't happen, but we have enough info to fill in the
// entry so we may as well.
let subscription_slots =
SubscriptionSlots::new(duty.slot, current_slot);
let duty_and_proof = DutyAndProof {
duty,
selection_proof,
subscription_slots,
};
entry.insert((dependent_root, duty_and_proof));
}
if !process_duty_and_proof::<E>(
&mut attesters,
result,
dependent_root,
current_slot,
) {
return;
}
}
drop(attesters);