Detailed validator monitoring (#2151)

## Issue Addressed

- Resolves #2064

## Proposed Changes

Adds a `ValidatorMonitor` struct which provides additional logging and Grafana metrics for specific validators.

Use `lighthouse bn --validator-monitor` to automatically enable monitoring for any validator that hits the [subnet subscription](https://ethereum.github.io/eth2.0-APIs/#/Validator/prepareBeaconCommitteeSubnet) HTTP API endpoint.

Also, use `lighthouse bn --validator-monitor-pubkeys` to supply a list of validators which will always be monitored.

See the new docs included in this PR for more info.

## TODO

- [x] Track validator balance, `slashed` status, etc.
- [x] ~~Register slashings in current epoch, not offense epoch~~
- [ ] Publish Grafana dashboard, update TODO link in docs
- [x] ~~#2130 is merged into this branch, resolve that~~
This commit is contained in:
Paul Hauner
2021-01-20 19:19:38 +00:00
parent 1eb0915301
commit 2b2a358522
29 changed files with 1646 additions and 37 deletions

View File

@@ -778,6 +778,11 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
&self.attestation
}
/// Returns a shared reference to the `IndexedAttestation` stored in `self.indexed_attestation`.
pub fn indexed_attestation(&self) -> &IndexedAttestation<T::EthSpec> {
&self.indexed_attestation
}
/// Returns a mutable reference to the underlying attestation.
///
/// Only use during testing since modifying the `IndexedAttestation` can cause the attestation

View File

@@ -23,6 +23,9 @@ use crate::persisted_fork_choice::PersistedForkChoice;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::SnapshotCache;
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{
ValidatorMonitor, HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot;
@@ -242,6 +245,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) graffiti: Graffiti,
/// Optional slasher.
pub slasher: Option<Arc<Slasher<T::EthSpec>>>,
/// Provides monitoring of a set of explicitly defined validators.
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
}
type BeaconBlockAndState<T> = (BeaconBlock<T>, BeaconState<T>);
@@ -1609,6 +1614,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
}
// Allow the validator monitor to learn about a new valid state.
self.validator_monitor
.write()
.process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), &state);
let validator_monitor = self.validator_monitor.read();
// Register each attestation in the block with the fork choice service.
for attestation in &block.body.attestations[..] {
let _fork_choice_attestation_timer =
@@ -1626,8 +1637,35 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()),
Err(e) => Err(BlockError::BeaconChainError(e.into())),
}?;
// Only register this with the validator monitor when the block is sufficiently close to
// the current slot.
if VALIDATOR_MONITOR_HISTORIC_EPOCHS as u64 * T::EthSpec::slots_per_epoch()
+ block.slot.as_u64()
>= current_slot.as_u64()
{
validator_monitor.register_attestation_in_block(
&indexed_attestation,
&block,
&self.spec,
);
}
}
for exit in &block.body.voluntary_exits {
validator_monitor.register_block_voluntary_exit(&exit.message)
}
for slashing in &block.body.attester_slashings {
validator_monitor.register_block_attester_slashing(slashing)
}
for slashing in &block.body.proposer_slashings {
validator_monitor.register_block_proposer_slashing(slashing)
}
drop(validator_monitor);
metrics::observe(
&metrics::OPERATIONS_PER_BLOCK_ATTESTATION,
block.body.attestations.len() as f64,

View File

@@ -6,6 +6,7 @@ use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::ValidatorMonitor;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::ChainConfig;
use crate::{
@@ -26,8 +27,8 @@ use std::sync::Arc;
use std::time::Duration;
use store::{HotColdDB, ItemStore};
use types::{
BeaconBlock, BeaconState, ChainSpec, EthSpec, Graffiti, Hash256, Signature, SignedBeaconBlock,
Slot,
BeaconBlock, BeaconState, ChainSpec, EthSpec, Graffiti, Hash256, PublicKeyBytes, Signature,
SignedBeaconBlock, Slot,
};
pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz";
@@ -88,6 +89,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
log: Option<Logger>,
graffiti: Graffiti,
slasher: Option<Arc<Slasher<T::EthSpec>>>,
validator_monitor: Option<ValidatorMonitor<T::EthSpec>>,
}
impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
@@ -126,6 +128,7 @@ where
log: None,
graffiti: Graffiti::default(),
slasher: None,
validator_monitor: None,
}
}
@@ -170,8 +173,8 @@ where
/// Sets the logger.
///
/// Should generally be called early in the build chain.
pub fn logger(mut self, logger: Logger) -> Self {
self.log = Some(logger);
pub fn logger(mut self, log: Logger) -> Self {
self.log = Some(log);
self
}
@@ -391,6 +394,23 @@ where
self
}
/// Register some validators for additional monitoring.
///
/// Builds a `ValidatorMonitor` and stores it on the builder, replacing any monitor that was
/// set previously.
///
/// - `auto_register`: forwarded to `ValidatorMonitor::new`.
/// - `validators`: the public keys of the validators that should be monitored from the start.
/// - `log`: the logger handed to the `ValidatorMonitor`.
pub fn monitor_validators(
    mut self,
    auto_register: bool,
    validators: Vec<PublicKeyBytes>,
    log: Logger,
) -> Self {
    // `log` is owned by this function and not used afterwards, so it is moved into the
    // monitor instead of being cloned.
    self.validator_monitor = Some(ValidatorMonitor::new(validators, auto_register, log));
    self
}
/// Consumes `self`, returning a `BeaconChain` if all required parameters have been supplied.
///
/// An error will be returned at runtime if all required parameters have not been configured.
@@ -418,6 +438,9 @@ where
let genesis_state_root = self
.genesis_state_root
.ok_or("Cannot build without a genesis state root")?;
let mut validator_monitor = self
.validator_monitor
.ok_or("Cannot build without a validator monitor")?;
let current_slot = if slot_clock
.is_prior_to_genesis()
@@ -496,6 +519,13 @@ where
log.clone(),
);
if let Some(slot) = slot_clock.now() {
validator_monitor.process_valid_state(
slot.epoch(TEthSpec::slots_per_epoch()),
&canonical_head.beacon_state,
);
}
let beacon_chain = BeaconChain {
spec: self.spec,
config: self.chain_config,
@@ -538,6 +568,7 @@ where
log: log.clone(),
graffiti: self.graffiti,
slasher: self.slasher.clone(),
validator_monitor: RwLock::new(validator_monitor),
};
let head = beacon_chain
@@ -706,6 +737,7 @@ mod test {
.testing_slot_clock(Duration::from_secs(1))
.expect("should configure testing slot clock")
.shutdown_sender(shutdown_tx)
.monitor_validators(true, vec![], log.clone())
.build()
.expect("should build");

View File

@@ -23,6 +23,7 @@ mod shuffling_cache;
mod snapshot_cache;
pub mod test_utils;
mod timeout_rw_lock;
pub mod validator_monitor;
mod validator_pubkey_cache;
pub use self::beacon_chain::{

View File

@@ -355,6 +355,223 @@ lazy_static! {
);
}
// Third lazy-static block is used to account for macro recursion limit.
lazy_static! {
/*
* Validator Monitor Metrics (balances, etc)
*/
pub static ref VALIDATOR_MONITOR_BALANCE_GWEI: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_balance_gwei",
"The validator's balance in gwei.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_EFFECTIVE_BALANCE_GWEI: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_effective_balance_gwei",
"The validator's effective balance in gwei.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_SLASHED: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_slashed",
"Set to 1 if the validator is slashed.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_ACTIVE: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_active",
"Set to 1 if the validator is active.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_EXITED: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_exited",
"Set to 1 if the validator is exited.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_WITHDRAWABLE: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_withdrawable",
"Set to 1 if the validator is withdrawable.",
&["validator"]
);
pub static ref VALIDATOR_ACTIVATION_ELIGIBILITY_EPOCH: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_activation_eligibility_epoch",
"Set to the epoch where the validator will be eligible for activation.",
&["validator"]
);
pub static ref VALIDATOR_ACTIVATION_EPOCH: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_activation_epoch",
"Set to the epoch where the validator will activate.",
&["validator"]
);
pub static ref VALIDATOR_EXIT_EPOCH: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_exit_epoch",
"Set to the epoch where the validator will exit.",
&["validator"]
);
pub static ref VALIDATOR_WITHDRAWABLE_EPOCH: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_withdrawable_epoch",
"Set to the epoch where the validator will be withdrawable.",
&["validator"]
);
/*
* Validator Monitor Metrics (per-epoch summaries)
*/
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_attestations_total",
"The number of unagg. attestations seen in the previous epoch.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_MIN_DELAY_SECONDS: Result<HistogramVec> =
try_create_histogram_vec(
"validator_monitor_prev_epoch_attestations_min_delay_seconds",
"The min delay between when the validator should send the attestation and when it was received.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_attestation_aggregate_inclusions",
"The count of times an attestation was seen inside an aggregate.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_attestation_block_inclusions",
"The count of times an attestation was seen inside a block.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_MIN_INCLUSION_DISTANCE: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_attestation_block_min_inclusion_distance",
"The minimum inclusion distance observed for the inclusion of an attestation in a block.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_beacon_blocks_total",
"The number of beacon_blocks seen in the previous epoch.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_MIN_DELAY_SECONDS: Result<HistogramVec> =
try_create_histogram_vec(
"validator_monitor_prev_epoch_beacon_blocks_min_delay_seconds",
"The min delay between when the validator should send the block and when it was received.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_aggregates_total",
"The number of aggregates seen in the previous epoch.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_MIN_DELAY_SECONDS: Result<HistogramVec> =
try_create_histogram_vec(
"validator_monitor_prev_epoch_aggregates_min_delay_seconds",
"The min delay between when the validator should send the aggregate and when it was received.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_EXITS_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_exits_total",
"The number of exits seen in the previous epoch.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_PROPOSER_SLASHINGS_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_proposer_slashings_total",
"The number of proposer slashings seen in the previous epoch.",
&["validator"]
);
pub static ref VALIDATOR_MONITOR_PREV_EPOCH_ATTESTER_SLASHINGS_TOTAL: Result<IntGaugeVec> =
try_create_int_gauge_vec(
"validator_monitor_prev_epoch_attester_slashings_total",
"The number of attester slashings seen in the previous epoch.",
&["validator"]
);
/*
* Validator Monitor Metrics (real-time)
*/
pub static ref VALIDATOR_MONITOR_VALIDATORS_TOTAL: Result<IntGauge> = try_create_int_gauge(
"validator_monitor_validators_total",
"Count of validators that are specifically monitored by this beacon node"
);
pub static ref VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_unaggregated_attestation_total",
"Number of unaggregated attestations seen",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_DELAY_SECONDS: Result<HistogramVec> = try_create_histogram_vec(
"validator_monitor_unaggregated_attestation_delay_seconds",
"The delay between when the validator should send the attestation and when it was received.",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_aggregated_attestation_total",
"Number of aggregated attestations seen",
&["src", "validator"]
);
// Histogram of aggregate delays, labelled by `src` and `validator`.
pub static ref VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_DELAY_SECONDS: Result<HistogramVec> = try_create_histogram_vec(
    "validator_monitor_aggregated_attestation_delay_seconds",
    "The delay between when the validator should send the aggregate and when it was received.",
    &["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_attestation_in_aggregate_total",
"Number of times an attestation has been seen in an aggregate",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_DELAY_SECONDS: Result<HistogramVec> = try_create_histogram_vec(
"validator_monitor_attestation_in_aggregate_delay_seconds",
"The delay between when the validator should send the aggregate and when it was received.",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_attestation_in_block_total",
"Number of times an attestation has been seen in a block",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS: Result<IntGaugeVec> = try_create_int_gauge_vec(
"validator_monitor_attestation_in_block_delay_slots",
"The excess slots (beyond the minimum delay) between the attestation slot and the block slot.",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_BEACON_BLOCK_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_beacon_block_total",
"Number of beacon blocks seen",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_BEACON_BLOCK_DELAY_SECONDS: Result<HistogramVec> = try_create_histogram_vec(
"validator_monitor_beacon_block_delay_seconds",
"The delay between when the validator should send the block and when it was received.",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_EXIT_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_exit_total",
"Number of beacon exits seen",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_PROPOSER_SLASHING_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_proposer_slashing_total",
"Number of proposer slashings seen",
&["src", "validator"]
);
pub static ref VALIDATOR_MONITOR_ATTESTER_SLASHING_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_attester_slashing_total",
"Number of attester slashings seen",
&["src", "validator"]
);
}
/// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot,
/// head state info, etc) and update the Prometheus `DEFAULT_REGISTRY`.
pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
@@ -382,6 +599,11 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
&OP_POOL_NUM_VOLUNTARY_EXITS,
beacon_chain.op_pool.num_voluntary_exits(),
);
beacon_chain
.validator_monitor
.read()
.scrape_metrics(&beacon_chain.slot_clock, &beacon_chain.spec);
}
/// Scrape the given `state` assuming it's the head state, updating the `DEFAULT_REGISTRY`.

View File

@@ -198,7 +198,11 @@ impl<E: EthSpec> BeaconChainHarness<EphemeralHarnessType<E>> {
.expect("should configure testing slot clock")
.shutdown_sender(shutdown_tx)
.chain_config(chain_config)
.event_handler(Some(ServerSentEventHandler::new_with_capacity(log, 1)))
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
)))
.monitor_validators(true, vec![], log)
.build()
.expect("should build");
@@ -243,6 +247,7 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
.testing_slot_clock(HARNESS_SLOT_TIME)
.expect("should configure testing slot clock")
.shutdown_sender(shutdown_tx)
.monitor_validators(true, vec![], log)
.build()
.expect("should build");
@@ -284,6 +289,7 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
.testing_slot_clock(Duration::from_secs(1))
.expect("should configure testing slot clock")
.shutdown_sender(shutdown_tx)
.monitor_validators(true, vec![], log)
.build()
.expect("should build");

