mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 17:26:04 +00:00
sync.rs
This commit is contained in:
@@ -504,18 +504,11 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
current_slot: Slot,
|
current_slot: Slot,
|
||||||
pre_compute_slot: Slot,
|
pre_compute_slot: Slot,
|
||||||
) {
|
) {
|
||||||
debug!(
|
|
||||||
period = sync_committee_period,
|
|
||||||
%current_slot,
|
|
||||||
%pre_compute_slot,
|
|
||||||
"Calculating sync selection proofs"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Generate selection proofs for each validator at each slot, one slot at a time.
|
// Generate selection proofs for each validator at each slot, one slot at a time.
|
||||||
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
|
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
|
||||||
// For distributed mode
|
// For distributed mode
|
||||||
if duties_service.distributed {
|
if duties_service.distributed {
|
||||||
let mut partial_proofs = Vec::new();
|
let mut sync_committee_selection = Vec::new();
|
||||||
|
|
||||||
for (_validator_start_slot, duty) in pre_compute_duties {
|
for (_validator_start_slot, duty) in pre_compute_duties {
|
||||||
// Proofs are already known at this slot for this validator.
|
// Proofs are already known at this slot for this validator.
|
||||||
@@ -540,10 +533,10 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
let duty = duty.clone();
|
let duty = duty.clone();
|
||||||
let subnet_id = *subnet_id;
|
let subnet_id = *subnet_id;
|
||||||
|
|
||||||
// Store all partial sync selection proofs for this slot in partial_proofs so that it can be sent together
|
// Store all partial sync selection proofs in partial_proofs so that it can be sent together later
|
||||||
partial_proofs.push(async move {
|
sync_committee_selection.push(async move {
|
||||||
// Produce partial selection proof
|
// Produce partial selection proof
|
||||||
let sync_selection_proof = duties_service
|
let partial_sync_selection_proof = duties_service
|
||||||
.validator_store
|
.validator_store
|
||||||
.produce_sync_selection_proof(
|
.produce_sync_selection_proof(
|
||||||
&duty.pubkey,
|
&duty.pubkey,
|
||||||
@@ -552,7 +545,7 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
match sync_selection_proof {
|
match partial_sync_selection_proof {
|
||||||
Ok(proof) => {
|
Ok(proof) => {
|
||||||
let sync_committee_selection = SyncCommitteeSelection {
|
let sync_committee_selection = SyncCommitteeSelection {
|
||||||
validator_index: duty.validator_index,
|
validator_index: duty.validator_index,
|
||||||
@@ -560,13 +553,12 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
subcommittee_index: subnet_id,
|
subcommittee_index: subnet_id,
|
||||||
selection_proof: proof.clone().into(),
|
selection_proof: proof.clone().into(),
|
||||||
};
|
};
|
||||||
// Add log for debugging
|
|
||||||
debug!(
|
debug!(
|
||||||
"validator_index" = duty.validator_index,
|
"validator_index" = duty.validator_index,
|
||||||
"slot" = %proof_slot,
|
"slot" = %proof_slot,
|
||||||
"subcommittee_index" = subnet_id,
|
"subcommittee_index" = subnet_id,
|
||||||
"partial selection proof" = ?proof,
|
"partial sync selection proof" = ?proof,
|
||||||
"Partial sync selection proof"
|
"Sending sync selection to middleware"
|
||||||
);
|
);
|
||||||
Some(sync_committee_selection)
|
Some(sync_committee_selection)
|
||||||
}
|
}
|
||||||
@@ -592,104 +584,105 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Execute partial_proofs in parallel
|
let sync_committee_selection_data = join_all(sync_committee_selection).await;
|
||||||
let partial_proof_results = join_all(partial_proofs).await;
|
|
||||||
|
|
||||||
// Filter out None values and extract the selection proofs
|
// Filter out None values and extract the selection proofs
|
||||||
let valid_results: Vec<_> = partial_proof_results.into_iter().flatten().collect();
|
let sync_selection_data: Vec<_> = sync_committee_selection_data
|
||||||
let selection_proofs: Vec<_> = valid_results.clone();
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
.collect();
|
||||||
|
|
||||||
// If we have any valid proofs, send them to the middleware
|
let middleware_response = duties_service
|
||||||
if !selection_proofs.is_empty() {
|
.beacon_nodes
|
||||||
debug!(
|
.first_success(|beacon_node| {
|
||||||
"count" = selection_proofs.len(),
|
let selection_data = sync_selection_data.clone();
|
||||||
%slot, "Sending batch of partial sync selection proofs"
|
async move {
|
||||||
);
|
beacon_node
|
||||||
|
.post_validator_sync_committee_selections(&selection_data)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
let response = duties_service
|
match middleware_response {
|
||||||
.beacon_nodes
|
Ok(response) => {
|
||||||
.first_success(|beacon_node| {
|
// The selection proof from middleware response will be a full selection proof
|
||||||
let proofs = selection_proofs.clone();
|
debug!(
|
||||||
async move {
|
"validator_index" = response.data[0].validator_index,
|
||||||
beacon_node
|
"slot" = %response.data[0].slot,
|
||||||
.post_validator_sync_committee_selections(&proofs)
|
"subcommittee_index" = response.data[0].subcommittee_index,
|
||||||
.await
|
"full sync selection proof" = ?response.data[0].selection_proof,
|
||||||
}
|
"Received sync selection from middleware"
|
||||||
})
|
);
|
||||||
.await;
|
|
||||||
|
|
||||||
match response {
|
// Get the sync map to update duties
|
||||||
Ok(response) => {
|
let sync_map = duties_service.sync_duties.committees.read();
|
||||||
debug!(
|
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
|
||||||
"count" = response.data.len(),
|
debug!("period" = sync_committee_period, "Missing sync duties");
|
||||||
%slot, "Received batch response from middleware for sync"
|
continue;
|
||||||
);
|
};
|
||||||
|
|
||||||
// Get the sync map to update duties
|
let validators = committee_duties.validators.read();
|
||||||
let sync_map = duties_service.sync_duties.committees.read();
|
|
||||||
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
|
|
||||||
debug!("period" = sync_committee_period, "Missing sync duties");
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
|
|
||||||
let validators = committee_duties.validators.read();
|
// Process each response
|
||||||
|
for response_data in response.data.iter() {
|
||||||
|
let validator_index = response_data.validator_index;
|
||||||
|
let slot = response_data.slot;
|
||||||
|
let subcommittee_index = response_data.subcommittee_index;
|
||||||
|
|
||||||
// Process each response
|
// Convert the response to a SyncSelectionProof so we can call the is_aggregator method
|
||||||
for selection_response in response.data.iter() {
|
let full_selection_proof =
|
||||||
let validator_index = selection_response.validator_index;
|
SyncSelectionProof::from(response_data.selection_proof.clone());
|
||||||
let slot = selection_response.slot;
|
|
||||||
let subcommittee_index = selection_response.subcommittee_index;
|
|
||||||
|
|
||||||
// Convert the response to a SyncSelectionProof
|
// Check if the validator is an aggregator
|
||||||
let proof = SyncSelectionProof::from(
|
match full_selection_proof.is_aggregator::<E>() {
|
||||||
selection_response.selection_proof.clone(),
|
Ok(true) => {
|
||||||
);
|
if let Some(Some(duty)) = validators.get(&validator_index) {
|
||||||
|
debug!(
|
||||||
// Check if the validator is an aggregator
|
|
||||||
match proof.is_aggregator::<E>() {
|
|
||||||
Ok(true) => {
|
|
||||||
if let Some(Some(duty)) = validators.get(&validator_index) {
|
|
||||||
debug!(
|
|
||||||
validator_index,
|
|
||||||
%slot,
|
|
||||||
"subnet_id" = subcommittee_index,
|
|
||||||
// log full selection proof for debugging
|
|
||||||
"full selection proof" = ?proof,
|
|
||||||
"Validator is sync aggregator"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Store the proof
|
|
||||||
duty.aggregation_duties
|
|
||||||
.proofs
|
|
||||||
.write()
|
|
||||||
.insert((slot, subcommittee_index.into()), proof);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(false) => {
|
|
||||||
// Not an aggregator, nothing to do
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!(
|
|
||||||
validator_index,
|
validator_index,
|
||||||
%slot,
|
%slot,
|
||||||
"error" = ?e,
|
"subcommittee_index" = subcommittee_index,
|
||||||
"Error determining is_aggregator"
|
// log full selection proof for debugging
|
||||||
|
"full selection proof" = ?response_data.selection_proof,
|
||||||
|
"Validator is sync aggregator"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Store the proof
|
||||||
|
duty.aggregation_duties.proofs.write().insert(
|
||||||
|
(slot, subcommittee_index.into()),
|
||||||
|
full_selection_proof,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(false) => {} // Not an aggregator
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
validator_index,
|
||||||
|
%slot,
|
||||||
|
"error" = ?e,
|
||||||
|
"Error determining is_aggregator"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
}
|
||||||
warn!(
|
Err(e) => {
|
||||||
"error" = %e,
|
warn!(
|
||||||
%slot,
|
"error" = %e,
|
||||||
"Failed to get sync selection proofs from middleware"
|
%slot,
|
||||||
);
|
"Failed to get sync selection proofs from middleware"
|
||||||
}
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// For non-distributed mode
|
// For non-distributed mode
|
||||||
|
debug!(
|
||||||
|
period = sync_committee_period,
|
||||||
|
%current_slot,
|
||||||
|
%pre_compute_slot,
|
||||||
|
"Calculating sync selection proofs"
|
||||||
|
);
|
||||||
|
|
||||||
let mut validator_proofs = vec![];
|
let mut validator_proofs = vec![];
|
||||||
for (validator_start_slot, duty) in pre_compute_duties {
|
for (validator_start_slot, duty) in pre_compute_duties {
|
||||||
// Proofs are already known at this slot for this validator.
|
// Proofs are already known at this slot for this validator.
|
||||||
|
|||||||
Reference in New Issue
Block a user