Implement DB upgrade migration

This commit is contained in:
Michael Sproul
2022-03-10 15:31:32 +11:00
parent 0ee31a0a69
commit b4c60807dd
6 changed files with 258 additions and 19 deletions

View File

@@ -83,8 +83,8 @@ impl PartialEq<HeadTracker> for HeadTracker {
/// This is used when persisting the state of the `BeaconChain` to disk.
#[derive(Encode, Decode, Clone)]
pub struct SszHeadTracker {
roots: Vec<Hash256>,
slots: Vec<Slot>,
pub roots: Vec<Hash256>,
pub slots: Vec<Slot>,
}
impl SszHeadTracker {

View File

@@ -1,4 +1,5 @@
//! Utilities for managing database schema changes.
mod migration_schema_v10;
mod migration_schema_v6;
mod migration_schema_v7;
mod migration_schema_v8;
@@ -181,6 +182,15 @@ pub fn migrate_schema<T: BeaconChainTypes>(
Ok(())
}
// Reserved for merge-related changes.
(SchemaVersion(8), SchemaVersion(9)) => Ok(()),
// Upgrade for tree-states database changes.
(SchemaVersion(9), SchemaVersion(10)) => migration_schema_v10::upgrade_to_v10::<T>(db, log),
// Downgrade for tree-states database changes.
(SchemaVersion(10), SchemaVersion(8)) => {
// FIXME(sproul): implement downgrade
panic!("downgrade not implemented yet")
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,

View File

@@ -0,0 +1,193 @@
use crate::{
beacon_chain::{BeaconChainTypes, BEACON_CHAIN_DB_KEY},
persisted_beacon_chain::PersistedBeaconChain,
};
use slog::{debug, Logger};
use std::collections::HashMap;
use std::sync::Arc;
use store::{
get_key_for_col,
hot_cold_store::{HotColdDBError, HotStateSummaryV1, HotStateSummaryV10},
metadata::SchemaVersion,
DBColumn, Error, HotColdDB, KeyValueStoreOp, StoreItem,
};
use types::{milhouse::Diff, BeaconState, BeaconStateDiff, EthSpec, Hash256, Slot};
/// Load the pre-migration (v1) hot state summary for `state_root`.
///
/// # Errors
///
/// Returns `HotColdDBError::MissingHotStateSummary` if no summary is stored
/// under `state_root`.
fn get_summary_v1<T: BeaconChainTypes>(
    db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
    state_root: Hash256,
) -> Result<HotStateSummaryV1, Error> {
    // `ok_or_else` defers constructing the error (and its `.into()` conversion)
    // to the missing-summary path, instead of building it on every call.
    db.get_item(&state_root)?
        .ok_or_else(|| HotColdDBError::MissingHotStateSummary(state_root).into())
}
/// Reconstruct the hot state at `state_root` under the *old* (v1) schema:
/// load the nearest stored epoch-boundary state and replay the intervening
/// blocks on top of it.
///
/// # Errors
///
/// Fails if the v1 summary, the epoch-boundary state, or any block required
/// for replay is missing, or if block replay itself fails.
fn get_state_by_replay<T: BeaconChainTypes>(
    db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
    state_root: Hash256,
) -> Result<BeaconState<T::EthSpec>, Error> {
    // Load the v1 state summary, which tells us which epoch-boundary state to
    // start from and which blocks to replay. (`db` is already a reference —
    // no need to re-borrow it as `&db`.)
    let HotStateSummaryV1 {
        slot,
        latest_block_root,
        epoch_boundary_state_root,
    } = get_summary_v1::<T>(db, state_root)?;

    // Load full state from the epoch boundary.
    let (epoch_boundary_state, _) = db.load_hot_state_full(&epoch_boundary_state_root)?;

    // Replay blocks to reach the target state. No state-root iterator is
    // supplied (`std::iter::empty()`), so replay recomputes roots as needed.
    let blocks = db.load_blocks_to_replay(epoch_boundary_state.slot(), slot, latest_block_root)?;
    db.replay_blocks(epoch_boundary_state, blocks, slot, std::iter::empty())
}
/// Upgrade the hot database schema from v9 to v10.
///
/// For every head known to the head tracker, walk backwards from the head
/// slot to the finalized (split) slot, rewriting each hot state summary into
/// the `HotStateSummaryV10` format and replacing most full epoch-boundary
/// states with diffs against the previous epoch boundary. All resulting
/// key-value operations are committed atomically together with the schema
/// version bump at the end.
pub fn upgrade_to_v10<T: BeaconChainTypes>(
    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
    log: Logger,
) -> Result<(), Error> {
    // Accumulated KV operations, applied atomically at the very end.
    let mut ops = vec![];

    // Translate hot state summaries to new format:
    // - Rewrite epoch boundary root to previous epoch boundary root.
    // - Add previous state root.
    //
    // Replace most epoch boundary states by diffs.
    let split = db.get_split_info();
    let finalized_slot = split.slot;
    let finalized_state_root = split.state_root;
    let slots_per_epoch = T::EthSpec::slots_per_epoch();

    // One (block_root, slot) pair per known chain tip.
    let ssz_head_tracker = db
        .get_item::<PersistedBeaconChain>(&BEACON_CHAIN_DB_KEY)?
        .ok_or(Error::MissingPersistedBeaconChain)?
        .ssz_head_tracker;

    // Rewritten summaries keyed by state root. Doubles as a "seen" set: when a
    // walk from one head reaches a state already inserted by an earlier head's
    // walk, the chains have converged and the walk can stop early.
    let mut new_summaries = HashMap::new();

    for (head_block_root, head_state_slot) in ssz_head_tracker
        .roots
        .into_iter()
        .zip(ssz_head_tracker.slots)
    {
        let block = db
            .get_block(&head_block_root)?
            .ok_or(Error::BlockNotFound(head_block_root))?;
        let head_state_root = block.state_root();

        debug!(
            log,
            "Re-writing state summaries for head";
            "block_root" => ?head_block_root,
            "state_root" => ?head_state_root,
            "slot" => head_state_slot
        );

        // `current_state` is the most recently loaded full state on this
        // branch; the roots of older states/blocks are read out of its
        // internal state-roots/block-roots history below.
        // NOTE(review): this assumes every slot between the head and
        // finalization is within the state's historical-roots window — TODO
        // confirm for long unfinalized periods.
        let mut current_state = get_state_by_replay::<T>(&db, head_state_root)?;
        let mut current_state_root = head_state_root;

        // The head state itself gets a summary up front; the loop below only
        // covers strictly older slots.
        new_summaries.insert(
            head_state_root,
            HotStateSummaryV10::new(&head_state_root, &current_state)?,
        );

        // Walk backwards, one slot at a time, from just below the head state's
        // slot down to the finalized slot.
        for slot in (finalized_slot.as_u64()..current_state.slot().as_u64())
            .rev()
            .map(Slot::new)
        {
            // Most recent epoch boundary at or before `slot - 1`, i.e. the
            // *previous* boundary relative to `slot` (integer arithmetic).
            let epoch_boundary_slot = (slot - 1) / slots_per_epoch * slots_per_epoch;
            let state_root = *current_state.get_state_root(slot)?;
            let latest_block_root = *current_state.get_block_root(slot)?;
            let prev_state_root = *current_state.get_state_root(slot - 1)?;
            let epoch_boundary_state_root = *current_state.get_state_root(epoch_boundary_slot)?;

            let summary = HotStateSummaryV10 {
                slot,
                latest_block_root,
                epoch_boundary_state_root,
                prev_state_root,
            };

            // Stage the updated state summary for storage.
            // If we've reached a known segment of chain then we can stop and continue to the next
            // head.
            if new_summaries.insert(state_root, summary).is_some() {
                debug!(
                    log,
                    "Finished migrating chain tip";
                    "head_block_root" => ?head_block_root,
                    "reason" => format!("reached common state {:?}", state_root),
                );
                break;
            } else {
                debug!(
                    log,
                    "Rewriting hot state summary";
                    "state_root" => ?state_root,
                    "slot" => slot,
                    "epoch_boundary_state_root" => ?epoch_boundary_state_root,
                    "prev_state_root" => ?prev_state_root,
                );
            }

            // If the state reached is an epoch boundary state, then load it so that we can continue
            // backtracking from it and storing diffs.
            if slot % slots_per_epoch == 0 {
                debug!(
                    log,
                    "Loading epoch boundary state";
                    "state_root" => ?state_root,
                    "slot" => slot,
                );
                let backtrack_state = get_state_by_replay::<T>(&db, state_root)?;

                // If the current state is an epoch boundary state too then we might need to convert
                // it to a diff relative to the backtrack state.
                // (States already stored as full states are left as-is.)
                if current_state.slot() % slots_per_epoch == 0
                    && !db.is_stored_as_full_state(current_state_root, current_state.slot())?
                {
                    debug!(
                        log,
                        "Converting full state to diff";
                        "prev_state_root" => ?state_root,
                        "state_root" => ?current_state_root,
                        "slot" => slot,
                    );
                    let diff = BeaconStateDiff::compute_diff(&backtrack_state, &current_state)?;
                    // Store diff.
                    ops.push(db.state_diff_as_kv_store_op(&current_state_root, &diff)?);
                    // Delete full state.
                    let state_key = get_key_for_col(
                        DBColumn::BeaconState.into(),
                        current_state_root.as_bytes(),
                    );
                    ops.push(KeyValueStoreOp::DeleteKey(state_key));
                }

                // Continue backtracking from the older boundary state.
                current_state = backtrack_state;
                current_state_root = state_root;
            }

            // Reaching the finalized slot ends this head's walk; the split
            // state root must match what the walk derived.
            if slot == finalized_slot {
                // FIXME(sproul): remove assert
                assert_eq!(finalized_state_root, state_root);
                debug!(
                    log,
                    "Finished migrating chain tip";
                    "head_block_root" => ?head_block_root,
                    "reason" => format!("reached finalized state {:?}", finalized_state_root),
                );
                break;
            }
        }
    }

    // Flush all rewritten summaries, then commit everything together with the
    // schema-version bump in a single atomic batch.
    ops.reserve(new_summaries.len());
    for (state_root, summary) in new_summaries {
        ops.push(summary.as_kv_store_op(state_root)?);
    }
    db.store_schema_version_atomically(SchemaVersion(10), ops)
}