Modify to FuturesUnordered for Sync

This commit is contained in:
Tan Chee Keong
2025-04-15 21:39:55 +08:00
parent 3fc62f2241
commit ab1d2c06c6
2 changed files with 40 additions and 13 deletions

View File

@@ -1,13 +1,15 @@
use crate::duties_service::{DutiesService, Error};
use crate::duties_service::{DutiesService, Error, SelectionProofConfig};
use doppelganger_service::DoppelgangerStatus;
use eth2::types::{Signature, SyncCommitteeSelection};
use futures::future::join_all;
use futures::stream::{FuturesUnordered, StreamExt};
use logging::crit;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, warn};
use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId};
use validator_store::Error as ValidatorStoreError;
@@ -349,12 +351,22 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
async move {
let config = SelectionProofConfig {
lookahead_slot: sub_duties_service
.sync_duties
.aggregation_pre_compute_slots(),
computation_offset: Duration::from_secs(12),
selections_endpoint: sub_duties_service.distributed,
parallel_sign: sub_duties_service.distributed,
};
fill_in_aggregation_proofs(
sub_duties_service,
&new_pre_compute_duties,
current_sync_committee_period,
current_slot,
current_pre_compute_slot,
config,
)
.await
},
@@ -393,12 +405,22 @@ pub async fn poll_sync_committee_duties<T: SlotClock + 'static, E: EthSpec>(
let sub_duties_service = duties_service.clone();
duties_service.context.executor.spawn(
async move {
let config = SelectionProofConfig {
lookahead_slot: sub_duties_service
.sync_duties
.aggregation_pre_compute_slots(),
computation_offset: Duration::from_secs(12),
selections_endpoint: sub_duties_service.distributed,
parallel_sign: sub_duties_service.distributed,
};
fill_in_aggregation_proofs(
sub_duties_service,
&new_pre_compute_duties,
next_sync_committee_period,
current_slot,
pre_compute_slot,
config,
)
.await
},
@@ -503,11 +525,12 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
sync_committee_period: u64,
current_slot: Slot,
pre_compute_slot: Slot,
config: SelectionProofConfig,
) {
// Generate selection proofs for each validator at each slot, one slot at a time.
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
// For distributed mode
if duties_service.distributed {
if config.parallel_sign {
let mut sync_committee_selection = Vec::new();
for (_, duty) in pre_compute_duties {
@@ -576,14 +599,18 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
}));
}
let sync_committee_selection_data = join_all(sync_committee_selection).await;
let mut futures_unordered = FuturesUnordered::new();
// Collect the SyncCommitteeSelection data
let sync_selection_data: Vec<_> = sync_committee_selection_data
.into_iter()
.flatten()
.collect();
for future in sync_committee_selection {
futures_unordered.push(future);
}
let mut sync_selection_data = Vec::new();
while let Some(result) = futures_unordered.next().await {
if let Some(selection) = result {
sync_selection_data.push(selection);
}
}
// Call the endpoint /eth/v1/validator/sync_committee_selections
// by sending the SyncCommitteeSelection that contains partial sync selection proof
// The middleware should return SyncCommitteeSelection that contains full sync selection proof