Single-pass epoch processing and optimised block processing (#5279)

* Single-pass epoch processing (#4483, #4573)

Co-authored-by: Michael Sproul <michael@sigmaprime.io>

* Delete unused epoch processing code (#5170)

* Delete unused epoch processing code

* Compare total deltas

* Remove unnecessary apply_pending

* cargo fmt

* Remove newline

* Use epoch cache in block packing (#5223)

* Remove progressive balances mode (#5224)

* inline inactivity_penalty_quotient_for_state

* drop previous_epoch_total_active_balance

* fc lint

* spec compliant process_sync_aggregate (#15)

* spec compliant process_sync_aggregate

* Update consensus/state_processing/src/per_block_processing/altair/sync_committee.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Delete the participation cache (#16)

* update help

* Fix op_pool tests

* Fix fork choice tests

* Merge remote-tracking branch 'sigp/unstable' into epoch-single-pass

* Simplify exit cache (#5280)

* Fix clippy on exit cache

* Clean up single-pass a bit (#5282)

* Address Mark's review of single-pass (#5386)

* Merge remote-tracking branch 'origin/unstable' into epoch-single-pass

* Address Sean's review comments (#5414)

* Address most of Sean's review comments

* Simplify total balance cache building

* Clean up unused junk

* Merge remote-tracking branch 'origin/unstable' into epoch-single-pass

* More self-review

* Merge remote-tracking branch 'origin/unstable' into epoch-single-pass

* Merge branch 'unstable' into epoch-single-pass

* Fix imports for beta compiler

* Fix tests, probably
This commit is contained in:
Michael Sproul
2024-04-05 00:14:36 +11:00
committed by GitHub
parent f4cdcea7b1
commit feb531f85b
81 changed files with 2545 additions and 1316 deletions

View File

@@ -1,15 +1,10 @@
use crate::{ForkChoiceStore, InvalidationOperation};
use per_epoch_processing::altair::participation_cache::Error as ParticipationCacheError;
use proto_array::{
Block as ProtoBlock, DisallowedReOrgOffsets, ExecutionStatus, ProposerHeadError,
ProposerHeadInfo, ProtoArrayForkChoice, ReOrgThreshold,
};
use slog::{crit, debug, error, warn, Logger};
use slog::{crit, debug, warn, Logger};
use ssz_derive::{Decode, Encode};
use state_processing::per_epoch_processing::altair::ParticipationCache;
use state_processing::per_epoch_processing::{
weigh_justification_and_finalization, JustificationAndFinalizationState,
};
use state_processing::{
per_block_processing::errors::AttesterSlashingValidationError, per_epoch_processing,
};
@@ -23,7 +18,6 @@ use types::{
EthSpec, ExecPayload, ExecutionBlockHash, Hash256, IndexedAttestation, RelativeEpoch,
SignedBeaconBlock, Slot,
};
use types::{ProgressiveBalancesCache, ProgressiveBalancesMode};
#[derive(Debug)]
pub enum Error<T> {
@@ -77,10 +71,7 @@ pub enum Error<T> {
proposer_boost_root: Hash256,
},
UnrealizedVoteProcessing(state_processing::EpochProcessingError),
ParticipationCacheBuild(BeaconStateError),
ParticipationCacheError(ParticipationCacheError),
ValidatorStatuses(BeaconStateError),
ProgressiveBalancesCacheCheckFailed(String),
}
impl<T> From<InvalidAttestation> for Error<T> {
@@ -107,12 +98,6 @@ impl<T> From<BeaconStateError> for Error<T> {
}
}
impl<T> From<ParticipationCacheError> for Error<T> {
fn from(e: ParticipationCacheError) -> Self {
Error::ParticipationCacheError(e)
}
}
#[derive(Debug, Clone, Copy)]
/// Controls how fork choice should behave when restoring from a persisted fork choice.
pub enum ResetPayloadStatuses {
@@ -658,9 +643,7 @@ where
block_delay: Duration,
state: &BeaconState<E>,
payload_verification_status: PayloadVerificationStatus,
progressive_balances_mode: ProgressiveBalancesMode,
spec: &ChainSpec,
log: &Logger,
) -> Result<(), Error<T::Error>> {
// If this block has already been processed we do not need to reprocess it.
// We check this immediately in case re-processing the block mutates some property of the
@@ -755,86 +738,44 @@ where
parent_justified.epoch == block_epoch && parent_finalized.epoch + 1 == block_epoch
});
let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) = if let Some((
parent_justified,
parent_finalized,
)) =
parent_checkpoints
{
(parent_justified, parent_finalized)
} else {
let justification_and_finalization_state = match block {
BeaconBlockRef::Electra(_)
| BeaconBlockRef::Deneb(_)
| BeaconBlockRef::Capella(_)
| BeaconBlockRef::Merge(_)
| BeaconBlockRef::Altair(_) => match progressive_balances_mode {
ProgressiveBalancesMode::Disabled => {
let participation_cache = ParticipationCache::new(state, spec)
.map_err(Error::ParticipationCacheBuild)?;
per_epoch_processing::altair::process_justification_and_finalization(
let (unrealized_justified_checkpoint, unrealized_finalized_checkpoint) =
if let Some((parent_justified, parent_finalized)) = parent_checkpoints {
(parent_justified, parent_finalized)
} else {
let justification_and_finalization_state = match block {
BeaconBlockRef::Electra(_)
| BeaconBlockRef::Deneb(_)
| BeaconBlockRef::Capella(_)
| BeaconBlockRef::Merge(_)
| BeaconBlockRef::Altair(_) => {
// NOTE: Processing justification & finalization requires the progressive
// balances cache, but we cannot initialize it here as we only have an
// immutable reference. The state *should* have come straight from block
// processing, which initialises the cache, but if we add other `on_block`
// calls in future it could be worth passing a mutable reference.
per_epoch_processing::altair::process_justification_and_finalization(state)?
}
BeaconBlockRef::Base(_) => {
let mut validator_statuses =
per_epoch_processing::base::ValidatorStatuses::new(state, spec)
.map_err(Error::ValidatorStatuses)?;
validator_statuses
.process_attestations(state)
.map_err(Error::ValidatorStatuses)?;
per_epoch_processing::base::process_justification_and_finalization(
state,
&participation_cache,
&validator_statuses.total_balances,
spec,
)?
}
ProgressiveBalancesMode::Fast
| ProgressiveBalancesMode::Checked
| ProgressiveBalancesMode::Strict => {
let maybe_participation_cache = progressive_balances_mode
.perform_comparative_checks()
.then(|| {
ParticipationCache::new(state, spec)
.map_err(Error::ParticipationCacheBuild)
})
.transpose()?;
};
process_justification_and_finalization_from_progressive_cache::<E, T>(
state,
maybe_participation_cache.as_ref(),
)
.or_else(|e| {
if progressive_balances_mode != ProgressiveBalancesMode::Strict {
error!(
log,
"Processing with progressive balances cache failed";
"info" => "falling back to the non-optimized processing method",
"error" => ?e,
);
let participation_cache = maybe_participation_cache
.map(Ok)
.unwrap_or_else(|| ParticipationCache::new(state, spec))
.map_err(Error::ParticipationCacheBuild)?;
per_epoch_processing::altair::process_justification_and_finalization(
state,
&participation_cache,
).map_err(Error::from)
} else {
Err(e)
}
})?
}
},
BeaconBlockRef::Base(_) => {
let mut validator_statuses =
per_epoch_processing::base::ValidatorStatuses::new(state, spec)
.map_err(Error::ValidatorStatuses)?;
validator_statuses
.process_attestations(state)
.map_err(Error::ValidatorStatuses)?;
per_epoch_processing::base::process_justification_and_finalization(
state,
&validator_statuses.total_balances,
spec,
)?
}
(
justification_and_finalization_state.current_justified_checkpoint(),
justification_and_finalization_state.finalized_checkpoint(),
)
};
(
justification_and_finalization_state.current_justified_checkpoint(),
justification_and_finalization_state.finalized_checkpoint(),
)
};
// Update best known unrealized justified & finalized checkpoints
if unrealized_justified_checkpoint.epoch
> self.fc_store.unrealized_justified_checkpoint().epoch
@@ -1559,92 +1500,6 @@ where
}
}
/// Process justification and finalization using progressive cache. Also performs a comparative
/// check against the `ParticipationCache` if it is supplied.
///
/// Returns an error if the cache is not initialized or if there is a mismatch on the comparative check.
fn process_justification_and_finalization_from_progressive_cache<E, T>(
state: &BeaconState<E>,
maybe_participation_cache: Option<&ParticipationCache>,
) -> Result<JustificationAndFinalizationState<E>, Error<T::Error>>
where
E: EthSpec,
T: ForkChoiceStore<E>,
{
let justification_and_finalization_state = JustificationAndFinalizationState::new(state);
if state.current_epoch() <= E::genesis_epoch() + 1 {
return Ok(justification_and_finalization_state);
}
// Load cached balances
let progressive_balances_cache: &ProgressiveBalancesCache = state.progressive_balances_cache();
let previous_target_balance =
progressive_balances_cache.previous_epoch_target_attesting_balance()?;
let current_target_balance =
progressive_balances_cache.current_epoch_target_attesting_balance()?;
let total_active_balance = state.get_total_active_balance()?;
if let Some(participation_cache) = maybe_participation_cache {
check_progressive_balances::<E, T>(
state,
participation_cache,
previous_target_balance,
current_target_balance,
total_active_balance,
)?;
}
weigh_justification_and_finalization(
justification_and_finalization_state,
total_active_balance,
previous_target_balance,
current_target_balance,
)
.map_err(Error::from)
}
/// Perform comparative checks against `ParticipationCache`, will return error if there's a mismatch.
fn check_progressive_balances<E, T>(
state: &BeaconState<E>,
participation_cache: &ParticipationCache,
cached_previous_target_balance: u64,
cached_current_target_balance: u64,
cached_total_active_balance: u64,
) -> Result<(), Error<T::Error>>
where
E: EthSpec,
T: ForkChoiceStore<E>,
{
let slot = state.slot();
let epoch = state.current_epoch();
// Check previous epoch target balances
let previous_target_balance = participation_cache.previous_epoch_target_attesting_balance()?;
if previous_target_balance != cached_previous_target_balance {
return Err(Error::ProgressiveBalancesCacheCheckFailed(
format!("Previous epoch target attesting balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, previous_target_balance, cached_previous_target_balance)
));
}
// Check current epoch target balances
let current_target_balance = participation_cache.current_epoch_target_attesting_balance()?;
if current_target_balance != cached_current_target_balance {
return Err(Error::ProgressiveBalancesCacheCheckFailed(
format!("Current epoch target attesting balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, current_target_balance, cached_current_target_balance)
));
}
// Check current epoch total balances
let total_active_balance = participation_cache.current_epoch_total_active_balance();
if total_active_balance != cached_total_active_balance {
return Err(Error::ProgressiveBalancesCacheCheckFailed(
format!("Current epoch total active balance mismatch, slot: {}, epoch: {}, actual: {}, cached: {}", slot, epoch, total_active_balance, cached_total_active_balance)
));
}
Ok(())
}
/// Helper struct that is used to encode/decode the state of the `ForkChoice` as SSZ bytes.
///
/// This is used when persisting the state of the fork choice to disk.

View File

@@ -16,8 +16,8 @@ use std::time::Duration;
use store::MemoryStore;
use types::{
test_utils::generate_deterministic_keypair, BeaconBlockRef, BeaconState, ChainSpec, Checkpoint,
Epoch, EthSpec, ForkName, Hash256, IndexedAttestation, MainnetEthSpec, ProgressiveBalancesMode,
RelativeEpoch, SignedBeaconBlock, Slot, SubnetId,
Epoch, EthSpec, ForkName, Hash256, IndexedAttestation, MainnetEthSpec, RelativeEpoch,
SignedBeaconBlock, Slot, SubnetId,
};
pub type E = MainnetEthSpec;
@@ -47,37 +47,16 @@ impl fmt::Debug for ForkChoiceTest {
impl ForkChoiceTest {
/// Creates a new tester.
pub fn new() -> Self {
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.default_spec()
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.build();
Self { harness }
Self::new_with_chain_config(ChainConfig::default())
}
/// Creates a new tester with a custom chain config.
pub fn new_with_chain_config(chain_config: ChainConfig) -> Self {
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.default_spec()
.chain_config(chain_config)
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.build();
Self { harness }
}
/// Creates a new tester with the specified `ProgressiveBalancesMode` and genesis from latest fork.
fn new_with_progressive_balances_mode(mode: ProgressiveBalancesMode) -> ForkChoiceTest {
// genesis with latest fork (at least altair required to test the cache)
// Run fork choice tests against the latest fork.
let spec = ForkName::latest().make_genesis_spec(ChainSpec::default());
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec)
.chain_config(ChainConfig {
progressive_balances_mode: mode,
..ChainConfig::default()
})
.chain_config(chain_config)
.deterministic_keypairs(VALIDATOR_COUNT)
.fresh_ephemeral_store()
.mock_execution_layer()
@@ -338,9 +317,7 @@ impl ForkChoiceTest {
Duration::from_secs(0),
&state,
PayloadVerificationStatus::Verified,
self.harness.chain.config.progressive_balances_mode,
&self.harness.chain.spec,
self.harness.logger(),
)
.unwrap();
self
@@ -383,9 +360,7 @@ impl ForkChoiceTest {
Duration::from_secs(0),
&state,
PayloadVerificationStatus::Verified,
self.harness.chain.config.progressive_balances_mode,
&self.harness.chain.spec,
self.harness.logger(),
)
.expect_err("on_block did not return an error");
comparison_func(err);
@@ -1348,7 +1323,7 @@ async fn weak_subjectivity_check_epoch_boundary_is_skip_slot_failure() {
/// where the slashed validator is a target attester in previous / current epoch.
#[tokio::test]
async fn progressive_balances_cache_attester_slashing() {
ForkChoiceTest::new_with_progressive_balances_mode(ProgressiveBalancesMode::Strict)
ForkChoiceTest::new()
// first two epochs
.apply_blocks_while(|_, state| state.finalized_checkpoint().epoch == 0)
.await
@@ -1379,7 +1354,7 @@ async fn progressive_balances_cache_attester_slashing() {
/// where the slashed validator is a target attester in previous / current epoch.
#[tokio::test]
async fn progressive_balances_cache_proposer_slashing() {
ForkChoiceTest::new_with_progressive_balances_mode(ProgressiveBalancesMode::Strict)
ForkChoiceTest::new()
// first two epochs
.apply_blocks_while(|_, state| state.finalized_checkpoint().epoch == 0)
.await