Update logging

This commit is contained in:
Tan Chee Keong
2025-03-31 18:33:56 +08:00
parent 89e19ecc8c
commit 1118d622f5
2 changed files with 128 additions and 152 deletions

View File

@@ -132,7 +132,6 @@ async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
spec: &ChainSpec, spec: &ChainSpec,
distributed: bool, distributed: bool,
beacon_nodes: &Arc<BeaconNodeFallback<T, E>>, beacon_nodes: &Arc<BeaconNodeFallback<T, E>>,
duties_service: &DutiesService<T, E>,
) -> Result<Option<SelectionProof>, Error> { ) -> Result<Option<SelectionProof>, Error> {
let selection_proof = if distributed { let selection_proof = if distributed {
// Submit a partial selection proof in the data field of the POST HTTP endpoint // Submit a partial selection proof in the data field of the POST HTTP endpoint
@@ -147,14 +146,12 @@ async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
}; };
// Call the endpoint /eth/v1/validator/beacon_committee_selections // Call the endpoint /eth/v1/validator/beacon_committee_selections
// The middleware should return a full selection proof here // The middleware should return a full selection proof here
let log = duties_service.context.log();
let response = beacon_nodes let response = beacon_nodes
.first_success(|beacon_node| { .first_success(|beacon_node| {
let selections = selection.clone(); let selections = selection.clone();
debug!( debug!(
log, "Selection proof" = ?selections,
"Partial selection proof from VC"; "Partial selection proof from VC"
"Selection proof" => ?selections,
); );
// println!("Selection proof: {:?}", selections); // println!("Selection proof: {:?}", selections);
async move { async move {
@@ -162,12 +159,7 @@ async fn make_selection_proof<T: SlotClock + 'static, E: EthSpec>(
.post_validator_beacon_committee_selections(&[selections]) .post_validator_beacon_committee_selections(&[selections])
.await; .await;
debug!( debug!(?response, "Response from middleware");
log,
"Response from middleware";
"response" => ?response,
);
// println!("Response from middleware {:?}", response); // println!("Response from middleware {:?}", response);
response response
@@ -1147,7 +1139,6 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
&duties_service.spec, &duties_service.spec,
duties_service.distributed, duties_service.distributed,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
&duties_service,
) )
.await?; .await?;
Ok((duty, opt_selection_proof)) Ok((duty, opt_selection_proof))
@@ -1163,7 +1154,6 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
&duties_service.spec, &duties_service.spec,
duties_service.distributed, duties_service.distributed,
&duties_service.beacon_nodes, &duties_service.beacon_nodes,
&duties_service,
) )
.await?; .await?;
Ok((duty, opt_selection_proof)) Ok((duty, opt_selection_proof))

View File

@@ -524,9 +524,8 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
Ok(subnet_ids) => subnet_ids, Ok(subnet_ids) => subnet_ids,
Err(e) => { Err(e) => {
crit!( crit!(
log, "error" = ?e,
"Arithmetic error computing subnet IDs"; "Arithmetic error computing subnet IDs"
"error" => ?e,
); );
continue; continue;
} }
@@ -563,31 +562,28 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
}; };
// Add log for debugging // Add log for debugging
debug!( debug!(
log, "validator_index" = duty.validator_index,
"Partial sync selection proof"; "slot" = %proof_slot,
"validator_index" => duty.validator_index, "subcommittee_index" = subnet_id,
"slot" => proof_slot, "partial selection proof" = ?proof,
"subcommittee_index" => subnet_id, "Partial sync selection proof"
"partial selection proof" => ?proof,
); );
Some(sync_committee_selection) Some(sync_committee_selection)
} }
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
debug!( debug!(
log, ?pubkey,
"Missing pubkey for sync selection proof"; "slot" = %proof_slot,
"pubkey" => ?pubkey, "Missing pubkey for sync selection proof"
"slot" => proof_slot,
); );
None None
} }
Err(e) => { Err(e) => {
warn!( warn!(
log, "error" = ?e,
"Unable to sign selection proof"; "pubkey" = ?duty.pubkey,
"error" => ?e, "slot" = %proof_slot,
"pubkey" => ?duty.pubkey, "Unable to sign selection proof"
"slot" => proof_slot,
); );
None None
} }
@@ -606,10 +602,8 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// If we have any valid proofs, send them to the middleware // If we have any valid proofs, send them to the middleware
if !selection_proofs.is_empty() { if !selection_proofs.is_empty() {
debug!( debug!(
log, "count" = selection_proofs.len(),
"Sending batch of partial sync selection proofs"; %slot, "Sending batch of partial sync selection proofs"
"count" => selection_proofs.len(),
"slot" => slot,
); );
let response = duties_service let response = duties_service
@@ -627,20 +621,14 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
match response { match response {
Ok(response) => { Ok(response) => {
debug!( debug!(
log, "count" = response.data.len(),
"Received batch response from middleware for sync"; %slot, "Received batch response from middleware for sync"
"count" => response.data.len(),
"slot" => slot,
); );
// Get the sync map to update duties // Get the sync map to update duties
let sync_map = duties_service.sync_duties.committees.read(); let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else { let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!( debug!("period" = sync_committee_period, "Missing sync duties");
log,
"Missing sync duties";
"period" => sync_committee_period,
);
continue; continue;
}; };
@@ -662,13 +650,12 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
Ok(true) => { Ok(true) => {
if let Some(Some(duty)) = validators.get(&validator_index) { if let Some(Some(duty)) = validators.get(&validator_index) {
debug!( debug!(
log, validator_index,
"Validator is sync aggregator"; %slot,
"validator_index" => validator_index, "subnet_id" = subcommittee_index,
"slot" => slot,
"subnet_id" => subcommittee_index,
// log full selection proof for debugging // log full selection proof for debugging
"full selection proof" => ?proof, "full selection proof" = ?proof,
"Validator is sync aggregator"
); );
// Store the proof // Store the proof
@@ -683,11 +670,10 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
} }
Err(e) => { Err(e) => {
warn!( warn!(
log, validator_index,
"Error determining is_aggregator"; %slot,
"validator_index" => validator_index, "error" = ?e,
"slot" => slot, "Error determining is_aggregator"
"error" => ?e,
); );
} }
} }
@@ -695,10 +681,9 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
} }
Err(e) => { Err(e) => {
warn!( warn!(
log, "error" = %e,
"Failed to get sync selection proofs from middleware"; %slot,
"error" => %e, "Failed to get sync selection proofs from middleware"
"slot" => slot,
); );
} }
} }
@@ -711,112 +696,113 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
if slot < *validator_start_slot { if slot < *validator_start_slot {
continue; continue;
} }
let subnet_ids = match duty.subnet_ids::<E>() { let subnet_ids = match duty.subnet_ids::<E>() {
Ok(subnet_ids) => subnet_ids, Ok(subnet_ids) => subnet_ids,
Err(e) => {
crit!(
error = ?e,
"Arithmetic error computing subnet IDs"
);
continue;
}
};
// Create futures to produce proofs.
let duties_service_ref = &duties_service;
let futures = subnet_ids.iter().map(|subnet_id| async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
let proof = match duties_service_ref
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
.await
{
Ok(proof) => proof,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
?pubkey,
pubkey = ?duty.pubkey,
slot = %proof_slot,
"Missing pubkey for sync selection proof"
);
return None;
}
Err(e) => { Err(e) => {
warn!( crit!(
error = ?e, error = ?e,
pubkey = ?duty.pubkey, "Arithmetic error computing subnet IDs"
slot = %proof_slot,
"Unable to sign selection proof"
); );
return None; continue;
} }
}; };
match proof.is_aggregator::<E>() { // Create futures to produce proofs.
Ok(true) => { let duties_service_ref = &duties_service;
debug!( let futures = subnet_ids.iter().map(|subnet_id| async move {
validator_index = duty.validator_index, // Construct proof for prior slot.
slot = %proof_slot, let proof_slot = slot - 1;
%subnet_id,
"Validator is sync aggregator" let proof = match duties_service_ref
); .validator_store
Some(((proof_slot, *subnet_id), proof)) .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
} .await
Ok(false) => None, {
Err(e) => { Ok(proof) => proof,
warn!( Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
pubkey = ?duty.pubkey, // A pubkey can be missing when a validator was recently
slot = %proof_slot, // removed via the API.
error = ?e, debug!(
"Error determining is_aggregator" ?pubkey,
); pubkey = ?duty.pubkey,
None slot = %proof_slot,
"Missing pubkey for sync selection proof"
);
return None;
}
Err(e) => {
warn!(
error = ?e,
pubkey = ?duty.pubkey,
slot = %proof_slot,
"Unable to sign selection proof"
);
return None;
}
};
match proof.is_aggregator::<E>() {
Ok(true) => {
debug!(
validator_index = duty.validator_index,
slot = %proof_slot,
%subnet_id,
"Validator is sync aggregator"
);
Some(((proof_slot, *subnet_id), proof))
}
Ok(false) => None,
Err(e) => {
warn!(
pubkey = ?duty.pubkey,
slot = %proof_slot,
error = ?e,
"Error determining is_aggregator"
);
None
}
} }
});
// Execute all the futures in parallel, collecting any successful results.
let proofs = join_all(futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
validator_proofs.push((duty.validator_index, proofs));
}
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(period = sync_committee_period, "Missing sync duties");
continue;
};
let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();
for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
debug!(
validator_index,
period = sync_committee_period,
"Missing sync duty to update"
);
} }
}); }
// Execute all the futures in parallel, collecting any successful results. if num_validators_updated > 0 {
let proofs = join_all(futures)
.await
.into_iter()
.flatten()
.collect::<Vec<_>>();
validator_proofs.push((duty.validator_index, proofs));
}
// Add to global storage (we add regularly so the proofs can be used ASAP).
let sync_map = duties_service.sync_duties.committees.read();
let Some(committee_duties) = sync_map.get(&sync_committee_period) else {
debug!(period = sync_committee_period, "Missing sync duties");
continue;
};
let validators = committee_duties.validators.read();
let num_validators_updated = validator_proofs.len();
for (validator_index, proofs) in validator_proofs {
if let Some(Some(duty)) = validators.get(&validator_index) {
duty.aggregation_duties.proofs.write().extend(proofs);
} else {
debug!( debug!(
validator_index, %slot,
period = sync_committee_period, updated_validators = num_validators_updated,
"Missing sync duty to update" "Finished computing sync selection proofs"
); );
} }
} }
if num_validators_updated > 0 {
debug!(
%slot,
updated_validators = num_validators_updated,
"Finished computing sync selection proofs"
);
}
} }
} }