Aggregate subsets (#3493)

## Issue Addressed

Resolves #3238 

## Proposed Changes

Please list or describe the changes introduced by this PR.

## Additional Info

Please provide any additional information. For example, future considerations
or information useful for reviewers.
This commit is contained in:
Pawan Dhananjay
2023-06-27 01:06:49 +00:00
parent cc780aae3e
commit 448d3ec9b3
17 changed files with 236 additions and 93 deletions

View File

@@ -117,14 +117,14 @@ pub enum Error {
///
/// The peer has sent an invalid message.
AggregatorPubkeyUnknown(u64),
/// The attestation has been seen before; either in a block, on the gossip network or from a
/// local validator.
/// The attestation or a superset of this attestation's aggregations bits for the same data
/// has been seen before; either in a block, on the gossip network or from a local validator.
///
/// ## Peer scoring
///
/// It's unclear if this attestation is valid, however we have already observed it and do not
/// need to observe it again.
AttestationAlreadyKnown(Hash256),
AttestationSupersetKnown(Hash256),
/// There has already been an aggregation observed for this validator, we refuse to process a
/// second.
///
@@ -268,7 +268,7 @@ enum CheckAttestationSignature {
struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> {
signed_aggregate: &'a SignedAggregateAndProof<T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
attestation_root: Hash256,
attestation_data_root: Hash256,
}
/// Wraps a `Attestation` that has been verified up until the point that an `IndexedAttestation` can
@@ -467,14 +467,17 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
}
// Ensure the valid aggregated attestation has not already been seen locally.
let attestation_root = attestation.tree_hash_root();
let attestation_data = &attestation.data;
let attestation_data_root = attestation_data.tree_hash_root();
if chain
.observed_attestations
.write()
.is_known(attestation, attestation_root)
.is_known_subset(attestation, attestation_data_root)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::AttestationAlreadyKnown(attestation_root));
metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_SUBSETS);
return Err(Error::AttestationSupersetKnown(attestation_data_root));
}
let aggregator_index = signed_aggregate.message.aggregator_index;
@@ -520,7 +523,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
if attestation.aggregation_bits.is_zero() {
Err(Error::EmptyAggregationBitfield)
} else {
Ok(attestation_root)
Ok(attestation_data_root)
}
}
@@ -533,7 +536,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
let attestation = &signed_aggregate.message.aggregate;
let aggregator_index = signed_aggregate.message.aggregator_index;
let attestation_root = match Self::verify_early_checks(signed_aggregate, chain) {
let attestation_data_root = match Self::verify_early_checks(signed_aggregate, chain) {
Ok(root) => root,
Err(e) => return Err(SignatureNotChecked(&signed_aggregate.message.aggregate, e)),
};
@@ -568,7 +571,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> {
Ok(IndexedAggregatedAttestation {
signed_aggregate,
indexed_attestation,
attestation_root,
attestation_data_root,
})
}
}
@@ -577,7 +580,7 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
/// Run the checks that happen after the indexed attestation and signature have been checked.
fn verify_late_checks(
signed_aggregate: &SignedAggregateAndProof<T::EthSpec>,
attestation_root: Hash256,
attestation_data_root: Hash256,
chain: &BeaconChain<T>,
) -> Result<(), Error> {
let attestation = &signed_aggregate.message.aggregate;
@@ -587,13 +590,14 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
//
// It's important to double check that the attestation is not already known, otherwise two
// attestations processed at the same time could be published.
if let ObserveOutcome::AlreadyKnown = chain
if let ObserveOutcome::Subset = chain
.observed_attestations
.write()
.observe_item(attestation, Some(attestation_root))
.observe_item(attestation, Some(attestation_data_root))
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::AttestationAlreadyKnown(attestation_root));
metrics::inc_counter(&metrics::AGGREGATED_ATTESTATION_SUBSETS);
return Err(Error::AttestationSupersetKnown(attestation_data_root));
}
// Observe the aggregator so we don't process another aggregate from them.
@@ -653,7 +657,7 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
let IndexedAggregatedAttestation {
signed_aggregate,
indexed_attestation,
attestation_root,
attestation_data_root,
} = signed_aggregate;
match check_signature {
@@ -677,7 +681,7 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> {
CheckAttestationSignature::No => (),
};
if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_root, chain) {
if let Err(e) = Self::verify_late_checks(signed_aggregate, attestation_data_root, chain) {
return Err(SignatureValid(indexed_attestation, e));
}

View File

@@ -998,6 +998,17 @@ lazy_static! {
"light_client_optimistic_update_verification_success_total",
"Number of light client optimistic updates verified for gossip"
);
/*
* Aggregate subset metrics
*/
pub static ref SYNC_CONTRIBUTION_SUBSETS: Result<IntCounter> = try_create_int_counter(
"beacon_sync_contribution_subsets_total",
"Count of new sync contributions that are subsets of already known aggregates"
);
pub static ref AGGREGATED_ATTESTATION_SUBSETS: Result<IntCounter> = try_create_int_counter(
"beacon_aggregated_attestation_subsets_total",
"Count of new aggregated attestations that are subsets of already known aggregates"
);
}
/// Scrape the `beacon_chain` for metrics that are not constantly updated (e.g., the present slot,

