Merge remote-tracking branch 'origin/unstable' into tree-states

This commit is contained in:
Michael Sproul
2023-12-01 12:02:21 +11:00
72 changed files with 2467 additions and 1336 deletions

View File

@@ -5,7 +5,9 @@ use participation_cache::ParticipationCache;
use safe_arith::SafeArith;
use serde_utils::quoted_u64::Quoted;
use slog::debug;
use state_processing::per_epoch_processing::altair::process_inactivity_updates_slow;
use state_processing::per_epoch_processing::altair::{
process_inactivity_updates_slow, process_justification_and_finalization,
};
use state_processing::{
common::altair::BaseRewardPerIncrement,
per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight},
@@ -27,6 +29,7 @@ use state_processing::per_epoch_processing::base::rewards_and_penalties::{
};
use state_processing::per_epoch_processing::base::validator_statuses::InclusionInfo;
use state_processing::per_epoch_processing::base::{
process_justification_and_finalization as process_justification_and_finalization_base,
TotalBalances, ValidatorStatus, ValidatorStatuses,
};
@@ -67,6 +70,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut validator_statuses = ValidatorStatuses::new(&state, spec)?;
validator_statuses.process_attestations(&state)?;
process_justification_and_finalization_base(
&state,
&validator_statuses.total_balances,
spec,
)?
.apply_changes_to_state(&mut state);
let ideal_rewards =
self.compute_ideal_rewards_base(&state, &validator_statuses.total_balances)?;
@@ -126,6 +136,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Calculate ideal_rewards
let participation_cache = ParticipationCache::new(&state, spec)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
process_justification_and_finalization(&state)?.apply_changes_to_state(&mut state);
process_inactivity_updates_slow(&mut state, spec)?;
let previous_epoch = state.previous_epoch();

View File

