Remove support for database migrations prior to schema version v22 (#7332)

Remove deprecated database migrations prior to v22, along with v22 migration-specific code.
This commit is contained in:
Mac L
2025-05-28 23:47:21 +10:00
committed by GitHub
parent 8989ef8fb1
commit 0ddf9a99d6
10 changed files with 19 additions and 498 deletions

View File

@@ -1,7 +1,4 @@
//! Utilities for managing database schema changes.
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;
mod migration_schema_v23;
use crate::beacon_chain::BeaconChainTypes;
@@ -9,12 +6,10 @@ use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
use types::Hash256;
/// Migrate the database from one schema version to another, applying all requisite mutations.
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
genesis_state_root: Option<Hash256>,
from: SchemaVersion,
to: SchemaVersion,
) -> Result<(), StoreError> {
@@ -24,40 +19,19 @@ pub fn migrate_schema<T: BeaconChainTypes>(
// Upgrade across multiple versions by recursively migrating one step at a time.
(_, _) if from.as_u64() + 1 < to.as_u64() => {
let next = SchemaVersion(from.as_u64() + 1);
migrate_schema::<T>(db.clone(), genesis_state_root, from, next)?;
migrate_schema::<T>(db, genesis_state_root, next, to)
migrate_schema::<T>(db.clone(), from, next)?;
migrate_schema::<T>(db, next, to)
}
// Downgrade across multiple versions by recursively migrating one step at a time.
(_, _) if to.as_u64() + 1 < from.as_u64() => {
let next = SchemaVersion(from.as_u64() - 1);
migrate_schema::<T>(db.clone(), genesis_state_root, from, next)?;
migrate_schema::<T>(db, genesis_state_root, next, to)
migrate_schema::<T>(db.clone(), from, next)?;
migrate_schema::<T>(db, next, to)
}
//
// Migrations from before SchemaVersion(19) are deprecated.
// Migrations from before SchemaVersion(22) are deprecated.
//
(SchemaVersion(19), SchemaVersion(20)) => {
let ops = migration_schema_v20::upgrade_to_v20::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(20), SchemaVersion(19)) => {
let ops = migration_schema_v20::downgrade_from_v20::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(20), SchemaVersion(21)) => {
let ops = migration_schema_v21::upgrade_to_v21::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(20)) => {
let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(21), SchemaVersion(22)) => {
// This migration needs to sync data between hot and cold DBs. The schema version is
// bumped inside the upgrade_to_v22 fn
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root)
}
(SchemaVersion(22), SchemaVersion(23)) => {
let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)

View File

@@ -1,111 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY};
use operation_pool::{
PersistedOperationPool, PersistedOperationPoolV15, PersistedOperationPoolV20,
};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
use tracing::{debug, info};
use types::Attestation;
pub fn upgrade_to_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!("Upgrading from v19 to v20");
// Load a V15 op pool and transform it to V20.
let Some(PersistedOperationPoolV15::<T::EthSpec> {
attestations_v15,
sync_contributions,
attester_slashings_v15,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!("Nothing to do, no operation pool stored");
return Ok(vec![]);
};
let attestations = attestations_v15
.into_iter()
.map(|(attestation, indices)| (Attestation::Base(attestation).into(), indices))
.collect();
let attester_slashings = attester_slashings_v15
.into_iter()
.map(|slashing| slashing.into())
.collect();
let v20 = PersistedOperationPool::V20(PersistedOperationPoolV20 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
});
Ok(vec![v20.as_kv_store_op(OP_POOL_DB_KEY)])
}
pub fn downgrade_from_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!("Downgrading from v20 to v19");
// Load a V20 op pool and transform it to V15.
let Some(PersistedOperationPoolV20::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!("Nothing to do, no operation pool stored");
return Ok(vec![]);
};
let attestations_v15 = attestations
.into_iter()
.filter_map(|(attestation, indices)| {
if let Attestation::Base(attestation) = attestation.into() {
Some((attestation, indices))
} else {
info!(
reason = "not a base attestation",
"Dropping attestation during downgrade"
);
None
}
})
.collect();
let attester_slashings_v15 = attester_slashings
.into_iter()
.filter_map(|slashing| match slashing.try_into() {
Ok(slashing) => Some(slashing),
Err(_) => {
info!(
reason = "not a base attester slashing",
"Dropping attester slashing during downgrade"
);
None
}
})
.collect();
let v15 = PersistedOperationPool::V15(PersistedOperationPoolV15 {
attestations_v15,
sync_contributions,
attester_slashings_v15,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
});
Ok(vec![v15.as_kv_store_op(OP_POOL_DB_KEY)])
}

