Files
lighthouse/validator_client/validator_services/src/preparation_service.rs
Eitan Seri-Levi 3ecf964385 Replace INTERVALS_PER_SLOT with explicit slot component times (#7944)
https://github.com/ethereum/consensus-specs/pull/4476


  


Co-Authored-By: Barnabas Busa <barnabas.busa@ethereum.org>

Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
2026-02-02 05:58:42 +00:00

475 lines
17 KiB
Rust

use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
use bls::PublicKeyBytes;
use parking_lot::RwLock;
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use task_executor::TaskExecutor;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use types::{
Address, ChainSpec, EthSpec, ProposerPreparationData, SignedValidatorRegistrationData,
ValidatorRegistrationData,
};
use validator_store::{
DoppelgangerStatus, Error as ValidatorStoreError, ProposalData, ValidatorStore,
};
/// Number of epochs before the Bellatrix hard fork to begin posting proposer preparations.
///
/// Preparations are published once `current_epoch + PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS`
/// is greater than or equal to the configured Bellatrix fork epoch.
const PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS: u64 = 2;
/// Number of epochs to wait before re-submitting validator registration.
///
/// The complete set of registrations is re-published whenever the current slot is a multiple
/// of `slots_per_epoch * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION`; in between, only
/// registrations that are not yet in the cache are published.
const EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION: u64 = 1;
/// Builds a `PreparationService`.
///
/// Every field starts out unset. `build` returns an error if `validator_store`, `slot_clock`,
/// `beacon_nodes`, `executor` or `validator_registration_batch_size` has not been supplied;
/// `builder_registration_timestamp_override` is optional.
#[derive(Default)]
pub struct PreparationServiceBuilder<S: ValidatorStore, T: SlotClock + 'static> {
/// Source of voting pubkeys, per-validator proposal data and registration signatures.
validator_store: Option<Arc<S>>,
/// Clock used to read the current slot and the time until the next slot.
slot_clock: Option<T>,
/// Beacon nodes that preparations and registrations are sent to.
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
/// Executor on which the background service loops are spawned.
executor: Option<TaskExecutor>,
/// When set, used as the registration timestamp instead of the current system time.
builder_registration_timestamp_override: Option<u64>,
/// Maximum number of signed registrations sent to the beacon nodes in a single request.
validator_registration_batch_size: Option<usize>,
}
impl<S: ValidatorStore, T: SlotClock + 'static> PreparationServiceBuilder<S, T> {
    /// Creates a builder with no components configured yet.
    pub fn new() -> Self {
        Self {
            validator_store: None,
            slot_clock: None,
            beacon_nodes: None,
            executor: None,
            builder_registration_timestamp_override: None,
            validator_registration_batch_size: None,
        }
    }

    /// Sets the validator store (required).
    pub fn validator_store(self, store: Arc<S>) -> Self {
        Self {
            validator_store: Some(store),
            ..self
        }
    }

    /// Sets the slot clock (required).
    pub fn slot_clock(self, slot_clock: T) -> Self {
        Self {
            slot_clock: Some(slot_clock),
            ..self
        }
    }

    /// Sets the beacon node fallback used for publishing (required).
    pub fn beacon_nodes(self, beacon_nodes: Arc<BeaconNodeFallback<T>>) -> Self {
        Self {
            beacon_nodes: Some(beacon_nodes),
            ..self
        }
    }

    /// Sets the task executor used to spawn the service loops (required).
    pub fn executor(self, executor: TaskExecutor) -> Self {
        Self {
            executor: Some(executor),
            ..self
        }
    }

    /// Sets (or clears, when `None`) the fixed timestamp used for new registrations.
    pub fn builder_registration_timestamp_override(
        self,
        builder_registration_timestamp_override: Option<u64>,
    ) -> Self {
        Self {
            builder_registration_timestamp_override,
            ..self
        }
    }

    /// Sets the number of registrations sent per request (required).
    pub fn validator_registration_batch_size(
        self,
        validator_registration_batch_size: usize,
    ) -> Self {
        Self {
            validator_registration_batch_size: Some(validator_registration_batch_size),
            ..self
        }
    }

    /// Consumes the builder and assembles the `PreparationService`.
    ///
    /// Returns an error message naming the first required component that is missing. The
    /// registration cache always starts out empty.
    pub fn build(self) -> Result<PreparationService<S, T>, String> {
        let Self {
            validator_store,
            slot_clock,
            beacon_nodes,
            executor,
            builder_registration_timestamp_override,
            validator_registration_batch_size,
        } = self;

        // Components are checked in declaration order so the reported error is stable.
        let validator_store =
            validator_store.ok_or("Cannot build PreparationService without validator_store")?;
        let slot_clock = slot_clock.ok_or("Cannot build PreparationService without slot_clock")?;
        let beacon_nodes =
            beacon_nodes.ok_or("Cannot build PreparationService without beacon_nodes")?;
        let executor = executor.ok_or("Cannot build PreparationService without executor")?;
        let validator_registration_batch_size = validator_registration_batch_size.ok_or(
            "Cannot build PreparationService without validator_registration_batch_size",
        )?;

        let inner = Inner {
            validator_store,
            slot_clock,
            beacon_nodes,
            executor,
            builder_registration_timestamp_override,
            validator_registration_batch_size,
            validator_registration_cache: RwLock::new(HashMap::new()),
        };
        Ok(PreparationService {
            inner: Arc::new(inner),
        })
    }
}
/// Helper to minimise `Arc` usage.
///
/// Holds all state shared between clones of `PreparationService`, which wraps a single
/// `Arc<Inner>` and dereferences to it.
pub struct Inner<S, T> {
/// Source of voting pubkeys, per-validator proposal data and registration signatures.
validator_store: Arc<S>,
/// Clock used to read the current slot and the time until the next slot.
slot_clock: T,
/// Beacon nodes that preparations and registrations are sent to.
beacon_nodes: Arc<BeaconNodeFallback<T>>,
/// Executor on which the background service loops are spawned.
executor: TaskExecutor,
/// When set, used as the registration timestamp instead of the current system time.
builder_registration_timestamp_override: Option<u64>,
/// Signed registrations that were successfully published, keyed by their
/// (fee recipient, gas limit, pubkey) tuple. A key that is absent from this map is treated
/// as a new or changed registration that still needs to be signed and published.
validator_registration_cache:
RwLock<HashMap<ValidatorRegistrationKey, SignedValidatorRegistrationData>>,
/// Maximum number of signed registrations sent to the beacon nodes in a single request.
validator_registration_batch_size: usize,
}
/// Cache key identifying a validator registration.
///
/// Contains the fields of `ValidatorRegistrationData` except `timestamp`, so two registrations
/// that differ only in their timestamp map to the same key.
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub struct ValidatorRegistrationKey {
/// Address the validator registers as its fee recipient.
pub fee_recipient: Address,
/// Gas limit the validator registers.
pub gas_limit: u64,
/// Public key of the validator being registered.
pub pubkey: PublicKeyBytes,
}
impl From<ValidatorRegistrationData> for ValidatorRegistrationKey {
    /// Builds the cache key for a registration by dropping its `timestamp`.
    ///
    /// The argument is destructured exhaustively so that adding a field to
    /// `ValidatorRegistrationData` forces this conversion to be revisited.
    fn from(
        ValidatorRegistrationData {
            fee_recipient,
            gas_limit,
            timestamp: _,
            pubkey,
        }: ValidatorRegistrationData,
    ) -> Self {
        Self {
            fee_recipient,
            gas_limit,
            pubkey,
        }
    }
}
/// Attempts to produce proposer preparations and validator registrations for all known
/// validators.
///
/// The background loops started by this service wake up once per slot. The struct itself is
/// only a handle around a shared `Arc<Inner>`, so cloning it does not duplicate any state.
pub struct PreparationService<S, T> {
/// Shared service state; also reachable through `Deref`.
inner: Arc<Inner<S, T>>,
}
impl<S, T> Clone for PreparationService<S, T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<S, T> Deref for PreparationService<S, T> {
    type Target = Inner<S, T>;

