mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 00:42:42 +00:00
reduce diff
This commit is contained in:
@@ -506,8 +506,129 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
) {
|
) {
|
||||||
// Generate selection proofs for each validator at each slot, one slot at a time.
|
// Generate selection proofs for each validator at each slot, one slot at a time.
|
||||||
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
|
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
|
||||||
// For distributed mode
|
debug!(
|
||||||
if duties_service.distributed {
|
period = sync_committee_period,
|
||||||
|
%current_slot,
|
||||||
|
%pre_compute_slot,
|
||||||
|
"Calculating sync selection proofs"
|
||||||
|
);
|
||||||
|
if !duties_service.distributed {
|
||||||
|
// For non-distributed mode
|
||||||
|
let mut validator_proofs = vec![];
|
||||||
|
for (validator_start_slot, duty) in pre_compute_duties {
|
||||||
|
// Proofs are already known at this slot for this validator.
|
||||||
|
if slot < *validator_start_slot {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let subnet_ids = match duty.subnet_ids::<E>() {
|
||||||
|
Ok(subnet_ids) => subnet_ids,
|
||||||
|
Err(e) => {
|
||||||
|
crit!(
|
||||||
|
error = ?e,
|
||||||
|
"Arithmetic error computing subnet IDs"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create futures to produce proofs.
|
||||||
|
let duties_service_ref = &duties_service;
|
||||||
|
let futures = subnet_ids.iter().map(|subnet_id| async move {
|
||||||
|
// Construct proof for prior slot.
|
||||||
|
let proof_slot = slot - 1;
|
||||||
|
|
||||||
|
let proof = match duties_service_ref
|
||||||
|
.validator_store
|
||||||
|
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(proof) => proof,
|
||||||
|
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
|
||||||
|
// A pubkey can be missing when a validator was recently
|
||||||
|
// removed via the API.
|
||||||
|
debug!(
|
||||||
|
?pubkey,
|
||||||
|
pubkey = ?duty.pubkey,
|
||||||
|
slot = %proof_slot,
|
||||||
|
"Missing pubkey for sync selection proof"
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
error = ?e,
|
||||||
|
pubkey = ?duty.pubkey,
|
||||||
|
slot = %proof_slot,
|
||||||
|
"Unable to sign selection proof"
|
||||||
|
);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match proof.is_aggregator::<E>() {
|
||||||
|
Ok(true) => {
|
||||||
|
debug!(
|
||||||
|
validator_index = duty.validator_index,
|
||||||
|
slot = %proof_slot,
|
||||||
|
%subnet_id,
|
||||||
|
"Validator is sync aggregator"
|
||||||
|
);
|
||||||
|
Some(((proof_slot, *subnet_id), proof))
|
||||||
|
}
|
||||||
|
Ok(false) => None,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
pubkey = ?duty.pubkey,
|
||||||
|
slot = %proof_slot,
|
||||||
|
error = ?e,
|
||||||
|
"Error determining is_aggregator"
|
||||||
|
);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Execute all the futures in parallel, collecting any successful results.
|
||||||
|
let proofs = join_all(futures)
|
||||||
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
validator_proofs.push((duty.validator_index, proofs));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to global storage (we add regularly so the proofs can be used ASAP).
|
||||||
|
let sync_map = duties_service.sync_duties.committees.read();
|
||||||
|
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
|
||||||
|
debug!(period = sync_committee_period, "Missing sync duties");
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let validators = committee_duties.validators.read();
|
||||||
|
let num_validators_updated = validator_proofs.len();
|
||||||
|
|
||||||
|
for (validator_index, proofs) in validator_proofs {
|
||||||
|
if let Some(Some(duty)) = validators.get(&validator_index) {
|
||||||
|
duty.aggregation_duties.proofs.write().extend(proofs);
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
validator_index,
|
||||||
|
period = sync_committee_period,
|
||||||
|
"Missing sync duty to update"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if num_validators_updated > 0 {
|
||||||
|
debug!(
|
||||||
|
%slot,
|
||||||
|
updated_validators = num_validators_updated,
|
||||||
|
"Finished computing sync selection proofs"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// For distributed mode
|
||||||
let mut sync_committee_selection = Vec::new();
|
let mut sync_committee_selection = Vec::new();
|
||||||
|
|
||||||
for (_validator_start_slot, duty) in pre_compute_duties {
|
for (_validator_start_slot, duty) in pre_compute_duties {
|
||||||
@@ -670,128 +791,6 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
// For non-distributed mode
|
|
||||||
debug!(
|
|
||||||
period = sync_committee_period,
|
|
||||||
%current_slot,
|
|
||||||
%pre_compute_slot,
|
|
||||||
"Calculating sync selection proofs"
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut validator_proofs = vec![];
|
|
||||||
for (validator_start_slot, duty) in pre_compute_duties {
|
|
||||||
// Proofs are already known at this slot for this validator.
|
|
||||||
if slot < *validator_start_slot {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let subnet_ids = match duty.subnet_ids::<E>() {
|
|
||||||
Ok(subnet_ids) => subnet_ids,
|
|
||||||
Err(e) => {
|
|
||||||
crit!(
|
|
||||||
error = ?e,
|
|
||||||
"Arithmetic error computing subnet IDs"
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Create futures to produce proofs.
|
|
||||||
let duties_service_ref = &duties_service;
|
|
||||||
let futures = subnet_ids.iter().map(|subnet_id| async move {
|
|
||||||
// Construct proof for prior slot.
|
|
||||||
let proof_slot = slot - 1;
|
|
||||||
|
|
||||||
let proof = match duties_service_ref
|
|
||||||
.validator_store
|
|
||||||
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(proof) => proof,
|
|
||||||
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
|
|
||||||
// A pubkey can be missing when a validator was recently
|
|
||||||
// removed via the API.
|
|
||||||
debug!(
|
|
||||||
?pubkey,
|
|
||||||
pubkey = ?duty.pubkey,
|
|
||||||
slot = %proof_slot,
|
|
||||||
"Missing pubkey for sync selection proof"
|
|
||||||
);
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
error = ?e,
|
|
||||||
pubkey = ?duty.pubkey,
|
|
||||||
slot = %proof_slot,
|
|
||||||
"Unable to sign selection proof"
|
|
||||||
);
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match proof.is_aggregator::<E>() {
|
|
||||||
Ok(true) => {
|
|
||||||
debug!(
|
|
||||||
validator_index = duty.validator_index,
|
|
||||||
slot = %proof_slot,
|
|
||||||
%subnet_id,
|
|
||||||
"Validator is sync aggregator"
|
|
||||||
);
|
|
||||||
Some(((proof_slot, *subnet_id), proof))
|
|
||||||
}
|
|
||||||
Ok(false) => None,
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
pubkey = ?duty.pubkey,
|
|
||||||
slot = %proof_slot,
|
|
||||||
error = ?e,
|
|
||||||
"Error determining is_aggregator"
|
|
||||||
);
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Execute all the futures in parallel, collecting any successful results.
|
|
||||||
let proofs = join_all(futures)
|
|
||||||
.await
|
|
||||||
.into_iter()
|
|
||||||
.flatten()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
validator_proofs.push((duty.validator_index, proofs));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add to global storage (we add regularly so the proofs can be used ASAP).
|
|
||||||
let sync_map = duties_service.sync_duties.committees.read();
|
|
||||||
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
|
|
||||||
debug!(period = sync_committee_period, "Missing sync duties");
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let validators = committee_duties.validators.read();
|
|
||||||
let num_validators_updated = validator_proofs.len();
|
|
||||||
|
|
||||||
for (validator_index, proofs) in validator_proofs {
|
|
||||||
if let Some(Some(duty)) = validators.get(&validator_index) {
|
|
||||||
duty.aggregation_duties.proofs.write().extend(proofs);
|
|
||||||
} else {
|
|
||||||
debug!(
|
|
||||||
validator_index,
|
|
||||||
period = sync_committee_period,
|
|
||||||
"Missing sync duty to update"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if num_validators_updated > 0 {
|
|
||||||
debug!(
|
|
||||||
%slot,
|
|
||||||
updated_validators = num_validators_updated,
|
|
||||||
"Finished computing sync selection proofs"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user