Fix Aggregation Pool for Electra (#5754)

* Fix Aggregation Pool for Electra

* Remove Outdated Interface
This commit is contained in:
ethDreamer
2024-05-09 14:50:11 -05:00
committed by GitHub
parent 07229b76ed
commit cb8c8f59cf
6 changed files with 255 additions and 75 deletions

View File

@@ -1612,11 +1612,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`.
///
/// The attestation will be obtained from `self.naive_aggregation_pool`.
pub fn get_aggregated_attestation(
pub fn get_aggregated_attestation_base(
&self,
data: &AttestationData,
) -> Result<Option<Attestation<T::EthSpec>>, Error> {
if let Some(attestation) = self.naive_aggregation_pool.read().get(data) {
let attestation_key = crate::naive_aggregation_pool::AttestationKey::new_base(data);
if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) {
self.filter_optimistic_attestation(attestation)
.map(Option::Some)
} else {
Ok(None)
}
}
// TODO(electra): call this function from the new beacon API method
pub fn get_aggregated_attestation_electra(
&self,
data: &AttestationData,
committee_index: CommitteeIndex,
) -> Result<Option<Attestation<T::EthSpec>>, Error> {
let attestation_key =
crate::naive_aggregation_pool::AttestationKey::new_electra(data, committee_index);
if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) {
self.filter_optimistic_attestation(attestation)
.map(Option::Some)
} else {
@@ -1628,16 +1645,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// `attestation.data.tree_hash_root()`.
///
/// The attestation will be obtained from `self.naive_aggregation_pool`.
pub fn get_aggregated_attestation_by_slot_and_root(
///
/// NOTE: This function will *only* work with pre-electra attestations and it only
/// exists to support the pre-electra validator API method.
pub fn get_pre_electra_aggregated_attestation_by_slot_and_root(
&self,
slot: Slot,
attestation_data_root: &Hash256,
) -> Result<Option<Attestation<T::EthSpec>>, Error> {
if let Some(attestation) = self
.naive_aggregation_pool
.read()
.get_by_slot_and_root(slot, attestation_data_root)
{
let attestation_key =
crate::naive_aggregation_pool::AttestationKey::new_base_from_slot_and_root(
slot,
*attestation_data_root,
);
if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) {
self.filter_optimistic_attestation(attestation)
.map(Option::Some)
} else {

View File

@@ -1,7 +1,9 @@
use crate::metrics;
use crate::observed_aggregates::AsReference;
use itertools::Itertools;
use smallvec::SmallVec;
use std::collections::HashMap;
use tree_hash::TreeHash;
use tree_hash::{MerkleHasher, TreeHash, TreeHashType};
use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT;
use types::slot_data::SlotData;
use types::sync_committee_contribution::SyncContributionData;
@@ -9,9 +11,114 @@ use types::{
Attestation, AttestationData, AttestationRef, EthSpec, Hash256, Slot, SyncCommitteeContribution,
};
type AttestationDataRoot = Hash256;
type AttestationKeyRoot = Hash256;
type SyncDataRoot = Hash256;
/// Post-Electra, we need a new key for Attestations that includes the committee index
#[derive(Debug, Clone, PartialEq)]
pub struct AttestationKey {
data_root: Hash256,
committee_index: Option<u64>,
slot: Slot,
}
// A custom implementation of `TreeHash` such that:
// AttestationKey(data, None).tree_hash_root() == data.tree_hash_root()
// AttestationKey(data, Some(index)).tree_hash_root() == (data, index).tree_hash_root()
// This is necessary because pre-Electra, the validator will ask for the tree_hash_root()
// of the `AttestationData`
impl TreeHash for AttestationKey {
fn tree_hash_type() -> TreeHashType {
TreeHashType::Container
}
fn tree_hash_packed_encoding(&self) -> SmallVec<[u8; 32]> {
unreachable!("AttestationKey should never be packed.")
}
fn tree_hash_packing_factor() -> usize {
unreachable!("AttestationKey should never be packed.")
}
fn tree_hash_root(&self) -> Hash256 {
match self.committee_index {
None => self.data_root, // Return just the data root if no committee index is present
Some(index) => {
// Combine the hash of the data with the hash of the index
let mut hasher = MerkleHasher::with_leaves(2);
hasher
.write(self.data_root.as_bytes())
.expect("should write data hash");
hasher
.write(&index.to_le_bytes())
.expect("should write index");
hasher.finish().expect("should give tree hash")
}
}
}
}
impl AttestationKey {
pub fn from_attestation_ref<E: EthSpec>(attestation: AttestationRef<E>) -> Result<Self, Error> {
let slot = attestation.data().slot;
match attestation {
AttestationRef::Base(att) => Ok(Self {
data_root: att.data.tree_hash_root(),
committee_index: None,
slot,
}),
AttestationRef::Electra(att) => {
let committee_index = att
.committee_bits
.iter()
.enumerate()
.filter_map(|(i, bit)| if bit { Some(i) } else { None })
.at_most_one()
.map_err(|_| Error::MoreThanOneCommitteeBitSet)?
.ok_or(Error::NoCommitteeBitSet)?;
Ok(Self {
data_root: att.data.tree_hash_root(),
committee_index: Some(committee_index as u64),
slot,
})
}
}
}
pub fn new_base(data: &AttestationData) -> Self {
let slot = data.slot;
Self {
data_root: data.tree_hash_root(),
committee_index: None,
slot,
}
}
pub fn new_electra(data: &AttestationData, committee_index: u64) -> Self {
let slot = data.slot;
Self {
data_root: data.tree_hash_root(),
committee_index: Some(committee_index),
slot,
}
}
pub fn new_base_from_slot_and_root(slot: Slot, data_root: Hash256) -> Self {
Self {
data_root,
committee_index: None,
slot,
}
}
}
impl SlotData for AttestationKey {
fn get_slot(&self) -> Slot {
self.slot
}
}
/// The number of slots that will be stored in the pool.
///
/// For example, if `SLOTS_RETAINED == 3` and the pool is pruned at slot `6`, then all items
@@ -49,6 +156,10 @@ pub enum Error {
/// The given `aggregation_bits` field had more than one signature. The number of
/// signatures found is included.
MoreThanOneAggregationBitSet(usize),
/// The electra attestation has more than one committee bit set
MoreThanOneCommitteeBitSet,
/// The electra attestation has NO committee bit set
NoCommitteeBitSet,
/// We have reached the maximum number of unique items that can be stored in a
/// slot. This is a DoS protection function.
ReachedMaxItemsPerSlot(usize),
@@ -90,9 +201,6 @@ where
/// Get a reference to the inner `HashMap`.
fn get_map(&self) -> &HashMap<Self::Key, Self::Value>;
/// Get a `Value` from `Self` based on `Key`, which is a hash of `Data`.
fn get_by_root(&self, root: &Self::Key) -> Option<&Self::Value>;
/// The number of items store in `Self`.
fn len(&self) -> usize;
@@ -112,13 +220,13 @@ where
/// A collection of `Attestation` objects, keyed by their `attestation.data`. Enforces that all
/// `attestation` are from the same slot.
pub struct AggregatedAttestationMap<E: EthSpec> {
map: HashMap<AttestationDataRoot, Attestation<E>>,
map: HashMap<AttestationKeyRoot, Attestation<E>>,
}
impl<E: EthSpec> AggregateMap for AggregatedAttestationMap<E> {
type Key = AttestationDataRoot;
type Key = AttestationKeyRoot;
type Value = Attestation<E>;
type Data = AttestationData;
type Data = AttestationKey;
/// Create an empty collection with the given `initial_capacity`.
fn new(initial_capacity: usize) -> Self {
@@ -133,45 +241,43 @@ impl<E: EthSpec> AggregateMap for AggregatedAttestationMap<E> {
fn insert(&mut self, a: AttestationRef<E>) -> Result<InsertOutcome, Error> {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_CORE_INSERT);
let set_bits = match a {
let aggregation_bit = match a {
AttestationRef::Base(att) => att
.aggregation_bits
.iter()
.enumerate()
.filter(|(_i, bit)| *bit)
.map(|(i, _bit)| i)
.collect::<Vec<_>>(),
.filter_map(|(i, bit)| if bit { Some(i) } else { None })
.at_most_one()
.map_err(|iter| Error::MoreThanOneAggregationBitSet(iter.count()))?
.ok_or(Error::NoAggregationBitsSet)?,
AttestationRef::Electra(att) => att
.aggregation_bits
.iter()
.enumerate()
.filter(|(_i, bit)| *bit)
.map(|(i, _bit)| i)
.collect::<Vec<_>>(),
.filter_map(|(i, bit)| if bit { Some(i) } else { None })
.at_most_one()
.map_err(|iter| Error::MoreThanOneAggregationBitSet(iter.count()))?
.ok_or(Error::NoAggregationBitsSet)?,
};
let committee_index = set_bits
.first()
.copied()
.ok_or(Error::NoAggregationBitsSet)?;
if set_bits.len() > 1 {
return Err(Error::MoreThanOneAggregationBitSet(set_bits.len()));
}
let attestation_data_root = a.data().tree_hash_root();
let attestation_key = AttestationKey::from_attestation_ref(a)?;
let attestation_data_root = attestation_key.tree_hash_root();
if let Some(existing_attestation) = self.map.get_mut(&attestation_data_root) {
if existing_attestation
.get_aggregation_bit(committee_index)
.get_aggregation_bit(aggregation_bit)
.map_err(|_| Error::InconsistentBitfieldLengths)?
{
Ok(InsertOutcome::SignatureAlreadyKnown { committee_index })
Ok(InsertOutcome::SignatureAlreadyKnown {
committee_index: aggregation_bit,
})
} else {
let _timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_AGGREGATION);
existing_attestation.aggregate(a);
Ok(InsertOutcome::SignatureAggregated { committee_index })
Ok(InsertOutcome::SignatureAggregated {
committee_index: aggregation_bit,
})
}
} else {
if self.map.len() >= MAX_ATTESTATIONS_PER_SLOT {
@@ -180,7 +286,9 @@ impl<E: EthSpec> AggregateMap for AggregatedAttestationMap<E> {
self.map
.insert(attestation_data_root, a.clone_as_attestation());
Ok(InsertOutcome::NewItemInserted { committee_index })
Ok(InsertOutcome::NewItemInserted {
committee_index: aggregation_bit,
})
}
}
@@ -195,11 +303,6 @@ impl<E: EthSpec> AggregateMap for AggregatedAttestationMap<E> {
&self.map
}
/// Returns an aggregated `Attestation` with the given `root`, if any.
fn get_by_root(&self, root: &Self::Key) -> Option<&Self::Value> {
self.map.get(root)
}
fn len(&self) -> usize {
self.map.len()
}
@@ -306,11 +409,6 @@ impl<E: EthSpec> AggregateMap for SyncContributionAggregateMap<E> {
&self.map
}
/// Returns an aggregated `SyncCommitteeContribution` with the given `root`, if any.
fn get_by_root(&self, root: &SyncDataRoot) -> Option<&SyncCommitteeContribution<E>> {
self.map.get(root)
}
fn len(&self) -> usize {
self.map.len()
}
@@ -445,13 +543,6 @@ where
.and_then(|map| map.get(data))
}
/// Returns an aggregated `T::Value` with the given `slot` and `root`, if any.
pub fn get_by_slot_and_root(&self, slot: Slot, root: &T::Key) -> Option<T::Value> {
self.maps
.get(&slot)
.and_then(|map| map.get_by_root(root).cloned())
}
/// Iterate all items in all slots of `self`.
pub fn iter(&self) -> impl Iterator<Item = &T::Value> {
self.maps.values().flat_map(|map| map.get_map().values())
@@ -500,19 +591,30 @@ mod tests {
use super::*;
use ssz_types::BitList;
use store::BitVector;
use tree_hash::TreeHash;
use types::{
test_utils::{generate_deterministic_keypair, test_random_instance},
Fork, Hash256, SyncCommitteeMessage,
Attestation, AttestationBase, AttestationElectra, Fork, Hash256, SyncCommitteeMessage,
};
type E = types::MainnetEthSpec;
fn get_attestation(slot: Slot) -> Attestation<E> {
let mut a: Attestation<E> = test_random_instance();
a.data_mut().slot = slot;
*a.aggregation_bits_base_mut().unwrap() =
BitList::with_capacity(4).expect("should create bitlist");
a
fn get_attestation_base(slot: Slot) -> Attestation<E> {
let mut a: AttestationBase<E> = test_random_instance();
a.data.slot = slot;
a.aggregation_bits = BitList::with_capacity(4).expect("should create bitlist");
Attestation::Base(a)
}
fn get_attestation_electra(slot: Slot) -> Attestation<E> {
let mut a: AttestationElectra<E> = test_random_instance();
a.data.slot = slot;
a.aggregation_bits = BitList::with_capacity(4).expect("should create bitlist");
a.committee_bits = BitVector::new();
a.committee_bits
.set(0, true)
.expect("should set committee bit");
Attestation::Electra(a)
}
fn get_sync_contribution(slot: Slot) -> SyncCommitteeContribution<E> {
@@ -555,10 +657,16 @@ mod tests {
}
fn unset_attestation_bit(a: &mut Attestation<E>, i: usize) {
a.aggregation_bits_base_mut()
.unwrap()
.set(i, false)
.expect("should unset aggregation bit")
match a {
Attestation::Base(ref mut att) => att
.aggregation_bits
.set(i, false)
.expect("should unset aggregation bit"),
Attestation::Electra(ref mut att) => att
.aggregation_bits
.set(i, false)
.expect("should unset aggregation bit"),
}
}
fn unset_sync_contribution_bit(a: &mut SyncCommitteeContribution<E>, i: usize) {
@@ -579,8 +687,8 @@ mod tests {
a.data().beacon_block_root == block_root
}
fn key_from_attestation(a: &Attestation<E>) -> AttestationData {
a.data().clone()
fn key_from_attestation(a: &Attestation<E>) -> AttestationKey {
AttestationKey::from_attestation_ref(a.to_ref()).expect("should create attestation key")
}
fn mutate_sync_contribution_block_root(
@@ -605,6 +713,45 @@ mod tests {
SyncContributionData::from_contribution(a)
}
#[test]
fn attestation_key_tree_hash_tests() {
let attestation_base = get_attestation_base(Slot::new(42));
// for a base attestation, the tree_hash_root() of the key should be the same as the tree_hash_root() of the data
let attestation_key_base = AttestationKey::from_attestation_ref(attestation_base.to_ref())
.expect("should create attestation key");
assert_eq!(
attestation_key_base.tree_hash_root(),
attestation_base.data().tree_hash_root()
);
let mut attestation_electra = get_attestation_electra(Slot::new(42));
// for an electra attestation, the tree_hash_root() of the key should be different from the tree_hash_root() of the data
let attestation_key_electra =
AttestationKey::from_attestation_ref(attestation_electra.to_ref())
.expect("should create attestation key");
assert_ne!(
attestation_key_electra.tree_hash_root(),
attestation_electra.data().tree_hash_root()
);
// for an electra attestation, the tree_hash_root() of the key should be dependent on which committee bit is set
let committe_bits = attestation_electra
.committee_bits_mut()
.expect("should get committee bits");
committe_bits
.set(0, false)
.expect("should set committee bit");
committe_bits
.set(1, true)
.expect("should set committee bit");
let new_attestation_key_electra =
AttestationKey::from_attestation_ref(attestation_electra.to_ref())
.expect("should create attestation key");
// this new key should have a different tree_hash_root() than the previous key
assert_ne!(
attestation_key_electra.tree_hash_root(),
new_attestation_key_electra.tree_hash_root()
);
}
macro_rules! test_suite {
(
$mod_name: ident,
@@ -800,8 +947,21 @@ mod tests {
}
test_suite! {
attestation_tests,
get_attestation,
attestation_tests_base,
get_attestation_base,
sign_attestation,
unset_attestation_bit,
mutate_attestation_block_root,
mutate_attestation_slot,
attestation_block_root_comparator,
key_from_attestation,
AggregatedAttestationMap,
MAX_ATTESTATIONS_PER_SLOT
}
test_suite! {
attestation_tests_electra,
get_attestation_electra,
sign_attestation,
unset_attestation_bit,
mutate_attestation_block_root,

View File

@@ -1374,7 +1374,7 @@ where
// aggregate locally.
let aggregate = self
.chain
.get_aggregated_attestation(attestation.data())
.get_aggregated_attestation_base(attestation.data())
.unwrap()
.unwrap_or_else(|| {
committee_attestations.iter().skip(1).fold(

View File

@@ -1228,10 +1228,7 @@ async fn attesting_to_optimistic_head() {
let get_aggregated_by_slot_and_root = || {
rig.harness
.chain
.get_aggregated_attestation_by_slot_and_root(
attestation.data().slot,
&attestation.data().tree_hash_root(),
)
.get_aggregated_attestation_base(attestation.data())
};
/*

View File

@@ -3191,7 +3191,7 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner.blocking_json_task(Priority::P0, move || {
not_synced_filter?;
chain
.get_aggregated_attestation_by_slot_and_root(
.get_pre_electra_aggregated_attestation_by_slot_and_root(
query.slot,
&query.attestation_data_root,
)

View File

@@ -92,6 +92,7 @@ impl<E: EthSpec> Decode for Attestation<E> {
}
}
// TODO(electra): think about how to handle fork variants here
impl<E: EthSpec> TestRandom for Attestation<E> {
fn random_for_test(rng: &mut impl RngCore) -> Self {
let aggregation_bits: BitList<E::MaxValidatorsPerCommittee> = BitList::random_for_test(rng);