From f20f7491d70f9ef47d65f7e3a24d47fd31ce1153 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Wed, 23 Apr 2025 15:17:29 +0800 Subject: [PATCH] Add comments --- .../validator_services/src/duties_service.rs | 12 +++++++++-- .../validator_services/src/sync.rs | 21 ++++++++++--------- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 0e2d5537b1..a5d582381e 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -990,6 +990,9 @@ async fn poll_beacon_attesters_for_epoch( // Spawn the background task to compute selection proofs. let subservice = duties_service.clone(); + // Define a config to be pass to fill_in_selection_proofs. + // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) + // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. let config = SelectionProofConfig { lookahead_slot: if duties_service.distributed { SELECTION_PROOF_SLOT_LOOKAHEAD_DVT @@ -1091,6 +1094,7 @@ async fn post_validator_duties_attester( .map_err(|e| Error::FailedToDownloadAttesters(e.to_string())) } +// Create a helper function here to reduce code duplication for normal and distributed mode fn process_duty_and_proof( attesters: &mut RwLockWriteGuard, result: Result<(AttesterData, Option), Error>, @@ -1107,6 +1111,7 @@ fn process_duty_and_proof( "Missing pubkey for duty and proof" ); // Do not abort the entire batch for a single failure. + // return true means continue processing duties. return true; } Err(e) => { @@ -1157,6 +1162,7 @@ fn process_duty_and_proof( } } } + /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. /// /// Duties are computed in batches each slot. If a re-org is detected then the process will @@ -1190,8 +1196,8 @@ async fn fill_in_selection_proofs( let lookahead_slot = current_slot + selection_lookahead; - let relevant_duties = if duties_service.distributed { - // Remove old slot duties and only keep current duties + let relevant_duties = if config.selections_endpoint { + // Remove old slot duties and only keep current duties in distributed mode duties_by_slot .remove(&lookahead_slot) .map(|duties| BTreeMap::from([(lookahead_slot, duties)])) @@ -1235,6 +1241,7 @@ async fn fill_in_selection_proofs( while let Some(result) = duty_and_proof_results.next().await { let mut attesters = duties_service.attesters.write(); + // if process_duty_and_proof returns false, exit the loop if !process_duty_and_proof::( &mut attesters, result, @@ -1245,6 +1252,7 @@ async fn fill_in_selection_proofs( } } } else { + // In normal (non-distributed case), sign selection proofs serially let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten()) .then(|duty| async { let opt_selection_proof = make_selection_proof( diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index aa7703e2cd..6e0ba9d83f 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -1,5 +1,5 @@ use crate::duties_service::{DutiesService, Error, SelectionProofConfig}; -use beacon_node_fallback::BeaconNodeFallback; +// use beacon_node_fallback::BeaconNodeFallback; use doppelganger_service::DoppelgangerStatus; use eth2::types::{Signature, SyncCommitteeSelection}; use futures::future::join_all; @@ -352,10 +352,12 @@ pub async fn poll_sync_committee_duties( let sub_duties_service = duties_service.clone(); duties_service.context.executor.spawn( async move { + // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) + // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. let config = SelectionProofConfig { lookahead_slot: sub_duties_service .sync_duties - .aggregation_pre_compute_slots(), + .aggregation_pre_compute_slots(), // Use the current behaviour defined in the method computation_offset: Duration::from_secs(0), selections_endpoint: sub_duties_service.sync_duties.distributed, parallel_sign: sub_duties_service.sync_duties.distributed, @@ -520,13 +522,13 @@ pub async fn poll_sync_committee_duties_for_period( duties_service: &Arc>, duty: &SyncDuty, proof_slot: Slot, subnet_id: SyncSubnetId, config: &SelectionProofConfig, - _beacon_nodes: &Arc>, ) -> Option { let sync_selection_proof = duties_service .validator_store @@ -554,12 +556,13 @@ pub async fn make_sync_selection_proof( } }; - // In --distributed mode when we want to call the selections endpoint + // In distributed mode when we want to call the selections endpoint if config.selections_endpoint { debug!( "validator_index" = duty.validator_index, "slot" = %proof_slot, "subcommittee_index" = *subnet_id, + // In distributed mode, this is partial selection proof "partial selection proof" = ?Signature::from(selection_proof.clone()), "Sending sync selection to middleware" ); @@ -589,16 +592,16 @@ pub async fn make_sync_selection_proof( match middleware_response { Ok(response) => { let response_data = &response.data[0]; - // The selection proof from middleware response will be a full selection proof debug!( "validator_index" = response_data.validator_index, "slot" = %response_data.slot, "subcommittee_index" = response_data.subcommittee_index, + // The selection proof from middleware response will be a full selection proof "full selection proof" = ?response_data.selection_proof, "Received sync selection from middleware" ); - // Convert the response to a SyncSelectionProof so we can call the is_aggregator method + // Convert the response to a SyncSelectionProof let full_selection_proof = SyncSelectionProof::from(response_data.selection_proof.clone()); Some(full_selection_proof) @@ -613,7 +616,7 @@ pub async fn make_sync_selection_proof( } } } else { - // When calling the selections endpoint is not required, return the selection_proof + // When calling the selections endpoint is not required, the selection_proof is already a full selection proof Some(selection_proof) } } @@ -648,7 +651,7 @@ pub async fn fill_in_aggregation_proofs( // Construct proof for prior slot. let proof_slot = slot - 1; - // Store all partial sync selection proofs so that it can be sent together later + // Calling the make_sync_selection_proof will return a full selection proof for &subnet_id in &subnet_ids { let duties_service = duties_service.clone(); futures_unordered.push(async move { @@ -658,7 +661,6 @@ pub async fn fill_in_aggregation_proofs( proof_slot, subnet_id, config_ref, - &duties_service.beacon_nodes, ) .await; @@ -749,7 +751,6 @@ pub async fn fill_in_aggregation_proofs( proof_slot, *subnet_id, config_ref, - &duties_service_ref.beacon_nodes, ) .await;