View File

@@ -1,74 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use crate::validator_pubkey_cache::DatabasePubkey;
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem};
use tracing::info;
use types::{Hash256, PublicKey};
const LOG_EVERY: usize = 200_000;
/// Re-encode every cached validator pubkey in the hot DB in decompressed form.
///
/// Each compressed `PublicKey` in the `PubkeyCache` column is decoded,
/// decompressed into a `DatabasePubkey`, and queued for rewrite under the
/// same key. Progress is logged every `LOG_EVERY` keys.
pub fn upgrade_to_v21<T: BeaconChainTypes>(
    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
    info!("Upgrading from v20 to v21");

    let mut kv_ops = vec![];

    // Iterate through all pubkeys and decompress them.
    let cache_iter = db.hot_db.iter_column::<Hash256>(DBColumn::PubkeyCache);
    for (count, item) in cache_iter.enumerate() {
        let (key, compressed_bytes) = item?;
        let compressed = PublicKey::from_ssz_bytes(&compressed_bytes)?;
        kv_ops.push(DatabasePubkey::from_pubkey(&compressed).as_kv_store_op(key));

        if count > 0 && count % LOG_EVERY == 0 {
            info!(
                keys_decompressed = count,
                "Public key decompression in progress"
            );
        }
    }
    info!("Public key decompression complete");
    Ok(kv_ops)
}
/// Re-encode every cached validator pubkey in the hot DB in compressed form.
///
/// Reverses `upgrade_to_v21`: each `DatabasePubkey` in the `PubkeyCache`
/// column is converted back to compressed bytes and queued for rewrite under
/// the same key. Progress is logged every `LOG_EVERY` keys.
pub fn downgrade_from_v21<T: BeaconChainTypes>(
    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
    info!("Downgrading from v21 to v20");

    let mut kv_ops = vec![];

    // Iterate through all pubkeys and recompress them.
    let cache_iter = db.hot_db.iter_column::<Hash256>(DBColumn::PubkeyCache);
    for (count, item) in cache_iter.enumerate() {
        let (key, stored_bytes) = item?;
        let decompressed = DatabasePubkey::from_ssz_bytes(&stored_bytes)?;
        // Surface conversion failures as a DB error rather than panicking.
        let (_, pubkey_bytes) = decompressed.as_pubkey().map_err(|e| Error::DBError {
            message: format!("{e:?}"),
        })?;
        kv_ops.push(KeyValueStoreOp::PutKeyValue(
            DBColumn::PubkeyCache,
            key.as_slice().to_vec(),
            pubkey_bytes.as_ssz_bytes(),
        ));

        if count > 0 && count % LOG_EVERY == 0 {
            info!(keys_compressed = count, "Public key compression in progress");
        }
    }
    info!("Public key compression complete");
    Ok(kv_ops)
}

View File

