Electra: Remaining Consensus Data Structures (#5712)

* Attestation superstruct changes for EIP 7549 (#5644)

* update

* experiment

* superstruct changes

* revert

* superstruct changes

* fix tests

* indexed attestation

* indexed attestation superstruct

* updated TODOs

* `superstruct` the `AttesterSlashing` (#5636)

* `superstruct` Attester Fork Variants

* Push a little further

* Deal with Encode / Decode of AttesterSlashing

* not so sure about this..

* Stop Encode/Decode Bounds from Propagating Out

* Tons of Changes..

* More Conversions to AttestationRef

* Add AsReference trait (#15)

* Add AsReference trait

* Fix some snafus

* Got it Compiling! :D

* Got Tests Building

* Get beacon chain tests compiling

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Merge remote-tracking branch 'upstream/unstable' into electra_attestation_changes

* Make EF Tests Fork-Agnostic (#5713)

* Finish EF Test Fork Agnostic (#5714)

* Superstruct `AggregateAndProof` (#5715)

* Upgrade `superstruct` to `0.8.0`

* superstruct `AggregateAndProof`

* Merge remote-tracking branch 'sigp/unstable' into electra_attestation_changes

* cargo fmt

* Merge pull request #5726 from realbigsean/electra_attestation_changes

Merge unstable into Electra attestation changes

* EIP7549 `get_attestation_indices` (#5657)

* get attesting indices electra impl

* fmt

* get tests to pass

* fmt

* fix some beacon chain tests

* fmt

* fix slasher test

* fmt got me again

* fix more tests

* fix tests

* Some small changes (#5739)

* cargo fmt (#5740)

* Sketch op pool changes

* fix get attesting indices (#5742)

* fix get attesting indices

* better errors

* fix compile

* only get committee index once

* Ef test fixes (#5753)

* attestation related ef test fixes

* delete commented out stuff

* Fix Aggregation Pool for Electra (#5754)

* Fix Aggregation Pool for Electra

* Remove Outdated Interface

* fix ssz (#5755)

* Get `electra_op_pool` up to date (#5756)

* fix get attesting indices (#5742)

* fix get attesting indices

* better errors

* fix compile

* only get committee index once

* Ef test fixes (#5753)

* attestation related ef test fixes

* delete commented out stuff

* Fix Aggregation Pool for Electra (#5754)

* Fix Aggregation Pool for Electra

* Remove Outdated Interface

* fix ssz (#5755)

---------

Co-authored-by: realbigsean <sean@sigmaprime.io>

* Revert "Get `electra_op_pool` up to date (#5756)" (#5757)

This reverts commit ab9e58aa3d.

* Merge branch 'electra_attestation_changes' of https://github.com/sigp/lighthouse into electra_op_pool

* Compute on chain aggregate impl (#5752)

* add compute_on_chain_agg impl to op pool changes

* fmt

* get op pool tests to pass

* update the naive agg pool interface (#5760)

* Fix bugs in cross-committee aggregation

* Add comment to max cover optimisation

* Fix assert

* Merge pull request #5749 from sigp/electra_op_pool

Optimise Electra op pool aggregation

* update committee offset

* Fix Electra Fork Choice Tests (#5764)

* Subscribe to the correct subnets for electra attestations (#5782)

* subscribe to the correct att subnets for electra

* subscribe to the correct att subnets for electra

* cargo fmt

* fix slashing handling

* Merge remote-tracking branch 'upstream/unstable'

* Send unagg attestation based on fork

* Publish all aggregates

* just one more check bro plz..

* Merge pull request #5832 from ethDreamer/electra_attestation_changes_merge_unstable

Merge `unstable` into `electra_attestation_changes`

* Merge pull request #5835 from realbigsean/fix-validator-logic

Fix validator logic

* Merge pull request #5816 from realbigsean/electra-attestation-slashing-handling

Electra slashing handling

* Electra attestation changes rm decode impl (#5856)

* Remove Crappy Decode impl for Attestation

* Remove Inefficient Attestation Decode impl

* Implement Schema Upgrade / Downgrade

* Update beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Fix failing attestation tests and misc electra attestation cleanup (#5810)

* - get attestation related beacon chain tests to pass
- observed attestations are now keyed off of data + committee index
- rename op pool attestationref to compactattestationref
- remove unwraps in agg pool and use options instead
- cherry pick some changes from ef-tests-electra

* cargo fmt

* fix failing test

* Revert dockerfile changes

* make committee_index return option

* function args shouldnt be a ref to attestation ref

* fmt

* fix dup imports

---------

Co-authored-by: realbigsean <seananderson33@GMAIL.com>

* fix some todos (#5817)

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* add consolidations to merkle calc for inclusion proof

* Remove Duplicate KZG Commitment Merkle Proof Code (#5874)

* Remove Duplicate KZG Commitment Merkle Proof Code

* s/tree_lists/fields/

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* fix compile

* Fix slasher tests (#5906)

* Fix electra tests

* Add electra attestations to double vote tests

* Update superstruct to 0.8

* Merge remote-tracking branch 'origin/unstable' into electra_attestation_changes

* Small cleanup in slasher tests

* Clean up Electra observed aggregates (#5929)

* Use consistent key in observed_attestations

* Remove unwraps from observed aggregates

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* De-dup attestation constructor logic

* Remove unwraps in Attestation construction

* Dedup match_attestation_data

* Remove outdated TODO

* Use ForkName Ord in fork-choice tests

* Use ForkName Ord in BeaconBlockBody

* Make to_electra not fallible

* Remove TestRandom impl for IndexedAttestation

* Remove IndexedAttestation faulty Decode impl

* Drop TestRandom impl

* Add PendingAttestationInElectra

* Indexed att on disk (#35)

* indexed att on disk

* fix lints

* Update slasher/src/migrate.rs

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>

---------

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>
Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>

* add electra fork enabled fn to ForkName impl (#36)

* add electra fork enabled fn to ForkName impl

* remove inadvertent file

* Update common/eth2/src/types.rs

Co-authored-by: ethDreamer <37123614+ethDreamer@users.noreply.github.com>

* Dedup attestation constructor logic in attester cache

* Use if let Ok for committee_bits

* Dedup Attestation constructor code

* Diff reduction in tests

* Fix beacon_chain tests

* Diff reduction

* Use Ord for ForkName in pubsub

* Resolve into_attestation_and_indices todo

* Remove stale TODO

* Fix beacon_chain tests

* Test spec invariant

* Use electra_enabled in pubsub

* Remove get_indexed_attestation_from_signed_aggregate

* Use ok_or instead of if let else

* committees are sorted

* remove dup method `get_indexed_attestation_from_committees`

* Merge pull request #5940 from dapplion/electra_attestation_changes_lionreview

Electra attestations #5712 review

* update default persisted op pool deserialization

* ensure aggregate and proof uses serde untagged on ref

* Fork aware ssz static attestation tests

* Electra attestation changes from Lions review (#5971)

* dedup/cleanup and remove unneeded hashset use

* remove irrelevant TODOs

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into electra_attestation_changes

* Electra attestation changes sean review (#5972)

* instantiate empty bitlist in unreachable code

* clean up error conversion

* fork enabled bool cleanup

* remove a couple todos

* return bools instead of options in `aggregate` and use the result

* delete commented out code

* use map macros in simple transformations

* remove signers_disjoint_from

* get ef tests compiling

* get ef tests compiling

* update intentionally excluded files

* Avoid changing slasher schema for Electra

* Delete slasher schema v4

* Fix clippy

* Fix compilation of beacon_chain tests

* Update database.rs

* Add electra lightclient types

* Update slasher/src/database.rs

* fix imports

* Merge pull request #5980 from dapplion/electra-lightclient

Add electra lightclient types

* Merge pull request #5975 from michaelsproul/electra-slasher-no-migration

Avoid changing slasher schema for Electra

* Update beacon_node/beacon_chain/src/attestation_verification.rs

* Update beacon_node/beacon_chain/src/attestation_verification.rs
This commit is contained in:
ethDreamer
2024-06-24 16:08:07 -05:00
committed by GitHub
parent 758b58c9e9
commit c52c598f69
118 changed files with 5076 additions and 1741 deletions

View File

@@ -1,8 +1,8 @@
use crate::attestation_storage::AttestationRef;
use crate::attestation_storage::{CompactAttestationRef, CompactIndexedAttestation};
use crate::max_cover::MaxCover;
use crate::reward_cache::RewardCache;
use state_processing::common::{
base, get_attestation_participation_flag_indices, get_attesting_indices,
attesting_indices_base::get_attesting_indices, base, get_attestation_participation_flag_indices,
};
use std::collections::HashMap;
use types::{
@@ -14,14 +14,14 @@ use types::{
#[derive(Debug, Clone)]
pub struct AttMaxCover<'a, E: EthSpec> {
/// Underlying attestation.
pub att: AttestationRef<'a, E>,
pub att: CompactAttestationRef<'a, E>,
/// Mapping of validator indices and their rewards.
pub fresh_validators_rewards: HashMap<u64, u64>,
}
impl<'a, E: EthSpec> AttMaxCover<'a, E> {
pub fn new(
att: AttestationRef<'a, E>,
att: CompactAttestationRef<'a, E>,
state: &BeaconState<E>,
reward_cache: &'a RewardCache,
total_active_balance: u64,
@@ -36,7 +36,7 @@ impl<'a, E: EthSpec> AttMaxCover<'a, E> {
/// Initialise an attestation cover object for base/phase0 hard fork.
pub fn new_for_base(
att: AttestationRef<'a, E>,
att: CompactAttestationRef<'a, E>,
state: &BeaconState<E>,
base_state: &BeaconStateBase<E>,
total_active_balance: u64,
@@ -69,7 +69,7 @@ impl<'a, E: EthSpec> AttMaxCover<'a, E> {
/// Initialise an attestation cover object for Altair or later.
pub fn new_for_altair_deneb(
att: AttestationRef<'a, E>,
att: CompactAttestationRef<'a, E>,
state: &BeaconState<E>,
reward_cache: &'a RewardCache,
spec: &ChainSpec,
@@ -83,7 +83,7 @@ impl<'a, E: EthSpec> AttMaxCover<'a, E> {
let fresh_validators_rewards = att
.indexed
.attesting_indices
.attesting_indices()
.iter()
.filter_map(|&index| {
if reward_cache
@@ -119,14 +119,14 @@ impl<'a, E: EthSpec> AttMaxCover<'a, E> {
impl<'a, E: EthSpec> MaxCover for AttMaxCover<'a, E> {
type Object = Attestation<E>;
type Intermediate = AttestationRef<'a, E>;
type Intermediate = CompactAttestationRef<'a, E>;
type Set = HashMap<u64, u64>;
fn intermediate(&self) -> &AttestationRef<'a, E> {
fn intermediate(&self) -> &CompactAttestationRef<'a, E> {
&self.att
}
fn convert_to_object(att_ref: &AttestationRef<'a, E>) -> Attestation<E> {
fn convert_to_object(att_ref: &CompactAttestationRef<'a, E>) -> Attestation<E> {
att_ref.clone_as_attestation()
}
@@ -144,9 +144,16 @@ impl<'a, E: EthSpec> MaxCover for AttMaxCover<'a, E> {
/// because including two attestations on chain to satisfy different participation bits is
/// impossible without the validator double voting. I.e. it is only suboptimal in the presence
/// of slashable voting, which is rare.
///
/// Post-Electra this optimisation is still OK. The `self.att.data.index` will always be 0 for
/// all Electra attestations, so when a new attestation is added to the solution, we will
/// remove its validators from all attestations at the same slot. It may happen that the
/// included attestation and the attestation being updated have no validators in common, in
/// which case the `retain` will be a no-op. We could consider optimising this in future by only
/// executing the `retain` when the `committee_bits` of the two attestations intersect.
fn update_covering_set(
&mut self,
best_att: &AttestationRef<'a, E>,
best_att: &CompactAttestationRef<'a, E>,
covered_validators: &HashMap<u64, u64>,
) {
if self.att.data.slot == best_att.data.slot && self.att.data.index == best_att.data.index {
@@ -170,12 +177,16 @@ impl<'a, E: EthSpec> MaxCover for AttMaxCover<'a, E> {
///
/// This isn't optimal, but with the Altair fork this code is obsolete and not worth upgrading.
pub fn earliest_attestation_validators<E: EthSpec>(
attestation: &AttestationRef<E>,
attestation: &CompactAttestationRef<E>,
state: &BeaconState<E>,
base_state: &BeaconStateBase<E>,
) -> BitList<E::MaxValidatorsPerCommittee> {
// Bitfield of validators whose attestations are new/fresh.
let mut new_validators = attestation.indexed.aggregation_bits.clone();
let mut new_validators = match attestation.indexed {
CompactIndexedAttestation::Base(indexed_att) => indexed_att.aggregation_bits.clone(),
// This code path is obsolete post altair fork, so we just return an empty bitlist here.
CompactIndexedAttestation::Electra(_) => return BitList::with_capacity(0).unwrap(),
};
let state_attestations = if attestation.checkpoint.target_epoch == state.current_epoch() {
&base_state.current_epoch_attestations

View File

@@ -1,9 +1,10 @@
use crate::AttestationStats;
use itertools::Itertools;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use types::{
AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch,
EthSpec, Hash256, Slot,
attestation::{AttestationBase, AttestationElectra},
superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector,
Checkpoint, Epoch, EthSpec, Hash256, Slot, Unsigned,
};
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
@@ -20,11 +21,17 @@ pub struct CompactAttestationData {
pub target_root: Hash256,
}
#[superstruct(variants(Base, Electra), variant_attributes(derive(Debug, PartialEq,)))]
#[derive(Debug, PartialEq)]
pub struct CompactIndexedAttestation<E: EthSpec> {
pub attesting_indices: Vec<u64>,
#[superstruct(only(Base), partial_getter(rename = "aggregation_bits_base"))]
pub aggregation_bits: BitList<E::MaxValidatorsPerCommittee>,
#[superstruct(only(Electra), partial_getter(rename = "aggregation_bits_electra"))]
pub aggregation_bits: BitList<E::MaxValidatorsPerSlot>,
pub signature: AggregateSignature,
#[superstruct(only(Electra))]
pub committee_bits: BitVector<E::MaxCommitteesPerSlot>,
}
#[derive(Debug)]
@@ -35,7 +42,7 @@ pub struct SplitAttestation<E: EthSpec> {
}
#[derive(Debug, Clone)]
pub struct AttestationRef<'a, E: EthSpec> {
pub struct CompactAttestationRef<'a, E: EthSpec> {
pub checkpoint: &'a CheckpointKey,
pub data: &'a CompactAttestationData,
pub indexed: &'a CompactIndexedAttestation<E>,
@@ -54,20 +61,34 @@ pub struct AttestationDataMap<E: EthSpec> {
impl<E: EthSpec> SplitAttestation<E> {
pub fn new(attestation: Attestation<E>, attesting_indices: Vec<u64>) -> Self {
let checkpoint = CheckpointKey {
source: attestation.data.source,
target_epoch: attestation.data.target.epoch,
source: attestation.data().source,
target_epoch: attestation.data().target.epoch,
};
let data = CompactAttestationData {
slot: attestation.data.slot,
index: attestation.data.index,
beacon_block_root: attestation.data.beacon_block_root,
target_root: attestation.data.target.root,
slot: attestation.data().slot,
index: attestation.data().index,
beacon_block_root: attestation.data().beacon_block_root,
target_root: attestation.data().target.root,
};
let indexed = CompactIndexedAttestation {
attesting_indices,
aggregation_bits: attestation.aggregation_bits,
signature: attestation.signature,
let indexed = match attestation.clone() {
Attestation::Base(attn) => {
CompactIndexedAttestation::Base(CompactIndexedAttestationBase {
attesting_indices,
aggregation_bits: attn.aggregation_bits,
signature: attestation.signature().clone(),
})
}
Attestation::Electra(attn) => {
CompactIndexedAttestation::Electra(CompactIndexedAttestationElectra {
attesting_indices,
aggregation_bits: attn.aggregation_bits,
signature: attestation.signature().clone(),
committee_bits: attn.committee_bits,
})
}
};
Self {
checkpoint,
data,
@@ -75,8 +96,8 @@ impl<E: EthSpec> SplitAttestation<E> {
}
}
pub fn as_ref(&self) -> AttestationRef<E> {
AttestationRef {
pub fn as_ref(&self) -> CompactAttestationRef<E> {
CompactAttestationRef {
checkpoint: &self.checkpoint,
data: &self.data,
indexed: &self.indexed,
@@ -84,7 +105,7 @@ impl<E: EthSpec> SplitAttestation<E> {
}
}
impl<'a, E: EthSpec> AttestationRef<'a, E> {
impl<'a, E: EthSpec> CompactAttestationRef<'a, E> {
pub fn attestation_data(&self) -> AttestationData {
AttestationData {
slot: self.data.slot,
@@ -99,10 +120,20 @@ impl<'a, E: EthSpec> AttestationRef<'a, E> {
}
pub fn clone_as_attestation(&self) -> Attestation<E> {
Attestation {
aggregation_bits: self.indexed.aggregation_bits.clone(),
data: self.attestation_data(),
signature: self.indexed.signature.clone(),
match self.indexed {
CompactIndexedAttestation::Base(indexed_att) => Attestation::Base(AttestationBase {
aggregation_bits: indexed_att.aggregation_bits.clone(),
data: self.attestation_data(),
signature: indexed_att.signature.clone(),
}),
CompactIndexedAttestation::Electra(indexed_att) => {
Attestation::Electra(AttestationElectra {
aggregation_bits: indexed_att.aggregation_bits.clone(),
data: self.attestation_data(),
signature: indexed_att.signature.clone(),
committee_bits: indexed_att.committee_bits.clone(),
})
}
}
}
}
@@ -125,7 +156,37 @@ impl CheckpointKey {
}
impl<E: EthSpec> CompactIndexedAttestation<E> {
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
pub fn should_aggregate(&self, other: &Self) -> bool {
match (self, other) {
(CompactIndexedAttestation::Base(this), CompactIndexedAttestation::Base(other)) => {
this.should_aggregate(other)
}
(
CompactIndexedAttestation::Electra(this),
CompactIndexedAttestation::Electra(other),
) => this.should_aggregate(other),
_ => false,
}
}
/// Returns `true` if aggregated, otherwise `false`.
pub fn aggregate(&mut self, other: &Self) -> bool {
match (self, other) {
(CompactIndexedAttestation::Base(this), CompactIndexedAttestation::Base(other)) => {
this.aggregate(other);
true
}
(
CompactIndexedAttestation::Electra(this),
CompactIndexedAttestation::Electra(other),
) => this.aggregate_same_committee(other),
_ => false,
}
}
}
impl<E: EthSpec> CompactIndexedAttestationBase<E> {
pub fn should_aggregate(&self, other: &Self) -> bool {
self.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
@@ -143,13 +204,108 @@ impl<E: EthSpec> CompactIndexedAttestation<E> {
}
}
impl<E: EthSpec> CompactIndexedAttestationElectra<E> {
pub fn should_aggregate(&self, other: &Self) -> bool {
// For Electra, only aggregate attestations in the same committee.
self.committee_bits == other.committee_bits
&& self
.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
}
/// Returns `true` if aggregated, otherwise `false`.
pub fn aggregate_same_committee(&mut self, other: &Self) -> bool {
if self.committee_bits != other.committee_bits {
return false;
}
self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits);
self.attesting_indices = self
.attesting_indices
.drain(..)
.merge(other.attesting_indices.iter().copied())
.dedup()
.collect();
self.signature.add_assign_aggregate(&other.signature);
true
}
pub fn aggregate_with_disjoint_committees(&mut self, other: &Self) -> Option<()> {
if !self
.committee_bits
.intersection(&other.committee_bits)
.is_zero()
{
return None;
}
// The attestation being aggregated in must only have 1 committee bit set.
if other.committee_bits.num_set_bits() != 1 {
return None;
}
// Check we are aggregating in increasing committee index order (so we can append
// aggregation bits).
if self.committee_bits.highest_set_bit() >= other.committee_bits.highest_set_bit() {
return None;
}
self.committee_bits = self.committee_bits.union(&other.committee_bits);
if let Some(agg_bits) = bitlist_extend(&self.aggregation_bits, &other.aggregation_bits) {
self.aggregation_bits = agg_bits;
self.attesting_indices = self
.attesting_indices
.drain(..)
.merge(other.attesting_indices.iter().copied())
.dedup()
.collect();
self.signature.add_assign_aggregate(&other.signature);
return Some(());
}
None
}
pub fn committee_index(&self) -> Option<u64> {
self.get_committee_indices().first().copied()
}
pub fn get_committee_indices(&self) -> Vec<u64> {
self.committee_bits
.iter()
.enumerate()
.filter_map(|(index, bit)| if bit { Some(index as u64) } else { None })
.collect()
}
}
// TODO(electra): upstream this or a more efficient implementation
fn bitlist_extend<N: Unsigned>(list1: &BitList<N>, list2: &BitList<N>) -> Option<BitList<N>> {
let new_length = list1.len() + list2.len();
let mut list = BitList::<N>::with_capacity(new_length).ok()?;
// Copy bits from list1.
for (i, bit) in list1.iter().enumerate() {
list.set(i, bit).ok()?;
}
// Copy bits from list2, starting from the end of list1.
let offset = list1.len();
for (i, bit) in list2.iter().enumerate() {
list.set(offset + i, bit).ok()?;
}
Some(list)
}
impl<E: EthSpec> AttestationMap<E> {
pub fn insert(&mut self, attestation: Attestation<E>, attesting_indices: Vec<u64>) {
let SplitAttestation {
checkpoint,
data,
indexed,
} = SplitAttestation::new(attestation, attesting_indices);
} = SplitAttestation::new(attestation.clone(), attesting_indices);
let attestation_map = self.checkpoint_map.entry(checkpoint).or_default();
let attestations = attestation_map.attestations.entry(data).or_default();
@@ -158,10 +314,10 @@ impl<E: EthSpec> AttestationMap<E> {
// NOTE: this is sub-optimal and in future we will remove this in favour of max-clique
// aggregation.
let mut aggregated = false;
for existing_attestation in attestations.iter_mut() {
if existing_attestation.signers_disjoint_from(&indexed) {
existing_attestation.aggregate(&indexed);
aggregated = true;
if existing_attestation.should_aggregate(&indexed) {
aggregated = existing_attestation.aggregate(&indexed);
} else if *existing_attestation == indexed {
aggregated = true;
}
@@ -172,11 +328,93 @@ impl<E: EthSpec> AttestationMap<E> {
}
}
/// Aggregate Electra attestations for the same attestation data signed by different
/// committees.
///
/// Non-Electra attestations are left as-is.
pub fn aggregate_across_committees(&mut self, checkpoint_key: CheckpointKey) {
let Some(attestation_map) = self.checkpoint_map.get_mut(&checkpoint_key) else {
return;
};
for compact_indexed_attestations in attestation_map.attestations.values_mut() {
let unaggregated_attestations = std::mem::take(compact_indexed_attestations);
let mut aggregated_attestations: Vec<CompactIndexedAttestation<E>> = vec![];
// Aggregate the best attestations for each committee and leave the rest.
let mut best_attestations_by_committee: BTreeMap<
u64,
CompactIndexedAttestationElectra<E>,
> = BTreeMap::new();
for committee_attestation in unaggregated_attestations {
let mut electra_attestation = match committee_attestation {
CompactIndexedAttestation::Electra(att)
if att.committee_bits.num_set_bits() == 1 =>
{
att
}
CompactIndexedAttestation::Electra(att) => {
// Aggregate already covers multiple committees, leave it as-is.
aggregated_attestations.push(CompactIndexedAttestation::Electra(att));
continue;
}
CompactIndexedAttestation::Base(att) => {
// Leave as-is.
aggregated_attestations.push(CompactIndexedAttestation::Base(att));
continue;
}
};
if let Some(committee_index) = electra_attestation.committee_index() {
if let Some(existing_attestation) =
best_attestations_by_committee.get_mut(&committee_index)
{
// Search for the best (most aggregation bits) attestation for this committee
// index.
if electra_attestation.aggregation_bits.num_set_bits()
> existing_attestation.aggregation_bits.num_set_bits()
{
// New attestation is better than the previously known one for this
// committee. Replace it.
std::mem::swap(existing_attestation, &mut electra_attestation);
}
// Put the inferior attestation into the list of aggregated attestations
// without performing any cross-committee aggregation.
aggregated_attestations
.push(CompactIndexedAttestation::Electra(electra_attestation));
} else {
// First attestation seen for this committee. Place it in the map
// provisionally.
best_attestations_by_committee.insert(committee_index, electra_attestation);
}
}
}
if let Some(on_chain_aggregate) =
Self::compute_on_chain_aggregate(best_attestations_by_committee)
{
aggregated_attestations
.push(CompactIndexedAttestation::Electra(on_chain_aggregate));
}
*compact_indexed_attestations = aggregated_attestations;
}
}
pub fn compute_on_chain_aggregate(
mut attestations_by_committee: BTreeMap<u64, CompactIndexedAttestationElectra<E>>,
) -> Option<CompactIndexedAttestationElectra<E>> {
let (_, mut on_chain_aggregate) = attestations_by_committee.pop_first()?;
for (_, attestation) in attestations_by_committee {
on_chain_aggregate.aggregate_with_disjoint_committees(&attestation);
}
Some(on_chain_aggregate)
}
/// Iterate all attestations matching the given `checkpoint_key`.
pub fn get_attestations<'a>(
&'a self,
checkpoint_key: &'a CheckpointKey,
) -> impl Iterator<Item = AttestationRef<'a, E>> + 'a {
) -> impl Iterator<Item = CompactAttestationRef<'a, E>> + 'a {
self.checkpoint_map
.get(checkpoint_key)
.into_iter()
@@ -184,7 +422,7 @@ impl<E: EthSpec> AttestationMap<E> {
}
/// Iterate all attestations in the map.
pub fn iter(&self) -> impl Iterator<Item = AttestationRef<E>> {
pub fn iter(&self) -> impl Iterator<Item = CompactAttestationRef<E>> {
self.checkpoint_map
.iter()
.flat_map(|(checkpoint_key, attestation_map)| attestation_map.iter(checkpoint_key))
@@ -215,9 +453,9 @@ impl<E: EthSpec> AttestationDataMap<E> {
pub fn iter<'a>(
&'a self,
checkpoint_key: &'a CheckpointKey,
) -> impl Iterator<Item = AttestationRef<'a, E>> + 'a {
) -> impl Iterator<Item = CompactAttestationRef<'a, E>> + 'a {
self.attestations.iter().flat_map(|(data, vec_indexed)| {
vec_indexed.iter().map(|indexed| AttestationRef {
vec_indexed.iter().map(|indexed| CompactAttestationRef {
checkpoint: checkpoint_key,
data,
indexed,

View File

@@ -1,17 +1,17 @@
use crate::max_cover::MaxCover;
use state_processing::per_block_processing::get_slashable_indices_modular;
use std::collections::{HashMap, HashSet};
use types::{AttesterSlashing, BeaconState, EthSpec};
use types::{AttesterSlashing, AttesterSlashingRef, BeaconState, EthSpec};
#[derive(Debug, Clone)]
pub struct AttesterSlashingMaxCover<'a, E: EthSpec> {
slashing: &'a AttesterSlashing<E>,
slashing: AttesterSlashingRef<'a, E>,
effective_balances: HashMap<u64, u64>,
}
impl<'a, E: EthSpec> AttesterSlashingMaxCover<'a, E> {
pub fn new(
slashing: &'a AttesterSlashing<E>,
slashing: AttesterSlashingRef<'a, E>,
proposer_slashing_indices: &HashSet<u64>,
state: &BeaconState<E>,
) -> Option<Self> {
@@ -39,16 +39,16 @@ impl<'a, E: EthSpec> AttesterSlashingMaxCover<'a, E> {
impl<'a, E: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, E> {
/// The result type, of which we would eventually like a collection of maximal quality.
type Object = AttesterSlashing<E>;
type Intermediate = AttesterSlashing<E>;
type Intermediate = AttesterSlashingRef<'a, E>;
/// The type used to represent sets.
type Set = HashMap<u64, u64>;
fn intermediate(&self) -> &AttesterSlashing<E> {
self.slashing
fn intermediate(&self) -> &AttesterSlashingRef<'a, E> {
&self.slashing
}
fn convert_to_object(slashing: &AttesterSlashing<E>) -> AttesterSlashing<E> {
slashing.clone()
fn convert_to_object(slashing: &AttesterSlashingRef<'a, E>) -> AttesterSlashing<E> {
slashing.clone_as_attester_slashing()
}
/// Get the set of elements covered.
@@ -58,7 +58,7 @@ impl<'a, E: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, E> {
/// Update the set of items covered, for the inclusion of some object in the solution.
fn update_covering_set(
&mut self,
_best_slashing: &AttesterSlashing<E>,
_best_slashing: &AttesterSlashingRef<'a, E>,
covered_validator_indices: &HashMap<u64, u64>,
) {
self.effective_balances

View File

@@ -11,11 +11,10 @@ mod sync_aggregate_id;
pub use crate::bls_to_execution_changes::ReceivedPreCapella;
pub use attestation::{earliest_attestation_validators, AttMaxCover};
pub use attestation_storage::{AttestationRef, SplitAttestation};
pub use attestation_storage::{CompactAttestationRef, SplitAttestation};
pub use max_cover::MaxCover;
pub use persistence::{
PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14,
PersistedOperationPoolV15, PersistedOperationPoolV5,
PersistedOperationPool, PersistedOperationPoolV15, PersistedOperationPoolV20,
};
pub use reward_cache::RewardCache;
use state_processing::epoch_cache::is_epoch_cache_initialized;
@@ -228,7 +227,7 @@ impl<E: EthSpec> OperationPool<E> {
state: &'a BeaconState<E>,
reward_cache: &'a RewardCache,
total_active_balance: u64,
validity_filter: impl FnMut(&AttestationRef<'a, E>) -> bool + Send,
validity_filter: impl FnMut(&CompactAttestationRef<'a, E>) -> bool + Send,
spec: &'a ChainSpec,
) -> impl Iterator<Item = AttMaxCover<'a, E>> + Send {
all_attestations
@@ -252,10 +251,11 @@ impl<E: EthSpec> OperationPool<E> {
pub fn get_attestations(
&self,
state: &BeaconState<E>,
prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, E>) -> bool + Send,
curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, E>) -> bool + Send,
prev_epoch_validity_filter: impl for<'a> FnMut(&CompactAttestationRef<'a, E>) -> bool + Send,
curr_epoch_validity_filter: impl for<'a> FnMut(&CompactAttestationRef<'a, E>) -> bool + Send,
spec: &ChainSpec,
) -> Result<Vec<Attestation<E>>, OpPoolError> {
let fork_name = state.fork_name_unchecked();
if !matches!(state, BeaconState::Base(_)) {
// Epoch cache must be initialized to fetch base reward values in the max cover `score`
// function. Currently max cover ignores items on errors. If epoch cache is not
@@ -267,7 +267,6 @@ impl<E: EthSpec> OperationPool<E> {
// Attestations for the current fork, which may be from the current or previous epoch.
let (prev_epoch_key, curr_epoch_key) = CheckpointKey::keys_for_state(state);
let all_attestations = self.attestations.read();
let total_active_balance = state
.get_total_active_balance()
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
@@ -284,6 +283,16 @@ impl<E: EthSpec> OperationPool<E> {
let mut num_prev_valid = 0_i64;
let mut num_curr_valid = 0_i64;
// TODO(electra): Work out how to do this more elegantly. This is a bit of a hack.
let mut all_attestations = self.attestations.write();
if fork_name.electra_enabled() {
all_attestations.aggregate_across_committees(prev_epoch_key);
all_attestations.aggregate_across_committees(curr_epoch_key);
}
let all_attestations = parking_lot::RwLockWriteGuard::downgrade(all_attestations);
let prev_epoch_att = self
.get_valid_attestations_for_epoch(
&prev_epoch_key,
@@ -307,6 +316,11 @@ impl<E: EthSpec> OperationPool<E> {
)
.inspect(|_| num_curr_valid += 1);
let curr_epoch_limit = if fork_name.electra_enabled() {
E::MaxAttestationsElectra::to_usize()
} else {
E::MaxAttestations::to_usize()
};
let prev_epoch_limit = if let BeaconState::Base(base_state) = state {
std::cmp::min(
E::MaxPendingAttestations::to_usize()
@@ -314,7 +328,7 @@ impl<E: EthSpec> OperationPool<E> {
E::MaxAttestations::to_usize(),
)
} else {
E::MaxAttestations::to_usize()
curr_epoch_limit
};
let (prev_cover, curr_cover) = rayon::join(
@@ -329,11 +343,7 @@ impl<E: EthSpec> OperationPool<E> {
},
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_CURR_EPOCH_PACKING_TIME);
maximum_cover(
curr_epoch_att,
E::MaxAttestations::to_usize(),
"curr_epoch_attestations",
)
maximum_cover(curr_epoch_att, curr_epoch_limit, "curr_epoch_attestations")
},
);
@@ -343,7 +353,7 @@ impl<E: EthSpec> OperationPool<E> {
Ok(max_cover::merge_solutions(
curr_cover,
prev_cover,
E::MaxAttestations::to_usize(),
curr_epoch_limit,
))
}
@@ -428,7 +438,7 @@ impl<E: EthSpec> OperationPool<E> {
let relevant_attester_slashings = reader.iter().flat_map(|slashing| {
if slashing.signature_is_still_valid(&state.fork()) {
AttesterSlashingMaxCover::new(slashing.as_inner(), to_be_slashed, state)
AttesterSlashingMaxCover::new(slashing.as_inner().to_ref(), to_be_slashed, state)
} else {
None
}
@@ -442,7 +452,7 @@ impl<E: EthSpec> OperationPool<E> {
.into_iter()
.map(|cover| {
to_be_slashed.extend(cover.covering_set().keys());
cover.intermediate().clone()
AttesterSlashingMaxCover::convert_to_object(cover.intermediate())
})
.collect()
}
@@ -463,16 +473,19 @@ impl<E: EthSpec> OperationPool<E> {
// Check that the attestation's signature is still valid wrt the fork version.
let signature_ok = slashing.signature_is_still_valid(&head_state.fork());
// Slashings that don't slash any validators can also be dropped.
let slashing_ok =
get_slashable_indices_modular(head_state, slashing.as_inner(), |_, validator| {
let slashing_ok = get_slashable_indices_modular(
head_state,
slashing.as_inner().to_ref(),
|_, validator| {
// Declare that a validator is still slashable if they have not exited prior
// to the finalized epoch.
//
// We cannot check the `slashed` field since the `head` is not finalized and
// a fork could un-slash someone.
validator.exit_epoch > head_state.finalized_checkpoint().epoch
})
.map_or(false, |indices| !indices.is_empty());
},
)
.map_or(false, |indices| !indices.is_empty());
signature_ok && slashing_ok
});
@@ -891,7 +904,7 @@ mod release_tests {
);
for (atts, aggregate) in &attestations {
let att2 = aggregate.as_ref().unwrap().message.aggregate.clone();
let att2 = aggregate.as_ref().unwrap().message().aggregate().clone();
let att1 = atts
.into_iter()
@@ -899,7 +912,7 @@ mod release_tests {
.take(2)
.fold::<Option<Attestation<MainnetEthSpec>>, _>(None, |att, new_att| {
if let Some(mut a) = att {
a.aggregate(&new_att);
a.aggregate(new_att.to_ref());
Some(a)
} else {
Some(new_att.clone())
@@ -907,13 +920,13 @@ mod release_tests {
})
.unwrap();
let att1_indices = get_attesting_indices_from_state(&state, &att1).unwrap();
let att2_indices = get_attesting_indices_from_state(&state, &att2).unwrap();
let att1_indices = get_attesting_indices_from_state(&state, att1.to_ref()).unwrap();
let att2_indices = get_attesting_indices_from_state(&state, att2).unwrap();
let att1_split = SplitAttestation::new(att1.clone(), att1_indices);
let att2_split = SplitAttestation::new(att2.clone(), att2_indices);
let att2_split = SplitAttestation::new(att2.clone_as_attestation(), att2_indices);
assert_eq!(
att1.aggregation_bits.num_set_bits(),
att1.num_set_aggregation_bits(),
earliest_attestation_validators(
&att1_split.as_ref(),
&state,
@@ -927,8 +940,8 @@ mod release_tests {
.unwrap()
.current_epoch_attestations
.push(PendingAttestation {
aggregation_bits: att1.aggregation_bits.clone(),
data: att1.data.clone(),
aggregation_bits: att1.aggregation_bits_base().unwrap().clone(),
data: att1.data().clone(),
inclusion_delay: 0,
proposer_index: 0,
})
@@ -981,7 +994,8 @@ mod release_tests {
for (atts, _) in attestations {
for (att, _) in atts {
let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap();
let attesting_indices =
get_attesting_indices_from_state(&state, att.to_ref()).unwrap();
op_pool.insert_attestation(att, attesting_indices).unwrap();
}
}
@@ -1007,7 +1021,7 @@ mod release_tests {
let agg_att = &block_attestations[0];
assert_eq!(
agg_att.aggregation_bits.num_set_bits(),
agg_att.num_set_aggregation_bits(),
spec.target_committee_size as usize
);
@@ -1050,12 +1064,15 @@ mod release_tests {
);
for (_, aggregate) in attestations {
let att = aggregate.unwrap().message.aggregate;
let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap();
let agg = aggregate.unwrap();
let att = agg.message().aggregate();
let attesting_indices = get_attesting_indices_from_state(&state, att).unwrap();
op_pool
.insert_attestation(att.clone(), attesting_indices.clone())
.insert_attestation(att.clone_as_attestation(), attesting_indices.clone())
.unwrap();
op_pool
.insert_attestation(att.clone_as_attestation(), attesting_indices)
.unwrap();
op_pool.insert_attestation(att, attesting_indices).unwrap();
}
assert_eq!(op_pool.num_attestations(), committees.len());
@@ -1104,7 +1121,7 @@ mod release_tests {
None,
|att, new_att| {
if let Some(mut a) = att {
a.aggregate(new_att);
a.aggregate(new_att.to_ref());
Some(a)
} else {
Some(new_att.clone())
@@ -1127,7 +1144,7 @@ mod release_tests {
None,
|att, new_att| {
if let Some(mut a) = att {
a.aggregate(new_att);
a.aggregate(new_att.to_ref());
Some(a)
} else {
Some(new_att.clone())
@@ -1139,7 +1156,8 @@ mod release_tests {
.collect::<Vec<_>>();
for att in aggs1.into_iter().chain(aggs2.into_iter()) {
let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap();
let attesting_indices =
get_attesting_indices_from_state(&state, att.to_ref()).unwrap();
op_pool.insert_attestation(att, attesting_indices).unwrap();
}
}
@@ -1203,7 +1221,7 @@ mod release_tests {
.fold::<Attestation<MainnetEthSpec>, _>(
att_0.clone(),
|mut att, new_att| {
att.aggregate(new_att);
att.aggregate(new_att.to_ref());
att
},
)
@@ -1211,7 +1229,8 @@ mod release_tests {
.collect::<Vec<_>>();
for att in aggs {
let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap();
let attesting_indices =
get_attesting_indices_from_state(&state, att.to_ref()).unwrap();
op_pool.insert_attestation(att, attesting_indices).unwrap();
}
};
@@ -1228,7 +1247,17 @@ mod release_tests {
let num_big = target_committee_size / big_step_size;
let stats = op_pool.attestation_stats();
assert_eq!(stats.num_attestation_data, committees.len());
let fork_name = state.fork_name_unchecked();
match fork_name {
ForkName::Electra => {
assert_eq!(stats.num_attestation_data, 1);
}
_ => {
assert_eq!(stats.num_attestation_data, committees.len());
}
};
assert_eq!(
stats.num_attestations,
(num_small + num_big) * committees.len()
@@ -1239,11 +1268,25 @@ mod release_tests {
let best_attestations = op_pool
.get_attestations(&state, |_| true, |_| true, spec)
.expect("should have best attestations");
assert_eq!(best_attestations.len(), max_attestations);
match fork_name {
ForkName::Electra => {
assert_eq!(best_attestations.len(), 8);
}
_ => {
assert_eq!(best_attestations.len(), max_attestations);
}
};
// All the best attestations should be signed by at least `big_step_size` (4) validators.
for att in &best_attestations {
assert!(att.aggregation_bits.num_set_bits() >= big_step_size);
match fork_name {
ForkName::Electra => {
assert!(att.num_set_aggregation_bits() >= small_step_size);
}
_ => {
assert!(att.num_set_aggregation_bits() >= big_step_size);
}
};
}
}
@@ -1298,7 +1341,7 @@ mod release_tests {
.fold::<Attestation<MainnetEthSpec>, _>(
att_0.clone(),
|mut att, new_att| {
att.aggregate(new_att);
att.aggregate(new_att.to_ref());
att
},
)
@@ -1306,7 +1349,8 @@ mod release_tests {
.collect::<Vec<_>>();
for att in aggs {
let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap();
let attesting_indices =
get_attesting_indices_from_state(&state, att.to_ref()).unwrap();
op_pool.insert_attestation(att, attesting_indices).unwrap();
}
};
@@ -1321,11 +1365,20 @@ mod release_tests {
let num_small = target_committee_size / small_step_size;
let num_big = target_committee_size / big_step_size;
let fork_name = state.fork_name_unchecked();
match fork_name {
ForkName::Electra => {
assert_eq!(op_pool.attestation_stats().num_attestation_data, 1);
}
_ => {
assert_eq!(
op_pool.attestation_stats().num_attestation_data,
committees.len()
);
}
};
assert_eq!(
op_pool.attestation_stats().num_attestation_data,
committees.len()
);
assert_eq!(
op_pool.num_attestations(),
(num_small + num_big) * committees.len()
@@ -1336,7 +1389,15 @@ mod release_tests {
let best_attestations = op_pool
.get_attestations(&state, |_| true, |_| true, spec)
.expect("should have valid best attestations");
assert_eq!(best_attestations.len(), max_attestations);
match fork_name {
ForkName::Electra => {
assert_eq!(best_attestations.len(), 8);
}
_ => {
assert_eq!(best_attestations.len(), max_attestations);
}
};
let total_active_balance = state.get_total_active_balance().unwrap();
@@ -1349,7 +1410,7 @@ mod release_tests {
reward_cache.update(&state).unwrap();
for att in best_attestations {
let attesting_indices = get_attesting_indices_from_state(&state, &att).unwrap();
let attesting_indices = get_attesting_indices_from_state(&state, att.to_ref()).unwrap();
let split_attestation = SplitAttestation::new(att, attesting_indices);
let mut fresh_validators_rewards = AttMaxCover::new(
split_attestation.as_ref(),

View File

@@ -1,4 +1,3 @@
use crate::attestation_id::AttestationId;
use crate::attestation_storage::AttestationMap;
use crate::bls_to_execution_changes::{BlsToExecutionChanges, ReceivedPreCapella};
use crate::sync_aggregate_id::SyncAggregateId;
@@ -12,6 +11,7 @@ use state_processing::SigVerifiedOp;
use std::collections::HashSet;
use std::mem;
use store::{DBColumn, Error as StoreError, StoreItem};
use types::attestation::AttestationOnDisk;
use types::*;
type PersistedSyncContributions<E> = Vec<(SyncAggregateId, Vec<SyncCommitteeContribution<E>>)>;
@@ -21,7 +21,7 @@ type PersistedSyncContributions<E> = Vec<(SyncAggregateId, Vec<SyncCommitteeCont
/// Operations are stored in arbitrary order, so it's not a good idea to compare instances
/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first.
#[superstruct(
variants(V5, V12, V14, V15),
variants(V15, V20),
variant_attributes(
derive(Derivative, PartialEq, Debug, Encode, Decode),
derivative(Clone),
@@ -31,38 +31,26 @@ type PersistedSyncContributions<E> = Vec<(SyncAggregateId, Vec<SyncCommitteeCont
#[derive(PartialEq, Debug, Encode)]
#[ssz(enum_behaviour = "transparent")]
pub struct PersistedOperationPool<E: EthSpec> {
/// [DEPRECATED] Mapping from attestation ID to attestation mappings.
#[superstruct(only(V5))]
pub attestations_v5: Vec<(AttestationId, Vec<Attestation<E>>)>,
#[superstruct(only(V15))]
pub attestations_v15: Vec<(AttestationBase<E>, Vec<u64>)>,
/// Attestations and their attesting indices.
#[superstruct(only(V12, V14, V15))]
pub attestations: Vec<(Attestation<E>, Vec<u64>)>,
#[superstruct(only(V20))]
pub attestations: Vec<(AttestationOnDisk<E>, Vec<u64>)>,
/// Mapping from sync contribution ID to sync contributions and aggregate.
pub sync_contributions: PersistedSyncContributions<E>,
/// [DEPRECATED] Attester slashings.
#[superstruct(only(V5))]
pub attester_slashings_v5: Vec<(AttesterSlashing<E>, ForkVersion)>,
#[superstruct(only(V15))]
pub attester_slashings_v15: Vec<SigVerifiedOp<AttesterSlashingBase<E>, E>>,
/// Attester slashings.
#[superstruct(only(V12, V14, V15))]
#[superstruct(only(V20))]
pub attester_slashings: Vec<SigVerifiedOp<AttesterSlashing<E>, E>>,
/// [DEPRECATED] Proposer slashings.
#[superstruct(only(V5))]
pub proposer_slashings_v5: Vec<ProposerSlashing>,
/// Proposer slashings with fork information.
#[superstruct(only(V12, V14, V15))]
pub proposer_slashings: Vec<SigVerifiedOp<ProposerSlashing, E>>,
/// [DEPRECATED] Voluntary exits.
#[superstruct(only(V5))]
pub voluntary_exits_v5: Vec<SignedVoluntaryExit>,
/// Voluntary exits with fork information.
#[superstruct(only(V12, V14, V15))]
pub voluntary_exits: Vec<SigVerifiedOp<SignedVoluntaryExit, E>>,
/// BLS to Execution Changes
#[superstruct(only(V14, V15))]
pub bls_to_execution_changes: Vec<SigVerifiedOp<SignedBlsToExecutionChange, E>>,
/// Validator indices with BLS to Execution Changes to be broadcast at the
/// Capella fork.
#[superstruct(only(V15))]
pub capella_bls_change_broadcast_indices: Vec<u64>,
}
@@ -75,8 +63,8 @@ impl<E: EthSpec> PersistedOperationPool<E> {
.iter()
.map(|att| {
(
att.clone_as_attestation(),
att.indexed.attesting_indices.clone(),
AttestationOnDisk::from(att.clone_as_attestation()),
att.indexed.attesting_indices().clone(),
)
})
.collect();
@@ -123,7 +111,7 @@ impl<E: EthSpec> PersistedOperationPool<E> {
.copied()
.collect();
PersistedOperationPool::V15(PersistedOperationPoolV15 {
PersistedOperationPool::V20(PersistedOperationPoolV20 {
attestations,
sync_contributions,
attester_slashings,
@@ -136,56 +124,86 @@ impl<E: EthSpec> PersistedOperationPool<E> {
/// Reconstruct an `OperationPool`.
pub fn into_operation_pool(mut self) -> Result<OperationPool<E>, OpPoolError> {
let attester_slashings = RwLock::new(self.attester_slashings()?.iter().cloned().collect());
let attester_slashings = match &self {
PersistedOperationPool::V15(pool_v15) => RwLock::new(
pool_v15
.attester_slashings_v15
.iter()
.map(|slashing| slashing.clone().into())
.collect(),
),
PersistedOperationPool::V20(pool_v20) => {
RwLock::new(pool_v20.attester_slashings.iter().cloned().collect())
}
};
let proposer_slashings = RwLock::new(
self.proposer_slashings()?
self.proposer_slashings()
.iter()
.cloned()
.map(|slashing| (slashing.as_inner().proposer_index(), slashing))
.collect(),
);
let voluntary_exits = RwLock::new(
self.voluntary_exits()?
self.voluntary_exits()
.iter()
.cloned()
.map(|exit| (exit.as_inner().message.validator_index, exit))
.collect(),
);
let sync_contributions = RwLock::new(self.sync_contributions().iter().cloned().collect());
let attestations = match self {
PersistedOperationPool::V5(_) | PersistedOperationPool::V12(_) => {
return Err(OpPoolError::IncorrectOpPoolVariant)
}
PersistedOperationPool::V14(_) | PersistedOperationPool::V15(_) => {
let attestations = match &self {
PersistedOperationPool::V15(pool_v15) => {
let mut map = AttestationMap::default();
for (att, attesting_indices) in self.attestations()?.clone() {
for (att, attesting_indices) in
pool_v15
.attestations_v15
.iter()
.map(|(att, attesting_indices)| {
(Attestation::Base(att.clone()), attesting_indices.clone())
})
{
map.insert(att, attesting_indices);
}
RwLock::new(map)
}
PersistedOperationPool::V20(pool_v20) => {
let mut map = AttestationMap::default();
for (att, attesting_indices) in
pool_v20
.attestations
.iter()
.map(|(att, attesting_indices)| {
(
AttestationRef::from(att.to_ref()).clone_as_attestation(),
attesting_indices.clone(),
)
})
{
map.insert(att, attesting_indices);
}
RwLock::new(map)
}
};
let mut bls_to_execution_changes = BlsToExecutionChanges::default();
if let Ok(persisted_changes) = self.bls_to_execution_changes_mut() {
let persisted_changes = mem::take(persisted_changes);
let persisted_changes = mem::take(self.bls_to_execution_changes_mut());
let broadcast_indices: HashSet<_> =
mem::take(self.capella_bls_change_broadcast_indices_mut())
.into_iter()
.collect();
let broadcast_indices =
if let Ok(indices) = self.capella_bls_change_broadcast_indices_mut() {
mem::take(indices).into_iter().collect()
} else {
HashSet::new()
};
for bls_to_execution_change in persisted_changes {
let received_pre_capella = if broadcast_indices
.contains(&bls_to_execution_change.as_inner().message.validator_index)
{
ReceivedPreCapella::Yes
} else {
ReceivedPreCapella::No
};
bls_to_execution_changes.insert(bls_to_execution_change, received_pre_capella);
}
for bls_to_execution_change in persisted_changes {
let received_pre_capella = if broadcast_indices
.contains(&bls_to_execution_change.as_inner().message.validator_index)
{
ReceivedPreCapella::Yes
} else {
ReceivedPreCapella::No
};
bls_to_execution_changes.insert(bls_to_execution_change, received_pre_capella);
}
let op_pool = OperationPool {
attestations,
sync_contributions,
@@ -200,48 +218,6 @@ impl<E: EthSpec> PersistedOperationPool<E> {
}
}
impl<E: EthSpec> StoreItem for PersistedOperationPoolV5<E> {
fn db_column() -> DBColumn {
DBColumn::OpPool
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
PersistedOperationPoolV5::from_ssz_bytes(bytes).map_err(Into::into)
}
}
impl<E: EthSpec> StoreItem for PersistedOperationPoolV12<E> {
fn db_column() -> DBColumn {
DBColumn::OpPool
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
PersistedOperationPoolV12::from_ssz_bytes(bytes).map_err(Into::into)
}
}
impl<E: EthSpec> StoreItem for PersistedOperationPoolV14<E> {
fn db_column() -> DBColumn {
DBColumn::OpPool
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
PersistedOperationPoolV14::from_ssz_bytes(bytes).map_err(Into::into)
}
}
impl<E: EthSpec> StoreItem for PersistedOperationPoolV15<E> {
fn db_column() -> DBColumn {
DBColumn::OpPool
@@ -256,6 +232,20 @@ impl<E: EthSpec> StoreItem for PersistedOperationPoolV15<E> {
}
}
impl<E: EthSpec> StoreItem for PersistedOperationPoolV20<E> {
fn db_column() -> DBColumn {
DBColumn::OpPool
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
PersistedOperationPoolV20::from_ssz_bytes(bytes).map_err(Into::into)
}
}
/// Deserialization for `PersistedOperationPool` defaults to `PersistedOperationPool::V12`.
impl<E: EthSpec> StoreItem for PersistedOperationPool<E> {
fn db_column() -> DBColumn {
@@ -268,8 +258,8 @@ impl<E: EthSpec> StoreItem for PersistedOperationPool<E> {
fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
// Default deserialization to the latest variant.
PersistedOperationPoolV15::from_ssz_bytes(bytes)
.map(Self::V15)
PersistedOperationPoolV20::from_ssz_bytes(bytes)
.map(Self::V20)
.map_err(Into::into)
}
}