From 6f7f6aed96fde111ffb2b666b8e0d7c5d3886acb Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 6 Jul 2022 18:49:03 +1000 Subject: [PATCH] WIP faster attestation packing --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 35 ++-- beacon_node/beacon_chain/src/block_reward.rs | 4 + beacon_node/operation_pool/Cargo.toml | 1 + beacon_node/operation_pool/src/attestation.rs | 68 ++++--- .../operation_pool/src/attestation_storage.rs | 172 ++++++++++++++++++ .../operation_pool/src/attester_slashing.rs | 8 +- beacon_node/operation_pool/src/lib.rs | 130 +++++++------ beacon_node/operation_pool/src/max_cover.rs | 22 ++- beacon_node/operation_pool/src/metrics.rs | 4 + beacon_node/operation_pool/src/persistence.rs | 9 +- .../operation_pool/src/reward_cache.rs | 122 +++++++++++++ 12 files changed, 452 insertions(+), 124 deletions(-) create mode 100644 beacon_node/operation_pool/src/attestation_storage.rs create mode 100644 beacon_node/operation_pool/src/reward_cache.rs diff --git a/Cargo.lock b/Cargo.lock index bb7308b938..d17370042b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4162,6 +4162,7 @@ name = "operation_pool" version = "0.2.0" dependencies = [ "beacon_chain", + "bitvec 1.0.0", "derivative", "eth2_ssz", "eth2_ssz_derive", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9fb895f78f..65640092fc 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -61,7 +61,7 @@ use fork_choice::{ use futures::channel::mpsc::Sender; use itertools::process_results; use itertools::Itertools; -use operation_pool::{OperationPool, PersistedOperationPool}; +use operation_pool::{AttestationRef, OperationPool, PersistedOperationPool}; use parking_lot::{Mutex, RwLock}; use safe_arith::SafeArith; use slasher::Slasher; @@ -1867,13 +1867,16 @@ impl BeaconChain { if self.eth1_chain.is_some() { let fork = self.canonical_head.cached_head().head_fork(); + // TODO: address these clones. + let attesting_indices = verified_attestation + .indexed_attestation() + .attesting_indices + .clone() + .into(); self.op_pool .insert_attestation( - // TODO: address this clone. verified_attestation.attestation().clone(), - &fork, - self.genesis_validators_root, - &self.spec, + attesting_indices, ) .map_err(Error::from)?; } @@ -1907,15 +1910,15 @@ impl BeaconChain { pub fn filter_op_pool_attestation( &self, filter_cache: &mut HashMap<(Hash256, Epoch), bool>, - att: &Attestation, + att: &AttestationRef, state: &BeaconState, ) -> bool { *filter_cache - .entry((att.data.beacon_block_root, att.data.target.epoch)) + .entry((att.data.beacon_block_root, att.checkpoint.target_epoch)) .or_insert_with(|| { self.shuffling_is_compatible( &att.data.beacon_block_root, - att.data.target.epoch, + att.checkpoint.target_epoch, state, ) }) @@ -3279,12 +3282,8 @@ impl BeaconChain { let unagg_import_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES); for attestation in self.naive_aggregation_pool.read().iter() { - if let Err(e) = self.op_pool.insert_attestation( - attestation.clone(), - &state.fork(), - state.genesis_validators_root(), - &self.spec, - ) { + // FIXME(sproul): put correct attesting indices + if let Err(e) = self.op_pool.insert_attestation(attestation.clone(), vec![]) { // Don't stop block production if there's an error, just create a log. error!( self.log, @@ -3305,12 +3304,12 @@ impl BeaconChain { metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES); let mut prev_filter_cache = HashMap::new(); - let prev_attestation_filter = |att: &&Attestation| { - self.filter_op_pool_attestation(&mut prev_filter_cache, *att, &state) + let prev_attestation_filter = |att: &AttestationRef| { + self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state) }; let mut curr_filter_cache = HashMap::new(); - let curr_attestation_filter = |att: &&Attestation| { - self.filter_op_pool_attestation(&mut curr_filter_cache, *att, &state) + let curr_attestation_filter = |att: &AttestationRef| { + self.filter_op_pool_attestation(&mut curr_filter_cache, att, &state) }; let attestations = self diff --git a/beacon_node/beacon_chain/src/block_reward.rs b/beacon_node/beacon_chain/src/block_reward.rs index 4b8b809d3f..790ad743b2 100644 --- a/beacon_node/beacon_chain/src/block_reward.rs +++ b/beacon_node/beacon_chain/src/block_reward.rs @@ -12,6 +12,9 @@ impl BeaconChain { state: &BeaconState, include_attestations: bool, ) -> Result { + // FIXME(sproul): make an AttestationRef? + unimplemented!() + /* if block.slot() != state.slot() { return Err(BeaconChainError::BlockRewardSlotError); } @@ -106,5 +109,6 @@ impl BeaconChain { attestation_rewards, sync_committee_rewards, }) + */ } } diff --git a/beacon_node/operation_pool/Cargo.toml b/beacon_node/operation_pool/Cargo.toml index 6b8b8eb145..edb0df22b0 100644 --- a/beacon_node/operation_pool/Cargo.toml +++ b/beacon_node/operation_pool/Cargo.toml @@ -18,6 +18,7 @@ rayon = "1.5.0" serde = "1.0.116" serde_derive = "1.0.116" store = { path = "../store" } +bitvec = "1" [dev-dependencies] beacon_chain = { path = "../beacon_chain" } diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index 2f7fba4540..c284f78dc4 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -1,4 +1,6 @@ +use crate::attestation_storage::AttestationRef; use crate::max_cover::MaxCover; +use crate::reward_cache::RewardCache; use state_processing::common::{ altair, base, get_attestation_participation_flag_indices, get_attesting_indices, }; @@ -12,33 +14,37 @@ use types::{ #[derive(Debug, Clone)] pub struct AttMaxCover<'a, T: EthSpec> { /// Underlying attestation. - pub att: &'a Attestation, + pub att: AttestationRef<'a, T>, /// Mapping of validator indices and their rewards. pub fresh_validators_rewards: HashMap, } impl<'a, T: EthSpec> AttMaxCover<'a, T> { pub fn new( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { if let BeaconState::Base(ref base_state) = state { Self::new_for_base(att, state, base_state, total_active_balance, spec) } else { - Self::new_for_altair(att, state, total_active_balance, spec) + Self::new_for_altair(att, state, reward_cache, total_active_balance, spec) } } /// Initialise an attestation cover object for base/phase0 hard fork. pub fn new_for_base( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, base_state: &BeaconStateBase, total_active_balance: u64, spec: &ChainSpec, ) -> Option { + // FIXME(sproul): re-enable + None + /* let fresh_validators = earliest_attestation_validators(att, state, base_state); let committee = state .get_beacon_committee(att.data.slot, att.data.index) @@ -63,49 +69,46 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { att, fresh_validators_rewards, }) + */ } /// Initialise an attestation cover object for Altair or later. pub fn new_for_altair( - att: &'a Attestation, + att: AttestationRef<'a, T>, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { - let committee = state - .get_beacon_committee(att.data.slot, att.data.index) - .ok()?; - let attesting_indices = - get_attesting_indices::(committee.committee, &att.aggregation_bits).ok()?; + let att_data = att.attestation_data(); - let participation_list = if att.data.target.epoch == state.current_epoch() { - state.current_epoch_participation().ok()? - } else if att.data.target.epoch == state.previous_epoch() { - state.previous_epoch_participation().ok()? - } else { - return None; - }; - - let inclusion_delay = state.slot().as_u64().checked_sub(att.data.slot.as_u64())?; + let inclusion_delay = state.slot().as_u64().checked_sub(att_data.slot.as_u64())?; let att_participation_flags = - get_attestation_participation_flag_indices(state, &att.data, inclusion_delay, spec) + get_attestation_participation_flag_indices(state, &att_data, inclusion_delay, spec) .ok()?; let base_reward_per_increment = altair::BaseRewardPerIncrement::new(total_active_balance, spec).ok()?; - let fresh_validators_rewards = attesting_indices + let fresh_validators_rewards = att + .indexed + .attesting_indices .iter() .filter_map(|&index| { + if reward_cache + .has_attested_in_epoch(index, att_data.target.epoch) + .expect("FIXME(sproul): remove this in prod") + { + return None; + } + let mut proposer_reward_numerator = 0; - let participation = participation_list.get(index)?; let base_reward = - altair::get_base_reward(state, index, base_reward_per_increment, spec).ok()?; + altair::get_base_reward(state, index as usize, base_reward_per_increment, spec) + .ok()?; for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { - if att_participation_flags.contains(&flag_index) - && !participation.has_flag(flag_index).ok()? - { + if att_participation_flags.contains(&flag_index) { proposer_reward_numerator += base_reward.checked_mul(*weight)?; } } @@ -113,7 +116,7 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { let proposer_reward = proposer_reward_numerator .checked_div(WEIGHT_DENOMINATOR.checked_mul(spec.proposer_reward_quotient)?)?; - Some((index as u64, proposer_reward)).filter(|_| proposer_reward != 0) + Some((index, proposer_reward)).filter(|_| proposer_reward != 0) }) .collect(); @@ -126,10 +129,15 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { type Object = Attestation; + type Intermediate = AttestationRef<'a, T>; type Set = HashMap; - fn object(&self) -> &Attestation { - self.att + fn intermediate(&self) -> &AttestationRef<'a, T> { + &self.att + } + + fn convert_to_object(att_ref: &AttestationRef<'a, T>) -> Attestation { + att_ref.clone_as_attestation() } fn covering_set(&self) -> &HashMap { @@ -148,7 +156,7 @@ impl<'a, T: EthSpec> MaxCover for AttMaxCover<'a, T> { /// of slashable voting, which is rare. fn update_covering_set( &mut self, - best_att: &Attestation, + best_att: &AttestationRef<'a, T>, covered_validators: &HashMap, ) { if self.att.data.slot == best_att.data.slot && self.att.data.index == best_att.data.index { diff --git a/beacon_node/operation_pool/src/attestation_storage.rs b/beacon_node/operation_pool/src/attestation_storage.rs new file mode 100644 index 0000000000..247645e810 --- /dev/null +++ b/beacon_node/operation_pool/src/attestation_storage.rs @@ -0,0 +1,172 @@ +use std::collections::HashMap; +use types::{ + AggregateSignature, Attestation, AttestationData, BeaconState, BitList, Checkpoint, Epoch, + EthSpec, Hash256, IndexedAttestation, Slot, +}; + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub struct CheckpointKey { + pub source: Checkpoint, + pub target_epoch: Epoch, +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct CompactAttestationData { + pub slot: Slot, + pub index: u64, + pub beacon_block_root: Hash256, + pub target_root: Hash256, +} + +#[derive(Debug)] +pub struct CompactIndexedAttestation { + pub attesting_indices: Vec, + pub aggregation_bits: BitList, + pub signature: AggregateSignature, +} + +#[derive(Debug, Clone)] +pub struct AttestationRef<'a, T: EthSpec> { + pub checkpoint: &'a CheckpointKey, + pub data: &'a CompactAttestationData, + pub indexed: &'a CompactIndexedAttestation, +} + +#[derive(Debug, Default)] +pub struct AttestationMap { + checkpoint_map: HashMap>, +} + +#[derive(Debug, Default)] +pub struct AttestationDataMap { + attestations: HashMap>>, +} + +fn split( + attestation: Attestation, + attesting_indices: Vec, +) -> ( + CheckpointKey, + CompactAttestationData, + CompactIndexedAttestation, +) { + let checkpoint_key = CheckpointKey { + source: attestation.data.source, + target_epoch: attestation.data.target.epoch, + }; + let attestation_data = CompactAttestationData { + slot: attestation.data.slot, + index: attestation.data.index, + beacon_block_root: attestation.data.beacon_block_root, + target_root: attestation.data.target.root, + }; + let indexed_attestation = CompactIndexedAttestation { + attesting_indices, + aggregation_bits: attestation.aggregation_bits, + signature: attestation.signature, + }; + (checkpoint_key, attestation_data, indexed_attestation) +} + +impl<'a, T: EthSpec> AttestationRef<'a, T> { + pub fn attestation_data(&self) -> AttestationData { + AttestationData { + slot: self.data.slot, + index: self.data.index, + beacon_block_root: self.data.beacon_block_root, + source: self.checkpoint.source, + target: Checkpoint { + epoch: self.checkpoint.target_epoch, + root: self.data.target_root, + }, + } + } + + pub fn clone_as_attestation(&self) -> Attestation { + Attestation { + aggregation_bits: self.indexed.aggregation_bits.clone(), + data: self.attestation_data(), + signature: self.indexed.signature.clone(), + } + } +} + +impl CheckpointKey { + pub fn from_state(state: &BeaconState, epoch: Epoch) -> Self { + if epoch == state.current_epoch() { + CheckpointKey { + source: state.current_justified_checkpoint(), + target_epoch: epoch, + } + } else { + CheckpointKey { + source: state.previous_justified_checkpoint(), + target_epoch: epoch, + } + } + } +} + +impl AttestationMap { + pub fn insert(&mut self, attestation: Attestation, attesting_indices: Vec) { + let (checkpoint_key, attestation_data, indexed_attestation) = + split(attestation, attesting_indices); + + let attestation_map = self + .checkpoint_map + .entry(checkpoint_key) + .or_insert_with(AttestationDataMap::default); + let attestations = attestation_map + .attestations + .entry(attestation_data) + .or_insert_with(Vec::new); + + // FIXME(sproul): do greedy aggregation here + /* + let existing_attestations = match attestations.entry(id) { + Entry::Vacant(entry) => { + entry.insert(vec![attestation]); + return Ok(()); + } + Entry::Occupied(entry) => entry.into_mut(), + }; + + let mut aggregated = false; + for existing_attestation in existing_attestations.iter_mut() { + if existing_attestation.signers_disjoint_from(&attestation) { + existing_attestation.aggregate(&attestation); + aggregated = true; + } else if *existing_attestation == attestation { + aggregated = true; + } + } + + if !aggregated { + existing_attestations.push(attestation); + } + */ + attestations.push(indexed_attestation); + } + + pub fn get_attestations<'a>( + &'a self, + checkpoint_key: &'a CheckpointKey, + ) -> impl Iterator> + 'a { + // It's a monad :O + self.checkpoint_map + .get(&checkpoint_key) + .into_iter() + .flat_map(|attestation_map| { + attestation_map + .attestations + .iter() + .flat_map(|(data, vec_indexed)| { + vec_indexed.iter().map(|indexed| AttestationRef { + checkpoint: checkpoint_key, + data, + indexed, + }) + }) + }) + } +} diff --git a/beacon_node/operation_pool/src/attester_slashing.rs b/beacon_node/operation_pool/src/attester_slashing.rs index 2cb63ad252..f5916384d4 100644 --- a/beacon_node/operation_pool/src/attester_slashing.rs +++ b/beacon_node/operation_pool/src/attester_slashing.rs @@ -39,14 +39,18 @@ impl<'a, T: EthSpec> AttesterSlashingMaxCover<'a, T> { impl<'a, T: EthSpec> MaxCover for AttesterSlashingMaxCover<'a, T> { /// The result type, of which we would eventually like a collection of maximal quality. type Object = AttesterSlashing; + type Intermediate = AttesterSlashing; /// The type used to represent sets. type Set = HashMap; - /// Extract an object for inclusion in a solution. - fn object(&self) -> &AttesterSlashing { + fn intermediate(&self) -> &AttesterSlashing { self.slashing } + fn convert_to_object(slashing: &AttesterSlashing) -> AttesterSlashing { + slashing.clone() + } + /// Get the set of elements covered. fn covering_set(&self) -> &HashMap { &self.effective_balances diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index 771dca12f6..76f9b2e872 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -1,20 +1,25 @@ mod attestation; mod attestation_id; +mod attestation_storage; mod attester_slashing; mod max_cover; mod metrics; mod persistence; +mod reward_cache; mod sync_aggregate_id; pub use attestation::AttMaxCover; +pub use attestation_storage::AttestationRef; pub use max_cover::MaxCover; pub use persistence::{PersistedOperationPool, PersistedOperationPoolAltair}; +pub use reward_cache::RewardCache; +use crate::attestation_storage::{AttestationMap, CheckpointKey}; use crate::sync_aggregate_id::SyncAggregateId; use attestation_id::AttestationId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockWriteGuard}; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ get_slashable_indices_modular, verify_attestation_for_block_inclusion, verify_exit, @@ -36,7 +41,7 @@ type SyncContributions = RwLock { /// Map from attestation ID (see below) to vectors of attestations. - attestations: RwLock>>>, + attestations: RwLock>, /// Map from sync aggregate ID to the best `SyncCommitteeContribution`s seen for that ID. sync_contributions: SyncContributions, /// Set of attester slashings, and the fork version they were verified against. @@ -45,6 +50,8 @@ pub struct OperationPool { proposer_slashings: RwLock>, /// Map from exiting validator to their exit data. voluntary_exits: RwLock>, + /// Reward cache for accelerating attestation packing. + reward_cache: RwLock, _phantom: PhantomData, } @@ -53,6 +60,12 @@ pub enum OpPoolError { GetAttestationsTotalBalanceError(BeaconStateError), GetBlockRootError(BeaconStateError), SyncAggregateError(SyncAggregateError), + RewardCacheUpdatePrevEpoch(BeaconStateError), + RewardCacheUpdateCurrEpoch(BeaconStateError), + RewardCacheGetBlockRoot(BeaconStateError), + RewardCacheWrongEpoch, + RewardCacheValidatorUnknown(BeaconStateError), + RewardCacheOutOfBounds, IncorrectOpPoolVariant, } @@ -176,43 +189,19 @@ impl OperationPool { pub fn insert_attestation( &self, attestation: Attestation, - fork: &Fork, - genesis_validators_root: Hash256, - spec: &ChainSpec, + attesting_indices: Vec, ) -> Result<(), AttestationValidationError> { - let id = AttestationId::from_data(&attestation.data, fork, genesis_validators_root, spec); - - // Take a write lock on the attestations map. - let mut attestations = self.attestations.write(); - - let existing_attestations = match attestations.entry(id) { - Entry::Vacant(entry) => { - entry.insert(vec![attestation]); - return Ok(()); - } - Entry::Occupied(entry) => entry.into_mut(), - }; - - let mut aggregated = false; - for existing_attestation in existing_attestations.iter_mut() { - if existing_attestation.signers_disjoint_from(&attestation) { - existing_attestation.aggregate(&attestation); - aggregated = true; - } else if *existing_attestation == attestation { - aggregated = true; - } - } - - if !aggregated { - existing_attestations.push(attestation); - } - + self.attestations + .write() + .insert(attestation, attesting_indices); Ok(()) } /// Total number of attestations in the pool, including attestations for the same data. pub fn num_attestations(&self) -> usize { - self.attestations.read().values().map(Vec::len).sum() + // FIXME(sproul): implement + // self.attestations.read().values().map(Vec::len).sum() + 0 } pub fn attestation_stats(&self) -> AttestationStats { @@ -220,11 +209,13 @@ impl OperationPool { let mut num_attestation_data = 0; let mut max_aggregates_per_data = 0; + /* FIXME(sproul): implement for aggregates in self.attestations.read().values() { num_attestations += aggregates.len(); num_attestation_data += 1; max_aggregates_per_data = std::cmp::max(max_aggregates_per_data, aggregates.len()); } + */ AttestationStats { num_attestations, num_attestation_data, @@ -235,36 +226,21 @@ impl OperationPool { /// Return all valid attestations for the given epoch, for use in max cover. fn get_valid_attestations_for_epoch<'a>( &'a self, - epoch: Epoch, - all_attestations: &'a HashMap>>, + checkpoint_key: &'a CheckpointKey, + all_attestations: &'a AttestationMap, state: &'a BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, - validity_filter: impl FnMut(&&Attestation) -> bool + Send, + validity_filter: impl FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &'a ChainSpec, ) -> impl Iterator> + Send { - let domain_bytes = AttestationId::compute_domain_bytes( - epoch, - &state.fork(), - state.genesis_validators_root(), - spec, - ); + // FIXME(sproul): check inclusion slot somewhere all_attestations - .iter() - .filter(move |(key, _)| key.domain_bytes_match(&domain_bytes)) - .flat_map(|(_, attestations)| attestations) - .filter(move |attestation| attestation.data.target.epoch == epoch) - .filter(move |attestation| { - // Ensure attestations are valid for block inclusion - verify_attestation_for_block_inclusion( - state, - attestation, - VerifySignatures::False, - spec, - ) - .is_ok() - }) + .get_attestations(checkpoint_key) .filter(validity_filter) - .filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec)) + .filter_map(move |att| { + AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) + }) } /// Get a list of attestations for inclusion in a block. @@ -276,18 +252,25 @@ impl OperationPool { pub fn get_attestations( &self, state: &BeaconState, - prev_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, - curr_epoch_validity_filter: impl FnMut(&&Attestation) -> bool + Send, + prev_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, + curr_epoch_validity_filter: impl for<'a> FnMut(&AttestationRef<'a, T>) -> bool + Send, spec: &ChainSpec, ) -> Result>, OpPoolError> { // Attestations for the current fork, which may be from the current or previous epoch. - let prev_epoch = state.previous_epoch(); - let current_epoch = state.current_epoch(); + let prev_epoch_key = CheckpointKey::from_state(state, state.previous_epoch()); + let curr_epoch_key = CheckpointKey::from_state(state, state.current_epoch()); let all_attestations = self.attestations.read(); let total_active_balance = state .get_total_active_balance() .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; + // Update the reward cache. + let reward_timer = metrics::start_timer(&metrics::BUILD_REWARD_CACHE_TIME); + let mut reward_cache = self.reward_cache.write(); + reward_cache.update(state)?; + let reward_cache = RwLockWriteGuard::downgrade(reward_cache); + drop(reward_timer); + // Split attestations for the previous & current epochs, so that we // can optimise them individually in parallel. let mut num_prev_valid = 0_i64; @@ -295,9 +278,10 @@ impl OperationPool { let prev_epoch_att = self .get_valid_attestations_for_epoch( - prev_epoch, + &prev_epoch_key, &*all_attestations, state, + &*reward_cache, total_active_balance, prev_epoch_validity_filter, spec, @@ -305,9 +289,10 @@ impl OperationPool { .inspect(|_| num_prev_valid += 1); let curr_epoch_att = self .get_valid_attestations_for_epoch( - current_epoch, + &curr_epoch_key, &*all_attestations, state, + &*reward_cache, total_active_balance, curr_epoch_validity_filter, spec, @@ -328,7 +313,7 @@ impl OperationPool { move || { let _timer = metrics::start_timer(&metrics::ATTESTATION_PREV_EPOCH_PACKING_TIME); // If we're in the genesis epoch, just use the current epoch attestations. - if prev_epoch == current_epoch { + if prev_epoch_key == curr_epoch_key { vec![] } else { maximum_cover(prev_epoch_att, prev_epoch_limit, "prev_epoch_attestations") @@ -356,6 +341,8 @@ impl OperationPool { /// Remove attestations which are too old to be included in a block. pub fn prune_attestations(&self, current_epoch: Epoch) { + // FIXME(sproul): implement pruning + /* // Prune attestations that are from before the previous epoch. self.attestations.write().retain(|_, attestations| { // All the attestations in this bucket have the same data, so we only need to @@ -364,6 +351,7 @@ impl OperationPool { .first() .map_or(false, |att| current_epoch <= att.data.target.epoch + 1) }); + */ } /// Insert a proposer slashing into the pool. @@ -438,7 +426,7 @@ impl OperationPool { .into_iter() .map(|cover| { to_be_slashed.extend(cover.covering_set().keys()); - cover.object().clone() + cover.intermediate().clone() }) .collect(); @@ -556,11 +544,15 @@ impl OperationPool { /// /// This method may return objects that are invalid for block inclusion. pub fn get_all_attestations(&self) -> Vec> { + // FIXME(sproul): fix this + vec![] + /* self.attestations .read() .values() .flat_map(|attns| attns.iter().cloned()) .collect() + */ } /// Returns all known `Attestation` objects that pass the provided filter. @@ -570,6 +562,7 @@ impl OperationPool { where F: Fn(&Attestation) -> bool, { + /* FIXME(sproul): fix self.attestations .read() .values() @@ -577,6 +570,8 @@ impl OperationPool { .filter(|attn| filter(*attn)) .cloned() .collect() + */ + vec![] } /// Returns all known `AttesterSlashing` objects. @@ -654,8 +649,9 @@ impl PartialEq for OperationPool { if ptr::eq(self, other) { return true; } - *self.attestations.read() == *other.attestations.read() - && *self.attester_slashings.read() == *other.attester_slashings.read() + // FIXME(sproul): uhhh + // *self.attestations.read() == *other.attestations.read() + true && *self.attester_slashings.read() == *other.attester_slashings.read() && *self.proposer_slashings.read() == *other.proposer_slashings.read() && *self.voluntary_exits.read() == *other.voluntary_exits.read() } diff --git a/beacon_node/operation_pool/src/max_cover.rs b/beacon_node/operation_pool/src/max_cover.rs index 8e50b8152e..2e629f786b 100644 --- a/beacon_node/operation_pool/src/max_cover.rs +++ b/beacon_node/operation_pool/src/max_cover.rs @@ -11,16 +11,21 @@ use itertools::Itertools; pub trait MaxCover: Clone { /// The result type, of which we would eventually like a collection of maximal quality. type Object: Clone; + /// The intermediate object type, which can be converted to `Object`. + type Intermediate: Clone; /// The type used to represent sets. type Set: Clone; - /// Extract an object for inclusion in a solution. - fn object(&self) -> &Self::Object; + /// Extract the intermediate object. + fn intermediate(&self) -> &Self::Intermediate; + + /// Convert the borrowed intermediate object to an owned object for the solution. + fn convert_to_object(intermediate: &Self::Intermediate) -> Self::Object; /// Get the set of elements covered. fn covering_set(&self) -> &Self::Set; /// Update the set of items covered, for the inclusion of some object in the solution. - fn update_covering_set(&mut self, max_obj: &Self::Object, max_set: &Self::Set); + fn update_covering_set(&mut self, max_obj: &Self::Intermediate, max_set: &Self::Set); /// The quality of this item's covering set, usually its cardinality. fn score(&self) -> usize; } @@ -86,7 +91,7 @@ where .filter(|x| x.available && x.item.score() != 0) .for_each(|x| { x.item - .update_covering_set(best.object(), best.covering_set()) + .update_covering_set(best.intermediate(), best.covering_set()) }); result.push(best); @@ -106,7 +111,7 @@ where .into_iter() .merge_by(cover2, |item1, item2| item1.score() >= item2.score()) .take(limit) - .map(|item| item.object().clone()) + .map(|item| T::convert_to_object(item.intermediate())) .collect() } @@ -121,12 +126,17 @@ mod test { T: Clone + Eq + Hash, { type Object = Self; + type Intermediate = Self; type Set = Self; - fn object(&self) -> &Self { + fn intermediate(&self) -> &Self { self } + fn convert_to_object(set: &Self) -> Self { + set.clone() + } + fn covering_set(&self) -> &Self { self } diff --git a/beacon_node/operation_pool/src/metrics.rs b/beacon_node/operation_pool/src/metrics.rs index 3fa5208a3d..6fd8567cef 100644 --- a/beacon_node/operation_pool/src/metrics.rs +++ b/beacon_node/operation_pool/src/metrics.rs @@ -3,6 +3,10 @@ use lazy_static::lazy_static; pub use lighthouse_metrics::*; lazy_static! { + pub static ref BUILD_REWARD_CACHE_TIME: Result = try_create_histogram( + "op_pool_build_reward_cache_time", + "Time to build the reward cache before packing attestations" + ); pub static ref ATTESTATION_PREV_EPOCH_PACKING_TIME: Result = try_create_histogram( "op_pool_attestation_prev_epoch_packing_time", "Time to pack previous epoch attestations" diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 0769786097..30b45a9c69 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -1,5 +1,6 @@ use crate::attestation_id::AttestationId; use crate::sync_aggregate_id::SyncAggregateId; +use crate::attestation_storage::AttestationMap; use crate::OpPoolError; use crate::OperationPool; use derivative::Derivative; @@ -48,12 +49,15 @@ pub struct PersistedOperationPool { impl PersistedOperationPool { /// Convert an `OperationPool` into serializable form. pub fn from_operation_pool(operation_pool: &OperationPool) -> Self { + /* FIXME(sproul): fix persistence let attestations = operation_pool .attestations .read() .iter() .map(|(att_id, att)| (att_id.clone(), att.clone())) .collect(); + */ + let attestations = vec![]; let sync_contributions = operation_pool .sync_contributions @@ -95,7 +99,9 @@ impl PersistedOperationPool { /// Reconstruct an `OperationPool`. Sets `sync_contributions` to its `Default` if `self` matches /// `PersistedOperationPool::Base`. pub fn into_operation_pool(self) -> Result, OpPoolError> { - let attestations = RwLock::new(self.attestations().iter().cloned().collect()); + // FIXME(sproul): fix load + // let attestations = RwLock::new(self.attestations().iter().cloned().collect()); + let attestations = RwLock::new(AttestationMap::default()); let attester_slashings = RwLock::new(self.attester_slashings().iter().cloned().collect()); let proposer_slashings = RwLock::new( self.proposer_slashings() @@ -122,6 +128,7 @@ impl PersistedOperationPool { attester_slashings, proposer_slashings, voluntary_exits, + reward_cache: Default::default(), _phantom: Default::default(), } } diff --git a/beacon_node/operation_pool/src/reward_cache.rs b/beacon_node/operation_pool/src/reward_cache.rs new file mode 100644 index 0000000000..0fb113997a --- /dev/null +++ b/beacon_node/operation_pool/src/reward_cache.rs @@ -0,0 +1,122 @@ +use crate::OpPoolError; +use std::collections::HashMap; +use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags}; +use bitvec::vec::BitVec; + +#[derive(Debug, Clone)] +struct Initialization { + current_epoch: Epoch, + prev_epoch_last_block_root: Hash256, + latest_block_root: Hash256, +} + +/// Cache to store validator effective balances and base rewards for block proposal. +#[derive(Debug, Clone, Default)] +pub struct RewardCache { + initialization: Option, + /// `BitVec` of validator indices which don't have default participation flags for the prev. epoch. + /// + /// We choose to only track whether validators have *any* participation flag set because + /// it's impossible to include a new attestation which is better than the existing participation + /// UNLESS the validator makes a slashable attestation, and we assume that this is rare enough + /// that it's acceptable to be slightly sub-optimal in this case. + previous_epoch_participation: BitVec, + /// `BitVec` of validator indices which don't have default participation flags for the current epoch. + current_epoch_participation: BitVec, +} + +impl RewardCache { + pub fn has_attested_in_epoch( + &self, + validator_index: u64, + epoch: Epoch, + ) -> Result { + if let Some(init) = &self.initialization { + if init.current_epoch == epoch { + Ok(*self + .current_epoch_participation + .get(validator_index as usize) + .ok_or(OpPoolError::RewardCacheOutOfBounds)?) + } else if init.current_epoch == epoch + 1 { + Ok(*self + .previous_epoch_participation + .get(validator_index as usize) + .ok_or(OpPoolError::RewardCacheOutOfBounds)?) + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } + + /// Update the cache. + pub fn update(&mut self, state: &BeaconState) -> Result<(), OpPoolError> { + let current_epoch = state.current_epoch(); + let prev_epoch_last_block_root = *state + .get_block_root(state.previous_epoch().start_slot(E::slots_per_epoch())) + .map_err(OpPoolError::RewardCacheGetBlockRoot)?; + let latest_block_root = *state + .get_block_root(state.slot() - 1) + .map_err(OpPoolError::RewardCacheGetBlockRoot)?; + + // If the `state` is from a new epoch or a different fork with a different last epoch block, + // then update the effective balance cache (the effective balances are liable to have + // changed at the epoch boundary). + // + // Similarly, update the previous epoch participation cache as previous epoch participation + // is now fixed. + if self.initialization.as_ref().map_or(true, |init| { + init.current_epoch != current_epoch + || init.prev_epoch_last_block_root != prev_epoch_last_block_root + }) { + self.update_previous_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdatePrevEpoch)?; + } + + // The current epoch participation flags change every block, and will almost always need + // updating when this function is called at a new slot. + if self + .initialization + .as_ref() + .map_or(true, |init| init.latest_block_root != latest_block_root) + { + self.update_current_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdateCurrEpoch)?; + } + + self.initialization = Some(Initialization { + current_epoch, + prev_epoch_last_block_root, + latest_block_root, + }); + + Ok(()) + } + + fn update_previous_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.previous_epoch_participation = state + .previous_epoch_participation()? + .iter() + .map(|participation| *participation != default_participation) + .collect(); + Ok(()) + } + + fn update_current_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.current_epoch_participation = state + .current_epoch_participation()? + .iter() + .map(|participation| *participation != default_participation) + .collect(); + Ok(()) + } +}