@@ -1,196 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use std::sync::Arc;
use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRootsChunked,
metadata::{
SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN,
},
partial_beacon_state::PartialBeaconState,
AnchorInfo, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
};
use tracing::info;
use types::{BeaconState, Hash256, Slot};
const LOG_EVERY: usize = 200_000;
/// Load a frozen state stored in the pre-v22 (chunked/partial) cold-DB format.
///
/// Returns `Ok(None)` when no state is stored under `state_root`. Otherwise
/// decodes the `PartialBeaconState` and fills in each chunked-vector field
/// from its own cold-DB column before converting to a full `BeaconState`.
fn load_old_schema_frozen_state<T: BeaconChainTypes>(
    db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
    state_root: Hash256,
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
    let state_bytes = match db
        .cold_db
        .get_bytes(DBColumn::BeaconState, state_root.as_slice())?
    {
        Some(bytes) => bytes,
        None => return Ok(None),
    };

    let spec = db.get_chain_spec();
    let mut partial_state: PartialBeaconState<T::EthSpec> =
        PartialBeaconState::from_ssz_bytes(&state_bytes, spec)?;

    // Fill in the fields of the partial state from their separate columns.
    partial_state.load_block_roots(&db.cold_db, spec)?;
    partial_state.load_state_roots(&db.cold_db, spec)?;
    partial_state.load_historical_roots(&db.cold_db, spec)?;
    partial_state.load_randao_mixes(&db.cold_db, spec)?;
    partial_state.load_historical_summaries(&db.cold_db, spec)?;

    partial_state.try_into().map(Some)
}
/// Upgrade the database from schema v21 to v22.
///
/// Unlike other migrations, this one writes the new schema version itself
/// (via `store_schema_version_atomically` below) because it must interleave
/// cold-DB writes, the hot-DB anchor/version update, and cold-DB deletions in
/// a crash-safe order. The statement order in this function is load-bearing:
/// all new-format data is written BEFORE the schema version is bumped, and
/// old-format data is deleted only AFTER.
///
/// Requires `genesis_state_root` to be known; errors with
/// `Error::GenesisStateUnknown` otherwise.
pub fn upgrade_to_v22<T: BeaconChainTypes>(
    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
    genesis_state_root: Option<Hash256>,
) -> Result<(), Error> {
    info!("Upgrading DB schema from v21 to v22");

    let old_anchor = db.get_anchor_info();
    // If the anchor was uninitialized in the old schema (`None`), this represents a full archive
    // node.
    let effective_anchor = if old_anchor == ANCHOR_UNINITIALIZED {
        ANCHOR_FOR_ARCHIVE_NODE
    } else {
        old_anchor.clone()
    };
    let split_slot = db.get_split_slot();
    let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?;

    // Batch of cold-DB writes, committed atomically further below.
    let mut cold_ops = vec![];

    // Load the genesis state in the previous chunked format, BEFORE we go deleting or rewriting
    // anything.
    let mut genesis_state = load_old_schema_frozen_state::<T>(&db, genesis_state_root)?
        .ok_or(Error::MissingGenesisState)?;
    // Recompute the root (shadowing the argument) and derive the genesis block root from it.
    let genesis_state_root = genesis_state.update_tree_hash_cache()?;
    let genesis_block_root = genesis_state.get_latest_block_root(genesis_state_root);

    // Store the genesis state in the new format, prior to updating the schema version on disk.
    // In case of a crash no data is lost because we will re-load it in the old format and re-do
    // this write.
    if split_slot > 0 {
        info!(
            state_root = ?genesis_state_root,
            "Re-storing genesis state"
        );
        db.store_cold_state(&genesis_state_root, &genesis_state, &mut cold_ops)?;
    }

    // Write the block roots in the new format in a new column. Similar to above, we do this
    // separately from deleting the old format block roots so that this is crash safe.
    let oldest_block_slot = effective_anchor.oldest_block_slot;
    write_new_schema_block_roots::<T>(
        &db,
        genesis_block_root,
        oldest_block_slot,
        split_slot,
        &mut cold_ops,
    )?;

    // Commit this first batch of non-destructive cold database ops.
    db.cold_db.do_atomically(cold_ops)?;

    // Now we update the anchor and the schema version atomically in the hot database.
    //
    // If we crash after committing this change, then there will be some leftover cruft left in the
    // freezer database, but no corruption because all the new-format data has already been written
    // above.
    let new_anchor = AnchorInfo {
        state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
        state_lower_limit: Slot::new(0),
        ..effective_anchor.clone()
    };
    let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, new_anchor)?];
    db.store_schema_version_atomically(SchemaVersion(22), hot_ops)?;

    // Finally, clean up the old-format data from the freezer database.
    delete_old_schema_freezer_data::<T>(&db)?;

    Ok(())
}
/// Delete all freezer-database data stored under the pre-v22 schema.
///
/// Collects a delete op for every key in each defunct column, applies them in
/// one atomic batch, then compacts the freezer DB so the space is reclaimed.
pub fn delete_old_schema_freezer_data<T: BeaconChainTypes>(
    db: &Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<(), Error> {
    // Columns that only existed in the old schema.
    let defunct_columns = [
        DBColumn::BeaconState,
        // Cold state summaries indexed by state root were stored in this column.
        DBColumn::BeaconStateSummary,
        // Mapping from restore point number to state root was stored in this column.
        DBColumn::BeaconRestorePoint,
        // Chunked vector values were stored in these columns.
        DBColumn::BeaconHistoricalRoots,
        DBColumn::BeaconRandaoMixes,
        DBColumn::BeaconHistoricalSummaries,
        DBColumn::BeaconBlockRootsChunked,
        DBColumn::BeaconStateRootsChunked,
    ];

    let mut deletions = vec![];
    for column in defunct_columns {
        for key_result in db.cold_db.iter_column_keys::<Vec<u8>>(column) {
            deletions.push(KeyValueStoreOp::DeleteKey(column, key_result?));
        }
    }

    let delete_ops = deletions.len();
    info!(delete_ops, "Deleting historic states");
    db.cold_db.do_atomically(deletions)?;

    // Deletion alone does not return disk space; compact the freezer DB as well.
    db.compact_freezer()?;
    Ok(())
}
/// Copy block roots from the old chunked-vector format into the new flat
/// `BeaconBlockRoots` column, keyed by big-endian slot number.
///
/// Covers slots from `oldest_block_slot` to `split_slot`, appending the write
/// ops to `cold_ops` (the caller commits them). When slot 0 falls outside
/// that range, the genesis block root is written explicitly so it is never
/// lost. Progress is logged every `LOG_EVERY` roots.
pub fn write_new_schema_block_roots<T: BeaconChainTypes>(
    db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
    genesis_block_root: Hash256,
    oldest_block_slot: Slot,
    split_slot: Slot,
    cold_ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
    info!(
        %oldest_block_slot,
        ?genesis_block_root,
        "Starting beacon block root migration"
    );

    // Store the genesis block root if it would otherwise not be stored.
    if oldest_block_slot != 0 {
        cold_ops.push(KeyValueStoreOp::PutKeyValue(
            DBColumn::BeaconBlockRoots,
            0u64.to_be_bytes().to_vec(),
            genesis_block_root.as_slice().to_vec(),
        ));
    }

    // Block roots are available from the `oldest_block_slot` to the `split_slot`.
    let roots_iter = ChunkedVectorIter::<BlockRootsChunked, _, _, _>::new(
        db,
        oldest_block_slot.as_usize(),
        split_slot,
        db.get_chain_spec(),
    );

    // OK to hold these in memory (10M slots * 43 bytes per KV ~= 430 MB).
    for (count, (slot, block_root)) in roots_iter.enumerate() {
        cold_ops.push(KeyValueStoreOp::PutKeyValue(
            DBColumn::BeaconBlockRoots,
            slot.to_be_bytes().to_vec(),
            block_root.as_slice().to_vec(),
        ));

        if count > 0 && count % LOG_EVERY == 0 {
            info!(
                roots_migrated = count,
                "Beacon block root migration in progress"
            );
        }
    }
    Ok(())
}