From ab1d2c06c6a25c826c4e16e4135f4ff454edcc19 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Tue, 15 Apr 2025 21:39:55 +0800 Subject: [PATCH] Modify to FuturesUnordered for Sync --- .../validator_services/src/duties_service.rs | 10 ++--- .../validator_services/src/sync.rs | 43 +++++++++++++++---- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 3df04c84f3..0e2d5537b1 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -126,11 +126,11 @@ pub struct SubscriptionSlots { duty_slot: Slot, } -struct SelectionProofConfig { - lookahead_slot: u64, - computation_offset: Duration, // The seconds to compute the selection proof before a slot - selections_endpoint: bool, - parallel_sign: bool, +pub struct SelectionProofConfig { + pub lookahead_slot: u64, + pub computation_offset: Duration, // The seconds to compute the selection proof before a slot + pub selections_endpoint: bool, + pub parallel_sign: bool, } /// Create a selection proof for `duty`. diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index 844d751a25..12060d8a2d 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -1,13 +1,15 @@ -use crate::duties_service::{DutiesService, Error}; +use crate::duties_service::{DutiesService, Error, SelectionProofConfig}; use doppelganger_service::DoppelgangerStatus; use eth2::types::{Signature, SyncCommitteeSelection}; use futures::future::join_all; +use futures::stream::{FuturesUnordered, StreamExt}; use logging::crit; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Arc; +use std::time::Duration; use tracing::{debug, info, warn}; use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId}; use validator_store::Error as ValidatorStoreError; @@ -349,12 +351,22 @@ pub async fn poll_sync_committee_duties( let sub_duties_service = duties_service.clone(); duties_service.context.executor.spawn( async move { + let config = SelectionProofConfig { + lookahead_slot: sub_duties_service + .sync_duties + .aggregation_pre_compute_slots(), + computation_offset: Duration::from_secs(12), + selections_endpoint: sub_duties_service.distributed, + parallel_sign: sub_duties_service.distributed, + }; + fill_in_aggregation_proofs( sub_duties_service, &new_pre_compute_duties, current_sync_committee_period, current_slot, current_pre_compute_slot, + config, ) .await }, @@ -393,12 +405,22 @@ pub async fn poll_sync_committee_duties( let sub_duties_service = duties_service.clone(); duties_service.context.executor.spawn( async move { + let config = SelectionProofConfig { + lookahead_slot: sub_duties_service + .sync_duties + .aggregation_pre_compute_slots(), + computation_offset: Duration::from_secs(12), + selections_endpoint: sub_duties_service.distributed, + parallel_sign: sub_duties_service.distributed, + }; + fill_in_aggregation_proofs( sub_duties_service, &new_pre_compute_duties, next_sync_committee_period, current_slot, pre_compute_slot, + config, ) .await }, @@ -503,11 +525,12 @@ pub async fn fill_in_aggregation_proofs( sync_committee_period: u64, current_slot: Slot, pre_compute_slot: Slot, + config: SelectionProofConfig, ) { // Generate selection proofs for each validator at each slot, one slot at a time. for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) { // For distributed mode - if duties_service.distributed { + if config.parallel_sign { let mut sync_committee_selection = Vec::new(); for (_, duty) in pre_compute_duties { @@ -576,14 +599,18 @@ pub async fn fill_in_aggregation_proofs( })); } - let sync_committee_selection_data = join_all(sync_committee_selection).await; + let mut futures_unordered = FuturesUnordered::new(); - // Collect the SyncCommitteeSelection data - let sync_selection_data: Vec<_> = sync_committee_selection_data - .into_iter() - .flatten() - .collect(); + for future in sync_committee_selection { + futures_unordered.push(future); + } + let mut sync_selection_data = Vec::new(); + while let Some(result) = futures_unordered.next().await { + if let Some(selection) = result { + sync_selection_data.push(selection); + } + } // Call the endpoint /eth/v1/validator/sync_committee_selections // by sending the SyncCommitteeSelection that contains partial sync selection proof // The middleware should return SyncCommitteeSelection that contains full sync selection proof