Add more metrics for tracking sync messages (#4308)

## Issue Addressed

NA

## Proposed Changes

Adds metrics to track validators that are submitting equivocating (but not slashable) sync messages. This follows on from some research we've been doing in a separate fork of LH.

## Additional Info

@jimmygchen and @michaelsproul have already run their eyes over this so it should be easy to get into v4.2.0, IMO.
This commit is contained in:
Paul Hauner
2023-05-19 05:13:07 +00:00
parent 75aea7054c
commit 01ae37ac37
6 changed files with 477 additions and 81 deletions

View File

@@ -874,6 +874,14 @@ lazy_static! {
"beacon_sync_committee_message_gossip_verification_seconds",
"Full runtime of sync contribution gossip verification"
);
pub static ref SYNC_MESSAGE_EQUIVOCATIONS: Result<IntCounter> = try_create_int_counter(
"sync_message_equivocations_total",
"Number of sync messages with the same validator index for different blocks"
);
pub static ref SYNC_MESSAGE_EQUIVOCATIONS_TO_HEAD: Result<IntCounter> = try_create_int_counter(
"sync_message_equivocations_to_head_total",
"Number of sync message which conflict with a previous message but elect the head"
);
/*
* Sync Committee Contribution Verification

View File

@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use std::marker::PhantomData;
use types::slot_data::SlotData;
use types::{Epoch, EthSpec, Slot, Unsigned};
use types::{Epoch, EthSpec, Hash256, Slot, Unsigned};
/// The maximum capacity of the `AutoPruningEpochContainer`.
///
@@ -39,10 +39,10 @@ pub const MAX_CACHED_EPOCHS: u64 = 3;
pub type ObservedAttesters<E> = AutoPruningEpochContainer<EpochBitfield, E>;
pub type ObservedSyncContributors<E> =
AutoPruningSlotContainer<SlotSubcommitteeIndex, SyncContributorSlotHashSet<E>, E>;
AutoPruningSlotContainer<SlotSubcommitteeIndex, Hash256, SyncContributorSlotHashSet<E>, E>;
pub type ObservedAggregators<E> = AutoPruningEpochContainer<EpochHashSet, E>;
pub type ObservedSyncAggregators<E> =
AutoPruningSlotContainer<SlotSubcommitteeIndex, SyncAggregatorSlotHashSet, E>;
AutoPruningSlotContainer<SlotSubcommitteeIndex, (), SyncAggregatorSlotHashSet, E>;
#[derive(Debug, PartialEq)]
pub enum Error {
@@ -62,7 +62,7 @@ pub enum Error {
}
/// Implemented on an item in an `AutoPruningContainer`.
pub trait Item {
pub trait Item<T> {
/// Instantiate `Self` with the given `capacity`.
fn with_capacity(capacity: usize) -> Self;
@@ -75,11 +75,11 @@ pub trait Item {
/// Returns the number of validators that have been observed by `self`.
fn validator_count(&self) -> usize;
/// Store `validator_index` in `self`.
fn insert(&mut self, validator_index: usize) -> bool;
/// Store `validator_index` and `value` in `self`.
fn insert(&mut self, validator_index: usize, value: T) -> bool;
/// Returns `true` if `validator_index` has been stored in `self`.
fn contains(&self, validator_index: usize) -> bool;
/// Returns `Some(T)` if there is an entry for `validator_index`.
fn get(&self, validator_index: usize) -> Option<T>;
}
/// Stores a `BitVec` that represents which validator indices have attested or sent sync committee
@@ -88,7 +88,7 @@ pub struct EpochBitfield {
bitfield: BitVec,
}
impl Item for EpochBitfield {
impl Item<()> for EpochBitfield {
fn with_capacity(capacity: usize) -> Self {
Self {
bitfield: BitVec::with_capacity(capacity),
@@ -108,7 +108,7 @@ impl Item for EpochBitfield {
self.bitfield.iter().filter(|bit| **bit).count()
}
fn insert(&mut self, validator_index: usize) -> bool {
fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
self.bitfield
.get_mut(validator_index)
.map(|mut bit| {
@@ -129,8 +129,11 @@ impl Item for EpochBitfield {
})
}
fn contains(&self, validator_index: usize) -> bool {
self.bitfield.get(validator_index).map_or(false, |bit| *bit)
fn get(&self, validator_index: usize) -> Option<()> {
self.bitfield
.get(validator_index)
.map_or(false, |bit| *bit)
.then_some(())
}
}
@@ -140,7 +143,7 @@ pub struct EpochHashSet {
set: HashSet<usize>,
}
impl Item for EpochHashSet {
impl Item<()> for EpochHashSet {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
@@ -163,27 +166,27 @@ impl Item for EpochHashSet {
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize) -> bool {
fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
!self.set.insert(validator_index)
}
/// Returns `true` if the `validator_index` is in the set.
fn contains(&self, validator_index: usize) -> bool {
self.set.contains(&validator_index)
fn get(&self, validator_index: usize) -> Option<()> {
self.set.contains(&validator_index).then_some(())
}
}
/// Stores a `HashSet` of which validator indices have created a sync aggregate during a
/// slot.
pub struct SyncContributorSlotHashSet<E> {
set: HashSet<usize>,
map: HashMap<usize, Hash256>,
phantom: PhantomData<E>,
}
impl<E: EthSpec> Item for SyncContributorSlotHashSet<E> {
impl<E: EthSpec> Item<Hash256> for SyncContributorSlotHashSet<E> {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
map: HashMap::with_capacity(capacity),
phantom: PhantomData,
}
}
@@ -194,22 +197,24 @@ impl<E: EthSpec> Item for SyncContributorSlotHashSet<E> {
}
fn len(&self) -> usize {
self.set.len()
self.map.len()
}
fn validator_count(&self) -> usize {
self.set.len()
self.map.len()
}
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize) -> bool {
!self.set.insert(validator_index)
fn insert(&mut self, validator_index: usize, beacon_block_root: Hash256) -> bool {
self.map
.insert(validator_index, beacon_block_root)
.is_some()
}
/// Returns `true` if the `validator_index` is in the set.
fn contains(&self, validator_index: usize) -> bool {
self.set.contains(&validator_index)
fn get(&self, validator_index: usize) -> Option<Hash256> {
self.map.get(&validator_index).copied()
}
}
@@ -219,7 +224,7 @@ pub struct SyncAggregatorSlotHashSet {
set: HashSet<usize>,
}
impl Item for SyncAggregatorSlotHashSet {
impl Item<()> for SyncAggregatorSlotHashSet {
fn with_capacity(capacity: usize) -> Self {
Self {
set: HashSet::with_capacity(capacity),
@@ -241,13 +246,13 @@ impl Item for SyncAggregatorSlotHashSet {
/// Inserts the `validator_index` in the set. Returns `true` if the `validator_index` was
/// already in the set.
fn insert(&mut self, validator_index: usize) -> bool {
fn insert(&mut self, validator_index: usize, _value: ()) -> bool {
!self.set.insert(validator_index)
}
/// Returns `true` if the `validator_index` is in the set.
fn contains(&self, validator_index: usize) -> bool {
self.set.contains(&validator_index)
fn get(&self, validator_index: usize) -> Option<()> {
self.set.contains(&validator_index).then_some(())
}
}
@@ -275,7 +280,7 @@ impl<T, E: EthSpec> Default for AutoPruningEpochContainer<T, E> {
}
}
impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
impl<T: Item<()>, E: EthSpec> AutoPruningEpochContainer<T, E> {
/// Observe that `validator_index` has produced attestation `a`. Returns `Ok(true)` if `a` has
/// previously been observed for `validator_index`.
///
@@ -293,7 +298,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
self.prune(epoch);
if let Some(item) = self.items.get_mut(&epoch) {
Ok(item.insert(validator_index))
Ok(item.insert(validator_index, ()))
} else {
// To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier epoch.
@@ -309,7 +314,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
let initial_capacity = sum.checked_div(count).unwrap_or_else(T::default_capacity);
let mut item = T::with_capacity(initial_capacity);
item.insert(validator_index);
item.insert(validator_index, ());
self.items.insert(epoch, item);
Ok(false)
@@ -333,7 +338,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
let exists = self
.items
.get(&epoch)
.map_or(false, |item| item.contains(validator_index));
.map_or(false, |item| item.get(validator_index).is_some());
Ok(exists)
}
@@ -392,7 +397,7 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
pub fn index_seen_at_epoch(&self, index: usize, epoch: Epoch) -> bool {
self.items
.get(&epoch)
.map(|item| item.contains(index))
.map(|item| item.get(index).is_some())
.unwrap_or(false)
}
}
@@ -405,23 +410,63 @@ impl<T: Item, E: EthSpec> AutoPruningEpochContainer<T, E> {
/// sync contributions with an epoch prior to `data.slot - 3` will be cleared from the cache.
///
/// `V` should be set to a `SyncAggregatorSlotHashSet` or a `SyncContributorSlotHashSet`.
pub struct AutoPruningSlotContainer<K: SlotData + Eq + Hash, V, E: EthSpec> {
pub struct AutoPruningSlotContainer<K: SlotData + Eq + Hash, S, V, E: EthSpec> {
lowest_permissible_slot: Slot,
items: HashMap<K, V>,
_phantom: PhantomData<E>,
_phantom_e: PhantomData<E>,
_phantom_s: PhantomData<S>,
}
impl<K: SlotData + Eq + Hash, V, E: EthSpec> Default for AutoPruningSlotContainer<K, V, E> {
impl<K: SlotData + Eq + Hash, S, V, E: EthSpec> Default for AutoPruningSlotContainer<K, S, V, E> {
fn default() -> Self {
Self {
lowest_permissible_slot: Slot::new(0),
items: HashMap::new(),
_phantom: PhantomData,
_phantom_e: PhantomData,
_phantom_s: PhantomData,
}
}
}
impl<K: SlotData + Eq + Hash, V: Item, E: EthSpec> AutoPruningSlotContainer<K, V, E> {
impl<K: SlotData + Eq + Hash + Copy, S, V: Item<S>, E: EthSpec>
AutoPruningSlotContainer<K, S, V, E>
{
/// Observes the given `value` for the given `validator_index`.
///
/// The `override_observation` function is supplied `previous_observation`
/// and `value`. If it returns `true`, then any existing observation will be
/// overridden.
///
/// This function returns `Some` if:
/// - An observation already existed for the validator, AND,
/// - The `override_observation` function returned `false`.
///
/// Alternatively, it returns `None` if:
/// - An observation did not already exist for the given validator, OR,
/// - The `override_observation` function returned `true`.
pub fn observe_validator_with_override<F>(
&mut self,
key: K,
validator_index: usize,
value: S,
override_observation: F,
) -> Result<Option<S>, Error>
where
F: Fn(&S, &S) -> bool,
{
if let Some(prev_observation) = self.observation_for_validator(key, validator_index)? {
if override_observation(&prev_observation, &value) {
self.observe_validator(key, validator_index, value)?;
Ok(None)
} else {
Ok(Some(prev_observation))
}
} else {
self.observe_validator(key, validator_index, value)?;
Ok(None)
}
}
/// Observe that `validator_index` has produced a sync committee message. Returns `Ok(true)` if
/// the sync committee message has previously been observed for `validator_index`.
///
@@ -429,14 +474,19 @@ impl<K: SlotData + Eq + Hash, V: Item, E: EthSpec> AutoPruningSlotContainer<K, V
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `key.slot` is earlier than `self.lowest_permissible_slot`.
pub fn observe_validator(&mut self, key: K, validator_index: usize) -> Result<bool, Error> {
pub fn observe_validator(
&mut self,
key: K,
validator_index: usize,
value: S,
) -> Result<bool, Error> {
let slot = key.get_slot();
self.sanitize_request(slot, validator_index)?;
self.prune(slot);
if let Some(item) = self.items.get_mut(&key) {
Ok(item.insert(validator_index))
Ok(item.insert(validator_index, value))
} else {
// To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier slot.
@@ -452,32 +502,45 @@ impl<K: SlotData + Eq + Hash, V: Item, E: EthSpec> AutoPruningSlotContainer<K, V
let initial_capacity = sum.checked_div(count).unwrap_or_else(V::default_capacity);
let mut item = V::with_capacity(initial_capacity);
item.insert(validator_index);
item.insert(validator_index, value);
self.items.insert(key, item);
Ok(false)
}
}
/// Returns `Ok(true)` if the `validator_index` has already produced a conflicting sync committee message.
///
/// ## Errors
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `key.slot` is earlier than `self.lowest_permissible_slot`.
// Identical to `Self::observation_for_validator` but discards the
// observation, simply returning `true` if the validator has been observed
// at all.
pub fn validator_has_been_observed(
&self,
key: K,
validator_index: usize,
) -> Result<bool, Error> {
self.observation_for_validator(key, validator_index)
.map(|observation| observation.is_some())
}
/// Returns `Ok(Some)` if the `validator_index` has already produced a
/// conflicting sync committee message.
///
/// ## Errors
///
/// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`.
/// - `key.slot` is earlier than `self.lowest_permissible_slot`.
pub fn observation_for_validator(
&self,
key: K,
validator_index: usize,
) -> Result<Option<S>, Error> {
self.sanitize_request(key.get_slot(), validator_index)?;
let exists = self
let observation = self
.items
.get(&key)
.map_or(false, |item| item.contains(validator_index));
.and_then(|item| item.get(validator_index));
Ok(exists)
Ok(observation)
}
/// Returns the number of validators that have been observed at the given `slot`. Returns
@@ -561,6 +624,116 @@ mod tests {
type E = types::MainnetEthSpec;
#[test]
fn value_storage() {
type Container = AutoPruningSlotContainer<Slot, Hash256, SyncContributorSlotHashSet<E>, E>;
let mut store: Container = <_>::default();
let key = Slot::new(0);
let validator_index = 0;
let value = Hash256::zero();
// Assert there is no entry.
assert!(store
.observation_for_validator(key, validator_index)
.unwrap()
.is_none());
assert!(!store
.validator_has_been_observed(key, validator_index)
.unwrap());
// Add an entry.
assert!(!store
.observe_validator(key, validator_index, value)
.unwrap());
// Assert there is a correct entry.
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(value)
);
assert!(store
.validator_has_been_observed(key, validator_index)
.unwrap());
let alternate_value = Hash256::from_low_u64_be(1);
// Assert that override false does not override.
assert_eq!(
store
.observe_validator_with_override(key, validator_index, alternate_value, |_, _| {
false
})
.unwrap(),
Some(value)
);
// Assert that override true overrides and acts as if there was never an
// entry there.
assert_eq!(
store
.observe_validator_with_override(key, validator_index, alternate_value, |_, _| {
true
})
.unwrap(),
None
);
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(alternate_value)
);
// Reset the store.
let mut store: Container = <_>::default();
// Asset that a new entry with override = false is inserted
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
None
);
assert_eq!(
store
.observe_validator_with_override(key, validator_index, value, |_, _| { false })
.unwrap(),
None,
);
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(value)
);
// Reset the store.
let mut store: Container = <_>::default();
// Asset that a new entry with override = true is inserted
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
None
);
assert_eq!(
store
.observe_validator_with_override(key, validator_index, value, |_, _| { true })
.unwrap(),
None,
);
assert_eq!(
store
.observation_for_validator(key, validator_index)
.unwrap(),
Some(value)
);
}
macro_rules! test_suite_epoch {
($mod_name: ident, $type: ident) => {
#[cfg(test)]
@@ -722,7 +895,7 @@ mod tests {
test_suite_epoch!(observed_aggregators, ObservedAggregators);
macro_rules! test_suite_slot {
($mod_name: ident, $type: ident) => {
($mod_name: ident, $type: ident, $value: expr) => {
#[cfg(test)]
mod $mod_name {
use super::*;
@@ -737,7 +910,7 @@ mod tests {
"should indicate an unknown item is unknown"
);
assert_eq!(
store.observe_validator(key, i),
store.observe_validator(key, i, $value),
Ok(false),
"should observe new item"
);
@@ -750,7 +923,7 @@ mod tests {
"should indicate a known item is known"
);
assert_eq!(
store.observe_validator(key, i),
store.observe_validator(key, i, $value),
Ok(true),
"should acknowledge an existing item"
);
@@ -997,6 +1170,10 @@ mod tests {
}
};
}
test_suite_slot!(observed_sync_contributors, ObservedSyncContributors);
test_suite_slot!(observed_sync_aggregators, ObservedSyncAggregators);
test_suite_slot!(
observed_sync_contributors,
ObservedSyncContributors,
Hash256::zero()
);
test_suite_slot!(observed_sync_aggregators, ObservedSyncAggregators, ());
}

View File

@@ -153,7 +153,21 @@ pub enum Error {
/// It's unclear if this sync message is valid, however we have already observed a
/// signature from this validator for this slot and should not observe
/// another.
PriorSyncCommitteeMessageKnown { validator_index: u64, slot: Slot },
PriorSyncCommitteeMessageKnown {
validator_index: u64,
slot: Slot,
prev_root: Hash256,
new_root: Hash256,
},
/// We have already observed a contribution for the aggregator and refuse to
/// process another.
///
/// ## Peer scoring
///
/// It's unclear if this sync message is valid, however we have already observed a
/// signature from this validator for this slot and should not observe
/// another.
PriorSyncContributionMessageKnown { validator_index: u64, slot: Slot },
/// The sync committee message was received on an invalid sync committee message subnet.
///
/// ## Peer scoring
@@ -378,10 +392,10 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
if chain
.observed_sync_aggregators
.write()
.observe_validator(observed_key, aggregator_index as usize)
.observe_validator(observed_key, aggregator_index as usize, ())
.map_err(BeaconChainError::from)?
{
return Err(Error::PriorSyncCommitteeMessageKnown {
return Err(Error::PriorSyncContributionMessageKnown {
validator_index: aggregator_index,
slot: contribution.slot,
});
@@ -450,19 +464,40 @@ impl VerifiedSyncCommitteeMessage {
// The sync committee message is the first valid message received for the participating validator
// for the slot, sync_message.slot.
let validator_index = sync_message.validator_index;
if chain
let head_root = chain.canonical_head.cached_head().head_block_root();
let new_root = sync_message.beacon_block_root;
let should_override_prev = |prev_root: &Hash256, new_root: &Hash256| {
let roots_differ = new_root != prev_root;
let new_elects_head = new_root == &head_root;
if roots_differ {
// Track sync committee messages that differ from each other.
metrics::inc_counter(&metrics::SYNC_MESSAGE_EQUIVOCATIONS);
if new_elects_head {
// Track sync committee messages that swap from an old block to a new block.
metrics::inc_counter(&metrics::SYNC_MESSAGE_EQUIVOCATIONS_TO_HEAD);
}
}
roots_differ && new_elects_head
};
if let Some(prev_root) = chain
.observed_sync_contributors
.read()
.validator_has_been_observed(
.observation_for_validator(
SlotSubcommitteeIndex::new(sync_message.slot, subnet_id.into()),
validator_index as usize,
)
.map_err(BeaconChainError::from)?
{
return Err(Error::PriorSyncCommitteeMessageKnown {
validator_index,
slot: sync_message.slot,
});
if !should_override_prev(&prev_root, &new_root) {
return Err(Error::PriorSyncCommitteeMessageKnown {
validator_index,
slot: sync_message.slot,
prev_root,
new_root,
});
}
}
// The aggregate signature of the sync committee message is valid.
@@ -474,18 +509,22 @@ impl VerifiedSyncCommitteeMessage {
// It's important to double check that the sync committee message still hasn't been observed, since
// there can be a race-condition if we receive two sync committee messages at the same time and
// process them in different threads.
if chain
if let Some(prev_root) = chain
.observed_sync_contributors
.write()
.observe_validator(
.observe_validator_with_override(
SlotSubcommitteeIndex::new(sync_message.slot, subnet_id.into()),
validator_index as usize,
sync_message.beacon_block_root,
should_override_prev,
)
.map_err(BeaconChainError::from)?
{
return Err(Error::PriorSyncCommitteeMessageKnown {
validator_index,
slot: sync_message.slot,
prev_root,
new_root,
});
}