From ebbf196745e3444b4aa5b1e52181b97cb2742712 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 7 Jul 2022 14:50:12 +1000 Subject: [PATCH] Resolve FIXMEs --- beacon_node/http_api/src/lib.rs | 18 +++-- beacon_node/operation_pool/src/attestation.rs | 2 +- .../operation_pool/src/attestation_storage.rs | 72 +++++++++++++++---- beacon_node/operation_pool/src/lib.rs | 66 +++++------------ 4 files changed, 83 insertions(+), 75 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index c0994510d3..a15b52582c 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -45,10 +45,10 @@ use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ - Attestation, AttesterSlashing, BeaconBlockBodyMerge, BeaconBlockMerge, BeaconStateError, - BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, - ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, - SignedBeaconBlock, SignedBeaconBlockMerge, SignedBlindedBeaconBlock, + Attestation, AttestationData, AttesterSlashing, BeaconBlockBodyMerge, BeaconBlockMerge, + BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, + FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, + SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockMerge, SignedBlindedBeaconBlock, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, }; @@ -1307,13 +1307,11 @@ pub fn serve( .and_then( |chain: Arc>, query: api_types::AttestationPoolQuery| { blocking_json_task(move || { - let query_filter = |attestation: &Attestation| { - query - .slot - .map_or(true, |slot| slot == attestation.data.slot) + let query_filter = |data: &AttestationData| { + query.slot.map_or(true, |slot| slot == data.slot) && query .committee_index - .map_or(true, |index| index == attestation.data.index) + .map_or(true, |index| index == data.index) }; let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); @@ -1323,7 +1321,7 @@ pub fn serve( .read() .iter() .cloned() - .filter(query_filter), + .filter(|att| query_filter(&att.data)), ); Ok(api_types::GenericResponse::from(attestations)) }) diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index c284f78dc4..57345ba86e 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -96,7 +96,7 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { .filter_map(|&index| { if reward_cache .has_attested_in_epoch(index, att_data.target.epoch) - .expect("FIXME(sproul): remove this in prod") + .ok()? { return None; } diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs index 7bda1445a9..3e3f6f1b77 100644 --- a/beacon_node/operation_pool/src/attestation_storage.rs +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -1,3 +1,4 @@ +use crate::AttestationStats; use itertools::Itertools; use std::collections::HashMap; use types::{ @@ -33,12 +34,12 @@ pub struct AttestationRef<'a, T: EthSpec> { pub indexed: &'a CompactIndexedAttestation, } -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq)] pub struct AttestationMap { checkpoint_map: HashMap>, } -#[derive(Debug, Default)] +#[derive(Debug, Default, PartialEq)] pub struct AttestationDataMap { attestations: HashMap>>, } @@ -158,25 +159,68 @@ impl AttestationMap { } } + /// Iterate all attestations matching the given `checkpoint_key`. pub fn get_attestations<'a>( &'a self, checkpoint_key: &'a CheckpointKey, ) -> impl Iterator> + 'a { - // It's a monad :O self.checkpoint_map .get(checkpoint_key) .into_iter() - .flat_map(|attestation_map| { - attestation_map - .attestations - .iter() - .flat_map(|(data, vec_indexed)| { - vec_indexed.iter().map(|indexed| AttestationRef { - checkpoint: checkpoint_key, - data, - indexed, - }) - }) + .flat_map(|attestation_map| attestation_map.iter(checkpoint_key)) + } + + /// Iterate all attestations in the map. + pub fn iter<'a>(&'a self) -> impl Iterator> + 'a { + self.checkpoint_map + .iter() + .flat_map(|(checkpoint_key, attestation_map)| attestation_map.iter(checkpoint_key)) + } + + /// Prune attestation that are from before the previous epoch. + pub fn prune(&mut self, current_epoch: Epoch) { + self.checkpoint_map + .retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1); + } + + /// Statistics about all attestations stored in the map. + pub fn stats(&self) -> AttestationStats { + self.checkpoint_map + .values() + .map(AttestationDataMap::stats) + .fold(AttestationStats::default(), |mut acc, new| { + acc.num_attestations += new.num_attestations; + acc.num_attestation_data += new.num_attestation_data; + acc.max_aggregates_per_data = + std::cmp::max(acc.max_aggregates_per_data, new.max_aggregates_per_data); + acc }) } } + +impl AttestationDataMap { + pub fn iter<'a>( + &'a self, + checkpoint_key: &'a CheckpointKey, + ) -> impl Iterator> + 'a { + self.attestations.iter().flat_map(|(data, vec_indexed)| { + vec_indexed.iter().map(|indexed| AttestationRef { + checkpoint: checkpoint_key, + data, + indexed, + }) + }) + } + + pub fn stats(&self) -> AttestationStats { + let mut stats = AttestationStats::default(); + + for aggregates in self.attestations.values() { + stats.num_attestations += aggregates.len(); + stats.num_attestation_data += 1; + stats.max_aggregates_per_data = + std::cmp::max(stats.max_aggregates_per_data, aggregates.len()); + } + stats + } +} diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 4e180db6f4..27dcf89f70 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -29,9 +29,9 @@ use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::marker::PhantomData; use std::ptr; use types::{ - sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttesterSlashing, - BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, Hash256, - ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, + sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttestationData, + AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, + Hash256, ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Validator, }; @@ -68,6 +68,7 @@ pub enum OpPoolError { IncorrectOpPoolVariant, } +#[derive(Default)] pub struct AttestationStats { /// Total number of attestations for all committeees/indices/votes. pub num_attestations: usize, @@ -198,28 +199,11 @@ impl OperationPool { /// Total number of attestations in the pool, including attestations for the same data. pub fn num_attestations(&self) -> usize { - // FIXME(sproul): implement - // self.attestations.read().values().map(Vec::len).sum() - 0 + self.attestation_stats().num_attestations } pub fn attestation_stats(&self) -> AttestationStats { - let mut num_attestations = 0; - let mut num_attestation_data = 0; - let mut max_aggregates_per_data = 0; - - /* FIXME(sproul): implement - for aggregates in self.attestations.read().values() { - num_attestations += aggregates.len(); - num_attestation_data += 1; - max_aggregates_per_data = std::cmp::max(max_aggregates_per_data, aggregates.len()); - } - */ - AttestationStats { - num_attestations, - num_attestation_data, - max_aggregates_per_data, - } + self.attestations.read().stats() } /// Return all valid attestations for the given epoch, for use in max cover. @@ -343,17 +327,7 @@ impl OperationPool { /// Remove attestations which are too old to be included in a block. pub fn prune_attestations(&self, current_epoch: Epoch) { - // FIXME(sproul): implement pruning - /* - // Prune attestations that are from before the previous epoch. - self.attestations.write().retain(|_, attestations| { - // All the attestations in this bucket have the same data, so we only need to - // check the first one. - attestations - .first() - .map_or(false, |att| current_epoch <= att.data.target.epoch + 1) - }); - */ + self.attestations.write().prune(current_epoch); } /// Insert a proposer slashing into the pool. @@ -546,15 +520,11 @@ impl OperationPool { /// /// This method may return objects that are invalid for block inclusion. pub fn get_all_attestations(&self) -> Vec> { - // FIXME(sproul): fix this - vec![] - /* self.attestations .read() - .values() - .flat_map(|attns| attns.iter().cloned()) + .iter() + .map(|att| att.clone_as_attestation()) .collect() - */ } /// Returns all known `Attestation` objects that pass the provided filter. @@ -562,18 +532,14 @@ impl OperationPool { /// This method may return objects that are invalid for block inclusion. pub fn get_filtered_attestations(&self, filter: F) -> Vec> where - F: Fn(&Attestation) -> bool, + F: Fn(&AttestationData) -> bool, { - /* FIXME(sproul): fix self.attestations .read() - .values() - .flat_map(|attns| attns.iter()) - .filter(|attn| filter(*attn)) - .cloned() + .iter() + .filter(|att| filter(&att.attestation_data())) + .map(|att| att.clone_as_attestation()) .collect() - */ - vec![] } /// Returns all known `AttesterSlashing` objects. @@ -651,9 +617,9 @@ impl PartialEq for OperationPool { if ptr::eq(self, other) { return true; } - // FIXME(sproul): uhhh - // *self.attestations.read() == *other.attestations.read() - true && *self.attester_slashings.read() == *other.attester_slashings.read() + *self.attestations.read() == *other.attestations.read() + && *self.sync_contributions.read() == *other.sync_contributions.read() + && *self.attester_slashings.read() == *other.attester_slashings.read() && *self.proposer_slashings.read() == *other.proposer_slashings.read() && *self.voluntary_exits.read() == *other.voluntary_exits.read() }