diff --git a/beacon_node/beacon_chain/src/head_tracker.rs b/beacon_node/beacon_chain/src/head_tracker.rs index 84c800f3b7..7847d75f40 100644 --- a/beacon_node/beacon_chain/src/head_tracker.rs +++ b/beacon_node/beacon_chain/src/head_tracker.rs @@ -83,8 +83,8 @@ impl PartialEq for HeadTracker { /// This is used when persisting the state of the `BeaconChain` to disk. #[derive(Encode, Decode, Clone)] pub struct SszHeadTracker { - roots: Vec, - slots: Vec, + pub roots: Vec, + pub slots: Vec, } impl SszHeadTracker { diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 1d7dbe2b0b..016c3725d6 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -1,4 +1,5 @@ //! Utilities for managing database schema changes. +mod migration_schema_v10; mod migration_schema_v6; mod migration_schema_v7; mod migration_schema_v8; @@ -181,6 +182,15 @@ pub fn migrate_schema( Ok(()) } + // Reserved for merge-related changes. + (SchemaVersion(8), SchemaVersion(9)) => Ok(()), + // Upgrade for tree-states database changes. + (SchemaVersion(9), SchemaVersion(10)) => migration_schema_v10::upgrade_to_v10::(db, log), + // Downgrade for tree-states database changes. + (SchemaVersion(10), SchemaVersion(8)) => { + // FIXME(sproul): implement downgrade + panic!("downgrade not implemented yet") + } // Anything else is an error. 
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs new file mode 100644 index 0000000000..44fc5c6969 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v10.rs @@ -0,0 +1,193 @@ +use crate::{ + beacon_chain::{BeaconChainTypes, BEACON_CHAIN_DB_KEY}, + persisted_beacon_chain::PersistedBeaconChain, +}; +use slog::{debug, Logger}; +use std::collections::HashMap; +use std::sync::Arc; +use store::{ + get_key_for_col, + hot_cold_store::{HotColdDBError, HotStateSummaryV1, HotStateSummaryV10}, + metadata::SchemaVersion, + DBColumn, Error, HotColdDB, KeyValueStoreOp, StoreItem, +}; +use types::{milhouse::Diff, BeaconState, BeaconStateDiff, EthSpec, Hash256, Slot}; + +fn get_summary_v1( + db: &HotColdDB, + state_root: Hash256, +) -> Result { + db.get_item(&state_root)? + .ok_or(HotColdDBError::MissingHotStateSummary(state_root).into()) +} + +fn get_state_by_replay( + db: &HotColdDB, + state_root: Hash256, +) -> Result, Error> { + // Load state summary. + let HotStateSummaryV1 { + slot, + latest_block_root, + epoch_boundary_state_root, + } = get_summary_v1::(&db, state_root)?; + + // Load full state from the epoch boundary. + let (epoch_boundary_state, _) = db.load_hot_state_full(&epoch_boundary_state_root)?; + + // Replay blocks to reach the target state. + let blocks = db.load_blocks_to_replay(epoch_boundary_state.slot(), slot, latest_block_root)?; + + db.replay_blocks(epoch_boundary_state, blocks, slot, std::iter::empty()) +} + +pub fn upgrade_to_v10( + db: Arc>, + log: Logger, +) -> Result<(), Error> { + let mut ops = vec![]; + + // Translate hot state summaries to new format: + // - Rewrite epoch boundary root to previous epoch boundary root. + // - Add previous state root. + // + // Replace most epoch boundary states by diffs. 
+ let split = db.get_split_info(); + let finalized_slot = split.slot; + let finalized_state_root = split.state_root; + let slots_per_epoch = T::EthSpec::slots_per_epoch(); + + let ssz_head_tracker = db + .get_item::(&BEACON_CHAIN_DB_KEY)? + .ok_or(Error::MissingPersistedBeaconChain)? + .ssz_head_tracker; + + let mut new_summaries = HashMap::new(); + + for (head_block_root, head_state_slot) in ssz_head_tracker + .roots + .into_iter() + .zip(ssz_head_tracker.slots) + { + let block = db + .get_block(&head_block_root)? + .ok_or(Error::BlockNotFound(head_block_root))?; + let head_state_root = block.state_root(); + + debug!( + log, + "Re-writing state summaries for head"; + "block_root" => ?head_block_root, + "state_root" => ?head_state_root, + "slot" => head_state_slot + ); + let mut current_state = get_state_by_replay::(&db, head_state_root)?; + let mut current_state_root = head_state_root; + + new_summaries.insert( + head_state_root, + HotStateSummaryV10::new(&head_state_root, ¤t_state)?, + ); + + for slot in (finalized_slot.as_u64()..current_state.slot().as_u64()) + .rev() + .map(Slot::new) + { + let epoch_boundary_slot = (slot - 1) / slots_per_epoch * slots_per_epoch; + + let state_root = *current_state.get_state_root(slot)?; + let latest_block_root = *current_state.get_block_root(slot)?; + let prev_state_root = *current_state.get_state_root(slot - 1)?; + let epoch_boundary_state_root = *current_state.get_state_root(epoch_boundary_slot)?; + + let summary = HotStateSummaryV10 { + slot, + latest_block_root, + epoch_boundary_state_root, + prev_state_root, + }; + + // Stage the updated state summary for storage. + // If we've reached a known segment of chain then we can stop and continue to the next + // head. 
+ if new_summaries.insert(state_root, summary).is_some() { + debug!( + log, + "Finished migrating chain tip"; + "head_block_root" => ?head_block_root, + "reason" => format!("reached common state {:?}", state_root), + ); + break; + } else { + debug!( + log, + "Rewriting hot state summary"; + "state_root" => ?state_root, + "slot" => slot, + "epoch_boundary_state_root" => ?epoch_boundary_state_root, + "prev_state_root" => ?prev_state_root, + ); + } + + // If the state reached is an epoch boundary state, then load it so that we can continue + // backtracking from it and storing diffs. + if slot % slots_per_epoch == 0 { + debug!( + log, + "Loading epoch boundary state"; + "state_root" => ?state_root, + "slot" => slot, + ); + let backtrack_state = get_state_by_replay::(&db, state_root)?; + + // If the current state is an epoch boundary state too then we might need to convert + // it to a diff relative to the backtrack state. + if current_state.slot() % slots_per_epoch == 0 + && !db.is_stored_as_full_state(current_state_root, current_state.slot())? + { + debug!( + log, + "Converting full state to diff"; + "prev_state_root" => ?state_root, + "state_root" => ?current_state_root, + "slot" => slot, + ); + + let diff = BeaconStateDiff::compute_diff(&backtrack_state, ¤t_state)?; + + // Store diff. + ops.push(db.state_diff_as_kv_store_op(¤t_state_root, &diff)?); + + // Delete full state. 
+ let state_key = get_key_for_col( DBColumn::BeaconState.into(), current_state_root.as_bytes(), ); ops.push(KeyValueStoreOp::DeleteKey(state_key)); } current_state = backtrack_state; current_state_root = state_root; } if slot == finalized_slot { // FIXME(sproul): remove assert assert_eq!(finalized_state_root, state_root); debug!( log, "Finished migrating chain tip"; "head_block_root" => ?head_block_root, "reason" => format!("reached finalized state {:?}", finalized_state_root), ); break; } } } ops.reserve(new_summaries.len()); for (state_root, summary) in new_summaries { ops.push(summary.as_kv_store_op(state_root)?); } db.store_schema_version_atomically(SchemaVersion(10), ops) } diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 66c877b5c3..56ed5ad9db 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -48,6 +48,10 @@ pub enum Error { #[cfg(feature = "milhouse")] MilhouseError(milhouse::Error), Compression(std::io::Error), + MissingPersistedBeaconChain, + SlotIsBeforeSplit { slot: Slot, }, } pub trait HandleUnavailable { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6a34575187..eb3aa43785 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -798,6 +798,24 @@ impl, Cold: ItemStore> HotColdDB } } + /// Determine if the `state_root` at `slot` should be stored as a full state. + /// + /// This is dependent on the database's current split point, so may change from `false` to + /// `true` after a finalization update. It cannot change from `true` to `false` for a state in + /// the hot database as the split state will be migrated to the freezer. + /// + /// All fork boundary states are also stored as full states.
+ pub fn is_stored_as_full_state(&self, state_root: Hash256, slot: Slot) -> Result { + let split = self.get_split_info(); + + if slot >= split.slot { + Ok(state_root == split.state_root + || self.spec.fork_activated_at_slot::(slot).is_some()) + } else { + Err(Error::SlotIsBeforeSplit { slot }) + } + } + pub fn load_hot_state_full( &self, state_root: &Hash256, @@ -1040,7 +1058,7 @@ impl, Cold: ItemStore> HotColdDB /// /// Will skip slots as necessary. The returned state is not guaranteed /// to have any caches built, beyond those immediately required by block processing. - fn replay_blocks( + pub fn replay_blocks( &self, state: BeaconState, blocks: Vec>, @@ -1053,7 +1071,8 @@ impl, Cold: ItemStore> HotColdDB .state_root_iter(state_root_iter) .apply_blocks(blocks, Some(target_slot)) .and_then(|block_replayer| { - if block_replayer.state_root_miss() { + // FIXME(sproul): tweak state miss condition + if block_replayer.state_root_miss() && false { Err(Error::MissingStateRoot(target_slot)) } else { Ok(block_replayer.into_state()) @@ -1554,8 +1573,8 @@ pub fn migrate_database, Cold: ItemStore>( /// Struct for storing the split slot and state root in the database. #[derive(Debug, Clone, Copy, PartialEq, Default, Encode, Decode, Deserialize, Serialize)] pub struct Split { - pub(crate) slot: Slot, - pub(crate) state_root: Hash256, + pub slot: Slot, + pub state_root: Hash256, } impl StoreItem for Split { @@ -1575,29 +1594,42 @@ impl StoreItem for Split { /// Struct for summarising a state in the hot database. /// /// Allows full reconstruction by replaying blocks. -#[derive(Debug, Clone, Copy, Default, Encode, Decode)] +#[superstruct( + variants(V1, V10), + variant_attributes(derive(Debug, Clone, Copy, Default, Encode, Decode)), + no_enum +)] pub struct HotStateSummary { pub slot: Slot, pub latest_block_root: Hash256, + /// The state root of the state at the prior epoch boundary. pub epoch_boundary_state_root: Hash256, /// The state root of the state at the prior slot. 
- // FIXME(sproul): migrate + #[superstruct(only(V10))] pub prev_state_root: Hash256, } -impl StoreItem for HotStateSummary { - fn db_column() -> DBColumn { - DBColumn::BeaconStateSummary - } +pub type HotStateSummary = HotStateSummaryV10; - fn as_store_bytes(&self) -> Result, Error> { - Ok(self.as_ssz_bytes()) - } +macro_rules! impl_store_item_summary { + ($t:ty) => { + impl StoreItem for $t { + fn db_column() -> DBColumn { + DBColumn::BeaconStateSummary + } - fn from_store_bytes(bytes: &[u8]) -> Result { - Ok(Self::from_ssz_bytes(bytes)?) - } + fn as_store_bytes(&self) -> Result, Error> { + Ok(self.as_ssz_bytes()) + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } + } + }; } +impl_store_item_summary!(HotStateSummaryV1); +impl_store_item_summary!(HotStateSummaryV10); impl HotStateSummary { /// Construct a new summary of the given state. diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index c5f18e4bea..b7f2912c12 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(9000); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(10); // All the keys that get stored under the `BeaconMeta` column. //