Delete old database schemas (#6051)

* Delete old database schemas

* Fix docs (thanks CK)

* Fix beacon-chain tests
This commit is contained in:
Michael Sproul
2024-07-09 10:24:49 +10:00
committed by GitHub
parent a59a61fef9
commit 9942c18c11
14 changed files with 32 additions and 528 deletions

View File

@@ -20,26 +20,12 @@ use types::{
Hash256, Slot,
};
/// Ensure this justified checkpoint has an epoch of 0 so that it is never
/// greater than the justified checkpoint and enshrined as the actual justified
/// checkpoint.
const JUNK_BEST_JUSTIFIED_CHECKPOINT: Checkpoint = Checkpoint {
epoch: Epoch::new(0),
root: Hash256::repeat_byte(0),
};
#[derive(Debug)]
pub enum Error {
UnableToReadSlot,
UnableToReadTime,
InvalidGenesisSnapshot(Slot),
AncestorUnknown { ancestor_slot: Slot },
UninitializedBestJustifiedBalances,
FailedToReadBlock(StoreError),
MissingBlock(Hash256),
FailedToReadState(StoreError),
MissingState(Hash256),
InvalidPersistedBytes(ssz::DecodeError),
BeaconStateError(BeaconStateError),
Arith(ArithError),
}
@@ -66,7 +52,6 @@ const MAX_BALANCE_CACHE_SIZE: usize = 4;
)]
pub(crate) struct CacheItem {
pub(crate) block_root: Hash256,
#[superstruct(only(V8))]
pub(crate) epoch: Epoch,
pub(crate) balances: Vec<u64>,
}
@@ -79,7 +64,6 @@ pub(crate) type CacheItem = CacheItemV8;
no_enum
)]
pub struct BalancesCache {
#[superstruct(only(V8))]
pub(crate) items: Vec<CacheItemV8>,
}
@@ -365,59 +349,15 @@ where
pub type PersistedForkChoiceStore = PersistedForkChoiceStoreV17;
/// A container which allows persisting the `BeaconForkChoiceStore` to the on-disk database.
#[superstruct(
variants(V11, V17),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)]
pub struct PersistedForkChoiceStore {
#[superstruct(only(V11, V17))]
pub balances_cache: BalancesCacheV8,
pub time: Slot,
pub finalized_checkpoint: Checkpoint,
pub justified_checkpoint: Checkpoint,
pub justified_balances: Vec<u64>,
#[superstruct(only(V11))]
pub best_justified_checkpoint: Checkpoint,
#[superstruct(only(V11, V17))]
pub unrealized_justified_checkpoint: Checkpoint,
#[superstruct(only(V11, V17))]
pub unrealized_finalized_checkpoint: Checkpoint,
#[superstruct(only(V11, V17))]
pub proposer_boost_root: Hash256,
#[superstruct(only(V11, V17))]
pub equivocating_indices: BTreeSet<u64>,
}
impl From<PersistedForkChoiceStoreV11> for PersistedForkChoiceStore {
fn from(from: PersistedForkChoiceStoreV11) -> PersistedForkChoiceStore {
PersistedForkChoiceStore {
balances_cache: from.balances_cache,
time: from.time,
finalized_checkpoint: from.finalized_checkpoint,
justified_checkpoint: from.justified_checkpoint,
justified_balances: from.justified_balances,
unrealized_justified_checkpoint: from.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: from.unrealized_finalized_checkpoint,
proposer_boost_root: from.proposer_boost_root,
equivocating_indices: from.equivocating_indices,
}
}
}
impl From<PersistedForkChoiceStore> for PersistedForkChoiceStoreV11 {
fn from(from: PersistedForkChoiceStore) -> PersistedForkChoiceStoreV11 {
PersistedForkChoiceStoreV11 {
balances_cache: from.balances_cache,
time: from.time,
finalized_checkpoint: from.finalized_checkpoint,
justified_checkpoint: from.justified_checkpoint,
justified_balances: from.justified_balances,
best_justified_checkpoint: JUNK_BEST_JUSTIFIED_CHECKPOINT,
unrealized_justified_checkpoint: from.unrealized_justified_checkpoint,
unrealized_finalized_checkpoint: from.unrealized_finalized_checkpoint,
proposer_boost_root: from.proposer_boost_root,
equivocating_indices: from.equivocating_indices,
}
}
}

View File

