Remove support for database migrations prior to schema version v22 (#7332)

Remove deprecated database migrations prior to v22, along with code specific to the v22 migration.
Author: Mac L
Date: 2025-05-28 23:47:21 +10:00
Committed by: GitHub
Parent: 8989ef8fb1
Commit: 0ddf9a99d6
10 changed files with 19 additions and 498 deletions
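In short: `migrate_schema` drops its `genesis_state_root` parameter, which only the now-removed v21→v22 upgrade consumed, and every caller is updated to the three-argument form. A minimal sketch of the call-shape change, using stand-in types rather than Lighthouse's real ones:

// Stand-ins to illustrate the signature change only; not the real
// store or migration logic.
struct Db;
#[derive(Clone, Copy)]
struct SchemaVersion(u64);

// Old shape: the genesis state root was threaded through every call,
// even though only the v21 -> v22 step used it.
fn migrate_schema_old(
    _db: &Db,
    _genesis_state_root: Option<[u8; 32]>,
    _from: SchemaVersion,
    _to: SchemaVersion,
) -> Result<(), String> {
    Ok(())
}

// New shape: the parameter is gone.
fn migrate_schema_new(_db: &Db, _from: SchemaVersion, _to: SchemaVersion) -> Result<(), String> {
    Ok(())
}

fn main() {
    let db = Db;
    migrate_schema_old(&db, None, SchemaVersion(21), SchemaVersion(22)).unwrap();
    migrate_schema_new(&db, SchemaVersion(22), SchemaVersion(23)).unwrap();
}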

View File

@@ -1,7 +1,4 @@
//! Utilities for managing database schema changes.
-mod migration_schema_v20;
-mod migration_schema_v21;
-mod migration_schema_v22;
mod migration_schema_v23;
use crate::beacon_chain::BeaconChainTypes;
@@ -9,12 +6,10 @@ use std::sync::Arc;
use store::hot_cold_store::{HotColdDB, HotColdDBError};
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION};
use store::Error as StoreError;
-use types::Hash256;
/// Migrate the database from one schema version to another, applying all requisite mutations.
pub fn migrate_schema<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
-genesis_state_root: Option<Hash256>,
from: SchemaVersion,
to: SchemaVersion,
) -> Result<(), StoreError> {
@@ -24,40 +19,19 @@ pub fn migrate_schema<T: BeaconChainTypes>(
// Upgrade across multiple versions by recursively migrating one step at a time.
(_, _) if from.as_u64() + 1 < to.as_u64() => {
let next = SchemaVersion(from.as_u64() + 1);
-migrate_schema::<T>(db.clone(), genesis_state_root, from, next)?;
-migrate_schema::<T>(db, genesis_state_root, next, to)
+migrate_schema::<T>(db.clone(), from, next)?;
+migrate_schema::<T>(db, next, to)
}
// Downgrade across multiple versions by recursively migrating one step at a time.
(_, _) if to.as_u64() + 1 < from.as_u64() => {
let next = SchemaVersion(from.as_u64() - 1);
-migrate_schema::<T>(db.clone(), genesis_state_root, from, next)?;
-migrate_schema::<T>(db, genesis_state_root, next, to)
+migrate_schema::<T>(db.clone(), from, next)?;
+migrate_schema::<T>(db, next, to)
}
//
-// Migrations from before SchemaVersion(19) are deprecated.
+// Migrations from before SchemaVersion(22) are deprecated.
//
-(SchemaVersion(19), SchemaVersion(20)) => {
-let ops = migration_schema_v20::upgrade_to_v20::<T>(db.clone())?;
-db.store_schema_version_atomically(to, ops)
-}
-(SchemaVersion(20), SchemaVersion(19)) => {
-let ops = migration_schema_v20::downgrade_from_v20::<T>(db.clone())?;
-db.store_schema_version_atomically(to, ops)
-}
-(SchemaVersion(20), SchemaVersion(21)) => {
-let ops = migration_schema_v21::upgrade_to_v21::<T>(db.clone())?;
-db.store_schema_version_atomically(to, ops)
-}
-(SchemaVersion(21), SchemaVersion(20)) => {
-let ops = migration_schema_v21::downgrade_from_v21::<T>(db.clone())?;
-db.store_schema_version_atomically(to, ops)
-}
-(SchemaVersion(21), SchemaVersion(22)) => {
-// This migration needs to sync data between hot and cold DBs. The schema version is
-// bumped inside the upgrade_to_v22 fn
-migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root)
-}
(SchemaVersion(22), SchemaVersion(23)) => {
let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone())?;
db.store_schema_version_atomically(to, ops)
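The surviving arms above step between distant versions by recursing one version at a time. A self-contained sketch of that stepping strategy — `SchemaVersion` is a bare newtype here and the single-step arms are placeholders, not the real migrations:

#[derive(Clone, Copy, PartialEq, Debug)]
struct SchemaVersion(u64);

fn migrate(from: SchemaVersion, to: SchemaVersion) -> Result<(), String> {
    match (from, to) {
        // Nothing to do.
        (f, t) if f == t => Ok(()),
        // Upgrade across multiple versions one step at a time.
        (f, t) if f.0 + 1 < t.0 => {
            let next = SchemaVersion(f.0 + 1);
            migrate(f, next)?;
            migrate(next, t)
        }
        // Downgrade across multiple versions one step at a time.
        (f, t) if t.0 + 1 < f.0 => {
            let next = SchemaVersion(f.0 - 1);
            migrate(f, next)?;
            migrate(next, t)
        }
        // Single-step arms; placeholders for the real per-version ops.
        (SchemaVersion(22), SchemaVersion(23)) => Ok(()),
        (SchemaVersion(23), SchemaVersion(22)) => Ok(()),
        (f, t) => Err(format!("unsupported migration {f:?} -> {t:?}")),
    }
}

fn main() {
    assert!(migrate(SchemaVersion(22), SchemaVersion(23)).is_ok());
    // Anything that would pass through a pre-v22 step now fails.
    assert!(migrate(SchemaVersion(19), SchemaVersion(23)).is_err());
}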

View File

@@ -1,111 +0,0 @@
use crate::beacon_chain::{BeaconChainTypes, OP_POOL_DB_KEY};
use operation_pool::{
PersistedOperationPool, PersistedOperationPoolV15, PersistedOperationPoolV20,
};
use std::sync::Arc;
use store::{Error, HotColdDB, KeyValueStoreOp, StoreItem};
use tracing::{debug, info};
use types::Attestation;
pub fn upgrade_to_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!("Upgrading from v19 to v20");
// Load a V15 op pool and transform it to V20.
let Some(PersistedOperationPoolV15::<T::EthSpec> {
attestations_v15,
sync_contributions,
attester_slashings_v15,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!("Nothing to do, no operation pool stored");
return Ok(vec![]);
};
let attestations = attestations_v15
.into_iter()
.map(|(attestation, indices)| (Attestation::Base(attestation).into(), indices))
.collect();
let attester_slashings = attester_slashings_v15
.into_iter()
.map(|slashing| slashing.into())
.collect();
let v20 = PersistedOperationPool::V20(PersistedOperationPoolV20 {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
});
Ok(vec![v20.as_kv_store_op(OP_POOL_DB_KEY)])
}
pub fn downgrade_from_v20<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!("Downgrading from v20 to v19");
// Load a V20 op pool and transform it to V15.
let Some(PersistedOperationPoolV20::<T::EthSpec> {
attestations,
sync_contributions,
attester_slashings,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
}) = db.get_item(&OP_POOL_DB_KEY)?
else {
debug!("Nothing to do, no operation pool stored");
return Ok(vec![]);
};
let attestations_v15 = attestations
.into_iter()
.filter_map(|(attestation, indices)| {
if let Attestation::Base(attestation) = attestation.into() {
Some((attestation, indices))
} else {
info!(
reason = "not a base attestation",
"Dropping attestation during downgrade"
);
None
}
})
.collect();
let attester_slashings_v15 = attester_slashings
.into_iter()
.filter_map(|slashing| match slashing.try_into() {
Ok(slashing) => Some(slashing),
Err(_) => {
info!(
reason = "not a base attester slashing",
"Dropping attester slashing during downgrade"
);
None
}
})
.collect();
let v15 = PersistedOperationPool::V15(PersistedOperationPoolV15 {
attestations_v15,
sync_contributions,
attester_slashings_v15,
proposer_slashings,
voluntary_exits,
bls_to_execution_changes,
capella_bls_change_broadcast_indices,
});
Ok(vec![v15.as_kv_store_op(OP_POOL_DB_KEY)])
}
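The deleted v20 downgrade above is deliberately lossy: operations the older schema cannot represent (non-base attestations and attester slashings) are logged and dropped rather than failing the whole migration. A minimal sketch of that `filter_map` pattern, with illustrative stand-in types:

// Stand-in enum; the real code uses types::Attestation and friends.
enum Attestation {
    Base(u32),
    Electra(u64),
}

// Keep only the variants the old schema can store; drop the rest.
fn downgrade(items: Vec<Attestation>) -> Vec<u32> {
    items
        .into_iter()
        .filter_map(|a| match a {
            Attestation::Base(x) => Some(x),
            // Not representable in the old schema: drop it.
            Attestation::Electra(_) => None,
        })
        .collect()
}

fn main() {
    let kept = downgrade(vec![Attestation::Base(1), Attestation::Electra(2)]);
    assert_eq!(kept, vec![1]);
}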

View File

@@ -1,74 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use crate::validator_pubkey_cache::DatabasePubkey;
use ssz::{Decode, Encode};
use std::sync::Arc;
use store::{DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem};
use tracing::info;
use types::{Hash256, PublicKey};
const LOG_EVERY: usize = 200_000;
pub fn upgrade_to_v21<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!("Upgrading from v20 to v21");
let mut ops = vec![];
// Iterate through all pubkeys and decompress them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::PubkeyCache)
.enumerate()
{
let (key, value) = res?;
let pubkey = PublicKey::from_ssz_bytes(&value)?;
let decompressed = DatabasePubkey::from_pubkey(&pubkey);
ops.push(decompressed.as_kv_store_op(key));
if i > 0 && i % LOG_EVERY == 0 {
info!(
keys_decompressed = i,
"Public key decompression in progress"
);
}
}
info!("Public key decompression complete");
Ok(ops)
}
pub fn downgrade_from_v21<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!("Downgrading from v21 to v20");
let mut ops = vec![];
// Iterate through all pubkeys and recompress them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::PubkeyCache)
.enumerate()
{
let (key, value) = res?;
let decompressed = DatabasePubkey::from_ssz_bytes(&value)?;
let (_, pubkey_bytes) = decompressed.as_pubkey().map_err(|e| Error::DBError {
message: format!("{e:?}"),
})?;
ops.push(KeyValueStoreOp::PutKeyValue(
DBColumn::PubkeyCache,
key.as_slice().to_vec(),
pubkey_bytes.as_ssz_bytes(),
));
if i > 0 && i % LOG_EVERY == 0 {
info!(keys_compressed = i, "Public key compression in progress");
}
}
info!("Public key compression complete");
Ok(ops)
}
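The deleted v21 migration above is an instance of a generic column-rewrite pattern: stream every key/value pair in a column, transform each value, queue a put, and log progress every 200,000 keys. A stand-alone sketch of the pattern, with the store types reduced to stand-ins:

// Stand-in for KeyValueStoreOp::PutKeyValue.
struct Op {
    key: Vec<u8>,
    value: Vec<u8>,
}

const LOG_EVERY: usize = 200_000;

// Rewrite every value in a column, collecting the resulting put-ops so
// they can be committed in one batch.
fn rewrite_column(
    entries: impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
    transform: impl Fn(&[u8]) -> Vec<u8>,
) -> Vec<Op> {
    let mut ops = vec![];
    for (i, (key, value)) in entries.enumerate() {
        let value = transform(&value);
        ops.push(Op { key, value });
        if i > 0 && i % LOG_EVERY == 0 {
            eprintln!("progress: {i} keys rewritten");
        }
    }
    ops
}

fn main() {
    let entries = vec![(b"k".to_vec(), b"v".to_vec())].into_iter();
    let ops = rewrite_column(entries, |v| v.to_ascii_uppercase());
    assert_eq!(ops[0].value, b"V".to_vec());
}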

View File

@@ -1,196 +0,0 @@
use crate::beacon_chain::BeaconChainTypes;
use std::sync::Arc;
use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRootsChunked,
metadata::{
SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN,
},
partial_beacon_state::PartialBeaconState,
AnchorInfo, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
};
use tracing::info;
use types::{BeaconState, Hash256, Slot};
const LOG_EVERY: usize = 200_000;
fn load_old_schema_frozen_state<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
state_root: Hash256,
) -> Result<Option<BeaconState<T::EthSpec>>, Error> {
let Some(partial_state_bytes) = db
.cold_db
.get_bytes(DBColumn::BeaconState, state_root.as_slice())?
else {
return Ok(None);
};
let mut partial_state: PartialBeaconState<T::EthSpec> =
PartialBeaconState::from_ssz_bytes(&partial_state_bytes, db.get_chain_spec())?;
// Fill in the fields of the partial state.
partial_state.load_block_roots(&db.cold_db, db.get_chain_spec())?;
partial_state.load_state_roots(&db.cold_db, db.get_chain_spec())?;
partial_state.load_historical_roots(&db.cold_db, db.get_chain_spec())?;
partial_state.load_randao_mixes(&db.cold_db, db.get_chain_spec())?;
partial_state.load_historical_summaries(&db.cold_db, db.get_chain_spec())?;
partial_state.try_into().map(Some)
}
pub fn upgrade_to_v22<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
genesis_state_root: Option<Hash256>,
) -> Result<(), Error> {
info!("Upgrading DB schema from v21 to v22");
let old_anchor = db.get_anchor_info();
// If the anchor was uninitialized in the old schema (`None`), this represents a full archive
// node.
let effective_anchor = if old_anchor == ANCHOR_UNINITIALIZED {
ANCHOR_FOR_ARCHIVE_NODE
} else {
old_anchor.clone()
};
let split_slot = db.get_split_slot();
let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?;
let mut cold_ops = vec![];
// Load the genesis state in the previous chunked format, BEFORE we go deleting or rewriting
// anything.
let mut genesis_state = load_old_schema_frozen_state::<T>(&db, genesis_state_root)?
.ok_or(Error::MissingGenesisState)?;
let genesis_state_root = genesis_state.update_tree_hash_cache()?;
let genesis_block_root = genesis_state.get_latest_block_root(genesis_state_root);
// Store the genesis state in the new format, prior to updating the schema version on disk.
// In case of a crash no data is lost because we will re-load it in the old format and re-do
// this write.
if split_slot > 0 {
info!(
state_root = ?genesis_state_root,
"Re-storing genesis state"
);
db.store_cold_state(&genesis_state_root, &genesis_state, &mut cold_ops)?;
}
// Write the block roots in the new format in a new column. Similar to above, we do this
// separately from deleting the old format block roots so that this is crash safe.
let oldest_block_slot = effective_anchor.oldest_block_slot;
write_new_schema_block_roots::<T>(
&db,
genesis_block_root,
oldest_block_slot,
split_slot,
&mut cold_ops,
)?;
// Commit this first batch of non-destructive cold database ops.
db.cold_db.do_atomically(cold_ops)?;
// Now we update the anchor and the schema version atomically in the hot database.
//
// If we crash after committing this change, then there will be some leftover cruft in the
// freezer database, but no corruption because all the new-format data has already been written
// above.
let new_anchor = AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..effective_anchor.clone()
};
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, new_anchor)?];
db.store_schema_version_atomically(SchemaVersion(22), hot_ops)?;
// Finally, clean up the old-format data from the freezer database.
delete_old_schema_freezer_data::<T>(&db)?;
Ok(())
}
pub fn delete_old_schema_freezer_data<T: BeaconChainTypes>(
db: &Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
) -> Result<(), Error> {
let mut cold_ops = vec![];
let columns = [
DBColumn::BeaconState,
// Cold state summaries indexed by state root were stored in this column.
DBColumn::BeaconStateSummary,
// Mapping from restore point number to state root was stored in this column.
DBColumn::BeaconRestorePoint,
// Chunked vector values were stored in these columns.
DBColumn::BeaconHistoricalRoots,
DBColumn::BeaconRandaoMixes,
DBColumn::BeaconHistoricalSummaries,
DBColumn::BeaconBlockRootsChunked,
DBColumn::BeaconStateRootsChunked,
];
for column in columns {
for res in db.cold_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
cold_ops.push(KeyValueStoreOp::DeleteKey(column, key));
}
}
let delete_ops = cold_ops.len();
info!(delete_ops, "Deleting historic states");
db.cold_db.do_atomically(cold_ops)?;
// In order to reclaim space, we need to compact the freezer DB as well.
db.compact_freezer()?;
Ok(())
}
pub fn write_new_schema_block_roots<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
genesis_block_root: Hash256,
oldest_block_slot: Slot,
split_slot: Slot,
cold_ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
info!(
%oldest_block_slot,
?genesis_block_root,
"Starting beacon block root migration"
);
// Store the genesis block root if it would otherwise not be stored.
if oldest_block_slot != 0 {
cold_ops.push(KeyValueStoreOp::PutKeyValue(
DBColumn::BeaconBlockRoots,
0u64.to_be_bytes().to_vec(),
genesis_block_root.as_slice().to_vec(),
));
}
// Block roots are available from the `oldest_block_slot` to the `split_slot`.
let start_vindex = oldest_block_slot.as_usize();
let block_root_iter = ChunkedVectorIter::<BlockRootsChunked, _, _, _>::new(
db,
start_vindex,
split_slot,
db.get_chain_spec(),
);
// OK to hold these in memory (10M slots * 43 bytes per KV ~= 430 MB).
for (i, (slot, block_root)) in block_root_iter.enumerate() {
cold_ops.push(KeyValueStoreOp::PutKeyValue(
DBColumn::BeaconBlockRoots,
slot.to_be_bytes().to_vec(),
block_root.as_slice().to_vec(),
));
if i > 0 && i % LOG_EVERY == 0 {
info!(
roots_migrated = i,
"Beacon block root migration in progress"
);
}
}
Ok(())
}
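The "~430 MB" estimate in `write_new_schema_block_roots` checks out at about 43 bytes per KV entry. A plausible split — an assumption, since the code does not break the figure down — is the 8-byte big-endian slot key, the 32-byte block root, and roughly 3 bytes of per-entry overhead:

fn main() {
    // Assumed per-entry layout: 8-byte slot key + 32-byte root + ~3
    // bytes of overhead = 43 bytes, matching the comment above.
    let slots: u64 = 10_000_000;
    let bytes_per_kv: u64 = 8 + 32 + 3;
    let total_mb = slots * bytes_per_kv / 1_000_000;
    assert_eq!(total_mb, 430);
    println!("{slots} slots * {bytes_per_kv} B/entry ~= {total_mb} MB");
}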

View File

@@ -3023,7 +3023,6 @@ async fn schema_downgrade_to_min_version() {
.await;
let min_version = SchemaVersion(22);
-let genesis_state_root = Some(harness.chain.genesis_state_root);
// Save the slot clock so that the new harness doesn't revert in time.
let slot_clock = harness.chain.slot_clock.clone();
@@ -3036,22 +3035,12 @@ async fn schema_downgrade_to_min_version() {
let store = get_store(&db_path);
// Downgrade.
-migrate_schema::<DiskHarnessType<E>>(
-store.clone(),
-genesis_state_root,
-CURRENT_SCHEMA_VERSION,
-min_version,
-)
-.expect("schema downgrade to minimum version should work");
+migrate_schema::<DiskHarnessType<E>>(store.clone(), CURRENT_SCHEMA_VERSION, min_version)
+.expect("schema downgrade to minimum version should work");
// Upgrade back.
-migrate_schema::<DiskHarnessType<E>>(
-store.clone(),
-genesis_state_root,
-min_version,
-CURRENT_SCHEMA_VERSION,
-)
-.expect("schema upgrade from minimum version should work");
+migrate_schema::<DiskHarnessType<E>>(store.clone(), min_version, CURRENT_SCHEMA_VERSION)
+.expect("schema upgrade from minimum version should work");
// Recreate the harness.
let harness = BeaconChainHarness::builder(MinimalEthSpec)
@@ -3069,13 +3058,8 @@ async fn schema_downgrade_to_min_version() {
// Check that downgrading beyond the minimum version fails (bound is *tight*).
let min_version_sub_1 = SchemaVersion(min_version.as_u64().checked_sub(1).unwrap());
-migrate_schema::<DiskHarnessType<E>>(
-store.clone(),
-genesis_state_root,
-CURRENT_SCHEMA_VERSION,
-min_version_sub_1,
-)
-.expect_err("should not downgrade below minimum version");
+migrate_schema::<DiskHarnessType<E>>(store.clone(), CURRENT_SCHEMA_VERSION, min_version_sub_1)
+.expect_err("should not downgrade below minimum version");
}
/// Check that blob pruning prunes blobs older than the data availability boundary.

View File

@@ -1002,11 +1002,6 @@ where
blobs_path: &Path,
config: StoreConfig,
) -> Result<Self, String> {
-let context = self
-.runtime_context
-.as_ref()
-.ok_or("disk_store requires a log")?
-.service_context("freezer_db".into());
let spec = self
.chain_spec
.clone()
@@ -1015,21 +1010,8 @@ where
self.db_path = Some(hot_path.into());
self.freezer_db_path = Some(cold_path.into());
-// Optionally grab the genesis state root.
-// This will only be required if a DB upgrade to V22 is needed.
-let genesis_state_root = context
-.eth2_network_config
-.as_ref()
-.and_then(|config| config.genesis_state_root::<E>().transpose())
-.transpose()?;
let schema_upgrade = |db, from, to| {
-migrate_schema::<Witness<TSlotClock, TEth1Backend, _, _, _>>(
-db,
-genesis_state_root,
-from,
-to,
-)
+migrate_schema::<Witness<TSlotClock, TEth1Backend, _, _, _>>(db, from, to)
};
let store = HotColdDB::open(
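With the genesis-state plumbing gone, the `schema_upgrade` hook is a plain closure over `(db, from, to)`. A sketch of wiring such a hook into a store opener — `open_store` and its signature are illustrative, not the real `HotColdDB::open`:

// Illustrative only: `open_store` is a stand-in for HotColdDB::open.
struct Db;
#[derive(Clone, Copy)]
struct SchemaVersion(u64);

fn open_store<F>(migrate: F) -> Result<Db, String>
where
    F: Fn(&Db, SchemaVersion, SchemaVersion) -> Result<(), String>,
{
    let db = Db;
    // On open, bring the on-disk schema up to the current version.
    migrate(&db, SchemaVersion(22), SchemaVersion(23))?;
    Ok(db)
}

fn main() -> Result<(), String> {
    let schema_upgrade = |_db: &Db, _from: SchemaVersion, _to: SchemaVersion| Ok(());
    let _store = open_store(schema_upgrade)?;
    Ok(())
}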

View File

@@ -2501,27 +2501,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Run a compaction pass on the freezer DB to free up space used by deleted states.
pub fn compact_freezer(&self) -> Result<(), Error> {
-let current_schema_columns = vec![
+let columns = vec![
DBColumn::BeaconColdStateSummary,
DBColumn::BeaconStateSnapshot,
DBColumn::BeaconStateDiff,
DBColumn::BeaconStateRoots,
];
-// We can remove this once schema V21 has been gone for a while.
-let previous_schema_columns = vec![
-DBColumn::BeaconState,
-DBColumn::BeaconStateSummary,
-DBColumn::BeaconBlockRootsChunked,
-DBColumn::BeaconStateRootsChunked,
-DBColumn::BeaconRestorePoint,
-DBColumn::BeaconHistoricalRoots,
-DBColumn::BeaconRandaoMixes,
-DBColumn::BeaconHistoricalSummaries,
-];
-let mut columns = current_schema_columns;
-columns.extend(previous_schema_columns);
for column in columns {
info!(?column, "Starting compaction");
self.cold_db.compact_column(column)?;
@@ -2871,32 +2857,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// migrating to the tree-states schema (delete everything in the freezer then start afresh).
let mut cold_ops = vec![];
-let current_schema_columns = vec![
+let columns = vec![
DBColumn::BeaconColdStateSummary,
DBColumn::BeaconStateSnapshot,
DBColumn::BeaconStateDiff,
DBColumn::BeaconStateRoots,
];
-// This function is intended to be able to clean up leftover V21 freezer database stuff in
-// the case where the V22 schema upgrade failed *after* committing the version increment but
-// *before* cleaning up the freezer DB.
-//
-// We can remove this once schema V21 has been gone for a while.
-let previous_schema_columns = vec![
-DBColumn::BeaconState,
-DBColumn::BeaconStateSummary,
-DBColumn::BeaconBlockRootsChunked,
-DBColumn::BeaconStateRootsChunked,
-DBColumn::BeaconRestorePoint,
-DBColumn::BeaconHistoricalRoots,
-DBColumn::BeaconRandaoMixes,
-DBColumn::BeaconHistoricalSummaries,
-];
-let mut columns = current_schema_columns;
-columns.extend(previous_schema_columns);
for column in columns {
for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;

View File

@@ -17,6 +17,7 @@ validator client or the slasher**.
| Lighthouse version | Release date | Schema version | Downgrade available? |
|--------------------|--------------|----------------|----------------------|
+| v7.1.0 | TBD 2025 | v23 | yes |
| v7.0.0 | Apr 2025 | v22 | no |
| v6.0.0 | Nov 2024 | v22 | no |
@@ -206,6 +207,7 @@ Here are the steps to prune historic states:
| Lighthouse version | Release date | Schema version | Downgrade available? |
|--------------------|--------------|----------------|-------------------------------------|
+| v7.1.0 | TBD 2025 | v23 | yes |
| v7.0.0 | Apr 2025 | v22 | no |
| v6.0.0 | Nov 2024 | v22 | no |
| v5.3.0 | Aug 2024 | v21 | yes before Electra using <= v7.0.0 |

View File

@@ -301,7 +301,6 @@ fn parse_migrate_config(migrate_config: &Migrate) -> Result<MigrateConfig, Strin
pub fn migrate_db<E: EthSpec>(
migrate_config: MigrateConfig,
client_config: ClientConfig,
-mut genesis_state: BeaconState<E>,
runtime_context: &RuntimeContext<E>,
) -> Result<(), Error> {
let spec = runtime_context.eth2_config.spec.clone();
@@ -329,13 +328,7 @@ pub fn migrate_db<E: EthSpec>(
"Migrating database schema"
);
-let genesis_state_root = genesis_state.canonical_root()?;
-migrate_schema::<Witness<SystemTimeSlotClock, CachingEth1Backend<E>, _, _, _>>(
-db,
-Some(genesis_state_root),
-from,
-to,
-)
+migrate_schema::<Witness<SystemTimeSlotClock, CachingEth1Backend<E>, _, _, _>>(db, from, to)
}
pub fn prune_payloads<E: EthSpec>(
@@ -487,8 +480,7 @@ pub fn run<E: EthSpec>(
match &db_manager_config.subcommand {
cli::DatabaseManagerSubcommand::Migrate(migrate_config) => {
let migrate_config = parse_migrate_config(migrate_config)?;
-let genesis_state = get_genesis_state()?;
-migrate_db(migrate_config, client_config, genesis_state, &context).map_err(format_err)
+migrate_db(migrate_config, client_config, &context).map_err(format_err)
}
cli::DatabaseManagerSubcommand::Inspect(inspect_config) => {
let inspect_config = parse_inspect_config(inspect_config)?;

View File

@@ -89,6 +89,7 @@ SSD
SSL
SSZ
Styleguide
+TBD
TCP
Teku
TLS