Optimising process_epoch again (inactivity scores)

This commit is contained in:
Michael Sproul
2022-06-09 21:51:43 +10:00
parent a3e396cfdd
commit f0cc077ae3
4 changed files with 91 additions and 51 deletions

View File

@@ -99,3 +99,6 @@ eth2_hashing = { path = "crypto/eth2_hashing" }
tree_hash = { path = "consensus/tree_hash" } tree_hash = { path = "consensus/tree_hash" }
tree_hash_derive = { path = "consensus/tree_hash_derive" } tree_hash_derive = { path = "consensus/tree_hash_derive" }
eth2_serde_utils = { path = "consensus/serde_utils" } eth2_serde_utils = { path = "consensus/serde_utils" }
[patch."https://github.com/sigp/milhouse"]
milhouse = { path = "../milhouse" }

View File

@@ -35,7 +35,7 @@ pub fn process_epoch<T: EthSpec>(
// Justification and finalization. // Justification and finalization.
process_justification_and_finalization(state, &participation_cache)?; process_justification_and_finalization(state, &participation_cache)?;
process_inactivity_updates(state, &participation_cache, spec)?; process_inactivity_updates(state, &mut participation_cache, spec)?;
// Rewards and Penalties. // Rewards and Penalties.
process_rewards_and_penalties(state, &participation_cache, spec)?; process_rewards_and_penalties(state, &participation_cache, spec)?;

View File

@@ -9,13 +9,22 @@ use types::eth_spec::EthSpec;
pub fn process_inactivity_updates<T: EthSpec>( pub fn process_inactivity_updates<T: EthSpec>(
state: &mut BeaconState<T>, state: &mut BeaconState<T>,
participation_cache: &ParticipationCache, participation_cache: &mut ParticipationCache,
spec: &ChainSpec, spec: &ChainSpec,
) -> Result<(), EpochProcessingError> { ) -> Result<(), EpochProcessingError> {
// Score updates based on previous epoch participation, skip genesis epoch // Score updates based on previous epoch participation, skip genesis epoch
if state.current_epoch() == T::genesis_epoch() { if state.current_epoch() == T::genesis_epoch() {
return Ok(()); return Ok(());
} }
// Fast path: inactivity scores have already been pre-computed.
if let Some(inactivity_score_updates) = participation_cache.inactivity_score_updates.take() {
state
.inactivity_scores_mut()?
.bulk_update(inactivity_score_updates)?;
return Ok(());
}
let is_in_inactivity_leak = state.is_in_inactivity_leak(spec); let is_in_inactivity_leak = state.is_in_inactivity_leak(spec);
let mut inactivity_scores = state.inactivity_scores_mut()?.iter_cow(); let mut inactivity_scores = state.inactivity_scores_mut()?.iter_cow();

View File

@@ -12,8 +12,8 @@
//! to get useful summaries about the validator participation in an epoch. //! to get useful summaries about the validator participation in an epoch.
use crate::common::altair::get_base_reward; use crate::common::altair::get_base_reward;
use rustc_hash::FxHashMap as HashMap;
use safe_arith::{ArithError, SafeArith}; use safe_arith::{ArithError, SafeArith};
use std::collections::BTreeMap;
use types::{ use types::{
consts::altair::{ consts::altair::{
NUM_FLAG_INDICES, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX, NUM_FLAG_INDICES, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX,
@@ -28,6 +28,20 @@ pub enum Error {
InvalidFlagIndex(usize), InvalidFlagIndex(usize),
NoUnslashedParticipatingIndices, NoUnslashedParticipatingIndices,
MissingValidator(usize), MissingValidator(usize),
BeaconState(BeaconStateError),
Arith(ArithError),
}
impl From<BeaconStateError> for Error {
fn from(e: BeaconStateError) -> Self {
Self::BeaconState(e)
}
}
impl From<ArithError> for Error {
fn from(e: ArithError) -> Self {
Self::Arith(e)
}
} }
/// A balance which will never be below the specified `minimum`. /// A balance which will never be below the specified `minimum`.
@@ -130,7 +144,7 @@ impl SingleEpochParticipationCache {
} }
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq, Clone)]
pub struct ValidatorInfo { pub struct ValidatorInfo {
pub effective_balance: u64, pub effective_balance: u64,
pub base_reward: u64, pub base_reward: u64,
@@ -142,6 +156,7 @@ pub struct ValidatorInfo {
} }
impl ValidatorInfo { impl ValidatorInfo {
#[inline]
pub fn is_unslashed_participating_index(&self, flag_index: usize) -> Result<bool, Error> { pub fn is_unslashed_participating_index(&self, flag_index: usize) -> Result<bool, Error> {
Ok(self.is_active_previous_epoch Ok(self.is_active_previous_epoch
&& !self.is_slashed && !self.is_slashed
@@ -155,13 +170,13 @@ impl ValidatorInfo {
/// Single `HashMap` for validator info relevant to `process_epoch`. /// Single `HashMap` for validator info relevant to `process_epoch`.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
struct ValidatorInfoCache { struct ValidatorInfoCache {
info: HashMap<usize, ValidatorInfo>, info: Vec<Option<ValidatorInfo>>,
} }
impl ValidatorInfoCache { impl ValidatorInfoCache {
pub fn new(capacity: usize) -> Self { pub fn new(capacity: usize) -> Self {
Self { Self {
info: HashMap::with_capacity_and_hasher(capacity, Default::default()), info: vec![None; capacity],
} }
} }
} }
@@ -182,6 +197,8 @@ pub struct ParticipationCache {
/// Caches the indices and effective balances of validators that need to be processed by /// Caches the indices and effective balances of validators that need to be processed by
/// `process_slashings`. /// `process_slashings`.
process_slashings_indices: Vec<(usize, u64)>, process_slashings_indices: Vec<(usize, u64)>,
/// Updates to the inactivity scores if we are definitely not in an inactivity leak.
pub inactivity_score_updates: Option<BTreeMap<usize, u64>>,
} }
impl ParticipationCache { impl ParticipationCache {
@@ -190,23 +207,16 @@ impl ParticipationCache {
/// ## Errors /// ## Errors
/// ///
/// - The provided `state` **must** be an Altair state. An error will be returned otherwise. /// - The provided `state` **must** be an Altair state. An error will be returned otherwise.
pub fn new<T: EthSpec>( pub fn new<T: EthSpec>(state: &BeaconState<T>, spec: &ChainSpec) -> Result<Self, Error> {
state: &BeaconState<T>,
spec: &ChainSpec,
) -> Result<Self, BeaconStateError> {
let current_epoch = state.current_epoch(); let current_epoch = state.current_epoch();
let previous_epoch = state.previous_epoch(); let previous_epoch = state.previous_epoch();
let num_previous_epoch_active_vals = state
.get_cached_active_validator_indices(RelativeEpoch::Previous)?
.len();
// Both the current/previous epoch participations are set to a capacity that is slightly // Both the current/previous epoch participations are set to a capacity that is slightly
// larger than required. The difference will be due slashed-but-active validators. // larger than required. The difference will be due slashed-but-active validators.
let mut current_epoch_participation = SingleEpochParticipationCache::new(spec); let mut current_epoch_participation = SingleEpochParticipationCache::new(spec);
let mut previous_epoch_participation = SingleEpochParticipationCache::new(spec); let mut previous_epoch_participation = SingleEpochParticipationCache::new(spec);
let mut validators = ValidatorInfoCache::new(num_previous_epoch_active_vals); let mut validators = ValidatorInfoCache::new(state.validators().len());
let current_epoch_total_active_balance = state.get_total_active_balance()?; let current_epoch_total_active_balance = state.get_total_active_balance()?;
@@ -221,6 +231,14 @@ impl ParticipationCache {
let mut process_slashings_indices = vec![]; let mut process_slashings_indices = vec![];
// Fast path for inactivity scores update when we are definitely not in an inactivity leak.
// This breaks the dependence of `process_inactivity_updates` on the finalization
// re-calculation.
let definitely_not_in_inactivity_leak =
state.finalized_checkpoint().epoch + spec.min_epochs_to_inactivity_penalty + 1
>= state.current_epoch();
let mut inactivity_score_updates = BTreeMap::new();
// Iterate through all validators, updating: // Iterate through all validators, updating:
// //
// 1. Validator participation for current and previous epochs. // 1. Validator participation for current and previous epochs.
@@ -233,8 +251,9 @@ impl ParticipationCache {
.iter() .iter()
.zip(state.current_epoch_participation()?) .zip(state.current_epoch_participation()?)
.zip(state.previous_epoch_participation()?) .zip(state.previous_epoch_participation()?)
.zip(state.inactivity_scores()?)
.enumerate(); .enumerate();
for (val_index, ((val, curr_epoch_flags), prev_epoch_flags)) in iter { for (val_index, (((val, curr_epoch_flags), prev_epoch_flags), inactivity_score)) in iter {
let is_active_current_epoch = val.is_active_at(current_epoch); let is_active_current_epoch = val.is_active_at(current_epoch);
let is_active_previous_epoch = val.is_active_at(previous_epoch); let is_active_previous_epoch = val.is_active_at(previous_epoch);
let is_eligible = state.is_eligible_validator(val); let is_eligible = state.is_eligible_validator(val);
@@ -275,23 +294,40 @@ impl ParticipationCache {
eligible_indices.push(val_index); eligible_indices.push(val_index);
} }
let mut validator_info = ValidatorInfo {
effective_balance: val.effective_balance,
base_reward: 0, // not read
is_eligible,
is_slashed: val.slashed,
is_active_current_epoch,
is_active_previous_epoch,
previous_epoch_participation: *prev_epoch_flags,
};
// Calculate inactivity updates.
if is_eligible && definitely_not_in_inactivity_leak {
let mut new_inactivity_score =
if validator_info.is_unslashed_participating_index(TIMELY_TARGET_FLAG_INDEX)? {
inactivity_score.saturating_sub(1)
} else {
inactivity_score.safe_add(spec.inactivity_score_bias)?
};
// Decrease the score of all validators for forgiveness when not during a leak
new_inactivity_score =
new_inactivity_score.saturating_sub(spec.inactivity_score_recovery_rate);
if new_inactivity_score != *inactivity_score {
inactivity_score_updates.insert(val_index, new_inactivity_score);
}
}
if is_eligible || is_active_current_epoch { if is_eligible || is_active_current_epoch {
let effective_balance = val.effective_balance; let effective_balance = val.effective_balance;
let base_reward = let base_reward =
get_base_reward(effective_balance, current_epoch_total_active_balance, spec)?; get_base_reward(effective_balance, current_epoch_total_active_balance, spec)?;
validator_info.base_reward = base_reward;
validators.info.insert( validators.info[val_index] = Some(validator_info);
val_index,
ValidatorInfo {
effective_balance,
base_reward,
is_eligible,
is_slashed: val.slashed,
is_active_current_epoch,
is_active_previous_epoch,
previous_epoch_participation: *prev_epoch_flags,
},
);
} }
} }
@@ -310,6 +346,8 @@ impl ParticipationCache {
validators, validators,
eligible_indices, eligible_indices,
process_slashings_indices, process_slashings_indices,
inactivity_score_updates: definitely_not_in_inactivity_leak
.then(|| inactivity_score_updates),
}) })
} }
@@ -361,27 +399,23 @@ impl ParticipationCache {
*/ */
pub fn is_active_unslashed_in_previous_epoch(&self, val_index: usize) -> bool { pub fn is_active_unslashed_in_previous_epoch(&self, val_index: usize) -> bool {
self.validators self.get_validator(val_index).map_or(false, |validator| {
.info validator.is_active_previous_epoch && !validator.is_slashed
.get(&val_index) })
.map_or(false, |validator| {
validator.is_active_previous_epoch && !validator.is_slashed
})
} }
pub fn is_active_unslashed_in_current_epoch(&self, val_index: usize) -> bool { pub fn is_active_unslashed_in_current_epoch(&self, val_index: usize) -> bool {
self.validators self.get_validator(val_index).map_or(false, |validator| {
.info validator.is_active_current_epoch && !validator.is_slashed
.get(&val_index) })
.map_or(false, |validator| {
validator.is_active_current_epoch && !validator.is_slashed
})
} }
pub fn get_validator(&self, val_index: usize) -> Result<&ValidatorInfo, Error> { pub fn get_validator(&self, val_index: usize) -> Result<&ValidatorInfo, Error> {
self.validators self.validators
.info .info
.get(&val_index) .get(val_index)
.ok_or(Error::MissingValidator(val_index))?
.as_ref()
.ok_or(Error::MissingValidator(val_index)) .ok_or(Error::MissingValidator(val_index))
} }
@@ -393,9 +427,7 @@ impl ParticipationCache {
&self, &self,
val_index: usize, val_index: usize,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
self.validators self.get_validator(val_index)
.info
.get(&val_index)
.map_or(Ok(false), |validator| { .map_or(Ok(false), |validator| {
Ok(!validator.is_slashed Ok(!validator.is_slashed
&& validator && validator
@@ -410,9 +442,7 @@ impl ParticipationCache {
&self, &self,
val_index: usize, val_index: usize,
) -> Result<bool, Error> { ) -> Result<bool, Error> {
self.validators self.get_validator(val_index)
.info
.get(&val_index)
.map_or(Ok(false), |validator| { .map_or(Ok(false), |validator| {
Ok(!validator.is_slashed Ok(!validator.is_slashed
&& validator && validator
@@ -424,9 +454,7 @@ impl ParticipationCache {
/// Always returns false for a slashed validator. /// Always returns false for a slashed validator.
pub fn is_previous_epoch_timely_head_attester(&self, val_index: usize) -> Result<bool, Error> { pub fn is_previous_epoch_timely_head_attester(&self, val_index: usize) -> Result<bool, Error> {
self.validators self.get_validator(val_index)
.info
.get(&val_index)
.map_or(Ok(false), |validator| { .map_or(Ok(false), |validator| {
Ok(!validator.is_slashed Ok(!validator.is_slashed
&& validator && validator