diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f2378b4f9e..2fd70056cc 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -453,7 +453,7 @@ pub struct BeaconChain { /// A cache of eth1 deposit data at epoch boundaries for deposit finalization pub eth1_finalization_cache: TimeoutRwLock, /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. - pub beacon_proposer_cache: Mutex, + pub beacon_proposer_cache: Arc>, /// Caches a map of `validator_index -> validator_pubkey`. pub(crate) validator_pubkey_cache: TimeoutRwLock>, /// A cache used when producing attestations. diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index eae71bd63e..b51592cadd 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -25,7 +25,7 @@ const CACHE_SIZE: usize = 16; /// This value is fairly unimportant, it's used to avoid heap allocations. The result of it being /// incorrect is non-substantial from a consensus perspective (and probably also from a /// performance perspective). -const TYPICAL_SLOTS_PER_EPOCH: usize = 32; +pub const TYPICAL_SLOTS_PER_EPOCH: usize = 32; /// For some given slot, this contains the proposer index (`index`) and the `fork` that should be /// used to verify their signature. diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index fd8a3f0460..bffb23aeb7 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,4 +1,5 @@ use crate::beacon_chain::{CanonicalHead, BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY}; +use crate::beacon_proposer_cache::BeaconProposerCache; use crate::data_availability_checker::DataAvailabilityChecker; use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::eth1_finalization_cache::Eth1FinalizationCache; @@ -10,7 +11,7 @@ use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE}; use crate::timeout_rw_lock::TimeoutRwLock; -use crate::validator_monitor::ValidatorMonitor; +use crate::validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::ChainConfig; use crate::{ @@ -23,10 +24,10 @@ use fork_choice::{ForkChoice, ResetPayloadStatuses}; use futures::channel::mpsc::Sender; use kzg::{Kzg, TrustedSetup}; use operation_pool::{OperationPool, PersistedOperationPool}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold}; use slasher::Slasher; -use slog::{crit, debug, error, info, Logger}; +use slog::{crit, debug, error, info, o, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::per_slot_processing; use std::marker::PhantomData; @@ -35,8 +36,8 @@ use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; use types::{ - BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, - PublicKeyBytes, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, Signature, + SignedBeaconBlock, Slot, }; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing @@ -93,12 +94,12 @@ pub struct BeaconChainBuilder { log: Option, graffiti: Graffiti, slasher: Option>>, - validator_monitor: Option>, // Pending I/O batch that is constructed during building and should be executed atomically // alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called. pending_io_batch: Vec, trusted_setup: Option, task_executor: Option, + validator_monitor_config: Option, } impl @@ -135,10 +136,10 @@ where log: None, graffiti: Graffiti::default(), slasher: None, - validator_monitor: None, pending_io_batch: vec![], trusted_setup: None, task_executor: None, + validator_monitor_config: None, } } @@ -623,19 +624,8 @@ where /// Register some validators for additional monitoring. /// /// `validators` is a comma-separated string of 0x-formatted BLS pubkeys. - pub fn monitor_validators( - mut self, - auto_register: bool, - validators: Vec, - individual_metrics_threshold: usize, - log: Logger, - ) -> Self { - self.validator_monitor = Some(ValidatorMonitor::new( - validators, - auto_register, - individual_metrics_threshold, - log.clone(), - )); + pub fn validator_monitor_config(mut self, config: ValidatorMonitorConfig) -> Self { + self.validator_monitor_config = Some(config); self } @@ -671,11 +661,16 @@ where let genesis_state_root = self .genesis_state_root .ok_or("Cannot build without a genesis state root")?; - let mut validator_monitor = self - .validator_monitor - .ok_or("Cannot build without a validator monitor")?; + let validator_monitor_config = self.validator_monitor_config.unwrap_or_default(); let head_tracker = Arc::new(self.head_tracker.unwrap_or_default()); + let beacon_proposer_cache: Arc> = <_>::default(); + let mut validator_monitor = ValidatorMonitor::new( + validator_monitor_config, + beacon_proposer_cache.clone(), + log.new(o!("service" => "val_mon")), + ); + let current_slot = if slot_clock .is_prior_to_genesis() .ok_or("Unable to read slot clock")? @@ -911,7 +906,7 @@ where log.clone(), )), eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())), - beacon_proposer_cache: <_>::default(), + beacon_proposer_cache, block_times_cache: <_>::default(), pre_finalization_block_cache: <_>::default(), validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache), @@ -1097,7 +1092,6 @@ fn descriptive_db_error(item: &str, error: &StoreError) -> String { mod test { use super::*; use crate::test_utils::EphemeralHarnessType; - use crate::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use ethereum_hashing::hash; use genesis::{ generate_deterministic_keypairs, interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH, @@ -1155,12 +1149,6 @@ mod test { .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") .shutdown_sender(shutdown_tx) - .monitor_validators( - true, - vec![], - DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, - log.clone(), - ) .build() .expect("should build"); diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index a23bcdc0b5..0fe68ba19e 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1033,6 +1033,11 @@ lazy_static! { "beacon_aggregated_attestation_subsets_total", "Count of new aggregated attestations that are subsets of already known aggregates" ); + pub static ref VALIDATOR_MONITOR_MISSED_BLOCKS_TOTAL: Result = try_create_int_counter_vec( + "validator_monitor_missed_blocks_total", + "Number of non-finalized blocks missed", + &["validator"] + ); /* * Kzg related metrics diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 333318f52f..23af0c8126 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -6,7 +6,7 @@ pub use crate::{ beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, migrate::MigratorConfig, sync_committee_verification::Error as SyncCommitteeError, - validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, + validator_monitor::{ValidatorMonitor, ValidatorMonitorConfig}, BeaconChainError, NotifyExecutionLayer, ProduceBlockVerification, }; use crate::{ @@ -181,6 +181,7 @@ pub struct Builder { execution_layer: Option>, mock_execution_layer: Option>, testing_slot_clock: Option, + validator_monitor_config: Option, runtime: TestRuntime, log: Logger, } @@ -316,6 +317,7 @@ where execution_layer: None, mock_execution_layer: None, testing_slot_clock: None, + validator_monitor_config: None, runtime, log, } @@ -388,6 +390,14 @@ where self } + pub fn validator_monitor_config( + mut self, + validator_monitor_config: ValidatorMonitorConfig, + ) -> Self { + self.validator_monitor_config = Some(validator_monitor_config); + self + } + /// Purposefully replace the `store_mutator`. pub fn override_store_mutator(mut self, mutator: BoxedMutator) -> Self { assert!(self.store_mutator.is_some(), "store mutator not set"); @@ -494,11 +504,13 @@ where let validator_keypairs = self .validator_keypairs .expect("cannot build without validator keypairs"); - let chain_config = self.chain_config.unwrap_or_default(); let trusted_setup: TrustedSetup = serde_json::from_reader(TRUSTED_SETUP_BYTES) .map_err(|e| format!("Unable to read trusted setup file: {}", e)) .unwrap(); + let validator_monitor_config = self.validator_monitor_config.unwrap_or_default(); + + let chain_config = self.chain_config.unwrap_or_default(); let mut builder = BeaconChainBuilder::new(self.eth_spec_instance) .logger(log.clone()) .custom_spec(spec) @@ -518,7 +530,7 @@ where log.clone(), 5, ))) - .monitor_validators(true, vec![], DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, log) + .validator_monitor_config(validator_monitor_config) .trusted_setup(trusted_setup); builder = if let Some(mutator) = self.initial_mutator { diff --git a/beacon_node/beacon_chain/src/validator_monitor.rs b/beacon_node/beacon_chain/src/validator_monitor.rs index 396aac71b0..015cdb1c26 100644 --- a/beacon_node/beacon_chain/src/validator_monitor.rs +++ b/beacon_node/beacon_chain/src/validator_monitor.rs @@ -2,10 +2,14 @@ //! //! This component should not affect consensus. +use crate::beacon_proposer_cache::{BeaconProposerCache, TYPICAL_SLOTS_PER_EPOCH}; use crate::metrics; -use parking_lot::RwLock; -use slog::{crit, debug, info, Logger}; +use itertools::Itertools; +use parking_lot::{Mutex, RwLock}; +use serde::{Deserialize, Serialize}; +use slog::{crit, debug, error, info, warn, Logger}; use slot_clock::SlotClock; +use smallvec::SmallVec; use state_processing::per_epoch_processing::{ errors::EpochProcessingError, EpochProcessingSummary, }; @@ -14,6 +18,7 @@ use std::convert::TryFrom; use std::io; use std::marker::PhantomData; use std::str::Utf8Error; +use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::AbstractExecPayload; use types::{ @@ -35,7 +40,34 @@ pub const HISTORIC_EPOCHS: usize = 10; /// Once the validator monitor reaches this number of validators it will stop /// tracking their metrics/logging individually in an effort to reduce /// Prometheus cardinality and log volume. -pub const DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD: usize = 64; +const DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD: usize = 64; + +/// Lag slots used in detecting missed blocks for the monitored validators +pub const MISSED_BLOCK_LAG_SLOTS: usize = 4; + +/// The number of epochs to look back when determining if a validator has missed a block. This value is used with +/// the beacon_proposer_cache to determine if a validator has missed a block. +/// And so, setting this value to anything higher than 1 is likely going to be problematic because the beacon_proposer_cache +/// is only populated for the current and the previous epoch. +pub const MISSED_BLOCK_LOOKBACK_EPOCHS: u64 = 1; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +// Initial configuration values for the `ValidatorMonitor`. +pub struct ValidatorMonitorConfig { + pub auto_register: bool, + pub validators: Vec, + pub individual_tracking_threshold: usize, +} + +impl Default for ValidatorMonitorConfig { + fn default() -> Self { + Self { + auto_register: false, + validators: vec![], + individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, + } + } +} #[derive(Debug)] pub enum Error { @@ -323,6 +355,13 @@ impl MonitoredValidator { } } +#[derive(PartialEq, Hash, Eq)] +struct MissedBlock { + slot: Slot, + parent_root: Hash256, + validator_index: u64, +} + /// Holds a collection of `MonitoredValidator` and is notified about a variety of events on the P2P /// network, HTTP API and `BeaconChain`. /// @@ -343,26 +382,37 @@ pub struct ValidatorMonitor { /// large validator counts causing infeasibly high cardinailty for /// Prometheus and high log volumes. individual_tracking_threshold: usize, + /// A Map representing the (non-finalized) missed blocks by epoch, validator_index(state.validators) and slot + missed_blocks: HashSet, + // A beacon proposer cache + beacon_proposer_cache: Arc>, log: Logger, _phantom: PhantomData, } impl ValidatorMonitor { pub fn new( - pubkeys: Vec, - auto_register: bool, - individual_tracking_threshold: usize, + config: ValidatorMonitorConfig, + beacon_proposer_cache: Arc>, log: Logger, ) -> Self { + let ValidatorMonitorConfig { + auto_register, + validators, + individual_tracking_threshold, + } = config; + let mut s = Self { validators: <_>::default(), indices: <_>::default(), auto_register, individual_tracking_threshold, + missed_blocks: <_>::default(), + beacon_proposer_cache, log, _phantom: PhantomData, }; - for pubkey in pubkeys { + for pubkey in validators { s.add_validator_pubkey(pubkey) } s @@ -411,6 +461,9 @@ impl ValidatorMonitor { self.indices.insert(i, validator.pubkey); }); + // Add missed non-finalized blocks for the monitored validators + self.add_validators_missed_blocks(state); + // Update metrics for individual validators. for monitored_validator in self.validators.values() { if let Some(i) = monitored_validator.index { @@ -489,6 +542,116 @@ impl ValidatorMonitor { } } } + + // Prune missed blocks that are prior to last finalized epochs - MISSED_BLOCK_LOOKBACK_EPOCHS + let finalized_epoch = state.finalized_checkpoint().epoch; + self.missed_blocks.retain(|missed_block| { + let epoch = missed_block.slot.epoch(T::slots_per_epoch()); + epoch + Epoch::new(MISSED_BLOCK_LOOKBACK_EPOCHS) >= finalized_epoch + }); + } + + /// Add missed non-finalized blocks for the monitored validators + fn add_validators_missed_blocks(&mut self, state: &BeaconState) { + // Define range variables + let current_slot = state.slot(); + let current_epoch = current_slot.epoch(T::slots_per_epoch()); + // start_slot needs to be coherent with what can be retrieved from the beacon_proposer_cache + let start_slot = current_epoch.start_slot(T::slots_per_epoch()) + - Slot::new(MISSED_BLOCK_LOOKBACK_EPOCHS * T::slots_per_epoch()); + + let end_slot = current_slot.saturating_sub(MISSED_BLOCK_LAG_SLOTS).as_u64(); + + // List of proposers per epoch from the beacon_proposer_cache + let mut proposers_per_epoch: Option> = None; + + for (prev_slot, slot) in (start_slot.as_u64()..=end_slot) + .map(Slot::new) + .tuple_windows() + { + // Condition for missed_block is defined such as block_root(slot) == block_root(slot - 1) + // where the proposer who missed the block is the proposer of the block at block_root(slot) + if let (Ok(block_root), Ok(prev_block_root)) = + (state.get_block_root(slot), state.get_block_root(prev_slot)) + { + // Found missed block + if block_root == prev_block_root { + let slot_epoch = slot.epoch(T::slots_per_epoch()); + let prev_slot_epoch = prev_slot.epoch(T::slots_per_epoch()); + + if let Ok(shuffling_decision_block) = + state.proposer_shuffling_decision_root_at_epoch(slot_epoch, *block_root) + { + // Only update the cache if it needs to be initialised or because + // slot is at epoch + 1 + if proposers_per_epoch.is_none() || slot_epoch != prev_slot_epoch { + proposers_per_epoch = self.get_proposers_by_epoch_from_cache( + slot_epoch, + shuffling_decision_block, + ); + } + + // Only add missed blocks for the proposer if it's in the list of monitored validators + let slot_in_epoch = slot % T::slots_per_epoch(); + if let Some(proposer_index) = proposers_per_epoch + .as_deref() + .and_then(|proposers| proposers.get(slot_in_epoch.as_usize())) + { + let i = *proposer_index as u64; + if let Some(pub_key) = self.indices.get(&i) { + if let Some(validator) = self.validators.get(pub_key) { + let missed_block = MissedBlock { + slot, + parent_root: *prev_block_root, + validator_index: i, + }; + // Incr missed block counter for the validator only if it doesn't already exist in the hashset + if self.missed_blocks.insert(missed_block) { + self.aggregatable_metric(&validator.id, |label| { + metrics::inc_counter_vec( + &metrics::VALIDATOR_MONITOR_MISSED_BLOCKS_TOTAL, + &[label], + ); + }); + error!( + self.log, + "Validator missed a block"; + "index" => i, + "slot" => slot, + "parent block root" => ?prev_block_root, + ); + } + } else { + warn!( + self.log, + "Missing validator index"; + "info" => "potentially inconsistency in the validator manager", + "index" => i, + ) + } + } + } else { + debug!( + self.log, + "Could not get proposers for from cache"; + "epoch" => ?slot_epoch + ); + } + } + } + } + } + } + + fn get_proposers_by_epoch_from_cache( + &mut self, + epoch: Epoch, + shuffling_decision_block: Hash256, + ) -> Option> { + let mut cache = self.beacon_proposer_cache.lock(); + cache + .get_epoch::(shuffling_decision_block, epoch) + .cloned() } /// Run `func` with the `TOTAL_LABEL` and optionally the @@ -822,6 +985,17 @@ impl ValidatorMonitor { } } + pub fn get_monitored_validator_missed_block_count(&self, validator_index: u64) -> u64 { + self.missed_blocks + .iter() + .filter(|missed_block| missed_block.validator_index == validator_index) + .count() as u64 + } + + pub fn get_beacon_proposer_cache(&self) -> Arc> { + self.beacon_proposer_cache.clone() + } + /// If `self.auto_register == true`, add the `validator_index` to `self.monitored_validators`. /// Otherwise, do nothing. pub fn auto_register_local_validator(&mut self, validator_index: u64) { diff --git a/beacon_node/beacon_chain/tests/main.rs b/beacon_node/beacon_chain/tests/main.rs index 332f6a4829..e0564e1510 100644 --- a/beacon_node/beacon_chain/tests/main.rs +++ b/beacon_node/beacon_chain/tests/main.rs @@ -10,3 +10,4 @@ mod rewards; mod store_tests; mod sync_committee_verification; mod tests; +mod validator_monitor; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 59a4c1decc..5107396ac2 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -8,7 +8,6 @@ use beacon_chain::test_utils::{ mock_execution_layer_from_parts, test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, }; -use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; use beacon_chain::{ data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot, @@ -2358,6 +2357,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { Duration::from_secs(seconds_per_slot), ); slot_clock.set_slot(harness.get_current_slot().as_u64()); + let beacon_chain = BeaconChainBuilder::>::new(MinimalEthSpec) .store(store.clone()) .custom_spec(test_spec::()) @@ -2376,7 +2376,6 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { 1, ))) .execution_layer(Some(mock.el)) - .monitor_validators(true, vec![], DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, log) .trusted_setup(trusted_setup) .build() .expect("should build"); diff --git a/beacon_node/beacon_chain/tests/validator_monitor.rs b/beacon_node/beacon_chain/tests/validator_monitor.rs new file mode 100644 index 0000000000..5bc6b758c2 --- /dev/null +++ b/beacon_node/beacon_chain/tests/validator_monitor.rs @@ -0,0 +1,299 @@ +use lazy_static::lazy_static; + +use beacon_chain::test_utils::{ + AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, +}; +use beacon_chain::validator_monitor::{ValidatorMonitorConfig, MISSED_BLOCK_LAG_SLOTS}; +use types::{Epoch, EthSpec, Keypair, MainnetEthSpec, PublicKeyBytes, Slot}; + +// Should ideally be divisible by 3. +pub const VALIDATOR_COUNT: usize = 48; + +lazy_static! { + /// A cached set of keys. + static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); +} + +type E = MainnetEthSpec; + +fn get_harness( + validator_count: usize, + validator_indexes_to_monitor: Vec, +) -> BeaconChainHarness> { + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .default_spec() + .keypairs(KEYPAIRS[0..validator_count].to_vec()) + .fresh_ephemeral_store() + .mock_execution_layer() + .validator_monitor_config(ValidatorMonitorConfig { + validators: validator_indexes_to_monitor + .iter() + .map(|i| PublicKeyBytes::from(KEYPAIRS[*i].pk.clone())) + .collect(), + ..<_>::default() + }) + .build(); + + harness.advance_slot(); + + harness +} + +#[tokio::test] +async fn produces_missed_blocks() { + let validator_count = 16; + + let slots_per_epoch = E::slots_per_epoch(); + + let nb_epoch_to_simulate = Epoch::new(2); + + // Generate 63 slots (2 epochs * 32 slots per epoch - 1) + let initial_blocks = slots_per_epoch * nb_epoch_to_simulate.as_u64() - 1; + + // The validator index of the validator that is 'supposed' to miss a block + let mut validator_index_to_monitor = 1; + + // 1st scenario // + // + // Missed block happens when slot and prev_slot are in the same epoch + let harness1 = get_harness(validator_count, vec![validator_index_to_monitor]); + harness1 + .extend_chain( + initial_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let mut _state = &mut harness1.get_current_state(); + let mut epoch = _state.current_epoch(); + + // We have a total of 63 slots and we want slot 57 to be a missed block + // and this is slot=25 in epoch=1 + let mut idx = initial_blocks - 6; + let mut slot = Slot::new(idx); + let mut slot_in_epoch = slot % slots_per_epoch; + let mut prev_slot = Slot::new(idx - 1); + let mut duplicate_block_root = *_state.block_roots().get(idx as usize).unwrap(); + let mut validator_indexes = _state.get_beacon_proposer_indices(&harness1.spec).unwrap(); + let mut validator_index = validator_indexes[slot_in_epoch.as_usize()]; + let mut proposer_shuffling_decision_root = _state + .proposer_shuffling_decision_root(duplicate_block_root) + .unwrap(); + + let beacon_proposer_cache = harness1 + .chain + .validator_monitor + .read() + .get_beacon_proposer_cache(); + + // Let's fill the cache with the proposers for the current epoch + // and push the duplicate_block_root to the block_roots vector + assert_eq!( + beacon_proposer_cache.lock().insert( + epoch, + proposer_shuffling_decision_root, + validator_indexes.into_iter().collect::>(), + _state.fork() + ), + Ok(()) + ); + + // Modify the block root of the previous slot to be the same as the block root of the current slot + // in order to simulate a missed block + assert_eq!( + _state.set_block_root(prev_slot, duplicate_block_root), + Ok(()) + ); + + { + // Let's validate the state which will call the function responsible for + // adding the missed blocks to the validator monitor + let mut validator_monitor = harness1.chain.validator_monitor.write(); + validator_monitor.process_valid_state(nb_epoch_to_simulate, _state); + + // We should have one entry in the missed blocks map + assert_eq!( + validator_monitor.get_monitored_validator_missed_block_count(validator_index as u64), + 1 + ); + } + + // 2nd scenario // + // + // Missed block happens when slot and prev_slot are not in the same epoch + // making sure that the cache reloads when the epoch changes + // in that scenario the slot that missed a block is the first slot of the epoch + validator_index_to_monitor = 7; + // We are adding other validators to monitor as thoses one will miss a block depending on + // the fork name specified when running the test as the proposer cache differs depending on the fork name (cf. seed) + let validator_index_to_monitor_altair = 2; + // Same as above but for the merge upgrade + let validator_index_to_monitor_merge = 4; + // Same as above but for the capella upgrade + let validator_index_to_monitor_capella = 11; + // Same as above but for the deneb upgrade + let validator_index_to_monitor_deneb = 3; + let harness2 = get_harness( + validator_count, + vec![ + validator_index_to_monitor, + validator_index_to_monitor_altair, + validator_index_to_monitor_merge, + validator_index_to_monitor_capella, + validator_index_to_monitor_deneb, + ], + ); + let advance_slot_by = 9; + harness2 + .extend_chain( + (initial_blocks + advance_slot_by) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let mut _state2 = &mut harness2.get_current_state(); + epoch = _state2.current_epoch(); + + // We have a total of 72 slots and we want slot 64 to be the missed block + // and this is slot=64 in epoch=2 + idx = initial_blocks + (advance_slot_by) - 8; + slot = Slot::new(idx); + prev_slot = Slot::new(idx - 1); + slot_in_epoch = slot % slots_per_epoch; + duplicate_block_root = *_state2.block_roots().get(idx as usize).unwrap(); + validator_indexes = _state2.get_beacon_proposer_indices(&harness2.spec).unwrap(); + validator_index = validator_indexes[slot_in_epoch.as_usize()]; + + let beacon_proposer_cache = harness2 + .chain + .validator_monitor + .read() + .get_beacon_proposer_cache(); + + // Let's fill the cache with the proposers for the current epoch + // and push the duplicate_block_root to the block_roots vector + assert_eq!( + beacon_proposer_cache.lock().insert( + epoch, + duplicate_block_root, + validator_indexes.into_iter().collect::>(), + _state2.fork() + ), + Ok(()) + ); + + assert_eq!( + _state2.set_block_root(prev_slot, duplicate_block_root), + Ok(()) + ); + + { + // Let's validate the state which will call the function responsible for + // adding the missed blocks to the validator monitor + let mut validator_monitor2 = harness2.chain.validator_monitor.write(); + validator_monitor2.process_valid_state(epoch, _state2); + // We should have one entry in the missed blocks map + assert_eq!( + validator_monitor2.get_monitored_validator_missed_block_count(validator_index as u64), + 1 + ); + + // 3rd scenario // + // + // A missed block happens but the validator is not monitored + // it should not be flagged as a missed block + idx = initial_blocks + (advance_slot_by) - 7; + slot = Slot::new(idx); + prev_slot = Slot::new(idx - 1); + slot_in_epoch = slot % slots_per_epoch; + duplicate_block_root = *_state2.block_roots().get(idx as usize).unwrap(); + validator_indexes = _state2.get_beacon_proposer_indices(&harness2.spec).unwrap(); + let not_monitored_validator_index = validator_indexes[slot_in_epoch.as_usize()]; + + assert_eq!( + _state2.set_block_root(prev_slot, duplicate_block_root), + Ok(()) + ); + + // Let's validate the state which will call the function responsible for + // adding the missed blocks to the validator monitor + validator_monitor2.process_valid_state(epoch, _state2); + + // We shouldn't have any entry in the missed blocks map + assert_ne!(validator_index, not_monitored_validator_index); + assert_eq!( + validator_monitor2 + .get_monitored_validator_missed_block_count(not_monitored_validator_index as u64), + 0 + ); + } + + // 4th scenario // + // + // A missed block happens at state.slot - LOG_SLOTS_PER_EPOCH + // it shouldn't be flagged as a missed block + let harness3 = get_harness(validator_count, vec![validator_index_to_monitor]); + harness3 + .extend_chain( + slots_per_epoch as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + let mut _state3 = &mut harness3.get_current_state(); + epoch = _state3.current_epoch(); + + // We have a total of 32 slots and we want slot 30 to be a missed block + // and this is slot=30 in epoch=0 + idx = slots_per_epoch - MISSED_BLOCK_LAG_SLOTS as u64 + 2; + slot = Slot::new(idx); + slot_in_epoch = slot % slots_per_epoch; + prev_slot = Slot::new(idx - 1); + duplicate_block_root = *_state3.block_roots().get(idx as usize).unwrap(); + validator_indexes = _state3.get_beacon_proposer_indices(&harness3.spec).unwrap(); + validator_index = validator_indexes[slot_in_epoch.as_usize()]; + proposer_shuffling_decision_root = _state3 + .proposer_shuffling_decision_root_at_epoch(epoch, duplicate_block_root) + .unwrap(); + + let beacon_proposer_cache = harness3 + .chain + .validator_monitor + .read() + .get_beacon_proposer_cache(); + + // Let's fill the cache with the proposers for the current epoch + // and push the duplicate_block_root to the block_roots vector + assert_eq!( + beacon_proposer_cache.lock().insert( + epoch, + proposer_shuffling_decision_root, + validator_indexes.into_iter().collect::>(), + _state3.fork() + ), + Ok(()) + ); + + // Modify the block root of the previous slot to be the same as the block root of the current slot + // in order to simulate a missed block + assert_eq!( + _state3.set_block_root(prev_slot, duplicate_block_root), + Ok(()) + ); + + { + // Let's validate the state which will call the function responsible for + // adding the missed blocks to the validator monitor + let mut validator_monitor3 = harness3.chain.validator_monitor.write(); + validator_monitor3.process_valid_state(epoch, _state3); + + // We shouldn't have one entry in the missed blocks map + assert_eq!( + validator_monitor3.get_monitored_validator_missed_block_count(validator_index as u64), + 0 + ); + } +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index d1184cf75d..fe8a56084d 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -191,15 +191,7 @@ where .graffiti(graffiti) .event_handler(event_handler) .execution_layer(execution_layer) - .monitor_validators( - config.validator_monitor_auto, - config.validator_monitor_pubkeys.clone(), - config.validator_monitor_individual_tracking_threshold, - runtime_context - .service_context("val_mon".to_string()) - .log() - .clone(), - ); + .validator_monitor_config(config.validator_monitor.clone()); let builder = if let Some(slasher) = self.slasher.clone() { builder.slasher(slasher) diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 8b47d0fc62..492431cd36 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -1,4 +1,4 @@ -use beacon_chain::validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD; +use beacon_chain::validator_monitor::ValidatorMonitorConfig; use beacon_chain::TrustedSetup; use beacon_processor::BeaconProcessorConfig; use directory::DEFAULT_ROOT_DIR; @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use std::fs; use std::path::PathBuf; use std::time::Duration; -use types::{Graffiti, PublicKeyBytes}; +use types::Graffiti; /// Default directory name for the freezer database under the top-level data dir. const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; @@ -56,15 +56,7 @@ pub struct Config { pub sync_eth1_chain: bool, /// Graffiti to be inserted everytime we create a block. pub graffiti: Graffiti, - /// When true, automatically monitor validators using the HTTP API. - pub validator_monitor_auto: bool, - /// A list of validator pubkeys to monitor. - pub validator_monitor_pubkeys: Vec, - /// Once the number of monitored validators goes above this threshold, we - /// will stop tracking metrics on a per-validator basis. This prevents large - /// validator counts causing infeasibly high cardinailty for Prometheus and - /// high log volumes. - pub validator_monitor_individual_tracking_threshold: usize, + pub validator_monitor: ValidatorMonitorConfig, #[serde(skip)] /// The `genesis` field is not serialized or deserialized by `serde` to ensure it is defined /// via the CLI at runtime, instead of from a configuration file saved to disk. @@ -107,9 +99,7 @@ impl Default for Config { http_metrics: <_>::default(), monitoring_api: None, slasher: None, - validator_monitor_auto: false, - validator_monitor_pubkeys: vec![], - validator_monitor_individual_tracking_threshold: DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, + validator_monitor: <_>::default(), logger_config: LoggerConfig::default(), beacon_processor: <_>::default(), genesis_state_url: <_>::default(), diff --git a/beacon_node/http_api/src/proposer_duties.rs b/beacon_node/http_api/src/proposer_duties.rs index 708df39b4d..c31dd9b1fa 100644 --- a/beacon_node/http_api/src/proposer_duties.rs +++ b/beacon_node/http_api/src/proposer_duties.rs @@ -97,12 +97,12 @@ fn try_proposer_duties_from_cache( let head = chain.canonical_head.cached_head(); let head_block = &head.snapshot.beacon_block; let head_block_root = head.head_block_root(); + let head_epoch = head_block.slot().epoch(T::EthSpec::slots_per_epoch()); let head_decision_root = head .snapshot .beacon_state .proposer_shuffling_decision_root(head_block_root) .map_err(warp_utils::reject::beacon_state_error)?; - let head_epoch = head_block.slot().epoch(T::EthSpec::slots_per_epoch()); let execution_optimistic = chain .is_optimistic_or_invalid_head_block(head_block) .map_err(warp_utils::reject::beacon_chain_error)?; diff --git a/beacon_node/network/src/subnet_service/tests/mod.rs b/beacon_node/network/src/subnet_service/tests/mod.rs index 3b8c89a442..769775a62c 100644 --- a/beacon_node/network/src/subnet_service/tests/mod.rs +++ b/beacon_node/network/src/subnet_service/tests/mod.rs @@ -2,7 +2,6 @@ use super::*; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, - validator_monitor::DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, BeaconChain, }; use futures::prelude::*; @@ -76,7 +75,6 @@ impl TestBeaconChain { Duration::from_millis(SLOT_DURATION_MILLIS), )) .shutdown_sender(shutdown_tx) - .monitor_validators(true, vec![], DEFAULT_INDIVIDUAL_TRACKING_THRESHOLD, log) .build() .expect("should build"), ); diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 94b94677a5..a4fdf94c66 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -719,7 +719,7 @@ pub fn get_config( } if cli_args.is_present("validator-monitor-auto") { - client_config.validator_monitor_auto = true; + client_config.validator_monitor.auto_register = true; } if let Some(pubkeys) = cli_args.value_of("validator-monitor-pubkeys") { @@ -729,7 +729,8 @@ pub fn get_config( .collect::, _>>() .map_err(|e| format!("Invalid --validator-monitor-pubkeys value: {:?}", e))?; client_config - .validator_monitor_pubkeys + .validator_monitor + .validators .extend_from_slice(&pubkeys); } @@ -747,14 +748,17 @@ pub fn get_config( .collect::, _>>() .map_err(|e| format!("Invalid --validator-monitor-file contents: {:?}", e))?; client_config - .validator_monitor_pubkeys + .validator_monitor + .validators .extend_from_slice(&pubkeys); } if let Some(count) = clap_utils::parse_optional(cli_args, "validator-monitor-individual-tracking-threshold")? { - client_config.validator_monitor_individual_tracking_threshold = count; + client_config + .validator_monitor + .individual_tracking_threshold = count; } if cli_args.is_present("disable-lock-timeouts") { @@ -850,7 +854,7 @@ pub fn get_config( // Graphical user interface config. if cli_args.is_present("gui") { client_config.http_api.enabled = true; - client_config.validator_monitor_auto = true; + client_config.validator_monitor.auto_register = true; } // Optimistic finalized sync. diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index 18b9866f35..e2e25f24b8 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -626,6 +626,25 @@ impl BeaconState { cache.get_all_beacon_committees() } + /// Returns the block root which decided the proposer shuffling for the epoch passed in parameter. This root + /// can be used to key this proposer shuffling. + /// + /// ## Notes + /// + /// The `block_root` must be equal to the latest block applied to `self`. + pub fn proposer_shuffling_decision_root_at_epoch( + &self, + epoch: Epoch, + block_root: Hash256, + ) -> Result { + let decision_slot = self.proposer_shuffling_decision_slot(epoch); + if self.slot() <= decision_slot { + Ok(block_root) + } else { + self.get_block_root(decision_slot).map(|root| *root) + } + } + /// Returns the block root which decided the proposer shuffling for the current epoch. This root /// can be used to key this proposer shuffling. /// @@ -634,7 +653,7 @@ impl BeaconState { /// The `block_root` covers the one-off scenario where the genesis block decides its own /// shuffling. It should be set to the latest block applied to `self` or the genesis block root. pub fn proposer_shuffling_decision_root(&self, block_root: Hash256) -> Result { - let decision_slot = self.proposer_shuffling_decision_slot(); + let decision_slot = self.proposer_shuffling_decision_slot(self.current_epoch()); if self.slot() == decision_slot { Ok(block_root) } else { @@ -643,11 +662,9 @@ impl BeaconState { } /// Returns the slot at which the proposer shuffling was decided. The block root at this slot - /// can be used to key the proposer shuffling for the current epoch. - fn proposer_shuffling_decision_slot(&self) -> Slot { - self.current_epoch() - .start_slot(T::slots_per_epoch()) - .saturating_sub(1_u64) + /// can be used to key the proposer shuffling for the given epoch. + fn proposer_shuffling_decision_slot(&self, epoch: Epoch) -> Slot { + epoch.start_slot(T::slots_per_epoch()).saturating_sub(1_u64) } /// Returns the block root which decided the attester shuffling for the given `relative_epoch`. diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 7937b1d496..c416aba4fc 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1772,11 +1772,17 @@ fn metrics_allow_origin_all_flag() { // Tests for Validator Monitor flags. #[test] +fn validator_monitor_default_values() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert!(config.validator_monitor == <_>::default())); +} +#[test] fn validator_monitor_auto_flag() { CommandLineTest::new() .flag("validator-monitor-auto", None) .run_with_zero_port() - .with_config(|config| assert!(config.validator_monitor_auto)); + .with_config(|config| assert!(config.validator_monitor.auto_register)); } #[test] fn validator_monitor_pubkeys_flag() { @@ -1785,8 +1791,8 @@ fn validator_monitor_pubkeys_flag() { 0xbeefdeadbeefdeaddeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef")) .run_with_zero_port() .with_config(|config| { - assert_eq!(config.validator_monitor_pubkeys[0].to_string(), "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); - assert_eq!(config.validator_monitor_pubkeys[1].to_string(), "0xbeefdeadbeefdeaddeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); + assert_eq!(config.validator_monitor.validators[0].to_string(), "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); + assert_eq!(config.validator_monitor.validators[1].to_string(), "0xbeefdeadbeefdeaddeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); }); } #[test] @@ -1800,8 +1806,8 @@ fn validator_monitor_file_flag() { .flag("validator-monitor-file", dir.path().join("pubkeys.txt").as_os_str().to_str()) .run_with_zero_port() .with_config(|config| { - assert_eq!(config.validator_monitor_pubkeys[0].to_string(), "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); - assert_eq!(config.validator_monitor_pubkeys[1].to_string(), "0xbeefdeadbeefdeaddeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); + assert_eq!(config.validator_monitor.validators[0].to_string(), "0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); + assert_eq!(config.validator_monitor.validators[1].to_string(), "0xbeefdeadbeefdeaddeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"); }); } #[test] @@ -1810,7 +1816,7 @@ fn validator_monitor_metrics_threshold_default() { .run_with_zero_port() .with_config(|config| { assert_eq!( - config.validator_monitor_individual_tracking_threshold, + config.validator_monitor.individual_tracking_threshold, // If this value changes make sure to update the help text for // the CLI command. 64 @@ -1826,7 +1832,7 @@ fn validator_monitor_metrics_threshold_custom() { ) .run_with_zero_port() .with_config(|config| { - assert_eq!(config.validator_monitor_individual_tracking_threshold, 42) + assert_eq!(config.validator_monitor.individual_tracking_threshold, 42) }); } @@ -2472,7 +2478,7 @@ fn gui_flag() { .run_with_zero_port() .with_config(|config| { assert!(config.http_api.enabled); - assert!(config.validator_monitor_auto); + assert!(config.validator_monitor.auto_register); }); }