Validator monitor support for sync committees (#2476)

## Issue Addressed

N/A

## Proposed Changes

Add functionality in the validator monitor to provide sync committee related metrics for monitored validators.


Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Pawan Dhananjay
2021-08-31 23:31:36 +00:00
parent 44fa54004c
commit 5a3bcd2904
14 changed files with 564 additions and 71 deletions

View File

@@ -4,7 +4,7 @@
use crate::metrics;
use parking_lot::RwLock;
use slog::{crit, error, info, warn, Logger};
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use state_processing::per_epoch_processing::{
errors::EpochProcessingError, EpochProcessingSummary,
@@ -16,9 +16,9 @@ use std::marker::PhantomData;
use std::str::Utf8Error;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use types::{
AttestationData, AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec,
Hash256, IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof, Slot,
VoluntaryExit,
AttesterSlashing, BeaconBlockRef, BeaconState, ChainSpec, Epoch, EthSpec, Hash256,
IndexedAttestation, ProposerSlashing, PublicKeyBytes, SignedAggregateAndProof,
SignedContributionAndProof, Slot, SyncCommitteeMessage, VoluntaryExit,
};
/// The validator monitor collects per-epoch data about each monitored validator. Historical data
@@ -43,7 +43,7 @@ struct EpochSummary {
/// The delay between when the attestation should have been produced and when it was observed.
pub attestation_min_delay: Option<Duration>,
/// The number of times a validators attestation was seen in an aggregate.
pub attestation_aggregate_incusions: usize,
pub attestation_aggregate_inclusions: usize,
/// The number of times a validators attestation was seen in a block.
pub attestation_block_inclusions: usize,
/// The minimum observed inclusion distance for an attestation for this epoch..
@@ -62,6 +62,27 @@ struct EpochSummary {
pub aggregates: usize,
/// The delay between when the aggregate should have been produced and when it was observed.
pub aggregate_min_delay: Option<Duration>,
/*
* SyncCommitteeMessages in the current epoch
*/
/// The number of sync committee messages seen.
sync_committee_messages: usize,
/// The delay between when the sync committee message should have been produced and when it was observed.
sync_committee_message_min_delay: Option<Duration>,
/// The number of times a validator's sync signature was included in the sync aggregate.
sync_signature_block_inclusions: usize,
/// The number of times a validator's sync signature was aggregated into a sync contribution.
sync_signature_contribution_inclusions: usize,
/*
* SyncContributions in the current epoch
*/
/// The number of SyncContributions observed in the current epoch.
sync_contributions: usize,
/// The delay between when the sync committee contribution should have been produced and when it was observed.
sync_contribution_min_delay: Option<Duration>,
/*
* Others pertaining to this epoch.
*/
@@ -93,13 +114,27 @@ impl EpochSummary {
Self::update_if_lt(&mut self.attestation_min_delay, delay);
}
pub fn register_sync_committee_message(&mut self, delay: Duration) {
self.sync_committee_messages += 1;
Self::update_if_lt(&mut self.sync_committee_message_min_delay, delay);
}
pub fn register_aggregated_attestation(&mut self, delay: Duration) {
self.aggregates += 1;
Self::update_if_lt(&mut self.aggregate_min_delay, delay);
}
pub fn register_sync_committee_contribution(&mut self, delay: Duration) {
self.sync_contributions += 1;
Self::update_if_lt(&mut self.sync_contribution_min_delay, delay);
}
pub fn register_aggregate_attestation_inclusion(&mut self) {
self.attestation_aggregate_incusions += 1;
self.attestation_aggregate_inclusions += 1;
}
pub fn register_sync_signature_contribution_inclusion(&mut self) {
self.sync_signature_contribution_inclusions += 1;
}
pub fn register_attestation_block_inclusion(&mut self, delay: Slot) {
@@ -107,6 +142,10 @@ impl EpochSummary {
Self::update_if_lt(&mut self.attestation_min_block_inclusion_distance, delay);
}
pub fn register_sync_signature_block_inclusions(&mut self) {
self.sync_signature_block_inclusions += 1;
}
pub fn register_exit(&mut self) {
self.exits += 1;
}
@@ -328,10 +367,10 @@ impl<T: EthSpec> ValidatorMonitor<T> {
pub fn process_validator_statuses(
&self,
epoch: Epoch,
summary: &EpochProcessingSummary,
summary: &EpochProcessingSummary<T>,
spec: &ChainSpec,
) -> Result<(), EpochProcessingError> {
for monitored_validator in self.validators.values() {
for (pubkey, monitored_validator) in self.validators.iter() {
// We subtract two from the state of the epoch that generated these summaries.
//
// - One to account for it being the previous epoch.
@@ -455,6 +494,45 @@ impl<T: EthSpec> ValidatorMonitor<T> {
inclusion_info.delay as i64,
);
}
// Indicates the number of sync committee signatures that made it into
// a sync aggregate in the current_epoch (state.epoch - 1).
// Note: Unlike attestations, sync committee signatures must be included in the
// immediate next slot. Hence, num included sync aggregates for `state.epoch - 1`
// is available right after state transition to state.epoch.
let current_epoch = epoch - 1;
if let Some(sync_committee) = summary.sync_committee() {
if sync_committee.contains(pubkey) {
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE,
&[id],
1,
);
let epoch_summary = monitored_validator.summaries.read();
if let Some(summary) = epoch_summary.get(&current_epoch) {
info!(
self.log,
"Current epoch sync signatures";
"included" => summary.sync_signature_block_inclusions,
"expected" => T::slots_per_epoch(),
"epoch" => current_epoch,
"validator" => id,
);
}
} else {
metrics::set_int_gauge(
&metrics::VALIDATOR_MONITOR_VALIDATOR_IN_CURRENT_SYNC_COMMITTEE,
&[id],
0,
);
debug!(
self.log,
"Validator isn't part of the current sync committee";
"epoch" => current_epoch,
"validator" => id,
);
}
}
}
}
@@ -555,22 +633,6 @@ impl<T: EthSpec> ValidatorMonitor<T> {
}
}
/// Returns the duration between when the attestation `data` could be produced (1/3rd through
/// the slot) and `seen_timestamp`.
fn get_unaggregated_attestation_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
data: &AttestationData,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(data.slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| {
gross_delay.checked_sub(slot_clock.unagg_attestation_production_delay())
})
.unwrap_or_else(|| Duration::from_secs(0))
}
/// Register an attestation seen on the gossip network.
pub fn register_gossip_unaggregated_attestation<S: SlotClock>(
&self,
@@ -610,7 +672,12 @@ impl<T: EthSpec> ValidatorMonitor<T> {
) {
let data = &indexed_attestation.data;
let epoch = data.slot.epoch(T::slots_per_epoch());
let delay = Self::get_unaggregated_attestation_delay_ms(seen_timestamp, data, slot_clock);
let delay = get_message_delay_ms(
seen_timestamp,
data.slot,
slot_clock.unagg_attestation_production_delay(),
slot_clock,
);
indexed_attestation.attesting_indices.iter().for_each(|i| {
if let Some(validator) = self.get_validator(*i) {
@@ -645,22 +712,6 @@ impl<T: EthSpec> ValidatorMonitor<T> {
})
}
/// Returns the duration between when a `AggregateAndproof` with `data` could be produced (2/3rd
/// through the slot) and `seen_timestamp`.
fn get_aggregated_attestation_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
data: &AttestationData,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(data.slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| {
gross_delay.checked_sub(slot_clock.agg_attestation_production_delay())
})
.unwrap_or_else(|| Duration::from_secs(0))
}
/// Register a `signed_aggregate_and_proof` seen on the gossip network.
pub fn register_gossip_aggregated_attestation<S: SlotClock>(
&self,
@@ -705,7 +756,12 @@ impl<T: EthSpec> ValidatorMonitor<T> {
) {
let data = &indexed_attestation.data;
let epoch = data.slot.epoch(T::slots_per_epoch());
let delay = Self::get_aggregated_attestation_delay_ms(seen_timestamp, data, slot_clock);
let delay = get_message_delay_ms(
seen_timestamp,
data.slot,
slot_clock.agg_attestation_production_delay(),
slot_clock,
);
let aggregator_index = signed_aggregate_and_proof.message.aggregator_index;
if let Some(validator) = self.get_validator(aggregator_index) {
@@ -814,6 +870,226 @@ impl<T: EthSpec> ValidatorMonitor<T> {
})
}
/// Register a sync committee message received over gossip.
pub fn register_gossip_sync_committee_message<S: SlotClock>(
&self,
seen_timestamp: Duration,
sync_committee_message: &SyncCommitteeMessage,
slot_clock: &S,
) {
self.register_sync_committee_message(
"gossip",
seen_timestamp,
sync_committee_message,
slot_clock,
)
}
/// Register a sync committee message received over the http api.
pub fn register_api_sync_committee_message<S: SlotClock>(
&self,
seen_timestamp: Duration,
sync_committee_message: &SyncCommitteeMessage,
slot_clock: &S,
) {
self.register_sync_committee_message(
"api",
seen_timestamp,
sync_committee_message,
slot_clock,
)
}
/// Register a sync committee message.
fn register_sync_committee_message<S: SlotClock>(
&self,
src: &str,
seen_timestamp: Duration,
sync_committee_message: &SyncCommitteeMessage,
slot_clock: &S,
) {
if let Some(validator) = self.get_validator(sync_committee_message.validator_index) {
let id = &validator.id;
let epoch = sync_committee_message.slot.epoch(T::slots_per_epoch());
let delay = get_message_delay_ms(
seen_timestamp,
sync_committee_message.slot,
slot_clock.sync_committee_message_production_delay(),
slot_clock,
);
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_TOTAL,
&[src, id],
);
metrics::observe_timer_vec(
&metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGES_DELAY_SECONDS,
&[src, id],
delay,
);
info!(
self.log,
"Sync committee message";
"head" => %sync_committee_message.beacon_block_root,
"delay_ms" => %delay.as_millis(),
"epoch" => %epoch,
"slot" => %sync_committee_message.slot,
"src" => src,
"validator" => %id,
);
validator.with_epoch_summary(epoch, |summary| {
summary.register_sync_committee_message(delay)
});
}
}
/// Register a sync committee contribution received over gossip.
pub fn register_gossip_sync_committee_contribution<S: SlotClock>(
&self,
seen_timestamp: Duration,
sync_contribution: &SignedContributionAndProof<T>,
participant_pubkeys: &[PublicKeyBytes],
slot_clock: &S,
) {
self.register_sync_committee_contribution(
"gossip",
seen_timestamp,
sync_contribution,
participant_pubkeys,
slot_clock,
)
}
/// Register a sync committee contribution received over the http api.
pub fn register_api_sync_committee_contribution<S: SlotClock>(
&self,
seen_timestamp: Duration,
sync_contribution: &SignedContributionAndProof<T>,
participant_pubkeys: &[PublicKeyBytes],
slot_clock: &S,
) {
self.register_sync_committee_contribution(
"api",
seen_timestamp,
sync_contribution,
participant_pubkeys,
slot_clock,
)
}
/// Register a sync committee contribution.
fn register_sync_committee_contribution<S: SlotClock>(
&self,
src: &str,
seen_timestamp: Duration,
sync_contribution: &SignedContributionAndProof<T>,
participant_pubkeys: &[PublicKeyBytes],
slot_clock: &S,
) {
let slot = sync_contribution.message.contribution.slot;
let epoch = slot.epoch(T::slots_per_epoch());
let beacon_block_root = sync_contribution.message.contribution.beacon_block_root;
let delay = get_message_delay_ms(
seen_timestamp,
slot,
slot_clock.sync_committee_contribution_production_delay(),
slot_clock,
);
let aggregator_index = sync_contribution.message.aggregator_index;
if let Some(validator) = self.get_validator(aggregator_index) {
let id = &validator.id;
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_SYNC_CONTRIBUTIONS_TOTAL,
&[src, id],
);
metrics::observe_timer_vec(
&metrics::VALIDATOR_MONITOR_SYNC_COONTRIBUTIONS_DELAY_SECONDS,
&[src, id],
delay,
);
info!(
self.log,
"Sync contribution";
"head" => %beacon_block_root,
"delay_ms" => %delay.as_millis(),
"epoch" => %epoch,
"slot" => %slot,
"src" => src,
"validator" => %id,
);
validator.with_epoch_summary(epoch, |summary| {
summary.register_sync_committee_contribution(delay)
});
}
for validator_pubkey in participant_pubkeys.iter() {
if let Some(validator) = self.validators.get(validator_pubkey) {
let id = &validator.id;
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_CONTRIBUTION_TOTAL,
&[src, id],
);
info!(
self.log,
"Sync signature included in contribution";
"head" => %beacon_block_root,
"delay_ms" => %delay.as_millis(),
"epoch" => %epoch,
"slot" => %slot,
"src" => src,
"validator" => %id,
);
validator.with_epoch_summary(epoch, |summary| {
summary.register_sync_signature_contribution_inclusion()
});
}
}
}
/// Register that the `sync_aggregate` was included in a *valid* `BeaconBlock`.
pub fn register_sync_aggregate_in_block(
&self,
slot: Slot,
beacon_block_root: Hash256,
participant_pubkeys: Vec<&PublicKeyBytes>,
) {
let epoch = slot.epoch(T::slots_per_epoch());
for validator_pubkey in participant_pubkeys {
if let Some(validator) = self.validators.get(validator_pubkey) {
let id = &validator.id;
metrics::inc_counter_vec(
&metrics::VALIDATOR_MONITOR_SYNC_COMMITTEE_MESSAGE_IN_BLOCK_TOTAL,
&["block", id],
);
info!(
self.log,
"Sync signature included in block";
"head" => %beacon_block_root,
"epoch" => %epoch,
"slot" => %slot,
"validator" => %id,
);
validator.with_epoch_summary(epoch, |summary| {
summary.register_sync_signature_block_inclusions();
});
}
}
}
/// Register an exit from the gossip network.
pub fn register_gossip_voluntary_exit(&self, exit: &VoluntaryExit) {
self.register_voluntary_exit("gossip", exit)
@@ -995,7 +1271,7 @@ impl<T: EthSpec> ValidatorMonitor<T> {
metrics::set_gauge_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_AGGREGATE_INCLUSIONS,
&[id],
summary.attestation_aggregate_incusions as i64,
summary.attestation_aggregate_inclusions as i64,
);
metrics::set_gauge_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_ATTESTATION_BLOCK_INCLUSIONS,
@@ -1009,6 +1285,48 @@ impl<T: EthSpec> ValidatorMonitor<T> {
distance.as_u64() as i64,
);
}
/*
* Sync committee messages
*/
metrics::set_gauge_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_TOTAL,
&[id],
summary.sync_committee_messages as i64,
);
if let Some(delay) = summary.sync_committee_message_min_delay {
metrics::observe_timer_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_COMMITTEE_MESSAGES_MIN_DELAY_SECONDS,
&[id],
delay,
);
}
metrics::set_gauge_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_INCLUSIONS,
&[id],
summary.sync_signature_contribution_inclusions as i64,
);
metrics::set_gauge_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_SIGNATURE_BLOCK_INCLUSIONS,
&[id],
summary.sync_signature_block_inclusions as i64,
);
/*
* Sync contributions
*/
metrics::set_gauge_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTIONS_TOTAL,
&[id],
summary.sync_contributions as i64,
);
if let Some(delay) = summary.sync_contribution_min_delay {
metrics::observe_timer_vec(
&metrics::VALIDATOR_MONITOR_PREV_EPOCH_SYNC_CONTRIBUTION_MIN_DELAY_SECONDS,
&[id],
delay,
);
}
/*
* Blocks
*/
@@ -1094,3 +1412,23 @@ pub fn get_slot_delay_ms<S: SlotClock>(
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.unwrap_or_else(|| Duration::from_secs(0))
}
/// Returns the duration between when any message could be produced and the `seen_timestamp`.
///
/// `message_production_delay` is the duration from the beginning of the slot when the message
/// should be produced.
/// e.g. for unagg attestations, `message_production_delay = slot_duration / 3`.
///
/// `slot` is the slot for which the message was produced.
fn get_message_delay_ms<S: SlotClock>(
seen_timestamp: Duration,
slot: Slot,
message_production_delay: Duration,
slot_clock: &S,
) -> Duration {
slot_clock
.start_of(slot)
.and_then(|slot_start| seen_timestamp.checked_sub(slot_start))
.and_then(|gross_delay| gross_delay.checked_sub(message_production_delay))
.unwrap_or_else(|| Duration::from_secs(0))
}