From 794718e96b46cfc67c5d653a30e9b2caecd19519 Mon Sep 17 00:00:00 2001 From: Shane K Moore <41407272+shane-moore@users.noreply.github.com> Date: Thu, 16 Apr 2026 03:23:18 -0700 Subject: [PATCH] Gloas vc ptc duty (#8338) Co-Authored-By: shane-moore Co-Authored-By: Eitan Seri- Levi --- beacon_node/http_api/tests/tests.rs | 1 + common/eth2/src/lib.rs | 30 ++ common/eth2/src/types.rs | 8 + testing/simulator/src/checks.rs | 2 + validator_client/http_metrics/src/lib.rs | 10 + validator_client/validator_metrics/src/lib.rs | 12 + .../validator_services/src/duties_service.rs | 314 +++++++++++++++++- .../src/notifier_service.rs | 3 + 8 files changed, 379 insertions(+), 1 deletion(-) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index b28816302c..60e65e0049 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -3473,6 +3473,7 @@ impl ApiTester { self } + // TODO(EIP-7732): Add test_get_validator_duties_ptc function to test PTC duties endpoint pub async fn test_get_validator_duties_proposer_v2(self) -> Self { let current_epoch = self.chain.epoch().unwrap(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index d5140a3878..87b4125c0e 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -73,6 +73,8 @@ const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; // For DVT involving middleware only +// TODO(EIP-7732): Determine what this quotient should be +const HTTP_PTC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4; @@ -93,6 +95,7 @@ pub struct Timeouts { pub sync_committee_contribution: Duration, pub sync_duties: Duration, pub sync_aggregators: Duration, + pub ptc_duties: Duration, pub get_beacon_blocks_ssz: Duration, pub get_debug_beacon_states: Duration, pub get_deposit_snapshot: Duration, @@ -113,6 +116,7 @@ impl Timeouts { sync_committee_contribution: timeout, sync_duties: timeout, sync_aggregators: timeout, + ptc_duties: timeout, get_beacon_blocks_ssz: timeout, get_debug_beacon_states: timeout, get_deposit_snapshot: timeout, @@ -135,6 +139,7 @@ impl Timeouts { / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT, sync_duties: base_timeout / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, sync_aggregators: base_timeout / HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT, + ptc_duties: base_timeout / HTTP_PTC_DUTIES_TIMEOUT_QUOTIENT, get_beacon_blocks_ssz: base_timeout / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT, get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT, get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT, @@ -3274,4 +3279,29 @@ impl BeaconNodeHttpClient { self.post_with_timeout_and_response(path, &selections, self.timeouts.sync_aggregators) .await } + + // TODO(EIP-7732): Create corresponding beacon node response endpoint per spec + // https://github.com/ethereum/beacon-APIs/pull/552 + /// `POST validator/duties/ptc/{epoch}` + pub async fn post_validator_duties_ptc( + &self, + epoch: Epoch, + indices: &[u64], + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("duties") + .push("ptc") + .push(&epoch.to_string()); + + self.post_with_timeout_and_response( + path, + &ValidatorIndexDataRef(indices), + self.timeouts.ptc_duties, + ) + .await + } } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index e85565c580..dd16f46c55 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -778,6 +778,14 @@ pub enum GraffitiPolicy { AppendClientVersions, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PtcDuty { + pub pubkey: PublicKeyBytes, + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, + pub slot: Slot, +} + #[derive(Clone, Deserialize)] pub struct ValidatorBlocksQuery { pub randao_reveal: SignatureBytes, diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index de202e5812..a2e9ae96b2 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -220,6 +220,8 @@ pub async fn verify_full_sync_aggregates_up_to( Ok(()) } +// TODO(EIP-7732): Add verify_ptc_duties_executed function to verify that PTC duties are being fetched and executed correctly when Gloas fork is enabled + /// Verify that the first merged PoS block got finalized. pub async fn verify_transition_block_finalized( network: LocalNetwork, diff --git a/validator_client/http_metrics/src/lib.rs b/validator_client/http_metrics/src/lib.rs index 70b447a493..a6624b4f44 100644 --- a/validator_client/http_metrics/src/lib.rs +++ b/validator_client/http_metrics/src/lib.rs @@ -197,6 +197,16 @@ pub fn gather_prometheus_metrics( &[NEXT_EPOCH], duties_service.attester_count(next_epoch) as i64, ); + set_int_gauge( + &PTC_COUNT, + &[CURRENT_EPOCH], + duties_service.ptc_count(current_epoch) as i64, + ); + set_int_gauge( + &PTC_COUNT, + &[NEXT_EPOCH], + duties_service.ptc_count(next_epoch) as i64, + ); } } diff --git a/validator_client/validator_metrics/src/lib.rs b/validator_client/validator_metrics/src/lib.rs index 060d8a4edd..46a86381f9 100644 --- a/validator_client/validator_metrics/src/lib.rs +++ b/validator_client/validator_metrics/src/lib.rs @@ -22,7 +22,12 @@ pub const UPDATE_ATTESTERS_CURRENT_EPOCH: &str = "update_attesters_current_epoch pub const UPDATE_ATTESTERS_NEXT_EPOCH: &str = "update_attesters_next_epoch"; pub const UPDATE_ATTESTERS_FETCH: &str = "update_attesters_fetch"; pub const UPDATE_ATTESTERS_STORE: &str = "update_attesters_store"; +pub const UPDATE_PTC_CURRENT_EPOCH: &str = "update_ptc_current_epoch"; +pub const UPDATE_PTC_NEXT_EPOCH: &str = "update_ptc_next_epoch"; +pub const UPDATE_PTC_FETCH: &str = "update_ptc_fetch"; +pub const UPDATE_PTC_STORE: &str = "update_ptc_store"; pub const ATTESTER_DUTIES_HTTP_POST: &str = "attester_duties_http_post"; +pub const PTC_DUTIES_HTTP_POST: &str = "ptc_duties_http_post"; pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get"; pub const VALIDATOR_DUTIES_SYNC_HTTP_POST: &str = "validator_duties_sync_http_post"; pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get"; @@ -162,6 +167,13 @@ pub static ATTESTER_COUNT: LazyLock> = LazyLock::new(|| { &["task"], ) }); +pub static PTC_COUNT: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "vc_beacon_ptc_count", + "Number of PTC (Payload Timeliness Committee) validators on this host", + &["task"], + ) +}); pub static PROPOSAL_CHANGED: LazyLock> = LazyLock::new(|| { try_create_int_counter( "vc_beacon_block_proposal_changed", diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index f467db92a1..9f51694f34 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ b/validator_client/validator_services/src/duties_service.rs @@ -13,7 +13,7 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use bls::PublicKeyBytes; use eth2::types::{ AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse, - ProposerData, StateId, ValidatorId, + ProposerData, PtcDuty, StateId, ValidatorId, }; use futures::{ StreamExt, @@ -46,6 +46,7 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64; /// The initial request is used to determine if further requests are required, so that it /// reduces the amount of data that needs to be transferred. const INITIAL_DUTIES_QUERY_SIZE: usize = 1; +const INITIAL_PTC_DUTIES_QUERY_SIZE: usize = 1; /// Offsets from the attestation duty slot at which a subscription should be sent. const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32]; @@ -83,6 +84,7 @@ const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBS pub enum Error { UnableToReadSlotClock, FailedToDownloadAttesters(#[allow(dead_code)] String), + FailedToDownloadPtc(#[allow(dead_code)] String), FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError), InvalidModulo(#[allow(dead_code)] ArithError), Arith(#[allow(dead_code)] ArithError), @@ -283,6 +285,7 @@ type DependentRoot = Hash256; type AttesterMap = HashMap>; type ProposerMap = HashMap)>; +type PtcMap = HashMap)>; pub struct DutiesServiceBuilder { /// Provides the canonical list of locally-managed validators. @@ -384,6 +387,7 @@ impl DutiesServiceBuilder { attesters: Default::default(), proposers: Default::default(), sync_duties: SyncDutiesMap::new(self.sync_selection_proof_config), + ptc_duties: Default::default(), validator_store: self .validator_store .ok_or("Cannot build DutiesService without validator_store")?, @@ -414,6 +418,8 @@ pub struct DutiesService { pub proposers: RwLock, /// Map from validator index to sync committee duties. pub sync_duties: SyncDutiesMap, + /// Maps an epoch to PTC duties for locally-managed validators. + pub ptc_duties: RwLock, /// Provides the canonical list of locally-managed validators. pub validator_store: Arc, /// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again. @@ -472,6 +478,15 @@ impl DutiesService { .count() } + /// Returns the total number of validators that have PTC duties in the given epoch. + pub fn ptc_count(&self, epoch: Epoch) -> usize { + self.ptc_duties + .read() + .get(&epoch) + .map(|(_, duties)| duties.len()) + .unwrap_or(0) + } + /// Returns the total number of validators that are in a doppelganger detection period. pub fn doppelganger_detecting_count(&self) -> usize { self.validator_store @@ -534,6 +549,25 @@ impl DutiesService { self.enable_high_validator_count_metrics || self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT } + + /// Get PTC duties for a specific slot. + /// + /// Returns duties for local validators who have PTC assignments at the given slot. + pub fn get_ptc_duties_for_slot(&self, slot: Slot) -> Vec { + let epoch = slot.epoch(S::E::slots_per_epoch()); + + self.ptc_duties + .read() + .get(&epoch) + .map(|(_, ptc_duties)| { + ptc_duties + .iter() + .filter(|ptc_duty| ptc_duty.slot == slot) + .cloned() + .collect() + }) + .unwrap_or_default() + } } /// Start the service that periodically polls the beacon node for validator duties. This will start @@ -662,6 +696,61 @@ pub fn start_update_service }, "duties_service_sync_committee", ); + + // Spawn the task which keeps track of local PTC duties. + // Only start PTC duties service if Gloas fork is scheduled. + if core_duties_service.spec.is_gloas_scheduled() { + let duties_service = core_duties_service.clone(); + core_duties_service.executor.spawn( + async move { + loop { + // Check if we've reached the Gloas fork epoch before polling + let Some(current_slot) = duties_service.slot_clock.now() else { + // Unable to read slot clock, sleep and try again + sleep(duties_service.slot_clock.slot_duration()).await; + continue; + }; + + let current_epoch = current_slot.epoch(S::E::slots_per_epoch()); + let Some(gloas_fork_epoch) = duties_service.spec.gloas_fork_epoch else { + // Gloas fork epoch not configured, should not reach here + break; + }; + + if current_epoch + 1 < gloas_fork_epoch { + // Wait until the next slot and check again + if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() { + sleep(duration).await; + } else { + sleep(duties_service.slot_clock.slot_duration()).await; + } + continue; + } + + if let Err(e) = poll_beacon_ptc_attesters(&duties_service).await { + error!( + error = ?e, + "Failed to poll PTC duties" + ); + } + + // Wait until the next slot before polling again. + // This doesn't mean that the beacon node will get polled every slot + // as the PTC duties service will return early if it deems it already has + // enough information. + if let Some(duration) = duties_service.slot_clock.duration_to_next_slot() { + sleep(duration).await; + } else { + // Just sleep for one slot if we are unable to read the system clock, this gives + // us an opportunity for the clock to eventually come good. + sleep(duties_service.slot_clock.slot_duration()).await; + continue; + } + } + }, + "duties_service_ptc", + ); + } } /// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown @@ -1282,6 +1371,26 @@ fn process_duty_and_proof( } } +async fn post_validator_duties_ptc( + duties_service: &Arc>, + epoch: Epoch, + validator_indices: &[u64], +) -> Result>, Error> { + duties_service + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::DUTIES_SERVICE_TIMES, + &[validator_metrics::PTC_DUTIES_HTTP_POST], + ); + beacon_node + .post_validator_duties_ptc(epoch, validator_indices) + .await + }) + .await + .map_err(|e| Error::FailedToDownloadPtc(e.to_string())) +} + /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. /// /// Duties are computed in batches each slot. If a re-org is detected then the process will @@ -1641,6 +1750,209 @@ async fn poll_beacon_proposers( Ok(()) } +/// Query the beacon node for ptc duties for any known validators. +async fn poll_beacon_ptc_attesters( + duties_service: &Arc>, +) -> Result<(), Error> { + let current_epoch_timer = validator_metrics::start_timer_vec( + &validator_metrics::DUTIES_SERVICE_TIMES, + &[validator_metrics::UPDATE_PTC_CURRENT_EPOCH], + ); + + let current_slot = duties_service + .slot_clock + .now() + .ok_or(Error::UnableToReadSlotClock)?; + let current_epoch = current_slot.epoch(S::E::slots_per_epoch()); + + // Collect *all* pubkeys, even those undergoing doppelganger protection. + let local_pubkeys: HashSet<_> = duties_service + .validator_store + .voting_pubkeys(DoppelgangerStatus::ignored); + + let local_indices = { + let mut local_indices = Vec::with_capacity(local_pubkeys.len()); + + for &pubkey in &local_pubkeys { + if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) { + local_indices.push(validator_index) + } + } + local_indices + }; + + // Poll for current epoch + if let Err(e) = poll_beacon_ptc_attesters_for_epoch( + duties_service, + current_epoch, + &local_indices, + &local_pubkeys, + ) + .await + { + error!( + %current_epoch, + request_epoch = %current_epoch, + err = ?e, + "Failed to download PTC duties" + ); + } + drop(current_epoch_timer); + let next_epoch_timer = validator_metrics::start_timer_vec( + &validator_metrics::DUTIES_SERVICE_TIMES, + &[validator_metrics::UPDATE_PTC_NEXT_EPOCH], + ); + + // Poll for next epoch + let next_epoch = current_epoch + 1; + if let Err(e) = poll_beacon_ptc_attesters_for_epoch( + duties_service, + next_epoch, + &local_indices, + &local_pubkeys, + ) + .await + { + error!( + %current_epoch, + request_epoch = %next_epoch, + err = ?e, + "Failed to download PTC duties" + ); + } + drop(next_epoch_timer); + + // Prune old duties. + duties_service + .ptc_duties + .write() + .retain(|&epoch, _| epoch + HISTORICAL_DUTIES_EPOCHS >= current_epoch); + + Ok(()) +} + +/// For the given `local_indices` and `local_pubkeys`, download the PTC duties for the given `epoch` and +/// store them in `duties_service.ptc_duties` using bandwidth optimization. +async fn poll_beacon_ptc_attesters_for_epoch< + S: ValidatorStore + 'static, + T: SlotClock + 'static, +>( + duties_service: &Arc>, + epoch: Epoch, + local_indices: &[u64], + local_pubkeys: &HashSet, +) -> Result<(), Error> { + // No need to bother the BN if we don't have any validators. + if local_indices.is_empty() { + debug!( + %epoch, + "No validators, not downloading PTC duties" + ); + return Ok(()); + } + + let fetch_timer = validator_metrics::start_timer_vec( + &validator_metrics::DUTIES_SERVICE_TIMES, + &[validator_metrics::UPDATE_PTC_FETCH], + ); + + // TODO(gloas) Unlike attester duties which use `get_uninitialized_validators` to detect + // newly-added validators, PTC duties only check dependent_root changes. Validators added + // mid-epoch won't get PTC duties until the next epoch boundary. We should probably fix this. + let initial_indices_to_request = + &local_indices[0..min(INITIAL_PTC_DUTIES_QUERY_SIZE, local_indices.len())]; + + let response = + post_validator_duties_ptc(duties_service, epoch, initial_indices_to_request).await?; + let dependent_root = response.dependent_root; + + // Check if we need to update duties for this epoch and collect validators to update. + // We update if we have no epoch data OR if the dependent_root changed. + let validators_to_update = { + // Avoid holding the read-lock for any longer than required. + let ptc_duties = duties_service.ptc_duties.read(); + let needs_update = ptc_duties.get(&epoch).is_none_or(|(prior_root, _duties)| { + // Update if dependent_root changed + *prior_root != dependent_root + }); + + if needs_update { + local_pubkeys.iter().collect::>() + } else { + Vec::new() + } + }; + + if validators_to_update.is_empty() { + // No validators have conflicting (epoch, dependent_root) values for this epoch. + return Ok(()); + } + + // Make a request for all indices that require updating which we have not already made a request for. + let indices_to_request = validators_to_update + .iter() + .filter_map(|pubkey| duties_service.validator_store.validator_index(pubkey)) + .filter(|validator_index| !initial_indices_to_request.contains(validator_index)) + .collect::>(); + + // Filter the initial duties by their relevance so that we don't hit warnings about + // overwriting duties. + let new_initial_duties = response + .data + .into_iter() + .filter(|duty| validators_to_update.contains(&&duty.pubkey)); + + let mut new_duties = if !indices_to_request.is_empty() { + post_validator_duties_ptc(duties_service, epoch, indices_to_request.as_slice()) + .await? + .data + } else { + vec![] + }; + new_duties.extend(new_initial_duties); + + drop(fetch_timer); + + let _store_timer = validator_metrics::start_timer_vec( + &validator_metrics::DUTIES_SERVICE_TIMES, + &[validator_metrics::UPDATE_PTC_STORE], + ); + + debug!( + %dependent_root, + num_new_duties = new_duties.len(), + "Downloaded PTC duties" + ); + + // Update duties - we only reach here if dependent_root changed or epoch is missing + let mut ptc_duties = duties_service.ptc_duties.write(); + + match ptc_duties.entry(epoch) { + hash_map::Entry::Occupied(mut entry) => { + // Dependent root must have changed, so we do complete replacement. + // We cannot support partial updates for the same dependent_root. + // The beacon node may return incomplete duty lists and we cannot distinguish between "no duties" and + // "duties not included in this response". We could query all local validators in each + // `post_validator_duties_ptc` call regardless of dependent_root changes, but the bandwidth + // cost is likely not justified since PTC assignments are sparse. + let (existing_root, _existing_duties) = entry.get(); + debug!( + old_root = %existing_root, + new_root = %dependent_root, + "PTC dependent root changed, replacing all duties" + ); + + *entry.get_mut() = (dependent_root, new_duties); + } + hash_map::Entry::Vacant(entry) => { + // No existing duties for this epoch + entry.insert((dependent_root, new_duties)); + } + } + + Ok(()) +} + /// Notify the block service if it should produce a block. async fn notify_block_production_service( current_slot: Slot, diff --git a/validator_client/validator_services/src/notifier_service.rs b/validator_client/validator_services/src/notifier_service.rs index a8f73490c7..e6e7a67864 100644 --- a/validator_client/validator_services/src/notifier_service.rs +++ b/validator_client/validator_services/src/notifier_service.rs @@ -109,6 +109,7 @@ pub async fn notify( let total_validators = duties_service.total_validator_count(); let proposing_validators = duties_service.proposer_count(epoch); let attesting_validators = duties_service.attester_count(epoch); + let ptc_validators = duties_service.ptc_count(epoch); let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count(); if doppelganger_detecting_validators > 0 { @@ -126,6 +127,7 @@ pub async fn notify( } else if total_validators == attesting_validators { info!( current_epoch_proposers = proposing_validators, + current_epoch_ptc = ptc_validators, active_validators = attesting_validators, total_validators = total_validators, %epoch, @@ -135,6 +137,7 @@ pub async fn notify( } else if attesting_validators > 0 { info!( current_epoch_proposers = proposing_validators, + current_epoch_ptc = ptc_validators, active_validators = attesting_validators, total_validators = total_validators, %epoch,