@@ -18,7 +18,7 @@ use crate::block_verification::{
use crate::block_verification_types::{
AsBlock, AvailableExecutedBlock, BlockImportData, ExecutedBlock, RpcBlock,
};
pub use crate::canonical_head::{CanonicalHead, CanonicalHeadRwLock};
pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
@@ -5252,15 +5252,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// This function will result in a call to `forkchoiceUpdated` on the EL if we're in the
/// tail-end of the slot (as defined by `self.config.prepare_payload_lookahead`).
///
/// Return `Ok(Some(head_block_root))` if this node prepared to propose at the next slot on
/// top of `head_block_root`.
pub async fn prepare_beacon_proposer(
self: &Arc<Self>,
current_slot: Slot,
) -> Result<(), Error> {
) -> Result<Option<Hash256>, Error> {
let prepare_slot = current_slot + 1;
// There's no need to run the proposer preparation routine before the bellatrix fork.
if self.slot_is_prior_to_bellatrix(prepare_slot) {
return Ok(());
return Ok(None);
}
let execution_layer = self
@@ -5273,7 +5276,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !self.config.always_prepare_payload
&& !execution_layer.has_any_proposer_preparation_data().await
{
return Ok(());
return Ok(None);
}
// Load the cached head and its forkchoice update parameters.
@@ -5320,7 +5323,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let Some((forkchoice_update_params, Some(pre_payload_attributes))) = maybe_prep_data else {
// Appropriate log messages have already been logged above and in
// `get_pre_payload_attributes`.
return Ok(());
return Ok(None);
};
// If the execution layer doesn't have any proposer data for this validator then we assume
@@ -5331,7 +5334,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.has_proposer_preparation_data(proposer)
.await
{
return Ok(());
return Ok(None);
}
// Fetch payload attributes from the execution layer's cache, or compute them from scratch
@@ -5426,7 +5429,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"prepare_slot" => prepare_slot,
"validator" => proposer,
);
return Ok(());
return Ok(None);
};
// If we are close enough to the proposal slot, send an fcU, which will have payload
@@ -5449,7 +5452,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await?;
}
Ok(())
Ok(Some(head_root))
}
pub async fn update_execution_engine_forkchoice(
@@ -6367,6 +6370,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn data_availability_boundary(&self) -> Option<Epoch> {
self.data_availability_checker.data_availability_boundary()
}
/// Gets the `LightClientBootstrap` object for a requested block root.
///
/// Returns `None` when the state or block is not found in the database.
#[allow(clippy::type_complexity)]
pub fn get_light_client_bootstrap(
&self,
block_root: &Hash256,
) -> Result<Option<(LightClientBootstrap<T::EthSpec>, ForkName)>, Error> {
let Some((state_root, slot)) = self
.get_blinded_block(block_root)?
.map(|block| (block.state_root(), block.slot()))
else {
return Ok(None);
};
let Some(mut state) = self.get_state(&state_root, Some(slot))? else {
return Ok(None);
};
let fork_name = state
.fork_name(&self.spec)
.map_err(Error::InconsistentFork)?;
match fork_name {
ForkName::Altair | ForkName::Merge => {
LightClientBootstrap::from_beacon_state(&mut state)
.map(|bootstrap| Some((bootstrap, fork_name)))
.map_err(Error::LightClientError)
}
ForkName::Base | ForkName::Capella | ForkName::Deneb => Err(Error::UnsupportedFork),
}
}
}
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {

View File

@@ -219,6 +219,8 @@ pub enum BeaconChainError {
ProposerHeadForkChoiceError(fork_choice::Error<proto_array::Error>),
UnableToPublish,
AvailabilityCheckError(AvailabilityCheckError),
LightClientError(LightClientError),
UnsupportedFork,
}
easy_from_to!(SlotProcessingError, BeaconChainError);

View File

@@ -17,6 +17,8 @@ pub struct ServerSentEventHandler<T: EthSpec> {
contribution_tx: Sender<EventKind<T>>,
payload_attributes_tx: Sender<EventKind<T>>,
late_head: Sender<EventKind<T>>,
light_client_finality_update_tx: Sender<EventKind<T>>,
light_client_optimistic_update_tx: Sender<EventKind<T>>,
block_reward_tx: Sender<EventKind<T>>,
log: Logger,
}
@@ -40,6 +42,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (contribution_tx, _) = broadcast::channel(capacity);
let (payload_attributes_tx, _) = broadcast::channel(capacity);
let (late_head, _) = broadcast::channel(capacity);
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
let (block_reward_tx, _) = broadcast::channel(capacity);
Self {
@@ -53,6 +57,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
contribution_tx,
payload_attributes_tx,
late_head,
light_client_finality_update_tx,
light_client_optimistic_update_tx,
block_reward_tx,
log,
}
@@ -108,6 +114,14 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.late_head
.send(kind)
.map(|count| log_count("late head", count)),
EventKind::LightClientFinalityUpdate(_) => self
.light_client_finality_update_tx
.send(kind)
.map(|count| log_count("light client finality update", count)),
EventKind::LightClientOptimisticUpdate(_) => self
.light_client_optimistic_update_tx
.send(kind)
.map(|count| log_count("light client optimistic update", count)),
EventKind::BlockReward(_) => self
.block_reward_tx
.send(kind)
@@ -158,6 +172,14 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.late_head.subscribe()
}
pub fn subscribe_light_client_finality_update(&self) -> Receiver<EventKind<T>> {
self.light_client_finality_update_tx.subscribe()
}
pub fn subscribe_light_client_optimistic_update(&self) -> Receiver<EventKind<T>> {
self.light_client_optimistic_update_tx.subscribe()
}
pub fn subscribe_block_reward(&self) -> Receiver<EventKind<T>> {
self.block_reward_tx.subscribe()
}

View File