View File

@@ -0,0 +1,953 @@
//! Provides detailed logging and metrics for a set of registered validators.
//!
//! This component should not affect consensus.
use crate::metrics;
use parking_lot::RwLock;
use slog::{crit, info, Logger};
use slot_clock::SlotClock;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::io;
use std::marker::PhantomData;
use std::str::Utf8Error;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use types::{
AttestationData, AttesterSlashing, BeaconBlock, BeaconState, ChainSpec, Epoch, EthSpec,
Hash256, IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, Slot,
VoluntaryExit,
};
/// The validator monitor collects per-epoch data about each monitored validator. Historical data
/// will be kept around for `HISTORIC_EPOCHS` before it is pruned.
pub const HISTORIC_EPOCHS: usize = 4;
/// Errors declared by the validator monitor module.
///
/// NOTE(review): nothing in the visible part of this file constructs these variants; the
/// per-variant notes below only restate the payload types — confirm intent against the callers.
#[derive(Debug)]
pub enum Error {
/// Carries a `String`; presumably describes a public key that could not be parsed.
InvalidPubkey(String),
/// Wraps a `std::io::Error`.
FileError(io::Error),
/// Wraps a `std::str::Utf8Error`.
InvalidUtf8(Utf8Error),
}
/// Contains data pertaining to one validator for one epoch.
///
/// All counters start at zero and all minimums start at `None` (see the `Default` derive).
#[derive(Default)]
struct EpochSummary {
/*
* Attestations with a target in the current epoch.
*/
/// The number of attestations seen.
pub attestations: usize,
/// The minimum delay between when an attestation should have been produced and when it was
/// observed.
pub attestation_min_delay: Option<Duration>,
/// The number of times a validator's attestation was seen in an aggregate.
///
/// NOTE: the field name is misspelled ("incusions"); it is left as-is because other code in
/// this file refers to it by this name.
pub attestation_aggregate_incusions: usize,
/// The number of times a validator's attestation was seen in a block.
pub attestation_block_inclusions: usize,
/// The minimum observed inclusion distance for an attestation for this epoch.
pub attestation_min_block_inclusion_distance: Option<Slot>,
/*
* Blocks with a slot in the current epoch.
*/
/// The number of blocks observed.
pub blocks: usize,
/// The minimum delay between when a block should have been produced and when it was observed.
pub block_min_delay: Option<Duration>,
/*
* Aggregates with a target in the current epoch
*/
/// The number of signed aggregate and proofs observed.
pub aggregates: usize,
/// The minimum delay between when an aggregate should have been produced and when it was
/// observed.
pub aggregate_min_delay: Option<Duration>,
/*
* Others pertaining to this epoch.
*/
/// The number of voluntary exits observed.
pub exits: usize,
/// The number of proposer slashings observed.
pub proposer_slashings: usize,
/// The number of attester slashings observed.
pub attester_slashings: usize,
}
impl EpochSummary {
    /// Stores `new` in `current` when either:
    ///
    /// - `current` is `None`, or
    /// - `new` is strictly less than the value `current` already holds.
    ///
    /// The effect is that `current` tracks the minimum of all values it has been offered.
    fn update_if_lt<T: Ord>(current: &mut Option<T>, new: T) {
        let should_replace = match &*current {
            Some(existing) => new < *existing,
            None => true,
        };

        if should_replace {
            *current = Some(new);
        }
    }

