Add epoch cache

This commit is contained in:
Michael Sproul
2022-10-21 18:01:03 +11:00
parent 76071fcc27
commit abc62a9ef0
13 changed files with 291 additions and 64 deletions

View File

@@ -47,18 +47,17 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
.get_beacon_committee(att.data.slot, att.data.index)
.ok()?;
let indices = get_attesting_indices::<T>(committee.committee, &fresh_validators).ok()?;
let sqrt_total_active_balance = base::SqrtTotalActiveBalance::new(total_active_balance);
let fresh_validators_rewards: HashMap<u64, u64> = indices
.iter()
.map(|i| *i as u64)
.flat_map(|validator_index| {
let reward = base::get_base_reward(
state,
validator_index as usize,
total_active_balance,
spec,
)
.ok()?
.checked_div(spec.proposer_reward_quotient)?;
let effective_balance =
state.get_effective_balance(validator_index as usize).ok()?;
let reward =
base::get_base_reward(effective_balance, sqrt_total_active_balance, spec)
.ok()?
.checked_div(spec.proposer_reward_quotient)?;
Some((validator_index, reward))
})
.collect();

View File

@@ -1,19 +1,30 @@
use integer_sqrt::IntegerSquareRoot;
use safe_arith::SafeArith;
use safe_arith::{ArithError, SafeArith};
use types::*;
/// Returns the base reward for some validator.
pub fn get_base_reward<T: EthSpec>(
state: &BeaconState<T>,
index: usize,
// Should be == get_total_active_balance(state, spec)
total_active_balance: u64,
spec: &ChainSpec,
) -> Result<u64, BeaconStateError> {
state
.get_effective_balance(index)?
.safe_mul(spec.base_reward_factor)?
.safe_div(total_active_balance.integer_sqrt())?
.safe_div(spec.base_rewards_per_epoch)
.map_err(Into::into)
/// This type exists to avoid confusing `total_active_balance` with `sqrt_total_active_balance`,
/// since they are used in close proximity and the same type (`u64`).
#[derive(Copy, Clone)]
pub struct SqrtTotalActiveBalance(u64);
impl SqrtTotalActiveBalance {
pub fn new(total_active_balance: u64) -> Self {
Self(total_active_balance.integer_sqrt())
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
/// Returns the base reward for some validator.
pub fn get_base_reward(
validator_effective_balance: u64,
sqrt_total_active_balance: SqrtTotalActiveBalance,
spec: &ChainSpec,
) -> Result<u64, ArithError> {
validator_effective_balance
.safe_mul(spec.base_reward_factor)?
.safe_div(sqrt_total_active_balance.as_u64())?
.safe_div(spec.base_rewards_per_epoch)
}

View File

@@ -1,3 +1,5 @@
use crate::{EpochCache, EpochCacheError};
use std::borrow::Cow;
use std::marker::PhantomData;
use tree_hash::TreeHash;
use types::{
@@ -5,7 +7,7 @@ use types::{
Slot,
};
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ConsensusContext<T: EthSpec> {
/// Slot to act as an identifier/safeguard
slot: Slot,
@@ -13,12 +15,15 @@ pub struct ConsensusContext<T: EthSpec> {
proposer_index: Option<u64>,
/// Block root of the block at `slot`.
current_block_root: Option<Hash256>,
/// Epoch cache of values that are useful for block processing that are static over an epoch.
epoch_cache: Option<EpochCache>,
_phantom: PhantomData<T>,
}
#[derive(Debug, PartialEq, Clone)]
pub enum ContextError {
BeaconState(BeaconStateError),
EpochCache(EpochCacheError),
SlotMismatch { slot: Slot, expected: Slot },
}
@@ -28,12 +33,19 @@ impl From<BeaconStateError> for ContextError {
}
}
impl From<EpochCacheError> for ContextError {
fn from(e: EpochCacheError) -> Self {
Self::EpochCache(e)
}
}
impl<T: EthSpec> ConsensusContext<T> {
pub fn new(slot: Slot) -> Self {
Self {
slot,
proposer_index: None,
current_block_root: None,
epoch_cache: None,
_phantom: PhantomData,
}
}
@@ -89,4 +101,29 @@ impl<T: EthSpec> ConsensusContext<T> {
})
}
}
pub fn set_epoch_cache(mut self, epoch_cache: EpochCache) -> Self {
self.epoch_cache = Some(epoch_cache);
self
}
pub fn get_base_reward<E: EthSpec>(
&mut self,
state: &BeaconState<E>,
validator_index: usize,
spec: &ChainSpec,
) -> Result<u64, ContextError> {
self.check_slot(state.slot())?;
// Build epoch cache if not already built.
let epoch_cache = if let Some(ref cache) = self.epoch_cache {
Cow::Borrowed(cache)
} else {
let cache = EpochCache::new(state, spec)?;
self.epoch_cache = Some(cache.clone());
Cow::Owned(cache)
};
Ok(epoch_cache.get_base_reward(validator_index)?)
}
}

View File

@@ -0,0 +1,135 @@
use crate::common::{
altair::{self, BaseRewardPerIncrement},
base::{self, SqrtTotalActiveBalance},
};
use safe_arith::ArithError;
use std::sync::Arc;
use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Hash256, Slot};
/// Cache of values which are uniquely determined at the start of an epoch.
///
/// The values are fixed with respect to the last block of the _prior_ epoch, which we refer
/// to as the "decision block". This cache is very similar to the `BeaconProposerCache` in that
/// beacon proposers are determined at exactly the same time as the values in this cache, so
/// the keys for the two caches are identical.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct EpochCache {
inner: Arc<Inner>,
}
#[derive(Debug, PartialEq, Eq, Clone)]
struct Inner {
/// Unique identifier for this cache, which can be used to check its validity before use
/// with any `BeaconState`.
key: EpochCacheKey,
/// Base reward for every validator in this epoch.
base_rewards: Vec<u64>,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct EpochCacheKey {
pub epoch: Epoch,
pub decision_block_root: Hash256,
}
#[derive(Debug, PartialEq, Clone)]
pub enum EpochCacheError {
IncorrectEpoch { cache: Epoch, state: Epoch },
IncorrectDecisionBlock { cache: Hash256, state: Hash256 },
ValidatorIndexOutOfBounds { validator_index: usize },
InvalidSlot { slot: Slot },
Arith(ArithError),
BeaconState(BeaconStateError),
}
impl From<BeaconStateError> for EpochCacheError {
fn from(e: BeaconStateError) -> Self {
Self::BeaconState(e)
}
}
impl From<ArithError> for EpochCacheError {
fn from(e: ArithError) -> Self {
Self::Arith(e)
}
}
impl EpochCache {
pub fn new<E: EthSpec>(
state: &BeaconState<E>,
spec: &ChainSpec,
) -> Result<Self, EpochCacheError> {
let epoch = state.current_epoch();
let decision_block_root = state
.proposer_shuffling_decision_root(Hash256::zero())
.map_err(EpochCacheError::BeaconState)?;
// The cache should never be constructed at slot 0 because it should only be used for
// block processing (which implies slot > 0) or epoch processing (which implies slot >= 32).
if decision_block_root.is_zero() {
return Err(EpochCacheError::InvalidSlot { slot: state.slot() });
}
// Compute base rewards.
let total_active_balance = state.get_total_active_balance()?;
let sqrt_total_active_balance = SqrtTotalActiveBalance::new(total_active_balance);
let base_reward_per_increment = BaseRewardPerIncrement::new(total_active_balance, spec)?;
let mut base_rewards = Vec::with_capacity(state.validators().len());
for validator in state.validators().iter() {
let effective_balance = validator.effective_balance();
let base_reward = if spec
.altair_fork_epoch
.map_or(false, |altair_epoch| epoch < altair_epoch)
{
base::get_base_reward(effective_balance, sqrt_total_active_balance, spec)?
} else {
altair::get_base_reward(effective_balance, base_reward_per_increment, spec)?
};
base_rewards.push(base_reward);
}
Ok(Self {
inner: Arc::new(Inner {
key: EpochCacheKey {
epoch,
decision_block_root,
},
base_rewards,
}),
})
}
pub fn check_validity<E: EthSpec>(
&self,
state: &BeaconState<E>,
) -> Result<(), EpochCacheError> {
if self.inner.key.epoch != state.current_epoch() {
return Err(EpochCacheError::IncorrectEpoch {
cache: self.inner.key.epoch,
state: state.current_epoch(),
});
}
let state_decision_root = state
.proposer_shuffling_decision_root(Hash256::zero())
.map_err(EpochCacheError::BeaconState)?;
if self.inner.key.decision_block_root != state_decision_root {
return Err(EpochCacheError::IncorrectDecisionBlock {
cache: self.inner.key.decision_block_root,
state: state_decision_root,
});
}
Ok(())
}
#[inline]
pub fn get_base_reward(&self, validator_index: usize) -> Result<u64, EpochCacheError> {
self.inner
.base_rewards
.get(validator_index)
.copied()
.ok_or(EpochCacheError::ValidatorIndexOutOfBounds { validator_index })
}
}

View File

@@ -19,6 +19,7 @@ mod metrics;
pub mod block_replayer;
pub mod common;
pub mod consensus_context;
pub mod epoch_cache;
pub mod genesis;
pub mod per_block_processing;
pub mod per_epoch_processing;
@@ -29,6 +30,7 @@ pub mod verify_operation;
pub use block_replayer::{BlockReplayError, BlockReplayer};
pub use consensus_context::{ConsensusContext, ContextError};
pub use epoch_cache::{EpochCache, EpochCacheError, EpochCacheKey};
pub use genesis::{
eth2_genesis_time, initialize_beacon_state_from_eth1, is_valid_genesis_state,
process_activations,

View File

@@ -47,17 +47,19 @@ pub fn process_sync_aggregate<T: EthSpec>(
// Apply participant and proposer rewards
let committee_indices = state.get_sync_committee_indices(&current_sync_committee)?;
let mut total_proposer_reward = 0;
for (participant_index, participation_bit) in committee_indices
.into_iter()
.zip(aggregate.sync_committee_bits.iter())
{
if participation_bit {
increase_balance(state, participant_index as usize, participant_reward)?;
increase_balance(state, proposer_index as usize, proposer_reward)?;
total_proposer_reward.safe_add_assign(proposer_reward)?;
} else {
decrease_balance(state, participant_index as usize, participant_reward)?;
}
}
increase_balance(state, proposer_index as usize, total_proposer_reward)?;
Ok(())
}

View File

@@ -1,5 +1,5 @@
use super::signature_sets::Error as SignatureSetError;
use crate::ContextError;
use crate::{ContextError, EpochCacheError};
use merkle_proof::MerkleTreeError;
use safe_arith::ArithError;
use types::*;
@@ -73,6 +73,7 @@ pub enum BlockProcessingError {
ExecutionInvalid,
ConsensusContext(ContextError),
MilhouseError(milhouse::Error),
EpochCacheError(EpochCacheError),
}
impl From<BeaconStateError> for BlockProcessingError {
@@ -111,6 +112,12 @@ impl From<ContextError> for BlockProcessingError {
}
}
impl From<EpochCacheError> for BlockProcessingError {
fn from(e: EpochCacheError) -> Self {
BlockProcessingError::EpochCacheError(e)
}
}
impl From<milhouse::Error> for BlockProcessingError {
fn from(e: milhouse::Error) -> Self {
Self::MilhouseError(e)

View File

@@ -1,6 +1,5 @@
use super::*;
use crate::common::{
altair::{get_base_reward, BaseRewardPerIncrement},
get_attestation_participation_flag_indices, increase_balance, initiate_validator_exit,
slash_validator,
};
@@ -95,19 +94,11 @@ pub mod altair {
ctxt: &mut ConsensusContext<T>,
spec: &ChainSpec,
) -> Result<(), BlockProcessingError> {
let proposer_index = ctxt.get_proposer_index(state, spec)?;
attestations
.iter()
.enumerate()
.try_for_each(|(i, attestation)| {
process_attestation(
state,
attestation,
i,
proposer_index,
verify_signatures,
spec,
)
process_attestation(state, attestation, i, ctxt, verify_signatures, spec)
})
}
@@ -115,13 +106,15 @@ pub mod altair {
state: &mut BeaconState<T>,
attestation: &Attestation<T>,
att_index: usize,
proposer_index: u64,
ctxt: &mut ConsensusContext<T>,
verify_signatures: VerifySignatures,
spec: &ChainSpec,
) -> Result<(), BlockProcessingError> {
state.build_committee_cache(RelativeEpoch::Previous, spec)?;
state.build_committee_cache(RelativeEpoch::Current, spec)?;
let proposer_index = ctxt.get_proposer_index(state, spec)?;
let indexed_attestation =
verify_attestation_for_block_inclusion(state, attestation, verify_signatures, spec)
.map_err(|e| e.into_with_index(att_index))?;
@@ -133,8 +126,6 @@ pub mod altair {
get_attestation_participation_flag_indices(state, data, inclusion_delay, spec)?;
// Update epoch participation flags.
let total_active_balance = state.get_total_active_balance()?;
let base_reward_per_increment = BaseRewardPerIncrement::new(total_active_balance, spec)?;
let mut proposer_reward_numerator = 0;
for index in &indexed_attestation.attesting_indices {
let index = *index as usize;
@@ -148,12 +139,9 @@ pub mod altair {
if participation_flag_indices.contains(&flag_index)
&& !validator_participation.has_flag(flag_index)?
{
// FIXME(sproul): add effective balance cache here?
validator_participation.add_flag(flag_index)?;
let effective_balance = state.get_validator(index)?.effective_balance();
proposer_reward_numerator.safe_add_assign(
get_base_reward(effective_balance, base_reward_per_increment, spec)?
.safe_mul(weight)?,
ctxt.get_base_reward(state, index, spec)?.safe_mul(weight)?,
)?;
}
}

View File

@@ -1,4 +1,7 @@
use crate::common::{base::get_base_reward, decrease_balance, increase_balance};
use crate::common::{
base::{get_base_reward, SqrtTotalActiveBalance},
decrease_balance, increase_balance,
};
use crate::per_epoch_processing::{
base::{TotalBalances, ValidatorStatus, ValidatorStatuses},
Delta, Error,
@@ -78,7 +81,6 @@ pub fn get_attestation_deltas<T: EthSpec>(
validator_statuses: &ValidatorStatuses,
spec: &ChainSpec,
) -> Result<Vec<AttestationDelta>, Error> {
let previous_epoch = state.previous_epoch();
let finality_delay = state
.previous_epoch()
.safe_sub(state.finalized_checkpoint().epoch)?
@@ -87,19 +89,22 @@ pub fn get_attestation_deltas<T: EthSpec>(
let mut deltas = vec![AttestationDelta::default(); state.validators().len()];
let total_balances = &validator_statuses.total_balances;
let sqrt_total_active_balance = SqrtTotalActiveBalance::new(total_balances.current_epoch());
for (index, validator) in validator_statuses.statuses.iter().enumerate() {
// Ignore ineligible validators. All sub-functions of the spec do this except for
// `get_inclusion_delay_deltas`. It's safe to do so here because any validator that is in
// the unslashed indices of the matching source attestations is active, and therefore
// eligible.
// FIXME(sproul): this is inefficient
let full_validator = state.get_validator(index)?;
if !state.is_eligible_validator(previous_epoch, full_validator) {
if !validator.is_eligible {
continue;
}
let base_reward = get_base_reward(state, index, total_balances.current_epoch(), spec)?;
let base_reward = get_base_reward(
validator.current_epoch_effective_balance,
sqrt_total_active_balance,
spec,
)?;
let source_delta =
get_source_delta(validator, base_reward, total_balances, finality_delay, spec)?;

View File

@@ -53,6 +53,8 @@ impl InclusionInfo {
pub struct ValidatorStatus {
/// True if the validator has been slashed, ever.
pub is_slashed: bool,
/// True if the validator is eligible.
pub is_eligible: bool,
/// True if the validator can withdraw in the current epoch.
pub is_withdrawable_in_current_epoch: bool,
/// True if the validator was active in the state's _current_ epoch.
@@ -92,6 +94,7 @@ impl ValidatorStatus {
// Update all the bool fields, only updating `self` if `other` is true (never setting
// `self` to false).
set_self_if_other_is_true!(self, other, is_slashed);
set_self_if_other_is_true!(self, other, is_eligible);
set_self_if_other_is_true!(self, other, is_withdrawable_in_current_epoch);
set_self_if_other_is_true!(self, other, is_active_in_current_epoch);
set_self_if_other_is_true!(self, other, is_active_in_previous_epoch);
@@ -195,24 +198,27 @@ impl ValidatorStatuses {
let mut statuses = Vec::with_capacity(state.validators().len());
let mut total_balances = TotalBalances::new(spec);
for (i, validator) in state.validators().iter().enumerate() {
let effective_balance = state.get_effective_balance(i)?;
let current_epoch = state.current_epoch();
let previous_epoch = state.previous_epoch();
for validator in state.validators().iter() {
let effective_balance = validator.effective_balance();
let mut status = ValidatorStatus {
is_slashed: validator.slashed(),
is_withdrawable_in_current_epoch: validator
.is_withdrawable_at(state.current_epoch()),
is_eligible: state.is_eligible_validator(previous_epoch, validator),
is_withdrawable_in_current_epoch: validator.is_withdrawable_at(current_epoch),
current_epoch_effective_balance: effective_balance,
..ValidatorStatus::default()
};
if validator.is_active_at(state.current_epoch()) {
if validator.is_active_at(current_epoch) {
status.is_active_in_current_epoch = true;
total_balances
.current_epoch
.safe_add_assign(effective_balance)?;
}
if validator.is_active_at(state.previous_epoch()) {
if validator.is_active_at(previous_epoch) {
status.is_active_in_previous_epoch = true;
total_balances
.previous_epoch
@@ -285,10 +291,10 @@ impl ValidatorStatuses {
}
// Compute the total balances
for (index, v) in self.statuses.iter().enumerate() {
for v in self.statuses.iter() {
// According to the spec, we only count unslashed validators towards the totals.
if !v.is_slashed {
let validator_balance = state.get_effective_balance(index)?;
let validator_balance = v.current_epoch_effective_balance;
if v.is_current_epoch_attester {
self.total_balances

View File

@@ -6,7 +6,8 @@ use ssz_derive::{Decode, Encode};
use test_random_derive::TestRandom;
use tree_hash_derive::TreeHash;
use ssz_types::{FixedVector, VariableList};
// FIXME(sproul): try milhouse FixedVector
use ssz_types::FixedVector;
pub type Transaction<N> = VariableList<u8, N>;
pub type Transactions<T> = VariableList<

View File

@@ -73,50 +73,61 @@ impl Validator {
});
}
#[inline]
pub fn withdrawal_credentials(&self) -> Hash256 {
self.immutable.withdrawal_credentials
}
#[inline]
pub fn effective_balance(&self) -> u64 {
self.mutable.effective_balance
}
#[inline]
pub fn slashed(&self) -> bool {
self.mutable.slashed
}
#[inline]
pub fn activation_eligibility_epoch(&self) -> Epoch {
self.mutable.activation_eligibility_epoch
}
#[inline]
pub fn activation_epoch(&self) -> Epoch {
self.mutable.activation_epoch
}
#[inline]
pub fn exit_epoch(&self) -> Epoch {
self.mutable.exit_epoch
}
#[inline]
pub fn withdrawable_epoch(&self) -> Epoch {
self.mutable.withdrawable_epoch
}
/// Returns `true` if the validator is considered active at some epoch.
#[inline]
pub fn is_active_at(&self, epoch: Epoch) -> bool {
self.activation_epoch() <= epoch && epoch < self.exit_epoch()
}
/// Returns `true` if the validator is slashable at some epoch.
#[inline]
pub fn is_slashable_at(&self, epoch: Epoch) -> bool {
!self.slashed() && self.activation_epoch() <= epoch && epoch < self.withdrawable_epoch()
}
/// Returns `true` if the validator is considered exited at some epoch.
#[inline]
pub fn is_exited_at(&self, epoch: Epoch) -> bool {
self.exit_epoch() <= epoch
}
/// Returns `true` if the validator is able to withdraw at some epoch.
#[inline]
pub fn is_withdrawable_at(&self, epoch: Epoch) -> bool {
epoch >= self.withdrawable_epoch()
}
@@ -124,6 +135,7 @@ impl Validator {
/// Returns `true` if the validator is eligible to join the activation queue.
///
/// Spec v0.12.1
#[inline]
pub fn is_eligible_for_activation_queue(&self, spec: &ChainSpec) -> bool {
self.activation_eligibility_epoch() == spec.far_future_epoch
&& self.effective_balance() == spec.max_effective_balance
@@ -132,6 +144,7 @@ impl Validator {
/// Returns `true` if the validator is eligible to be activated.
///
/// Spec v0.12.1
#[inline]
pub fn is_eligible_for_activation<E: EthSpec>(
&self,
state: &BeaconState<E>,

View File

@@ -74,7 +74,7 @@ use eth2::{
use ssz::Encode;
use state_processing::{
block_signature_verifier::BlockSignatureVerifier, per_block_processing, per_slot_processing,
BlockSignatureStrategy, ConsensusContext, VerifyBlockRoot,
BlockSignatureStrategy, ConsensusContext, EpochCache, VerifyBlockRoot,
};
use std::borrow::Cow;
use std::fs::File;
@@ -195,7 +195,10 @@ pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches) -> Result<
let store = Arc::new(store);
debug!("Building pubkey cache (might take some time)");
let validator_pubkey_cache = ValidatorPubkeyCache::new(&pre_state, store)
let validator_pubkey_cache = store.immutable_validators.clone();
validator_pubkey_cache
.write()
.import_new_pubkeys(&pre_state, &store)
.map_err(|e| format!("Failed to create pubkey cache: {:?}", e))?;
/*
@@ -226,6 +229,7 @@ pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches) -> Result<
*/
let mut output_post_state = None;
let mut saved_ctxt = None;
for i in 0..runs {
let pre_state = pre_state.clone();
let block = block.clone();
@@ -238,7 +242,8 @@ pub fn run<T: EthSpec>(mut env: Environment<T>, matches: &ArgMatches) -> Result<
block,
state_root_opt,
&config,
&validator_pubkey_cache,
&*validator_pubkey_cache.read(),
&mut saved_ctxt,
spec,
)?;
@@ -300,6 +305,7 @@ fn do_transition<T: EthSpec>(
mut state_root_opt: Option<Hash256>,
config: &Config,
validator_pubkey_cache: &ValidatorPubkeyCache<EphemeralHarnessType<T>>,
saved_ctxt: &mut Option<ConsensusContext<T>>,
spec: &ChainSpec,
) -> Result<BeaconState<T>, String> {
if !config.exclude_cache_builds {
@@ -369,10 +375,25 @@ fn do_transition<T: EthSpec>(
debug!("Batch verify block signatures: {:?}", t.elapsed());
}
let mut ctxt = if let Some(ctxt) = saved_ctxt {
ctxt.clone()
} else {
let mut ctxt = ConsensusContext::new(pre_state.slot())
.set_current_block_root(block_root)
.set_proposer_index(block.message().proposer_index());
if config.exclude_cache_builds {
ctxt = ctxt.set_epoch_cache(
EpochCache::new(&pre_state, spec)
.map_err(|e| format!("unable to build epoch cache: {e:?}"))?,
);
*saved_ctxt = Some(ctxt.clone());
}
ctxt
};
let t = Instant::now();
let mut ctxt = ConsensusContext::new(pre_state.slot())
.set_current_block_root(block_root)
.set_proposer_index(block.message().proposer_index());
per_block_processing(
&mut pre_state,
&block,