mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-18 22:49:34 +00:00
Merge remote-tracking branch 'origin/unstable' into tree-states
This commit is contained in:
@@ -3,7 +3,8 @@ use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttest
|
||||
use eth2::lighthouse::StandardAttestationRewards;
|
||||
use participation_cache::ParticipationCache;
|
||||
use safe_arith::SafeArith;
|
||||
use slog::{debug, Logger};
|
||||
use serde_utils::quoted_u64::Quoted;
|
||||
use slog::debug;
|
||||
use state_processing::{
|
||||
common::altair::BaseRewardPerIncrement,
|
||||
per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight},
|
||||
@@ -15,32 +16,111 @@ use store::consts::altair::{
|
||||
};
|
||||
use types::consts::altair::WEIGHT_DENOMINATOR;
|
||||
|
||||
use types::{Epoch, EthSpec};
|
||||
use types::{BeaconState, Epoch, EthSpec};
|
||||
|
||||
use eth2::types::ValidatorId;
|
||||
use state_processing::common::base::get_base_reward_from_effective_balance;
|
||||
use state_processing::per_epoch_processing::base::rewards_and_penalties::{
|
||||
get_attestation_component_delta, get_attestation_deltas_all, get_attestation_deltas_subset,
|
||||
get_inactivity_penalty_delta, get_inclusion_delay_delta,
|
||||
};
|
||||
use state_processing::per_epoch_processing::base::validator_statuses::InclusionInfo;
|
||||
use state_processing::per_epoch_processing::base::{
|
||||
TotalBalances, ValidatorStatus, ValidatorStatuses,
|
||||
};
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
pub fn compute_attestation_rewards(
|
||||
&self,
|
||||
epoch: Epoch,
|
||||
validators: Vec<ValidatorId>,
|
||||
log: Logger,
|
||||
) -> Result<StandardAttestationRewards, BeaconChainError> {
|
||||
debug!(log, "computing attestation rewards"; "epoch" => epoch, "validator_count" => validators.len());
|
||||
debug!(self.log, "computing attestation rewards"; "epoch" => epoch, "validator_count" => validators.len());
|
||||
|
||||
// Get state
|
||||
let spec = &self.spec;
|
||||
|
||||
let state_slot = (epoch + 1).end_slot(T::EthSpec::slots_per_epoch());
|
||||
|
||||
let state_root = self
|
||||
.state_root_at_slot(state_slot)?
|
||||
.ok_or(BeaconChainError::NoStateForSlot(state_slot))?;
|
||||
|
||||
let mut state = self
|
||||
let state = self
|
||||
.get_state(&state_root, Some(state_slot))?
|
||||
.ok_or(BeaconChainError::MissingBeaconState(state_root))?;
|
||||
|
||||
match state {
|
||||
BeaconState::Base(_) => self.compute_attestation_rewards_base(state, validators),
|
||||
BeaconState::Altair(_) | BeaconState::Merge(_) | BeaconState::Capella(_) => {
|
||||
self.compute_attestation_rewards_altair(state, validators)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_attestation_rewards_base(
|
||||
&self,
|
||||
mut state: BeaconState<T::EthSpec>,
|
||||
validators: Vec<ValidatorId>,
|
||||
) -> Result<StandardAttestationRewards, BeaconChainError> {
|
||||
let spec = &self.spec;
|
||||
let mut validator_statuses = ValidatorStatuses::new(&state, spec)?;
|
||||
validator_statuses.process_attestations(&state)?;
|
||||
|
||||
let ideal_rewards =
|
||||
self.compute_ideal_rewards_base(&state, &validator_statuses.total_balances)?;
|
||||
|
||||
let indices_to_attestation_delta = if validators.is_empty() {
|
||||
get_attestation_deltas_all(&state, &validator_statuses, spec)?
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.collect()
|
||||
} else {
|
||||
let validator_indices = Self::validators_ids_to_indices(&mut state, validators)?;
|
||||
get_attestation_deltas_subset(&state, &validator_statuses, &validator_indices, spec)?
|
||||
};
|
||||
|
||||
let mut total_rewards = vec![];
|
||||
|
||||
for (index, delta) in indices_to_attestation_delta.into_iter() {
|
||||
let head_delta = delta.head_delta;
|
||||
let head = (head_delta.rewards as i64).safe_sub(head_delta.penalties as i64)?;
|
||||
|
||||
let target_delta = delta.target_delta;
|
||||
let target = (target_delta.rewards as i64).safe_sub(target_delta.penalties as i64)?;
|
||||
|
||||
let source_delta = delta.source_delta;
|
||||
let source = (source_delta.rewards as i64).safe_sub(source_delta.penalties as i64)?;
|
||||
|
||||
// No penalties associated with inclusion delay
|
||||
let inclusion_delay = delta.inclusion_delay_delta.rewards;
|
||||
let inactivity = delta.inactivity_penalty_delta.penalties.wrapping_neg() as i64;
|
||||
|
||||
let rewards = TotalAttestationRewards {
|
||||
validator_index: index as u64,
|
||||
head,
|
||||
target,
|
||||
source,
|
||||
inclusion_delay: Some(Quoted {
|
||||
value: inclusion_delay,
|
||||
}),
|
||||
inactivity,
|
||||
};
|
||||
|
||||
total_rewards.push(rewards);
|
||||
}
|
||||
|
||||
Ok(StandardAttestationRewards {
|
||||
ideal_rewards,
|
||||
total_rewards,
|
||||
})
|
||||
}
|
||||
|
||||
fn compute_attestation_rewards_altair(
|
||||
&self,
|
||||
mut state: BeaconState<T::EthSpec>,
|
||||
validators: Vec<ValidatorId>,
|
||||
) -> Result<StandardAttestationRewards, BeaconChainError> {
|
||||
let spec = &self.spec;
|
||||
|
||||
// Calculate ideal_rewards
|
||||
let participation_cache = ParticipationCache::new(&state, spec)
|
||||
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
|
||||
@@ -68,7 +148,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let base_reward_per_increment =
|
||||
BaseRewardPerIncrement::new(total_active_balance, spec)?;
|
||||
|
||||
for effective_balance_eth in 0..=32 {
|
||||
for effective_balance_eth in 1..=self.max_effective_balance_increment_steps()? {
|
||||
let effective_balance =
|
||||
effective_balance_eth.safe_mul(spec.effective_balance_increment)?;
|
||||
let base_reward =
|
||||
@@ -83,7 +163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let ideal_reward = reward_numerator
|
||||
.safe_div(active_increments)?
|
||||
.safe_div(WEIGHT_DENOMINATOR)?;
|
||||
if !state.is_in_inactivity_leak(previous_epoch, spec) {
|
||||
if !state.is_in_inactivity_leak(previous_epoch, spec)? {
|
||||
ideal_rewards_hashmap
|
||||
.insert((flag_index, effective_balance), (ideal_reward, penalty));
|
||||
} else {
|
||||
@@ -98,15 +178,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
let validators = if validators.is_empty() {
|
||||
participation_cache.eligible_validator_indices().to_vec()
|
||||
} else {
|
||||
validators
|
||||
.into_iter()
|
||||
.map(|validator| match validator {
|
||||
ValidatorId::Index(i) => Ok(i as usize),
|
||||
ValidatorId::PublicKey(pubkey) => state
|
||||
.get_validator_index(&pubkey)?
|
||||
.ok_or(BeaconChainError::ValidatorPubkeyUnknown(pubkey)),
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
Self::validators_ids_to_indices(&mut state, validators)?
|
||||
};
|
||||
|
||||
for &validator_index in &validators {
|
||||
@@ -114,7 +186,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.get_validator(validator_index)
|
||||
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
|
||||
let eligible = validator.is_eligible;
|
||||
let mut head_reward = 0u64;
|
||||
let mut head_reward = 0i64;
|
||||
let mut target_reward = 0i64;
|
||||
let mut source_reward = 0i64;
|
||||
|
||||
@@ -130,7 +202,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
|
||||
if voted_correctly {
|
||||
if flag_index == TIMELY_HEAD_FLAG_INDEX {
|
||||
head_reward += ideal_reward;
|
||||
head_reward += *ideal_reward as i64;
|
||||
} else if flag_index == TIMELY_TARGET_FLAG_INDEX {
|
||||
target_reward += *ideal_reward as i64;
|
||||
} else if flag_index == TIMELY_SOURCE_FLAG_INDEX {
|
||||
@@ -150,6 +222,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
head: head_reward,
|
||||
target: target_reward,
|
||||
source: source_reward,
|
||||
inclusion_delay: None,
|
||||
// TODO: altair calculation logic needs to be updated to include inactivity penalty
|
||||
inactivity: 0,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -171,6 +246,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
head: 0,
|
||||
target: 0,
|
||||
source: 0,
|
||||
inclusion_delay: None,
|
||||
// TODO: altair calculation logic needs to be updated to include inactivity penalty
|
||||
inactivity: 0,
|
||||
});
|
||||
match *flag_index {
|
||||
TIMELY_SOURCE_FLAG_INDEX => entry.source += ideal_reward,
|
||||
@@ -190,4 +268,126 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
total_rewards,
|
||||
})
|
||||
}
|
||||
|
||||
fn max_effective_balance_increment_steps(&self) -> Result<u64, BeaconChainError> {
|
||||
let spec = &self.spec;
|
||||
let max_steps = spec
|
||||
.max_effective_balance
|
||||
.safe_div(spec.effective_balance_increment)?;
|
||||
Ok(max_steps)
|
||||
}
|
||||
|
||||
fn validators_ids_to_indices(
|
||||
state: &mut BeaconState<T::EthSpec>,
|
||||
validators: Vec<ValidatorId>,
|
||||
) -> Result<Vec<usize>, BeaconChainError> {
|
||||
let indices = validators
|
||||
.into_iter()
|
||||
.map(|validator| match validator {
|
||||
ValidatorId::Index(i) => Ok(i as usize),
|
||||
ValidatorId::PublicKey(pubkey) => state
|
||||
.get_validator_index(&pubkey)?
|
||||
.ok_or(BeaconChainError::ValidatorPubkeyUnknown(pubkey)),
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
Ok(indices)
|
||||
}
|
||||
|
||||
fn compute_ideal_rewards_base(
|
||||
&self,
|
||||
state: &BeaconState<T::EthSpec>,
|
||||
total_balances: &TotalBalances,
|
||||
) -> Result<Vec<IdealAttestationRewards>, BeaconChainError> {
|
||||
let spec = &self.spec;
|
||||
let previous_epoch = state.previous_epoch();
|
||||
let finality_delay = previous_epoch
|
||||
.safe_sub(state.finalized_checkpoint().epoch)?
|
||||
.as_u64();
|
||||
|
||||
let ideal_validator_status = ValidatorStatus {
|
||||
is_previous_epoch_attester: true,
|
||||
is_slashed: false,
|
||||
inclusion_info: Some(InclusionInfo {
|
||||
delay: 1,
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut ideal_attestation_rewards_list = Vec::new();
|
||||
|
||||
for effective_balance_step in 1..=self.max_effective_balance_increment_steps()? {
|
||||
let effective_balance =
|
||||
effective_balance_step.safe_mul(spec.effective_balance_increment)?;
|
||||
let base_reward = get_base_reward_from_effective_balance::<T::EthSpec>(
|
||||
effective_balance,
|
||||
total_balances.current_epoch(),
|
||||
spec,
|
||||
)?;
|
||||
|
||||
// compute ideal head rewards
|
||||
let head = get_attestation_component_delta(
|
||||
true,
|
||||
total_balances.previous_epoch_attesters(),
|
||||
total_balances,
|
||||
base_reward,
|
||||
finality_delay,
|
||||
spec,
|
||||
)?
|
||||
.rewards;
|
||||
|
||||
// compute ideal target rewards
|
||||
let target = get_attestation_component_delta(
|
||||
true,
|
||||
total_balances.previous_epoch_target_attesters(),
|
||||
total_balances,
|
||||
base_reward,
|
||||
finality_delay,
|
||||
spec,
|
||||
)?
|
||||
.rewards;
|
||||
|
||||
// compute ideal source rewards
|
||||
let source = get_attestation_component_delta(
|
||||
true,
|
||||
total_balances.previous_epoch_head_attesters(),
|
||||
total_balances,
|
||||
base_reward,
|
||||
finality_delay,
|
||||
spec,
|
||||
)?
|
||||
.rewards;
|
||||
|
||||
// compute ideal inclusion delay rewards
|
||||
let inclusion_delay =
|
||||
get_inclusion_delay_delta(&ideal_validator_status, base_reward, spec)?
|
||||
.0
|
||||
.rewards;
|
||||
|
||||
// compute inactivity penalty
|
||||
let inactivity = get_inactivity_penalty_delta(
|
||||
&ideal_validator_status,
|
||||
base_reward,
|
||||
finality_delay,
|
||||
spec,
|
||||
)?
|
||||
.penalties
|
||||
.wrapping_neg() as i64;
|
||||
|
||||
let ideal_attestation_rewards = IdealAttestationRewards {
|
||||
effective_balance,
|
||||
head,
|
||||
target,
|
||||
source,
|
||||
inclusion_delay: Some(Quoted {
|
||||
value: inclusion_delay,
|
||||
}),
|
||||
inactivity,
|
||||
};
|
||||
|
||||
ideal_attestation_rewards_list.push(ideal_attestation_rewards);
|
||||
}
|
||||
|
||||
Ok(ideal_attestation_rewards_list)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,6 +193,17 @@ pub struct PrePayloadAttributes {
|
||||
pub parent_block_number: u64,
|
||||
}
|
||||
|
||||
/// Information about a state/block at a specific slot.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct FinalizationAndCanonicity {
|
||||
/// True if the slot of the state or block is finalized.
|
||||
///
|
||||
/// This alone DOES NOT imply that the state/block is finalized, use `self.is_finalized()`.
|
||||
pub slot_is_finalized: bool,
|
||||
/// True if the state or block is canonical at its slot.
|
||||
pub canonical: bool,
|
||||
}
|
||||
|
||||
/// Define whether a forkchoiceUpdate needs to be checked for an override (`Yes`) or has already
|
||||
/// been checked (`AlreadyApplied`). It is safe to specify `Yes` even if re-orgs are disabled.
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -420,6 +431,12 @@ pub struct BeaconChain<T: BeaconChainTypes> {
|
||||
|
||||
type BeaconBlockAndState<T, Payload> = (BeaconBlock<T, Payload>, BeaconState<T>);
|
||||
|
||||
impl FinalizationAndCanonicity {
|
||||
pub fn is_finalized(self) -> bool {
|
||||
self.slot_is_finalized && self.canonical
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
/// Checks if a block is finalized.
|
||||
/// The finalization check is done with the block slot. The block root is used to verify that
|
||||
@@ -449,16 +466,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
state_root: &Hash256,
|
||||
state_slot: Slot,
|
||||
) -> Result<bool, Error> {
|
||||
self.state_finalization_and_canonicity(state_root, state_slot)
|
||||
.map(FinalizationAndCanonicity::is_finalized)
|
||||
}
|
||||
|
||||
/// Fetch the finalization and canonicity status of the state with `state_root`.
|
||||
pub fn state_finalization_and_canonicity(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
state_slot: Slot,
|
||||
) -> Result<FinalizationAndCanonicity, Error> {
|
||||
let finalized_slot = self
|
||||
.canonical_head
|
||||
.cached_head()
|
||||
.finalized_checkpoint()
|
||||
.epoch
|
||||
.start_slot(T::EthSpec::slots_per_epoch());
|
||||
let is_canonical = self
|
||||
let slot_is_finalized = state_slot <= finalized_slot;
|
||||
let canonical = self
|
||||
.state_root_at_slot(state_slot)?
|
||||
.map_or(false, |canonical_root| state_root == &canonical_root);
|
||||
Ok(state_slot <= finalized_slot && is_canonical)
|
||||
Ok(FinalizationAndCanonicity {
|
||||
slot_is_finalized,
|
||||
canonical,
|
||||
})
|
||||
}
|
||||
|
||||
/// Persists the head tracker and fork choice.
|
||||
|
||||
@@ -83,6 +83,8 @@ pub struct ChainConfig {
|
||||
pub enable_backfill_rate_limiting: bool,
|
||||
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
|
||||
pub progressive_balances_mode: ProgressiveBalancesMode,
|
||||
/// Number of epochs between each migration of data from the hot database to the freezer.
|
||||
pub epochs_per_migration: u64,
|
||||
}
|
||||
|
||||
impl Default for ChainConfig {
|
||||
@@ -114,6 +116,7 @@ impl Default for ChainConfig {
|
||||
always_prepare_payload: false,
|
||||
enable_backfill_rate_limiting: true,
|
||||
progressive_balances_mode: ProgressiveBalancesMode::Checked,
|
||||
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,7 +24,7 @@ use state_processing::{
|
||||
},
|
||||
signature_sets::Error as SignatureSetError,
|
||||
state_advance::Error as StateAdvanceError,
|
||||
BlockProcessingError, BlockReplayError, SlotProcessingError,
|
||||
BlockProcessingError, BlockReplayError, EpochProcessingError, SlotProcessingError,
|
||||
};
|
||||
use std::time::Duration;
|
||||
use task_executor::ShutdownReason;
|
||||
@@ -61,6 +61,7 @@ pub enum BeaconChainError {
|
||||
MissingBeaconBlock(Hash256),
|
||||
MissingBeaconState(Hash256),
|
||||
SlotProcessingError(SlotProcessingError),
|
||||
EpochProcessingError(EpochProcessingError),
|
||||
StateAdvanceError(StateAdvanceError),
|
||||
UnableToAdvanceState(String),
|
||||
NoStateForAttestation {
|
||||
@@ -215,6 +216,7 @@ pub enum BeaconChainError {
|
||||
}
|
||||
|
||||
easy_from_to!(SlotProcessingError, BeaconChainError);
|
||||
easy_from_to!(EpochProcessingError, BeaconChainError);
|
||||
easy_from_to!(AttestationValidationError, BeaconChainError);
|
||||
easy_from_to!(SyncCommitteeMessageValidationError, BeaconChainError);
|
||||
easy_from_to!(ExitValidationError, BeaconChainError);
|
||||
|
||||
@@ -71,6 +71,7 @@ pub use execution_layer::EngineState;
|
||||
pub use execution_payload::NotifyExecutionLayer;
|
||||
pub use fork_choice::{ExecutionStatus, ForkchoiceUpdateParameters};
|
||||
pub use metrics::scrape_for_metrics;
|
||||
pub use migrate::MigratorConfig;
|
||||
pub use parking_lot;
|
||||
pub use slot_clock;
|
||||
pub use state_processing::per_block_processing::errors::{
|
||||
|
||||
@@ -3,7 +3,6 @@ use crate::errors::BeaconChainError;
|
||||
use crate::head_tracker::{HeadTracker, SszHeadTracker};
|
||||
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
|
||||
use parking_lot::Mutex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use slog::{debug, error, info, trace, warn, Logger};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::mem;
|
||||
@@ -28,13 +27,13 @@ const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
|
||||
const BLOCKS_PER_RECONSTRUCTION: usize = 8192 * 4;
|
||||
|
||||
/// Default number of epochs to wait between finalization migrations.
|
||||
pub const DEFAULT_EPOCHS_PER_RUN: u64 = 1;
|
||||
pub const DEFAULT_EPOCHS_PER_MIGRATION: u64 = 1;
|
||||
|
||||
/// The background migrator runs a thread to perform pruning and migrate state from the hot
|
||||
/// to the cold database.
|
||||
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
db: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
/// Record of when the last migration ran, for enforcing `epochs_per_run`.
|
||||
/// Record of when the last migration ran, for enforcing `epochs_per_migration`.
|
||||
prev_migration: Arc<Mutex<PrevMigration>>,
|
||||
tx_thread: Option<
|
||||
Mutex<(
|
||||
@@ -47,41 +46,45 @@ pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
log: Logger,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct MigratorConfig {
|
||||
pub blocking: bool,
|
||||
/// Run migrations at most once per `epochs_per_run`.
|
||||
/// Run migrations at most once per `epochs_per_migration`.
|
||||
///
|
||||
/// If set to 0, then run every finalization.
|
||||
pub epochs_per_run: u64,
|
||||
/// If set to 0 or 1, then run every finalization.
|
||||
pub epochs_per_migration: u64,
|
||||
}
|
||||
|
||||
impl Default for MigratorConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
blocking: false,
|
||||
epochs_per_run: DEFAULT_EPOCHS_PER_RUN,
|
||||
epochs_per_migration: DEFAULT_EPOCHS_PER_MIGRATION,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PrevMigration {
|
||||
epoch: Option<Epoch>,
|
||||
epochs_per_run: u64,
|
||||
}
|
||||
|
||||
impl MigratorConfig {
|
||||
pub fn blocking(mut self) -> Self {
|
||||
self.blocking = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn epochs_per_run(mut self, epochs_per_run: u64) -> Self {
|
||||
self.epochs_per_run = epochs_per_run;
|
||||
pub fn epochs_per_migration(mut self, epochs_per_migration: u64) -> Self {
|
||||
self.epochs_per_migration = epochs_per_migration;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Record of when the last migration ran.
|
||||
#[derive(Debug)]
|
||||
pub struct PrevMigration {
|
||||
/// The epoch at which the last finalization migration ran.
|
||||
epoch: Epoch,
|
||||
/// The number of epochs to wait between runs.
|
||||
epochs_per_migration: u64,
|
||||
}
|
||||
|
||||
/// Pruning can be successful, or in rare cases deferred to a later point.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum PruningOutcome {
|
||||
@@ -128,6 +131,7 @@ pub struct FinalizationNotification {
|
||||
finalized_state_root: BeaconStateHash,
|
||||
finalized_checkpoint: Checkpoint,
|
||||
head_tracker: Arc<HeadTracker>,
|
||||
prev_migration: Arc<Mutex<PrevMigration>>,
|
||||
genesis_block_root: Hash256,
|
||||
}
|
||||
|
||||
@@ -151,9 +155,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
genesis_block_root: Hash256,
|
||||
log: Logger,
|
||||
) -> Self {
|
||||
// Estimate last migration run from DB split slot.
|
||||
let prev_migration = Arc::new(Mutex::new(PrevMigration {
|
||||
epoch: None,
|
||||
epochs_per_run: config.epochs_per_run,
|
||||
epoch: db.get_split_slot().epoch(E::slots_per_epoch()),
|
||||
epochs_per_migration: config.epochs_per_migration,
|
||||
}));
|
||||
let tx_thread = if config.blocking {
|
||||
None
|
||||
@@ -188,6 +193,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
finalized_state_root,
|
||||
finalized_checkpoint,
|
||||
head_tracker,
|
||||
prev_migration: self.prev_migration.clone(),
|
||||
genesis_block_root: self.genesis_block_root,
|
||||
};
|
||||
|
||||
@@ -265,6 +271,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
notif: FinalizationNotification,
|
||||
log: &Logger,
|
||||
) {
|
||||
// Do not run too frequently.
|
||||
let epoch = notif.finalized_checkpoint.epoch;
|
||||
let mut prev_migration = notif.prev_migration.lock();
|
||||
if epoch < prev_migration.epoch + prev_migration.epochs_per_migration {
|
||||
debug!(
|
||||
log,
|
||||
"Database consolidation deferred";
|
||||
"last_finalized_epoch" => prev_migration.epoch,
|
||||
"new_finalized_epoch" => epoch,
|
||||
"epochs_per_migration" => prev_migration.epochs_per_migration,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Update the previous migration epoch immediately to avoid holding the lock. If the
|
||||
// migration doesn't succeed then the next migration will be retried at the next scheduled
|
||||
// run.
|
||||
prev_migration.epoch = epoch;
|
||||
drop(prev_migration);
|
||||
|
||||
debug!(log, "Database consolidation started");
|
||||
|
||||
let finalized_state_root = notif.finalized_state_root;
|
||||
@@ -438,23 +464,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
||||
|
||||
// Do not run too frequently.
|
||||
let epoch = notif.finalized_checkpoint.epoch;
|
||||
if let Some(prev_epoch) = prev_migration.epoch {
|
||||
if epoch < prev_epoch + prev_migration.epochs_per_run {
|
||||
debug!(
|
||||
log,
|
||||
"Finalization migration deferred";
|
||||
"last_finalized_epoch" => prev_epoch,
|
||||
"new_finalized_epoch" => epoch,
|
||||
"epochs_per_run" => prev_migration.epochs_per_run,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if epoch < prev_migration.epoch + prev_migration.epochs_per_migration {
|
||||
debug!(
|
||||
log,
|
||||
"Finalization migration deferred";
|
||||
"last_finalized_epoch" => prev_migration.epoch,
|
||||
"new_finalized_epoch" => epoch,
|
||||
"epochs_per_migration" => prev_migration.epochs_per_migration,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// We intend to run at this epoch, update the in-memory record of the last epoch
|
||||
// at which we ran. This value isn't tracked on disk so we will always migrate
|
||||
// on the first finalization after startup.
|
||||
prev_migration.epoch = Some(epoch);
|
||||
prev_migration.epoch = epoch;
|
||||
|
||||
Self::run_migration(db.clone(), notif.to_owned(), &log);
|
||||
|
||||
|
||||
@@ -513,18 +513,23 @@ where
|
||||
let validator_keypairs = self
|
||||
.validator_keypairs
|
||||
.expect("cannot build without validator keypairs");
|
||||
let chain_config = self.chain_config.unwrap_or_default();
|
||||
|
||||
let mut builder = BeaconChainBuilder::new(self.eth_spec_instance)
|
||||
.logger(log.clone())
|
||||
.custom_spec(spec)
|
||||
.store(self.store.expect("cannot build without store"))
|
||||
.store_migrator_config(MigratorConfig::default().blocking())
|
||||
.store_migrator_config(
|
||||
MigratorConfig::default()
|
||||
.blocking()
|
||||
.epochs_per_migration(chain_config.epochs_per_migration),
|
||||
)
|
||||
.task_executor(self.runtime.task_executor.clone())
|
||||
.execution_layer(self.execution_layer)
|
||||
.dummy_eth1_backend()
|
||||
.expect("should build dummy backend")
|
||||
.shutdown_sender(shutdown_tx)
|
||||
.chain_config(self.chain_config.unwrap_or_default())
|
||||
.chain_config(chain_config)
|
||||
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
|
||||
log.clone(),
|
||||
5,
|
||||
|
||||
Reference in New Issue
Block a user