@@ -67,7 +67,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientFinalityUpdate<T> {
chain: &BeaconChain<T>,
seen_timestamp: Duration,
) -> Result<Self, Error> {
let gossiped_finality_slot = light_client_finality_update.finalized_header.slot;
let gossiped_finality_slot = light_client_finality_update.finalized_header.beacon.slot;
let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0);
let signature_slot = light_client_finality_update.signature_slot;
let start_time = chain.slot_clock.start_of(signature_slot);
@@ -88,7 +88,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientFinalityUpdate<T> {
.get_blinded_block(&finalized_block_root)?
.ok_or(Error::FailedConstructingUpdate)?;
let latest_seen_finality_update_slot = match latest_seen_finality_update.as_ref() {
Some(update) => update.finalized_header.slot,
Some(update) => update.finalized_header.beacon.slot,
None => Slot::new(0),
};

View File

@@ -71,7 +71,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
chain: &BeaconChain<T>,
seen_timestamp: Duration,
) -> Result<Self, Error> {
let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.slot;
let gossiped_optimistic_slot = light_client_optimistic_update.attested_header.beacon.slot;
let one_third_slot_duration = Duration::new(chain.spec.seconds_per_slot / 3, 0);
let signature_slot = light_client_optimistic_update.signature_slot;
let start_time = chain.slot_clock.start_of(signature_slot);
@@ -88,7 +88,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
.get_state(&attested_block.state_root(), Some(attested_block.slot()))?
.ok_or(Error::FailedConstructingUpdate)?;
let latest_seen_optimistic_update_slot = match latest_seen_optimistic_update.as_ref() {
Some(update) => update.attested_header.slot,
Some(update) => update.attested_header.beacon.slot,
None => Slot::new(0),
};
@@ -114,6 +114,7 @@ impl<T: BeaconChainTypes> VerifiedLightClientOptimisticUpdate<T> {
// otherwise queue
let canonical_root = light_client_optimistic_update
.attested_header
.beacon
.canonical_root();
if canonical_root != head_block.message().parent_root() {

View File

@@ -1,21 +1,15 @@
//! Utilities for managing database schema changes.
mod migration_schema_v12;
mod migration_schema_v13;
mod migration_schema_v14;
mod migration_schema_v15;
mod migration_schema_v16;
mod migration_schema_v17;
mod migration_schema_v18;
mod migration_schema_v20;
use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY};
use crate::eth1_chain::SszEth1;
use crate::beacon_chain::BeaconChainTypes;
use crate::types::ChainSpec;
use slog::{warn, Logger};
use slog::Logger;
use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::{Error as StoreError, StoreItem};
use store::Error as StoreError;
/// Migrate the database from one schema version to another, applying all requisite mutations.
#[allow(clippy::only_used_in_recursion)] // spec is not used but likely to be used in future
@@ -66,92 +60,8 @@ pub fn migrate_schema<T: BeaconChainTypes>(
}
//
// Migrations from before SchemaVersion(11) are deprecated.
// Migrations from before SchemaVersion(16) are deprecated.
//
// Upgrade from v11 to v12 to store richer metadata in the attestation op pool.
(SchemaVersion(11), SchemaVersion(12)) => {
let ops = migration_schema_v12::upgrade_to_v12::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Downgrade from v12 to v11 to drop richer metadata from the attestation op pool.
(SchemaVersion(12), SchemaVersion(11)) => {
let ops = migration_schema_v12::downgrade_from_v12::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(12), SchemaVersion(13)) => {
let mut ops = vec![];
if let Some(persisted_eth1_v1) = db.get_item::<SszEth1>(&ETH1_CACHE_DB_KEY)? {
let upgraded_eth1_cache =
match migration_schema_v13::update_eth1_cache(persisted_eth1_v1) {
Ok(upgraded_eth1) => upgraded_eth1,
Err(e) => {
warn!(log, "Failed to deserialize SszEth1CacheV1"; "error" => ?e);
warn!(log, "Reinitializing eth1 cache");
migration_schema_v13::reinitialized_eth1_cache_v13(
deposit_contract_deploy_block,
)
}
};
ops.push(upgraded_eth1_cache.as_kv_store_op(ETH1_CACHE_DB_KEY)?);
}
db.store_schema_version_atomically(to, ops)?;
Ok(())
}
(SchemaVersion(13), SchemaVersion(12)) => {
let mut ops = vec![];
if let Some(persisted_eth1_v13) = db.get_item::<SszEth1>(&ETH1_CACHE_DB_KEY)? {
let downgraded_eth1_cache = match migration_schema_v13::downgrade_eth1_cache(
persisted_eth1_v13,
) {
Ok(Some(downgraded_eth1)) => downgraded_eth1,
Ok(None) => {
warn!(log, "Unable to downgrade eth1 cache from newer version: reinitializing eth1 cache");
migration_schema_v13::reinitialized_eth1_cache_v1(
deposit_contract_deploy_block,
)
}
Err(e) => {
warn!(log, "Unable to downgrade eth1 cache from newer version: failed to deserialize SszEth1CacheV13"; "error" => ?e);
warn!(log, "Reinitializing eth1 cache");
migration_schema_v13::reinitialized_eth1_cache_v1(
deposit_contract_deploy_block,
)
}
};
ops.push(downgraded_eth1_cache.as_kv_store_op(ETH1_CACHE_DB_KEY)?);
}
db.store_schema_version_atomically(to, ops)?;
Ok(())
}
(SchemaVersion(13), SchemaVersion(14)) => {
let ops = migration_schema_v14::upgrade_to_v14::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(14), SchemaVersion(13)) => {
let ops = migration_schema_v14::downgrade_from_v14::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(14), SchemaVersion(15)) => {
let ops = migration_schema_v15::upgrade_to_v15::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(15), SchemaVersion(14)) => {
let ops = migration_schema_v15::downgrade_from_v15::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(15), SchemaVersion(16)) => {
let ops = migration_schema_v16::upgrade_to_v16::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(16), SchemaVersion(15)) => {
let ops = migration_schema_v16::downgrade_from_v16::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(16), SchemaVersion(17)) => {
let ops = migration_schema_v17::upgrade_to_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)

View File

@@ -1,222 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY};
use crate::persisted_fork_choice::PersistedForkChoiceV11;
use operation_pool::{PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV5};
use slog::{debug, info, Logger};
use state_processing::{
common::get_indexed_attestation, per_block_processing::is_valid_indexed_attestation,
VerifyOperation, VerifySignatures,
};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_to_v12<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let spec = db.get_chain_spec();
// Load a V5 op pool and transform it to V12.
let Some(PersistedOperationPoolV5 {
attestations_v5,
sync_contributions,
attester_slashings_v5,
proposer_slashings_v5,
voluntary_exits_v5,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!(log, "Nothing to do, no operation pool stored");
return Ok(vec![]);
};
// Load the persisted fork choice so we can grab the state of the justified block and use
// it to verify the stored attestations, slashings and exits.
let fork_choice = db
.get_item::<PersistedForkChoiceV11>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
let justified_block_root = fork_choice
.fork_choice_store
.unrealized_justified_checkpoint
.root;
let justified_block = db
.get_blinded_block(&justified_block_root, None)?
.ok_or_else(|| {
Error::SchemaMigrationError(format!(
"unrealized justified block missing for migration: {justified_block_root:?}",
))
})?;
let justified_state_root = justified_block.state_root();
let mut state = db
.get_state(&justified_state_root, Some(justified_block.slot()))?
.ok_or_else(|| {
Error::SchemaMigrationError(format!(
"justified state missing for migration: {justified_state_root:?}"
))
})?;
state.build_all_committee_caches(spec).map_err(|e| {
Error::SchemaMigrationError(format!("unable to build committee caches: {e:?}"))
})?;
// Re-verify attestations while adding attesting indices.
let attestations = attestations_v5
.into_iter()
.flat_map(|(_, attestations)| attestations)
.filter_map(|attestation| {
let res = state
.get_beacon_committee(attestation.data.slot, attestation.data.index)
.map_err(Into::into)
.and_then(|committee| get_indexed_attestation(committee.committee, &attestation))
.and_then(|indexed_attestation| {
is_valid_indexed_attestation(
&state,
&indexed_attestation,
VerifySignatures::True,
spec,
)?;
Ok(indexed_attestation)
});
match res {
Ok(indexed) => Some((attestation, indexed.attesting_indices.into())),
Err(e) => {
debug!(
log,
"Dropping attestation on migration";
"err" => ?e,
"head_block" => ?attestation.data.beacon_block_root,
);
None
}
}
})
.collect::<Vec<_>>();
let attester_slashings = attester_slashings_v5
.iter()
.filter_map(|(slashing, _)| {
slashing
.clone()
.validate(&state, spec)
.map_err(|e| {
debug!(
log,
"Dropping attester slashing on migration";
"err" => ?e,
"slashing" => ?slashing,
);
})
.ok()
})
.collect::<Vec<_>>();
let proposer_slashings = proposer_slashings_v5
.iter()
.filter_map(|slashing| {
slashing
.clone()
.validate(&state, spec)
.map_err(|e| {
debug!(
log,
"Dropping proposer slashing on migration";
"err" => ?e,
"slashing" => ?slashing,
);
})
.ok()
})
.collect::<Vec<_>>();
let voluntary_exits = voluntary_exits_v5
.iter()
.filter_map(|exit| {
exit.clone()
.validate(&state, spec)
.map_err(|e| {
debug!(
log,
"Dropping voluntary exit on migration";
"err" => ?e,
"exit" => ?exit,
);
})
.ok()
})
.collect::<Vec<_>>();
debug!(
log,
"Migrated op pool";
"attestations" => attestations.len(),
"attester_slashings" => attester_slashings.len(),
"proposer_slashings" => proposer_slashings.len(),
"voluntary_exits" => voluntary_exits.len()
);
let v12 = PersistedOperationPool::V12(PersistedOperationPoolV12 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
});
Ok(vec![v12.as_kv_store_op(OP_POOL_DB_KEY)?])
}
pub fn downgrade_from_v12<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Load a V12 op pool and transform it to V5.
let Some(PersistedOperationPoolV12::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!(log, "Nothing to do, no operation pool stored");
return Ok(vec![]);
};
info!(
log,
"Dropping attestations from pool";
"count" => attestations.len(),
);
let attester_slashings_v5 = attester_slashings
.into_iter()
.filter_map(|slashing| {
let fork_version = slashing.first_fork_verified_against()?;
Some((slashing.into_inner(), fork_version))
})
.collect::<Vec<_>>();
let proposer_slashings_v5 = proposer_slashings
.into_iter()
.map(|slashing| slashing.into_inner())
.collect::<Vec<_>>();
let voluntary_exits_v5 = voluntary_exits
.into_iter()
.map(|exit| exit.into_inner())
.collect::<Vec<_>>();
info!(
log,
"Migrated slashings and exits";
"attester_slashings" => attester_slashings_v5.len(),
"proposer_slashings" => proposer_slashings_v5.len(),
"voluntary_exits" => voluntary_exits_v5.len(),
);
let v5 = PersistedOperationPoolV5 {
attestations_v5: vec![],
sync_contributions,
attester_slashings_v5,
proposer_slashings_v5,
voluntary_exits_v5,
};
Ok(vec![v5.as_kv_store_op(OP_POOL_DB_KEY)?])
}

