diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index f0ed4f737d..45cd989a44 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -476,3 +476,28 @@ fn disable_run_on_all() { assert!(config.disable_run_on_all); }); } + +#[test] +fn latency_measurement_service() { + CommandLineTest::new().run().with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", None) + .run() + .with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", Some("true")) + .run() + .with_config(|config| { + assert!(config.enable_latency_measurement_service); + }); + CommandLineTest::new() + .flag("latency-measurement-service", Some("false")) + .run() + .with_config(|config| { + assert!(!config.enable_latency_measurement_service); + }); +} diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 06ddcbaf3b..668e1bcf09 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -14,7 +14,7 @@ use std::fmt::Debug; use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::{sync::RwLock, time::sleep}; use types::{ChainSpec, Config, EthSpec}; @@ -27,6 +27,14 @@ use types::{ChainSpec, Config, EthSpec}; /// having the correct nodes up and running prior to the start of the slot. const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1); +/// Indicates a measurement of latency between the VC and a BN. +pub struct LatencyMeasurement { + /// An identifier for the beacon node (e.g. the URL). + pub beacon_node_id: String, + /// The round-trip latency, if the BN responded successfully. + pub latency: Option, +} + /// Starts a service that will routinely try and update the status of the provided `beacon_nodes`. /// /// See `SLOT_LOOKAHEAD` for information about when this should run. @@ -394,6 +402,47 @@ impl BeaconNodeFallback { let _ = future::join_all(futures).await; } + /// Concurrently send a request to all candidates (regardless of + /// offline/online) status and attempt to collect a rough reading on the + /// latency between the VC and candidate. + pub async fn measure_latency(&self) -> Vec { + let futures: Vec<_> = self + .candidates + .iter() + .map(|candidate| async { + let beacon_node_id = candidate.beacon_node.to_string(); + // The `node/version` endpoint is used since I imagine it would + // require the least processing in the BN and therefore measure + // the connection moreso than the BNs processing speed. + // + // I imagine all clients have the version string availble as a + // pre-computed string. + let response_instant = candidate + .beacon_node + .get_node_version() + .await + .ok() + .map(|_| Instant::now()); + (beacon_node_id, response_instant) + }) + .collect(); + + let request_instant = Instant::now(); + + // Send the request to all BNs at the same time. This might involve some + // queueing on the sending host, however I hope it will avoid bias + // caused by sending requests at different times. + future::join_all(futures) + .await + .into_iter() + .map(|(beacon_node_id, response_instant)| LatencyMeasurement { + beacon_node_id, + latency: response_instant + .and_then(|response| response.checked_duration_since(request_instant)), + }) + .collect() + } + /// Run `func` against each candidate in `self`, returning immediately if a result is found. /// Otherwise, return all the errors encountered along the way. /// diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 9142a0c7ec..fd96aa1f5c 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -318,6 +318,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { set here moves too far from the previous block's gas limit. [default: 30,000,000]") .requires("builder-proposals"), ) + .arg( + Arg::with_name("latency-measurement-service") + .long("latency-measurement-service") + .value_name("BOOLEAN") + .help("Set to 'true' to enable a service that periodically attempts to measure latency to BNs. \ + Set to 'false' to disable.") + .default_value("true") + .takes_value(true), + ) /* * Experimental/development options. */ diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 0f24e81d54..724d6c74f1 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -73,6 +73,8 @@ pub struct Config { pub block_delay: Option, /// Disables publishing http api requests to all beacon nodes for select api calls. pub disable_run_on_all: bool, + /// Enables a service which attempts to measure latency between the VC and BNs. + pub enable_latency_measurement_service: bool, } impl Default for Config { @@ -111,6 +113,7 @@ impl Default for Config { builder_registration_timestamp_override: None, gas_limit: None, disable_run_on_all: false, + enable_latency_measurement_service: true, } } } @@ -357,6 +360,9 @@ impl Config { ); } + config.enable_latency_measurement_service = + parse_optional(cli_args, "latency-measurement-service")?.unwrap_or(true); + /* * Experimental */ diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index 0cb3417fc7..9b60d0edec 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -178,6 +178,14 @@ lazy_static::lazy_static! { "Attestation duty slot for all managed validators", &["validator"] ); + /* + * BN latency + */ + pub static ref VC_BEACON_NODE_LATENCY: Result = try_create_histogram_vec( + "vc_beacon_node_latency", + "Round-trip latency for a simple API endpoint on each BN", + &["endpoint"] + ); } pub fn gather_prometheus_metrics( diff --git a/validator_client/src/latency.rs b/validator_client/src/latency.rs new file mode 100644 index 0000000000..9ab9b630b1 --- /dev/null +++ b/validator_client/src/latency.rs @@ -0,0 +1,58 @@ +use crate::{http_metrics::metrics, BeaconNodeFallback}; +use environment::RuntimeContext; +use slog::debug; +use slot_clock::SlotClock; +use std::sync::Arc; +use tokio::time::sleep; +use types::EthSpec; + +/// The latency service will run 11/12ths of the way through the slot. +pub const SLOT_DELAY_MULTIPLIER: u32 = 11; +pub const SLOT_DELAY_DENOMINATOR: u32 = 12; + +/// Starts a service that periodically checks the latency between the VC and the +/// candidate BNs. +pub fn start_latency_service( + context: RuntimeContext, + slot_clock: T, + beacon_nodes: Arc>, +) { + let log = context.log().clone(); + + let future = async move { + loop { + let sleep_time = slot_clock + .duration_to_next_slot() + .map(|next_slot| { + // This is 11/12ths through the next slot. On mainnet this + // will happen in the 11th second of each slot, one second + // before the next slot. + next_slot + (next_slot / SLOT_DELAY_DENOMINATOR) * SLOT_DELAY_MULTIPLIER + }) + // If we can't read the slot clock, just wait one slot. Running + // the measurement at a non-exact time is not a big issue. + .unwrap_or_else(|| slot_clock.slot_duration()); + + // Sleep until it's time to perform the measurement. + sleep(sleep_time).await; + + for measurement in beacon_nodes.measure_latency().await { + if let Some(latency) = measurement.latency { + debug!( + log, + "Measured BN latency"; + "node" => &measurement.beacon_node_id, + "latency" => latency.as_millis(), + ); + metrics::observe_timer_vec( + &metrics::VC_BEACON_NODE_LATENCY, + &[&measurement.beacon_node_id], + latency, + ) + } + } + } + }; + + context.executor.spawn(future, "latency"); +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index f2d6474901..82cacccc60 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -8,6 +8,7 @@ mod duties_service; mod graffiti_file; mod http_metrics; mod key_cache; +mod latency; mod notifier; mod preparation_service; mod signing_method; @@ -563,6 +564,14 @@ impl ProductionValidatorClient { None }; + if self.config.enable_latency_measurement_service { + latency::start_latency_service( + self.context.clone(), + self.duties_service.slot_clock.clone(), + self.duties_service.beacon_nodes.clone(), + ); + } + Ok(()) } }