From 6f2ea519395c54ae410586e612882b0be029c1d0 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Tue, 1 Apr 2025 16:07:32 +0800 Subject: [PATCH] sync.rs --- .../validator_services/src/sync.rs | 179 +++++++++--------- 1 file changed, 86 insertions(+), 93 deletions(-) diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index a0a733d7a4..b3c0d7d9bc 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -504,18 +504,11 @@ pub async fn fill_in_aggregation_proofs( current_slot: Slot, pre_compute_slot: Slot, ) { - debug!( - period = sync_committee_period, - %current_slot, - %pre_compute_slot, - "Calculating sync selection proofs" - ); - // Generate selection proofs for each validator at each slot, one slot at a time. for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) { // For distributed mode if duties_service.distributed { - let mut partial_proofs = Vec::new(); + let mut sync_committee_selection = Vec::new(); for (_validator_start_slot, duty) in pre_compute_duties { // Proofs are already known at this slot for this validator. @@ -540,10 +533,10 @@ pub async fn fill_in_aggregation_proofs( let duty = duty.clone(); let subnet_id = *subnet_id; - // Store all partial sync selection proofs for this slot in partial_proofs so that it can be sent together - partial_proofs.push(async move { + // Store all partial sync selection proofs in partial_proofs so that it can be sent together later + sync_committee_selection.push(async move { // Produce partial selection proof - let sync_selection_proof = duties_service + let partial_sync_selection_proof = duties_service .validator_store .produce_sync_selection_proof( &duty.pubkey, @@ -552,7 +545,7 @@ pub async fn fill_in_aggregation_proofs( ) .await; - match sync_selection_proof { + match partial_sync_selection_proof { Ok(proof) => { let sync_committee_selection = SyncCommitteeSelection { validator_index: duty.validator_index, @@ -560,13 +553,12 @@ pub async fn fill_in_aggregation_proofs( subcommittee_index: subnet_id, selection_proof: proof.clone().into(), }; - // Add log for debugging debug!( "validator_index" = duty.validator_index, "slot" = %proof_slot, "subcommittee_index" = subnet_id, - "partial selection proof" = ?proof, - "Partial sync selection proof" + "partial sync selection proof" = ?proof, + "Sending sync selection to middleware" ); Some(sync_committee_selection) } @@ -592,104 +584,105 @@ pub async fn fill_in_aggregation_proofs( } } - // Execute partial_proofs in parallel - let partial_proof_results = join_all(partial_proofs).await; + let sync_committee_selection_data = join_all(sync_committee_selection).await; // Filter out None values and extract the selection proofs - let valid_results: Vec<_> = partial_proof_results.into_iter().flatten().collect(); - let selection_proofs: Vec<_> = valid_results.clone(); + let sync_selection_data: Vec<_> = sync_committee_selection_data + .into_iter() + .flatten() + .collect(); - // If we have any valid proofs, send them to the middleware - if !selection_proofs.is_empty() { - debug!( - "count" = selection_proofs.len(), - %slot, "Sending batch of partial sync selection proofs" - ); + let middleware_response = duties_service + .beacon_nodes + .first_success(|beacon_node| { + let selection_data = sync_selection_data.clone(); + async move { + beacon_node + .post_validator_sync_committee_selections(&selection_data) + .await + } + }) + .await; - let response = duties_service - .beacon_nodes - .first_success(|beacon_node| { - let proofs = selection_proofs.clone(); - async move { - beacon_node - .post_validator_sync_committee_selections(&proofs) - .await - } - }) - .await; + match middleware_response { + Ok(response) => { + // The selection proof from middleware response will be a full selection proof + debug!( + "validator_index" = response.data[0].validator_index, + "slot" = %response.data[0].slot, + "subcommittee_index" = response.data[0].subcommittee_index, + "full sync selection proof" = ?response.data[0].selection_proof, + "Received sync selection from middleware" + ); - match response { - Ok(response) => { - debug!( - "count" = response.data.len(), - %slot, "Received batch response from middleware for sync" - ); + // Get the sync map to update duties + let sync_map = duties_service.sync_duties.committees.read(); + let Some(committee_duties) = sync_map.get(&sync_committee_period) else { + debug!("period" = sync_committee_period, "Missing sync duties"); + continue; + }; - // Get the sync map to update duties - let sync_map = duties_service.sync_duties.committees.read(); - let Some(committee_duties) = sync_map.get(&sync_committee_period) else { - debug!("period" = sync_committee_period, "Missing sync duties"); - continue; - }; + let validators = committee_duties.validators.read(); - let validators = committee_duties.validators.read(); + // Process each response + for response_data in response.data.iter() { + let validator_index = response_data.validator_index; + let slot = response_data.slot; + let subcommittee_index = response_data.subcommittee_index; - // Process each response - for selection_response in response.data.iter() { - let validator_index = selection_response.validator_index; - let slot = selection_response.slot; - let subcommittee_index = selection_response.subcommittee_index; + // Convert the response to a SyncSelectionProof so we can call the is_aggregator method + let full_selection_proof = + SyncSelectionProof::from(response_data.selection_proof.clone()); - // Convert the response to a SyncSelectionProof - let proof = SyncSelectionProof::from( - selection_response.selection_proof.clone(), - ); - - // Check if the validator is an aggregator - match proof.is_aggregator::() { - Ok(true) => { - if let Some(Some(duty)) = validators.get(&validator_index) { - debug!( - validator_index, - %slot, - "subnet_id" = subcommittee_index, - // log full selection proof for debugging - "full selection proof" = ?proof, - "Validator is sync aggregator" - ); - - // Store the proof - duty.aggregation_duties - .proofs - .write() - .insert((slot, subcommittee_index.into()), proof); - } - } - Ok(false) => { - // Not an aggregator, nothing to do - } - Err(e) => { - warn!( + // Check if the validator is an aggregator + match full_selection_proof.is_aggregator::() { + Ok(true) => { + if let Some(Some(duty)) = validators.get(&validator_index) { + debug!( validator_index, %slot, - "error" = ?e, - "Error determining is_aggregator" + "subcommittee_index" = subcommittee_index, + // log full selection proof for debugging + "full selection proof" = ?response_data.selection_proof, + "Validator is sync aggregator" + ); + + // Store the proof + duty.aggregation_duties.proofs.write().insert( + (slot, subcommittee_index.into()), + full_selection_proof, ); } } + Ok(false) => {} // Not an aggregator + Err(e) => { + warn!( + validator_index, + %slot, + "error" = ?e, + "Error determining is_aggregator" + ); + } } } - Err(e) => { - warn!( - "error" = %e, - %slot, - "Failed to get sync selection proofs from middleware" - ); - } + } + Err(e) => { + warn!( + "error" = %e, + %slot, + "Failed to get sync selection proofs from middleware" + ); } } } else { // For non-distributed mode + debug!( + period = sync_committee_period, + %current_slot, + %pre_compute_slot, + "Calculating sync selection proofs" + ); + let mut validator_proofs = vec![]; for (validator_start_slot, duty) in pre_compute_duties { // Proofs are already known at this slot for this validator.