View File

@@ -1,150 +0,0 @@
use crate::eth1_chain::SszEth1;
use eth1::{BlockCache, SszDepositCacheV1, SszDepositCacheV13, SszEth1CacheV1, SszEth1CacheV13};
use ssz::{Decode, Encode};
use state_processing::common::DepositDataTree;
use store::Error;
use types::DEPOSIT_TREE_DEPTH;
pub fn update_eth1_cache(persisted_eth1_v1: SszEth1) -> Result<SszEth1, Error> {
if persisted_eth1_v1.use_dummy_backend {
// backend_bytes is empty when using dummy backend
return Ok(persisted_eth1_v1);
}
let SszEth1 {
use_dummy_backend,
backend_bytes,
} = persisted_eth1_v1;
let ssz_eth1_cache_v1 = SszEth1CacheV1::from_ssz_bytes(&backend_bytes)?;
let SszEth1CacheV1 {
block_cache,
deposit_cache: deposit_cache_v1,
last_processed_block,
} = ssz_eth1_cache_v1;
let SszDepositCacheV1 {
logs,
leaves,
deposit_contract_deploy_block,
deposit_roots,
} = deposit_cache_v1;
let deposit_cache_v13 = SszDepositCacheV13 {
logs,
leaves,
deposit_contract_deploy_block,
finalized_deposit_count: 0,
finalized_block_height: deposit_contract_deploy_block.saturating_sub(1),
deposit_tree_snapshot: None,
deposit_roots,
};
let ssz_eth1_cache_v13 = SszEth1CacheV13 {
block_cache,
deposit_cache: deposit_cache_v13,
last_processed_block,
};
let persisted_eth1_v13 = SszEth1 {
use_dummy_backend,
backend_bytes: ssz_eth1_cache_v13.as_ssz_bytes(),
};
Ok(persisted_eth1_v13)
}
pub fn downgrade_eth1_cache(persisted_eth1_v13: SszEth1) -> Result<Option<SszEth1>, Error> {
if persisted_eth1_v13.use_dummy_backend {
// backend_bytes is empty when using dummy backend
return Ok(Some(persisted_eth1_v13));
}
let SszEth1 {
use_dummy_backend,
backend_bytes,
} = persisted_eth1_v13;
let ssz_eth1_cache_v13 = SszEth1CacheV13::from_ssz_bytes(&backend_bytes)?;
let SszEth1CacheV13 {
block_cache,
deposit_cache: deposit_cache_v13,
last_processed_block,
} = ssz_eth1_cache_v13;
let SszDepositCacheV13 {
logs,
leaves,
deposit_contract_deploy_block,
finalized_deposit_count,
finalized_block_height: _,
deposit_tree_snapshot,
deposit_roots,
} = deposit_cache_v13;
if finalized_deposit_count == 0 && deposit_tree_snapshot.is_none() {
// This tree was never finalized and can be directly downgraded to v1 without re-initializing
let deposit_cache_v1 = SszDepositCacheV1 {
logs,
leaves,
deposit_contract_deploy_block,
deposit_roots,
};
let ssz_eth1_cache_v1 = SszEth1CacheV1 {
block_cache,
deposit_cache: deposit_cache_v1,
last_processed_block,
};
return Ok(Some(SszEth1 {
use_dummy_backend,
backend_bytes: ssz_eth1_cache_v1.as_ssz_bytes(),
}));
}
// deposit cache was finalized; can't downgrade
Ok(None)
}
pub fn reinitialized_eth1_cache_v13(deposit_contract_deploy_block: u64) -> SszEth1 {
let empty_tree = DepositDataTree::create(&[], 0, DEPOSIT_TREE_DEPTH);
let deposit_cache_v13 = SszDepositCacheV13 {
logs: vec![],
leaves: vec![],
deposit_contract_deploy_block,
finalized_deposit_count: 0,
finalized_block_height: deposit_contract_deploy_block.saturating_sub(1),
deposit_tree_snapshot: empty_tree.get_snapshot(),
deposit_roots: vec![empty_tree.root()],
};
let ssz_eth1_cache_v13 = SszEth1CacheV13 {
block_cache: BlockCache::default(),
deposit_cache: deposit_cache_v13,
last_processed_block: None,
};
SszEth1 {
use_dummy_backend: false,
backend_bytes: ssz_eth1_cache_v13.as_ssz_bytes(),
}
}
pub fn reinitialized_eth1_cache_v1(deposit_contract_deploy_block: u64) -> SszEth1 {
let empty_tree = DepositDataTree::create(&[], 0, DEPOSIT_TREE_DEPTH);
let deposit_cache_v1 = SszDepositCacheV1 {
logs: vec![],
leaves: vec![],
deposit_contract_deploy_block,
deposit_roots: vec![empty_tree.root()],
};
let ssz_eth1_cache_v1 = SszEth1CacheV1 {
block_cache: BlockCache::default(),
deposit_cache: deposit_cache_v1,
last_processed_block: None,
};
SszEth1 {
use_dummy_backend: false,
backend_bytes: ssz_eth1_cache_v1.as_ssz_bytes(),
}
}

