Single-pass epoch processing (#4483)

This commit is contained in:
Michael Sproul
2023-07-18 16:59:55 +10:00
committed by GitHub
parent 079cd67df2
commit 5d2063d262
42 changed files with 1558 additions and 730 deletions

View File

@@ -1,15 +1,10 @@
use crate::{ForkChoiceStore, InvalidationOperation};
use per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError;
use proto_array::{
Block as ProtoBlock, DisallowedReOrgOffsets, ExecutionStatus, ProposerHeadError,
ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold,
};
use slog::{crit, debug, error, warn, Logger};
use slog::{crit, debug, warn, Logger};
use ssz_derive::{Decode, Encode};
use state_processing::per_epoch_processing::altair::ParticipationCache;
use state_processing::per_epoch_processing::{
weigh_justification_and_finalization, JustificationAndFinalizationState,
};
use state_processing::{
per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing,
};
@@ -17,13 +12,13 @@ use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::marker::PhantomData;
use std::time::Duration;
use types::ProgressiveBalancesMode;
use types::{
consts::merge::INTERVALS_PER_SLOT, AbstractExecPayload, AttestationShufflingId,
AttesterSlashing, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Checkpoint, Epoch,
EthSpec, ExecPayload, ExecutionBlockHash, Hash256, IndexedAttestation, RelativeEpoch,
SignedBeaconBlock, Slot,
};
use types::{ProgressiveBalancesCache, ProgressiveBalancesMode};
#[derive(Debug)]
pub enum Error<T> {
@@ -77,7 +72,6 @@ pub enum Error<T> {
proposer_boost_root: Hash256,
},
UnrealizedVoteProcessing(state_processing::EpochProcessingError),
ParticipationCacheError(ParticipationCacheError),
ValidatorStatuses(BeaconStateError),
ProgressiveBalancesCacheCheckFailed(String),
}
@@ -106,12 +100,6 @@ impl<T> From<BeaconStateError> for Error<T> {
}
}
impl<T> From<ParticipationCacheError> for Error<T> {
fn from(e: ParticipationCacheError) -> Self {
Error::ParticipationCacheError(e)
}
}
#[derive(Debug, Clone, Copy)]
/// Controls how fork choice should behave when restoring from a persisted fork choice.
pub enum ResetPayloadStatuses {
@@ -654,6 +642,7 @@ where
/// The supplied block **must** pass the `state_transition` function as it will not be run
/// here.
#[allow(clippy::too_many_arguments)]
// FIXME(sproul): remove progressive balances mode
pub fn on_block<Payload: AbstractExecPayload<E>>(
&mut self,
system_time_current_slot: Slot,
@@ -662,9 +651,9 @@ where
block_delay: Duration,
state: &BeaconState<E>,
payload_verification_status: PayloadVerificationStatus,
progressive_balances_mode: ProgressiveBalancesMode,
_progressive_balances_mode: ProgressiveBalancesMode,
spec: &ChainSpec,
log: &Logger,
_log: &Logger,
) -> Result<(), Error<T::Error>> {
// If this block has already been processed we do not need to reprocess it.
// We check this immediately in case re-processing the block mutates some property of the
@@ -758,79 +747,38 @@ where
parent_justified.epoch == block_epoch && parent_finalized.epoch + 1 >= block_epoch
});
let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) = if let Some((
parent_justified,
parent_finalized,
)) =
parent_checkpoints
{
(parent_justified, parent_finalized)
} else {
let justification_and_finalization_state = match block {
BeaconBlockRef::Capella(_)
| BeaconBlockRef::Merge(_)
| BeaconBlockRef::Altair(_) => match progressive_balances_mode {
ProgressiveBalancesMode::Disabled => {
let participation_cache = ParticipationCache::new(state, spec)?;
per_epoch_processing::altair::process_justification_and_finalization(
let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) =
if let Some((parent_justified, parent_finalized)) = parent_checkpoints {
(parent_justified, parent_finalized)
} else {
let justification_and_finalization_state = match block {
BeaconBlockRef::Capella(_)
| BeaconBlockRef::Merge(_)
| BeaconBlockRef::Altair(_) => {
// FIXME(sproul): initialize progressive balances
per_epoch_processing::altair::process_justification_and_finalization(state)?
}
BeaconBlockRef::Base(_) => {
let mut validator_statuses =
per_epoch_processing::base::ValidatorStatuses::new(state, spec)
.map_err(Error::ValidatorStatuses)?;
validator_statuses
.process_attestations(state)
.map_err(Error::ValidatorStatuses)?;
per_epoch_processing::base::process_justification_and_finalization(
state,
&participation_cache,
&validator_statuses.total_balances,
spec,
)?
}
ProgressiveBalancesMode::Fast
| ProgressiveBalancesMode::Checked
| ProgressiveBalancesMode::Strict => {
let maybe_participation_cache = progressive_balances_mode
.perform_comparative_checks()
.then(|| ParticipationCache::new(state, spec))
.transpose()?;
};
process_justification_and_finalization_from_progressive_cache::<E, T>(
state,
maybe_participation_cache.as_ref(),
)
.or_else(|e| {
if progressive_balances_mode != ProgressiveBalancesMode::Strict {
error!(
log,
"Processing with progressive balances cache failed";
"info" => "falling back to the non-optimized processing method",
"error" => ?e,
);
let participation_cache = maybe_participation_cache
.map(Ok)
.unwrap_or_else(|| ParticipationCache::new(state, spec))?;
per_epoch_processing::altair::process_justification_and_finalization(
state,
&participation_cache,
).map_err(Error::from)
} else {
Err(e)
}
})?
}
},
BeaconBlockRef::Base(_) => {
let mut validator_statuses =
per_epoch_processing::base::ValidatorStatuses::new(state, spec)
.map_err(Error::ValidatorStatuses)?;
validator_statuses
.process_attestations(state)
.map_err(Error::ValidatorStatuses)?;
per_epoch_processing::base::process_justification_and_finalization(
state,
&validator_statuses.total_balances,
spec,
)?
}
(
justification_and_finalization_state.current_justified_checkpoint(),
justification_and_finalization_state.finalized_checkpoint(),
)
};
(
justification_and_finalization_state.current_justified_checkpoint(),
justification_and_finalization_state.finalized_checkpoint(),
)
};
// Update best known unrealized justified & finalized checkpoints
if unrealized_justified_checkpoint.epoch
> self.fc_store.unrealized_justified_checkpoint().epoch
@@ -1556,92 +1504,6 @@ where
}
}
/// Process justification and finalization using progressive cache. Also performs a comparative
/// check against the `ParticipationCache` if it is supplied.
///
/// Returns an error if the cache is not initialized or if there is a mismatch on the comparative check.
fn process_justification_and_finalization_from_progressive_cache<E, T>(
state: &BeaconState<E>,
maybe_participation_cache: Option<&ParticipationCache>,
) -> Result<JustificationAndFinalizationState<E>, Error<T::Error>>
where
E: EthSpec,
T: ForkChoiceStore<E>,
{
let justification_and_finalization_state = JustificationAndFinalizationState::new(state);
if state.current_epoch() <= E::genesis_epoch() + 1 {
return Ok(justification_and_finalization_state);
}
// Load cached balances
let progressive_balances_cache: &ProgressiveBalancesCache = state.progressive_balances_cache();
let previous_target_balance =
progressive_balances_cache.previous_epoch_target_attesting_balance()?;
let current_target_balance =
progressive_balances_cache.current_epoch_target_attesting_balance()?;
let total_active_balance = state.get_total_active_balance()?;
if let Some(participation_cache) = maybe_participation_cache {
check_progressive_balances::<E, T>(
state,
participation_cache,
previous_target_balance,
current_target_balance,
total_active_balance,
)?;
}
weigh_justification_and_finalization(
justification_and_finalization_state,
total_active_balance,
previous_target_balance,
current_target_balance,
)
.map_err(Error::from)
}
/// Perform comparative checks against `ParticipationCache`, will return error if there's a mismatch.
fn check_progressive_balances<E, T>(
state: &BeaconState<E>,
participation_cache: &ParticipationCache,
cached_previous_target_balance: u64,
cached_current_target_balance: u64,
cached_total_active_balance: u64,
) -> Result<(), Error<T::Error>>
where
E: EthSpec,
T: ForkChoiceStore<E>,
{
let slot = state.slot();
let epoch = state.current_epoch();
// Check previous epoch target balances
let previous_target_balance = participation_cache.previous_epoch_target_attesting_balance()?;
if previous_target_balance != cached_previous_target_balance {
return Err(Error::ProgressiveBalancesCacheCheckFailed(
format!("Previous epoch target attesting balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, previous_target_balance, cached_previous_target_balance)
));
}
// Check current epoch target balances
let current_target_balance = participation_cache.current_epoch_target_attesting_balance()?;
if current_target_balance != cached_current_target_balance {
return Err(Error::ProgressiveBalancesCacheCheckFailed(
format!("Current epoch target attesting balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, current_target_balance, cached_current_target_balance)
));
}
// Check current epoch total balances
let total_active_balance = participation_cache.current_epoch_total_active_balance();
if total_active_balance != cached_total_active_balance {
return Err(Error::ProgressiveBalancesCacheCheckFailed(
format!("Current epoch total active balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, total_active_balance, cached_total_active_balance)
));
}
Ok(())
}
/// Helper struct that is used to encode/decode the state of the `ForkChoice` as SSZ bytes.
///
/// This is used when persisting the state of the fork choice to disk.