Gloas vc ptc duty (#8338)

Co-Authored-By: shane-moore <skm1790@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com>
This commit is contained in:
Shane K Moore
2026-04-16 03:23:18 -07:00
committed by GitHub
parent a9f43f9934
commit 794718e96b
8 changed files with 379 additions and 1 deletions

View File

@@ -197,6 +197,16 @@ pub fn gather_prometheus_metrics<E: EthSpec>(
&[NEXT_EPOCH],
duties_service.attester_count(next_epoch) as i64,
);
set_int_gauge(
&PTC_COUNT,
&[CURRENT_EPOCH],
duties_service.ptc_count(current_epoch) as i64,
);
set_int_gauge(
&PTC_COUNT,
&[NEXT_EPOCH],
duties_service.ptc_count(next_epoch) as i64,
);
}
}

View File

@@ -22,7 +22,12 @@ pub const UPDATE_ATTESTERS_CURRENT_EPOCH: &str = "update_attesters_current_epoch
pub const UPDATE_ATTESTERS_NEXT_EPOCH: &str = "update_attesters_next_epoch";
pub const UPDATE_ATTESTERS_FETCH: &str = "update_attesters_fetch";
pub const UPDATE_ATTESTERS_STORE: &str = "update_attesters_store";
/// `DUTIES_SERVICE_TIMES` label: time spent polling PTC duties for the current epoch.
pub const UPDATE_PTC_CURRENT_EPOCH: &str = "update_ptc_current_epoch";
/// `DUTIES_SERVICE_TIMES` label: time spent polling PTC duties for the next epoch.
pub const UPDATE_PTC_NEXT_EPOCH: &str = "update_ptc_next_epoch";
/// `DUTIES_SERVICE_TIMES` label: time spent requesting PTC duties for a single epoch.
pub const UPDATE_PTC_FETCH: &str = "update_ptc_fetch";
/// `DUTIES_SERVICE_TIMES` label: time spent storing downloaded PTC duties for a single epoch.
pub const UPDATE_PTC_STORE: &str = "update_ptc_store";
pub const ATTESTER_DUTIES_HTTP_POST: &str = "attester_duties_http_post";
/// `DUTIES_SERVICE_TIMES` label: time taken by each PTC duties request sent to a beacon node.
pub const PTC_DUTIES_HTTP_POST: &str = "ptc_duties_http_post";
pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get";
pub const VALIDATOR_DUTIES_SYNC_HTTP_POST: &str = "validator_duties_sync_http_post";
pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get";
@@ -162,6 +167,13 @@ pub static ATTESTER_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
&["task"],
)
});
/// Gauge holding the number of PTC (Payload Timeliness Committee) duties known to the duties
/// service. The single `task` label selects which epoch the value refers to.
pub static PTC_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"vc_beacon_ptc_count",
"Number of PTC (Payload Timeliness Committee) validators on this host",
&["task"],
)
});
pub static PROPOSAL_CHANGED: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
try_create_int_counter(
"vc_beacon_block_proposal_changed",

View File

@@ -13,7 +13,7 @@ use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use bls::PublicKeyBytes;
use eth2::types::{
AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse,
ProposerData, StateId, ValidatorId,
ProposerData, PtcDuty, StateId, ValidatorId,
};
use futures::{
StreamExt,
@@ -46,6 +46,7 @@ const VALIDATOR_METRICS_MIN_COUNT: usize = 64;
/// The initial request is used to determine if further requests are required, so that it
/// reduces the amount of data that needs to be transferred.
const INITIAL_DUTIES_QUERY_SIZE: usize = 1;
/// Number of validator indices sent in the first PTC duties request for an epoch.
///
/// The dependent root returned by that first request decides whether duties for the remaining
/// validators need to be requested at all.
const INITIAL_PTC_DUTIES_QUERY_SIZE: usize = 1;
/// Offsets from the attestation duty slot at which a subscription should be sent.
const ATTESTATION_SUBSCRIPTION_OFFSETS: [u64; 8] = [3, 4, 5, 6, 7, 8, 16, 32];
@@ -83,6 +84,7 @@ const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBS
pub enum Error<T> {
UnableToReadSlotClock,
FailedToDownloadAttesters(#[allow(dead_code)] String),
FailedToDownloadPtc(#[allow(dead_code)] String),
FailedToProduceSelectionProof(#[allow(dead_code)] ValidatorStoreError<T>),
InvalidModulo(#[allow(dead_code)] ArithError),
Arith(#[allow(dead_code)] ArithError),
@@ -283,6 +285,7 @@ type DependentRoot = Hash256;
type AttesterMap = HashMap<PublicKeyBytes, HashMap<Epoch, (DependentRoot, DutyAndProof)>>;
type ProposerMap = HashMap<Epoch, (DependentRoot, Vec<ProposerData>)>;
/// Maps an epoch to the dependent root its duties were downloaded under, together with the PTC
/// duties stored for that epoch.
type PtcMap = HashMap<Epoch, (DependentRoot, Vec<PtcDuty>)>;
pub struct DutiesServiceBuilder<S, T> {
/// Provides the canonical list of locally-managed validators.
@@ -384,6 +387,7 @@ impl<S, T> DutiesServiceBuilder<S, T> {
attesters: Default::default(),
proposers: Default::default(),
sync_duties: SyncDutiesMap::new(self.sync_selection_proof_config),
ptc_duties: Default::default(),
validator_store: self
.validator_store
.ok_or("Cannot build DutiesService without validator_store")?,
@@ -414,6 +418,8 @@ pub struct DutiesService<S, T> {
pub proposers: RwLock<ProposerMap>,
/// Map from validator index to sync committee duties.
pub sync_duties: SyncDutiesMap,
/// Maps an epoch to PTC duties for locally-managed validators.
pub ptc_duties: RwLock<PtcMap>,
/// Provides the canonical list of locally-managed validators.
pub validator_store: Arc<S>,
/// Maps unknown validator pubkeys to the next slot time when a poll should be conducted again.
@@ -472,6 +478,15 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
.count()
}
/// Returns the number of PTC duties currently stored for `epoch`.
///
/// Yields `0` when no duties have been stored for that epoch.
pub fn ptc_count(&self, epoch: Epoch) -> usize {
    let ptc_duties = self.ptc_duties.read();
    ptc_duties
        .get(&epoch)
        .map_or(0, |(_dependent_root, epoch_duties)| epoch_duties.len())
}
/// Returns the total number of validators that are in a doppelganger detection period.
pub fn doppelganger_detecting_count(&self) -> usize {
self.validator_store
@@ -534,6 +549,25 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
self.enable_high_validator_count_metrics
|| self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT
}
/// Collect the stored PTC duties whose slot equals `slot`.
///
/// The lookup is done in the epoch that contains `slot`; an empty vector is returned when no
/// duties are stored for that epoch or none of them fall on `slot`.
pub fn get_ptc_duties_for_slot(&self, slot: Slot) -> Vec<PtcDuty> {
    let epoch = slot.epoch(S::E::slots_per_epoch());
    let stored = self.ptc_duties.read();

    let Some((_dependent_root, epoch_duties)) = stored.get(&epoch) else {
        return Vec::new();
    };

    epoch_duties
        .iter()
        .filter(|duty| duty.slot == slot)
        .cloned()
        .collect()
}
}
/// Start the service that periodically polls the beacon node for validator duties. This will start
@@ -662,6 +696,61 @@ pub fn start_update_service<S: ValidatorStore + 'static, T: SlotClock + 'static>
},
"duties_service_sync_committee",
);
// Spawn the task which keeps track of local PTC duties.
// The task is only started when the Gloas fork is scheduled.
if core_duties_service.spec.is_gloas_scheduled() {
    let duties_service = core_duties_service.clone();
    let ptc_polling_task = async move {
        loop {
            let Some(current_slot) = duties_service.slot_clock.now() else {
                // The slot clock could not be read: wait one slot duration and retry.
                sleep(duties_service.slot_clock.slot_duration()).await;
                continue;
            };
            let Some(gloas_fork_epoch) = duties_service.spec.gloas_fork_epoch else {
                // No Gloas fork epoch is configured; this should not be reachable, so stop.
                break;
            };
            let current_epoch = current_slot.epoch(S::E::slots_per_epoch());

            // Polling starts one epoch ahead of the fork; before that we only wait.
            if current_epoch + 1 >= gloas_fork_epoch {
                if let Err(err) = poll_beacon_ptc_attesters(&duties_service).await {
                    error!(
                        error = ?err,
                        "Failed to poll PTC duties"
                    );
                }
            }

            // Wake up again at the next slot. This does not mean the beacon node is queried
            // for full duties every slot: the poll returns early when it already has enough
            // information. If the time to the next slot cannot be determined, sleep for one
            // slot duration so the clock has a chance to come good.
            let wait = duties_service
                .slot_clock
                .duration_to_next_slot()
                .unwrap_or_else(|| duties_service.slot_clock.slot_duration());
            sleep(wait).await;
        }
    };
    core_duties_service
        .executor
        .spawn(ptc_polling_task, "duties_service_ptc");
}
}
/// Iterate through all the voting pubkeys in the `ValidatorStore` and attempt to learn any unknown
@@ -1282,6 +1371,26 @@ fn process_duty_and_proof<S: ValidatorStore>(
}
}
async fn post_validator_duties_ptc<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
epoch: Epoch,
validator_indices: &[u64],
) -> Result<DutiesResponse<Vec<PtcDuty>>, Error<S::Error>> {
duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::PTC_DUTIES_HTTP_POST],
);
beacon_node
.post_validator_duties_ptc(epoch, validator_indices)
.await
})
.await
.map_err(|e| Error::FailedToDownloadPtc(e.to_string()))
}
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
///
/// Duties are computed in batches each slot. If a re-org is detected then the process will
@@ -1641,6 +1750,209 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
Ok(())
}
/// Query the beacon node for ptc duties for any known validators.
async fn poll_beacon_ptc_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
) -> Result<(), Error<S::Error>> {
let current_epoch_timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_PTC_CURRENT_EPOCH],
);
let current_slot = duties_service
.slot_clock
.now()
.ok_or(Error::UnableToReadSlotClock)?;
let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
// Collect *all* pubkeys, even those undergoing doppelganger protection.
let local_pubkeys: HashSet<_> = duties_service
.validator_store
.voting_pubkeys(DoppelgangerStatus::ignored);
let local_indices = {
let mut local_indices = Vec::with_capacity(local_pubkeys.len());
for &pubkey in &local_pubkeys {
if let Some(validator_index) = duties_service.validator_store.validator_index(&pubkey) {
local_indices.push(validator_index)
}
}
local_indices
};
// Poll for current epoch
if let Err(e) = poll_beacon_ptc_attesters_for_epoch(
duties_service,
current_epoch,
&local_indices,
&local_pubkeys,
)
.await
{
error!(
%current_epoch,
request_epoch = %current_epoch,
err = ?e,
"Failed to download PTC duties"
);
}
drop(current_epoch_timer);
let next_epoch_timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::UPDATE_PTC_NEXT_EPOCH],
);
// Poll for next epoch
let next_epoch = current_epoch + 1;
if let Err(e) = poll_beacon_ptc_attesters_for_epoch(
duties_service,
next_epoch,
&local_indices,
&local_pubkeys,
)
.await
{
error!(
%current_epoch,
request_epoch = %next_epoch,
err = ?e,
"Failed to download PTC duties"
);
}
drop(next_epoch_timer);
// Prune old duties.
duties_service
.ptc_duties
.write()
.retain(|&epoch, _| epoch + HISTORICAL_DUTIES_EPOCHS >= current_epoch);
Ok(())
}
/// Download the PTC duties for `epoch` and store them in `duties_service.ptc_duties`, using a
/// small probe request to avoid re-downloading duties that are already up to date.
///
/// `local_indices` are the validator indices to request duties for, and `local_pubkeys` is the
/// set of locally-managed pubkeys the indices were resolved from. Duties whose pubkey is not in
/// `local_pubkeys` are discarded.
///
/// The function first requests duties for only `INITIAL_PTC_DUTIES_QUERY_SIZE` indices. If the
/// returned dependent root matches the one already stored for `epoch`, nothing else happens.
/// Otherwise duties for the remaining indices are requested and the stored entry for `epoch` is
/// replaced wholesale.
///
/// # Errors
///
/// Returns the error of `post_validator_duties_ptc` if either request fails; in that case the
/// stored duties are left untouched.
async fn poll_beacon_ptc_attesters_for_epoch<
    S: ValidatorStore + 'static,
    T: SlotClock + 'static,
