diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs index 6e04af01b9..4cc783e64f 100644 --- a/beacon_node/operation_pool/src/attestation_storage.rs +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -4,7 +4,7 @@ use std::collections::{BTreeMap, HashMap}; use types::{ attestation::{AttestationBase, AttestationElectra}, superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector, - Checkpoint, Epoch, EthSpec, Hash256, Slot, + Checkpoint, Epoch, EthSpec, Hash256, Slot, Unsigned, }; #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] @@ -30,7 +30,6 @@ pub struct CompactIndexedAttestation { #[superstruct(only(Electra), partial_getter(rename = "aggregation_bits_electra"))] pub aggregation_bits: BitList, pub signature: AggregateSignature, - pub index: u64, #[superstruct(only(Electra))] pub committee_bits: BitVector, } @@ -79,7 +78,6 @@ impl SplitAttestation { attesting_indices, aggregation_bits: attn.aggregation_bits, signature: attestation.signature().clone(), - index: data.index, }) } Attestation::Electra(attn) => { @@ -87,7 +85,6 @@ impl SplitAttestation { attesting_indices, aggregation_bits: attn.aggregation_bits, signature: attestation.signature().clone(), - index: data.index, committee_bits: attn.committee_bits, }) } @@ -182,18 +179,11 @@ impl CompactIndexedAttestation { ( CompactIndexedAttestation::Electra(this), CompactIndexedAttestation::Electra(other), - ) => this.aggregate(other), + ) => this.aggregate_same_committee(other), // TODO(electra) is a mix of electra and base compact indexed attestations an edge case we need to deal with? _ => (), } } - - pub fn committee_index(&self) -> u64 { - match self { - CompactIndexedAttestation::Base(att) => att.index, - CompactIndexedAttestation::Electra(att) => att.committee_index(), - } - } } impl CompactIndexedAttestationBase { @@ -225,14 +215,43 @@ impl CompactIndexedAttestationElectra { .is_zero() } - pub fn aggregate(&mut self, other: &Self) { + pub fn aggregate_same_committee(&mut self, other: &Self) { + // TODO(electra): remove assert in favour of Result + assert_eq!(self.committee_bits, other.committee_bits); + self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits); + self.attesting_indices = self + .attesting_indices + .drain(..) + .merge(other.attesting_indices.iter().copied()) + .dedup() + .collect(); + self.signature.add_assign_aggregate(&other.signature); + } + + pub fn aggregate_with_disjoint_committees(&mut self, other: &Self) { + // TODO(electra): remove asserts or use Result + assert!( + self.committee_bits + .intersection(&other.committee_bits) + .is_zero(), + self.committee_bits, + other.committee_bits + ); + // The attestation being aggregated in must only have 1 committee bit set. + assert_eq!(other.committee_bits.num_set_bits(), 1); + // Check we are aggregating in increasing committee index order (so we can append + // aggregation bits). + assert!(self.committee_bits.highest_set_bit() < other.committee_bits.highest_set_bit()); + + self.committee_bits = self.committee_bits.union(&other.committee_bits); + self.aggregation_bits = + bitlist_extend(&self.aggregation_bits, &other.aggregation_bits).unwrap(); self.attesting_indices = self .attesting_indices .drain(..) .merge(other.attesting_indices.iter().copied()) .dedup() .collect(); - self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits); self.signature.add_assign_aggregate(&other.signature); } @@ -249,6 +268,25 @@ impl CompactIndexedAttestationElectra { } } +// TODO(electra): upstream this or a more efficient implementation +fn bitlist_extend(list1: &BitList, list2: &BitList) -> Option> { + let new_length = list1.len() + list2.len(); + let mut list = BitList::::with_capacity(new_length).ok()?; + + // Copy bits from list1. + for (i, bit) in list1.iter().enumerate() { + list.set(i, bit).ok()?; + } + + // Copy bits from list2, starting from the end of list1. + let offset = list1.len(); + for (i, bit) in list2.iter().enumerate() { + list.set(offset + i, bit).ok()?; + } + + Some(list) +} + impl AttestationMap { pub fn insert(&mut self, attestation: Attestation, attesting_indices: Vec) { let SplitAttestation { @@ -279,102 +317,85 @@ impl AttestationMap { } } + /// Aggregate Electra attestations for the same attestation data signed by different + /// committees. + /// + /// Non-Electra attestations are left as-is. pub fn aggregate_across_committees(&mut self, checkpoint_key: CheckpointKey) { let Some(attestation_map) = self.checkpoint_map.get_mut(&checkpoint_key) else { return; }; - for (_, compact_indexed_attestations) in attestation_map.attestations.iter_mut() { + for compact_indexed_attestations in attestation_map.attestations.values_mut() { let unaggregated_attestations = std::mem::take(compact_indexed_attestations); let mut aggregated_attestations: Vec> = vec![]; // Aggregate the best attestations for each committee and leave the rest. - let mut best_attestations_by_committee: BTreeMap> = - BTreeMap::new(); + let mut best_attestations_by_committee: BTreeMap< + u64, + CompactIndexedAttestationElectra, + > = BTreeMap::new(); for committee_attestation in unaggregated_attestations { - // TODO(electra) - // compare to best attestations by committee - // could probably use `.entry` here - if let Some(existing_attestation) = - best_attestations_by_committee.get_mut(&committee_attestation.committee_index()) - { - // compare and swap, put the discarded one straight into - // `aggregated_attestations` in case we have room to pack it without - // cross-committee aggregation - if existing_attestation.should_aggregate(&committee_attestation) { - existing_attestation.aggregate(&committee_attestation); - - best_attestations_by_committee.insert( - committee_attestation.committee_index(), - committee_attestation, - ); - } else { - aggregated_attestations.push(committee_attestation); + let mut electra_attestation = match committee_attestation { + CompactIndexedAttestation::Electra(att) + if att.committee_bits.num_set_bits() == 1 => + { + att } + CompactIndexedAttestation::Electra(att) => { + // Aggregate already covers multiple committees, leave it as-is. + aggregated_attestations.push(CompactIndexedAttestation::Electra(att)); + continue; + } + CompactIndexedAttestation::Base(att) => { + // Leave as-is. + aggregated_attestations.push(CompactIndexedAttestation::Base(att)); + continue; + } + }; + let committee_index = electra_attestation.committee_index(); + if let Some(existing_attestation) = + best_attestations_by_committee.get_mut(&committee_index) + { + // Search for the best (most aggregation bits) attestation for this committee + // index. + if electra_attestation.aggregation_bits.num_set_bits() + > existing_attestation.aggregation_bits.num_set_bits() + { + // New attestation is better than the previously known one for this + // committee. Replace it. + std::mem::swap(existing_attestation, &mut electra_attestation); + } + // Put the inferior attestation into the list of aggregated attestations + // without performing any cross-committee aggregation. + aggregated_attestations + .push(CompactIndexedAttestation::Electra(electra_attestation)); } else { - best_attestations_by_committee.insert( - committee_attestation.committee_index(), - committee_attestation, - ); + // First attestation seen for this committee. Place it in the map + // provisionally. + best_attestations_by_committee.insert(committee_index, electra_attestation); } } - // TODO(electra): aggregate all the best attestations by committee - // (use btreemap sort order to get order by committee index) - aggregated_attestations.extend(Self::compute_on_chain_aggregate( - best_attestations_by_committee, - )); + if let Some(on_chain_aggregate) = + Self::compute_on_chain_aggregate(best_attestations_by_committee) + { + aggregated_attestations + .push(CompactIndexedAttestation::Electra(on_chain_aggregate)); + } *compact_indexed_attestations = aggregated_attestations; } } - // TODO(electra) unwraps in this function should be cleaned up - // also in general this could be a bit more elegant pub fn compute_on_chain_aggregate( - mut attestations_by_committee: BTreeMap>, - ) -> Vec> { - let mut aggregated_attestations = vec![]; - if let Some((_, on_chain_aggregate)) = attestations_by_committee.pop_first() { - match on_chain_aggregate { - CompactIndexedAttestation::Base(a) => { - aggregated_attestations.push(CompactIndexedAttestation::Base(a)); - aggregated_attestations.extend( - attestations_by_committee - .values() - .map(|a| { - CompactIndexedAttestation::Base(CompactIndexedAttestationBase { - attesting_indices: a.attesting_indices().clone(), - aggregation_bits: a.aggregation_bits_base().unwrap().clone(), - signature: a.signature().clone(), - index: *a.index(), - }) - }) - .collect::>>(), - ); - } - CompactIndexedAttestation::Electra(mut a) => { - for (_, attestation) in attestations_by_committee.iter_mut() { - let new_committee_bits = a - .committee_bits - .union(attestation.committee_bits().unwrap()); - a.aggregate(attestation.as_electra().unwrap()); - - a = CompactIndexedAttestationElectra { - attesting_indices: a.attesting_indices.clone(), - aggregation_bits: a.aggregation_bits.clone(), - signature: a.signature.clone(), - index: a.index, - committee_bits: new_committee_bits, - }; - } - - aggregated_attestations.push(CompactIndexedAttestation::Electra(a)); - } - } + mut attestations_by_committee: BTreeMap>, + ) -> Option> { + let (_, mut on_chain_aggregate) = attestations_by_committee.pop_first()?; + for (_, attestation) in attestations_by_committee { + on_chain_aggregate.aggregate_with_disjoint_committees(&attestation); } - - aggregated_attestations + Some(on_chain_aggregate) } /// Iterate all attestations matching the given `checkpoint_key`. diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 4f247e6bf2..daddbf7665 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -287,8 +287,10 @@ impl OperationPool { // TODO(electra): Work out how to do this more elegantly. This is a bit of a hack. let mut all_attestations = self.attestations.write(); - all_attestations.aggregate_across_committees(prev_epoch_key); - all_attestations.aggregate_across_committees(curr_epoch_key); + if fork_name >= ForkName::Electra { + all_attestations.aggregate_across_committees(prev_epoch_key); + all_attestations.aggregate_across_committees(curr_epoch_key); + } let all_attestations = parking_lot::RwLockWriteGuard::downgrade(all_attestations);