@@ -1,4 +1,4 @@
use crate::beacon_fork_choice_store::{PersistedForkChoiceStoreV11, PersistedForkChoiceStoreV17};
use crate::beacon_fork_choice_store::PersistedForkChoiceStoreV17;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use store::{DBColumn, Error, StoreItem};
@@ -7,37 +7,12 @@ use superstruct::superstruct;
// If adding a new version you should update this type alias and fix the breakages.
pub type PersistedForkChoice = PersistedForkChoiceV17;
#[superstruct(
variants(V11, V17),
variant_attributes(derive(Encode, Decode)),
no_enum
)]
#[superstruct(variants(V17), variant_attributes(derive(Encode, Decode)), no_enum)]
pub struct PersistedForkChoice {
pub fork_choice: fork_choice::PersistedForkChoice,
#[superstruct(only(V11))]
pub fork_choice_store: PersistedForkChoiceStoreV11,
#[superstruct(only(V17))]
pub fork_choice_store: PersistedForkChoiceStoreV17,
}
impl From<PersistedForkChoiceV11> for PersistedForkChoice {
fn from(from: PersistedForkChoiceV11) -> PersistedForkChoice {
PersistedForkChoice {
fork_choice: from.fork_choice,
fork_choice_store: from.fork_choice_store.into(),
}
}
}
impl From<PersistedForkChoice> for PersistedForkChoiceV11 {
fn from(from: PersistedForkChoice) -> PersistedForkChoiceV11 {
PersistedForkChoiceV11 {
fork_choice: from.fork_choice,
fork_choice_store: from.fork_choice_store.into(),
}
}
}
macro_rules! impl_store_item {
($type:ty) => {
impl StoreItem for $type {
@@ -56,5 +31,4 @@ macro_rules! impl_store_item {
};
}
impl_store_item!(PersistedForkChoiceV11);
impl_store_item!(PersistedForkChoiceV17);

View File

@@ -1,7 +1,4 @@
//! Utilities for managing database schema changes.
mod migration_schema_v17;
mod migration_schema_v18;
mod migration_schema_v19;
mod migration_schema_v20;
mod migration_schema_v21;
@@ -54,32 +51,8 @@ pub fn migrate_schema<T: BeaconChainTypes>(
}
//
// Migrations from before SchemaVersion(16) are deprecated.
// Migrations from before SchemaVersion(19) are deprecated.
//
(SchemaVersion(16), SchemaVersion(17)) => {
let ops = migration_schema_v17::upgrade_to_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(17), SchemaVersion(16)) => {
let ops = migration_schema_v17::downgrade_from_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(17), SchemaVersion(18)) => {
let ops = migration_schema_v18::upgrade_to_v18::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(18), SchemaVersion(17)) => {
let ops = migration_schema_v18::downgrade_from_v18::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(18), SchemaVersion(19)) => {
let ops = migration_schema_v19::upgrade_to_v19::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(19), SchemaVersion(18)) => {
let ops = migration_schema_v19::downgrade_from_v19::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(19), SchemaVersion(20)) => {
let ops = migration_schema_v20::upgrade_to_v20::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)

View File

@@ -1,88 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, FORK_CHOICE_DB_KEY};
use crate::persisted_fork_choice::{PersistedForkChoiceV11, PersistedForkChoiceV17};
use proto_array::core::{SszContainerV16, SszContainerV17};
use slog::{debug, Logger};
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
pub fn upgrade_fork_choice(
mut fork_choice: PersistedForkChoiceV11,
) -> Result<PersistedForkChoiceV17, Error> {
let ssz_container_v16 = SszContainerV16::from_ssz_bytes(
&fork_choice.fork_choice.proto_array_bytes,
)
.map_err(|e| {
Error::SchemaMigrationError(format!(
"Failed to decode ProtoArrayForkChoice during schema migration: {:?}",
e
))
})?;
let ssz_container_v17: SszContainerV17 = ssz_container_v16.try_into().map_err(|e| {
Error::SchemaMigrationError(format!(
"Missing checkpoint during schema migration: {:?}",
e
))
})?;
fork_choice.fork_choice.proto_array_bytes = ssz_container_v17.as_ssz_bytes();
Ok(fork_choice.into())
}
pub fn downgrade_fork_choice(
mut fork_choice: PersistedForkChoiceV17,
) -> Result<PersistedForkChoiceV11, Error> {
let ssz_container_v17 = SszContainerV17::from_ssz_bytes(
&fork_choice.fork_choice.proto_array_bytes,
)
.map_err(|e| {
Error::SchemaMigrationError(format!(
"Failed to decode ProtoArrayForkChoice during schema migration: {:?}",
e
))
})?;
let ssz_container_v16: SszContainerV16 = ssz_container_v17.into();
fork_choice.fork_choice.proto_array_bytes = ssz_container_v16.as_ssz_bytes();
Ok(fork_choice.into())
}
pub fn upgrade_to_v17<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Get persisted_fork_choice.
let v11 = db
.get_item::<PersistedForkChoiceV11>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
let v17 = upgrade_fork_choice(v11)?;
debug!(
log,
"Removing unused best_justified_checkpoint from fork choice store."
);
Ok(vec![v17.as_kv_store_op(FORK_CHOICE_DB_KEY)])
}
pub fn downgrade_from_v17<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// Get persisted_fork_choice.
let v17 = db
.get_item::<PersistedForkChoiceV17>(&FORK_CHOICE_DB_KEY)?
.ok_or_else(|| Error::SchemaMigrationError("fork choice missing from database".into()))?;
let v11 = downgrade_fork_choice(v17)?;
debug!(
log,
"Adding junk best_justified_checkpoint to fork choice store."
);
Ok(vec![v11.as_kv_store_op(FORK_CHOICE_DB_KEY)])
}