    /// Counts one unaggregated attestation and folds `delay` into the minimum attestation delay.
    pub fn register_unaggregated_attestation(&mut self, delay: Duration) {
        Self::update_if_lt(&mut self.attestation_min_delay, delay);
        self.attestations += 1;
    }

    /// Counts one aggregate produced by the validator and folds `delay` into the minimum
    /// aggregate delay.
    pub fn register_aggregated_attestation(&mut self, delay: Duration) {
        Self::update_if_lt(&mut self.aggregate_min_delay, delay);
        self.aggregates += 1;
    }

    /// Counts one occurrence of the validator's attestation inside an aggregate.
    pub fn register_aggregate_attestation_inclusion(&mut self) {
        self.attestation_aggregate_incusions += 1;
    }

    /// Counts one occurrence of the validator's attestation inside a block and folds `delay`
    /// into the minimum block inclusion distance.
    pub fn register_attestation_block_inclusion(&mut self, delay: Slot) {
        Self::update_if_lt(&mut self.attestation_min_block_inclusion_distance, delay);
        self.attestation_block_inclusions += 1;
    }

    /// Counts one voluntary exit.
    pub fn register_exit(&mut self) {
        self.exits += 1;
    }

    /// Counts one proposer slashing.
    pub fn register_proposer_slashing(&mut self) {
        self.proposer_slashings += 1;
    }