>(
    duties_service: &Arc<DutiesService<S, T>>,
    epoch: Epoch,
    local_indices: &[u64],
    local_pubkeys: &HashSet<PublicKeyBytes>,
) -> Result<(), Error<S::Error>> {
    // No need to bother the BN if we don't have any validators.
    if local_indices.is_empty() {
        debug!(
            %epoch,
            "No validators, not downloading PTC duties"
        );
        return Ok(());
    }

    let fetch_timer = validator_metrics::start_timer_vec(
        &validator_metrics::DUTIES_SERVICE_TIMES,
        &[validator_metrics::UPDATE_PTC_FETCH],
    );

    // TODO(gloas) Unlike attester duties which use `get_uninitialized_validators` to detect
    // newly-added validators, PTC duties only check dependent_root changes. Validators added
    // mid-epoch won't get PTC duties until the next epoch boundary. We should probably fix this.
    let initial_count = local_indices.len().min(INITIAL_PTC_DUTIES_QUERY_SIZE);
    let initial_indices_to_request = &local_indices[..initial_count];

    let response =
        post_validator_duties_ptc(duties_service, epoch, initial_indices_to_request).await?;
    let dependent_root = response.dependent_root;

    // An update is needed when nothing is stored for this epoch yet, or when the stored duties
    // were downloaded under a different dependent root. The read-lock is released at the end of
    // this statement, before any further request is awaited.
    let needs_update = duties_service
        .ptc_duties
        .read()
        .get(&epoch)
        .is_none_or(|(prior_root, _duties)| *prior_root != dependent_root);

    if !needs_update {
        // The stored duties for this epoch already match the current dependent root.
        return Ok(());
    }

    // Every local validator needs refreshing, so request all indices that were not already
    // covered by the initial request.
    let indices_to_request = local_indices
        .iter()
        .copied()
        .filter(|validator_index| !initial_indices_to_request.contains(validator_index))
        .collect::<Vec<_>>();

    let mut new_duties = if indices_to_request.is_empty() {
        Vec::new()
    } else {
        post_validator_duties_ptc(duties_service, epoch, indices_to_request.as_slice())
            .await?
            .data
    };

    // Keep the duties from the initial response too, restricted to locally-managed pubkeys.
    new_duties.extend(
        response
            .data
            .into_iter()
            .filter(|duty| local_pubkeys.contains(&duty.pubkey)),
    );

    drop(fetch_timer);

    let _store_timer = validator_metrics::start_timer_vec(
        &validator_metrics::DUTIES_SERVICE_TIMES,
        &[validator_metrics::UPDATE_PTC_STORE],
    );

    debug!(
        %dependent_root,
        num_new_duties = new_duties.len(),
        "Downloaded PTC duties"
    );

    // We only reach here if the dependent root changed or the epoch had no entry. In both cases
    // the entry is replaced completely: partial updates under the same dependent root are not
    // supported because a response cannot distinguish "no duties" from "duties not included in
    // this response". Querying all local validators on every poll would avoid that, but the
    // bandwidth cost is likely not justified since PTC assignments are sparse.
    let mut ptc_duties = duties_service.ptc_duties.write();
    if let Some((existing_root, _existing_duties)) = ptc_duties.get(&epoch) {
        debug!(
            old_root = %existing_root,
            new_root = %dependent_root,
            "PTC dependent root changed, replacing all duties"
        );
    }
    ptc_duties.insert(epoch, (dependent_root, new_duties));

    Ok(())
}
/// Notify the block service if it should produce a block.
async fn notify_block_production_service<S: ValidatorStore>(
current_slot: Slot,

View File

@@ -109,6 +109,7 @@ pub async fn notify<S: ValidatorStore, T: SlotClock + 'static>(
let total_validators = duties_service.total_validator_count();
let proposing_validators = duties_service.proposer_count(epoch);
let attesting_validators = duties_service.attester_count(epoch);
let ptc_validators = duties_service.ptc_count(epoch);
let doppelganger_detecting_validators = duties_service.doppelganger_detecting_count();
if doppelganger_detecting_validators > 0 {
@@ -126,6 +127,7 @@ pub async fn notify<S: ValidatorStore, T: SlotClock + 'static>(
} else if total_validators == attesting_validators {
info!(
current_epoch_proposers = proposing_validators,
current_epoch_ptc = ptc_validators,
active_validators = attesting_validators,
total_validators = total_validators,
%epoch,
@@ -135,6 +137,7 @@ pub async fn notify<S: ValidatorStore, T: SlotClock + 'static>(
} else if attesting_validators > 0 {
info!(
current_epoch_proposers = proposing_validators,
current_epoch_ptc = ptc_validators,
active_validators = attesting_validators,
total_validators = total_validators,
%epoch,