diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index 73e4c69d71..8f42183aa3 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -522,125 +522,67 @@ pub async fn fill_in_aggregation_proofs( // Generate selection proofs for each validator at each slot, one slot at a time. for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) { - let mut validator_proofs = vec![]; - for (validator_start_slot, duty) in pre_compute_duties { - // Proofs are already known at this slot for this validator. - if slot < *validator_start_slot { - continue; - } + // For distributed mode + if duties_service.distributed { + let mut partial_proofs = Vec::new(); - let subnet_ids = match duty.subnet_ids::() { - Ok(subnet_ids) => subnet_ids, - Err(e) => { - crit!( - log, - "Arithmetic error computing subnet IDs"; - "error" => ?e, - ); + for (validator_start_slot, duty) in pre_compute_duties { + // Proofs are already known at this slot for this validator. + if slot < *validator_start_slot { continue; } - }; - // Create futures to produce proofs. - let duties_service_ref = &duties_service; - let futures = subnet_ids.iter().map(|subnet_id| { - let duties_service = duties_service.clone(); - async move { - // Construct proof for prior slot. - let proof_slot = slot - 1; + let subnet_ids = match duty.subnet_ids::() { + Ok(subnet_ids) => subnet_ids, + Err(e) => { + crit!( + log, + "Arithmetic error computing subnet IDs"; + "error" => ?e, + ); + continue; + } + }; - let proof = if duties_service.distributed { - let sync_selection_proof = SyncCommitteeSelection { - validator_index: duty.validator_index, - slot: proof_slot, - subcommittee_index: **subnet_id, - selection_proof: match duties_service_ref - .validator_store - .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) - .await - { - Ok(proof) => proof.into(), - Err(e) => { - return match e { - ValidatorStoreError::UnknownPubkey(pubkey) => { - debug!( - log, - "Missing pubkey for sync selection proof"; - "pubkey" => ?pubkey, - "slot" => proof_slot, - ); - None - } - _ => { - warn!( - log, - "Unable to sign selection proof"; - "error" => ?e, - "pubkey" => ?duty.pubkey, - "slot" => proof_slot, - ); - None - } - }; - } - }, - }; + // Construct proof for prior slot. + let proof_slot = slot - 1; - let response = match duties_service - .beacon_nodes - .first_success(|beacon_node| { - let selection = sync_selection_proof.clone(); - debug!( - log, - "Partial sync selection proof from VC"; - "Sync selection proof" => ?selection, - ); - async move { - let response = beacon_node - .post_validator_sync_committee_selections(&[selection]) - .await; - debug!( - log, - "Response from middleware for sync"; - "response" => ?response, - ); + // Create futures for all subnet IDs for this validator + for subnet_id in subnet_ids { + let duties_service = duties_service.clone(); + let duty = duty.clone(); + let subnet_id = *subnet_id; - response - } - }) - .await - { - Ok(response) => response, - Err(e) => { - warn! { - log, - "Unable to sign selection proof in middleware level"; - "error" => %e, - "pubkey" => ?duty.pubkey, - "slot" => proof_slot, - }; - return None; - } - }; - SyncSelectionProof::from(response.data[0].selection_proof.clone()) - } else { - match duties_service_ref + // Store all partial sync selection proofs for this slot in partial_proofs so that it can be sent together + partial_proofs.push(async move { + // Produce partial selection proof + let sync_selection_proof = duties_service .validator_store - .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) - .await - { - Ok(proof) => proof, + .produce_sync_selection_proof( + &duty.pubkey, + proof_slot, + subnet_id.into(), + ) + .await; + + match sync_selection_proof { + Ok(proof) => { + let sync_committee_selection = SyncCommitteeSelection { + validator_index: duty.validator_index, + slot: proof_slot, + subcommittee_index: subnet_id, + selection_proof: proof.into(), + }; + Some(sync_committee_selection) + } Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. debug!( log, "Missing pubkey for sync selection proof"; "pubkey" => ?pubkey, - "pubkey" => ?duty.pubkey, "slot" => proof_slot, ); - return None; + None } Err(e) => { warn!( @@ -650,9 +592,176 @@ pub async fn fill_in_aggregation_proofs( "pubkey" => ?duty.pubkey, "slot" => proof_slot, ); - return None; + None } } + }); + } + } + + // Execute partial_proofs in parallel + let partial_proof_results = join_all(partial_proofs).await; + + // Filter out None values and extract the selection proofs + let valid_results: Vec<_> = partial_proof_results.into_iter().flatten().collect(); + let selection_proofs: Vec<_> = valid_results.clone(); + + // If we have any valid proofs, send them to the middleware + if !selection_proofs.is_empty() { + debug!( + log, + "Sending batch of partial sync selection proofs"; + "count" => selection_proofs.len(), + ); + + let response = duties_service + .beacon_nodes + .first_success(|beacon_node| { + let proofs = selection_proofs.clone(); + async move { + beacon_node + .post_validator_sync_committee_selections(&proofs) + .await + } + }) + .await; + + match response { + Ok(response) => { + debug!( + log, + "Received batch response from middleware for sync"; + "count" => response.data.len(), + ); + + // Get the sync map to update duties + let sync_map = duties_service.sync_duties.committees.read(); + let Some(committee_duties) = sync_map.get(&sync_committee_period) else { + debug!( + log, + "Missing sync duties"; + "period" => sync_committee_period, + ); + continue; + }; + + let validators = committee_duties.validators.read(); + + // Process each response + for (i, selection_response) in response.data.iter().enumerate() { + if i >= valid_results.len() { + break; + } + + let selection = &valid_results[i]; + let subnet_id = SyncSubnetId::new(selection.subcommittee_index); + + // Convert the response to a SyncSelectionProof + let proof = SyncSelectionProof::from( + selection_response.selection_proof.clone(), + ); + + // Check if the validator is an aggregator + match proof.is_aggregator::() { + Ok(true) => { + if let Some(Some(duty)) = + validators.get(&selection.validator_index) + { + debug!( + log, + "Validator is sync aggregator"; + "validator_index" => selection.validator_index, + "slot" => selection.slot, + "subnet_id" => %subnet_id, + ); + + // Store the proof + duty.aggregation_duties + .proofs + .write() + .insert((selection.slot, subnet_id), proof); + } + } + Ok(false) => { + // Not an aggregator, nothing to do + } + Err(e) => { + warn!( + log, + "Error determining is_aggregator"; + "validator_index" => selection.validator_index, + "slot" => selection.slot, + "error" => ?e, + ); + } + } + } + } + Err(e) => { + warn!( + log, + "Failed to get sync selection proofs from middleware"; + "error" => %e, + "slot" => slot, + ); + } + } + } + } else { + // For non-distributed mode + let mut validator_proofs = vec![]; + for (validator_start_slot, duty) in pre_compute_duties { + // Proofs are already known at this slot for this validator. + if slot < *validator_start_slot { + continue; + } + + let subnet_ids = match duty.subnet_ids::() { + Ok(subnet_ids) => subnet_ids, + Err(e) => { + crit!( + log, + "Arithmetic error computing subnet IDs"; + "error" => ?e, + ); + continue; + } + }; + + // Create futures to produce proofs. + let duties_service_ref = &duties_service; + let futures = subnet_ids.iter().map(|subnet_id| async move { + // Construct proof for prior slot. + let proof_slot = slot - 1; + + let proof = match duties_service_ref + .validator_store + .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) + .await + { + Ok(proof) => proof, + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!( + log, + "Missing pubkey for sync selection proof"; + "pubkey" => ?pubkey, + "pubkey" => ?duty.pubkey, + "slot" => proof_slot, + ); + return None; + } + Err(e) => { + warn!( + log, + "Unable to sign selection proof"; + "error" => ?e, + "pubkey" => ?duty.pubkey, + "slot" => proof_slot, + ); + return None; + } }; match proof.is_aggregator::() { @@ -678,52 +787,52 @@ pub async fn fill_in_aggregation_proofs( None } } - } - }); + }); - // Execute all the futures in parallel, collecting any successful results. - let proofs = join_all(futures) - .await - .into_iter() - .flatten() - .collect::>(); + // Execute all the futures in parallel, collecting any successful results. + let proofs = join_all(futures) + .await + .into_iter() + .flatten() + .collect::>(); - validator_proofs.push((duty.validator_index, proofs)); - } + validator_proofs.push((duty.validator_index, proofs)); + } - // Add to global storage (we add regularly so the proofs can be used ASAP). - let sync_map = duties_service.sync_duties.committees.read(); - let Some(committee_duties) = sync_map.get(&sync_committee_period) else { - debug!( - log, - "Missing sync duties"; - "period" => sync_committee_period, - ); - continue; - }; - let validators = committee_duties.validators.read(); - let num_validators_updated = validator_proofs.len(); - - for (validator_index, proofs) in validator_proofs { - if let Some(Some(duty)) = validators.get(&validator_index) { - duty.aggregation_duties.proofs.write().extend(proofs); - } else { + // Add to global storage (we add regularly so the proofs can be used ASAP). + let sync_map = duties_service.sync_duties.committees.read(); + let Some(committee_duties) = sync_map.get(&sync_committee_period) else { debug!( log, - "Missing sync duty to update"; - "validator_index" => validator_index, + "Missing sync duties"; "period" => sync_committee_period, ); + continue; + }; + let validators = committee_duties.validators.read(); + let num_validators_updated = validator_proofs.len(); + + for (validator_index, proofs) in validator_proofs { + if let Some(Some(duty)) = validators.get(&validator_index) { + duty.aggregation_duties.proofs.write().extend(proofs); + } else { + debug!( + log, + "Missing sync duty to update"; + "validator_index" => validator_index, + "period" => sync_committee_period, + ); + } + } + + if num_validators_updated > 0 { + debug!( + log, + "Finished computing sync selection proofs"; + "slot" => slot, + "updated_validators" => num_validators_updated, + ); } } - - if num_validators_updated > 0 { - debug!( - log, - "Finished computing sync selection proofs"; - "slot" => slot, - "updated_validators" => num_validators_updated, - ); - } } }