    /// Counts one attester slashing.
    pub fn register_attester_slashing(&mut self) {
        self.attester_slashings += 1;
    }
}
type SummaryMap = HashMap<Epoch, EpochSummary>;
/// A validator that is being monitored by the `ValidatorMonitor`.
struct MonitoredValidator {
/// A human-readable identifier for the validator: the validator index rendered as a string
/// when the index is known, otherwise the string form of `pubkey`.
pub id: String,
/// The validator voting pubkey.
pub pubkey: PublicKeyBytes,
/// The validator index in the state, or `None` while it has not been learned yet.
pub index: Option<u64>,
/// A history of the validator over time, keyed by epoch. Held behind a lock so that it can
/// be updated through a shared reference.
pub summaries: RwLock<SummaryMap>,
}
impl MonitoredValidator {
    /// Creates a monitored validator with an empty summary history.
    ///
    /// The `id` is the validator index rendered as a string when `index` is `Some`, otherwise
    /// it is the string form of `pubkey`.
    fn new(pubkey: PublicKeyBytes, index: Option<u64>) -> Self {
        let id = match index {
            Some(i) => i.to_string(),
            None => pubkey.to_string(),
        };

        Self {
            id,
            pubkey,
            index,
            summaries: <_>::default(),
        }
    }

    /// Records the validator index and switches `id` to that index.
    ///
    /// Does nothing if an index has already been recorded.
    fn set_index(&mut self, validator_index: u64) {
        if self.index.is_none() {
            self.index = Some(validator_index);
            self.id = validator_index.to_string();
        }
    }