View File

@@ -1,119 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use slog::{error, info, warn, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
use store::{
get_key_for_col, metadata::BLOB_INFO_KEY, DBColumn, Error, HotColdDB, KeyValueStoreOp,
};
use types::{Epoch, EthSpec, Hash256, Slot};
/// The slot clock isn't usually available before the database is initialized, so we construct a
/// temporary slot clock by reading the genesis state. It should always exist if the database is
/// initialized at a prior schema version, however we still handle the lack of genesis state
/// gracefully.
fn get_slot_clock<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let Some(genesis_block) = db.get_blinded_block(&Hash256::zero())? else {
error!(log, "Missing genesis block");
return Ok(None);
};
let Some(genesis_state) = db.get_state(&genesis_block.state_root(), Some(Slot::new(0)))? else {
error!(log, "Missing genesis state"; "state_root" => ?genesis_block.state_root());
return Ok(None);
};
Ok(Some(T::SlotClock::new(
spec.genesis_slot,
Duration::from_secs(genesis_state.genesis_time()),
Duration::from_secs(spec.seconds_per_slot),
)))
}
fn get_current_epoch<T: BeaconChainTypes>(
db: &Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: &Logger,
) -> Result<Epoch, Error> {
get_slot_clock::<T>(db, log)?
.and_then(|clock| clock.now())
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
.ok_or(Error::SlotClockUnavailableForMigration)
}
pub fn upgrade_to_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
db.heal_freezer_block_roots_at_split()?;
db.heal_freezer_block_roots_at_genesis()?;
info!(log, "Healed freezer block roots");
// No-op, even if Deneb has already occurred. The database is probably borked in this case, but
// *maybe* the fork recovery will revert the minority fork and succeed.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
let current_epoch = get_current_epoch::<T>(&db, &log)?;
if current_epoch >= deneb_fork_epoch {
warn!(
log,
"Attempting upgrade to v18 schema";
"info" => "this may not work as Deneb has already been activated"
);
} else {
info!(
log,
"Upgrading to v18 schema";
"info" => "ready for Deneb",
"epochs_until_deneb" => deneb_fork_epoch - current_epoch
);
}
} else {
info!(
log,
"Upgrading to v18 schema";
"info" => "ready for Deneb once it is scheduled"
);
}
Ok(vec![])
}
pub fn downgrade_from_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// We cannot downgrade from V18 once the Deneb fork has been activated, because there will
// be blobs and blob metadata in the database that aren't understood by the V17 schema.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
let current_epoch = get_current_epoch::<T>(&db, &log)?;
if current_epoch >= deneb_fork_epoch {
error!(
log,
"Deneb already active: v18+ is mandatory";
"current_epoch" => current_epoch,
"deneb_fork_epoch" => deneb_fork_epoch,
);
return Err(Error::UnableToDowngrade);
} else {
info!(
log,
"Downgrading to v17 schema";
"info" => "you will need to upgrade before Deneb",
"epochs_until_deneb" => deneb_fork_epoch - current_epoch
);
}
} else {
info!(
log,
"Downgrading to v17 schema";
"info" => "you need to upgrade before Deneb",
);
}
let ops = vec![KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconMeta.into(),
BLOB_INFO_KEY.as_bytes(),
))];
Ok(ops)
}

View File

