mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-24 16:28:23 +00:00
Optimisations and bug fixes for state advance
This commit is reasonably performant on Prater!
This commit is contained in:
@@ -1462,6 +1462,15 @@ fn load_parent<T: BeaconChainTypes>(
|
||||
BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root))
|
||||
})?;
|
||||
|
||||
if block.slot() != state.slot() {
|
||||
slog::warn!(
|
||||
chain.log,
|
||||
"Parent state is not advanced";
|
||||
"block_slot" => block.slot(),
|
||||
"state_slot" => state.slot(),
|
||||
);
|
||||
}
|
||||
|
||||
let beacon_state_root = if parent_state_root == advanced_state_root {
|
||||
Some(parent_state_root)
|
||||
} else {
|
||||
|
||||
@@ -49,8 +49,8 @@ enum Error {
|
||||
block_root: Hash256,
|
||||
},
|
||||
BadStateSlot {
|
||||
state_slot: Slot,
|
||||
current_slot: Slot,
|
||||
_state_slot: Slot,
|
||||
_current_slot: Slot,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -215,18 +215,18 @@ fn advance_head<T: BeaconChainTypes>(
|
||||
.get_advanced_state(head_block_root, current_slot, head_info.state_root)?
|
||||
.ok_or(Error::HeadMissingFromSnapshotCache(head_block_root))?;
|
||||
|
||||
if state.slot() == current_slot {
|
||||
if state.slot() == current_slot + 1 {
|
||||
return Err(Error::StateAlreadyAdvanced {
|
||||
block_root: head_block_root,
|
||||
});
|
||||
} else if state.slot() + 1 != current_slot {
|
||||
} else if state.slot() != current_slot {
|
||||
// Protect against advancing a state more than a single slot.
|
||||
//
|
||||
// Advancing more than one slot without storing the intermediate state would corrupt the
|
||||
// database. Future works might store temporary, intermediate states inside this function.
|
||||
return Err(Error::BadStateSlot {
|
||||
state_slot: state.slot(),
|
||||
current_slot: current_slot,
|
||||
_state_slot: state.slot(),
|
||||
_current_slot: current_slot,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -97,8 +97,9 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
let mut proposer_reward_numerator = 0;
|
||||
let participation = participation_list.get(index)?;
|
||||
|
||||
let effective_balance = state.get_effective_balance(index).ok()?;
|
||||
let base_reward =
|
||||
altair::get_base_reward(state, index, total_active_balance, spec).ok()?;
|
||||
altair::get_base_reward(effective_balance, total_active_balance, spec).ok()?;
|
||||
|
||||
for (flag_index, weight) in PARTICIPATION_FLAG_WEIGHTS.iter().enumerate() {
|
||||
if att_participation_flags.contains(&flag_index)
|
||||
|
||||
@@ -717,7 +717,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
if *state_root == self.get_split_info().state_root {
|
||||
let mut state = get_full_state(&self.hot_db, state_root, &self.spec)?
|
||||
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
|
||||
state.apply_pending_mutations()?;
|
||||
|
||||
// Do a tree hash here so that the cache is fully built.
|
||||
state.update_tree_hash_cache()?;
|
||||
|
||||
let latest_block_root = state.get_latest_block_root(*state_root);
|
||||
return Ok(Some((state, latest_block_root)));
|
||||
}
|
||||
@@ -750,7 +753,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
let state_root_iter = state_roots.into_iter().map(Ok);
|
||||
|
||||
let mut state = self.replay_blocks(prev_state, blocks, slot, state_root_iter)?;
|
||||
state.apply_pending_mutations()?;
|
||||
state.update_tree_hash_cache()?;
|
||||
|
||||
Ok(Some((state, latest_block_root)))
|
||||
} else {
|
||||
|
||||
@@ -5,15 +5,13 @@ use types::*;
|
||||
/// Returns the base reward for some validator.
|
||||
///
|
||||
/// Spec v1.1.0
|
||||
pub fn get_base_reward<T: EthSpec>(
|
||||
state: &BeaconState<T>,
|
||||
index: usize,
|
||||
pub fn get_base_reward(
|
||||
validator_effective_balance: u64,
|
||||
// Should be == get_total_active_balance(state, spec)
|
||||
total_active_balance: u64,
|
||||
spec: &ChainSpec,
|
||||
) -> Result<u64, Error> {
|
||||
state
|
||||
.get_effective_balance(index)?
|
||||
validator_effective_balance
|
||||
.safe_div(spec.effective_balance_increment)?
|
||||
.safe_mul(get_base_reward_per_increment(total_active_balance, spec)?)
|
||||
.map_err(Into::into)
|
||||
|
||||
@@ -24,8 +24,7 @@ pub fn increase_balance<E: EthSpec>(
|
||||
index: usize,
|
||||
delta: u64,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
state.get_balance_mut(index)?.safe_add_assign(delta)?;
|
||||
Ok(())
|
||||
increase_balance_directly(state.get_balance_mut(index)?, delta)
|
||||
}
|
||||
|
||||
/// Decrease the balance of a validator, saturating upon overflow, as per the spec.
|
||||
@@ -34,7 +33,17 @@ pub fn decrease_balance<E: EthSpec>(
|
||||
index: usize,
|
||||
delta: u64,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
let balance = state.get_balance_mut(index)?;
|
||||
decrease_balance_directly(state.get_balance_mut(index)?, delta)
|
||||
}
|
||||
|
||||
/// Increase the balance of a validator, erroring upon overflow, as per the spec.
|
||||
pub fn increase_balance_directly(balance: &mut u64, delta: u64) -> Result<(), BeaconStateError> {
|
||||
balance.safe_add_assign(delta)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Decrease the balance of a validator, saturating upon overflow, as per the spec.
|
||||
pub fn decrease_balance_directly(balance: &mut u64, delta: u64) -> Result<(), BeaconStateError> {
|
||||
*balance = balance.saturating_sub(delta);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -141,9 +141,11 @@ pub mod altair {
|
||||
if participation_flag_indices.contains(&flag_index)
|
||||
&& !validator_participation.has_flag(flag_index)?
|
||||
{
|
||||
// FIXME(sproul): add effective balance cache here?
|
||||
validator_participation.add_flag(flag_index)?;
|
||||
let effective_balance = state.get_validator(index)?.effective_balance;
|
||||
proposer_reward_numerator.safe_add_assign(
|
||||
get_base_reward(state, index, total_active_balance, spec)?
|
||||
get_base_reward(effective_balance, total_active_balance, spec)?
|
||||
.safe_mul(weight)?,
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use super::ParticipationCache;
|
||||
use crate::EpochProcessingError;
|
||||
use core::result::Result;
|
||||
use core::result::Result::Ok;
|
||||
use safe_arith::SafeArith;
|
||||
use std::cmp::min;
|
||||
use std::cmp::Ordering;
|
||||
use types::beacon_state::BeaconState;
|
||||
use types::chain_spec::ChainSpec;
|
||||
use types::consts::altair::TIMELY_TARGET_FLAG_INDEX;
|
||||
@@ -18,23 +17,54 @@ pub fn process_inactivity_updates<T: EthSpec>(
|
||||
if state.current_epoch() == T::genesis_epoch() {
|
||||
return Ok(());
|
||||
}
|
||||
let is_in_inactivity_leak = state.is_in_inactivity_leak(spec);
|
||||
|
||||
let unslashed_indices = participation_cache
|
||||
.get_unslashed_participating_indices(TIMELY_TARGET_FLAG_INDEX, state.previous_epoch())?;
|
||||
|
||||
for &index in participation_cache.eligible_validator_indices() {
|
||||
let mut eligible_validators_iter = participation_cache
|
||||
.eligible_validator_indices()
|
||||
.iter()
|
||||
.peekable();
|
||||
|
||||
// FIXME(sproul): this is a really ugly hack
|
||||
let mut is_eligible = |index: usize| -> bool {
|
||||
while let Some(&eligible_index) = eligible_validators_iter.peek() {
|
||||
match eligible_index.cmp(&index) {
|
||||
// Should visit every
|
||||
Ordering::Less => {
|
||||
unreachable!("should have already visited {}", eligible_index)
|
||||
}
|
||||
Ordering::Equal => {
|
||||
eligible_validators_iter.next();
|
||||
return true;
|
||||
}
|
||||
Ordering::Greater => {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
};
|
||||
|
||||
let mut inactivity_scores = state.inactivity_scores_mut()?.iter_cow();
|
||||
|
||||
while let Some((index, inactivity_score)) = inactivity_scores.next_cow() {
|
||||
if !is_eligible(index) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let inactivity_score = inactivity_score.to_mut();
|
||||
|
||||
// Increase inactivity score of inactive validators
|
||||
if unslashed_indices.contains(index)? {
|
||||
let inactivity_score = state.get_inactivity_score_mut(index)?;
|
||||
inactivity_score.safe_sub_assign(min(1, *inactivity_score))?;
|
||||
} else {
|
||||
state
|
||||
.get_inactivity_score_mut(index)?
|
||||
.safe_add_assign(spec.inactivity_score_bias)?;
|
||||
inactivity_score.safe_add_assign(spec.inactivity_score_bias)?;
|
||||
}
|
||||
|
||||
// Decrease the score of all validators for forgiveness when not during a leak
|
||||
if !state.is_in_inactivity_leak(spec) {
|
||||
let inactivity_score = state.get_inactivity_score_mut(index)?;
|
||||
if !is_in_inactivity_leak {
|
||||
inactivity_score
|
||||
.safe_sub_assign(min(spec.inactivity_score_recovery_rate, *inactivity_score))?;
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ use types::{
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
InvalidFlagIndex(usize),
|
||||
MissingEffectiveBalance(usize),
|
||||
}
|
||||
|
||||
/// A balance which will never be below the specified `minimum`.
|
||||
@@ -122,6 +123,7 @@ impl SingleEpochParticipationCache {
|
||||
&mut self,
|
||||
val_index: usize,
|
||||
validator: &Validator,
|
||||
epoch_participation: &ParticipationFlags,
|
||||
state: &BeaconState<T>,
|
||||
relative_epoch: RelativeEpoch,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
@@ -131,14 +133,6 @@ impl SingleEpochParticipationCache {
|
||||
return Err(BeaconStateError::ValidatorIsInactive { val_index });
|
||||
}
|
||||
|
||||
let epoch_participation = match relative_epoch {
|
||||
RelativeEpoch::Current => state.current_epoch_participation(),
|
||||
RelativeEpoch::Previous => state.previous_epoch_participation(),
|
||||
_ => Err(BeaconStateError::EpochOutOfBounds),
|
||||
}?
|
||||
.get(val_index)
|
||||
.ok_or(BeaconStateError::ParticipationOutOfBounds(val_index))?;
|
||||
|
||||
// All active validators increase the total active balance.
|
||||
self.total_active_balance
|
||||
.safe_add_assign(validator.effective_balance)?;
|
||||
@@ -173,6 +167,8 @@ pub struct ParticipationCache {
|
||||
previous_epoch: Epoch,
|
||||
/// Caches information about active validators pertaining to `self.previous_epoch`.
|
||||
previous_epoch_participation: SingleEpochParticipationCache,
|
||||
/// Caches validator effective balances from the start of `process_epoch`.
|
||||
effective_balances: HashMap<usize, u64>,
|
||||
/// Caches the result of the `get_eligible_validator_indices` function.
|
||||
eligible_indices: Vec<usize>,
|
||||
}
|
||||
@@ -203,6 +199,9 @@ impl ParticipationCache {
|
||||
SingleEpochParticipationCache::new(num_current_epoch_active_vals, spec);
|
||||
let mut previous_epoch_participation =
|
||||
SingleEpochParticipationCache::new(num_previous_epoch_active_vals, spec);
|
||||
|
||||
let mut effective_balances = HashMap::with_capacity(num_current_epoch_active_vals);
|
||||
|
||||
// Contains the set of validators which are either:
|
||||
//
|
||||
// - Active in the previous epoch.
|
||||
@@ -219,11 +218,18 @@ impl ParticipationCache {
|
||||
//
|
||||
// Care is taken to ensure that the ordering of `eligible_indices` is the same as the
|
||||
// `get_eligible_validator_indices` function in the spec.
|
||||
for (val_index, val) in state.validators().iter().enumerate() {
|
||||
let iter = state
|
||||
.validators()
|
||||
.iter()
|
||||
.zip(state.current_epoch_participation()?)
|
||||
.zip(state.previous_epoch_participation()?)
|
||||
.enumerate();
|
||||
for (val_index, ((val, curr_epoch_flags), prev_epoch_flags)) in iter {
|
||||
if val.is_active_at(current_epoch) {
|
||||
current_epoch_participation.process_active_validator(
|
||||
val_index,
|
||||
val,
|
||||
curr_epoch_flags,
|
||||
state,
|
||||
RelativeEpoch::Current,
|
||||
)?;
|
||||
@@ -233,6 +239,7 @@ impl ParticipationCache {
|
||||
previous_epoch_participation.process_active_validator(
|
||||
val_index,
|
||||
val,
|
||||
prev_epoch_flags,
|
||||
state,
|
||||
RelativeEpoch::Previous,
|
||||
)?;
|
||||
@@ -240,8 +247,9 @@ impl ParticipationCache {
|
||||
|
||||
// Note: a validator might still be "eligible" whilst returning `false` to
|
||||
// `Validator::is_active_at`.
|
||||
if state.is_eligible_validator(val_index)? {
|
||||
eligible_indices.push(val_index)
|
||||
if state.is_eligible_validator(val) {
|
||||
eligible_indices.push(val_index);
|
||||
effective_balances.insert(val_index, val.effective_balance);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -250,6 +258,7 @@ impl ParticipationCache {
|
||||
current_epoch_participation,
|
||||
previous_epoch,
|
||||
previous_epoch_participation,
|
||||
effective_balances,
|
||||
eligible_indices,
|
||||
})
|
||||
}
|
||||
@@ -327,6 +336,13 @@ impl ParticipationCache {
|
||||
.contains_key(&val_index)
|
||||
}
|
||||
|
||||
pub fn get_effective_balance(&self, val_index: usize) -> Result<u64, Error> {
|
||||
self.effective_balances
|
||||
.get(&val_index)
|
||||
.copied()
|
||||
.ok_or(Error::MissingEffectiveBalance(val_index))
|
||||
}
|
||||
|
||||
/*
|
||||
* Flags
|
||||
*/
|
||||
|
||||
@@ -4,9 +4,11 @@ use types::consts::altair::{
|
||||
PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_TARGET_FLAG_INDEX,
|
||||
WEIGHT_DENOMINATOR,
|
||||
};
|
||||
use types::{BeaconState, ChainSpec, EthSpec};
|
||||
use types::{BeaconState, BeaconStateError, ChainSpec, EthSpec};
|
||||
|
||||
use crate::common::{altair::get_base_reward, decrease_balance, increase_balance};
|
||||
use crate::common::{
|
||||
altair::get_base_reward, decrease_balance_directly, increase_balance_directly,
|
||||
};
|
||||
use crate::per_epoch_processing::{Delta, Error};
|
||||
|
||||
/// Apply attester and proposer rewards.
|
||||
@@ -40,9 +42,20 @@ pub fn process_rewards_and_penalties<T: EthSpec>(
|
||||
|
||||
// Apply the deltas, erroring on overflow above but not on overflow below (saturating at 0
|
||||
// instead).
|
||||
for (i, delta) in deltas.into_iter().enumerate() {
|
||||
increase_balance(state, i, delta.rewards)?;
|
||||
decrease_balance(state, i, delta.penalties)?;
|
||||
let mut balances = state.balances_mut().iter_cow();
|
||||
|
||||
while let Some((i, balance)) = balances.next_cow() {
|
||||
let delta = deltas
|
||||
.get(i)
|
||||
.ok_or(BeaconStateError::BalancesOutOfBounds(i))?;
|
||||
|
||||
if delta.rewards == 0 && delta.penalties == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let balance = balance.to_mut();
|
||||
increase_balance_directly(balance, delta.rewards)?;
|
||||
decrease_balance_directly(balance, delta.penalties)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -69,8 +82,11 @@ pub fn get_flag_index_deltas<T: EthSpec>(
|
||||
let active_increments = total_active_balance.safe_div(spec.effective_balance_increment)?;
|
||||
|
||||
for &index in participation_cache.eligible_validator_indices() {
|
||||
// FIXME(sproul): compute base reward in participation cache
|
||||
let base_reward = get_base_reward(state, index, total_active_balance, spec)?;
|
||||
let base_reward = get_base_reward(
|
||||
participation_cache.get_effective_balance(index as usize)?,
|
||||
total_active_balance,
|
||||
spec,
|
||||
)?;
|
||||
let mut delta = Delta::default();
|
||||
|
||||
if unslashed_participating_indices.contains(index as usize)? {
|
||||
@@ -114,9 +130,8 @@ pub fn get_inactivity_penalty_deltas<T: EthSpec>(
|
||||
let mut delta = Delta::default();
|
||||
|
||||
if !matching_target_indices.contains(index)? {
|
||||
let penalty_numerator = state
|
||||
.get_validator(index)?
|
||||
.effective_balance
|
||||
let penalty_numerator = participation_cache
|
||||
.get_effective_balance(index)?
|
||||
.safe_mul(state.get_inactivity_score(index)?)?;
|
||||
let penalty_denominator = spec
|
||||
.inactivity_score_bias
|
||||
|
||||
@@ -93,7 +93,9 @@ pub fn get_attestation_deltas<T: EthSpec>(
|
||||
// `get_inclusion_delay_deltas`. It's safe to do so here because any validator that is in
|
||||
// the unslashed indices of the matching source attestations is active, and therefore
|
||||
// eligible.
|
||||
if !state.is_eligible_validator(index)? {
|
||||
// FIXME(sproul): this is inefficient
|
||||
let full_validator = state.get_validator(index)?;
|
||||
if !state.is_eligible_validator(full_validator) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -1729,12 +1729,10 @@ impl<T: EthSpec> BeaconState<T> {
|
||||
self.clone_with(CloneConfig::committee_caches_only())
|
||||
}
|
||||
|
||||
pub fn is_eligible_validator(&self, val_index: usize) -> Result<bool, Error> {
|
||||
pub fn is_eligible_validator(&self, val: &Validator) -> bool {
|
||||
let previous_epoch = self.previous_epoch();
|
||||
self.get_validator(val_index).map(|val| {
|
||||
val.is_active_at(previous_epoch)
|
||||
|| (val.slashed && previous_epoch + Epoch::new(1) < val.withdrawable_epoch)
|
||||
})
|
||||
val.is_active_at(previous_epoch)
|
||||
|| (val.slashed && previous_epoch + Epoch::new(1) < val.withdrawable_epoch)
|
||||
}
|
||||
|
||||
pub fn is_in_inactivity_leak(&self, spec: &ChainSpec) -> bool {
|
||||
|
||||
@@ -43,7 +43,9 @@ pub fn run_transition_blocks<T: EthSpec>(
|
||||
let block: SignedBeaconBlock<T> =
|
||||
load_from_ssz_with(&block_path, spec, SignedBeaconBlock::from_ssz_bytes)?;
|
||||
|
||||
let t = std::time::Instant::now();
|
||||
let post_state = do_transition(pre_state.clone(), block, spec)?;
|
||||
println!("Total transition time: {}ms", t.elapsed().as_millis());
|
||||
|
||||
let mut output_file =
|
||||
File::create(output_path).map_err(|e| format!("Unable to create output file: {:?}", e))?;
|
||||
@@ -67,25 +69,47 @@ fn do_transition<T: EthSpec>(
|
||||
.build_all_caches(spec)
|
||||
.map_err(|e| format!("Unable to build caches: {:?}", e))?;
|
||||
|
||||
let t = std::time::Instant::now();
|
||||
pre_state
|
||||
.update_tree_hash_cache()
|
||||
.map_err(|e| format!("Unable to build tree hash cache: {:?}", e))?;
|
||||
println!("Initial tree hash: {}ms", t.elapsed().as_millis());
|
||||
|
||||
// Transition the parent state to the block slot.
|
||||
let t = std::time::Instant::now();
|
||||
for i in pre_state.slot().as_u64()..block.slot().as_u64() {
|
||||
per_slot_processing(&mut pre_state, None, spec)
|
||||
.map_err(|e| format!("Failed to advance slot on iteration {}: {:?}", i, e))?;
|
||||
}
|
||||
println!("Slot processing: {}ms", t.elapsed().as_millis());
|
||||
|
||||
let t = std::time::Instant::now();
|
||||
pre_state
|
||||
.update_tree_hash_cache()
|
||||
.map_err(|e| format!("Unable to build tree hash cache: {:?}", e))?;
|
||||
println!("Pre-block tree hash: {}ms", t.elapsed().as_millis());
|
||||
|
||||
pre_state
|
||||
.build_all_caches(spec)
|
||||
.map_err(|e| format!("Unable to build caches: {:?}", e))?;
|
||||
|
||||
let t = std::time::Instant::now();
|
||||
per_block_processing(
|
||||
&mut pre_state,
|
||||
&block,
|
||||
None,
|
||||
BlockSignatureStrategy::VerifyIndividual,
|
||||
BlockSignatureStrategy::VerifyBulk,
|
||||
VerifyBlockRoot::True,
|
||||
spec,
|
||||
)
|
||||
.map_err(|e| format!("State transition failed: {:?}", e))?;
|
||||
println!("Process block: {}ms", t.elapsed().as_millis());
|
||||
|
||||
let t = std::time::Instant::now();
|
||||
pre_state
|
||||
.update_tree_hash_cache()
|
||||
.map_err(|e| format!("Unable to build tree hash cache: {:?}", e))?;
|
||||
println!("Post-block tree hash: {}ms", t.elapsed().as_millis());
|
||||
|
||||
Ok(pre_state)
|
||||
}
|
||||
@@ -100,5 +124,13 @@ pub fn load_from_ssz_with<T>(
|
||||
let mut bytes = vec![];
|
||||
file.read_to_end(&mut bytes)
|
||||
.map_err(|e| format!("Unable to read from file {:?}: {:?}", path, e))?;
|
||||
decoder(&bytes, spec).map_err(|e| format!("Ssz decode failed: {:?}", e))
|
||||
|
||||
let t = std::time::Instant::now();
|
||||
let result = decoder(&bytes, spec).map_err(|e| format!("Ssz decode failed: {:?}", e));
|
||||
println!(
|
||||
"SSZ decoding {}: {}ms",
|
||||
path.display(),
|
||||
t.elapsed().as_millis()
|
||||
);
|
||||
result
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user