WIP faster attestation packing

This commit is contained in:
Michael Sproul
2022-07-06 18:49:03 +10:00
parent 748475be1d
commit 6f7f6aed96
12 changed files with 452 additions and 124 deletions

View File

@@ -1,20 +1,25 @@
mod attestation;
mod attestation_id;
mod attestation_storage;
mod attester_slashing;
mod max_cover;
mod metrics;
mod persistence;
mod reward_cache;
mod sync_aggregate_id;
pub use attestation::AttMaxCover;
pub use attestation_storage::AttestationRef;
pub use max_cover::MaxCover;
pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair};
pub use reward_cache::RewardCache;
use crate::attestation_storage::{AttestationMap, CheckpointKey};
use crate::sync_aggregate_id::SyncAggregateId;
use attestation_id::AttestationId;
use attester_slashing::AttesterSlashingMaxCover;
use max_cover::maximum_cover;
use parking_lot::RwLock;
use parking_lot::{RwLock, RwLockWriteGuard};
use state_processing::per_block_processing::errors::AttestationValidationError;
use state_processing::per_block_processing::{
get_slashable_indices_modular, verify_attestation_for_block_inclusion, verify_exit,
@@ -36,7 +41,7 @@ type SyncContributions<T> = RwLock<HashMap<SyncAggregateId, Vec<SyncCommitteeCon
#[derive(Default, Debug)]
pub struct OperationPool<T: EthSpec + Default> {
/// Map from attestation ID (see below) to vectors of attestations.
attestations: RwLock<HashMap<AttestationId, Vec<Attestation<T>>>>,
attestations: RwLock<AttestationMap<T>>,
/// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID.
sync_contributions: SyncContributions<T>,
/// Set of attester slashings, and the fork version they were verified against.
@@ -45,6 +50,8 @@ pub struct OperationPool<T: EthSpec + Default> {
proposer_slashings: RwLock<HashMap<u64, ProposerSlashing>>,
/// Map from exiting validator to their exit data.
voluntary_exits: RwLock<HashMap<u64, SignedVoluntaryExit>>,
/// Reward cache for accelerating attestation packing.
reward_cache: RwLock<RewardCache>,
_phantom: PhantomData<T>,
}
@@ -53,6 +60,12 @@ pub enum OpPoolError {
GetAttestationsTotalBalanceError(BeaconStateError),
GetBlockRootError(BeaconStateError),
SyncAggregateError(SyncAggregateError),
RewardCacheUpdatePrevEpoch(BeaconStateError),
RewardCacheUpdateCurrEpoch(BeaconStateError),
RewardCacheGetBlockRoot(BeaconStateError),
RewardCacheWrongEpoch,
RewardCacheValidatorUnknown(BeaconStateError),
RewardCacheOutOfBounds,
IncorrectOpPoolVariant,
}
@@ -176,43 +189,19 @@ impl<T: EthSpec> OperationPool<T> {
pub fn insert_attestation(
&self,
attestation: Attestation<T>,
fork: &Fork,
genesis_validators_root: Hash256,
spec: &ChainSpec,
attesting_indices: Vec<u64>,
) -> Result<(), AttestationValidationError> {
let id = AttestationId::from_data(&attestation.data, fork, genesis_validators_root, spec);
// Take a write lock on the attestations map.
let mut attestations = self.attestations.write();
let existing_attestations = match attestations.entry(id) {
Entry::Vacant(entry) => {
entry.insert(vec![attestation]);
return Ok(());
}
Entry::Occupied(entry) => entry.into_mut(),
};
let mut aggregated = false;
for existing_attestation in existing_attestations.iter_mut() {
if existing_attestation.signers_disjoint_from(&attestation) {
existing_attestation.aggregate(&attestation);
aggregated = true;
} else if *existing_attestation == attestation {
aggregated = true;
}
}
if !aggregated {
existing_attestations.push(attestation);
}
self.attestations
.write()
.insert(attestation, attesting_indices);
Ok(())
}
/// Total number of attestations in the pool, including attestations for the same data.
pub fn num_attestations(&self) -> usize {
self.attestations.read().values().map(Vec::len).sum()
// FIXME(sproul): implement
// self.attestations.read().values().map(Vec::len).sum()
0
}
pub fn attestation_stats(&self) -> AttestationStats {
@@ -220,11 +209,13 @@ impl<T: EthSpec> OperationPool<T> {
let mut num_attestation_data = 0;
let mut max_aggregates_per_data = 0;
/* FIXME(sproul): implement
for aggregates in self.attestations.read().values() {
num_attestations += aggregates.len();
num_attestation_data += 1;
max_aggregates_per_data = std::cmp::max(max_aggregates_per_data, aggregates.len());
}
*/
AttestationStats {
num_attestations,
num_attestation_data,
@@ -235,36 +226,21 @@ impl<T: EthSpec> OperationPool<T> {
/// Return all valid attestations for the given epoch, for use in max cover.
fn get_valid_attestations_for_epoch<'a>(
&'a self,
epoch: Epoch,
all_attestations: &'a HashMap<AttestationId, Vec<Attestation<T>>>,
checkpoint_key: &'a CheckpointKey,
all_attestations: &'a AttestationMap<T>,
state: &'a BeaconState<T>,
reward_cache: &'a RewardCache,
total_active_balance: u64,
validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send,
spec: &'a ChainSpec,
) -> impl Iterator<Item = AttMaxCover<'a, T>> + Send {
let domain_bytes = AttestationId::compute_domain_bytes(
epoch,
&state.fork(),
state.genesis_validators_root(),
spec,
);
// FIXME(sproul): check inclusion slot somewhere
all_attestations
.iter()
.filter(move |(key, _)| key.domain_bytes_match(&domain_bytes))
.flat_map(|(_, attestations)| attestations)
.filter(move |attestation| attestation.data.target.epoch == epoch)
.filter(move |attestation| {
// Ensure attestations are valid for block inclusion
verify_attestation_for_block_inclusion(
state,
attestation,
VerifySignatures::False,
spec,
)
.is_ok()
})
.get_attestations(checkpoint_key)
.filter(validity_filter)
.filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec))
.filter_map(move |att| {
AttMaxCover::new(att, state, reward_cache, total_active_balance, spec)
})
}
/// Get a list of attestations for inclusion in a block.
@@ -276,18 +252,25 @@ impl<T: EthSpec> OperationPool<T> {
pub fn get_attestations(
&self,
state: &BeaconState<T>,
prev_epoch_validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
curr_epoch_validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send,
spec: &ChainSpec,
) -> Result<Vec<Attestation<T>>, OpPoolError> {
// Attestations for the current fork, which may be from the current or previous epoch.
let prev_epoch = state.previous_epoch();
let current_epoch = state.current_epoch();
let prev_epoch_key = CheckpointKey::from_state(state, state.previous_epoch());
let curr_epoch_key = CheckpointKey::from_state(state, state.current_epoch());
let all_attestations = self.attestations.read();
let total_active_balance = state
.get_total_active_balance()
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
// Update the reward cache.
let reward_timer = metrics::start_timer(&metrics::BUILD_REWARD_CACHE_TIME);
let mut reward_cache = self.reward_cache.write();
reward_cache.update(state)?;
let reward_cache = RwLockWriteGuard::downgrade(reward_cache);
drop(reward_timer);
// Split attestations for the previous & current epochs, so that we
// can optimise them individually in parallel.
let mut num_prev_valid = 0_i64;
@@ -295,9 +278,10 @@ impl<T: EthSpec> OperationPool<T> {
let prev_epoch_att = self
.get_valid_attestations_for_epoch(
prev_epoch,
&prev_epoch_key,
&*all_attestations,
state,
&*reward_cache,
total_active_balance,
prev_epoch_validity_filter,
spec,
@@ -305,9 +289,10 @@ impl<T: EthSpec> OperationPool<T> {
.inspect(|_| num_prev_valid += 1);
let curr_epoch_att = self
.get_valid_attestations_for_epoch(
current_epoch,
&curr_epoch_key,
&*all_attestations,
state,
&*reward_cache,
total_active_balance,
curr_epoch_validity_filter,
spec,
@@ -328,7 +313,7 @@ impl<T: EthSpec> OperationPool<T> {
move || {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME);
// If we're in the genesis epoch, just use the current epoch attestations.
if prev_epoch == current_epoch {
if prev_epoch_key == curr_epoch_key {
vec![]
} else {
maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations")
@@ -356,6 +341,8 @@ impl<T: EthSpec> OperationPool<T> {
/// Remove attestations which are too old to be included in a block.
pub fn prune_attestations(&self, current_epoch: Epoch) {
// FIXME(sproul): implement pruning
/*
// Prune attestations that are from before the previous epoch.
self.attestations.write().retain(|_, attestations| {
// All the attestations in this bucket have the same data, so we only need to
@@ -364,6 +351,7 @@ impl<T: EthSpec> OperationPool<T> {
.first()
.map_or(false, |att| current_epoch <= att.data.target.epoch + 1)
});
*/
}
/// Insert a proposer slashing into the pool.
@@ -438,7 +426,7 @@ impl<T: EthSpec> OperationPool<T> {
.into_iter()
.map(|cover| {
to_be_slashed.extend(cover.covering_set().keys());
cover.object().clone()
cover.intermediate().clone()
})
.collect();
@@ -556,11 +544,15 @@ impl<T: EthSpec> OperationPool<T> {
///
/// This method may return objects that are invalid for block inclusion.
pub fn get_all_attestations(&self) -> Vec<Attestation<T>> {
// FIXME(sproul): fix this
vec![]
/*
self.attestations
.read()
.values()
.flat_map(|attns| attns.iter().cloned())
.collect()
*/
}
/// Returns all known `Attestation` objects that pass the provided filter.
@@ -570,6 +562,7 @@ impl<T: EthSpec> OperationPool<T> {
where
F: Fn(&Attestation<T>) -> bool,
{
/* FIXME(sproul): fix
self.attestations
.read()
.values()
@@ -577,6 +570,8 @@ impl<T: EthSpec> OperationPool<T> {
.filter(|attn| filter(*attn))
.cloned()
.collect()
*/
vec![]
}
/// Returns all known `AttesterSlashing` objects.
@@ -654,8 +649,9 @@ impl<T: EthSpec + Default> PartialEq for OperationPool<T> {
if ptr::eq(self, other) {
return true;
}
*self.attestations.read() == *other.attestations.read()
&& *self.attester_slashings.read() == *other.attester_slashings.read()
// FIXME(sproul): uhhh
// *self.attestations.read() == *other.attestations.read()
true && *self.attester_slashings.read() == *other.attester_slashings.read()
&& *self.proposer_slashings.read() == *other.proposer_slashings.read()
&& *self.voluntary_exits.read() == *other.voluntary_exits.read()
}