Refactor the distributed-mode sync committee selection proof flow

This commit is contained in:
Tan Chee Keong
2025-04-17 11:52:09 +08:00
parent 2612a6b5d8
commit efa14b08e4

View File

@@ -631,7 +631,8 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
// For distributed mode
if config.parallel_sign {
let mut sync_committee_selection = Vec::new();
let mut futures_unordered = FuturesUnordered::new();
let config_ref = &config;
for (_, duty) in pre_compute_duties {
let subnet_ids = match duty.subnet_ids::<E>() {
@@ -649,86 +650,28 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
let proof_slot = slot - 1;
// Store all partial sync selection proofs so that it can be sent together later
sync_committee_selection.extend(subnet_ids.iter().map(|&subnet_id| {
for &subnet_id in &subnet_ids {
let duties_service = duties_service.clone();
let duty = duty.clone();
async move {
// Produce partial selection proof
let partial_sync_selection_proof = duties_service
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, subnet_id)
.await;
futures_unordered.push(async move {
let result = make_sync_selection_proof(
&duties_service,
&duty,
proof_slot,
subnet_id,
config_ref,
&duties_service.beacon_nodes,
)
.await;
match partial_sync_selection_proof {
Ok(proof) => {
debug!(
"validator_index" = duty.validator_index,
"slot" = %proof_slot,
"subcommittee_index" = *subnet_id,
"partial selection proof" = ?Signature::from(proof.clone()),
"Sending sync selection to middleware"
);
let sync_committee_selection = SyncCommitteeSelection {
validator_index: duty.validator_index,
slot: proof_slot,
subcommittee_index: *subnet_id,
selection_proof: proof.clone().into(),
};
Some(sync_committee_selection)
}
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
debug!(
?pubkey,
"slot" = %proof_slot,
"Missing pubkey for sync selection proof"
);
None
}
Err(e) => {
warn!(
"error" = ?e,
"pubkey" = ?duty.pubkey,
"slot" = %proof_slot,
"Unable to sign selection proof"
);
None
}
}
}
}));
}
let mut futures_unordered = FuturesUnordered::new();
for future in sync_committee_selection {
futures_unordered.push(future);
}
let mut sync_selection_data = Vec::new();
while let Some(result) = futures_unordered.next().await {
if let Some(selection) = result {
sync_selection_data.push(selection);
result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof))
});
}
}
// Call the endpoint /eth/v1/validator/sync_committee_selections
// by sending the SyncCommitteeSelection that contains partial sync selection proof
// The middleware should return SyncCommitteeSelection that contains full sync selection proof
let middleware_response = duties_service
.beacon_nodes
.first_success(|beacon_node| {
let selection_data = sync_selection_data.clone();
async move {
beacon_node
.post_validator_sync_committee_selections(&selection_data)
.await
}
})
.await;
match middleware_response {
Ok(response) => {
// Get the sync map to update duties
let mut successful_results = Vec::new();
while let Some(result) = futures_unordered.next().await {
if let Some((validator_index, proof_slot, subnet_id, proof)) = result {
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!("period" = sync_committee_period, "Missing sync duties");
@@ -737,63 +680,38 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
let validators = committee_duties.validators.read();
// Process each middleware response
for response_data in response.data.iter() {
// The selection proof from middleware response will be a full selection proof
debug!(
"validator_index" = response_data.validator_index,
"slot" = %response_data.slot,
"subcommittee_index" = response_data.subcommittee_index,
"full selection proof" = ?response_data.selection_proof,
"Received sync selection from middleware"
);
let validator_index = response_data.validator_index;
let slot = response_data.slot;
let subcommittee_index = response_data.subcommittee_index;
// Convert the response to a SyncSelectionProof so we can call the is_aggregator method
let full_selection_proof =
SyncSelectionProof::from(response_data.selection_proof.clone());
// Check if the validator is an aggregator
match full_selection_proof.is_aggregator::<E>() {
Ok(true) => {
if let Some(Some(duty)) = validators.get(&validator_index) {
debug!(
validator_index,
%slot,
"subcommittee_index" = subcommittee_index,
// log full selection proof for debugging
"full selection proof" = ?response_data.selection_proof,
"Validator is sync aggregator"
);
// Store the proof
duty.aggregation_duties.proofs.write().insert(
(slot, subcommittee_index.into()),
full_selection_proof,
);
}
}
Ok(false) => {} // Not an aggregator
Err(e) => {
warn!(
// Check if the validator is an aggregator
match proof.is_aggregator::<E>() {
Ok(true) => {
if let Some(Some(duty)) = validators.get(&validator_index) {
debug!(
validator_index,
%slot,
"error" = ?e,
"Error determining is_aggregator"
"slot" = %proof_slot,
"subcommittee_index" = *subnet_id,
// log full selection proof for debugging
"full selection proof" = ?proof,
"Validator is sync aggregator"
);
// Store the proof
duty.aggregation_duties
.proofs
.write()
.insert((slot, subnet_id), proof);
successful_results.push(validator_index);
}
}
Ok(false) => {} // Not an aggregator
Err(e) => {
warn!(
validator_index,
%slot,
"error" = ?e,
"Error determining is_aggregator"
);
}
}
}
Err(e) => {
warn!(
"error" = %e,
%slot,
"Failed to get sync selection proofs from middleware"
);
}
}
} else {
// For non-distributed mode