mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-06 10:11:44 +00:00
Merge branch 'unstable' into eip4844
This commit is contained in:
@@ -14,10 +14,13 @@ use std::fmt::Debug;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::{sync::RwLock, time::sleep};
|
||||
use types::{ChainSpec, Config, EthSpec};
|
||||
|
||||
/// Message emitted when the VC detects the BN is using a different spec.
|
||||
const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updating";
|
||||
|
||||
/// The number of seconds *prior* to slot start that we will try and update the state of fallback
|
||||
/// nodes.
|
||||
///
|
||||
@@ -27,6 +30,14 @@ use types::{ChainSpec, Config, EthSpec};
|
||||
/// having the correct nodes up and running prior to the start of the slot.
|
||||
const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Indicates a measurement of latency between the VC and a BN.
|
||||
pub struct LatencyMeasurement {
|
||||
/// An identifier for the beacon node (e.g. the URL).
|
||||
pub beacon_node_id: String,
|
||||
/// The round-trip latency, if the BN responded successfully.
|
||||
pub latency: Option<Duration>,
|
||||
}
|
||||
|
||||
/// Starts a service that will routinely try and update the status of the provided `beacon_nodes`.
|
||||
///
|
||||
/// See `SLOT_LOOKAHEAD` for information about when this should run.
|
||||
@@ -262,6 +273,7 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
|
||||
"Beacon node has mismatched Altair fork epoch";
|
||||
"endpoint" => %self.beacon_node,
|
||||
"endpoint_altair_fork_epoch" => ?beacon_node_spec.altair_fork_epoch,
|
||||
"hint" => UPDATE_REQUIRED_LOG_HINT,
|
||||
);
|
||||
} else if beacon_node_spec.bellatrix_fork_epoch != spec.bellatrix_fork_epoch {
|
||||
warn!(
|
||||
@@ -269,6 +281,15 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
|
||||
"Beacon node has mismatched Bellatrix fork epoch";
|
||||
"endpoint" => %self.beacon_node,
|
||||
"endpoint_bellatrix_fork_epoch" => ?beacon_node_spec.bellatrix_fork_epoch,
|
||||
"hint" => UPDATE_REQUIRED_LOG_HINT,
|
||||
);
|
||||
} else if beacon_node_spec.capella_fork_epoch != spec.capella_fork_epoch {
|
||||
warn!(
|
||||
log,
|
||||
"Beacon node has mismatched Capella fork epoch";
|
||||
"endpoint" => %self.beacon_node,
|
||||
"endpoint_capella_fork_epoch" => ?beacon_node_spec.capella_fork_epoch,
|
||||
"hint" => UPDATE_REQUIRED_LOG_HINT,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -394,6 +415,47 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
|
||||
let _ = future::join_all(futures).await;
|
||||
}
|
||||
|
||||
/// Concurrently send a request to all candidates (regardless of
|
||||
/// offline/online) status and attempt to collect a rough reading on the
|
||||
/// latency between the VC and candidate.
|
||||
pub async fn measure_latency(&self) -> Vec<LatencyMeasurement> {
|
||||
let futures: Vec<_> = self
|
||||
.candidates
|
||||
.iter()
|
||||
.map(|candidate| async {
|
||||
let beacon_node_id = candidate.beacon_node.to_string();
|
||||
// The `node/version` endpoint is used since I imagine it would
|
||||
// require the least processing in the BN and therefore measure
|
||||
// the connection moreso than the BNs processing speed.
|
||||
//
|
||||
// I imagine all clients have the version string availble as a
|
||||
// pre-computed string.
|
||||
let response_instant = candidate
|
||||
.beacon_node
|
||||
.get_node_version()
|
||||
.await
|
||||
.ok()
|
||||
.map(|_| Instant::now());
|
||||
(beacon_node_id, response_instant)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let request_instant = Instant::now();
|
||||
|
||||
// Send the request to all BNs at the same time. This might involve some
|
||||
// queueing on the sending host, however I hope it will avoid bias
|
||||
// caused by sending requests at different times.
|
||||
future::join_all(futures)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|(beacon_node_id, response_instant)| LatencyMeasurement {
|
||||
beacon_node_id,
|
||||
latency: response_instant
|
||||
.and_then(|response| response.checked_duration_since(request_instant)),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Run `func` against each candidate in `self`, returning immediately if a result is found.
|
||||
/// Otherwise, return all the errors encountered along the way.
|
||||
///
|
||||
|
||||
@@ -407,17 +407,22 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES);
|
||||
let signed_block = self_ref
|
||||
.validator_store
|
||||
.sign_block::<Payload>(*validator_pubkey_ref, block, current_slot)
|
||||
.await
|
||||
.map_err(|e| BlockError::Recoverable(format!("Unable to sign block: {:?}", e)))?;
|
||||
let signing_time_ms =
|
||||
Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
|
||||
|
||||
info!(
|
||||
log,
|
||||
"Publishing signed block";
|
||||
"slot" => slot.as_u64(),
|
||||
"signing_time_ms" => signing_time_ms,
|
||||
);
|
||||
|
||||
// Publish block with first available beacon node.
|
||||
self.beacon_nodes
|
||||
.first_success(
|
||||
|
||||
@@ -318,6 +318,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
||||
set here moves too far from the previous block's gas limit. [default: 30,000,000]")
|
||||
.requires("builder-proposals"),
|
||||
)
|
||||
.arg(
|
||||
Arg::with_name("latency-measurement-service")
|
||||
.long("latency-measurement-service")
|
||||
.value_name("BOOLEAN")
|
||||
.help("Set to 'true' to enable a service that periodically attempts to measure latency to BNs. \
|
||||
Set to 'false' to disable.")
|
||||
.default_value("true")
|
||||
.takes_value(true),
|
||||
)
|
||||
/*
|
||||
* Experimental/development options.
|
||||
*/
|
||||
|
||||
@@ -73,6 +73,8 @@ pub struct Config {
|
||||
pub block_delay: Option<Duration>,
|
||||
/// Disables publishing http api requests to all beacon nodes for select api calls.
|
||||
pub disable_run_on_all: bool,
|
||||
/// Enables a service which attempts to measure latency between the VC and BNs.
|
||||
pub enable_latency_measurement_service: bool,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
@@ -111,6 +113,7 @@ impl Default for Config {
|
||||
builder_registration_timestamp_override: None,
|
||||
gas_limit: None,
|
||||
disable_run_on_all: false,
|
||||
enable_latency_measurement_service: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -357,6 +360,9 @@ impl Config {
|
||||
);
|
||||
}
|
||||
|
||||
config.enable_latency_measurement_service =
|
||||
parse_optional(cli_args, "latency-measurement-service")?.unwrap_or(true);
|
||||
|
||||
/*
|
||||
* Experimental
|
||||
*/
|
||||
|
||||
@@ -17,13 +17,14 @@ use crate::{
|
||||
};
|
||||
use environment::RuntimeContext;
|
||||
use eth2::types::{AttesterData, BeaconCommitteeSubscription, ProposerData, StateId, ValidatorId};
|
||||
use futures::future::join_all;
|
||||
use futures::{stream, StreamExt};
|
||||
use parking_lot::RwLock;
|
||||
use safe_arith::ArithError;
|
||||
use slog::{debug, error, info, warn, Logger};
|
||||
use slot_clock::SlotClock;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use sync::poll_sync_committee_duties;
|
||||
use sync::SyncDutiesMap;
|
||||
use tokio::{sync::mpsc::Sender, time::sleep};
|
||||
@@ -40,6 +41,14 @@ const SUBSCRIPTION_BUFFER_SLOTS: u64 = 2;
|
||||
/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch.
|
||||
const HISTORICAL_DUTIES_EPOCHS: u64 = 2;
|
||||
|
||||
/// Compute attestation selection proofs this many slots before they are required.
|
||||
///
|
||||
/// At start-up selection proofs will be computed with less lookahead out of necessity.
|
||||
const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8;
|
||||
|
||||
/// Fraction of a slot at which selection proof signing should happen (2 means half way).
|
||||
const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2;
|
||||
|
||||
/// Minimum number of validators for which we auto-enable per-validator metrics.
|
||||
/// For validators greater than this value, we need to manually set the `enable-per-validator-metrics`
|
||||
/// flag in the cli to enable collection of per validator metrics.
|
||||
@@ -71,7 +80,7 @@ pub struct DutyAndProof {
|
||||
|
||||
impl DutyAndProof {
|
||||
/// Instantiate `Self`, computing the selection proof as well.
|
||||
pub async fn new<T: SlotClock + 'static, E: EthSpec>(
|
||||
pub async fn new_with_selection_proof<T: SlotClock + 'static, E: EthSpec>(
|
||||
duty: AttesterData,
|
||||
validator_store: &ValidatorStore<T, E>,
|
||||
spec: &ChainSpec,
|
||||
@@ -99,6 +108,14 @@ impl DutyAndProof {
|
||||
selection_proof,
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a new `DutyAndProof` with the selection proof waiting to be filled in.
|
||||
pub fn new_without_selection_proof(duty: AttesterData) -> Self {
|
||||
Self {
|
||||
duty,
|
||||
selection_proof: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// To assist with readability, the dependent root for attester/proposer duties.
|
||||
@@ -471,7 +488,7 @@ async fn poll_validator_indices<T: SlotClock + 'static, E: EthSpec>(
|
||||
/// 3. Push out any attestation subnet subscriptions to the BN.
|
||||
/// 4. Prune old entries from `duties_service.attesters`.
|
||||
async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
|
||||
duties_service: &DutiesService<T, E>,
|
||||
duties_service: &Arc<DutiesService<T, E>>,
|
||||
) -> Result<(), Error> {
|
||||
let current_epoch_timer = metrics::start_timer_vec(
|
||||
&metrics::DUTIES_SERVICE_TIMES,
|
||||
@@ -634,7 +651,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
|
||||
/// For the given `local_indices` and `local_pubkeys`, download the duties for the given `epoch` and
|
||||
/// store them in `duties_service.attesters`.
|
||||
async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
|
||||
duties_service: &DutiesService<T, E>,
|
||||
duties_service: &Arc<DutiesService<T, E>>,
|
||||
epoch: Epoch,
|
||||
local_indices: &[u64],
|
||||
local_pubkeys: &HashSet<PublicKeyBytes>,
|
||||
@@ -742,31 +759,16 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
|
||||
"num_new_duties" => new_duties.len(),
|
||||
);
|
||||
|
||||
// Produce the `DutyAndProof` messages in parallel.
|
||||
let duty_and_proof_results = join_all(new_duties.into_iter().map(|duty| {
|
||||
DutyAndProof::new(duty, &duties_service.validator_store, &duties_service.spec)
|
||||
}))
|
||||
.await;
|
||||
|
||||
// Update the duties service with the new `DutyAndProof` messages.
|
||||
let mut attesters = duties_service.attesters.write();
|
||||
let mut already_warned = Some(());
|
||||
for result in duty_and_proof_results {
|
||||
let duty_and_proof = match result {
|
||||
Ok(duty_and_proof) => duty_and_proof,
|
||||
Err(e) => {
|
||||
error!(
|
||||
log,
|
||||
"Failed to produce duty and proof";
|
||||
"error" => ?e,
|
||||
"msg" => "may impair attestation duties"
|
||||
);
|
||||
// Do not abort the entire batch for a single failure.
|
||||
continue;
|
||||
}
|
||||
};
|
||||
for duty in &new_duties {
|
||||
let attester_map = attesters.entry(duty.pubkey).or_default();
|
||||
|
||||
let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
|
||||
// Create initial entries in the map without selection proofs. We'll compute them in the
|
||||
// background later to avoid creating a thundering herd of signing threads whenever new
|
||||
// duties are computed.
|
||||
let duty_and_proof = DutyAndProof::new_without_selection_proof(duty.clone());
|
||||
|
||||
if let Some((prior_dependent_root, _)) =
|
||||
attester_map.insert(epoch, (dependent_root, duty_and_proof))
|
||||
@@ -785,9 +787,144 @@ async fn poll_beacon_attesters_for_epoch<T: SlotClock + 'static, E: EthSpec>(
|
||||
}
|
||||
drop(attesters);
|
||||
|
||||
// Spawn the background task to compute selection proofs.
|
||||
let subservice = duties_service.clone();
|
||||
duties_service.context.executor.spawn(
|
||||
async move {
|
||||
fill_in_selection_proofs(subservice, new_duties, dependent_root).await;
|
||||
},
|
||||
"duties_service_selection_proofs_background",
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map.
|
||||
///
|
||||
/// Duties are computed in batches each slot. If a re-org is detected then the process will
|
||||
/// terminate early as it is assumed the selection proofs from `duties` are no longer relevant.
|
||||
async fn fill_in_selection_proofs<T: SlotClock + 'static, E: EthSpec>(
|
||||
duties_service: Arc<DutiesService<T, E>>,
|
||||
duties: Vec<AttesterData>,
|
||||
dependent_root: Hash256,
|
||||
) {
|
||||
let log = duties_service.context.log();
|
||||
|
||||
// Sort duties by slot in a BTreeMap.
|
||||
let mut duties_by_slot: BTreeMap<Slot, Vec<_>> = BTreeMap::new();
|
||||
|
||||
for duty in duties {
|
||||
duties_by_slot.entry(duty.slot).or_default().push(duty);
|
||||
}
|
||||
|
||||
// At halfway through each slot when nothing else is likely to be getting signed, sign a batch
|
||||
// of selection proofs and insert them into the duties service `attesters` map.
|
||||
let slot_clock = &duties_service.slot_clock;
|
||||
let slot_offset = duties_service.slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM;
|
||||
|
||||
while !duties_by_slot.is_empty() {
|
||||
if let Some(duration) = slot_clock.duration_to_next_slot() {
|
||||
sleep(duration.saturating_sub(slot_offset)).await;
|
||||
|
||||
let Some(current_slot) = slot_clock.now() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let lookahead_slot = current_slot + SELECTION_PROOF_SLOT_LOOKAHEAD;
|
||||
|
||||
let mut relevant_duties = duties_by_slot.split_off(&lookahead_slot);
|
||||
std::mem::swap(&mut relevant_duties, &mut duties_by_slot);
|
||||
|
||||
let batch_size = relevant_duties.values().map(Vec::len).sum::<usize>();
|
||||
|
||||
if batch_size == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let timer = metrics::start_timer_vec(
|
||||
&metrics::DUTIES_SERVICE_TIMES,
|
||||
&[metrics::ATTESTATION_SELECTION_PROOFS],
|
||||
);
|
||||
|
||||
// Sign selection proofs (serially).
|
||||
let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten())
|
||||
.then(|duty| async {
|
||||
DutyAndProof::new_with_selection_proof(
|
||||
duty,
|
||||
&duties_service.validator_store,
|
||||
&duties_service.spec,
|
||||
)
|
||||
.await
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
// Add to attesters store.
|
||||
let mut attesters = duties_service.attesters.write();
|
||||
for result in duty_and_proof_results {
|
||||
let duty_and_proof = match result {
|
||||
Ok(duty_and_proof) => duty_and_proof,
|
||||
Err(e) => {
|
||||
error!(
|
||||
log,
|
||||
"Failed to produce duty and proof";
|
||||
"error" => ?e,
|
||||
"msg" => "may impair attestation duties"
|
||||
);
|
||||
// Do not abort the entire batch for a single failure.
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let attester_map = attesters.entry(duty_and_proof.duty.pubkey).or_default();
|
||||
let epoch = duty_and_proof.duty.slot.epoch(E::slots_per_epoch());
|
||||
match attester_map.entry(epoch) {
|
||||
hash_map::Entry::Occupied(mut entry) => {
|
||||
// No need to update duties for which no proof was computed.
|
||||
let Some(selection_proof) = duty_and_proof.selection_proof else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let (existing_dependent_root, existing_duty) = entry.get_mut();
|
||||
|
||||
if *existing_dependent_root == dependent_root {
|
||||
// Replace existing proof.
|
||||
existing_duty.selection_proof = Some(selection_proof);
|
||||
} else {
|
||||
// Our selection proofs are no longer relevant due to a reorg, abandon
|
||||
// this entire background process.
|
||||
debug!(
|
||||
log,
|
||||
"Stopping selection proof background task";
|
||||
"reason" => "re-org"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert((dependent_root, duty_and_proof));
|
||||
}
|
||||
}
|
||||
}
|
||||
drop(attesters);
|
||||
|
||||
let time_taken_ms =
|
||||
Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis();
|
||||
debug!(
|
||||
log,
|
||||
"Computed attestation selection proofs";
|
||||
"batch_size" => batch_size,
|
||||
"lookahead_slot" => lookahead_slot,
|
||||
"time_taken_ms" => time_taken_ms
|
||||
);
|
||||
} else {
|
||||
// Just sleep for one slot if we are unable to read the system clock, this gives
|
||||
// us an opportunity for the clock to eventually come good.
|
||||
sleep(duties_service.slot_clock.slot_duration()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Download the proposer duties for the current epoch and store them in `duties_service.proposers`.
|
||||
/// If there are any proposer for this slot, send out a notification to the block proposers.
|
||||
///
|
||||
|
||||
@@ -32,6 +32,7 @@ pub const PROPOSER_DUTIES_HTTP_GET: &str = "proposer_duties_http_get";
|
||||
pub const VALIDATOR_ID_HTTP_GET: &str = "validator_id_http_get";
|
||||
pub const SUBSCRIPTIONS_HTTP_POST: &str = "subscriptions_http_post";
|
||||
pub const UPDATE_PROPOSERS: &str = "update_proposers";
|
||||
pub const ATTESTATION_SELECTION_PROOFS: &str = "attestation_selection_proofs";
|
||||
pub const SUBSCRIPTIONS: &str = "subscriptions";
|
||||
pub const LOCAL_KEYSTORE: &str = "local_keystore";
|
||||
pub const WEB3SIGNER: &str = "web3signer";
|
||||
@@ -177,12 +178,28 @@ lazy_static::lazy_static! {
|
||||
"Duration to obtain a signature",
|
||||
&["type"]
|
||||
);
|
||||
pub static ref BLOCK_SIGNING_TIMES: Result<Histogram> = try_create_histogram(
|
||||
"vc_block_signing_times_seconds",
|
||||
"Duration to obtain a signature for a block",
|
||||
);
|
||||
|
||||
pub static ref ATTESTATION_DUTY: Result<IntGaugeVec> = try_create_int_gauge_vec(
|
||||
"vc_attestation_duty_slot",
|
||||
"Attestation duty slot for all managed validators",
|
||||
&["validator"]
|
||||
);
|
||||
/*
|
||||
* BN latency
|
||||
*/
|
||||
pub static ref VC_BEACON_NODE_LATENCY: Result<HistogramVec> = try_create_histogram_vec(
|
||||
"vc_beacon_node_latency",
|
||||
"Round-trip latency for a simple API endpoint on each BN",
|
||||
&["endpoint"]
|
||||
);
|
||||
pub static ref VC_BEACON_NODE_LATENCY_PRIMARY_ENDPOINT: Result<Histogram> = try_create_histogram(
|
||||
"vc_beacon_node_latency_primary_endpoint",
|
||||
"Round-trip latency for the primary BN endpoint",
|
||||
);
|
||||
}
|
||||
|
||||
pub fn gather_prometheus_metrics<T: EthSpec>(
|
||||
|
||||
64
validator_client/src/latency.rs
Normal file
64
validator_client/src/latency.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
use crate::{http_metrics::metrics, BeaconNodeFallback};
|
||||
use environment::RuntimeContext;
|
||||
use slog::debug;
|
||||
use slot_clock::SlotClock;
|
||||
use std::sync::Arc;
|
||||
use tokio::time::sleep;
|
||||
use types::EthSpec;
|
||||
|
||||
/// The latency service will run 11/12ths of the way through the slot.
|
||||
pub const SLOT_DELAY_MULTIPLIER: u32 = 11;
|
||||
pub const SLOT_DELAY_DENOMINATOR: u32 = 12;
|
||||
|
||||
/// Starts a service that periodically checks the latency between the VC and the
|
||||
/// candidate BNs.
|
||||
pub fn start_latency_service<T: SlotClock + 'static, E: EthSpec>(
|
||||
context: RuntimeContext<E>,
|
||||
slot_clock: T,
|
||||
beacon_nodes: Arc<BeaconNodeFallback<T, E>>,
|
||||
) {
|
||||
let log = context.log().clone();
|
||||
|
||||
let future = async move {
|
||||
loop {
|
||||
let sleep_time = slot_clock
|
||||
.duration_to_next_slot()
|
||||
.map(|next_slot| {
|
||||
// This is 11/12ths through the next slot. On mainnet this
|
||||
// will happen in the 11th second of each slot, one second
|
||||
// before the next slot.
|
||||
next_slot + (next_slot / SLOT_DELAY_DENOMINATOR) * SLOT_DELAY_MULTIPLIER
|
||||
})
|
||||
// If we can't read the slot clock, just wait one slot. Running
|
||||
// the measurement at a non-exact time is not a big issue.
|
||||
.unwrap_or_else(|| slot_clock.slot_duration());
|
||||
|
||||
// Sleep until it's time to perform the measurement.
|
||||
sleep(sleep_time).await;
|
||||
|
||||
for (i, measurement) in beacon_nodes.measure_latency().await.iter().enumerate() {
|
||||
if let Some(latency) = measurement.latency {
|
||||
debug!(
|
||||
log,
|
||||
"Measured BN latency";
|
||||
"node" => &measurement.beacon_node_id,
|
||||
"latency" => latency.as_millis(),
|
||||
);
|
||||
metrics::observe_timer_vec(
|
||||
&metrics::VC_BEACON_NODE_LATENCY,
|
||||
&[&measurement.beacon_node_id],
|
||||
latency,
|
||||
);
|
||||
if i == 0 {
|
||||
metrics::observe_duration(
|
||||
&metrics::VC_BEACON_NODE_LATENCY_PRIMARY_ENDPOINT,
|
||||
latency,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
context.executor.spawn(future, "latency");
|
||||
}
|
||||
@@ -8,6 +8,7 @@ mod duties_service;
|
||||
mod graffiti_file;
|
||||
mod http_metrics;
|
||||
mod key_cache;
|
||||
mod latency;
|
||||
mod notifier;
|
||||
mod preparation_service;
|
||||
mod signing_method;
|
||||
@@ -563,6 +564,14 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
|
||||
None
|
||||
};
|
||||
|
||||
if self.config.enable_latency_measurement_service {
|
||||
latency::start_latency_service(
|
||||
self.context.clone(),
|
||||
self.duties_service.slot_clock.clone(),
|
||||
self.duties_service.beacon_nodes.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user