From 6e15533b54861a7450c4b2015ecb4f52c626364a Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sun, 5 Mar 2023 23:43:29 +0000 Subject: [PATCH] Add latency measurement service to VC (#4024) ## Issue Addressed NA ## Proposed Changes Adds a service which periodically polls (11s into each mainnet slot) the `node/version` endpoint on each BN and roughly measures the round-trip latency. The latency is exposed as a `DEBG` log and a Prometheus metric. The `--latency-measurement-service` has been added to the VC, with the following options: - `--latency-measurement-service true`: enable the service (default). - `--latency-measurement-service`: (without a value) has the same effect. - `--latency-measurement-service false`: disable the service. ## Additional Info Whilst looking at our staking setup, I think the BN+VC latency is contributing to late blocks. Now that we have to wait for the builders to respond it's nice to try and do everything we can to reduce that latency. Having visibility is the first step. --- lighthouse/tests/validator_client.rs | 25 +++++++++ validator_client/src/beacon_node_fallback.rs | 51 ++++++++++++++++- validator_client/src/cli.rs | 9 +++ validator_client/src/config.rs | 6 ++ validator_client/src/http_metrics/metrics.rs | 8 +++ validator_client/src/latency.rs | 58 ++++++++++++++++++++ validator_client/src/lib.rs | 9 +++ 7 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 validator_client/src/latency.rs diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index f0ed4f737d..45cd989a44 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -476,3 +476,28 @@ fn disable_run_on_all() { assert!(config.disable_run_on_all); }); } + +#[test] +fn latency_measurement_service() { + CommandLineTest::new().run().with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", None) + .run() + .with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", Some("true")) + .run() + .with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", Some("false")) + .run() + .with_config(|config| { + assert!(!config.enable_latency_measurement_service); + }); +} diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 06ddcbaf3b..668e1bcf09 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -14,7 +14,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::{sync::RwLock, time::sleep}; use types::{ChainSpec, Config, EthSpec}; @@ -27,6 +27,14 @@ use types::{ChainSpec, Config, EthSpec}; /// having the correct nodes up and running prior to the start of the slot. const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1); +/// Indicates a measurement of latency between the VC and a BN. +pub struct LatencyMeasurement { + /// An identifier for the beacon node (e.g. the URL). + pub beacon_node_id: String, + /// The round-trip latency, if the BN responded successfully. + pub latency: Option, +} + /// Starts a service that will routinely try and update the status of the provided `beacon_nodes`. /// /// See `SLOT_LOOKAHEAD` for information about when this should run. @@ -394,6 +402,47 @@ impl BeaconNodeFallback { let _ = future::join_all(futures).await; } + /// Concurrently send a request to all candidates (regardless of + /// offline/online) status and attempt to collect a rough reading on the + /// latency between the VC and candidate. + pub async fn measure_latency(&self) -> Vec { + let futures: Vec<_> = self + .candidates + .iter() + .map(|candidate| async { + let beacon_node_id = candidate.beacon_node.to_string(); + // The `node/version` endpoint is used since I imagine it would + // require the least processing in the BN and therefore measure + // the connection moreso than the BNs processing speed. + // + // I imagine all clients have the version string availble as a + // pre-computed string. + let response_instant = candidate + .beacon_node + .get_node_version() + .await + .ok() + .map(|_| Instant::now()); + (beacon_node_id, response_instant) + }) + .collect(); + + let request_instant = Instant::now(); + + // Send the request to all BNs at the same time. This might involve some + // queueing on the sending host, however I hope it will avoid bias + // caused by sending requests at different times. + future::join_all(futures) + .await + .into_iter() + .map(|(beacon_node_id, response_instant)| LatencyMeasurement { + beacon_node_id, + latency: response_instant + .and_then(|response| response.checked_duration_since(request_instant)), + }) + .collect() + } + /// Run `func` against each candidate in `self`, returning immediately if a result is found. /// Otherwise, return all the errors encountered along the way. /// diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 9142a0c7ec..fd96aa1f5c 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -318,6 +318,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { set here moves too far from the previous block's gas limit. [default: 30,000,000]") .requires("builder-proposals"), ) + .arg( + Arg::with_name("latency-measurement-service") + .long("latency-measurement-service") + .value_name("BOOLEAN") + .help("Set to 'true' to enable a service that periodically attempts to measure latency to BNs. \ + Set to 'false' to disable.") + .default_value("true") + .takes_value(true), + ) /* * Experimental/development options. */ diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 0f24e81d54..724d6c74f1 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -73,6 +73,8 @@ pub struct Config { pub block_delay: Option, /// Disables publishing http api requests to all beacon nodes for select api calls. pub disable_run_on_all: bool, + /// Enables a service which attempts to measure latency between the VC and BNs. + pub enable_latency_measurement_service: bool, } impl Default for Config { @@ -111,6 +113,7 @@ impl Default for Config { builder_registration_timestamp_override: None, gas_limit: None, disable_run_on_all: false, + enable_latency_measurement_service: true, } } } @@ -357,6 +360,9 @@ impl Config { ); } + config.enable_latency_measurement_service = + parse_optional(cli_args, "latency-measurement-service")?.unwrap_or(true); + /* * Experimental */ diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index 0cb3417fc7..9b60d0edec 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -178,6 +178,14 @@ lazy_static::lazy_static! { "Attestation duty slot for all managed validators", &["validator"] ); + /* + * BN latency + */ + pub static ref VC_BEACON_NODE_LATENCY: Result = try_create_histogram_vec( + "vc_beacon_node_latency", + "Round-trip latency for a simple API endpoint on each BN", + &["endpoint"] + ); } pub fn gather_prometheus_metrics( diff --git a/validator_client/src/latency.rs b/validator_client/src/latency.rs new file mode 100644 index 0000000000..9ab9b630b1 --- /dev/null +++ b/validator_client/src/latency.rs @@ -0,0 +1,58 @@ +use crate::{http_metrics::metrics, BeaconNodeFallback}; +use environment::RuntimeContext; +use slog::debug; +use slot_clock::SlotClock; +use std::sync::Arc; +use tokio::time::sleep; +use types::EthSpec; + +/// The latency service will run 11/12ths of the way through the slot. +pub const SLOT_DELAY_MULTIPLIER: u32 = 11; +pub const SLOT_DELAY_DENOMINATOR: u32 = 12; + +/// Starts a service that periodically checks the latency between the VC and the +/// candidate BNs. +pub fn start_latency_service( + context: RuntimeContext, + slot_clock: T, + beacon_nodes: Arc>, +) { + let log = context.log().clone(); + + let future = async move { + loop { + let sleep_time = slot_clock + .duration_to_next_slot() + .map(|next_slot| { + // This is 11/12ths through the next slot. On mainnet this + // will happen in the 11th second of each slot, one second + // before the next slot. + next_slot + (next_slot / SLOT_DELAY_DENOMINATOR) * SLOT_DELAY_MULTIPLIER + }) + // If we can't read the slot clock, just wait one slot. Running + // the measurement at a non-exact time is not a big issue. + .unwrap_or_else(|| slot_clock.slot_duration()); + + // Sleep until it's time to perform the measurement. + sleep(sleep_time).await; + + for measurement in beacon_nodes.measure_latency().await { + if let Some(latency) = measurement.latency { + debug!( + log, + "Measured BN latency"; + "node" => &measurement.beacon_node_id, + "latency" => latency.as_millis(), + ); + metrics::observe_timer_vec( + &metrics::VC_BEACON_NODE_LATENCY, + &[&measurement.beacon_node_id], + latency, + ) + } + } + } + }; + + context.executor.spawn(future, "latency"); +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index f2d6474901..82cacccc60 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -8,6 +8,7 @@ mod duties_service; mod graffiti_file; mod http_metrics; mod key_cache; +mod latency; mod notifier; mod preparation_service; mod signing_method; @@ -563,6 +564,14 @@ impl ProductionValidatorClient { None }; + if self.config.enable_latency_measurement_service { + latency::start_latency_service( + self.context.clone(), + self.duties_service.slot_clock.clone(), + self.duties_service.beacon_nodes.clone(), + ); + } + Ok(()) } }