@@ -1,65 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use slog::{debug, info, Logger};
use std::sync::Arc;
use store::{get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp};
pub fn upgrade_to_v19<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut hot_delete_ops = vec![];
let mut blob_keys = vec![];
let column = DBColumn::BeaconBlob;
debug!(log, "Migrating from v18 to v19");
// Iterate through the blobs on disk.
for res in db.hot_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
let key_col = get_key_for_col(column.as_str(), &key);
hot_delete_ops.push(KeyValueStoreOp::DeleteKey(key_col));
blob_keys.push(key);
}
let num_blobs = blob_keys.len();
debug!(log, "Collected {} blob lists to migrate", num_blobs);
let batch_size = 500;
let mut batch = Vec::with_capacity(batch_size);
for key in blob_keys {
let next_blob = db.hot_db.get_bytes(column.as_str(), &key)?;
if let Some(next_blob) = next_blob {
let key_col = get_key_for_col(column.as_str(), &key);
batch.push(KeyValueStoreOp::PutKeyValue(key_col, next_blob));
if batch.len() >= batch_size {
db.blobs_db.do_atomically(batch.clone())?;
batch.clear();
}
}
}
// Process the remaining batch if it's not empty
if !batch.is_empty() {
db.blobs_db.do_atomically(batch)?;
}
debug!(log, "Wrote {} blobs to the blobs db", num_blobs);
// Delete all the blobs
info!(log, "Upgrading to v19 schema");
Ok(hot_delete_ops)
}
pub fn downgrade_from_v19<T: BeaconChainTypes>(
_db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// No-op
info!(
log,
"Downgrading to v18 schema";
);
Ok(vec![])
}

View File

@@ -3050,13 +3050,7 @@ async fn schema_downgrade_to_min_version() {
)
.await;
let min_version = if harness.spec.deneb_fork_epoch.is_some() {
// Can't downgrade beyond V18 once Deneb is reached, for simplicity don't test that
// at all if Deneb is enabled.
SchemaVersion(18)
} else {
SchemaVersion(16)
};
let min_version = SchemaVersion(19);
// Save the slot clock so that the new harness doesn't revert in time.
let slot_clock = harness.chain.slot_clock.clone();

View File

@@ -54,7 +54,7 @@ pub enum Error {
pub type SszDepositCache = SszDepositCacheV13;
#[superstruct(
variants(V1, V13),
variants(V13),
variant_attributes(derive(Encode, Decode, Clone)),
no_enum
)]
@@ -62,11 +62,8 @@ pub struct SszDepositCache {
pub logs: Vec<DepositLog>,
pub leaves: Vec<Hash256>,
pub deposit_contract_deploy_block: u64,
#[superstruct(only(V13))]
pub finalized_deposit_count: u64,
#[superstruct(only(V13))]
pub finalized_block_height: u64,
#[superstruct(only(V13))]
pub deposit_tree_snapshot: Option<DepositTreeSnapshot>,
pub deposit_roots: Vec<Hash256>,
}

View File

@@ -2,7 +2,7 @@ use crate::service::endpoint_from_config;
use crate::Config;
use crate::{
block_cache::{BlockCache, Eth1Block},
deposit_cache::{DepositCache, SszDepositCache, SszDepositCacheV1, SszDepositCacheV13},
deposit_cache::{DepositCache, SszDepositCache, SszDepositCacheV13},
};
use execution_layer::HttpJsonRpc;
use parking_lot::RwLock;
@@ -90,15 +90,12 @@ impl Inner {
pub type SszEth1Cache = SszEth1CacheV13;
#[superstruct(
variants(V1, V13),
variants(V13),
variant_attributes(derive(Encode, Decode, Clone)),
no_enum
)]
pub struct SszEth1Cache {
pub block_cache: BlockCache,
#[superstruct(only(V1))]
pub deposit_cache: SszDepositCacheV1,
#[superstruct(only(V13))]
pub deposit_cache: SszDepositCacheV13,
#[ssz(with = "four_byte_option_u64")]
pub last_processed_block: Option<u64>,

View File

@@ -5,9 +5,9 @@ mod metrics;
mod service;
pub use block_cache::{BlockCache, Eth1Block};
pub use deposit_cache::{DepositCache, SszDepositCache, SszDepositCacheV1, SszDepositCacheV13};
pub use deposit_cache::{DepositCache, SszDepositCache, SszDepositCacheV13};
pub use execution_layer::http::deposit_log::DepositLog;
pub use inner::{SszEth1Cache, SszEth1CacheV1, SszEth1CacheV13};
pub use inner::{SszEth1Cache, SszEth1CacheV13};
pub use service::{
BlockCacheUpdateOutcome, Config, DepositCacheUpdateOutcome, Error, Eth1Endpoint, Service,
DEFAULT_CHAIN_ID,