diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 60f08abe14..980540583e 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1462,6 +1462,15 @@ fn load_parent( BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root)) })?; + if block.slot() != state.slot() { + slog::warn!( + chain.log, + "Parent state is not advanced"; + "block_slot" => block.slot(), + "state_slot" => state.slot(), + ); + } + let beacon_state_root = if parent_state_root == advanced_state_root { Some(parent_state_root) } else { diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 096b07758b..cdbb7a88f4 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -49,8 +49,8 @@ enum Error { block_root: Hash256, }, BadStateSlot { - state_slot: Slot, - current_slot: Slot, + _state_slot: Slot, + _current_slot: Slot, }, } @@ -215,18 +215,18 @@ fn advance_head( .get_advanced_state(head_block_root, current_slot, head_info.state_root)? .ok_or(Error::HeadMissingFromSnapshotCache(head_block_root))?; - if state.slot() == current_slot { + if state.slot() == current_slot + 1 { return Err(Error::StateAlreadyAdvanced { block_root: head_block_root, }); - } else if state.slot() + 1 != current_slot { + } else if state.slot() != current_slot { // Protect against advancing a state more than a single slot. // // Advancing more than one slot without storing the intermediate state would corrupt the // database. Future works might store temporary, intermediate states inside this function. return Err(Error::BadStateSlot { - state_slot: state.slot(), - current_slot: current_slot, + _state_slot: state.slot(), + _current_slot: current_slot, }); } diff --git a/beacon_node/operation_pool/src/attestation.rs b/beacon_node/operation_pool/src/attestation.rs index 11537e6ec3..a9880b2c15 100644 --- a/beacon_node/operation_pool/src/attestation.rs +++ b/beacon_node/operation_pool/src/attestation.rs @@ -97,8 +97,9 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> { let mut proposer_reward_numerator = 0; let participation = participation_list.get(index)?; + let effective_balance = state.get_effective_balance(index).ok()?; let base_reward = - altair::get_base_reward(state, index, total_active_balance, spec).ok()?; + altair::get_base_reward(effective_balance, total_active_balance, spec).ok()?; for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() { if att_participation_flags.contains(&flag_index) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 22229d807d..6e18213898 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -717,7 +717,10 @@ impl, Cold: ItemStore> HotColdDB if *state_root == self.get_split_info().state_root { let mut state = get_full_state(&self.hot_db, state_root, &self.spec)? .ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?; - state.apply_pending_mutations()?; + + // Do a tree hash here so that the cache is fully built. + state.update_tree_hash_cache()?; + let latest_block_root = state.get_latest_block_root(*state_root); return Ok(Some((state, latest_block_root))); } @@ -750,7 +753,7 @@ impl, Cold: ItemStore> HotColdDB let state_root_iter = state_roots.into_iter().map(Ok); let mut state = self.replay_blocks(prev_state, blocks, slot, state_root_iter)?; - state.apply_pending_mutations()?; + state.update_tree_hash_cache()?; Ok(Some((state, latest_block_root))) } else { diff --git a/consensus/state_processing/src/common/altair.rs b/consensus/state_processing/src/common/altair.rs index 6cf80bdd9e..d0f1fca552 100644 --- a/consensus/state_processing/src/common/altair.rs +++ b/consensus/state_processing/src/common/altair.rs @@ -5,15 +5,13 @@ use types::*; /// Returns the base reward for some validator. /// /// Spec v1.1.0 -pub fn get_base_reward( - state: &BeaconState, - index: usize, +pub fn get_base_reward( + validator_effective_balance: u64, // Should be == get_total_active_balance(state, spec) total_active_balance: u64, spec: &ChainSpec, ) -> Result { - state - .get_effective_balance(index)? + validator_effective_balance .safe_div(spec.effective_balance_increment)? .safe_mul(get_base_reward_per_increment(total_active_balance, spec)?) .map_err(Into::into) diff --git a/consensus/state_processing/src/common/mod.rs b/consensus/state_processing/src/common/mod.rs index 334a293ed5..3d459f8e9b 100644 --- a/consensus/state_processing/src/common/mod.rs +++ b/consensus/state_processing/src/common/mod.rs @@ -24,8 +24,7 @@ pub fn increase_balance( index: usize, delta: u64, ) -> Result<(), BeaconStateError> { - state.get_balance_mut(index)?.safe_add_assign(delta)?; - Ok(()) + increase_balance_directly(state.get_balance_mut(index)?, delta) } /// Decrease the balance of a validator, saturating upon overflow, as per the spec. @@ -34,7 +33,17 @@ pub fn decrease_balance( index: usize, delta: u64, ) -> Result<(), BeaconStateError> { - let balance = state.get_balance_mut(index)?; + decrease_balance_directly(state.get_balance_mut(index)?, delta) +} + +/// Increase the balance of a validator, erroring upon overflow, as per the spec. +pub fn increase_balance_directly(balance: &mut u64, delta: u64) -> Result<(), BeaconStateError> { + balance.safe_add_assign(delta)?; + Ok(()) +} + +/// Decrease the balance of a validator, saturating upon overflow, as per the spec. +pub fn decrease_balance_directly(balance: &mut u64, delta: u64) -> Result<(), BeaconStateError> { *balance = balance.saturating_sub(delta); Ok(()) } diff --git a/consensus/state_processing/src/per_block_processing/process_operations.rs b/consensus/state_processing/src/per_block_processing/process_operations.rs index 0cdf54a6c8..fefdb84a06 100644 --- a/consensus/state_processing/src/per_block_processing/process_operations.rs +++ b/consensus/state_processing/src/per_block_processing/process_operations.rs @@ -141,9 +141,11 @@ pub mod altair { if participation_flag_indices.contains(&flag_index) && !validator_participation.has_flag(flag_index)? { + // FIXME(sproul): add effective balance cache here? validator_participation.add_flag(flag_index)?; + let effective_balance = state.get_validator(index)?.effective_balance; proposer_reward_numerator.safe_add_assign( - get_base_reward(state, index, total_active_balance, spec)? + get_base_reward(effective_balance, total_active_balance, spec)? .safe_mul(weight)?, )?; } diff --git a/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs b/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs index 038fe77044..d637e84676 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/inactivity_updates.rs @@ -1,9 +1,8 @@ use super::ParticipationCache; use crate::EpochProcessingError; -use core::result::Result; -use core::result::Result::Ok; use safe_arith::SafeArith; use std::cmp::min; +use std::cmp::Ordering; use types::beacon_state::BeaconState; use types::chain_spec::ChainSpec; use types::consts::altair::TIMELY_TARGET_FLAG_INDEX; @@ -18,23 +17,54 @@ pub fn process_inactivity_updates( if state.current_epoch() == T::genesis_epoch() { return Ok(()); } + let is_in_inactivity_leak = state.is_in_inactivity_leak(spec); let unslashed_indices = participation_cache .get_unslashed_participating_indices(TIMELY_TARGET_FLAG_INDEX, state.previous_epoch())?; - for &index in participation_cache.eligible_validator_indices() { + let mut eligible_validators_iter = participation_cache + .eligible_validator_indices() + .iter() + .peekable(); + + // FIXME(sproul): this is a really ugly hack + let mut is_eligible = |index: usize| -> bool { + while let Some(&eligible_index) = eligible_validators_iter.peek() { + match eligible_index.cmp(&index) { + // Should visit every + Ordering::Less => { + unreachable!("should have already visited {}", eligible_index) + } + Ordering::Equal => { + eligible_validators_iter.next(); + return true; + } + Ordering::Greater => { + return false; + } + } + } + false + }; + + let mut inactivity_scores = state.inactivity_scores_mut()?.iter_cow(); + + while let Some((index, inactivity_score)) = inactivity_scores.next_cow() { + if !is_eligible(index) { + continue; + } + + let inactivity_score = inactivity_score.to_mut(); + // Increase inactivity score of inactive validators if unslashed_indices.contains(index)? { - let inactivity_score = state.get_inactivity_score_mut(index)?; inactivity_score.safe_sub_assign(min(1, *inactivity_score))?; } else { - state - .get_inactivity_score_mut(index)? - .safe_add_assign(spec.inactivity_score_bias)?; + inactivity_score.safe_add_assign(spec.inactivity_score_bias)?; } + // Decrease the score of all validators for forgiveness when not during a leak - if !state.is_in_inactivity_leak(spec) { - let inactivity_score = state.get_inactivity_score_mut(index)?; + if !is_in_inactivity_leak { inactivity_score .safe_sub_assign(min(spec.inactivity_score_recovery_rate, *inactivity_score))?; } diff --git a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs index 47105cff15..17d7673703 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs @@ -25,6 +25,7 @@ use types::{ #[derive(Debug, PartialEq)] pub enum Error { InvalidFlagIndex(usize), + MissingEffectiveBalance(usize), } /// A balance which will never be below the specified `minimum`. @@ -122,6 +123,7 @@ impl SingleEpochParticipationCache { &mut self, val_index: usize, validator: &Validator, + epoch_participation: &ParticipationFlags, state: &BeaconState, relative_epoch: RelativeEpoch, ) -> Result<(), BeaconStateError> { @@ -131,14 +133,6 @@ impl SingleEpochParticipationCache { return Err(BeaconStateError::ValidatorIsInactive { val_index }); } - let epoch_participation = match relative_epoch { - RelativeEpoch::Current => state.current_epoch_participation(), - RelativeEpoch::Previous => state.previous_epoch_participation(), - _ => Err(BeaconStateError::EpochOutOfBounds), - }? - .get(val_index) - .ok_or(BeaconStateError::ParticipationOutOfBounds(val_index))?; - // All active validators increase the total active balance. self.total_active_balance .safe_add_assign(validator.effective_balance)?; @@ -173,6 +167,8 @@ pub struct ParticipationCache { previous_epoch: Epoch, /// Caches information about active validators pertaining to `self.previous_epoch`. previous_epoch_participation: SingleEpochParticipationCache, + /// Caches validator effective balances from the start of `process_epoch`. + effective_balances: HashMap, /// Caches the result of the `get_eligible_validator_indices` function. eligible_indices: Vec, } @@ -203,6 +199,9 @@ impl ParticipationCache { SingleEpochParticipationCache::new(num_current_epoch_active_vals, spec); let mut previous_epoch_participation = SingleEpochParticipationCache::new(num_previous_epoch_active_vals, spec); + + let mut effective_balances = HashMap::with_capacity(num_current_epoch_active_vals); + // Contains the set of validators which are either: // // - Active in the previous epoch. @@ -219,11 +218,18 @@ impl ParticipationCache { // // Care is taken to ensure that the ordering of `eligible_indices` is the same as the // `get_eligible_validator_indices` function in the spec. - for (val_index, val) in state.validators().iter().enumerate() { + let iter = state + .validators() + .iter() + .zip(state.current_epoch_participation()?) + .zip(state.previous_epoch_participation()?) + .enumerate(); + for (val_index, ((val, curr_epoch_flags), prev_epoch_flags)) in iter { if val.is_active_at(current_epoch) { current_epoch_participation.process_active_validator( val_index, val, + curr_epoch_flags, state, RelativeEpoch::Current, )?; @@ -233,6 +239,7 @@ impl ParticipationCache { previous_epoch_participation.process_active_validator( val_index, val, + prev_epoch_flags, state, RelativeEpoch::Previous, )?; @@ -240,8 +247,9 @@ impl ParticipationCache { // Note: a validator might still be "eligible" whilst returning `false` to // `Validator::is_active_at`. - if state.is_eligible_validator(val_index)? { - eligible_indices.push(val_index) + if state.is_eligible_validator(val) { + eligible_indices.push(val_index); + effective_balances.insert(val_index, val.effective_balance); } } @@ -250,6 +258,7 @@ impl ParticipationCache { current_epoch_participation, previous_epoch, previous_epoch_participation, + effective_balances, eligible_indices, }) } @@ -327,6 +336,13 @@ impl ParticipationCache { .contains_key(&val_index) } + pub fn get_effective_balance(&self, val_index: usize) -> Result { + self.effective_balances + .get(&val_index) + .copied() + .ok_or(Error::MissingEffectiveBalance(val_index)) + } + /* * Flags */ diff --git a/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs b/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs index defc1d572c..20bab811bd 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/rewards_and_penalties.rs @@ -4,9 +4,11 @@ use types::consts::altair::{ PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX, WEIGHT_DENOMINATOR, }; -use types::{BeaconState, ChainSpec, EthSpec}; +use types::{BeaconState, BeaconStateError, ChainSpec, EthSpec}; -use crate::common::{altair::get_base_reward, decrease_balance, increase_balance}; +use crate::common::{ + altair::get_base_reward, decrease_balance_directly, increase_balance_directly, +}; use crate::per_epoch_processing::{Delta, Error}; /// Apply attester and proposer rewards. @@ -40,9 +42,20 @@ pub fn process_rewards_and_penalties( // Apply the deltas, erroring on overflow above but not on overflow below (saturating at 0 // instead). - for (i, delta) in deltas.into_iter().enumerate() { - increase_balance(state, i, delta.rewards)?; - decrease_balance(state, i, delta.penalties)?; + let mut balances = state.balances_mut().iter_cow(); + + while let Some((i, balance)) = balances.next_cow() { + let delta = deltas + .get(i) + .ok_or(BeaconStateError::BalancesOutOfBounds(i))?; + + if delta.rewards == 0 && delta.penalties == 0 { + continue; + } + + let balance = balance.to_mut(); + increase_balance_directly(balance, delta.rewards)?; + decrease_balance_directly(balance, delta.penalties)?; } Ok(()) @@ -69,8 +82,11 @@ pub fn get_flag_index_deltas( let active_increments = total_active_balance.safe_div(spec.effective_balance_increment)?; for &index in participation_cache.eligible_validator_indices() { - // FIXME(sproul): compute base reward in participation cache - let base_reward = get_base_reward(state, index, total_active_balance, spec)?; + let base_reward = get_base_reward( + participation_cache.get_effective_balance(index as usize)?, + total_active_balance, + spec, + )?; let mut delta = Delta::default(); if unslashed_participating_indices.contains(index as usize)? { @@ -114,9 +130,8 @@ pub fn get_inactivity_penalty_deltas( let mut delta = Delta::default(); if !matching_target_indices.contains(index)? { - let penalty_numerator = state - .get_validator(index)? - .effective_balance + let penalty_numerator = participation_cache + .get_effective_balance(index)? .safe_mul(state.get_inactivity_score(index)?)?; let penalty_denominator = spec .inactivity_score_bias diff --git a/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs b/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs index 2c1ef6178e..c31e03ba6f 100644 --- a/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs +++ b/consensus/state_processing/src/per_epoch_processing/base/rewards_and_penalties.rs @@ -93,7 +93,9 @@ pub fn get_attestation_deltas( // `get_inclusion_delay_deltas`. It's safe to do so here because any validator that is in // the unslashed indices of the matching source attestations is active, and therefore // eligible. - if !state.is_eligible_validator(index)? { + // FIXME(sproul): this is inefficient + let full_validator = state.get_validator(index)?; + if !state.is_eligible_validator(full_validator) { continue; } diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index a8b14343f1..ffed75e98b 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1729,12 +1729,10 @@ impl BeaconState { self.clone_with(CloneConfig::committee_caches_only()) } - pub fn is_eligible_validator(&self, val_index: usize) -> Result { + pub fn is_eligible_validator(&self, val: &Validator) -> bool { let previous_epoch = self.previous_epoch(); - self.get_validator(val_index).map(|val| { - val.is_active_at(previous_epoch) - || (val.slashed && previous_epoch + Epoch::new(1) < val.withdrawable_epoch) - }) + val.is_active_at(previous_epoch) + || (val.slashed && previous_epoch + Epoch::new(1) < val.withdrawable_epoch) } pub fn is_in_inactivity_leak(&self, spec: &ChainSpec) -> bool { diff --git a/lcli/src/transition_blocks.rs b/lcli/src/transition_blocks.rs index 3f539e49f7..fdfce39171 100644 --- a/lcli/src/transition_blocks.rs +++ b/lcli/src/transition_blocks.rs @@ -43,7 +43,9 @@ pub fn run_transition_blocks( let block: SignedBeaconBlock = load_from_ssz_with(&block_path, spec, SignedBeaconBlock::from_ssz_bytes)?; + let t = std::time::Instant::now(); let post_state = do_transition(pre_state.clone(), block, spec)?; + println!("Total transition time: {}ms", t.elapsed().as_millis()); let mut output_file = File::create(output_path).map_err(|e| format!("Unable to create output file: {:?}", e))?; @@ -67,25 +69,47 @@ fn do_transition( .build_all_caches(spec) .map_err(|e| format!("Unable to build caches: {:?}", e))?; + let t = std::time::Instant::now(); + pre_state + .update_tree_hash_cache() + .map_err(|e| format!("Unable to build tree hash cache: {:?}", e))?; + println!("Initial tree hash: {}ms", t.elapsed().as_millis()); + // Transition the parent state to the block slot. + let t = std::time::Instant::now(); for i in pre_state.slot().as_u64()..block.slot().as_u64() { per_slot_processing(&mut pre_state, None, spec) .map_err(|e| format!("Failed to advance slot on iteration {}: {:?}", i, e))?; } + println!("Slot processing: {}ms", t.elapsed().as_millis()); + + let t = std::time::Instant::now(); + pre_state + .update_tree_hash_cache() + .map_err(|e| format!("Unable to build tree hash cache: {:?}", e))?; + println!("Pre-block tree hash: {}ms", t.elapsed().as_millis()); pre_state .build_all_caches(spec) .map_err(|e| format!("Unable to build caches: {:?}", e))?; + let t = std::time::Instant::now(); per_block_processing( &mut pre_state, &block, None, - BlockSignatureStrategy::VerifyIndividual, + BlockSignatureStrategy::VerifyBulk, VerifyBlockRoot::True, spec, ) .map_err(|e| format!("State transition failed: {:?}", e))?; + println!("Process block: {}ms", t.elapsed().as_millis()); + + let t = std::time::Instant::now(); + pre_state + .update_tree_hash_cache() + .map_err(|e| format!("Unable to build tree hash cache: {:?}", e))?; + println!("Post-block tree hash: {}ms", t.elapsed().as_millis()); Ok(pre_state) } @@ -100,5 +124,13 @@ pub fn load_from_ssz_with( let mut bytes = vec![]; file.read_to_end(&mut bytes) .map_err(|e| format!("Unable to read from file {:?}: {:?}", path, e))?; - decoder(&bytes, spec).map_err(|e| format!("Ssz decode failed: {:?}", e)) + + let t = std::time::Instant::now(); + let result = decoder(&bytes, spec).map_err(|e| format!("Ssz decode failed: {:?}", e)); + println!( + "SSZ decoding {}: {}ms", + path.display(), + t.elapsed().as_millis() + ); + result }