Merge branch 'unstable' into separate-vc-metrics

This commit is contained in:
Daniel Knopik
2025-05-08 23:12:13 +02:00
85 changed files with 2842 additions and 2259 deletions

View File

@@ -219,6 +219,7 @@ pub struct ValidatorClient {
#[clap(
long,
requires = "http",
value_name = "PORT",
default_value_t = 5062,
help = "Set the listen TCP port for the RESTful HTTP API server.",
@@ -387,7 +388,7 @@ pub struct ValidatorClient {
#[clap(
long,
value_name = "INTEGER",
default_value_t = 30_000_000,
default_value_t = 36_000_000,
requires = "builder_proposals",
help = "The gas limit to be used in all builder proposals for all validators managed \
by this validator client. Note this will not necessarily be used if the gas limit \

View File

@@ -10,6 +10,7 @@ use directory::{
use eth2::types::Graffiti;
use graffiti_file::GraffitiFile;
use initialized_validators::Config as InitializedValidatorsConfig;
use lighthouse_validator_store::Config as ValidatorStoreConfig;
use sensitive_url::SensitiveUrl;
use serde::{Deserialize, Serialize};
use std::fs;
@@ -20,7 +21,6 @@ use tracing::{info, warn};
use types::GRAFFITI_BYTES_LEN;
use validator_http_api::{self, PK_FILENAME};
use validator_http_metrics;
use validator_store::Config as ValidatorStoreConfig;
pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/";

View File

@@ -18,13 +18,13 @@ use doppelganger_service::DoppelgangerService;
use environment::RuntimeContext;
use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts};
use initialized_validators::Error::UnableToOpenVotingKeystore;
use lighthouse_validator_store::LighthouseValidatorStore;
use parking_lot::RwLock;
use reqwest::Certificate;
use slot_clock::SlotClock;
use slot_clock::SystemTimeSlotClock;
use std::fs::File;
use std::io::Read;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
@@ -40,13 +40,12 @@ use validator_services::notifier_service::spawn_notifier;
use validator_services::{
attestation_service::{AttestationService, AttestationServiceBuilder},
block_service::{BlockService, BlockServiceBuilder},
duties_service::{self, DutiesService},
duties_service::{self, DutiesService, DutiesServiceBuilder},
latency_service,
preparation_service::{PreparationService, PreparationServiceBuilder},
sync::SyncDutiesMap,
sync_committee_service::SyncCommitteeService,
};
use validator_store::ValidatorStore;
use validator_store::ValidatorStore as ValidatorStoreTrait;
/// The interval between attempts to contact the beacon node during startup.
const RETRY_DELAY: Duration = Duration::from_secs(2);
@@ -68,23 +67,26 @@ const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4;
const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4;
const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4;
const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger";
type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;
#[derive(Clone)]
pub struct ProductionValidatorClient<E: EthSpec> {
context: RuntimeContext<E>,
duties_service: Arc<DutiesService<SystemTimeSlotClock, E>>,
block_service: BlockService<SystemTimeSlotClock, E>,
attestation_service: AttestationService<SystemTimeSlotClock, E>,
sync_committee_service: SyncCommitteeService<SystemTimeSlotClock, E>,
duties_service: Arc<DutiesService<ValidatorStore<E>, SystemTimeSlotClock>>,
block_service: BlockService<ValidatorStore<E>, SystemTimeSlotClock>,
attestation_service: AttestationService<ValidatorStore<E>, SystemTimeSlotClock>,
sync_committee_service: SyncCommitteeService<ValidatorStore<E>, SystemTimeSlotClock>,
doppelganger_service: Option<Arc<DoppelgangerService>>,
preparation_service: PreparationService<SystemTimeSlotClock, E>,
validator_store: Arc<ValidatorStore<SystemTimeSlotClock, E>>,
preparation_service: PreparationService<ValidatorStore<E>, SystemTimeSlotClock>,
validator_store: Arc<ValidatorStore<E>>,
slot_clock: SystemTimeSlotClock,
http_api_listen_addr: Option<SocketAddr>,
config: Config,
beacon_nodes: Arc<BeaconNodeFallback<SystemTimeSlotClock, E>>,
beacon_nodes: Arc<BeaconNodeFallback<SystemTimeSlotClock>>,
genesis_time: u64,
}
@@ -305,6 +307,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT,
get_deposit_snapshot: slot_duration / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT,
get_validator_block: slot_duration / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT,
default: slot_duration / HTTP_DEFAULT_TIMEOUT_QUOTIENT,
}
} else {
Timeouts::set_all(slot_duration.saturating_mul(config.long_timeouts_multiplier))
@@ -366,14 +369,14 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Initialize the number of connected, avaliable beacon nodes to 0.
set_gauge(&validator_metrics::AVAILABLE_BEACON_NODES_COUNT, 0);
let mut beacon_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new(
let mut beacon_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
context.eth2_config.spec.clone(),
);
let mut proposer_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new(
let mut proposer_nodes: BeaconNodeFallback<_> = BeaconNodeFallback::new(
proposer_candidates,
config.beacon_node_fallback,
config.broadcast_topics.clone(),
@@ -382,7 +385,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
// Perform some potentially long-running initialization tasks.
let (genesis_time, genesis_validators_root) = tokio::select! {
tuple = init_from_beacon_node(&beacon_nodes, &proposer_nodes) => tuple?,
tuple = init_from_beacon_node::<E>(&beacon_nodes, &proposer_nodes) => tuple?,
() = context.executor.exit() => return Err("Shutting down".to_string())
};
@@ -401,10 +404,10 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
proposer_nodes.set_slot_clock(slot_clock.clone());
let beacon_nodes = Arc::new(beacon_nodes);
start_fallback_updater_service(context.clone(), beacon_nodes.clone())?;
start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?;
let proposer_nodes = Arc::new(proposer_nodes);
start_fallback_updater_service(context.clone(), proposer_nodes.clone())?;
start_fallback_updater_service::<_, E>(context.executor.clone(), proposer_nodes.clone())?;
let doppelganger_service = if config.enable_doppelganger_protection {
Some(Arc::new(DoppelgangerService::default()))
@@ -412,7 +415,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
None
};
let validator_store = Arc::new(ValidatorStore::new(
let validator_store = Arc::new(LighthouseValidatorStore::new(
validators,
slashing_protection,
genesis_validators_root,
@@ -438,21 +441,18 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true);
}
let duties_context = context.service_context("duties".into());
let duties_service = Arc::new(DutiesService {
attesters: <_>::default(),
proposers: <_>::default(),
sync_duties: SyncDutiesMap::new(config.distributed),
slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(),
unknown_validator_next_poll_slots: <_>::default(),
spec: context.eth2_config.spec.clone(),
context: duties_context,
enable_high_validator_count_metrics: config.enable_high_validator_count_metrics,
distributed: config.distributed,
disable_attesting: config.disable_attesting,
});
let duties_service = Arc::new(
DutiesServiceBuilder::new()
.slot_clock(slot_clock.clone())
.beacon_nodes(beacon_nodes.clone())
.validator_store(validator_store.clone())
.spec(context.eth2_config.spec.clone())
.executor(context.executor.clone())
.enable_high_validator_count_metrics(config.enable_high_validator_count_metrics)
.distributed(config.distributed)
.disable_attesting(config.disable_attesting)
.build()?,
);
// Update the metrics server.
if let Some(ctx) = &validator_metrics_ctx {
@@ -464,7 +464,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("block".into()))
.executor(context.executor.clone())
.chain_spec(context.eth2_config.spec.clone())
.graffiti(config.graffiti)
.graffiti_file(config.graffiti_file.clone());
@@ -480,7 +481,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("attestation".into()))
.executor(context.executor.clone())
.chain_spec(context.eth2_config.spec.clone())
.disable(config.disable_attesting)
.build()?;
@@ -488,7 +490,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.runtime_context(context.service_context("preparation".into()))
.executor(context.executor.clone())
.builder_registration_timestamp_override(config.builder_registration_timestamp_override)
.validator_registration_batch_size(config.validator_registration_batch_size)
.build()?;
@@ -498,7 +500,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
validator_store.clone(),
slot_clock.clone(),
beacon_nodes.clone(),
context.service_context("sync_committee".into()),
context.executor.clone(),
);
Ok(Self {
@@ -541,12 +543,11 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
config: self.config.http_api.clone(),
sse_logging_components: self.context.sse_logging_components.clone(),
slot_clock: self.slot_clock.clone(),
_phantom: PhantomData,
});
let exit = self.context.executor.exit();
let (listen_addr, server) = validator_http_api::serve(ctx, exit)
let (listen_addr, server) = validator_http_api::serve::<_, E>(ctx, exit)
.map_err(|e| format!("Unable to start HTTP API server: {:?}", e))?;
self.context
@@ -620,12 +621,12 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
}
async fn init_from_beacon_node<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
proposer_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
proposer_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
) -> Result<(u64, Hash256), String> {
loop {
beacon_nodes.update_all_candidates().await;
proposer_nodes.update_all_candidates().await;
beacon_nodes.update_all_candidates::<E>().await;
proposer_nodes.update_all_candidates::<E>().await;
let num_available = beacon_nodes.num_available().await;
let num_total = beacon_nodes.num_total().await;
@@ -702,8 +703,8 @@ async fn init_from_beacon_node<E: EthSpec>(
Ok((genesis.genesis_time, genesis.genesis_validators_root))
}
async fn wait_for_genesis<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
async fn wait_for_genesis(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
genesis_time: u64,
) -> Result<(), String> {
let now = SystemTime::now()
@@ -745,8 +746,8 @@ async fn wait_for_genesis<E: EthSpec>(
/// Request the version from the node, looping back and trying again on failure. Exit once the node
/// has been contacted.
async fn poll_whilst_waiting_for_genesis<E: EthSpec>(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock, E>,
async fn poll_whilst_waiting_for_genesis(
beacon_nodes: &BeaconNodeFallback<SystemTimeSlotClock>,
genesis_time: Duration,
) -> Result<(), String> {
loop {