Add attestation simulator (#4880)

* basic scaffold

* remove unnecessary ?

* check if committee cache is init

* typed ValidatorMonitor with ethspecs + store attestations within

* nits

* process unaggregated attestation

* typo

* extract in func

* add tests

* better naming

* better naming 2

* less verbose

* use same naming as validator monitor

* use attestation_simulator

* add metrics

* remove cache

* refacto flag_indices process

* add lag

* remove copying state

* clean and lint

* extract metrics

* nits

* compare prom metrics in tests

* implement lag

* nits

* nits

* add attestation simulator service

* fmt

* return beacon_chain as arc

* nit: debug

* sed s/unaggregated/unagg.//

* fmt

* fmt

* nit: remove unused comments

* increase max unaggregated attestation hashmap to 64

* nit: sed s/clone/copied//

* improve perf: remove unecessary hashmap copy

* fix flag indices comp

* start service in client builder

* remove //

* cargo fmt

* lint

* cloned keys

* fmt

* use Slot value instead of pointer

* Update beacon_node/beacon_chain/src/attestation_simulator.rs

Co-authored-by: Paul Hauner <paul@paulhauner.com>

---------

Co-authored-by: Paul Hauner <paul@paulhauner.com>
This commit is contained in:
Joel Rousseau
2023-12-14 00:44:56 +00:00
committed by GitHub
parent a3a370302a
commit 189430a45c
9 changed files with 397 additions and 11 deletions

View File

@@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use smallvec::SmallVec;
use state_processing::common::get_attestation_participation_flag_indices;
use state_processing::per_epoch_processing::{
errors::EpochProcessingError, EpochProcessingSummary,
};
@@ -21,8 +22,11 @@ use std::str::Utf8Error;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::AbstractExecPayload;
use types::consts::altair::{
TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX,
};
use types::{
AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256,
Attestation, AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256,
IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof,
SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit,
};
@@ -69,6 +73,15 @@ impl Default for ValidatorMonitorConfig {
}
}
/// The goal is to check the behaviour of the BN if it pretends to attest at each slot
/// Check the head/target/source once the state.slot is some slots beyond attestation.data.slot
/// to defend against re-orgs. 16 slots is the minimum to defend against re-orgs of up to 16 slots.
pub const UNAGGREGATED_ATTESTATION_LAG_SLOTS: usize = 16;
/// Bound the storage size of simulated attestations. The head state can only verify attestations
/// from the current and previous epoch.
pub const MAX_UNAGGREGATED_ATTESTATION_HASHMAP_LENGTH: usize = 64;
#[derive(Debug)]
pub enum Error {
InvalidPubkey(String),
@@ -370,7 +383,7 @@ struct MissedBlock {
///
/// The intention of this struct is to provide users with more logging and Prometheus metrics around
/// validators that they are interested in.
pub struct ValidatorMonitor<T> {
pub struct ValidatorMonitor<T: EthSpec> {
/// The validators that require additional monitoring.
validators: HashMap<PublicKeyBytes, MonitoredValidator>,
/// A map of validator index (state.validators) to a validator public key.
@@ -386,6 +399,8 @@ pub struct ValidatorMonitor<T> {
missed_blocks: HashSet<MissedBlock>,
// A beacon proposer cache
beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
// Unaggregated attestations generated by the committee index at each slot.
unaggregated_attestations: HashMap<Slot, Attestation<T>>,
log: Logger,
_phantom: PhantomData<T>,
}
@@ -409,6 +424,7 @@ impl<T: EthSpec> ValidatorMonitor<T> {
individual_tracking_threshold,
missed_blocks: <_>::default(),
beacon_proposer_cache,
unaggregated_attestations: <_>::default(),
log,
_phantom: PhantomData,
};
@@ -444,9 +460,32 @@ impl<T: EthSpec> ValidatorMonitor<T> {
});
}
/// Add an unaggregated attestation
pub fn set_unaggregated_attestation(&mut self, attestation: Attestation<T>) {
let unaggregated_attestations = &mut self.unaggregated_attestations;
// Pruning, this removes the oldest key/pair of the hashmap if it's greater than MAX_UNAGGREGATED_ATTESTATION_HASHMAP_LENGTH
if unaggregated_attestations.len() >= MAX_UNAGGREGATED_ATTESTATION_HASHMAP_LENGTH {
if let Some(oldest_slot) = unaggregated_attestations.keys().min().copied() {
unaggregated_attestations.remove(&oldest_slot);
}
}
let slot = attestation.data.slot;
self.unaggregated_attestations.insert(slot, attestation);
}
pub fn get_unaggregated_attestation(&self, slot: Slot) -> Option<&Attestation<T>> {
self.unaggregated_attestations.get(&slot)
}
/// Reads information from the given `state`. The `state` *must* be valid (i.e, able to be
/// imported).
pub fn process_valid_state(&mut self, current_epoch: Epoch, state: &BeaconState<T>) {
pub fn process_valid_state(
&mut self,
current_epoch: Epoch,
state: &BeaconState<T>,
spec: &ChainSpec,
) {
// Add any new validator indices.
state
.validators()
@@ -463,6 +502,7 @@ impl<T: EthSpec> ValidatorMonitor<T> {
// Add missed non-finalized blocks for the monitored validators
self.add_validators_missed_blocks(state);
self.process_unaggregated_attestations(state, spec);
// Update metrics for individual validators.
for monitored_validator in self.validators.values() {
@@ -654,6 +694,107 @@ impl<T: EthSpec> ValidatorMonitor<T> {
.cloned()
}
/// Process the unaggregated attestations generated by the service `attestation_simulator_service`
/// and check if the attestation qualifies for a reward matching the flags source/target/head
fn process_unaggregated_attestations(&mut self, state: &BeaconState<T>, spec: &ChainSpec) {
let current_slot = state.slot();
// Ensures that we process attestation when there have been skipped slots between blocks
let attested_slots: Vec<_> = self
.unaggregated_attestations
.keys()
.filter(|&&attestation_slot| {
attestation_slot
< current_slot - Slot::new(UNAGGREGATED_ATTESTATION_LAG_SLOTS as u64)
})
.cloned()
.collect();
let unaggregated_attestations = &mut self.unaggregated_attestations;
for slot in attested_slots {
if let Some(unaggregated_attestation) = unaggregated_attestations.remove(&slot) {
// Don't process this attestation, it's too old to be processed by this state.
if slot.epoch(T::slots_per_epoch()) < state.previous_epoch() {
continue;
}
// We are simulating that unaggregated attestation in a service that produces unaggregated attestations
// every slot, the inclusion_delay shouldn't matter here as long as the minimum value
// that qualifies the committee index for reward is included
let inclusion_delay = spec.min_attestation_inclusion_delay;
// Get the reward indices for the unaggregated attestation or log an error
match get_attestation_participation_flag_indices(
state,
&unaggregated_attestation.data,
inclusion_delay,
spec,
) {
Ok(flag_indices) => {
let head_hit = flag_indices.contains(&TIMELY_HEAD_FLAG_INDEX);
let target_hit = flag_indices.contains(&TIMELY_TARGET_FLAG_INDEX);
let source_hit = flag_indices.contains(&TIMELY_SOURCE_FLAG_INDEX);
if head_hit {
metrics::inc_counter(
&metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT,
);
} else {
metrics::inc_counter(
&metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS,
);
}
if target_hit {
metrics::inc_counter(
&metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT,
);
} else {
metrics::inc_counter(
&metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS,
);
}
if source_hit {
metrics::inc_counter(
&metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT,
);
} else {
metrics::inc_counter(
&metrics::VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS,
);
}
let data = &unaggregated_attestation.data;
debug!(
self.log,
"Simulated attestation evaluated";
"attestation_source" => ?data.source.root,
"attestation_target" => ?data.target.root,
"attestation_head" => ?data.beacon_block_root,
"attestation_slot" => ?data.slot,
"source_hit" => source_hit,
"target_hit" => target_hit,
"head_hit" => head_hit,
);
}
Err(err) => {
error!(
self.log,
"Failed to get attestation participation flag indices";
"error" => ?err,
"unaggregated_attestation" => ?unaggregated_attestation,
);
}
}
} else {
error!(
self.log,
"Failed to remove unaggregated attestation from the hashmap";
"slot" => ?slot,
);
}
}
}
/// Run `func` with the `TOTAL_LABEL` and optionally the
/// `individual_id`.
///