    /// Gives direct access to the shared `Inner` state behind the `Arc`.
    fn deref(&self) -> &Self::Target {
        // `&Arc<Inner>` coerces to `&Inner` through `Arc`'s own `Deref` impl.
        &self.inner
    }
}
impl<S: ValidatorStore + 'static, T: SlotClock + 'static> PreparationService<S, T> {
/// Starts the validator registration service followed by the proposer preparation service.
///
/// Returns the first error encountered; if starting the registration service fails, the
/// preparation service is not started.
pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> {
    // Each service consumes its handle, so the registration service gets its own clone.
    let registration_service = self.clone();
    registration_service.start_validator_registration_service(spec)?;
    self.start_proposer_prepare_service(spec)
}
/// Starts the service which periodically produces proposer preparations.
///
/// Spawns a never-ending task on the executor that, once per slot, publishes proposer
/// preparations if the current slot is close enough to (or past) the Bellatrix fork.
/// Publishing errors are logged and do not stop the loop.
pub fn start_proposer_prepare_service(self, spec: &ChainSpec) -> Result<(), String> {
    let slot_duration = spec.get_slot_duration();
    info!("Proposer preparation service started");
    let executor = self.executor.clone();
    let spec = spec.clone();
    let preparation_loop = async move {
        loop {
            if self.should_publish_at_current_slot(&spec) {
                // Poll the endpoint immediately to ensure fee recipients are received.
                if let Err(e) = self.prepare_proposers_and_publish(&spec).await {
                    error!(
                        error = ?e,
                        "Error during proposer preparation"
                    );
                }
            }
            let wait = match self.slot_clock.duration_to_next_slot() {
                Some(duration_to_next_slot) => duration_to_next_slot,
                None => {
                    error!("Failed to read slot clock");
                    // If we can't read the slot clock, just wait another slot.
                    slot_duration
                }
            };
            sleep(wait).await;
        }
    };
    executor.spawn(preparation_loop, "preparation_service");
    Ok(())
}
/// Starts the service which periodically sends connected beacon nodes validator registration information.
///
/// Spawns a never-ending task on the executor that attempts a registration round once per
/// slot. Registration errors are logged and do not stop the loop.
pub fn start_validator_registration_service(self, spec: &ChainSpec) -> Result<(), String> {
    info!("Validator registration service started");
    // Only the slot duration is needed from the spec, so it is read through the borrowed
    // reference instead of cloning the whole `ChainSpec`.
    let slot_duration = spec.get_slot_duration();
    let executor = self.executor.clone();
    let validator_registration_fut = async move {
        loop {
            // Attempt a registration round straight away, then once per slot after that.
            if let Err(e) = self.register_validators().await {
                error!(error = ?e, "Error during validator registration");
            }
            // Whether or not the round succeeded, wait for the next slot before retrying.
            if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() {
                sleep(duration_to_next_slot).await;
            } else {
                error!("Failed to read slot clock");
                // If we can't read the slot clock, just wait another slot.
                sleep(slot_duration).await;
            }
        }
    };
    executor.spawn(validator_registration_fut, "validator_registration_service");
    Ok(())
}
/// Return `true` if the current slot is close to or past the Bellatrix fork epoch.
///
/// This avoids spamming the BN with preparations before the Bellatrix fork epoch, which may
/// cause errors if it doesn't support the preparation API.
///
/// If the slot clock cannot be read, the genesis epoch is used as the current epoch. If no
/// Bellatrix fork epoch is configured, this always returns `false`.
fn should_publish_at_current_slot(&self, spec: &ChainSpec) -> bool {
    let current_epoch = match self.slot_clock.now() {
        Some(slot) => slot.epoch(S::E::slots_per_epoch()),
        None => S::E::genesis_epoch(),
    };
    match spec.bellatrix_fork_epoch {
        Some(fork_epoch) => current_epoch + PROPOSER_PREPARATION_LOOKAHEAD_EPOCHS >= fork_epoch,
        None => false,
    }
}
/// Prepare proposer preparations and send to beacon node
///
/// Nothing is sent when there is no preparation data to publish.
async fn prepare_proposers_and_publish(&self, spec: &ChainSpec) -> Result<(), String> {
    let preparation_data = self.collect_preparation_data(spec);
    if preparation_data.is_empty() {
        return Ok(());
    }
    self.publish_preparation_data(preparation_data).await
}
/// Builds one `ProposerPreparationData` per validator that has both a fee recipient and a
/// validator index.
///
/// Validators without a fee recipient are skipped, and an error is logged for each of them
/// when a Bellatrix fork epoch is configured.
fn collect_preparation_data(&self, spec: &ChainSpec) -> Vec<ProposerPreparationData> {
    self.collect_proposal_data(|pubkey, proposal_data| {
        let Some(fee_recipient) = proposal_data.fee_recipient else {
            if spec.bellatrix_fork_epoch.is_some() {
                error!(
                    msg = "update validator_definitions.yml",
                    ?pubkey,
                    "Validator is missing fee recipient"
                );
            }
            return None;
        };
        // Ignore fee recipients for keys without indices, they are inactive.
        let validator_index = proposal_data.validator_index?;
        Some(ProposerPreparationData {
            validator_index,
            fee_recipient,
        })
    })
}
/// Builds a `ValidatorRegistrationKey` for every validator that has a validator index, a
/// fee recipient and builder proposals enabled.
fn collect_validator_registration_keys(&self) -> Vec<ValidatorRegistrationKey> {
    self.collect_proposal_data(|pubkey, proposal_data| {
        // Ignore fee recipients for keys without indices, they are inactive.
        proposal_data.validator_index?;
        // We don't log for missing fee recipients here because this will be logged more
        // frequently in `collect_preparation_data`.
        let fee_recipient = proposal_data.fee_recipient?;
        if !proposal_data.builder_proposals {
            return None;
        }
        Some(ValidatorRegistrationKey {
            fee_recipient,
            gas_limit: proposal_data.gas_limit,
            pubkey,
        })
    })
}
/// Applies `map_fn` to the proposal data of every voting pubkey and collects the `Some`
/// results.
///
/// Pubkeys for which the validator store has no proposal data are skipped without calling
/// `map_fn`.
fn collect_proposal_data<G, U>(&self, map_fn: G) -> Vec<U>
where
    G: Fn(PublicKeyBytes, ProposalData) -> Option<U>,
{
    let all_pubkeys: Vec<_> = self
        .validator_store
        .voting_pubkeys(DoppelgangerStatus::ignored);
    let mut collected = Vec::new();
    for pubkey in all_pubkeys {
        let Some(proposal_data) = self.validator_store.proposal_data(&pubkey) else {
            continue;
        };
        if let Some(item) = map_fn(pubkey, proposal_data) {
            collected.push(item);
        }
    }
    collected
}
async fn publish_preparation_data(
&self,
preparation_data: Vec<ProposerPreparationData>,
) -> Result<(), String> {
// Post the proposer preparations to the BN.
let preparation_data_len = preparation_data.len();
let preparation_entries = preparation_data.as_slice();
match self
.beacon_nodes
.request(ApiTopic::Subscriptions, |beacon_node| async move {
beacon_node
.post_validator_prepare_beacon_proposer(preparation_entries)
.await
})
.await
{
Ok(()) => debug!(
count = preparation_data_len,
"Published proposer preparation"
),
Err(e) => error!(
error = %e,
"Unable to publish proposer preparation to all beacon nodes"
),
}
Ok(())
}
/// Register validators with builders, used in the blinded block proposal flow.
async fn register_validators(&self) -> Result<(), String> {
let registration_keys = self.collect_validator_registration_keys();
let mut changed_keys = vec![];
// Need to scope this so the read lock is not held across an await point (I don't know why
// but the explicit `drop` is not enough).
{
let guard = self.validator_registration_cache.read();
for key in registration_keys.iter() {
if !guard.contains_key(key) {
changed_keys.push(key.clone());
}
}
drop(guard);
}
// Check if any have changed or it's been `EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION`.
if let Some(slot) = self.slot_clock.now() {
if slot % (S::E::slots_per_epoch() * EPOCHS_PER_VALIDATOR_REGISTRATION_SUBMISSION) == 0
{
self.publish_validator_registration_data(registration_keys)
.await?;
} else if !changed_keys.is_empty() {
self.publish_validator_registration_data(changed_keys)
.await?;
}
}
Ok(())
}
/// Signs (or reuses cached) registrations for `registration_keys` and broadcasts them to the
/// beacon nodes in batches.
///
/// For each key, a cached signed registration is reused when present. Otherwise a new
/// registration is signed, timestamped with `builder_registration_timestamp_override` if set
/// or the current system time in seconds since the Unix epoch. Keys that fail to sign are
/// logged and skipped.
///
/// Each successfully broadcast batch is written to the cache; a failed batch is logged and
/// left uncached.
///
/// # Errors
///
/// Returns an error only if the system clock is before the Unix epoch. Signing and broadcast
/// failures are logged and do not produce an error.
async fn publish_validator_registration_data(
    &self,
    registration_keys: Vec<ValidatorRegistrationKey>,
) -> Result<(), String> {
    let mut signed = Vec::with_capacity(registration_keys.len());
    for key in registration_keys {
        // The cache lookup is a standalone statement so the read guard is released before
        // the signing future is awaited.
        let cached_registration_opt =
            self.validator_registration_cache.read().get(&key).cloned();
        let signed_data = if let Some(signed_data) = cached_registration_opt {
            signed_data
        } else {
            let timestamp = match self.builder_registration_timestamp_override {
                Some(timestamp) => timestamp,
                None => SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .map_err(|e| format!("{e:?}"))?
                    .as_secs(),
            };
            // `key` is not needed after this point, so it is destructured by value.
            let ValidatorRegistrationKey {
                fee_recipient,
                gas_limit,
                pubkey,
            } = key;
            match self
                .validator_store
                .sign_validator_registration_data(ValidatorRegistrationData {
                    fee_recipient,
                    gas_limit,
                    timestamp,
                    pubkey,
                })
                .await
            {
                Ok(data) => data,
                Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
                    // A pubkey can be missing when a validator was recently
                    // removed via the API.
                    debug!(?pubkey, "Missing pubkey for registration data");
                    continue;
                }
                Err(e) => {
                    error!(
                        error = ?e,
                        ?pubkey,
                        "Unable to sign validator registration data"
                    );
                    continue;
                }
            }
        };
        signed.push(signed_data);
    }
    if signed.is_empty() {
        return Ok(());
    }
    // `slice::chunks` panics when the chunk size is zero, so a misconfigured batch size of 0
    // is treated as 1 rather than crashing the registration task.
    let batch_size = self.validator_registration_batch_size.max(1);
    for batch in signed.chunks(batch_size) {
        match self
            .beacon_nodes
            .broadcast(|beacon_node| async move {
                beacon_node.post_validator_register_validator(batch).await
            })
            .await
        {
            Ok(()) => {
                info!(
                    count = batch.len(),
                    "Published validator registrations to the builder network"
                );
                // Only registrations that were actually published are cached.
                let mut guard = self.validator_registration_cache.write();
                for signed_data in batch {
                    guard.insert(
                        ValidatorRegistrationKey::from(signed_data.message.clone()),
                        signed_data.clone(),
                    );
                }
            }
            Err(e) => warn!(
                error = %e,
                "Unable to publish validator registrations to the builder network"
            ),
        }
    }
    Ok(())
}
}