Resolve FIXMEs

This commit is contained in:
Michael Sproul
2022-07-07 14:50:12 +10:00
parent 24fdd56baf
commit ebbf196745
4 changed files with 83 additions and 75 deletions

View File

@@ -45,10 +45,10 @@ use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ use types::{
Attestation, AttesterSlashing, BeaconBlockBodyMerge, BeaconBlockMerge, BeaconStateError, Attestation, AttestationData, AttesterSlashing, BeaconBlockBodyMerge, BeaconBlockMerge,
BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, FullPayload, BeaconStateError, BlindedPayload, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature, SignedAggregateAndProof, FullPayload, ProposerPreparationData, ProposerSlashing, RelativeEpoch, Signature,
SignedBeaconBlock, SignedBeaconBlockMerge, SignedBlindedBeaconBlock, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockMerge, SignedBlindedBeaconBlock,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData, SyncCommitteeMessage, SyncContributionData,
}; };
@@ -1307,13 +1307,11 @@ pub fn serve<T: BeaconChainTypes>(
.and_then( .and_then(
|chain: Arc<BeaconChain<T>>, query: api_types::AttestationPoolQuery| { |chain: Arc<BeaconChain<T>>, query: api_types::AttestationPoolQuery| {
blocking_json_task(move || { blocking_json_task(move || {
let query_filter = |attestation: &Attestation<T::EthSpec>| { let query_filter = |data: &AttestationData| {
query query.slot.map_or(true, |slot| slot == data.slot)
.slot
.map_or(true, |slot| slot == attestation.data.slot)
&& query && query
.committee_index .committee_index
.map_or(true, |index| index == attestation.data.index) .map_or(true, |index| index == data.index)
}; };
let mut attestations = chain.op_pool.get_filtered_attestations(query_filter); let mut attestations = chain.op_pool.get_filtered_attestations(query_filter);
@@ -1323,7 +1321,7 @@ pub fn serve<T: BeaconChainTypes>(
.read() .read()
.iter() .iter()
.cloned() .cloned()
.filter(query_filter), .filter(|att| query_filter(&att.data)),
); );
Ok(api_types::GenericResponse::from(attestations)) Ok(api_types::GenericResponse::from(attestations))
}) })

View File

@@ -96,7 +96,7 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
.filter_map(|&index| { .filter_map(|&index| {
if reward_cache if reward_cache
.has_attested_in_epoch(index, att_data.target.epoch) .has_attested_in_epoch(index, att_data.target.epoch)
.expect("FIXME(sproul): remove this in prod") .ok()?
{ {
return None; return None;
} }

View File

@@ -1,3 +1,4 @@
use crate::AttestationStats;
use itertools::Itertools; use itertools::Itertools;
use std::collections::HashMap; use std::collections::HashMap;
use types::{ use types::{
@@ -33,12 +34,12 @@ pub struct AttestationRef<'a, T: EthSpec> {
pub indexed: &'a CompactIndexedAttestation<T>, pub indexed: &'a CompactIndexedAttestation<T>,
} }
#[derive(Debug, Default)] #[derive(Debug, Default, PartialEq)]
pub struct AttestationMap<T: EthSpec> { pub struct AttestationMap<T: EthSpec> {
checkpoint_map: HashMap<CheckpointKey, AttestationDataMap<T>>, checkpoint_map: HashMap<CheckpointKey, AttestationDataMap<T>>,
} }
#[derive(Debug, Default)] #[derive(Debug, Default, PartialEq)]
pub struct AttestationDataMap<T: EthSpec> { pub struct AttestationDataMap<T: EthSpec> {
attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>, attestations: HashMap<CompactAttestationData, Vec<CompactIndexedAttestation<T>>>,
} }
@@ -158,25 +159,68 @@ impl<T: EthSpec> AttestationMap<T> {
} }
} }
/// Iterate all attestations matching the given `checkpoint_key`.
pub fn get_attestations<'a>( pub fn get_attestations<'a>(
&'a self, &'a self,
checkpoint_key: &'a CheckpointKey, checkpoint_key: &'a CheckpointKey,
) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a { ) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a {
// It's a monad :O
self.checkpoint_map self.checkpoint_map
.get(checkpoint_key) .get(checkpoint_key)
.into_iter() .into_iter()
.flat_map(|attestation_map| { .flat_map(|attestation_map| attestation_map.iter(checkpoint_key))
attestation_map }
.attestations
.iter() /// Iterate all attestations in the map.
.flat_map(|(data, vec_indexed)| { pub fn iter<'a>(&'a self) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a {
vec_indexed.iter().map(|indexed| AttestationRef { self.checkpoint_map
checkpoint: checkpoint_key, .iter()
data, .flat_map(|(checkpoint_key, attestation_map)| attestation_map.iter(checkpoint_key))
indexed, }
})
}) /// Prune attestation that are from before the previous epoch.
pub fn prune(&mut self, current_epoch: Epoch) {
self.checkpoint_map
.retain(|checkpoint_key, _| current_epoch <= checkpoint_key.target_epoch + 1);
}
/// Statistics about all attestations stored in the map.
pub fn stats(&self) -> AttestationStats {
self.checkpoint_map
.values()
.map(AttestationDataMap::stats)
.fold(AttestationStats::default(), |mut acc, new| {
acc.num_attestations += new.num_attestations;
acc.num_attestation_data += new.num_attestation_data;
acc.max_aggregates_per_data =
std::cmp::max(acc.max_aggregates_per_data, new.max_aggregates_per_data);
acc
}) })
} }
} }
impl<T: EthSpec> AttestationDataMap<T> {
pub fn iter<'a>(
&'a self,
checkpoint_key: &'a CheckpointKey,
) -> impl Iterator<Item = AttestationRef<'a, T>> + 'a {
self.attestations.iter().flat_map(|(data, vec_indexed)| {
vec_indexed.iter().map(|indexed| AttestationRef {
checkpoint: checkpoint_key,
data,
indexed,
})
})
}
pub fn stats(&self) -> AttestationStats {
let mut stats = AttestationStats::default();
for aggregates in self.attestations.values() {
stats.num_attestations += aggregates.len();
stats.num_attestation_data += 1;
stats.max_aggregates_per_data =
std::cmp::max(stats.max_aggregates_per_data, aggregates.len());
}
stats
}
}