View File

@@ -1,118 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY};
use operation_pool::{
PersistedOperationPool, PersistedOperationPoolV12, PersistedOperationPoolV14,
};
use slog::{debug, error, info, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
use types::{EthSpec, Hash256, Slot};
/// The slot clock isn't usually available before the database is initialized, so we construct a
/// temporary slot clock by reading the genesis state. It should always exist if the database is
/// initialized at a prior schema version, however we still handle the lack of genesis state
/// gracefully.
fn get_slot_clock<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero(), Some(Slot::new(0)))? else {
error!(log, "Missing genesis block");
return Ok(None);
};
let Some(genesis_state) = db.get_state(&genesis_block.state_root(), Some(Slot::new(0)))? else {
error!(log, "Missing genesis state"; "state_root" => ?genesis_block.state_root());
return Ok(None);
};
Ok(Some(T::SlotClock::new(
spec.genesis_slot,
Duration::from_secs(genesis_state.genesis_time()),
Duration::from_secs(spec.seconds_per_slot),
)))
}
pub fn upgrade_to_v14<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Load a V12 op pool and transform it to V14.
let Some(PersistedOperationPoolV12::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!(log, "Nothing to do, no operation pool stored");
return Ok(vec![]);
};
// initialize with empty vector
let bls_to_execution_changes = vec![];
let v14 = PersistedOperationPool::V14(PersistedOperationPoolV14 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
});
Ok(vec![v14.as_kv_store_op(OP_POOL_DB_KEY)?])
}
pub fn downgrade_from_v14<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// We cannot downgrade from V14 once the Capella fork has been reached because there will
// be HistoricalSummaries stored in the database instead of HistoricalRoots and prior versions
// of Lighthouse can't handle that.
if let Some(capella_fork_epoch) = db.get_chain_spec().capella_fork_epoch {
let current_epoch = get_slot_clock::<T>(&db, &log)?
.and_then(|clock| clock.now())
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
.ok_or(Error::SlotClockUnavailableForMigration)?;
if current_epoch >= capella_fork_epoch {
error!(
log,
"Capella already active: v14+ is mandatory";
"current_epoch" => current_epoch,
"capella_fork_epoch" => capella_fork_epoch,
);
return Err(Error::UnableToDowngrade);
}
}
// Load a V14 op pool and transform it to V12.
let Some(PersistedOperationPoolV14::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!(log, "Nothing to do, no operation pool stored");
return Ok(vec![]);
};
info!(
log,
"Dropping bls_to_execution_changes from pool";
"count" => bls_to_execution_changes.len(),
);
let v12 = PersistedOperationPoolV12 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
};
Ok(vec![v12.as_kv_store_op(OP_POOL_DB_KEY)?])
}

