diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index fc40a64c74..6242bcad49 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2873,21 +2873,19 @@ impl BeaconChain { metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS); let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES); - // Producing a block requires the tree hash cache, so clone a full state corresponding to - // the head from the snapshot cache. Unfortunately we can't move the snapshot out of the - // cache (which would be fast), because we need to re-process the block after it has been - // signed. If we miss the cache or we're producing a block that conflicts with the head, - // fall back to getting the head from `slot - 1`. let state_load_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_STATE_LOAD_TIMES); let head_info = self .head_info() .map_err(BlockProductionError::UnableToGetHeadInfo)?; - let (state, state_root_opt) = if head_info.slot < slot { - let state = self - .state_at_slot(slot - 1, StateSkipConfig::WithStateRoots) - .map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?; - - (state, None) + let (state, state_root_opt) = if head_info.slot <= slot { + // Fetch the head state advanced through to `slot`, which should be present in the state + // cache thanks to the state advance timer. + let (state_root, state) = self + .store + .get_advanced_state(head_info.block_root, slot, head_info.state_root) + .map_err(BlockProductionError::FailedToLoadState)? + .ok_or(BlockProductionError::UnableToProduceAtSlot(slot))?; + (state, Some(state_root)) } else { warn!( self.log, diff --git a/beacon_node/beacon_chain/src/block_reward.rs b/beacon_node/beacon_chain/src/block_reward.rs index 83b204113f..bc80847609 100644 --- a/beacon_node/beacon_chain/src/block_reward.rs +++ b/beacon_node/beacon_chain/src/block_reward.rs @@ -1,6 +1,6 @@ use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; use eth2::lighthouse::{AttestationRewards, BlockReward, BlockRewardMeta}; -use operation_pool::{AttMaxCover, MaxCover}; +use operation_pool::{AttMaxCover, MaxCover, RewardCache}; use state_processing::per_block_processing::altair::sync_committee::compute_sync_aggregate_rewards; use types::{BeaconBlockRef, BeaconState, EthSpec, Hash256, RelativeEpoch}; @@ -16,13 +16,15 @@ impl BeaconChain { } let active_indices = state.get_cached_active_validator_indices(RelativeEpoch::Current)?; + let mut reward_cache = RewardCache::default(); + reward_cache.update(state)?; let total_active_balance = state.get_total_balance(active_indices, &self.spec)?; let mut per_attestation_rewards = block .body() .attestations() .iter() .map(|att| { - AttMaxCover::new(att, state, total_active_balance, &self.spec) + AttMaxCover::new(att, state, &reward_cache, total_active_balance, &self.spec) .ok_or(BeaconChainError::BlockRewardAttestationError) }) .collect::, _>>()?; diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 79f7346ca2..42777cad0b 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -209,6 +209,7 @@ pub enum BlockProductionError { TerminalPoWBlockLookupFailed(execution_layer::Error), GetPayloadFailed(execution_layer::Error), FailedToReadFinalizedBlock(store::Error), + FailedToLoadState(store::Error), MissingFinalizedBlock(Hash256), BlockTooLarge(usize), } diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index a9880b2c15..9d9be8ef35 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -1,4 +1,5 @@ use crate::max_cover::MaxCover; +use crate::RewardCache; use state_processing::common::{ altair, base, get_attestation_participation_flag_indices, get_attesting_indices, }; @@ -21,13 +22,14 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { pub fn new( att: &'a Attestation, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { if let BeaconState::Base(ref base_state) = state { Self::new_for_base(att, state, base_state, total_active_balance, spec) } else { - Self::new_for_altair(att, state, total_active_balance, spec) + Self::new_for_altair(att, state, reward_cache, total_active_balance, spec) } } @@ -69,23 +71,18 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { pub fn new_for_altair( att: &'a Attestation, state: &BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, spec: &ChainSpec, ) -> Option { + // FIXME(sproul): could optimise out `get_attesting_indices` and allocations by storing + // these. let committee = state .get_beacon_committee(att.data.slot, att.data.index) .ok()?; let attesting_indices = get_attesting_indices::(committee.committee, &att.aggregation_bits).ok()?; - let participation_list = if att.data.target.epoch == state.current_epoch() { - state.current_epoch_participation().ok()? - } else if att.data.target.epoch == state.previous_epoch() { - state.previous_epoch_participation().ok()? - } else { - return None; - }; - let inclusion_delay = state.slot().as_u64().checked_sub(att.data.slot.as_u64())?; let att_participation_flags = get_attestation_participation_flag_indices(state, &att.data, inclusion_delay, spec) @@ -95,9 +92,11 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { .iter() .filter_map(|&index| { let mut proposer_reward_numerator = 0; - let participation = participation_list.get(index)?; + let participation = reward_cache + .get_epoch_participation(index, att.data.target.epoch) + .ok()??; - let effective_balance = state.get_effective_balance(index).ok()?; + let effective_balance = reward_cache.get_effective_balance(index)?; let base_reward = altair::get_base_reward(effective_balance, total_active_balance, spec).ok()?; diff --git a/beacon_node/operation_pool/src/lib.rs b/beacon_node/operation_pool/src/lib.rs index eef09631eb..4135f1ae81 100644 --- a/beacon_node/operation_pool/src/lib.rs +++ b/beacon_node/operation_pool/src/lib.rs @@ -4,6 +4,7 @@ mod attester_slashing; mod max_cover; mod metrics; mod persistence; +mod reward_cache; mod sync_aggregate_id; pub use attestation::AttMaxCover; @@ -11,12 +12,13 @@ pub use max_cover::MaxCover; pub use persistence::{ PersistedOperationPool, PersistedOperationPoolAltair, PersistedOperationPoolBase, }; +pub use reward_cache::RewardCache; use crate::sync_aggregate_id::SyncAggregateId; use attestation_id::AttestationId; use attester_slashing::AttesterSlashingMaxCover; use max_cover::maximum_cover; -use parking_lot::RwLock; +use parking_lot::{RwLock, RwLockWriteGuard}; use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::per_block_processing::{ get_slashable_indices_modular, verify_attestation_for_block_inclusion, verify_exit, @@ -47,6 +49,8 @@ pub struct OperationPool { proposer_slashings: RwLock>, /// Map from exiting validator to their exit data. voluntary_exits: RwLock>, + /// Reward cache for accelerating attestation packing. + reward_cache: RwLock, _phantom: PhantomData, } @@ -55,6 +59,11 @@ pub enum OpPoolError { GetAttestationsTotalBalanceError(BeaconStateError), GetBlockRootError(BeaconStateError), SyncAggregateError(SyncAggregateError), + RewardCacheUpdatePrevEpoch(BeaconStateError), + RewardCacheUpdateCurrEpoch(BeaconStateError), + RewardCacheGetBlockRoot(BeaconStateError), + RewardCacheWrongEpoch, + RewardCacheValidatorUnknown(BeaconStateError), IncorrectOpPoolVariant, } @@ -240,6 +249,7 @@ impl OperationPool { epoch: Epoch, all_attestations: &'a HashMap>>, state: &'a BeaconState, + reward_cache: &'a RewardCache, total_active_balance: u64, validity_filter: impl FnMut(&&Attestation) -> bool + Send, spec: &'a ChainSpec, @@ -266,7 +276,9 @@ impl OperationPool { .is_ok() }) .filter(validity_filter) - .filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec)) + .filter_map(move |att| { + AttMaxCover::new(att, state, reward_cache, total_active_balance, spec) + }) } /// Get a list of attestations for inclusion in a block. @@ -290,6 +302,11 @@ impl OperationPool { .get_total_active_balance() .map_err(OpPoolError::GetAttestationsTotalBalanceError)?; + // Update the reward cache. + let mut reward_cache = self.reward_cache.write(); + reward_cache.update(state)?; + let reward_cache = RwLockWriteGuard::downgrade(reward_cache); + // Split attestations for the previous & current epochs, so that we // can optimise them individually in parallel. let mut num_prev_valid = 0_i64; @@ -300,6 +317,7 @@ impl OperationPool { prev_epoch, &*all_attestations, state, + &reward_cache, total_active_balance, prev_epoch_validity_filter, spec, @@ -310,6 +328,7 @@ impl OperationPool { current_epoch, &*all_attestations, state, + &reward_cache, total_active_balance, curr_epoch_validity_filter, spec, diff --git a/beacon_node/operation_pool/src/persistence.rs b/beacon_node/operation_pool/src/persistence.rs index 18749a5812..d79e38fee3 100644 --- a/beacon_node/operation_pool/src/persistence.rs +++ b/beacon_node/operation_pool/src/persistence.rs @@ -120,6 +120,7 @@ impl PersistedOperationPool { attester_slashings, proposer_slashings, voluntary_exits, + reward_cache: Default::default(), _phantom: Default::default(), }, PersistedOperationPool::Altair(_) => { @@ -132,6 +133,7 @@ impl PersistedOperationPool { attester_slashings, proposer_slashings, voluntary_exits, + reward_cache: Default::default(), _phantom: Default::default(), } } diff --git a/beacon_node/operation_pool/src/reward_cache.rs b/beacon_node/operation_pool/src/reward_cache.rs new file mode 100644 index 0000000000..a2ec77c389 --- /dev/null +++ b/beacon_node/operation_pool/src/reward_cache.rs @@ -0,0 +1,146 @@ +use crate::OpPoolError; +use std::collections::HashMap; +use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags}; + +#[derive(Debug, Clone)] +struct Initialization { + current_epoch: Epoch, + prev_epoch_last_block_root: Hash256, + latest_block_root: Hash256, +} + +/// Cache to store validator effective balances and base rewards for block proposal. +#[derive(Debug, Clone, Default)] +pub struct RewardCache { + initialization: Option, + /// Map from validator index to `effective_balance`. + effective_balances: HashMap, + /// Map from validator index to participation flags for the previous epoch. + /// + /// Validators with non-zero participation for the previous epoch are omitted from this map + /// in order to keep its memory-usage as small as possible. + /// + // FIXME(sproul): choose between handling slashable attestations (keep all non-complete) and + // memory efficiency (keep all zero). + // FIXME(sproul): choose whether to filter inactive validators + previous_epoch_participation: HashMap, + /// Map from validator index to participation flags for the current epoch. + /// + /// Validators with complete participation for the current epoch are omitted from this map + /// in order to keep its memory-usage as small as possible. + current_epoch_participation: HashMap, +} + +impl RewardCache { + pub fn get_effective_balance(&self, validator_index: usize) -> Option { + self.effective_balances.get(&validator_index).copied() + } + + pub fn get_epoch_participation( + &self, + validator_index: usize, + epoch: Epoch, + ) -> Result, OpPoolError> { + if let Some(init) = &self.initialization { + if init.current_epoch == epoch { + Ok(self + .current_epoch_participation + .get(&validator_index) + .copied()) + } else if init.current_epoch == epoch + 1 { + Ok(self + .previous_epoch_participation + .get(&validator_index) + .copied()) + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } else { + Err(OpPoolError::RewardCacheWrongEpoch) + } + } + + /// Update the cache. + pub fn update(&mut self, state: &BeaconState) -> Result<(), OpPoolError> { + let current_epoch = state.current_epoch(); + let prev_epoch_last_block_root = *state + .get_block_root(state.previous_epoch().start_slot(E::slots_per_epoch())) + .map_err(OpPoolError::RewardCacheGetBlockRoot)?; + let latest_block_root = *state + .get_block_root(state.slot() - 1) + .map_err(OpPoolError::RewardCacheGetBlockRoot)?; + + // If the `state` is from a new epoch or a different fork with a different last epoch block, + // then update the effective balance cache (the effective balances are liable to have + // changed at the epoch boundary). + // + // Similarly, update the previous epoch participation cache as previous epoch participation + // is now fixed. + if self.initialization.as_ref().map_or(true, |init| { + init.current_epoch != current_epoch + || init.prev_epoch_last_block_root != prev_epoch_last_block_root + }) { + self.update_effective_balances(state); + self.update_previous_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdatePrevEpoch)?; + } + + // The current epoch participation flags change every block, and will almost always need + // updating when this function is called at a new slot. + if self + .initialization + .as_ref() + .map_or(true, |init| init.latest_block_root != latest_block_root) + { + self.update_current_epoch_participation(state) + .map_err(OpPoolError::RewardCacheUpdateCurrEpoch)?; + } + + self.initialization = Some(Initialization { + current_epoch, + prev_epoch_last_block_root, + latest_block_root, + }); + + Ok(()) + } + + fn update_effective_balances(&mut self, state: &BeaconState) { + self.effective_balances = state + .validators() + .iter() + .enumerate() + .map(|(i, val)| (i, val.effective_balance)) + .collect(); + } + + fn update_previous_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.previous_epoch_participation = state + .previous_epoch_participation()? + .iter() + .copied() + .enumerate() + .filter(|(_, participation)| *participation == default_participation) + .collect(); + Ok(()) + } + + fn update_current_epoch_participation( + &mut self, + state: &BeaconState, + ) -> Result<(), BeaconStateError> { + let default_participation = ParticipationFlags::default(); + self.current_epoch_participation = state + .current_epoch_participation()? + .iter() + .copied() + .enumerate() + .filter(|(_, participation)| *participation == default_participation) + .collect(); + Ok(()) + } +}