diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index a387bd65b0..d24a5ad191 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -506,8 +506,129 @@ pub async fn fill_in_aggregation_proofs( ) { // Generate selection proofs for each validator at each slot, one slot at a time. for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) { - // For distributed mode - if duties_service.distributed { + debug!( + period = sync_committee_period, + %current_slot, + %pre_compute_slot, + "Calculating sync selection proofs" + ); + if !duties_service.distributed { + // For non-distributed mode + let mut validator_proofs = vec![]; + for (validator_start_slot, duty) in pre_compute_duties { + // Proofs are already known at this slot for this validator. + if slot < *validator_start_slot { + continue; + } + + let subnet_ids = match duty.subnet_ids::() { + Ok(subnet_ids) => subnet_ids, + Err(e) => { + crit!( + error = ?e, + "Arithmetic error computing subnet IDs" + ); + continue; + } + }; + + // Create futures to produce proofs. + let duties_service_ref = &duties_service; + let futures = subnet_ids.iter().map(|subnet_id| async move { + // Construct proof for prior slot. + let proof_slot = slot - 1; + + let proof = match duties_service_ref + .validator_store + .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) + .await + { + Ok(proof) => proof, + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!( + ?pubkey, + pubkey = ?duty.pubkey, + slot = %proof_slot, + "Missing pubkey for sync selection proof" + ); + return None; + } + Err(e) => { + warn!( + error = ?e, + pubkey = ?duty.pubkey, + slot = %proof_slot, + "Unable to sign selection proof" + ); + return None; + } + }; + + match proof.is_aggregator::() { + Ok(true) => { + debug!( + validator_index = duty.validator_index, + slot = %proof_slot, + %subnet_id, + "Validator is sync aggregator" + ); + Some(((proof_slot, *subnet_id), proof)) + } + Ok(false) => None, + Err(e) => { + warn!( + pubkey = ?duty.pubkey, + slot = %proof_slot, + error = ?e, + "Error determining is_aggregator" + ); + None + } + } + }); + + // Execute all the futures in parallel, collecting any successful results. + let proofs = join_all(futures) + .await + .into_iter() + .flatten() + .collect::>(); + + validator_proofs.push((duty.validator_index, proofs)); + } + + // Add to global storage (we add regularly so the proofs can be used ASAP). + let sync_map = duties_service.sync_duties.committees.read(); + let Some(committee_duties) = sync_map.get(&sync_committee_period) else { + debug!(period = sync_committee_period, "Missing sync duties"); + continue; + }; + let validators = committee_duties.validators.read(); + let num_validators_updated = validator_proofs.len(); + + for (validator_index, proofs) in validator_proofs { + if let Some(Some(duty)) = validators.get(&validator_index) { + duty.aggregation_duties.proofs.write().extend(proofs); + } else { + debug!( + validator_index, + period = sync_committee_period, + "Missing sync duty to update" + ); + } + } + + if num_validators_updated > 0 { + debug!( + %slot, + updated_validators = num_validators_updated, + "Finished computing sync selection proofs" + ); + } + } else { + // For distributed mode let mut sync_committee_selection = Vec::new(); for (_validator_start_slot, duty) in pre_compute_duties { @@ -670,128 +791,6 @@ pub async fn fill_in_aggregation_proofs( ); } } - } else { - // For non-distributed mode - debug!( - period = sync_committee_period, - %current_slot, - %pre_compute_slot, - "Calculating sync selection proofs" - ); - - let mut validator_proofs = vec![]; - for (validator_start_slot, duty) in pre_compute_duties { - // Proofs are already known at this slot for this validator. - if slot < *validator_start_slot { - continue; - } - - let subnet_ids = match duty.subnet_ids::() { - Ok(subnet_ids) => subnet_ids, - Err(e) => { - crit!( - error = ?e, - "Arithmetic error computing subnet IDs" - ); - continue; - } - }; - - // Create futures to produce proofs. - let duties_service_ref = &duties_service; - let futures = subnet_ids.iter().map(|subnet_id| async move { - // Construct proof for prior slot. - let proof_slot = slot - 1; - - let proof = match duties_service_ref - .validator_store - .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) - .await - { - Ok(proof) => proof, - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!( - ?pubkey, - pubkey = ?duty.pubkey, - slot = %proof_slot, - "Missing pubkey for sync selection proof" - ); - return None; - } - Err(e) => { - warn!( - error = ?e, - pubkey = ?duty.pubkey, - slot = %proof_slot, - "Unable to sign selection proof" - ); - return None; - } - }; - - match proof.is_aggregator::() { - Ok(true) => { - debug!( - validator_index = duty.validator_index, - slot = %proof_slot, - %subnet_id, - "Validator is sync aggregator" - ); - Some(((proof_slot, *subnet_id), proof)) - } - Ok(false) => None, - Err(e) => { - warn!( - pubkey = ?duty.pubkey, - slot = %proof_slot, - error = ?e, - "Error determining is_aggregator" - ); - None - } - } - }); - - // Execute all the futures in parallel, collecting any successful results. - let proofs = join_all(futures) - .await - .into_iter() - .flatten() - .collect::>(); - - validator_proofs.push((duty.validator_index, proofs)); - } - - // Add to global storage (we add regularly so the proofs can be used ASAP). - let sync_map = duties_service.sync_duties.committees.read(); - let Some(committee_duties) = sync_map.get(&sync_committee_period) else { - debug!(period = sync_committee_period, "Missing sync duties"); - continue; - }; - let validators = committee_duties.validators.read(); - let num_validators_updated = validator_proofs.len(); - - for (validator_index, proofs) in validator_proofs { - if let Some(Some(duty)) = validators.get(&validator_index) { - duty.aggregation_duties.proofs.write().extend(proofs); - } else { - debug!( - validator_index, - period = sync_committee_period, - "Missing sync duty to update" - ); - } - } - - if num_validators_updated > 0 { - debug!( - %slot, - updated_validators = num_validators_updated, - "Finished computing sync selection proofs" - ); - } } } }