View File

@@ -1,74 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY};
use operation_pool::{
PersistedOperationPool, PersistedOperationPoolV14, PersistedOperationPoolV15,
};
use slog::{debug, info, Logger};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_to_v15<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Load a V14 op pool and transform it to V15.
let Some(PersistedOperationPoolV14::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!(log, "Nothing to do, no operation pool stored");
return Ok(vec![]);
};
let v15 = PersistedOperationPool::V15(PersistedOperationPoolV15 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
// Initialize with empty set
capella_bls_change_broadcast_indices: <_>::default(),
});
Ok(vec![v15.as_kv_store_op(OP_POOL_DB_KEY)?])
}
pub fn downgrade_from_v15<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Load a V15 op pool and transform it to V14.
let Some(PersistedOperationPoolV15::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!(log, "Nothing to do, no operation pool stored");
return Ok(vec![]);
};
info!(
log,
"Forgetting address changes for Capella broadcast";
"count" => capella_bls_change_broadcast_indices.len(),
);
let v14 = PersistedOperationPoolV14 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
};
Ok(vec![v14.as_kv_store_op(OP_POOL_DB_KEY)?])
}

View File

@@ -1,46 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
use crate::persisted_fork_choice::PersistedForkChoiceV11;
use slog::{debug, Logger};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_to_v16<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
drop_balances_cache::<T>(db, log)
}
pub fn downgrade_from_v16<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
drop_balances_cache::<T>(db, log)
}
/// Drop the balances cache from the fork choice store.
///
/// There aren't any type-level changes in this schema migration, however the
/// way that we compute the `JustifiedBalances` has changed due to:
/// https://github.com/sigp/lighthouse/pull/3962
pub fn drop_balances_cache<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut persisted_fork_choice = db
.get_item::<PersistedForkChoiceV11>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
debug!(
log,
"Dropping fork choice balances cache";
"item_count" => persisted_fork_choice.fork_choice_store.balances_cache.items.len()
);
// Drop all items in the balances cache.
persisted_fork_choice.fork_choice_store.balances_cache = <_>::default();
let kv_op = persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY)?;
Ok(vec![kv_op])
}

