From 1631c860dcbbb4146d4f03b3e84c9ad86c8965b7 Mon Sep 17 00:00:00 2001 From: Tan Chee Keong Date: Mon, 24 Mar 2025 14:12:34 +0800 Subject: [PATCH] Implement sync committee selection endpoint --- common/eth2/src/lib.rs | 18 ++ common/eth2/src/types.rs | 11 ++ consensus/types/src/sync_selection_proof.rs | 3 +- validator_client/src/lib.rs | 2 + .../validator_services/src/duties_service.rs | 4 +- .../validator_services/src/sync.rs | 186 +++++++++++++----- 6 files changed, 169 insertions(+), 55 deletions(-) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 19099510c9..64cf9f7ce6 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -141,6 +141,7 @@ pub struct Timeouts { pub proposer_duties: Duration, pub sync_committee_contribution: Duration, pub sync_duties: Duration, + pub sync_aggregators: Duration, pub get_beacon_blocks_ssz: Duration, pub get_debug_beacon_states: Duration, pub get_deposit_snapshot: Duration, @@ -159,6 +160,7 @@ impl Timeouts { proposer_duties: timeout, sync_committee_contribution: timeout, sync_duties: timeout, + sync_aggregators: timeout, get_beacon_blocks_ssz: timeout, get_debug_beacon_states: timeout, get_deposit_snapshot: timeout, @@ -2686,6 +2688,22 @@ impl BeaconNodeHttpClient { ) .await } + + /// `POST validator/sync_committee_selections` + pub async fn post_validator_sync_committee_selections( + &self, + selections: &[SyncCommitteeSelection], + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("sync_committee_selections"); + + self.post_with_timeout_and_response(path, &selections, self.timeouts.sync_aggregators) + .await + } } /// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index d258b6a23d..35b0de02f7 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -955,6 +955,17 @@ pub struct BeaconCommitteeSelection { pub slot: Slot, pub selection_proof: Signature, } + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + +pub struct SyncCommitteeSelection { + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, + pub slot: Slot, + #[serde(with = "serde_utils::quoted_u64")] + pub subcommittee_index: u64, + pub selection_proof: Signature, +} // --------- Server Sent Event Types ----------- #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] diff --git a/consensus/types/src/sync_selection_proof.rs b/consensus/types/src/sync_selection_proof.rs index 4adb90b26e..e4f6ce43cd 100644 --- a/consensus/types/src/sync_selection_proof.rs +++ b/consensus/types/src/sync_selection_proof.rs @@ -7,11 +7,12 @@ use crate::{ }; use ethereum_hashing::hash; use safe_arith::{ArithError, SafeArith}; +use serde::{Deserialize, Serialize}; use ssz::Encode; use ssz_types::typenum::Unsigned; use std::cmp; -#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone)] +#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone, Serialize, Deserialize)] pub struct SyncSelectionProof(Signature); impl SyncSelectionProof { diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 279d33b93a..2385ebc19a 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -66,6 +66,7 @@ const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4; @@ -321,6 +322,7 @@ impl ProductionValidatorClient { sync_committee_contribution: slot_duration / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT, sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, + sync_aggregators: slot_duration / HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT, get_beacon_blocks_ssz: slot_duration / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT, get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT, diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index 44d4d54d14..07a87b3ea5 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -1159,7 +1159,9 @@ async fn fill_in_selection_proofs( &[validator_metrics::ATTESTATION_SELECTION_PROOFS], ); - // In distributed case, sign selection proofs in parallel; otherwise, sign them serially in non-distributed case + // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties, + // as the middleware will need to have a threshold of partial selection proof to be able to return the full selection proof + // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case let duty_and_proof_results = if duties_service.distributed { futures::future::join_all(relevant_duties.into_values().flatten().map( |duty| async { diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index dd3e05088e..73e4c69d71 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -1,5 +1,6 @@ use crate::duties_service::{DutiesService, Error}; use doppelganger_service::DoppelgangerStatus; +use eth2::types::SyncCommitteeSelection; use futures::future::join_all; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use slog::{crit, debug, info, warn}; @@ -542,61 +543,140 @@ pub async fn fill_in_aggregation_proofs( // Create futures to produce proofs. let duties_service_ref = &duties_service; - let futures = subnet_ids.iter().map(|subnet_id| async move { - // Construct proof for prior slot. - let proof_slot = slot - 1; + let futures = subnet_ids.iter().map(|subnet_id| { + let duties_service = duties_service.clone(); + async move { + // Construct proof for prior slot. + let proof_slot = slot - 1; - let proof = match duties_service_ref - .validator_store - .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) - .await - { - Ok(proof) => proof, - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!( - log, - "Missing pubkey for sync selection proof"; - "pubkey" => ?pubkey, - "pubkey" => ?duty.pubkey, - "slot" => proof_slot, - ); - return None; - } - Err(e) => { - warn!( - log, - "Unable to sign selection proof"; - "error" => ?e, - "pubkey" => ?duty.pubkey, - "slot" => proof_slot, - ); - return None; - } - }; + let proof = if duties_service.distributed { + let sync_selection_proof = SyncCommitteeSelection { + validator_index: duty.validator_index, + slot: proof_slot, + subcommittee_index: **subnet_id, + selection_proof: match duties_service_ref + .validator_store + .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) + .await + { + Ok(proof) => proof.into(), + Err(e) => { + return match e { + ValidatorStoreError::UnknownPubkey(pubkey) => { + debug!( + log, + "Missing pubkey for sync selection proof"; + "pubkey" => ?pubkey, + "slot" => proof_slot, + ); + None + } + _ => { + warn!( + log, + "Unable to sign selection proof"; + "error" => ?e, + "pubkey" => ?duty.pubkey, + "slot" => proof_slot, + ); + None + } + }; + } + }, + }; - match proof.is_aggregator::() { - Ok(true) => { - debug!( - log, - "Validator is sync aggregator"; - "validator_index" => duty.validator_index, - "slot" => proof_slot, - "subnet_id" => %subnet_id, - ); - Some(((proof_slot, *subnet_id), proof)) - } - Ok(false) => None, - Err(e) => { - warn!( - log, - "Error determining is_aggregator"; - "pubkey" => ?duty.pubkey, - "slot" => proof_slot, - "error" => ?e, - ); - None + let response = match duties_service + .beacon_nodes + .first_success(|beacon_node| { + let selection = sync_selection_proof.clone(); + debug!( + log, + "Partial sync selection proof from VC"; + "Sync selection proof" => ?selection, + ); + async move { + let response = beacon_node + .post_validator_sync_committee_selections(&[selection]) + .await; + debug!( + log, + "Response from middleware for sync"; + "response" => ?response, + ); + + response + } + }) + .await + { + Ok(response) => response, + Err(e) => { + warn! { + log, + "Unable to sign selection proof in middleware level"; + "error" => %e, + "pubkey" => ?duty.pubkey, + "slot" => proof_slot, + }; + return None; + } + }; + SyncSelectionProof::from(response.data[0].selection_proof.clone()) + } else { + match duties_service_ref + .validator_store + .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) + .await + { + Ok(proof) => proof, + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently + // removed via the API. + debug!( + log, + "Missing pubkey for sync selection proof"; + "pubkey" => ?pubkey, + "pubkey" => ?duty.pubkey, + "slot" => proof_slot, + ); + return None; + } + Err(e) => { + warn!( + log, + "Unable to sign selection proof"; + "error" => ?e, + "pubkey" => ?duty.pubkey, + "slot" => proof_slot, + ); + return None; + } + } + }; + + match proof.is_aggregator::() { + Ok(true) => { + debug!( + log, + "Validator is sync aggregator"; + "validator_index" => duty.validator_index, + "slot" => proof_slot, + "subnet_id" => %subnet_id, + ); + Some(((proof_slot, *subnet_id), proof)) + } + Ok(false) => None, + Err(e) => { + warn!( + log, + "Error determining is_aggregator"; + "pubkey" => ?duty.pubkey, + "slot" => proof_slot, + "error" => ?e, + ); + None + } } } });