mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-14 18:32:42 +00:00
First tilt at accelerating block production
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
use crate::max_cover::MaxCover;
|
||||
use crate::RewardCache;
|
||||
use state_processing::common::{
|
||||
altair, base, get_attestation_participation_flag_indices, get_attesting_indices,
|
||||
};
|
||||
@@ -21,13 +22,14 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
pub fn new(
|
||||
att: &'a Attestation<T>,
|
||||
state: &BeaconState<T>,
|
||||
reward_cache: &'a RewardCache,
|
||||
total_active_balance: u64,
|
||||
spec: &ChainSpec,
|
||||
) -> Option<Self> {
|
||||
if let BeaconState::Base(ref base_state) = state {
|
||||
Self::new_for_base(att, state, base_state, total_active_balance, spec)
|
||||
} else {
|
||||
Self::new_for_altair(att, state, total_active_balance, spec)
|
||||
Self::new_for_altair(att, state, reward_cache, total_active_balance, spec)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,23 +71,18 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
pub fn new_for_altair(
|
||||
att: &'a Attestation<T>,
|
||||
state: &BeaconState<T>,
|
||||
reward_cache: &'a RewardCache,
|
||||
total_active_balance: u64,
|
||||
spec: &ChainSpec,
|
||||
) -> Option<Self> {
|
||||
// FIXME(sproul): could optimise out `get_attesting_indices` and allocations by storing
|
||||
// these.
|
||||
let committee = state
|
||||
.get_beacon_committee(att.data.slot, att.data.index)
|
||||
.ok()?;
|
||||
let attesting_indices =
|
||||
get_attesting_indices::<T>(committee.committee, &att.aggregation_bits).ok()?;
|
||||
|
||||
let participation_list = if att.data.target.epoch == state.current_epoch() {
|
||||
state.current_epoch_participation().ok()?
|
||||
} else if att.data.target.epoch == state.previous_epoch() {
|
||||
state.previous_epoch_participation().ok()?
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
|
||||
let inclusion_delay = state.slot().as_u64().checked_sub(att.data.slot.as_u64())?;
|
||||
let att_participation_flags =
|
||||
get_attestation_participation_flag_indices(state, &att.data, inclusion_delay, spec)
|
||||
@@ -95,9 +92,11 @@ impl<'a, T: EthSpec> AttMaxCover<'a, T> {
|
||||
.iter()
|
||||
.filter_map(|&index| {
|
||||
let mut proposer_reward_numerator = 0;
|
||||
let participation = participation_list.get(index)?;
|
||||
let participation = reward_cache
|
||||
.get_epoch_participation(index, att.data.target.epoch)
|
||||
.ok()??;
|
||||
|
||||
let effective_balance = state.get_effective_balance(index).ok()?;
|
||||
let effective_balance = reward_cache.get_effective_balance(index)?;
|
||||
let base_reward =
|
||||
altair::get_base_reward(effective_balance, total_active_balance, spec).ok()?;
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ mod attester_slashing;
|
||||
mod max_cover;
|
||||
mod metrics;
|
||||
mod persistence;
|
||||
mod reward_cache;
|
||||
mod sync_aggregate_id;
|
||||
|
||||
pub use attestation::AttMaxCover;
|
||||
@@ -11,12 +12,13 @@ pub use max_cover::MaxCover;
|
||||
pub use persistence::{
|
||||
PersistedOperationPool, PersistedOperationPoolAltair, PersistedOperationPoolBase,
|
||||
};
|
||||
pub use reward_cache::RewardCache;
|
||||
|
||||
use crate::sync_aggregate_id::SyncAggregateId;
|
||||
use attestation_id::AttestationId;
|
||||
use attester_slashing::AttesterSlashingMaxCover;
|
||||
use max_cover::maximum_cover;
|
||||
use parking_lot::RwLock;
|
||||
use parking_lot::{RwLock, RwLockWriteGuard};
|
||||
use state_processing::per_block_processing::errors::AttestationValidationError;
|
||||
use state_processing::per_block_processing::{
|
||||
get_slashable_indices_modular, verify_attestation_for_block_inclusion, verify_exit,
|
||||
@@ -47,6 +49,8 @@ pub struct OperationPool<T: EthSpec + Default> {
|
||||
proposer_slashings: RwLock<HashMap<u64, ProposerSlashing>>,
|
||||
/// Map from exiting validator to their exit data.
|
||||
voluntary_exits: RwLock<HashMap<u64, SignedVoluntaryExit>>,
|
||||
/// Reward cache for accelerating attestation packing.
|
||||
reward_cache: RwLock<RewardCache>,
|
||||
_phantom: PhantomData<T>,
|
||||
}
|
||||
|
||||
@@ -55,6 +59,11 @@ pub enum OpPoolError {
|
||||
GetAttestationsTotalBalanceError(BeaconStateError),
|
||||
GetBlockRootError(BeaconStateError),
|
||||
SyncAggregateError(SyncAggregateError),
|
||||
RewardCacheUpdatePrevEpoch(BeaconStateError),
|
||||
RewardCacheUpdateCurrEpoch(BeaconStateError),
|
||||
RewardCacheGetBlockRoot(BeaconStateError),
|
||||
RewardCacheWrongEpoch,
|
||||
RewardCacheValidatorUnknown(BeaconStateError),
|
||||
IncorrectOpPoolVariant,
|
||||
}
|
||||
|
||||
@@ -240,6 +249,7 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
epoch: Epoch,
|
||||
all_attestations: &'a HashMap<AttestationId, Vec<Attestation<T>>>,
|
||||
state: &'a BeaconState<T>,
|
||||
reward_cache: &'a RewardCache,
|
||||
total_active_balance: u64,
|
||||
validity_filter: impl FnMut(&&Attestation<T>) -> bool + Send,
|
||||
spec: &'a ChainSpec,
|
||||
@@ -266,7 +276,9 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
.is_ok()
|
||||
})
|
||||
.filter(validity_filter)
|
||||
.filter_map(move |att| AttMaxCover::new(att, state, total_active_balance, spec))
|
||||
.filter_map(move |att| {
|
||||
AttMaxCover::new(att, state, reward_cache, total_active_balance, spec)
|
||||
})
|
||||
}
|
||||
|
||||
/// Get a list of attestations for inclusion in a block.
|
||||
@@ -290,6 +302,11 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
.get_total_active_balance()
|
||||
.map_err(OpPoolError::GetAttestationsTotalBalanceError)?;
|
||||
|
||||
// Update the reward cache.
|
||||
let mut reward_cache = self.reward_cache.write();
|
||||
reward_cache.update(state)?;
|
||||
let reward_cache = RwLockWriteGuard::downgrade(reward_cache);
|
||||
|
||||
// Split attestations for the previous & current epochs, so that we
|
||||
// can optimise them individually in parallel.
|
||||
let mut num_prev_valid = 0_i64;
|
||||
@@ -300,6 +317,7 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
prev_epoch,
|
||||
&*all_attestations,
|
||||
state,
|
||||
&reward_cache,
|
||||
total_active_balance,
|
||||
prev_epoch_validity_filter,
|
||||
spec,
|
||||
@@ -310,6 +328,7 @@ impl<T: EthSpec> OperationPool<T> {
|
||||
current_epoch,
|
||||
&*all_attestations,
|
||||
state,
|
||||
&reward_cache,
|
||||
total_active_balance,
|
||||
curr_epoch_validity_filter,
|
||||
spec,
|
||||
|
||||
@@ -120,6 +120,7 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
attester_slashings,
|
||||
proposer_slashings,
|
||||
voluntary_exits,
|
||||
reward_cache: Default::default(),
|
||||
_phantom: Default::default(),
|
||||
},
|
||||
PersistedOperationPool::Altair(_) => {
|
||||
@@ -132,6 +133,7 @@ impl<T: EthSpec> PersistedOperationPool<T> {
|
||||
attester_slashings,
|
||||
proposer_slashings,
|
||||
voluntary_exits,
|
||||
reward_cache: Default::default(),
|
||||
_phantom: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
146
beacon_node/operation_pool/src/reward_cache.rs
Normal file
146
beacon_node/operation_pool/src/reward_cache.rs
Normal file
@@ -0,0 +1,146 @@
|
||||
use crate::OpPoolError;
|
||||
use std::collections::HashMap;
|
||||
use types::{BeaconState, BeaconStateError, Epoch, EthSpec, Hash256, ParticipationFlags};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Initialization {
|
||||
current_epoch: Epoch,
|
||||
prev_epoch_last_block_root: Hash256,
|
||||
latest_block_root: Hash256,
|
||||
}
|
||||
|
||||
/// Cache to store validator effective balances and base rewards for block proposal.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct RewardCache {
|
||||
initialization: Option<Initialization>,
|
||||
/// Map from validator index to `effective_balance`.
|
||||
effective_balances: HashMap<usize, u64>,
|
||||
/// Map from validator index to participation flags for the previous epoch.
|
||||
///
|
||||
/// Validators with non-zero participation for the previous epoch are omitted from this map
|
||||
/// in order to keep its memory-usage as small as possible.
|
||||
///
|
||||
// FIXME(sproul): choose between handling slashable attestations (keep all non-complete) and
|
||||
// memory efficiency (keep all zero).
|
||||
// FIXME(sproul): choose whether to filter inactive validators
|
||||
previous_epoch_participation: HashMap<usize, ParticipationFlags>,
|
||||
/// Map from validator index to participation flags for the current epoch.
|
||||
///
|
||||
/// Validators with complete participation for the current epoch are omitted from this map
|
||||
/// in order to keep its memory-usage as small as possible.
|
||||
current_epoch_participation: HashMap<usize, ParticipationFlags>,
|
||||
}
|
||||
|
||||
impl RewardCache {
|
||||
pub fn get_effective_balance(&self, validator_index: usize) -> Option<u64> {
|
||||
self.effective_balances.get(&validator_index).copied()
|
||||
}
|
||||
|
||||
pub fn get_epoch_participation(
|
||||
&self,
|
||||
validator_index: usize,
|
||||
epoch: Epoch,
|
||||
) -> Result<Option<ParticipationFlags>, OpPoolError> {
|
||||
if let Some(init) = &self.initialization {
|
||||
if init.current_epoch == epoch {
|
||||
Ok(self
|
||||
.current_epoch_participation
|
||||
.get(&validator_index)
|
||||
.copied())
|
||||
} else if init.current_epoch == epoch + 1 {
|
||||
Ok(self
|
||||
.previous_epoch_participation
|
||||
.get(&validator_index)
|
||||
.copied())
|
||||
} else {
|
||||
Err(OpPoolError::RewardCacheWrongEpoch)
|
||||
}
|
||||
} else {
|
||||
Err(OpPoolError::RewardCacheWrongEpoch)
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the cache.
|
||||
pub fn update<E: EthSpec>(&mut self, state: &BeaconState<E>) -> Result<(), OpPoolError> {
|
||||
let current_epoch = state.current_epoch();
|
||||
let prev_epoch_last_block_root = *state
|
||||
.get_block_root(state.previous_epoch().start_slot(E::slots_per_epoch()))
|
||||
.map_err(OpPoolError::RewardCacheGetBlockRoot)?;
|
||||
let latest_block_root = *state
|
||||
.get_block_root(state.slot() - 1)
|
||||
.map_err(OpPoolError::RewardCacheGetBlockRoot)?;
|
||||
|
||||
// If the `state` is from a new epoch or a different fork with a different last epoch block,
|
||||
// then update the effective balance cache (the effective balances are liable to have
|
||||
// changed at the epoch boundary).
|
||||
//
|
||||
// Similarly, update the previous epoch participation cache as previous epoch participation
|
||||
// is now fixed.
|
||||
if self.initialization.as_ref().map_or(true, |init| {
|
||||
init.current_epoch != current_epoch
|
||||
|| init.prev_epoch_last_block_root != prev_epoch_last_block_root
|
||||
}) {
|
||||
self.update_effective_balances(state);
|
||||
self.update_previous_epoch_participation(state)
|
||||
.map_err(OpPoolError::RewardCacheUpdatePrevEpoch)?;
|
||||
}
|
||||
|
||||
// The current epoch participation flags change every block, and will almost always need
|
||||
// updating when this function is called at a new slot.
|
||||
if self
|
||||
.initialization
|
||||
.as_ref()
|
||||
.map_or(true, |init| init.latest_block_root != latest_block_root)
|
||||
{
|
||||
self.update_current_epoch_participation(state)
|
||||
.map_err(OpPoolError::RewardCacheUpdateCurrEpoch)?;
|
||||
}
|
||||
|
||||
self.initialization = Some(Initialization {
|
||||
current_epoch,
|
||||
prev_epoch_last_block_root,
|
||||
latest_block_root,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_effective_balances<E: EthSpec>(&mut self, state: &BeaconState<E>) {
|
||||
self.effective_balances = state
|
||||
.validators()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, val)| (i, val.effective_balance))
|
||||
.collect();
|
||||
}
|
||||
|
||||
fn update_previous_epoch_participation<E: EthSpec>(
|
||||
&mut self,
|
||||
state: &BeaconState<E>,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
let default_participation = ParticipationFlags::default();
|
||||
self.previous_epoch_participation = state
|
||||
.previous_epoch_participation()?
|
||||
.iter()
|
||||
.copied()
|
||||
.enumerate()
|
||||
.filter(|(_, participation)| *participation == default_participation)
|
||||
.collect();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn update_current_epoch_participation<E: EthSpec>(
|
||||
&mut self,
|
||||
state: &BeaconState<E>,
|
||||
) -> Result<(), BeaconStateError> {
|
||||
let default_participation = ParticipationFlags::default();
|
||||
self.current_epoch_participation = state
|
||||
.current_epoch_participation()?
|
||||
.iter()
|
||||
.copied()
|
||||
.enumerate()
|
||||
.filter(|(_, participation)| *participation == default_participation)
|
||||
.collect();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user