Add missed blocks to monitored validators (#4731)

* add missed_block metric

* init missed_block in constructor

* declare beaconproposercache in ValidatorMonitor

* refacto proposer_shuffling_decision_root to use epoch instead of current.epoch

* imple new proposer_shuffling_decision_root in callers

* push missed_blocks

* prune missed_blocks

* only add to hashmap if it's a monitored validator

* remove current_epoch dup + typos

* extract in func

* add prom metrics

* checkpoint is not only epoch but slot as well

* add safeguard if we start a new chain at slot 0

* clean

* remove unnecessary negative value for a slot

* typo in comment

* remove unused current_epoch

* share beacon_proposer_cache between validator_monitor and beacon_chain

* pass Hash256::zero()

* debug objects

* fix loop: lag is at the head

* sed s/get_slot/get_epoch

* fewer calls to cache.get_epoch

* fix typos

* remove cache first call

* export TYPICAL_SLOTS_PER_EPOCH and use it in validator_monitor

* switch to gauge & loop over missed_blocks hashset

* fix subnet_service tests

* remove unused var

* clean + fix nits

* add beacon_proposer_cache + validator_monitor in builder

* fix store_tests

* fix builder tests

* add tests

* add validator monitor set of tests

* clean tests

* nits

* optimise imports

* lint

* typo

* added self.aggregatable

* duplicate proposer_shuffling_decision_root

* remove duplication in passing beacon_proposer_cache

* remove duplication in passing beacon_proposer_cache

* using indices

* fmt

* implement missed blocks total

* nits

* avoid heap allocation

* remove recursion limit

* fix lint

* Fix validator monitor builder pattern

Unify validator monitor config struct

* renaming metrics

* renaming metrics in validator monitor

* add log if there's a missing validator index

* consistent log

* fix loop

* better loop

* move gauge to counter

* fmt

* add error message

* lint

* fix prom metrics

* set gauge to 0 when non-finalized epochs

* better wording

* remove hash256::zero in favour of block_root

* fix gauge total label

* fix last missed block validator

* Add `MissedBlock` struct

* Fix comment

* Refactor non-finalized block loop

* Fix off-by-one

* Avoid string allocation

* Fix compile error

* Remove non-finalized blocks metric

* fix func closure

* remove unused variable

* remove unused DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD

* remove unused DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD in builder

* add validator index depending on the fork name

* typos

---------

Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
Joel Rousseau
2023-11-09 04:05:14 +00:00
committed by GitHub
parent bcca88a150
commit ac8811afac
16 changed files with 575 additions and 90 deletions

View File

@@ -2,10 +2,14 @@
//!
//! This component should not affect consensus.
use crate::beacon_proposer_cache::{BeaconProposerCache, TYPICAL_SLOTS_PER_EPOCH};
use crate::metrics;
use parking_lot::RwLock;
use slog::{crit, debug, info, Logger};
use itertools::Itertools;
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use smallvec::SmallVec;
use state_processing::per_epoch_processing::{
errors::EpochProcessingError, EpochProcessingSummary,
};
@@ -14,6 +18,7 @@ use std::convert::TryFrom;
use std::io;
use std::marker::PhantomData;
use std::str::Utf8Error;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::AbstractExecPayload;
use types::{
@@ -35,7 +40,34 @@ pub const HISTORIC_EPOCHS: usize = 10;
/// Once the validator monitor reaches this number of validators it will stop
/// tracking their metrics/logging individually in an effort to reduce
/// Prometheus cardinality and log volume.
pub const DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD: usize = 64;
const DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD: usize = 64;
/// Lag slots used in detecting missed blocks for the monitored validators
pub const MISSED_BLOCK_LAG_SLOTS: usize = 4;
/// The number of epochs to look back when determining if a validator has missed a block. This value is used with
/// the beacon_proposer_cache to determine if a validator has missed a block.
/// And so, setting this value to anything higher than 1 is likely going to be problematic because the beacon_proposer_cache
/// is only populated for the current and the previous epoch.
pub const MISSED_BLOCK_LOOKBACK_EPOCHS: u64 = 1;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
/// Initial configuration values for the `ValidatorMonitor`.
///
/// The value is destructured by `ValidatorMonitor::new`, which copies each field into the
/// monitor it builds.
pub struct ValidatorMonitorConfig {
/// When `true`, `auto_register_local_validator` adds the validator index it is given to
/// the set of monitored validators; otherwise that call does nothing.
pub auto_register: bool,
/// Public keys that are registered with the monitor when it is constructed.
pub validators: Vec<PublicKeyBytes>,
/// Number of monitored validators at which the monitor stops tracking metrics/logging
/// individually, to limit Prometheus cardinality and log volume.
pub individual_tracking_threshold: usize,
}
impl Default for ValidatorMonitorConfig {
    /// Builds the default configuration: auto-registration disabled, no validators
    /// pre-registered, and the individual tracking threshold set to
    /// `DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD`.
    fn default() -> Self {
        let validators = Vec::new();
        let individual_tracking_threshold = DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD;

        Self {
            auto_register: false,
            validators,
            individual_tracking_threshold,
        }
    }
}
#[derive(Debug)]
pub enum Error {
@@ -323,6 +355,13 @@ impl MonitoredValidator {
}
}
/// A single missed block proposal attributed to a monitored validator.
///
/// Values of this type are stored in a `HashSet`, so the derived `Hash`/`Eq` over all three
/// fields is what decides whether a missed block has already been recorded.
#[derive(PartialEq, Hash, Eq)]
struct MissedBlock {
// Slot at which no new block was found.
slot: Slot,
// Block root of the slot immediately before `slot`.
parent_root: Hash256,
// Index of the proposer that was expected to produce the block at `slot`.
validator_index: u64,
}
/// Holds a collection of `MonitoredValidator` and is notified about a variety of events on the P2P
/// network, HTTP API and `BeaconChain`.
///
@@ -343,26 +382,37 @@ pub struct ValidatorMonitor<T> {
/// large validator counts causing infeasibly high cardinailty for
/// Prometheus and high log volumes.
individual_tracking_threshold: usize,
/// A set of the (non-finalized) missed blocks, each identified by slot, parent root and validator_index (state.validators)
missed_blocks: HashSet<MissedBlock>,
// A beacon proposer cache
beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
log: Logger,
_phantom: PhantomData<T>,
}
impl<T: EthSpec> ValidatorMonitor<T> {
pub fn new(
pubkeys: Vec<PublicKeyBytes>,
auto_register: bool,
individual_tracking_threshold: usize,
config: ValidatorMonitorConfig,
beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
log: Logger,
) -> Self {
let ValidatorMonitorConfig {
auto_register,
validators,
individual_tracking_threshold,
} = config;
let mut s = Self {
validators: <_>::default(),
indices: <_>::default(),
auto_register,
individual_tracking_threshold,
missed_blocks: <_>::default(),
beacon_proposer_cache,
log,
_phantom: PhantomData,
};
for pubkey in pubkeys {
for pubkey in validators {
s.add_validator_pubkey(pubkey)
}
s
@@ -411,6 +461,9 @@ impl<T: EthSpec> ValidatorMonitor<T> {
self.indices.insert(i, validator.pubkey);
});
// Add missed non-finalized blocks for the monitored validators
self.add_validators_missed_blocks(state);
// Update metrics for individual validators.
for monitored_validator in self.validators.values() {
if let Some(i) = monitored_validator.index {
@@ -489,6 +542,116 @@ impl<T: EthSpec> ValidatorMonitor<T> {
}
}
}
// Prune missed blocks that are prior to last finalized epochs - MISSED_BLOCK_LOOKBACK_EPOCHS
let finalized_epoch = state.finalized_checkpoint().epoch;
self.missed_blocks.retain(|missed_block| {
let epoch = missed_block.slot.epoch(T::slots_per_epoch());
epoch + Epoch::new(MISSED_BLOCK_LOOKBACK_EPOCHS) >= finalized_epoch
});
}
/// Add missed non-finalized blocks for the monitored validators
fn add_validators_missed_blocks(&mut self, state: &BeaconState<T>) {
// Define range variables
let current_slot = state.slot();
let current_epoch = current_slot.epoch(T::slots_per_epoch());
// start_slot needs to be coherent with what can be retrieved from the beacon_proposer_cache
let start_slot = current_epoch.start_slot(T::slots_per_epoch())
- Slot::new(MISSED_BLOCK_LOOKBACK_EPOCHS * T::slots_per_epoch());
let end_slot = current_slot.saturating_sub(MISSED_BLOCK_LAG_SLOTS).as_u64();
// List of proposers per epoch from the beacon_proposer_cache
let mut proposers_per_epoch: Option<SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> = None;
for (prev_slot, slot) in (start_slot.as_u64()..=end_slot)
.map(Slot::new)
.tuple_windows()
{
// Condition for missed_block is defined such as block_root(slot) == block_root(slot - 1)
// where the proposer who missed the block is the proposer of the block at block_root(slot)
if let (Ok(block_root), Ok(prev_block_root)) =
(state.get_block_root(slot), state.get_block_root(prev_slot))
{
// Found missed block
if block_root == prev_block_root {
let slot_epoch = slot.epoch(T::slots_per_epoch());
let prev_slot_epoch = prev_slot.epoch(T::slots_per_epoch());
if let Ok(shuffling_decision_block) =
state.proposer_shuffling_decision_root_at_epoch(slot_epoch, *block_root)
{
// Only update the cache if it needs to be initialised or because
// slot is at epoch + 1
if proposers_per_epoch.is_none() || slot_epoch != prev_slot_epoch {
proposers_per_epoch = self.get_proposers_by_epoch_from_cache(
slot_epoch,
shuffling_decision_block,
);
}
// Only add missed blocks for the proposer if it's in the list of monitored validators
let slot_in_epoch = slot % T::slots_per_epoch();
if let Some(proposer_index) = proposers_per_epoch
.as_deref()
.and_then(|proposers| proposers.get(slot_in_epoch.as_usize()))
{
let i = *proposer_index as u64;
if let Some(pub_key) = self.indices.get(&i) {
if let Some(validator) = self.validators.get(pub_key) {
let missed_block = MissedBlock {
slot,
parent_root: *prev_block_root,
validator_index: i,
};
// Incr missed block counter for the validator only if it doesn't already exist in the hashset
if self.missed_blocks.insert(missed_block) {
self.aggregatable_metric(&validator.id, |label| {
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_MISSED_BLOCKS_TOTAL,
&[label],
);
});
error!(
self.log,
"Validator missed a block";
"index" => i,
"slot" => slot,
"parent block root" => ?prev_block_root,
);
}
} else {
warn!(
self.log,
"Missing validator index";
"info" => "potentially inconsistency in the validator manager",
"index" => i,
)
}
}
} else {
debug!(
self.log,
"Could not get proposers for from cache";
"epoch" => ?slot_epoch
);
}
}
}
}
}
}
/// Looks up the proposers for `epoch` in the shared beacon proposer cache, keyed by
/// `shuffling_decision_block`.
///
/// Returns an owned copy of the cached list, or `None` when the cache has no entry for that
/// key. The cache mutex is held only for the duration of the lookup.
fn get_proposers_by_epoch_from_cache(
    &mut self,
    epoch: Epoch,
    shuffling_decision_block: Hash256,
) -> Option<SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>> {
    self.beacon_proposer_cache
        .lock()
        .get_epoch::<T>(shuffling_decision_block, epoch)
        .cloned()
}
/// Run `func` with the `TOTAL_LABEL` and optionally the
@@ -822,6 +985,17 @@ impl<T: EthSpec> ValidatorMonitor<T> {
}
}
/// Returns how many entries currently held in `self.missed_blocks` are attributed to
/// `validator_index`.
pub fn get_monitored_validator_missed_block_count(&self, validator_index: u64) -> u64 {
    let mut total: u64 = 0;
    for entry in &self.missed_blocks {
        if entry.validator_index == validator_index {
            total += 1;
        }
    }
    total
}
/// Returns another handle to the beacon proposer cache held by this monitor.
///
/// Only the `Arc` is cloned; the returned handle points at the same cache.
pub fn get_beacon_proposer_cache(&self) -> Arc<Mutex<BeaconProposerCache>> {
    Arc::clone(&self.beacon_proposer_cache)
}
/// If `self.auto_register == true`, add the `validator_index` to `self.monitored_validators`.
/// Otherwise, do nothing.
pub fn auto_register_local_validator(&mut self, validator_index: u64) {