View File

@@ -29,9 +29,9 @@ use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ptr; use std::ptr;
use types::{ use types::{
sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttesterSlashing, sync_aggregate::Error as SyncAggregateError, typenum::Unsigned, Attestation, AttestationData,
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion, Hash256, AttesterSlashing, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, ForkVersion,
ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution, Hash256, ProposerSlashing, SignedVoluntaryExit, Slot, SyncAggregate, SyncCommitteeContribution,
Validator, Validator,
}; };
@@ -68,6 +68,7 @@ pub enum OpPoolError {
IncorrectOpPoolVariant, IncorrectOpPoolVariant,
} }
#[derive(Default)]
pub struct AttestationStats { pub struct AttestationStats {
/// Total number of attestations for all committeees/indices/votes. /// Total number of attestations for all committeees/indices/votes.
pub num_attestations: usize, pub num_attestations: usize,
@@ -198,28 +199,11 @@ impl<T: EthSpec> OperationPool<T> {
/// Total number of attestations in the pool, including attestations for the same data. /// Total number of attestations in the pool, including attestations for the same data.
pub fn num_attestations(&self) -> usize { pub fn num_attestations(&self) -> usize {
// FIXME(sproul): implement self.attestation_stats().num_attestations
// self.attestations.read().values().map(Vec::len).sum()
0
} }
pub fn attestation_stats(&self) -> AttestationStats { pub fn attestation_stats(&self) -> AttestationStats {
let mut num_attestations = 0; self.attestations.read().stats()
let mut num_attestation_data = 0;
let mut max_aggregates_per_data = 0;
/* FIXME(sproul): implement
for aggregates in self.attestations.read().values() {
num_attestations += aggregates.len();
num_attestation_data += 1;
max_aggregates_per_data = std::cmp::max(max_aggregates_per_data, aggregates.len());
}
*/
AttestationStats {
num_attestations,
num_attestation_data,
max_aggregates_per_data,
}
} }
/// Return all valid attestations for the given epoch, for use in max cover. /// Return all valid attestations for the given epoch, for use in max cover.
@@ -343,17 +327,7 @@ impl<T: EthSpec> OperationPool<T> {
/// Remove attestations which are too old to be included in a block. /// Remove attestations which are too old to be included in a block.
pub fn prune_attestations(&self, current_epoch: Epoch) { pub fn prune_attestations(&self, current_epoch: Epoch) {
// FIXME(sproul): implement pruning self.attestations.write().prune(current_epoch);
/*
// Prune attestations that are from before the previous epoch.
self.attestations.write().retain(|_, attestations| {
// All the attestations in this bucket have the same data, so we only need to
// check the first one.
attestations
.first()
.map_or(false, |att| current_epoch <= att.data.target.epoch + 1)
});
*/
} }
/// Insert a proposer slashing into the pool. /// Insert a proposer slashing into the pool.
@@ -546,15 +520,11 @@ impl<T: EthSpec> OperationPool<T> {
/// ///
/// This method may return objects that are invalid for block inclusion. /// This method may return objects that are invalid for block inclusion.
pub fn get_all_attestations(&self) -> Vec<Attestation<T>> { pub fn get_all_attestations(&self) -> Vec<Attestation<T>> {
// FIXME(sproul): fix this
vec![]
/*
self.attestations self.attestations
.read() .read()
.values() .iter()
.flat_map(|attns| attns.iter().cloned()) .map(|att| att.clone_as_attestation())
.collect() .collect()
*/
} }
/// Returns all known `Attestation` objects that pass the provided filter. /// Returns all known `Attestation` objects that pass the provided filter.
@@ -562,18 +532,14 @@ impl<T: EthSpec> OperationPool<T> {
/// This method may return objects that are invalid for block inclusion. /// This method may return objects that are invalid for block inclusion.
pub fn get_filtered_attestations<F>(&self, filter: F) -> Vec<Attestation<T>> pub fn get_filtered_attestations<F>(&self, filter: F) -> Vec<Attestation<T>>
where where
F: Fn(&Attestation<T>) -> bool, F: Fn(&AttestationData) -> bool,
{ {
/* FIXME(sproul): fix
self.attestations self.attestations
.read() .read()
.values() .iter()
.flat_map(|attns| attns.iter()) .filter(|att| filter(&att.attestation_data()))
.filter(|attn| filter(*attn)) .map(|att| att.clone_as_attestation())
.cloned()
.collect() .collect()
*/
vec![]
} }
/// Returns all known `AttesterSlashing` objects. /// Returns all known `AttesterSlashing` objects.
@@ -651,9 +617,9 @@ impl<T: EthSpec + Default> PartialEq for OperationPool<T> {
if ptr::eq(self, other) { if ptr::eq(self, other) {
return true; return true;
} }
// FIXME(sproul): uhhh *self.attestations.read() == *other.attestations.read()
// *self.attestations.read() == *other.attestations.read() && *self.sync_contributions.read() == *other.sync_contributions.read()
true && *self.attester_slashings.read() == *other.attester_slashings.read() && *self.attester_slashings.read() == *other.attester_slashings.read()
&& *self.proposer_slashings.read() == *other.proposer_slashings.read() && *self.proposer_slashings.read() == *other.proposer_slashings.read()
&& *self.voluntary_exits.read() == *other.voluntary_exits.read() && *self.voluntary_exits.read() == *other.voluntary_exits.read()
} }