diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index eff50701d7..48419d46ed 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -579,11 +579,13 @@ where mut self, auto_register: bool, validators: Vec, + individual_metrics_threshold: usize, log: Logger, ) -> Self { self.validator_monitor = Some(ValidatorMonitor::new( validators, auto_register, + individual_metrics_threshold, log.clone(), )); self @@ -989,6 +991,7 @@ fn descriptive_db_error(item: &str, error: &StoreError) -> String { #[cfg(test)] mod test { use super::*; + use crate::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use eth2_hashing::hash; use genesis::{ generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH, @@ -1045,7 +1048,12 @@ mod test { .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) - .monitor_validators(true, vec![], log.clone()) + .monitor_validators( + true, + vec![], + DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, + log.clone(), + ) .build() .expect("should build"); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 9183583fb1..66de3f02d2 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2,6 +2,7 @@ pub use crate::persisted_beacon_chain::PersistedBeaconChain; pub use crate::{ beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, migrate::MigratorConfig, + validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification, }; use crate::{ @@ -472,7 +473,7 @@ where log.clone(), 5, ))) - .monitor_validators(true, vec![], log); + .monitor_validators(true, vec![], DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, log); builder = if let Some(mutator) = self.initial_mutator { mutator(builder) diff --git 
a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index e95394bb78..dad5e1517a 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -21,10 +21,21 @@ use types::{ SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit, }; +/// Used for Prometheus labels. +/// +/// We've used `total` for this value to align with Nimbus, as per: +/// https://github.com/sigp/lighthouse/pull/3728#issuecomment-1375173063 +const TOTAL_LABEL: &str = "total"; + /// The validator monitor collects per-epoch data about each monitored validator. Historical data /// will be kept around for `HISTORIC_EPOCHS` before it is pruned. pub const HISTORIC_EPOCHS: usize = 4; +/// Once the validator monitor exceeds this number of validators it will stop +/// tracking their metrics/logging individually in an effort to reduce +/// Prometheus cardinality and log volume. +pub const DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD: usize = 64; + #[derive(Debug)] pub enum Error { InvalidPubkey(String), @@ -258,16 +269,27 @@ pub struct ValidatorMonitor { indices: HashMap, /// If true, allow the automatic registration of validators. auto_register: bool, + /// Once the number of monitored validators goes above this threshold, we + /// will stop tracking metrics/logs on a per-validator basis. This prevents + /// large validator counts causing infeasibly high cardinality for + /// Prometheus and high log volumes. 
+ individual_tracking_threshold: usize, log: Logger, _phantom: PhantomData, } impl ValidatorMonitor { - pub fn new(pubkeys: Vec, auto_register: bool, log: Logger) -> Self { + pub fn new( + pubkeys: Vec, + auto_register: bool, + individual_tracking_threshold: usize, + log: Logger, + ) -> Self { let mut s = Self { validators: <_>::default(), indices: <_>::default(), auto_register, + individual_tracking_threshold, log, _phantom: PhantomData, }; @@ -277,6 +299,13 @@ impl ValidatorMonitor { s } + /// Returns `true` when the validator count is sufficiently low to + /// emit metrics and logs on a per-validator basis (rather than just an + /// aggregated basis). + fn individual_tracking(&self) -> bool { + self.validators.len() <= self.individual_tracking_threshold + } + /// Add some validators to `self` for additional monitoring. fn add_validator_pubkey(&mut self, pubkey: PublicKeyBytes) { let index_opt = self @@ -317,6 +346,12 @@ impl ValidatorMonitor { for monitored_validator in self.validators.values() { if let Some(i) = monitored_validator.index { monitored_validator.touch_epoch_summary(current_epoch); + + // Only log the per-validator metrics if it's enabled. + if !self.individual_tracking() { + continue; + } + let i = i as usize; let id = &monitored_validator.id; @@ -379,6 +414,24 @@ impl ValidatorMonitor { } } + /// Run `func` with the `TOTAL_LABEL` and optionally the + /// `individual_id`. + /// + /// This function is used for registering metrics that can be applied to + /// both all validators and an individual validator. For example, the count + /// of missed head votes can be aggregated across all validators in a single + /// metric and also tracked on a per-validator basis. + /// + /// We allow disabling tracking metrics on an individual validator basis + /// since it can result in untenable cardinality with high validator counts. 
+ fn aggregatable_metric<F: Fn(&str)>(&self, individual_id: &str, func: F) { + func(TOTAL_LABEL); + + if self.individual_tracking() { + func(individual_id); + } + } + pub fn process_validator_statuses( &self, epoch: Epoch, @@ -431,72 +484,92 @@ impl ValidatorMonitor { // For Base states, this will be *any* attestation whatsoever. For Altair states, // this will be any attestation that matched a "timely" flag. if previous_epoch_matched_any { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT, - &[id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_HIT, + &[label], + ) + }); attestation_success.push(id); - debug!( - self.log, - "Previous epoch attestation success"; - "matched_source" => previous_epoch_matched_source, - "matched_target" => previous_epoch_matched_target, - "matched_head" => previous_epoch_matched_head, - "epoch" => prev_epoch, - "validator" => id, - ) + if self.individual_tracking() { + debug!( + self.log, + "Previous epoch attestation success"; + "matched_source" => previous_epoch_matched_source, + "matched_target" => previous_epoch_matched_target, + "matched_head" => previous_epoch_matched_head, + "epoch" => prev_epoch, + "validator" => id, + ) + } } else { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS, - &[id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_ATTESTER_MISS, + &[label], + ); + }); attestation_miss.push(id); - debug!( - self.log, - "Previous epoch attestation missing"; - "epoch" => prev_epoch, - "validator" => id, - ) + if self.individual_tracking() { + debug!( + self.log, + "Previous epoch attestation missing"; + "epoch" => prev_epoch, + "validator" => id, + ) + } } // Indicates if any on-chain attestation hit the head. 
if previous_epoch_matched_head { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT, - &[id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_HIT, + &[label], + ); + }); } else { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS, - &[id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_HEAD_ATTESTER_MISS, + &[label], + ); + }); head_miss.push(id); - debug!( - self.log, - "Attestation failed to match head"; - "epoch" => prev_epoch, - "validator" => id, - ); + if self.individual_tracking() { + debug!( + self.log, + "Attestation failed to match head"; + "epoch" => prev_epoch, + "validator" => id, + ); + } } // Indicates if any on-chain attestation hit the target. if previous_epoch_matched_target { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT, - &[id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_HIT, + &[label], + ); + }); } else { - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS, - &[id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_TARGET_ATTESTER_MISS, + &[label], + ); + }); target_miss.push(id); - debug!( - self.log, - "Attestation failed to match target"; - "epoch" => prev_epoch, - "validator" => id, - ); + if self.individual_tracking() { + debug!( + self.log, + "Attestation failed to match target"; + "epoch" => prev_epoch, + "validator" => id, + ); + } } // Get the minimum value among the validator monitor observed inclusion distance @@ -511,21 +584,25 @@ impl ValidatorMonitor { if let Some(inclusion_delay) = min_inclusion_distance { if 
inclusion_delay > spec.min_attestation_inclusion_delay { suboptimal_inclusion.push(id); - debug!( - self.log, - "Potential sub-optimal inclusion delay"; - "optimal" => spec.min_attestation_inclusion_delay, - "delay" => inclusion_delay, - "epoch" => prev_epoch, - "validator" => id, - ); + if self.individual_tracking() { + debug!( + self.log, + "Potential sub-optimal inclusion delay"; + "optimal" => spec.min_attestation_inclusion_delay, + "delay" => inclusion_delay, + "epoch" => prev_epoch, + "validator" => id, + ); + } } - metrics::set_int_gauge( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE, - &[id], - inclusion_delay as i64, - ); + if self.individual_tracking() { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ON_CHAIN_INCLUSION_DISTANCE, + &[id], + inclusion_delay as i64, + ); + } } // Indicates the number of sync committee signatures that made it into @@ -536,13 +613,19 @@ impl ValidatorMonitor { let current_epoch = epoch - 1; if let Some(sync_committee) = summary.sync_committee() { if sync_committee.contains(pubkey) { - metrics::set_int_gauge( - &metrics::VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE, - &[id], - 1, - ); + if self.individual_tracking() { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE, + &[id], + 1, + ); + } let epoch_summary = monitored_validator.summaries.read(); if let Some(summary) = epoch_summary.get(&current_epoch) { + // This log is not gated by + // `self.individual_tracking()` since the number of + // logs that can be generated is capped by the size + // of the sync committee. 
info!( self.log, "Current epoch sync signatures"; @@ -552,7 +635,7 @@ impl ValidatorMonitor { "validator" => id, ); } - } else { + } else if self.individual_tracking() { metrics::set_int_gauge( &metrics::VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE, &[id], @@ -693,12 +776,17 @@ impl ValidatorMonitor { let id = &validator.id; let delay = get_block_delay_ms(seen_timestamp, block, slot_clock); - metrics::inc_counter_vec(&metrics::VALIDATOR_MONITOR_BEACON_BLOCK_TOTAL, &[src, id]); - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_BEACON_BLOCK_DELAY_SECONDS, - &[src, id], - delay, - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_BEACON_BLOCK_TOTAL, + &[src, label], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_BEACON_BLOCK_DELAY_SECONDS, + &[src, label], + delay, + ); + }); info!( self.log, @@ -764,27 +852,31 @@ impl ValidatorMonitor { if let Some(validator) = self.get_validator(*i) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_TOTAL, - &[src, id], - ); - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_DELAY_SECONDS, - &[src, id], - delay, - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_TOTAL, + &[src, label], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_DELAY_SECONDS, + &[src, label], + delay, + ); + }); - info!( - self.log, - "Unaggregated attestation"; - "head" => ?data.beacon_block_root, - "index" => %data.index, - "delay_ms" => %delay.as_millis(), - "epoch" => %epoch, - "slot" => %data.slot, - "src" => src, - "validator" => %id, - ); + if self.individual_tracking() { + info!( + self.log, + "Unaggregated attestation"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => 
%data.slot, + "src" => src, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { summary.register_unaggregated_attestation(delay) @@ -848,27 +940,31 @@ impl ValidatorMonitor { if let Some(validator) = self.get_validator(aggregator_index) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_TOTAL, - &[src, id], - ); - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_DELAY_SECONDS, - &[src, id], - delay, - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_TOTAL, + &[src, label], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_DELAY_SECONDS, + &[src, label], + delay, + ); + }); - info!( - self.log, - "Aggregated attestation"; - "head" => ?data.beacon_block_root, - "index" => %data.index, - "delay_ms" => %delay.as_millis(), - "epoch" => %epoch, - "slot" => %data.slot, - "src" => src, - "validator" => %id, - ); + if self.individual_tracking() { + info!( + self.log, + "Aggregated attestation"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %data.slot, + "src" => src, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { summary.register_aggregated_attestation(delay) @@ -879,27 +975,31 @@ impl ValidatorMonitor { if let Some(validator) = self.get_validator(*i) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_TOTAL, - &[src, id], - ); - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_DELAY_SECONDS, - &[src, id], - delay, - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_TOTAL, + &[src, label], + ); + metrics::observe_timer_vec( + 
&metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_DELAY_SECONDS, + &[src, label], + delay, + ); + }); - info!( - self.log, - "Attestation included in aggregate"; - "head" => ?data.beacon_block_root, - "index" => %data.index, - "delay_ms" => %delay.as_millis(), - "epoch" => %epoch, - "slot" => %data.slot, - "src" => src, - "validator" => %id, - ); + if self.individual_tracking() { + info!( + self.log, + "Attestation included in aggregate"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %data.slot, + "src" => src, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { summary.register_aggregate_attestation_inclusion() @@ -933,26 +1033,31 @@ impl ValidatorMonitor { if let Some(validator) = self.get_validator(*i) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_TOTAL, - &["block", id], - ); - metrics::set_int_gauge( - &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS, - &["block", id], - delay.as_u64() as i64, - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_TOTAL, + &["block", label], + ); + }); - info!( - self.log, - "Attestation included in block"; - "head" => ?data.beacon_block_root, - "index" => %data.index, - "inclusion_lag" => format!("{} slot(s)", delay), - "epoch" => %epoch, - "slot" => %data.slot, - "validator" => %id, - ); + if self.individual_tracking() { + metrics::set_int_gauge( + &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS, + &["block", id], + delay.as_u64() as i64, + ); + + info!( + self.log, + "Attestation included in block"; + "head" => ?data.beacon_block_root, + "index" => %data.index, + "inclusion_lag" => format!("{} slot(s)", delay), + "epoch" => %epoch, + "slot" => %data.slot, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { 
summary.register_attestation_block_inclusion(inclusion_distance) @@ -1010,26 +1115,30 @@ impl ValidatorMonitor { slot_clock, ); - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_TOTAL, - &[src, id], - ); - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_DELAY_SECONDS, - &[src, id], - delay, - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_TOTAL, + &[src, label], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_DELAY_SECONDS, + &[src, label], + delay, + ); + }); - info!( - self.log, - "Sync committee message"; - "head" => %sync_committee_message.beacon_block_root, - "delay_ms" => %delay.as_millis(), - "epoch" => %epoch, - "slot" => %sync_committee_message.slot, - "src" => src, - "validator" => %id, - ); + if self.individual_tracking() { + info!( + self.log, + "Sync committee message"; + "head" => %sync_committee_message.beacon_block_root, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %sync_committee_message.slot, + "src" => src, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { summary.register_sync_committee_message(delay) @@ -1094,26 +1203,30 @@ impl ValidatorMonitor { if let Some(validator) = self.get_validator(aggregator_index) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_SYNC_CONTRIBUTIONS_TOTAL, - &[src, id], - ); - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_SYNC_CONTRIBUTIONS_DELAY_SECONDS, - &[src, id], - delay, - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_CONTRIBUTIONS_TOTAL, + &[src, label], + ); + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_SYNC_CONTRIBUTIONS_DELAY_SECONDS, + &[src, label], + delay, + ); + }); - info!( - self.log, - "Sync contribution"; - "head" => %beacon_block_root, - 
"delay_ms" => %delay.as_millis(), - "epoch" => %epoch, - "slot" => %slot, - "src" => src, - "validator" => %id, - ); + if self.individual_tracking() { + info!( + self.log, + "Sync contribution"; + "head" => %beacon_block_root, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %slot, + "src" => src, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { summary.register_sync_committee_contribution(delay) @@ -1124,21 +1237,25 @@ impl ValidatorMonitor { if let Some(validator) = self.validators.get(validator_pubkey) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_CONTRIBUTION_TOTAL, - &[src, id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_CONTRIBUTION_TOTAL, + &[src, label], + ); + }); - info!( - self.log, - "Sync signature included in contribution"; - "head" => %beacon_block_root, - "delay_ms" => %delay.as_millis(), - "epoch" => %epoch, - "slot" => %slot, - "src" => src, - "validator" => %id, - ); + if self.individual_tracking() { + info!( + self.log, + "Sync signature included in contribution"; + "head" => %beacon_block_root, + "delay_ms" => %delay.as_millis(), + "epoch" => %epoch, + "slot" => %slot, + "src" => src, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { summary.register_sync_signature_contribution_inclusion() @@ -1160,19 +1277,23 @@ impl ValidatorMonitor { if let Some(validator) = self.validators.get(validator_pubkey) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_BLOCK_TOTAL, - &["block", id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_BLOCK_TOTAL, + &["block", label], + ); + }); - info!( - self.log, - "Sync signature included in block"; - "head" => %beacon_block_root, - "epoch" => %epoch, 
- "slot" => %slot, - "validator" => %id, - ); + if self.individual_tracking() { + info!( + self.log, + "Sync signature included in block"; + "head" => %beacon_block_root, + "epoch" => %epoch, + "slot" => %slot, + "validator" => %id, + ); + } validator.with_epoch_summary(epoch, |summary| { summary.register_sync_signature_block_inclusions(); @@ -1201,8 +1322,12 @@ impl ValidatorMonitor { let id = &validator.id; let epoch = exit.epoch; - metrics::inc_counter_vec(&metrics::VALIDATOR_MONITOR_EXIT_TOTAL, &[src, id]); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec(&metrics::VALIDATOR_MONITOR_EXIT_TOTAL, &[src, label]); + }); + // Not gated behind `self.individual_tracking()` since it's an + // infrequent and interesting message. info!( self.log, "Voluntary exit"; @@ -1240,11 +1365,15 @@ impl ValidatorMonitor { if let Some(validator) = self.get_validator(proposer) { let id = &validator.id; - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_PROPOSER_SLASHING_TOTAL, - &[src, id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_PROPOSER_SLASHING_TOTAL, + &[src, label], + ); + }); + // Not gated behind `self.individual_tracking()` since it's an + // infrequent and interesting message. crit!( self.log, "Proposer slashing"; @@ -1293,11 +1422,15 @@ impl ValidatorMonitor { let id = &validator.id; let epoch = data.slot.epoch(T::slots_per_epoch()); - metrics::inc_counter_vec( - &metrics::VALIDATOR_MONITOR_ATTESTER_SLASHING_TOTAL, - &[src, id], - ); + self.aggregatable_metric(id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_ATTESTER_SLASHING_TOTAL, + &[src, label], + ); + }); + // Not gated behind `self.individual_tracking()` since it's an + // infrequent and interesting message. 
crit!( self.log, "Attester slashing"; @@ -1347,69 +1480,80 @@ impl ValidatorMonitor { /* * Attestations */ - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL, - &[id], - summary.attestations as i64, - ); if let Some(delay) = summary.attestation_min_delay { - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_MIN_DELAY_SECONDS, - &[id], - delay, - ); + self.aggregatable_metric(id, |tag| { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_MIN_DELAY_SECONDS, + &[tag], + delay, + ); + }); } - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS, - &[id], - summary.attestation_aggregate_inclusions as i64, - ); - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS, - &[id], - summary.attestation_block_inclusions as i64, - ); - if let Some(distance) = summary.attestation_min_block_inclusion_distance { + if self.individual_tracking() { metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_MIN_INCLUSION_DISTANCE, + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL, &[id], - distance.as_u64() as i64, + summary.attestations as i64, ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS, + &[id], + summary.attestation_aggregate_inclusions as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS, + &[id], + summary.attestation_block_inclusions as i64, + ); + + if let Some(distance) = summary.attestation_min_block_inclusion_distance { + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_MIN_INCLUSION_DISTANCE, + &[id], + distance.as_u64() as i64, + ); + } } /* * Sync committee messages */ - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_TOTAL, - &[id], - summary.sync_committee_messages as 
i64, - ); if let Some(delay) = summary.sync_committee_message_min_delay { - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_MIN_DELAY_SECONDS, + self.aggregatable_metric(id, |tag| { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_MIN_DELAY_SECONDS, + &[tag], + delay, + ); + }); + } + if self.individual_tracking() { + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_TOTAL, &[id], - delay, + summary.sync_committee_messages as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_INCLUSIONS, + &[id], + summary.sync_signature_contribution_inclusions as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_SIGNATURE_BLOCK_INCLUSIONS, + &[id], + summary.sync_signature_block_inclusions as i64, ); } - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_INCLUSIONS, - &[id], - summary.sync_signature_contribution_inclusions as i64, - ); - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_SIGNATURE_BLOCK_INCLUSIONS, - &[id], - summary.sync_signature_block_inclusions as i64, - ); /* * Sync contributions */ - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTIONS_TOTAL, - &[id], - summary.sync_contributions as i64, - ); + if self.individual_tracking() { + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTIONS_TOTAL, + &[id], + summary.sync_contributions as i64, + ); + } if let Some(delay) = summary.sync_contribution_min_delay { metrics::observe_timer_vec( &metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_MIN_DELAY_SECONDS, @@ -1421,51 +1565,61 @@ impl ValidatorMonitor { /* * Blocks */ - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_TOTAL, - &[id], - summary.blocks as i64, - ); - if let Some(delay) = summary.block_min_delay { - 
metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_MIN_DELAY_SECONDS, + if self.individual_tracking() { + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_TOTAL, &[id], - delay, + summary.blocks as i64, ); } + if let Some(delay) = summary.block_min_delay { + self.aggregatable_metric(id, |tag| { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_MIN_DELAY_SECONDS, + &[tag], + delay, + ); + }); + } /* * Aggregates */ - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_TOTAL, - &[id], - summary.aggregates as i64, - ); - if let Some(delay) = summary.aggregate_min_delay { - metrics::observe_timer_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_MIN_DELAY_SECONDS, + if self.individual_tracking() { + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_TOTAL, &[id], - delay, + summary.aggregates as i64, ); } + if let Some(delay) = summary.aggregate_min_delay { + self.aggregatable_metric(id, |tag| { + metrics::observe_timer_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_MIN_DELAY_SECONDS, + &[tag], + delay, + ); + }); + } /* * Other */ - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_EXITS_TOTAL, - &[id], - summary.exits as i64, - ); - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_PROPOSER_SLASHINGS_TOTAL, - &[id], - summary.proposer_slashings as i64, - ); - metrics::set_gauge_vec( - &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTER_SLASHINGS_TOTAL, - &[id], - summary.attester_slashings as i64, - ); + if self.individual_tracking() { + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_EXITS_TOTAL, + &[id], + summary.exits as i64, + ); + metrics::set_gauge_vec( + &metrics::VALIDATOR_MONITOR_PREV_EPOCH_PROPOSER_SLASHINGS_TOTAL, + &[id], + summary.proposer_slashings as i64, + ); + metrics::set_gauge_vec( + 
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTER_SLASHINGS_TOTAL, + &[id], + summary.attester_slashings as i64, + ); + } } } } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index b2fc7a6402..8a6ea9cfe1 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -5,6 +5,7 @@ use beacon_chain::builder::BeaconChainBuilder; use beacon_chain::test_utils::{ test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, }; +use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use beacon_chain::{ historical_blocks::HistoricalBlockError, migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, ChainConfig, NotifyExecutionLayer, @@ -2121,7 +2122,7 @@ async fn weak_subjectivity_sync() { log.clone(), 1, ))) - .monitor_validators(true, vec![], log) + .monitor_validators(true, vec![], DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, log) .build() .expect("should build"), ); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index f3e937b2e5..3b016ebda9 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -173,6 +173,7 @@ where .monitor_validators( config.validator_monitor_auto, config.validator_monitor_pubkeys.clone(), + config.validator_monitor_individual_tracking_threshold, runtime_context .service_context("val_mon".to_string()) .log() diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 0a2997762a..22b868256a 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -1,3 +1,4 @@ +use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use directory::DEFAULT_ROOT_DIR; use environment::LoggerConfig; use network::NetworkConfig; @@ -59,6 +60,11 @@ pub struct Config { pub validator_monitor_auto: bool, /// A list of validator pubkeys to monitor. 
pub validator_monitor_pubkeys: Vec, + /// Once the number of monitored validators goes above this threshold, we + /// will stop tracking metrics on a per-validator basis. This prevents large + /// validator counts causing infeasibly high cardinality for Prometheus and + /// high log volumes. + pub validator_monitor_individual_tracking_threshold: usize, #[serde(skip)] /// The `genesis` field is not serialized or deserialized by `serde` to ensure it is defined /// via the CLI at runtime, instead of from a configuration file saved to disk. @@ -97,6 +103,7 @@ impl Default for Config { slasher: None, validator_monitor_auto: false, validator_monitor_pubkeys: vec![], + validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, logger_config: LoggerConfig::default(), } } diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 30f030eba7..9e1c9f51bc 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -2,6 +2,7 @@ use super::*; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, + validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, BeaconChain, }; use futures::prelude::*; @@ -75,7 +76,7 @@ impl TestBeaconChain { Duration::from_millis(SLOT_DURATION_MILLIS), )) .shutdown_sender(shutdown_tx) - .monitor_validators(true, vec![], log) + .monitor_validators(true, vec![], DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, log) .build() .expect("should build"), ); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 915872e2b5..38d81512e4 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -753,6 +753,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .value_name("PATH") .takes_value(true) ) + .arg( + Arg::with_name("validator-monitor-individual-tracking-threshold") + .long("validator-monitor-individual-tracking-threshold") + .help("Once the validator 
monitor reaches this number of local validators \ + it will stop collecting per-validator Prometheus metrics and issuing \ + per-validator logs. Instead, it will provide aggregate metrics and logs. \ + This avoids infeasibly high cardinality in the Prometheus database and \ + high log volume when using many validators. Defaults to 64.") + .value_name("INTEGER") + .takes_value(true) + ) .arg( Arg::with_name("disable-lock-timeouts") .long("disable-lock-timeouts") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 241a5952ee..294568cca9 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -675,6 +675,12 @@ pub fn get_config( .extend_from_slice(&pubkeys); } + if let Some(count) = + clap_utils::parse_optional(cli_args, "validator-monitor-individual-tracking-threshold")? + { + client_config.validator_monitor_individual_tracking_threshold = count; + } + if cli_args.is_present("disable-lock-timeouts") { client_config.chain.enable_lock_timeouts = false; } diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index bd96885c68..4a2e160e8b 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1237,6 +1237,31 @@ fn validator_monitor_file_flag() { assert_eq!(config.validator_monitor_pubkeys[1].to_string(), "0xbeefdeadbeefdeaddeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); }); } +#[test] +fn validator_monitor_metrics_threshold_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.validator_monitor_individual_tracking_threshold, + // If this value changes make sure to update the help text for + // the CLI command. 
+ 64 + ) + }); +} +#[test] +fn validator_monitor_metrics_threshold_custom() { + CommandLineTest::new() + .flag( + "validator-monitor-individual-tracking-threshold", + Some("42"), + ) + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.validator_monitor_individual_tracking_threshold, 42) + }); +} // Tests for Store flags. #[test]