Implement sync committee selection endpoint

This commit is contained in:
Tan Chee Keong
2025-03-24 14:12:34 +08:00
parent 8b2f058d7b
commit 1631c860dc
6 changed files with 169 additions and 55 deletions

View File

@@ -141,6 +141,7 @@ pub struct Timeouts {
pub proposer_duties: Duration, pub proposer_duties: Duration,
pub sync_committee_contribution: Duration, pub sync_committee_contribution: Duration,
pub sync_duties: Duration, pub sync_duties: Duration,
pub sync_aggregators: Duration,
pub get_beacon_blocks_ssz: Duration, pub get_beacon_blocks_ssz: Duration,
pub get_debug_beacon_states: Duration, pub get_debug_beacon_states: Duration,
pub get_deposit_snapshot: Duration, pub get_deposit_snapshot: Duration,
@@ -159,6 +160,7 @@ impl Timeouts {
proposer_duties: timeout, proposer_duties: timeout,
sync_committee_contribution: timeout, sync_committee_contribution: timeout,
sync_duties: timeout, sync_duties: timeout,
sync_aggregators: timeout,
get_beacon_blocks_ssz: timeout, get_beacon_blocks_ssz: timeout,
get_debug_beacon_states: timeout, get_debug_beacon_states: timeout,
get_deposit_snapshot: timeout, get_deposit_snapshot: timeout,
@@ -2686,6 +2688,22 @@ impl BeaconNodeHttpClient {
) )
.await .await
} }
/// `POST validator/sync_committee_selections`
pub async fn post_validator_sync_committee_selections(
&self,
selections: &[SyncCommitteeSelection],
) -> Result<GenericResponse<Vec<SyncCommitteeSelection>>, Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("validator")
.push("sync_committee_selections");
self.post_with_timeout_and_response(path, &selections, self.timeouts.sync_aggregators)
.await
}
} }
/// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an /// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an

View File

@@ -955,6 +955,17 @@ pub struct BeaconCommitteeSelection {
pub slot: Slot, pub slot: Slot,
pub selection_proof: Signature, pub selection_proof: Signature,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct SyncCommitteeSelection {
#[serde(with = "serde_utils::quoted_u64")]
pub validator_index: u64,
pub slot: Slot,
#[serde(with = "serde_utils::quoted_u64")]
pub subcommittee_index: u64,
pub selection_proof: Signature,
}
// --------- Server Sent Event Types ----------- // --------- Server Sent Event Types -----------
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]

View File

