From efa14b08e454c3ea28141ae03fc3ba4c1dbe7279 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Thu, 17 Apr 2025 11:52:09 +0800 Subject: [PATCH] refactor distributed sync part --- .../validator_services/src/sync.rs | 170 +++++------------- 1 file changed, 44 insertions(+), 126 deletions(-) diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index e377ea1d2f..65bdb15308 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -631,7 +631,8 @@ pub async fn fill_in_aggregation_proofs( for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) { // For distributed mode if config.parallel_sign { - let mut sync_committee_selection = Vec::new(); + let mut futures_unordered = FuturesUnordered::new(); + let config_ref = &config; for (_, duty) in pre_compute_duties { let subnet_ids = match duty.subnet_ids::() { @@ -649,86 +650,28 @@ pub async fn fill_in_aggregation_proofs( let proof_slot = slot - 1; // Store all partial sync selection proofs so that it can be sent together later - sync_committee_selection.extend(subnet_ids.iter().map(|&subnet_id| { + for &subnet_id in &subnet_ids { let duties_service = duties_service.clone(); let duty = duty.clone(); - async move { - // Produce partial selection proof - let partial_sync_selection_proof = duties_service - .validator_store - .produce_sync_selection_proof(&duty.pubkey, proof_slot, subnet_id) - .await; + futures_unordered.push(async move { + let result = make_sync_selection_proof( + &duties_service, + &duty, + proof_slot, + subnet_id, + config_ref, + &duties_service.beacon_nodes, + ) + .await; - match partial_sync_selection_proof { - Ok(proof) => { - debug!( - "validator_index" = duty.validator_index, - "slot" = %proof_slot, - "subcommittee_index" = *subnet_id, - "partial selection proof" = ?Signature::from(proof.clone()), - "Sending sync selection to middleware" - ); - - let sync_committee_selection = SyncCommitteeSelection { - validator_index: duty.validator_index, - slot: proof_slot, - subcommittee_index: *subnet_id, - selection_proof: proof.clone().into(), - }; - Some(sync_committee_selection) - } - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - debug!( - ?pubkey, - "slot" = %proof_slot, - "Missing pubkey for sync selection proof" - ); - None - } - Err(e) => { - warn!( - "error" = ?e, - "pubkey" = ?duty.pubkey, - "slot" = %proof_slot, - "Unable to sign selection proof" - ); - None - } - } - } - })); - } - - let mut futures_unordered = FuturesUnordered::new(); - - for future in sync_committee_selection { - futures_unordered.push(future); - } - - let mut sync_selection_data = Vec::new(); - while let Some(result) = futures_unordered.next().await { - if let Some(selection) = result { - sync_selection_data.push(selection); + result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof)) + }); } } - // Call the endpoint /eth/v1/validator/sync_committee_selections - // by sending the SyncCommitteeSelection that contains partial sync selection proof - // The middleware should return SyncCommitteeSelection that contains full sync selection proof - let middleware_response = duties_service - .beacon_nodes - .first_success(|beacon_node| { - let selection_data = sync_selection_data.clone(); - async move { - beacon_node - .post_validator_sync_committee_selections(&selection_data) - .await - } - }) - .await; - match middleware_response { - Ok(response) => { - // Get the sync map to update duties + let mut successful_results = Vec::new(); + while let Some(result) = futures_unordered.next().await { + if let Some((validator_index, proof_slot, subnet_id, proof)) = result { let sync_map = duties_service.sync_duties.committees.read(); let Some(committee_duties) = sync_map.get(&sync_committee_period) else { debug!("period" = sync_committee_period, "Missing sync duties"); @@ -737,63 +680,38 @@ pub async fn fill_in_aggregation_proofs( let validators = committee_duties.validators.read(); - // Process each middleware response - for response_data in response.data.iter() { - // The selection proof from middleware response will be a full selection proof - debug!( - "validator_index" = response_data.validator_index, - "slot" = %response_data.slot, - "subcommittee_index" = response_data.subcommittee_index, - "full selection proof" = ?response_data.selection_proof, - "Received sync selection from middleware" - ); - let validator_index = response_data.validator_index; - let slot = response_data.slot; - let subcommittee_index = response_data.subcommittee_index; - - // Convert the response to a SyncSelectionProof so we can call the is_aggregator method - let full_selection_proof = - SyncSelectionProof::from(response_data.selection_proof.clone()); - - // Check if the validator is an aggregator - match full_selection_proof.is_aggregator::() { - Ok(true) => { - if let Some(Some(duty)) = validators.get(&validator_index) { - debug!( - validator_index, - %slot, - "subcommittee_index" = subcommittee_index, - // log full selection proof for debugging - "full selection proof" = ?response_data.selection_proof, - "Validator is sync aggregator" - ); - - // Store the proof - duty.aggregation_duties.proofs.write().insert( - (slot, subcommittee_index.into()), - full_selection_proof, - ); - } - } - Ok(false) => {} // Not an aggregator - Err(e) => { - warn!( + // Check if the validator is an aggregator + match proof.is_aggregator::() { + Ok(true) => { + if let Some(Some(duty)) = validators.get(&validator_index) { + debug!( validator_index, - %slot, - "error" = ?e, - "Error determining is_aggregator" + "slot" = %proof_slot, + "subcommittee_index" = *subnet_id, + // log full selection proof for debugging + "full selection proof" = ?proof, + "Validator is sync aggregator" ); + + // Store the proof + duty.aggregation_duties + .proofs + .write() + .insert((slot, subnet_id), proof); + successful_results.push(validator_index); } } + Ok(false) => {} // Not an aggregator + Err(e) => { + warn!( + validator_index, + %slot, + "error" = ?e, + "Error determining is_aggregator" + ); + } } } - Err(e) => { - warn!( - "error" = %e, - %slot, - "Failed to get sync selection proofs from middleware" - ); - } } } else { // For non-distributed mode