Use FuturesUnordered

This commit is contained in:
Tan Chee Keong
2025-04-09 22:19:13 +08:00
parent d3dbe870ef
commit a11cee21d3

View File

@@ -16,7 +16,10 @@ use eth2::types::{
AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse, AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse,
ProposerData, StateId, ValidatorId, ProposerData, StateId, ValidatorId,
}; };
use futures::{stream, StreamExt}; use futures::{
stream::{self, FuturesUnordered},
StreamExt,
};
use parking_lot::RwLock; use parking_lot::RwLock;
use safe_arith::{ArithError, SafeArith}; use safe_arith::{ArithError, SafeArith};
use slot_clock::SlotClock; use slot_clock::SlotClock;
@@ -123,7 +126,7 @@ pub struct SubscriptionSlots {
duty_slot: Slot, duty_slot: Slot,
} }
pub struct SelectionProofConfig { struct SelectionProofConfig {
look_ahead: u64, look_ahead: u64,
selections_endpoint: bool, selections_endpoint: bool,
parallel_sign: bool, parallel_sign: bool,
@@ -1146,8 +1149,10 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
// as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
let duty_and_proof_results = if config.parallel_sign { let duty_and_proof_results = if config.parallel_sign {
futures::future::join_all(relevant_duties.into_values().flatten().map( let futures = relevant_duties
|duty| async { .into_values()
.flatten()
.map(|duty| async {
let opt_selection_proof = make_selection_proof( let opt_selection_proof = make_selection_proof(
&duty, &duty,
&duties_service.validator_store, &duties_service.validator_store,
@@ -1157,9 +1162,10 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
) )
.await?; .await?;
Ok((duty, opt_selection_proof)) Ok((duty, opt_selection_proof))
}, })
)) .collect::<FuturesUnordered<_>>();
.await
futures.collect::<Vec<_>>().await
} else { } else {
stream::iter(relevant_duties.into_values().flatten()) stream::iter(relevant_duties.into_values().flatten())
.then(|duty| async { .then(|duty| async {