View File

@@ -1,7 +1,9 @@
//! Provides an `ObservedAggregates` struct which allows us to reject aggregated attestations or
//! sync committee contributions if we've already seen them.
use std::collections::HashSet;
use crate::sync_committee_verification::SyncCommitteeData;
use ssz_types::{BitList, BitVector};
use std::collections::HashMap;
use std::marker::PhantomData;
use tree_hash::TreeHash;
use types::consts::altair::{
@@ -10,8 +12,16 @@ use types::consts::altair::{
use types::slot_data::SlotData;
use types::{Attestation, EthSpec, Hash256, Slot, SyncCommitteeContribution};
pub type ObservedSyncContributions<E> = ObservedAggregates<SyncCommitteeContribution<E>, E>;
pub type ObservedAggregateAttestations<E> = ObservedAggregates<Attestation<E>, E>;
pub type ObservedSyncContributions<E> = ObservedAggregates<
SyncCommitteeContribution<E>,
E,
BitVector<<E as types::EthSpec>::SyncSubcommitteeSize>,
>;
pub type ObservedAggregateAttestations<E> = ObservedAggregates<
Attestation<E>,
E,
BitList<<E as types::EthSpec>::MaxValidatorsPerCommittee>,
>;
/// A trait use to associate capacity constants with the type being stored in `ObservedAggregates`.
pub trait Consts {
@@ -69,10 +79,81 @@ impl<T: EthSpec> Consts for SyncCommitteeContribution<T> {
}
}
/// A trait for types that implement a behaviour where one object of that type
/// can be a subset/superset of another.
/// This trait allows us to be generic over the aggregate item that we store in the cache that
/// we want to prevent duplicates/subsets for.
pub trait SubsetItem {
/// The item that is stored for later comparison with new incoming aggregate items.
type Item;
/// Returns `true` if `self` is a non-strict subset of `other` and `false` otherwise.
fn is_subset(&self, other: &Self::Item) -> bool;
/// Returns `true` if `self` is a non-strict superset of `other` and `false` otherwise.
fn is_superset(&self, other: &Self::Item) -> bool;
/// Returns the item that gets stored in `ObservedAggregates` for later subset
/// comparison with incoming aggregates.
fn get_item(&self) -> Self::Item;
/// Returns a unique value that keys the object to the item that is being stored
/// in `ObservedAggregates`.
fn root(&self) -> Hash256;
}
impl<T: EthSpec> SubsetItem for Attestation<T> {
type Item = BitList<T::MaxValidatorsPerCommittee>;
fn is_subset(&self, other: &Self::Item) -> bool {
self.aggregation_bits.is_subset(other)
}
fn is_superset(&self, other: &Self::Item) -> bool {
other.is_subset(&self.aggregation_bits)
}
/// Returns the sync contribution aggregation bits.
fn get_item(&self) -> Self::Item {
self.aggregation_bits.clone()
}
/// Returns the hash tree root of the attestation data.
fn root(&self) -> Hash256 {
self.data.tree_hash_root()
}
}
impl<T: EthSpec> SubsetItem for SyncCommitteeContribution<T> {
type Item = BitVector<T::SyncSubcommitteeSize>;
fn is_subset(&self, other: &Self::Item) -> bool {
self.aggregation_bits.is_subset(other)
}
fn is_superset(&self, other: &Self::Item) -> bool {
other.is_subset(&self.aggregation_bits)
}
/// Returns the sync contribution aggregation bits.
fn get_item(&self) -> Self::Item {
self.aggregation_bits.clone()
}
/// Returns the hash tree root of the root, slot and subcommittee index
/// of the sync contribution.
fn root(&self) -> Hash256 {
SyncCommitteeData {
root: self.beacon_block_root,
slot: self.slot,
subcommittee_index: self.subcommittee_index,
}
.tree_hash_root()
}
}
#[derive(Debug, PartialEq)]
pub enum ObserveOutcome {
/// This item was already known.
AlreadyKnown,
/// This item is a non-strict subset of an already known item.
Subset,
/// This was the first time this item was observed.
New,
}
@@ -94,26 +175,28 @@ pub enum Error {
},
}
/// A `HashSet` that contains entries related to some `Slot`.
struct SlotHashSet {
set: HashSet<Hash256>,
/// A `HashMap` that contains entries related to some `Slot`.
struct SlotHashSet<I> {
/// Contains a vector of maximally-sized aggregation bitfields/bitvectors
/// such that no bitfield/bitvector is a subset of any other in the list.
map: HashMap<Hash256, Vec<I>>,
slot: Slot,
max_capacity: usize,
}
impl SlotHashSet {
impl<I> SlotHashSet<I> {
pub fn new(slot: Slot, initial_capacity: usize, max_capacity: usize) -> Self {
Self {
slot,
set: HashSet::with_capacity(initial_capacity),
map: HashMap::with_capacity(initial_capacity),
max_capacity,
}
}
/// Store the items in self so future observations recognise its existence.
pub fn observe_item<T: SlotData>(
pub fn observe_item<S: SlotData + SubsetItem<Item = I>>(
&mut self,
item: &T,
item: &S,
root: Hash256,
) -> Result<ObserveOutcome, Error> {
if item.get_slot() != self.slot {
@@ -123,29 +206,45 @@ impl SlotHashSet {
});
}
if self.set.contains(&root) {
Ok(ObserveOutcome::AlreadyKnown)
} else {
// Here we check to see if this slot has reached the maximum observation count.
//
// The resulting behaviour is that we are no longer able to successfully observe new
// items, however we will continue to return `is_known` values. We could also
// disable `is_known`, however then we would stop forwarding items across the
// gossip network and I think that this is a worse case than sending some invalid ones.
// The underlying libp2p network is responsible for removing duplicate messages, so
// this doesn't risk a broadcast loop.
if self.set.len() >= self.max_capacity {
return Err(Error::ReachedMaxObservationsPerSlot(self.max_capacity));
if let Some(aggregates) = self.map.get_mut(&root) {
for existing in aggregates {
// Check if `item` is a subset of any of the observed aggregates
if item.is_subset(existing) {
return Ok(ObserveOutcome::Subset);
// Check if `item` is a superset of any of the observed aggregates
// If true, we replace the new item with its existing subset. This allows us
// to hold fewer items in the list.
} else if item.is_superset(existing) {
*existing = item.get_item();
return Ok(ObserveOutcome::New);
}
}
self.set.insert(root);
Ok(ObserveOutcome::New)
}
// Here we check to see if this slot has reached the maximum observation count.
//
// The resulting behaviour is that we are no longer able to successfully observe new
// items, however we will continue to return `is_known_subset` values. We could also
// disable `is_known_subset`, however then we would stop forwarding items across the
// gossip network and I think that this is a worse case than sending some invalid ones.
// The underlying libp2p network is responsible for removing duplicate messages, so
// this doesn't risk a broadcast loop.
if self.map.len() >= self.max_capacity {
return Err(Error::ReachedMaxObservationsPerSlot(self.max_capacity));
}
let item = item.get_item();
self.map.entry(root).or_default().push(item);
Ok(ObserveOutcome::New)
}
/// Indicates if `item` has been observed before.
pub fn is_known<T: SlotData>(&self, item: &T, root: Hash256) -> Result<bool, Error> {
/// Check if `item` is a non-strict subset of any of the already observed aggregates for
/// the given root and slot.
pub fn is_known_subset<S: SlotData + SubsetItem<Item = I>>(
&self,
item: &S,
root: Hash256,
) -> Result<bool, Error> {
if item.get_slot() != self.slot {
return Err(Error::IncorrectSlot {
expected: self.slot,
@@ -153,25 +252,28 @@ impl SlotHashSet {
});
}
Ok(self.set.contains(&root))
Ok(self
.map
.get(&root)
.map_or(false, |agg| agg.iter().any(|val| item.is_subset(val))))
}
/// The number of observed items in `self`.
pub fn len(&self) -> usize {
self.set.len()
self.map.len()
}
}
/// Stores the roots of objects for some number of `Slots`, so we can determine if
/// these have previously been seen on the network.
pub struct ObservedAggregates<T: TreeHash + SlotData + Consts, E: EthSpec> {
pub struct ObservedAggregates<T: SlotData + Consts, E: EthSpec, I> {
lowest_permissible_slot: Slot,
sets: Vec<SlotHashSet>,
sets: Vec<SlotHashSet<I>>,
_phantom_spec: PhantomData<E>,
_phantom_tree_hash: PhantomData<T>,
}
impl<T: TreeHash + SlotData + Consts, E: EthSpec> Default for ObservedAggregates<T, E> {
impl<T: SlotData + Consts, E: EthSpec, I> Default for ObservedAggregates<T, E, I> {
fn default() -> Self {
Self {
lowest_permissible_slot: Slot::new(0),
@@ -182,17 +284,17 @@ impl<T: TreeHash + SlotData + Consts, E: EthSpec> Default for ObservedAggregates
}
}
impl<T: TreeHash + SlotData + Consts, E: EthSpec> ObservedAggregates<T, E> {
/// Store the root of `item` in `self`.
impl<T: SlotData + Consts + SubsetItem<Item = I>, E: EthSpec, I> ObservedAggregates<T, E, I> {
/// Store `item` in `self` keyed at `root`.
///
/// `root` must equal `item.tree_hash_root()`.
/// `root` must equal `item.root::<SubsetItem>()`.
pub fn observe_item(
&mut self,
item: &T,
root_opt: Option<Hash256>,
) -> Result<ObserveOutcome, Error> {
let index = self.get_set_index(item.get_slot())?;
let root = root_opt.unwrap_or_else(|| item.tree_hash_root());
let root = root_opt.unwrap_or_else(|| item.root());
self.sets
.get_mut(index)
@@ -200,17 +302,18 @@ impl<T: TreeHash + SlotData + Consts, E: EthSpec> ObservedAggregates<T, E> {
.and_then(|set| set.observe_item(item, root))
}
/// Check to see if the `root` of `item` is in self.
/// Check if `item` is a non-strict subset of any of the already observed aggregates for
/// the given root and slot.
///
/// `root` must equal `a.tree_hash_root()`.
/// `root` must equal `item.root::<SubsetItem>()`.
#[allow(clippy::wrong_self_convention)]
pub fn is_known(&mut self, item: &T, root: Hash256) -> Result<bool, Error> {
pub fn is_known_subset(&mut self, item: &T, root: Hash256) -> Result<bool, Error> {
let index = self.get_set_index(item.get_slot())?;
self.sets
.get(index)
.ok_or(Error::InvalidSetIndex(index))
.and_then(|set| set.is_known(item, root))
.and_then(|set| set.is_known_subset(item, root))
}
/// The maximum number of slots that items are stored for.
@@ -296,7 +399,6 @@ impl<T: TreeHash + SlotData + Consts, E: EthSpec> ObservedAggregates<T, E> {
#[cfg(not(debug_assertions))]
mod tests {
use super::*;
use tree_hash::TreeHash;
use types::{test_utils::test_random_instance, Hash256};
type E = types::MainnetEthSpec;
@@ -330,7 +432,7 @@ mod tests {
for a in &items {
assert_eq!(
store.is_known(a, a.tree_hash_root()),
store.is_known_subset(a, a.root()),
Ok(false),
"should indicate an unknown attestation is unknown"
);
@@ -343,13 +445,13 @@ mod tests {
for a in &items {
assert_eq!(
store.is_known(a, a.tree_hash_root()),
store.is_known_subset(a, a.root()),
Ok(true),
"should indicate a known attestation is known"
);
assert_eq!(
store.observe_item(a, Some(a.tree_hash_root())),
Ok(ObserveOutcome::AlreadyKnown),
store.observe_item(a, Some(a.root())),
Ok(ObserveOutcome::Subset),
"should acknowledge an existing attestation"
);
}

View File

@@ -37,6 +37,7 @@ use bls::{verify_signature_sets, PublicKeyBytes};
use derivative::Derivative;
use safe_arith::ArithError;
use slot_clock::SlotClock;
use ssz_derive::{Decode, Encode};
use state_processing::per_block_processing::errors::SyncCommitteeMessageValidationError;
use state_processing::signature_sets::{
signed_sync_aggregate_selection_proof_signature_set, signed_sync_aggregate_signature_set,
@@ -47,6 +48,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use strum::AsRefStr;
use tree_hash::TreeHash;
use tree_hash_derive::TreeHash;
use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT;
use types::slot_data::SlotData;
use types::sync_committee::Error as SyncCommitteeError;
@@ -110,14 +112,14 @@ pub enum Error {
///
/// The peer has sent an invalid message.
AggregatorPubkeyUnknown(u64),
/// The sync contribution has been seen before; either in a block, on the gossip network or from a
/// local validator.
/// The sync contribution or a superset of this sync contribution's aggregation bits for the same data
/// has been seen before; either in a block on the gossip network or from a local validator.
///
/// ## Peer scoring
///
/// It's unclear if this sync contribution is valid, however we have already observed it and do not
/// need to observe it again.
SyncContributionAlreadyKnown(Hash256),
SyncContributionSupersetKnown(Hash256),
/// There has already been an aggregation observed for this validator, we refuse to process a
/// second.
///
@@ -268,6 +270,14 @@ pub struct VerifiedSyncContribution<T: BeaconChainTypes> {
participant_pubkeys: Vec<PublicKeyBytes>,
}
/// The sync contribution data.
#[derive(Encode, Decode, TreeHash)]
pub struct SyncCommitteeData {
pub slot: Slot,
pub root: Hash256,
pub subcommittee_index: u64,
}
/// Wraps a `SyncCommitteeMessage` that has been verified for propagation on the gossip network.
#[derive(Clone)]
pub struct VerifiedSyncCommitteeMessage {
@@ -314,15 +324,22 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
return Err(Error::AggregatorNotInCommittee { aggregator_index });
};
// Ensure the valid sync contribution has not already been seen locally.
let contribution_root = contribution.tree_hash_root();
// Ensure the valid sync contribution or its superset has not already been seen locally.
let contribution_data_root = SyncCommitteeData {
slot: contribution.slot,
root: contribution.beacon_block_root,
subcommittee_index: contribution.subcommittee_index,
}
.tree_hash_root();
if chain
.observed_sync_contributions
.write()
.is_known(contribution, contribution_root)
.is_known_subset(contribution, contribution_data_root)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::SyncContributionAlreadyKnown(contribution_root));
metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_SUBSETS);
return Err(Error::SyncContributionSupersetKnown(contribution_data_root));
}
// Ensure there has been no other observed aggregate for the given `aggregator_index`.
@@ -376,13 +393,14 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
//
// It's important to double check that the contribution is not already known, otherwise two
// contribution processed at the same time could be published.
if let ObserveOutcome::AlreadyKnown = chain
if let ObserveOutcome::Subset = chain
.observed_sync_contributions
.write()
.observe_item(contribution, Some(contribution_root))
.observe_item(contribution, Some(contribution_data_root))
.map_err(|e| Error::BeaconChainError(e.into()))?
{
return Err(Error::SyncContributionAlreadyKnown(contribution_root));
metrics::inc_counter(&metrics::SYNC_CONTRIBUTION_SUBSETS);
return Err(Error::SyncContributionSupersetKnown(contribution_data_root));
}
// Observe the aggregator so we don't process another aggregate from them.