View File

@@ -239,14 +239,18 @@ async fn state_advance_timer<T: BeaconChainTypes>(
// Prepare proposers so that the node can send payload attributes in the case where
// it decides to abandon a proposer boost re-org.
if let Err(e) = beacon_chain.prepare_beacon_proposer(current_slot).await {
warn!(
log,
"Unable to prepare proposer with lookahead";
"error" => ?e,
"slot" => next_slot,
);
}
beacon_chain
.prepare_beacon_proposer(current_slot)
.await
.unwrap_or_else(|e| {
warn!(
log,
"Unable to prepare proposer with lookahead";
"error" => ?e,
"slot" => next_slot,
);
None
});
// Use a blocking task to avoid blocking the core executor whilst waiting for locks
// in `ForkChoiceSignalTx`.

View File

@@ -622,19 +622,19 @@ impl<T: EthSpec> ValidatorMonitor<T> {
"parent block root" => ?prev_block_root,
);
}
} else {
warn!(
self.log,
"Missing validator index";
"info" => "potentially inconsistency in the validator manager",
"index" => i,
)
}
} else {
warn!(
self.log,
"Missing validator index";
"info" => "potentially inconsistency in the validator manager",
"index" => i,
)
}
} else {
debug!(
self.log,
"Could not get proposers for from cache";
"Could not get proposers from cache";
"epoch" => ?slot_epoch
);
}