mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 01:05:47 +00:00
rearrange
This commit is contained in:
@@ -1150,8 +1150,8 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
// In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties,
|
// In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties,
|
||||||
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
|
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
|
||||||
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
|
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
|
||||||
let duty_and_proof_results = if config.parallel_sign {
|
if config.parallel_sign {
|
||||||
let mut futures = relevant_duties
|
let mut duty_and_proof_results = relevant_duties
|
||||||
.into_values()
|
.into_values()
|
||||||
.flatten()
|
.flatten()
|
||||||
.map(|duty| async {
|
.map(|duty| async {
|
||||||
@@ -1167,7 +1167,7 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
})
|
})
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
while let Some(result) = futures.next().await {
|
while let Some(result) = duty_and_proof_results.next().await {
|
||||||
let mut attesters = duties_service.attesters.write();
|
let mut attesters = duties_service.attesters.write();
|
||||||
let (duty, selection_proof) = match result {
|
let (duty, selection_proof) = match result {
|
||||||
Ok(duty_and_proof) => duty_and_proof,
|
Ok(duty_and_proof) => duty_and_proof,
|
||||||
@@ -1233,9 +1233,8 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Vec::new()
|
|
||||||
} else {
|
} else {
|
||||||
stream::iter(relevant_duties.into_values().flatten())
|
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
|
||||||
.then(|duty| async {
|
.then(|duty| async {
|
||||||
let opt_selection_proof = make_selection_proof(
|
let opt_selection_proof = make_selection_proof(
|
||||||
&duty,
|
&duty,
|
||||||
@@ -1248,76 +1247,77 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
Ok((duty, opt_selection_proof))
|
Ok((duty, opt_selection_proof))
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await
|
.await;
|
||||||
};
|
|
||||||
|
|
||||||
// Add to attesters store.
|
// Add to attesters store.
|
||||||
let mut attesters = duties_service.attesters.write();
|
let mut attesters = duties_service.attesters.write();
|
||||||
for result in duty_and_proof_results {
|
for result in duty_and_proof_results {
|
||||||
let (duty, selection_proof) = match result {
|
let (duty, selection_proof) = match result {
|
||||||
Ok(duty_and_proof) => duty_and_proof,
|
Ok(duty_and_proof) => duty_and_proof,
|
||||||
Err(Error::FailedToProduceSelectionProof(
|
Err(Error::FailedToProduceSelectionProof(
|
||||||
ValidatorStoreError::UnknownPubkey(pubkey),
|
ValidatorStoreError::UnknownPubkey(pubkey),
|
||||||
)) => {
|
)) => {
|
||||||
// A pubkey can be missing when a validator was recently
|
// A pubkey can be missing when a validator was recently
|
||||||
// removed via the API.
|
// removed via the API.
|
||||||
warn!(
|
warn!(
|
||||||
info = "a validator may have recently been removed from this VC",
|
info = "a validator may have recently been removed from this VC",
|
||||||
?pubkey,
|
?pubkey,
|
||||||
"Missing pubkey for duty and proof"
|
"Missing pubkey for duty and proof"
|
||||||
);
|
|
||||||
// Do not abort the entire batch for a single failure.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!(
|
|
||||||
error = ?e,
|
|
||||||
msg = "may impair attestation duties",
|
|
||||||
"Failed to produce duty and proof"
|
|
||||||
);
|
|
||||||
// Do not abort the entire batch for a single failure.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let attester_map = attesters.entry(duty.pubkey).or_default();
|
|
||||||
let epoch = duty.slot.epoch(E::slots_per_epoch());
|
|
||||||
match attester_map.entry(epoch) {
|
|
||||||
hash_map::Entry::Occupied(mut entry) => {
|
|
||||||
// No need to update duties for which no proof was computed.
|
|
||||||
let Some(selection_proof) = selection_proof else {
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let (existing_dependent_root, existing_duty) = entry.get_mut();
|
|
||||||
|
|
||||||
if *existing_dependent_root == dependent_root {
|
|
||||||
// Replace existing proof.
|
|
||||||
existing_duty.selection_proof = Some(selection_proof);
|
|
||||||
} else {
|
|
||||||
// Our selection proofs are no longer relevant due to a reorg, abandon
|
|
||||||
// this entire background process.
|
|
||||||
debug!(
|
|
||||||
reason = "re-org",
|
|
||||||
"Stopping selection proof background task"
|
|
||||||
);
|
);
|
||||||
return;
|
// Do not abort the entire batch for a single failure.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
error = ?e,
|
||||||
|
msg = "may impair attestation duties",
|
||||||
|
"Failed to produce duty and proof"
|
||||||
|
);
|
||||||
|
// Do not abort the entire batch for a single failure.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let attester_map = attesters.entry(duty.pubkey).or_default();
|
||||||
|
let epoch = duty.slot.epoch(E::slots_per_epoch());
|
||||||
|
match attester_map.entry(epoch) {
|
||||||
|
hash_map::Entry::Occupied(mut entry) => {
|
||||||
|
// No need to update duties for which no proof was computed.
|
||||||
|
let Some(selection_proof) = selection_proof else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
let (existing_dependent_root, existing_duty) = entry.get_mut();
|
||||||
|
|
||||||
|
if *existing_dependent_root == dependent_root {
|
||||||
|
// Replace existing proof.
|
||||||
|
existing_duty.selection_proof = Some(selection_proof);
|
||||||
|
} else {
|
||||||
|
// Our selection proofs are no longer relevant due to a reorg, abandon
|
||||||
|
// this entire background process.
|
||||||
|
debug!(
|
||||||
|
reason = "re-org",
|
||||||
|
"Stopping selection proof background task"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hash_map::Entry::Vacant(entry) => {
|
||||||
|
// This probably shouldn't happen, but we have enough info to fill in the
|
||||||
|
// entry so we may as well.
|
||||||
|
let subscription_slots =
|
||||||
|
SubscriptionSlots::new(duty.slot, current_slot);
|
||||||
|
let duty_and_proof = DutyAndProof {
|
||||||
|
duty,
|
||||||
|
selection_proof,
|
||||||
|
subscription_slots,
|
||||||
|
};
|
||||||
|
entry.insert((dependent_root, duty_and_proof));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
hash_map::Entry::Vacant(entry) => {
|
|
||||||
// This probably shouldn't happen, but we have enough info to fill in the
|
|
||||||
// entry so we may as well.
|
|
||||||
let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot);
|
|
||||||
let duty_and_proof = DutyAndProof {
|
|
||||||
duty,
|
|
||||||
selection_proof,
|
|
||||||
subscription_slots,
|
|
||||||
};
|
|
||||||
entry.insert((dependent_root, duty_and_proof));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
drop(attesters);
|
||||||
drop(attesters);
|
};
|
||||||
|
|
||||||
let time_taken_ms =
|
let time_taken_ms =
|
||||||
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
|
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
|
||||||
|
|||||||
Reference in New Issue
Block a user