From e29b607257d8807336fd5f38615256b890f1a58a Mon Sep 17 00:00:00 2001 From: Daniel Knopik <107140945+dknopik@users.noreply.github.com> Date: Wed, 21 May 2025 03:25:53 +0200 Subject: [PATCH] Move notifier and latency service to `validator_services` (#7427) We would like to reuse the `notifier` and `latency_service` in Anchor. To make this possible, this PR moves these from `validator_client` to `validator_services` and makes them use the new `ValidatorStore` trait is used so that the code can be reused in Anchor. --- validator_client/src/lib.rs | 17 +++++++---- .../validator_services/Cargo.toml | 8 +++--- .../src/latency_service.rs} | 9 +++--- .../validator_services/src/lib.rs | 2 ++ .../src/notifier_service.rs} | 28 +++++++++---------- 5 files changed, 35 insertions(+), 29 deletions(-) rename validator_client/{src/latency.rs => validator_services/src/latency_service.rs} (91%) rename validator_client/{src/notifier.rs => validator_services/src/notifier_service.rs} (86%) diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index a7993dc879..6692fe3a7b 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -1,7 +1,5 @@ pub mod cli; pub mod config; -mod latency; -mod notifier; use crate::cli::ValidatorClient; pub use config::Config; @@ -21,7 +19,6 @@ use environment::RuntimeContext; use eth2::{reqwest::ClientBuilder, BeaconNodeHttpClient, StatusCode, Timeouts}; use initialized_validators::Error::UnableToOpenVotingKeystore; use lighthouse_validator_store::LighthouseValidatorStore; -use notifier::spawn_notifier; use parking_lot::RwLock; use reqwest::Certificate; use slot_clock::SlotClock; @@ -39,10 +36,12 @@ use tokio::{ use tracing::{debug, error, info, warn}; use types::{EthSpec, Hash256}; use validator_http_api::ApiSecret; +use validator_services::notifier_service::spawn_notifier; use validator_services::{ attestation_service::{AttestationService, AttestationServiceBuilder}, block_service::{BlockService, BlockServiceBuilder}, duties_service::{self, DutiesService, DutiesServiceBuilder}, + latency_service, preparation_service::{PreparationService, PreparationServiceBuilder}, sync_committee_service::SyncCommitteeService, }; @@ -601,11 +600,17 @@ impl ProductionValidatorClient { info!("Doppelganger protection disabled.") } - spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; + let context = self.context.service_context("notifier".into()); + spawn_notifier( + self.duties_service.clone(), + context.executor, + &self.context.eth2_config.spec, + ) + .map_err(|e| format!("Failed to start notifier: {}", e))?; if self.config.enable_latency_measurement_service { - latency::start_latency_service( - self.context.clone(), + latency_service::start_latency_service( + self.context.executor.clone(), self.duties_service.slot_clock.clone(), self.duties_service.beacon_nodes.clone(), ); diff --git a/validator_client/validator_services/Cargo.toml b/validator_client/validator_services/Cargo.toml index 86208dadef..c914940914 100644 --- a/validator_client/validator_services/Cargo.toml +++ b/validator_client/validator_services/Cargo.toml @@ -14,11 +14,11 @@ graffiti_file = { workspace = true } logging = { workspace = true } parking_lot = { workspace = true } safe_arith = { workspace = true } -slot_clock = { workspace = true } +slot_clock = { workspace = true } task_executor = { workspace = true } -tokio = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } -tree_hash = { workspace = true } -types = { workspace = true } +tree_hash = { workspace = true } +types = { workspace = true } validator_metrics = { workspace = true } validator_store = { workspace = true } diff --git a/validator_client/src/latency.rs b/validator_client/validator_services/src/latency_service.rs similarity index 91% rename from validator_client/src/latency.rs rename to validator_client/validator_services/src/latency_service.rs index 2382d350af..c810a03a80 100644 --- a/validator_client/src/latency.rs +++ b/validator_client/validator_services/src/latency_service.rs @@ -1,10 +1,9 @@ use beacon_node_fallback::BeaconNodeFallback; -use environment::RuntimeContext; use slot_clock::SlotClock; use std::sync::Arc; +use task_executor::TaskExecutor; use tokio::time::sleep; use tracing::debug; -use types::EthSpec; /// The latency service will run 11/12ths of the way through the slot. pub const SLOT_DELAY_MULTIPLIER: u32 = 11; @@ -12,8 +11,8 @@ pub const SLOT_DELAY_DENOMINATOR: u32 = 12; /// Starts a service that periodically checks the latency between the VC and the /// candidate BNs. -pub fn start_latency_service( - context: RuntimeContext, +pub fn start_latency_service( + executor: TaskExecutor, slot_clock: T, beacon_nodes: Arc>, ) { @@ -57,5 +56,5 @@ pub fn start_latency_service( } }; - context.executor.spawn(future, "latency"); + executor.spawn(future, "latency"); } diff --git a/validator_client/validator_services/src/lib.rs b/validator_client/validator_services/src/lib.rs index abf8fab3cb..3b8bd9ae14 100644 --- a/validator_client/validator_services/src/lib.rs +++ b/validator_client/validator_services/src/lib.rs @@ -1,6 +1,8 @@ pub mod attestation_service; pub mod block_service; pub mod duties_service; +pub mod latency_service; +pub mod notifier_service; pub mod preparation_service; pub mod sync; pub mod sync_committee_service; diff --git a/validator_client/src/notifier.rs b/validator_client/validator_services/src/notifier_service.rs similarity index 86% rename from validator_client/src/notifier.rs rename to validator_client/validator_services/src/notifier_service.rs index 05f1c919d2..6b8ea04edb 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/validator_services/src/notifier_service.rs @@ -1,18 +1,20 @@ -use crate::{DutiesService, ProductionValidatorClient}; -use lighthouse_validator_store::LighthouseValidatorStore; -use metrics::set_gauge; +use crate::duties_service::DutiesService; use slot_clock::SlotClock; +use std::sync::Arc; +use task_executor::TaskExecutor; use tokio::time::{sleep, Duration}; use tracing::{debug, error, info}; -use types::EthSpec; +use types::{ChainSpec, EthSpec}; +use validator_metrics::set_gauge; +use validator_store::ValidatorStore; /// Spawns a notifier service which periodically logs information about the node. -pub fn spawn_notifier(client: &ProductionValidatorClient) -> Result<(), String> { - let context = client.context.service_context("notifier".into()); - let executor = context.executor.clone(); - let duties_service = client.duties_service.clone(); - - let slot_duration = Duration::from_secs(context.eth2_config.spec.seconds_per_slot); +pub fn spawn_notifier( + duties_service: Arc>, + executor: TaskExecutor, + spec: &ChainSpec, +) -> Result<(), String> { + let slot_duration = Duration::from_secs(spec.seconds_per_slot); let interval_fut = async move { loop { @@ -33,9 +35,7 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu } /// Performs a single notification routine. -async fn notify( - duties_service: &DutiesService, T>, -) { +async fn notify(duties_service: &DutiesService) { let (candidate_info, num_available, num_synced) = duties_service.beacon_nodes.get_notifier_info().await; let num_total = candidate_info.len(); @@ -102,7 +102,7 @@ async fn notify( } if let Some(slot) = duties_service.slot_clock.now() { - let epoch = slot.epoch(E::slots_per_epoch()); + let epoch = slot.epoch(S::E::slots_per_epoch()); let total_validators = duties_service.total_validator_count(); let proposing_validators = duties_service.proposer_count(epoch);