diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 0e08340b8d..a8708eb749 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -16,7 +16,10 @@ use eth2::types::{ AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId, }; -use futures::{stream, StreamExt}; +use futures::{ + stream::{self, FuturesUnordered}, + StreamExt, +}; use parking_lot::RwLock; use safe_arith::{ArithError, SafeArith}; use slot_clock::SlotClock; @@ -123,7 +126,7 @@ pub struct SubscriptionSlots { duty_slot: Slot, } -pub struct SelectionProofConfig { +struct SelectionProofConfig { look_ahead: u64, selections_endpoint: bool, parallel_sign: bool, @@ -1146,8 +1149,10 @@ async fn fill_in_selection_proofs( // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case let duty_and_proof_results = if config.parallel_sign { - futures::future::join_all(relevant_duties.into_values().flatten().map( - |duty| async { + let futures = relevant_duties + .into_values() + .flatten() + .map(|duty| async { let opt_selection_proof = make_selection_proof( &duty, &duties_service.validator_store, @@ -1157,9 +1162,10 @@ async fn fill_in_selection_proofs( ) .await?; Ok((duty, opt_selection_proof)) - }, - )) - .await + }) + .collect::>(); + + futures.collect::>().await } else { stream::iter(relevant_duties.into_values().flatten()) .then(|duty| async {