From 44bd5f13f6c10f30d5b1a5d953daaf1c7507bc4c Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Fri, 11 Apr 2025 19:37:46 +0800 Subject: [PATCH] rearrange --- .../validator_services/src/duties_service.rs | 140 +++++++++--------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index b9f8966905..55acca5fc2 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -1150,8 +1150,8 @@ async fn fill_in_selection_proofs( // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties, // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case - let duty_and_proof_results = if config.parallel_sign { - let mut futures = relevant_duties + if config.parallel_sign { + let mut duty_and_proof_results = relevant_duties .into_values() .flatten() .map(|duty| async { @@ -1167,7 +1167,7 @@ async fn fill_in_selection_proofs( }) .collect::>(); - while let Some(result) = futures.next().await { + while let Some(result) = duty_and_proof_results.next().await { let mut attesters = duties_service.attesters.write(); let (duty, selection_proof) = match result { Ok(duty_and_proof) => duty_and_proof, @@ -1233,9 +1233,8 @@ async fn fill_in_selection_proofs( } } } - Vec::new() } else { - stream::iter(relevant_duties.into_values().flatten()) + let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten()) .then(|duty| async { let opt_selection_proof = make_selection_proof( &duty, @@ -1248,76 +1247,77 @@ async fn fill_in_selection_proofs( Ok((duty, opt_selection_proof)) }) .collect::>() - .await - }; + .await; - // Add to attesters store. - let mut attesters = duties_service.attesters.write(); - for result in duty_and_proof_results { - let (duty, selection_proof) = match result { - Ok(duty_and_proof) => duty_and_proof, - Err(Error::FailedToProduceSelectionProof( - ValidatorStoreError::UnknownPubkey(pubkey), - )) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - Err(e) => { - error!( - error = ?e, - msg = "may impair attestation duties", - "Failed to produce duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - }; - - let attester_map = attesters.entry(duty.pubkey).or_default(); - let epoch = duty.slot.epoch(E::slots_per_epoch()); - match attester_map.entry(epoch) { - hash_map::Entry::Occupied(mut entry) => { - // No need to update duties for which no proof was computed. - let Some(selection_proof) = selection_proof else { - continue; - }; - - let (existing_dependent_root, existing_duty) = entry.get_mut(); - - if *existing_dependent_root == dependent_root { - // Replace existing proof. - existing_duty.selection_proof = Some(selection_proof); - } else { - // Our selection proofs are no longer relevant due to a reorg, abandon - // this entire background process. - debug!( - reason = "re-org", - "Stopping selection proof background task" + // Add to attesters store. + let mut attesters = duties_service.attesters.write(); + for result in duty_and_proof_results { + let (duty, selection_proof) = match result { + Ok(duty_and_proof) => duty_and_proof, + Err(Error::FailedToProduceSelectionProof( + ValidatorStoreError::UnknownPubkey(pubkey), + )) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for duty and proof" ); - return; + // Do not abort the entire batch for a single failure. + continue; + } + Err(e) => { + error!( + error = ?e, + msg = "may impair attestation duties", + "Failed to produce duty and proof" + ); + // Do not abort the entire batch for a single failure. + continue; + } + }; + + let attester_map = attesters.entry(duty.pubkey).or_default(); + let epoch = duty.slot.epoch(E::slots_per_epoch()); + match attester_map.entry(epoch) { + hash_map::Entry::Occupied(mut entry) => { + // No need to update duties for which no proof was computed. + let Some(selection_proof) = selection_proof else { + continue; + }; + + let (existing_dependent_root, existing_duty) = entry.get_mut(); + + if *existing_dependent_root == dependent_root { + // Replace existing proof. + existing_duty.selection_proof = Some(selection_proof); + } else { + // Our selection proofs are no longer relevant due to a reorg, abandon + // this entire background process. + debug!( + reason = "re-org", + "Stopping selection proof background task" + ); + return; + } + } + hash_map::Entry::Vacant(entry) => { + // This probably shouldn't happen, but we have enough info to fill in the + // entry so we may as well. + let subscription_slots = + SubscriptionSlots::new(duty.slot, current_slot); + let duty_and_proof = DutyAndProof { + duty, + selection_proof, + subscription_slots, + }; + entry.insert((dependent_root, duty_and_proof)); } } - hash_map::Entry::Vacant(entry) => { - // This probably shouldn't happen, but we have enough info to fill in the - // entry so we may as well. - let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot); - let duty_and_proof = DutyAndProof { - duty, - selection_proof, - subscription_slots, - }; - entry.insert((dependent_root, duty_and_proof)); - } } - } - drop(attesters); + drop(attesters); + }; let time_taken_ms = Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();