mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-21 05:44:44 +00:00
Process each result
This commit is contained in:
@@ -139,10 +139,10 @@ async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
|
||||
duty: &AttesterData,
|
||||
validator_store: &ValidatorStore<T, E>,
|
||||
spec: &ChainSpec,
|
||||
distributed: bool,
|
||||
config: &SelectionProofConfig,
|
||||
beacon_nodes: &Arc<BeaconNodeFallback<T, E>>,
|
||||
) -> Result<Option<SelectionProof>, Error> {
|
||||
let selection_proof = if distributed {
|
||||
let selection_proof = if config.selections_endpoint {
|
||||
let beacon_committee_selection = BeaconCommitteeSelection {
|
||||
validator_index: duty.validator_index,
|
||||
slot: duty.slot,
|
||||
@@ -1149,7 +1149,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
||||
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
|
||||
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
|
||||
let duty_and_proof_results = if config.parallel_sign {
|
||||
let futures = relevant_duties
|
||||
let mut futures = relevant_duties
|
||||
.into_values()
|
||||
.flatten()
|
||||
.map(|duty| async {
|
||||
@@ -1157,7 +1157,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
||||
&duty,
|
||||
&duties_service.validator_store,
|
||||
&duties_service.spec,
|
||||
config.selections_endpoint,
|
||||
&config,
|
||||
&duties_service.beacon_nodes,
|
||||
)
|
||||
.await?;
|
||||
@@ -1165,7 +1165,73 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
futures.collect::<Vec<_>>().await
|
||||
while let Some(result) = futures.next().await {
|
||||
let mut attesters = duties_service.attesters.write();
|
||||
let (duty, selection_proof) = match result {
|
||||
Ok(duty_and_proof) => duty_and_proof,
|
||||
Err(Error::FailedToProduceSelectionProof(
|
||||
ValidatorStoreError::UnknownPubkey(pubkey),
|
||||
)) => {
|
||||
// A pubkey can be missing when a validator was recently
|
||||
// removed via the API.
|
||||
warn!(
|
||||
info = "a validator may have recently been removed from this VC",
|
||||
?pubkey,
|
||||
"Missing pubkey for duty and proof"
|
||||
);
|
||||
// Do not abort the entire batch for a single failure.
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
error = ?e,
|
||||
msg = "may impair attestation duties",
|
||||
"Failed to produce duty and proof"
|
||||
);
|
||||
// Do not abort the entire batch for a single failure.
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let attester_map = attesters.entry(duty.pubkey).or_default();
|
||||
let epoch = duty.slot.epoch(E::slots_per_epoch());
|
||||
match attester_map.entry(epoch) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
// No need to update duties for which no proof was computed.
|
||||
let Some(selection_proof) = selection_proof else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let (existing_dependent_root, existing_duty) = entry.get_mut();
|
||||
|
||||
if *existing_dependent_root == dependent_root {
|
||||
// Replace existing proof.
|
||||
existing_duty.selection_proof = Some(selection_proof);
|
||||
} else {
|
||||
// Our selection proofs are no longer relevant due to a reorg, abandon
|
||||
// this entire background process.
|
||||
debug!(
|
||||
reason = "re-org",
|
||||
"Stopping selection proof background task"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
// This probably shouldn't happen, but we have enough info to fill in the
|
||||
// entry so we may as well.
|
||||
let subscription_slots =
|
||||
SubscriptionSlots::new(duty.slot, current_slot);
|
||||
let duty_and_proof = DutyAndProof {
|
||||
duty,
|
||||
selection_proof,
|
||||
subscription_slots,
|
||||
};
|
||||
entry.insert((dependent_root, duty_and_proof));
|
||||
}
|
||||
}
|
||||
}
|
||||
Vec::new()
|
||||
} else {
|
||||
stream::iter(relevant_duties.into_values().flatten())
|
||||
.then(|duty| async {
|
||||
@@ -1173,7 +1239,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
||||
&duty,
|
||||
&duties_service.validator_store,
|
||||
&duties_service.spec,
|
||||
config.selections_endpoint, // non-distributed case
|
||||
&config,
|
||||
&duties_service.beacon_nodes,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Reference in New Issue
Block a user