Testing distributed mode

This commit is contained in:
Tan Chee Keong
2025-03-26 13:57:55 +08:00
parent d2bb70ee8c
commit 3b883c3d05

View File

@@ -522,125 +522,67 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// Generate selection proofs for each validator at each slot, one slot at a time.
for slot in (current_slot.as_u64()..=pre_compute_slot.as_u64()).map(Slot::new) {
let mut validator_proofs = vec![];
for (validator_start_slot, duty) in pre_compute_duties {
// Proofs are already known at this slot for this validator.
if slot < *validator_start_slot {
continue;
}
// For distributed mode
if duties_service.distributed {
let mut partial_proofs = Vec::new();
let subnet_ids = match duty.subnet_ids::<E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
log,
"Arithmetic error computing subnet IDs";
"error" => ?e,
);
for (validator_start_slot, duty) in pre_compute_duties {
// Proofs are already known at this slot for this validator.
if slot < *validator_start_slot {
continue;
}
};
// Create futures to produce proofs.
let duties_service_ref = &duties_service;
let futures = subnet_ids.iter().map(|subnet_id| {
let duties_service = duties_service.clone();
async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
let subnet_ids = match duty.subnet_ids::<E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
log,
"Arithmetic error computing subnet IDs";
"error" => ?e,
);
continue;
}
};
let proof = if duties_service.distributed {
let sync_selection_proof = SyncCommitteeSelection {
validator_index: duty.validator_index,
slot: proof_slot,
subcommittee_index: **subnet_id,
selection_proof: match duties_service_ref
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
.await
{
Ok(proof) => proof.into(),
Err(e) => {
return match e {
ValidatorStoreError::UnknownPubkey(pubkey) => {
debug!(
log,
"Missing pubkey for sync selection proof";
"pubkey" => ?pubkey,
"slot" => proof_slot,
);
None
}
_ => {
warn!(
log,
"Unable to sign selection proof";
"error" => ?e,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
None
}
};
}
},
};
// Construct proof for prior slot.
let proof_slot = slot - 1;
let response = match duties_service
.beacon_nodes
.first_success(|beacon_node| {
let selection = sync_selection_proof.clone();
debug!(
log,
"Partial sync selection proof from VC";
"Sync selection proof" => ?selection,
);
async move {
let response = beacon_node
.post_validator_sync_committee_selections(&[selection])
.await;
debug!(
log,
"Response from middleware for sync";
"response" => ?response,
);
// Create futures for all subnet IDs for this validator
for subnet_id in subnet_ids {
let duties_service = duties_service.clone();
let duty = duty.clone();
let subnet_id = *subnet_id;
response
}
})
.await
{
Ok(response) => response,
Err(e) => {
warn! {
log,
"Unable to sign selection proof in middleware level";
"error" => %e,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
};
return None;
}
};
SyncSelectionProof::from(response.data[0].selection_proof.clone())
} else {
match duties_service_ref
// Store all partial sync selection proofs for this slot in partial_proofs so that it can be sent together
partial_proofs.push(async move {
// Produce partial selection proof
let sync_selection_proof = duties_service
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
.await
{
Ok(proof) => proof,
.produce_sync_selection_proof(
&duty.pubkey,
proof_slot,
subnet_id.into(),
)
.await;
match sync_selection_proof {
Ok(proof) => {
let sync_committee_selection = SyncCommitteeSelection {
validator_index: duty.validator_index,
slot: proof_slot,
subcommittee_index: subnet_id,
selection_proof: proof.into(),
};
Some(sync_committee_selection)
}
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync selection proof";
"pubkey" => ?pubkey,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
None
}
Err(e) => {
warn!(
@@ -650,9 +592,176 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
None
}
}
});
}
}
// Execute partial_proofs in parallel
let partial_proof_results = join_all(partial_proofs).await;
// Filter out None values and extract the selection proofs
let valid_results: Vec<_> = partial_proof_results.into_iter().flatten().collect();
let selection_proofs: Vec<_> = valid_results.clone();
// If we have any valid proofs, send them to the middleware
if !selection_proofs.is_empty() {
debug!(
log,
"Sending batch of partial sync selection proofs";
"count" => selection_proofs.len(),
);
let response = duties_service
.beacon_nodes
.first_success(|beacon_node| {
let proofs = selection_proofs.clone();
async move {
beacon_node
.post_validator_sync_committee_selections(&proofs)
.await
}
})
.await;
match response {
Ok(response) => {
debug!(
log,
"Received batch response from middleware for sync";
"count" => response.data.len(),
);
// Get the sync map to update duties
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(
log,
"Missing sync duties";
"period" => sync_committee_period,
);
continue;
};
let validators = committee_duties.validators.read();
// Process each response
for (i, selection_response) in response.data.iter().enumerate() {
if i >= valid_results.len() {
break;
}
let selection = &valid_results[i];
let subnet_id = SyncSubnetId::new(selection.subcommittee_index);
// Convert the response to a SyncSelectionProof
let proof = SyncSelectionProof::from(
selection_response.selection_proof.clone(),
);
// Check if the validator is an aggregator
match proof.is_aggregator::<E>() {
Ok(true) => {
if let Some(Some(duty)) =
validators.get(&selection.validator_index)
{
debug!(
log,
"Validator is sync aggregator";
"validator_index" => selection.validator_index,
"slot" => selection.slot,
"subnet_id" => %subnet_id,
);
// Store the proof
duty.aggregation_duties
.proofs
.write()
.insert((selection.slot, subnet_id), proof);
}
}
Ok(false) => {
// Not an aggregator, nothing to do
}
Err(e) => {
warn!(
log,
"Error determining is_aggregator";
"validator_index" => selection.validator_index,
"slot" => selection.slot,
"error" => ?e,
);
}
}
}
}
Err(e) => {
warn!(
log,
"Failed to get sync selection proofs from middleware";
"error" => %e,
"slot" => slot,
);
}
}
}
} else {
// For non-distributed mode
let mut validator_proofs = vec![];
for (validator_start_slot, duty) in pre_compute_duties {
// Proofs are already known at this slot for this validator.
if slot < *validator_start_slot {
continue;
}
let subnet_ids = match duty.subnet_ids::<E>() {
Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
log,
"Arithmetic error computing subnet IDs";
"error" => ?e,
);
continue;
}
};
// Create futures to produce proofs.
let duties_service_ref = &duties_service;
let futures = subnet_ids.iter().map(|subnet_id| async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
let proof = match duties_service_ref
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
.await
{
Ok(proof) => proof,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync selection proof";
"pubkey" => ?pubkey,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
}
Err(e) => {
warn!(
log,
"Unable to sign selection proof";
"error" => ?e,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
}
};
match proof.is_aggregator::<E>() {
@@ -678,52 +787,52 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
None
}
}
}
});
});
// Execute all the futures in parallel, collecting any successful results.
let proofs = join_all(futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
// Execute all the futures in parallel, collecting any successful results.
let proofs = join_all(futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
validator_proofs.push((duty.validator_index, proofs));
}
validator_proofs.push((duty.validator_index, proofs));
}
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(
log,
"Missing sync duties";
"period" => sync_committee_period,
);
continue;
};
let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();
for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(
log,
"Missing sync duty to update";
"validator_index" => validator_index,
"Missing sync duties";
"period" => sync_committee_period,
);
continue;
};
let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();
for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
debug!(
log,
"Missing sync duty to update";
"validator_index" => validator_index,
"period" => sync_committee_period,
);
}
}
if num_validators_updated > 0 {
debug!(
log,
"Finished computing sync selection proofs";
"slot" => slot,
"updated_validators" => num_validators_updated,
);
}
}
if num_validators_updated > 0 {
debug!(
log,
"Finished computing sync selection proofs";
"slot" => slot,
"updated_validators" => num_validators_updated,
);
}
}
}