    /// Runs `func` on the summary for `epoch`.
    ///
    /// ## Warning
    ///
    /// `func` runs while the write lock on `self.summaries` is held. Trying to obtain a lock on
    /// `self.summaries` inside `func` will deadlock.
    ///
    /// ## Notes
    ///
    /// - If `epoch` doesn't exist in `self.summaries`, it is created.
    /// - After `func` has run, the lowest epochs are removed until at most `HISTORIC_EPOCHS`
    ///   summaries remain.
    /// - `func` is invoked exactly once, so any `FnOnce` closure is accepted.
    fn with_epoch_summary<F>(&self, epoch: Epoch, func: F)
    where
        F: FnOnce(&mut EpochSummary),
    {
        let mut summaries = self.summaries.write();

        func(summaries.entry(epoch).or_default());

        // Prune the oldest (lowest) epochs first.
        while summaries.len() > HISTORIC_EPOCHS {
            match summaries.keys().copied().min() {
                Some(oldest) => {
                    summaries.remove(&oldest);
                }
                // Unreachable while the map is non-empty; guards against spinning forever.
                None => break,
            }
        }
    }
}
/// Holds a collection of `MonitoredValidator` and is notified about a variety of events on the P2P
/// network, HTTP API and `BeaconChain`.
///
/// If any of the events pertain to a `MonitoredValidator`, additional logging and metrics will be
/// performed.
///
/// The intention of this struct is to provide users with more logging and Prometheus metrics around
/// validators that they are interested in.
pub struct ValidatorMonitor<T> {
/// The validators that require additional monitoring, keyed by public key.
validators: HashMap<PublicKeyBytes, MonitoredValidator>,
/// A map of validator index (state.validators) to a validator public key.
///
/// Filled in by `process_valid_state`; it covers validators seen in a state, not only the
/// monitored ones.
indices: HashMap<u64, PublicKeyBytes>,
/// If true, allow the automatic registration of validators.
auto_register: bool,
/// Logger used for the per-validator log lines.
log: Logger,
/// Ties the monitor to the type parameter `T` without storing a value of that type.
_phantom: PhantomData<T>,
}
impl<T: EthSpec> ValidatorMonitor<T> {
pub fn new(pubkeys: Vec<PublicKeyBytes>, auto_register: bool, log: Logger) -> Self {
let mut s = Self {
validators: <_>::default(),
indices: <_>::default(),
auto_register,
log,
_phantom: PhantomData,
};
for pubkey in pubkeys {
s.add_validator_pubkey(pubkey)
}
s
}
/// Starts monitoring the validator identified by `pubkey`.
///
/// If the validator is already monitored nothing changes and nothing is logged. If
/// `self.indices` already maps some index to `pubkey`, the new entry is created with that index.
fn add_validator_pubkey(&mut self, pubkey: PublicKeyBytes) {
    // Reverse lookup: find the index (if any) that is already associated with this key.
    let known_index = self.indices.iter().find_map(|(index, candidate_pk)| {
        if *candidate_pk == pubkey {
            Some(*index)
        } else {
            None
        }
    });

    if self.validators.contains_key(&pubkey) {
        return;
    }

    info!(
        self.log,
        "Started monitoring validator";
        "pubkey" => %pubkey,
    );
    self.validators
        .insert(pubkey, MonitoredValidator::new(pubkey, known_index));
}
/// Reads information from the given `state`. The `state` *must* be valid (i.e, able to be
/// imported).
pub fn process_valid_state(&mut self, current_epoch: Epoch, state: &BeaconState<T>) {
// Add any new validator indices.
state
.validators
.iter()
.enumerate()
.skip(self.indices.len())
.for_each(|(i, validator)| {
let i = i as u64;
if let Some(validator) = self.validators.get_mut(&validator.pubkey) {
validator.set_index(i)
}
self.indices.insert(i, validator.pubkey);
});
// Update metrics for individual validators.
for monitored_validator in self.validators.values() {
if let Some(i) = monitored_validator.index {
let i = i as usize;
let id = &monitored_validator.id;
if let Some(balance) = state.balances.get(i) {
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_BALANCE_GWEI,
&[id],
*balance as i64,
);
}
if let Some(validator) = state.validators.get(i) {
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_EFFECTIVE_BALANCE_GWEI,
&[id],
u64_to_i64(validator.effective_balance),
);
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_SLASHED,
&[id],
if validator.slashed { 1 } else { 0 },
);
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_ACTIVE,
&[id],
if validator.is_active_at(current_epoch) {
1
} else {
0
},
);
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_EXITED,
&[id],
if validator.is_exited_at(current_epoch) {
1
} else {
0
},
);
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_WITHDRAWABLE,
&[id],
if validator.is_withdrawable_at(current_epoch) {
1
} else {
0
},
);
metrics::set_int_gauge(
&metrics::VALIDATOR_ACTIVATION_ELIGIBILITY_EPOCH,
&[id],
u64_to_i64(validator.activation_eligibility_epoch),
);
metrics::set_int_gauge(
&metrics::VALIDATOR_ACTIVATION_EPOCH,
&[id],
u64_to_i64(validator.activation_epoch),
);
metrics::set_int_gauge(
&metrics::VALIDATOR_EXIT_EPOCH,
&[id],
u64_to_i64(validator.exit_epoch),
);
metrics::set_int_gauge(
&metrics::VALIDATOR_WITHDRAWABLE_EPOCH,
&[id],
u64_to_i64(validator.withdrawable_epoch),
);
}
}
}
}
/// Returns the `id` of the monitored validator with the given `validator_index`.
///
/// Returns `None` when the index is unknown or the validator behind it is not monitored.
fn get_validator_id(&self, validator_index: u64) -> Option<&str> {
    let pubkey = self.indices.get(&validator_index)?;
    let validator = self.validators.get(pubkey)?;
    Some(validator.id.as_str())
}
/// Returns the monitored validator with the given `validator_index`.
///
/// Returns `None` when the index is unknown or the validator behind it is not monitored.
fn get_validator(&self, validator_index: u64) -> Option<&MonitoredValidator> {
    match self.indices.get(&validator_index) {
        Some(pubkey) => self.validators.get(pubkey),
        None => None,
    }
}
/// Returns the number of validators monitored by `self`, i.e. the number of entries in
/// `self.validators`.
pub fn num_validators(&self) -> usize {
self.validators.len()
}
/// If `self.auto_register == true`, start monitoring the validator with `validator_index` by
/// adding it to `self.validators`. Otherwise, do nothing.
///
/// Nothing happens either when `validator_index` is not present in `self.indices` or when the
/// validator is already monitored.
pub fn auto_register_local_validator(&mut self, validator_index: u64) {
    if !self.auto_register {
        return;
    }

    let pubkey = match self.indices.get(&validator_index) {
        Some(pubkey) => *pubkey,
        None => return,
    };

    if self.validators.contains_key(&pubkey) {
        return;
    }

    info!(
        self.log,
        "Started monitoring validator";
        "pubkey" => %pubkey,
        "validator" => %validator_index,
    );
    self.validators.insert(
        pubkey,
        MonitoredValidator::new(pubkey, Some(validator_index)),
    );
}
/// Returns how long after the start of `block.slot` the block was seen (`seen_timestamp`).
///
/// A zero duration is returned when the slot clock cannot give a start time for the slot or
/// when `seen_timestamp` is earlier than the start of the slot.
fn get_block_delay_ms<S: SlotClock>(
    seen_timestamp: Duration,
    block: &BeaconBlock<T>,
    slot_clock: &S,
) -> Duration {
    match slot_clock.start_of(block.slot) {
        Some(slot_start) => seen_timestamp
            .checked_sub(slot_start)
            .unwrap_or_else(|| Duration::from_secs(0)),
        None => Duration::from_secs(0),
    }
}
/// Process a block received on gossip.
///
/// Forwards to `register_beacon_block` with the source label `"gossip"`.
pub fn register_gossip_block<S: SlotClock>(
&self,
seen_timestamp: Duration,
block: &BeaconBlock<T>,
block_root: Hash256,
slot_clock: &S,
) {
self.register_beacon_block("gossip", seen_timestamp, block, block_root, slot_clock)
}
/// Process a block received on the HTTP API from a local validator.
///
/// Forwards to `register_beacon_block` with the source label `"api"`.
pub fn register_api_block<S: SlotClock>(
&self,
seen_timestamp: Duration,
block: &BeaconBlock<T>,
block_root: Hash256,
slot_clock: &S,
) {
self.register_beacon_block("api", seen_timestamp, block, block_root, slot_clock)
}
fn register_beacon_block<S: SlotClock>(
&self,
src: &str,
seen_timestamp: Duration,
block: &BeaconBlock<T>,
block_root: Hash256,
slot_clock: &S,
) {
if let Some(id) = self.get_validator_id(block.proposer_index) {
let delay = Self::get_block_delay_ms(seen_timestamp, block, slot_clock);
metrics::inc_counter_vec(&metrics::VALIDATOR_MONITOR_BEACON_BLOCK_TOTAL, &[src, id]);
metrics::observe_timer_vec(
&metrics::VALIDATOR_MONITOR_BEACON_BLOCK_DELAY_SECONDS,
&[src, id],
delay,
);
info!(
self.log,
"Block from API";
"root" => ?block_root,
"delay" => %delay.as_millis(),
"slot" => %block.slot,
"src" => src,
"validator" => %id,
);
}
}
/// Returns how long after the earliest production time of the attestation `data` (1/3rd
/// through `data.slot`) the attestation was seen (`seen_timestamp`).
///
/// A zero duration is returned when the slot clock cannot give a start time for the slot or
/// when `seen_timestamp` is earlier than the production time.
fn get_unaggregated_attestation_delay_ms<S: SlotClock>(
    seen_timestamp: Duration,
    data: &AttestationData,
    slot_clock: &S,
) -> Duration {
    let zero = Duration::from_secs(0);

    let slot_start = match slot_clock.start_of(data.slot) {
        Some(slot_start) => slot_start,
        None => return zero,
    };

    // Time elapsed since the start of the slot.
    let gross_delay = match seen_timestamp.checked_sub(slot_start) {
        Some(gross_delay) => gross_delay,
        None => return zero,
    };

    // Attestations are not expected before one third of the slot has passed.
    let production_delay = slot_clock.slot_duration() / 3;
    gross_delay.checked_sub(production_delay).unwrap_or(zero)
}
/// Register an attestation seen on the gossip network.
///
/// Forwards to `register_unaggregated_attestation` with the source label `"gossip"`.
pub fn register_gossip_unaggregated_attestation<S: SlotClock>(
&self,
seen_timestamp: Duration,
indexed_attestation: &IndexedAttestation<T>,
slot_clock: &S,
) {
self.register_unaggregated_attestation(
"gossip",
seen_timestamp,
indexed_attestation,
slot_clock,
)
}
/// Register an attestation seen on the HTTP API.
///
/// Forwards to `register_unaggregated_attestation` with the source label `"api"`.
pub fn register_api_unaggregated_attestation<S: SlotClock>(
&self,
seen_timestamp: Duration,
indexed_attestation: &IndexedAttestation<T>,
slot_clock: &S,
) {
self.register_unaggregated_attestation(
"api",
seen_timestamp,
indexed_attestation,
slot_clock,
)
}
/// Records an unaggregated attestation for every monitored validator among its attesters.
///
/// `src` labels where the attestation came from and is used as a metric label and log field.
/// For each monitored attester the counter is incremented, the delay histogram is observed, a
/// log line is emitted and the summary for the epoch of `data.slot` is updated. Attesters that
/// are not monitored are skipped.
fn register_unaggregated_attestation<S: SlotClock>(
    &self,
    src: &str,
    seen_timestamp: Duration,
    indexed_attestation: &IndexedAttestation<T>,
    slot_clock: &S,
) {
    let data = &indexed_attestation.data;
    let epoch = data.slot.epoch(T::slots_per_epoch());
    let delay = Self::get_unaggregated_attestation_delay_ms(seen_timestamp, data, slot_clock);

    for attester_index in indexed_attestation.attesting_indices.iter() {
        let validator = match self.get_validator(*attester_index) {
            Some(validator) => validator,
            None => continue,
        };
        let id = &validator.id;

        metrics::inc_counter_vec(
            &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_TOTAL,
            &[src, id],
        );
        metrics::observe_timer_vec(
            &metrics::VALIDATOR_MONITOR_UNAGGREGATED_ATTESTATION_DELAY_SECONDS,
            &[src, id],
            delay,
        );

        info!(
            self.log,
            "Unaggregated attestation";
            "head" => ?data.beacon_block_root,
            "index" => %data.index,
            "delay_ms" => %delay.as_millis(),
            "epoch" => %epoch,
            "slot" => %data.slot,
            "src" => src,
            "validator" => %id,
        );

        validator.with_epoch_summary(epoch, |summary| {
            summary.register_unaggregated_attestation(delay)
        });
    }
}
/// Returns the duration between when an `AggregateAndProof` with `data` could be produced
/// (2/3rd through the slot) and `seen_timestamp`.
///
/// A zero duration is returned when the slot clock cannot give a start time for `data.slot`
/// or when `seen_timestamp` is earlier than the production time.
fn get_aggregated_attestation_delay_ms<S: SlotClock>(
    seen_timestamp: Duration,
    data: &AttestationData,
    slot_clock: &S,
) -> Duration {
    slot_clock
        .start_of(data.slot)
        .and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
        .and_then(|gross_delay| {
            // Aggregates are expected two thirds of the way through the slot. Multiply
            // before dividing so that no precision is lost to the integer division.
            let production_delay = slot_clock.slot_duration() * 2 / 3;
            gross_delay.checked_sub(production_delay)
        })
        .unwrap_or_else(|| Duration::from_secs(0))
}
/// Register a `signed_aggregate_and_proof` seen on the gossip network.
///
/// Forwards to `register_aggregated_attestation` with the source label `"gossip"`.
pub fn register_gossip_aggregated_attestation<S: SlotClock>(
&self,
seen_timestamp: Duration,
signed_aggregate_and_proof: &SignedAggregateAndProof<T>,
indexed_attestation: &IndexedAttestation<T>,
slot_clock: &S,
) {
self.register_aggregated_attestation(
"gossip",
seen_timestamp,
signed_aggregate_and_proof,
indexed_attestation,
slot_clock,
)
}
/// Register a `signed_aggregate_and_proof` seen on the HTTP API.
///
/// Forwards to `register_aggregated_attestation` with the source label `"api"`.
pub fn register_api_aggregated_attestation<S: SlotClock>(
&self,
seen_timestamp: Duration,
signed_aggregate_and_proof: &SignedAggregateAndProof<T>,
indexed_attestation: &IndexedAttestation<T>,
slot_clock: &S,
) {
self.register_aggregated_attestation(
"api",
seen_timestamp,
signed_aggregate_and_proof,
indexed_attestation,
slot_clock,
)
}
/// Shared implementation for registering a `signed_aggregate_and_proof`, labelled with `src`.
///
/// Two independent events are recorded, each only for indices that `get_validator` resolves:
///
/// - the aggregator (`message.aggregator_index`) having produced the aggregate, and
/// - each index in `indexed_attestation.attesting_indices` having its attestation included in
///   the aggregate.
///
/// Both events share the same `delay`, as computed by `get_aggregated_attestation_delay_ms`.
fn register_aggregated_attestation<S: SlotClock>(
    &self,
    src: &str,
    seen_timestamp: Duration,
    signed_aggregate_and_proof: &SignedAggregateAndProof<T>,
    indexed_attestation: &IndexedAttestation<T>,
    slot_clock: &S,
) {
    let data = &indexed_attestation.data;
    let epoch = data.slot.epoch(T::slots_per_epoch());
    let delay = Self::get_aggregated_attestation_delay_ms(seen_timestamp, data, slot_clock);

    // Event 1: the aggregator produced this aggregate.
    let aggregator = self.get_validator(signed_aggregate_and_proof.message.aggregator_index);
    if let Some(validator) = aggregator {
        let id = &validator.id;

        metrics::inc_counter_vec(
            &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_TOTAL,
            &[src, id],
        );
        metrics::observe_timer_vec(
            &metrics::VALIDATOR_MONITOR_AGGREGATED_ATTESTATION_DELAY_SECONDS,
            &[src, id],
            delay,
        );

        info!(
            self.log,
            "Aggregated attestation";
            "head" => ?data.beacon_block_root,
            "index" => %data.index,
            "delay_ms" => %delay.as_millis(),
            "epoch" => %epoch,
            "slot" => %data.slot,
            "src" => src,
            "validator" => %id,
        );

        validator.with_epoch_summary(epoch, |summary| {
            summary.register_aggregated_attestation(delay)
        });
    }

    // Event 2: each attester whose attestation is contained in the aggregate.
    let attesters = indexed_attestation
        .attesting_indices
        .iter()
        .filter_map(|index| self.get_validator(*index));

    for validator in attesters {
        let id = &validator.id;

        metrics::inc_counter_vec(
            &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_TOTAL,
            &[src, id],
        );
        metrics::observe_timer_vec(
            &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_AGGREGATE_DELAY_SECONDS,
            &[src, id],
            delay,
        );

        info!(
            self.log,
            "Attestation included in aggregate";
            "head" => ?data.beacon_block_root,
            "index" => %data.index,
            "delay_ms" => %delay.as_millis(),
            "epoch" => %epoch,
            "slot" => %data.slot,
            "src" => src,
            "validator" => %id,
        );

        validator.with_epoch_summary(epoch, |summary| {
            summary.register_aggregate_attestation_inclusion()
        });
    }
}
/// Record that `indexed_attestation` was included in a *valid* `BeaconBlock`.
///
/// The inclusion delay is the number of slots between the attestation's slot and
/// `block.slot`, less `spec.min_attestation_inclusion_delay`. It is recorded for every index in
/// `attesting_indices` that `get_validator` resolves.
pub fn register_attestation_in_block(
    &self,
    indexed_attestation: &IndexedAttestation<T>,
    block: &BeaconBlock<T>,
    spec: &ChainSpec,
) {
    let data = &indexed_attestation.data;
    let slots_since_attestation = block.slot - data.slot;
    let delay = slots_since_attestation - spec.min_attestation_inclusion_delay;
    let epoch = data.slot.epoch(T::slots_per_epoch());

    let attesters = indexed_attestation
        .attesting_indices
        .iter()
        .filter_map(|index| self.get_validator(*index));

    for validator in attesters {
        let id = &validator.id;

        metrics::inc_counter_vec(
            &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_TOTAL,
            &["block", id],
        );
        metrics::set_int_gauge(
            &metrics::VALIDATOR_MONITOR_ATTESTATION_IN_BLOCK_DELAY_SLOTS,
            &["block", id],
            delay.as_u64() as i64,
        );

        info!(
            self.log,
            "Attestation included in block";
            "head" => ?data.beacon_block_root,
            "index" => %data.index,
            "inclusion_lag" => format!("{} slot(s)", delay),
            "epoch" => %epoch,
            "slot" => %data.slot,
            "validator" => %id,
        );

        validator.with_epoch_summary(epoch, |summary| {
            summary.register_attestation_block_inclusion(delay)
        });
    }
}
/// Record a voluntary `exit` received from the gossip network (source label `"gossip"`).
pub fn register_gossip_voluntary_exit(&self, exit: &VoluntaryExit) {
    let source = "gossip";
    self.register_voluntary_exit(source, exit);
}
/// Record a voluntary `exit` received from the HTTP API (source label `"api"`).
pub fn register_api_voluntary_exit(&self, exit: &VoluntaryExit) {
    let source = "api";
    self.register_voluntary_exit(source, exit);
}
/// Record a voluntary `exit` that was included in a *valid* beacon block (source label
/// `"block"`).
pub fn register_block_voluntary_exit(&self, exit: &VoluntaryExit) {
    let source = "block";
    self.register_voluntary_exit(source, exit);
}
/// Shared implementation for registering a voluntary `exit`, labelled with `src`.
///
/// Increments the exit counter, logs the exit and updates the validator's summary for
/// `exit.epoch`. Does nothing when `get_validator` has no entry for `exit.validator_index`.
fn register_voluntary_exit(&self, src: &str, exit: &VoluntaryExit) {
    let validator = match self.get_validator(exit.validator_index) {
        Some(validator) => validator,
        None => return,
    };
    let id = &validator.id;
    let epoch = exit.epoch;

    metrics::inc_counter_vec(&metrics::VALIDATOR_MONITOR_EXIT_TOTAL, &[src, id]);

    info!(
        self.log,
        "Voluntary exit";
        "epoch" => %epoch,
        "validator" => %id,
        "src" => src,
    );

    validator.with_epoch_summary(epoch, |summary| summary.register_exit());
}
/// Record a proposer `slashing` received from the gossip network (source label `"gossip"`).
pub fn register_gossip_proposer_slashing(&self, slashing: &ProposerSlashing) {
    let source = "gossip";
    self.register_proposer_slashing(source, slashing);
}
/// Record a proposer `slashing` received from the HTTP API (source label `"api"`).
pub fn register_api_proposer_slashing(&self, slashing: &ProposerSlashing) {
    let source = "api";
    self.register_proposer_slashing(source, slashing);
}
/// Record a proposer `slashing` that was included in a *valid* `BeaconBlock` (source label
/// `"block"`).
pub fn register_block_proposer_slashing(&self, slashing: &ProposerSlashing) {
    let source = "block";
    self.register_proposer_slashing(source, slashing);
}
/// Shared implementation for registering a proposer `slashing`, labelled with `src`.
///
/// The proposer index, slot and epoch are all taken from `signed_header_1`. When
/// `get_validator` resolves the proposer, the slashing counter is incremented, a critical log
/// carrying the canonical roots of both headers is emitted, and the validator's summary for
/// the epoch is updated.
fn register_proposer_slashing(&self, src: &str, slashing: &ProposerSlashing) {
    let header_1 = &slashing.signed_header_1.message;
    let header_2 = &slashing.signed_header_2.message;

    let proposer = header_1.proposer_index;
    let slot = header_1.slot;
    let epoch = slot.epoch(T::slots_per_epoch());
    let root_1 = header_1.canonical_root();
    let root_2 = header_2.canonical_root();

    let validator = match self.get_validator(proposer) {
        Some(validator) => validator,
        None => return,
    };
    let id = &validator.id;

    metrics::inc_counter_vec(
        &metrics::VALIDATOR_MONITOR_PROPOSER_SLASHING_TOTAL,
        &[src, id],
    );

    crit!(
        self.log,
        "Proposer slashing";
        "root_2" => %root_2,
        "root_1" => %root_1,
        "slot" => %slot,
        "validator" => %id,
        "src" => src,
    );

    validator.with_epoch_summary(epoch, |summary| summary.register_proposer_slashing());
}
/// Record an attester `slashing` received from the gossip network (source label `"gossip"`).
pub fn register_gossip_attester_slashing(&self, slashing: &AttesterSlashing<T>) {
    let source = "gossip";
    self.register_attester_slashing(source, slashing);
}
/// Record an attester `slashing` received from the HTTP API (source label `"api"`).
pub fn register_api_attester_slashing(&self, slashing: &AttesterSlashing<T>) {
    let source = "api";
    self.register_attester_slashing(source, slashing);
}
/// Record an attester `slashing` that was included in a *valid* `BeaconBlock` (source label
/// `"block"`).
pub fn register_block_attester_slashing(&self, slashing: &AttesterSlashing<T>) {
    let source = "block";
    self.register_attester_slashing(source, slashing);
}
/// Shared implementation for registering an attester `slashing`, labelled with `src`.
///
/// Only indices that appear in the `attesting_indices` of *both* `attestation_1` and
/// `attestation_2` are considered, and of those only the ones `get_validator` resolves. The
/// slot and epoch that are logged and recorded come from `attestation_1`.
fn register_attester_slashing(&self, src: &str, slashing: &AttesterSlashing<T>) {
    let data = &slashing.attestation_1.data;
    let epoch = data.slot.epoch(T::slots_per_epoch());

    // Build the lookup set once so each index of `attestation_2` is a constant-time check.
    let attestation_1_indices: HashSet<u64> = slashing
        .attestation_1
        .attesting_indices
        .iter()
        .copied()
        .collect();

    for index in slashing.attestation_2.attesting_indices.iter() {
        if !attestation_1_indices.contains(index) {
            continue;
        }

        if let Some(validator) = self.get_validator(*index) {
            let id = &validator.id;

            metrics::inc_counter_vec(
                &metrics::VALIDATOR_MONITOR_ATTESTER_SLASHING_TOTAL,
                &[src, id],
            );

            crit!(
                self.log,
                "Attester slashing";
                "epoch" => %epoch,
                "slot" => %data.slot,
                "validator" => %id,
                "src" => src,
            );

            validator.with_epoch_summary(epoch, |summary| summary.register_attester_slashing());
        }
    }
}
/// Refresh the validator monitor's Prometheus metrics from the state held in `self`.
///
/// Intended to be called whenever Prometheus scrapes Lighthouse.
///
/// The total from `num_validators` is always published. When `slot_clock` can supply the
/// current slot, per-validator metrics are also published from each validator's summary for
/// the epoch being reported on; validators without a summary for that epoch are skipped.
pub fn scrape_metrics<S: SlotClock>(&self, slot_clock: &S, spec: &ChainSpec) {
    metrics::set_gauge(
        &metrics::VALIDATOR_MONITOR_VALIDATORS_TOTAL,
        self.num_validators() as i64,
    );

    // Without a current slot there is no epoch to report on.
    let slot = match slot_clock.now() {
        Some(slot) => slot,
        None => return,
    };
    let epoch = slot.epoch(T::slots_per_epoch());
    let slot_in_epoch = slot % T::slots_per_epoch();

    // Report on the epoch just before the current one only after the current epoch has
    // advanced beyond the point where that epoch's attestations should all have been included
    // in a block; before then, report on the epoch prior to it.
    //
    // This makes it possible to alarm on missed attestations in Grafana without occasional
    // false positives caused by reporting before the inclusion window has closed.
    //
    // The extra 3 slots are an arbitrary allowance for skip slots that *probably* will not be
    // exceeded on mainnet.
    let reporting_threshold = spec.min_attestation_inclusion_delay + 3;
    let reported_epoch = if slot_in_epoch > reporting_threshold {
        epoch - 1
    } else {
        epoch - 2
    };

    for (_, validator) in self.validators.iter() {
        let id = &validator.id;
        let summaries = validator.summaries.read();
        let summary = match summaries.get(&reported_epoch) {
            Some(summary) => summary,
            None => continue,
        };

        // Attestations.
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_TOTAL,
            &[id],
            summary.attestations as i64,
        );
        if let Some(min_delay) = summary.attestation_min_delay {
            metrics::observe_timer_vec(
                &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATIONS_MIN_DELAY_SECONDS,
                &[id],
                min_delay,
            );
        }
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS,
            &[id],
            summary.attestation_aggregate_incusions as i64,
        );
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS,
            &[id],
            summary.attestation_block_inclusions as i64,
        );
        if let Some(min_distance) = summary.attestation_min_block_inclusion_distance {
            metrics::set_gauge_vec(
                &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_MIN_INCLUSION_DISTANCE,
                &[id],
                min_distance.as_u64() as i64,
            );
        }

        // Blocks.
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_TOTAL,
            &[id],
            summary.blocks as i64,
        );
        if let Some(min_delay) = summary.block_min_delay {
            metrics::observe_timer_vec(
                &metrics::VALIDATOR_MONITOR_PREV_EPOCH_BEACON_BLOCKS_MIN_DELAY_SECONDS,
                &[id],
                min_delay,
            );
        }

        // Aggregates.
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_TOTAL,
            &[id],
            summary.aggregates as i64,
        );
        if let Some(min_delay) = summary.aggregate_min_delay {
            metrics::observe_timer_vec(
                &metrics::VALIDATOR_MONITOR_PREV_EPOCH_AGGREGATES_MIN_DELAY_SECONDS,
                &[id],
                min_delay,
            );
        }

        // Exits and slashings.
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_EXITS_TOTAL,
            &[id],
            summary.exits as i64,
        );
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_PROPOSER_SLASHINGS_TOTAL,
            &[id],
            summary.proposer_slashings as i64,
        );
        metrics::set_gauge_vec(
            &metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTER_SLASHINGS_TOTAL,
            &[id],
            summary.attester_slashings as i64,
        );
    }
}
}
/// Returns the time elapsed since the unix epoch according to the system clock.
///
/// If the system clock reports a time before the unix epoch, a zero duration is returned
/// instead of an error.
pub fn timestamp_now() -> Duration {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        Err(_) => Duration::from_secs(0),
    }
}
/// Converts `n` to an `i64`, clamping to `i64::MAX` when the value does not fit.
fn u64_to_i64(n: impl Into<u64>) -> i64 {
    let value: u64 = n.into();
    match i64::try_from(value) {
        Ok(converted) => converted,
        Err(_) => i64::MAX,
    }
}

View File

@@ -93,7 +93,7 @@ impl ValidatorPubkeyCache {
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
);
self.indices.insert(v.pubkey.clone(), i);
self.indices.insert(v.pubkey, i);
}
Ok(())