@@ -7,11 +7,12 @@ use crate::{
}; };
use ethereum_hashing::hash; use ethereum_hashing::hash;
use safe_arith::{ArithError, SafeArith}; use safe_arith::{ArithError, SafeArith};
use serde::{Deserialize, Serialize};
use ssz::Encode; use ssz::Encode;
use ssz_types::typenum::Unsigned; use ssz_types::typenum::Unsigned;
use std::cmp; use std::cmp;
#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone)] #[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct SyncSelectionProof(Signature); pub struct SyncSelectionProof(Signature);
impl SyncSelectionProof { impl SyncSelectionProof {

View File

@@ -66,6 +66,7 @@ const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24;
const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4; const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
@@ -321,6 +322,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
sync_committee_contribution: slot_duration sync_committee_contribution: slot_duration
/ HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT, / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT,
sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT,
sync_aggregators: slot_duration / HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT,
get_beacon_blocks_ssz: slot_duration get_beacon_blocks_ssz: slot_duration
/ HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT, / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT,
get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT, get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,

View File

@@ -1159,7 +1159,9 @@ async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
&[validator_metrics::ATTESTATION_SELECTION_PROOFS], &[validator_metrics::ATTESTATION_SELECTION_PROOFS],
); );
// In distributed case, sign selection proofs in parallel; otherwise, sign them serially in non-distributed case // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties,
// as the middleware will need to have a threshold of partial selection proof to be able to return the full selection proof
// Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case
let duty_and_proof_results = if duties_service.distributed { let duty_and_proof_results = if duties_service.distributed {
futures::future::join_all(relevant_duties.into_values().flatten().map( futures::future::join_all(relevant_duties.into_values().flatten().map(
|duty| async { |duty| async {

View File

@@ -1,5 +1,6 @@
use crate::duties_service::{DutiesService, Error}; use crate::duties_service::{DutiesService, Error};
use doppelganger_service::DoppelgangerStatus; use doppelganger_service::DoppelgangerStatus;
use eth2::types::SyncCommitteeSelection;
use futures::future::join_all; use futures::future::join_all;
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slog::{crit, debug, info, warn}; use slog::{crit, debug, info, warn};
@@ -542,61 +543,140 @@ pub async fn fill_in_aggregation_proofs<T: SlotClock + 'static, E: EthSpec>(
// Create futures to produce proofs. // Create futures to produce proofs.
let duties_service_ref = &duties_service; let duties_service_ref = &duties_service;
let futures = subnet_ids.iter().map(|subnet_id| async move { let futures = subnet_ids.iter().map(|subnet_id| {
// Construct proof for prior slot. let duties_service = duties_service.clone();
let proof_slot = slot - 1; async move {
// Construct proof for prior slot.
let proof_slot = slot - 1;
let proof = match duties_service_ref let proof = if duties_service.distributed {
.validator_store let sync_selection_proof = SyncCommitteeSelection {
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) validator_index: duty.validator_index,
.await slot: proof_slot,
{ subcommittee_index: **subnet_id,
Ok(proof) => proof, selection_proof: match duties_service_ref
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { .validator_store
// A pubkey can be missing when a validator was recently .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
// removed via the API. .await
debug!( {
log, Ok(proof) => proof.into(),
"Missing pubkey for sync selection proof"; Err(e) => {
"pubkey" => ?pubkey, return match e {
"pubkey" => ?duty.pubkey, ValidatorStoreError::UnknownPubkey(pubkey) => {
"slot" => proof_slot, debug!(
); log,
return None; "Missing pubkey for sync selection proof";
} "pubkey" => ?pubkey,
Err(e) => { "slot" => proof_slot,
warn!( );
log, None
"Unable to sign selection proof"; }
"error" => ?e, _ => {
"pubkey" => ?duty.pubkey, warn!(
"slot" => proof_slot, log,
); "Unable to sign selection proof";
return None; "error" => ?e,
} "pubkey" => ?duty.pubkey,
}; "slot" => proof_slot,
);
None
}
};
}
},
};
match proof.is_aggregator::<E>() { let response = match duties_service
Ok(true) => { .beacon_nodes
debug!( .first_success(|beacon_node| {
log, let selection = sync_selection_proof.clone();
"Validator is sync aggregator"; debug!(
"validator_index" => duty.validator_index, log,
"slot" => proof_slot, "Partial sync selection proof from VC";
"subnet_id" => %subnet_id, "Sync selection proof" => ?selection,
); );
Some(((proof_slot, *subnet_id), proof)) async move {
} let response = beacon_node
Ok(false) => None, .post_validator_sync_committee_selections(&[selection])
Err(e) => { .await;
warn!( debug!(
log, log,
"Error determining is_aggregator"; "Response from middleware for sync";
"pubkey" => ?duty.pubkey, "response" => ?response,
"slot" => proof_slot, );
"error" => ?e,
); response
None }
})
.await
{
Ok(response) => response,
Err(e) => {
warn! {
log,
"Unable to sign selection proof in middleware level";
"error" => %e,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
};
return None;
}
};
SyncSelectionProof::from(response.data[0].selection_proof.clone())
} else {
match duties_service_ref
.validator_store
.produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id)
.await
{
Ok(proof) => proof,
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
debug!(
log,
"Missing pubkey for sync selection proof";
"pubkey" => ?pubkey,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
}
Err(e) => {
warn!(
log,
"Unable to sign selection proof";
"error" => ?e,
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
);
return None;
}
}
};
match proof.is_aggregator::<E>() {
Ok(true) => {
debug!(
log,
"Validator is sync aggregator";
"validator_index" => duty.validator_index,
"slot" => proof_slot,
"subnet_id" => %subnet_id,
);
Some(((proof_slot, *subnet_id), proof))
}
Ok(false) => None,
Err(e) => {
warn!(
log,
"Error determining is_aggregator";
"pubkey" => ?duty.pubkey,
"slot" => proof_slot,
"error" => ?e,
);
None
}
} }
} }
}); });