From d0319320ce4a17c1cccc4730d7247f3532394019 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 6 Dec 2019 14:29:06 +1100 Subject: [PATCH] Improve freezer DB efficiency with periodic restore points (#649) * Draft of checkpoint freezer DB * Fix bugs * Adjust root iterators for checkpoint database * Fix freezer state lookups with no slot hint * Fix split comment * Use "restore point" to refer to frozen states * Resolve some FIXMEs * Configurable slots per restore point * Document new freezer DB functions * Fix up StoreConfig * Fix new test for merge * Document SPRP default CLI flag, clarify tests --- .../beacon_chain/tests/persistence_tests.rs | 4 +- beacon_node/beacon_chain/tests/store_tests.rs | 44 +- beacon_node/client/src/builder.rs | 17 +- beacon_node/client/src/config.rs | 18 +- beacon_node/src/cli.rs | 15 + beacon_node/src/config.rs | 8 +- beacon_node/src/lib.rs | 7 +- beacon_node/store/Cargo.toml | 3 + beacon_node/store/src/config.rs | 32 ++ beacon_node/store/src/hot_cold_store.rs | 425 +++++++++++++++--- beacon_node/store/src/iter.rs | 79 +++- beacon_node/store/src/lib.rs | 18 +- eth2/types/src/beacon_state.rs | 8 + 13 files changed, 587 insertions(+), 91 deletions(-) create mode 100644 beacon_node/store/src/config.rs diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index 5e9977a23e..c3caa50b9b 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -27,9 +27,11 @@ fn get_store(db_path: &TempDir) -> Arc { let spec = E::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); + let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; let log = NullLoggerBuilder.build().expect("logger should build"); Arc::new( - DiskStore::open(&hot_path, &cold_path, spec, log).expect("disk store should initialize"), + DiskStore::open::(&hot_path, 
&cold_path, slots_per_restore_point, spec, log) + .expect("disk store should initialize"), ) } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 1dd05da399..0d6deb4a88 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -9,7 +9,7 @@ use beacon_chain::test_utils::{ use rand::Rng; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; -use store::DiskStore; +use store::{DiskStore, Store}; use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; use types::test_utils::{SeedableRng, XorShiftRng}; @@ -30,9 +30,11 @@ fn get_store(db_path: &TempDir) -> Arc { let spec = MinimalEthSpec::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); + let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; let log = NullLoggerBuilder.build().expect("logger should build"); Arc::new( - DiskStore::open(&hot_path, &cold_path, spec, log).expect("disk store should initialize"), + DiskStore::open::(&hot_path, &cold_path, slots_per_restore_point, spec, log) + .expect("disk store should initialize"), ) } @@ -62,6 +64,7 @@ fn full_participation_no_skips() { check_finalization(&harness, num_blocks_produced); check_split_slot(&harness, store); check_chain_dump(&harness, num_blocks_produced + 1); + check_iterators(&harness); } #[test] @@ -99,6 +102,7 @@ fn randomised_skips() { check_split_slot(&harness, store); check_chain_dump(&harness, num_blocks_produced + 1); + check_iterators(&harness); } #[test] @@ -140,6 +144,7 @@ fn long_skip() { check_finalization(&harness, initial_blocks + skip_slots + final_blocks); check_split_slot(&harness, store); check_chain_dump(&harness, initial_blocks + final_blocks + 1); + check_iterators(&harness); } /// Go forward to the point where the genesis randao value is no longer part of the vector. 
@@ -201,6 +206,7 @@ fn randao_genesis_storage() { check_finalization(&harness, num_slots); check_split_slot(&harness, store); check_chain_dump(&harness, num_slots + 1); + check_iterators(&harness); } // Check that closing and reopening a freezer DB restores the split slot to its correct value. @@ -281,10 +287,44 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { assert_eq!(chain_dump.len() as u64, expected_len); for checkpoint in chain_dump { + // Check that the tree hash of the stored state is as expected assert_eq!( checkpoint.beacon_state_root, Hash256::from_slice(&checkpoint.beacon_state.tree_hash_root()), "tree hash of stored state is incorrect" ); + + // Check that looking up the state root with no slot hint succeeds. + // This tests the state root -> slot mapping. + assert_eq!( + harness + .chain + .store + .get_state::(&checkpoint.beacon_state_root, None) + .expect("no error") + .expect("state exists") + .slot, + checkpoint.beacon_state.slot + ); } } + +/// Check that state and block root iterators can reach genesis +fn check_iterators(harness: &TestHarness) { + assert_eq!( + harness + .chain + .rev_iter_state_roots() + .last() + .map(|(_, slot)| slot), + Some(Slot::new(0)) + ); + assert_eq!( + harness + .chain + .rev_iter_block_roots() + .last() + .map(|(_, slot)| slot), + Some(Slot::new(0)) + ); +} diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index e4da78da65..abc1710a86 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -577,7 +577,12 @@ where TEventHandler: EventHandler + 'static, { /// Specifies that the `Client` should use a `DiskStore` database. 
- pub fn disk_store(mut self, hot_path: &Path, cold_path: &Path) -> Result { + pub fn disk_store( + mut self, + hot_path: &Path, + cold_path: &Path, + slots_per_restore_point: u64, + ) -> Result { let context = self .runtime_context .as_ref() @@ -588,8 +593,14 @@ where .clone() .ok_or_else(|| "disk_store requires a chain spec".to_string())?; - let store = DiskStore::open(hot_path, cold_path, spec, context.log) - .map_err(|e| format!("Unable to open database: {:?}", e).to_string())?; + let store = DiskStore::open::( + hot_path, + cold_path, + slots_per_restore_point, + spec, + context.log, + ) + .map_err(|e| format!("Unable to open database: {:?}", e).to_string())?; self.store = Some(Arc::new(store)); Ok(self) } diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 330aadbc37..d08af28223 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -6,9 +6,6 @@ use std::path::PathBuf; /// The number initial validators when starting the `Minimal`. const TESTNET_SPEC_CONSTANTS: &str = "minimal"; -/// Default directory name for the freezer database under the top-level data dir. -const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; - /// Defines how the client should initialize the `BeaconChain` and other components. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ClientGenesis { @@ -45,9 +42,6 @@ impl Default for ClientGenesis { pub struct Config { pub data_dir: PathBuf, pub testnet_dir: Option, - pub db_type: String, - pub db_name: String, - pub freezer_db_path: Option, pub log_file: PathBuf, pub spec_constants: String, /// If true, the node will use co-ordinated junk for eth1 values. @@ -59,6 +53,7 @@ pub struct Config { /// The `genesis` field is not serialized or deserialized by `serde` to ensure it is defined /// via the CLI at runtime, instead of from a configuration file saved to disk. 
pub genesis: ClientGenesis, + pub store: store::StoreConfig, pub network: network::NetworkConfig, pub rest_api: rest_api::Config, pub websocket_server: websocket_server::Config, @@ -71,10 +66,8 @@ impl Default for Config { data_dir: PathBuf::from(".lighthouse"), testnet_dir: None, log_file: PathBuf::from(""), - db_type: "disk".to_string(), - db_name: "chain_db".to_string(), - freezer_db_path: None, genesis: <_>::default(), + store: <_>::default(), network: NetworkConfig::default(), rest_api: <_>::default(), websocket_server: <_>::default(), @@ -90,7 +83,7 @@ impl Config { /// Get the database path without initialising it. pub fn get_db_path(&self) -> Option { self.get_data_dir() - .map(|data_dir| data_dir.join(&self.db_name)) + .map(|data_dir| data_dir.join(&self.store.db_name)) } /// Get the database path, creating it if necessary. @@ -104,7 +97,7 @@ impl Config { /// Fetch default path to use for the freezer database. fn default_freezer_db_path(&self) -> Option { self.get_data_dir() - .map(|data_dir| data_dir.join(DEFAULT_FREEZER_DB_DIR)) + .map(|data_dir| data_dir.join(self.store.default_freezer_db_dir())) } /// Returns the path to which the client may initialize the on-disk freezer database. @@ -112,7 +105,8 @@ impl Config { /// Will attempt to use the user-supplied path from e.g. the CLI, or will default /// to a directory in the data_dir if no path is provided. 
pub fn get_freezer_db_path(&self) -> Option { - self.freezer_db_path + self.store + .freezer_db_path .clone() .or_else(|| self.default_freezer_db_path()) } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 649520c191..7c474ca1fc 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1,4 +1,5 @@ use clap::{App, Arg, SubCommand}; +use store::StoreConfig; pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new("beacon_node") @@ -188,6 +189,20 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("http://localhost:8545") ) + .arg( + Arg::with_name("slots-per-restore-point") + .long("slots-per-restore-point") + .value_name("SLOT_COUNT") + .help("Specifies how often a freezer DB restore point should be stored. \ + DO NOT CHANGE AFTER INITIALIZATION.") + .takes_value(true) + .default_value( + Box::leak( + format!("{}", StoreConfig::default().slots_per_restore_point) + .into_boxed_str() + ) + ) + ) /* * The "testnet" sub-command. * diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 75171c759e..45cd69ff3a 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -233,7 +233,13 @@ pub fn get_configs( }; if let Some(freezer_dir) = cli_args.value_of("freezer-dir") { - client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); + client_config.store.freezer_db_path = Some(PathBuf::from(freezer_dir)); + } + + if let Some(slots_per_restore_point) = cli_args.value_of("slots-per-restore-point") { + client_config.store.slots_per_restore_point = slots_per_restore_point + .parse() + .map_err(|_| "slots-per-restore-point is not a valid integer".to_string())?; } if eth2_config.spec_constants != client_config.spec_constants { diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 36f69ab765..e3dd1f8679 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -79,6 +79,7 @@ impl ProductionBeaconNode { let spec = context.eth2_config().spec.clone(); let genesis_eth1_config = 
client_config.eth1.clone(); let client_genesis = client_config.genesis.clone(); + let store_config = client_config.store.clone(); let log = context.log.clone(); let db_path_res = client_config.create_db_path(); @@ -90,7 +91,11 @@ impl ProductionBeaconNode { Ok(ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec) - .disk_store(&db_path, &freezer_db_path_res?)? + .disk_store( + &db_path, + &freezer_db_path_res?, + store_config.slots_per_restore_point, + )? .background_migrator()?) }) .and_then(move |builder| { diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 5187dba0d4..adce8c6f70 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -16,6 +16,9 @@ eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" tree_hash = "0.1.0" types = { path = "../../eth2/types" } +state_processing = { path = "../../eth2/state_processing" } slog = "2.2.3" +serde = "1.0" +serde_derive = "1.0.102" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs new file mode 100644 index 0000000000..4693942eb9 --- /dev/null +++ b/beacon_node/store/src/config.rs @@ -0,0 +1,32 @@ +use serde_derive::{Deserialize, Serialize}; +use std::path::PathBuf; +use types::{EthSpec, MinimalEthSpec}; + +/// Default directory name for the freezer database under the top-level data dir. +const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StoreConfig { + /// Name of the directory inside the data directory where the main "hot" DB is located. + pub db_name: String, + /// Path where the freezer database will be located. + pub freezer_db_path: Option, + /// Number of slots to wait between storing restore points in the freezer database. 
+ pub slots_per_restore_point: u64, +} + +impl Default for StoreConfig { + fn default() -> Self { + Self { + db_name: "chain_db".to_string(), + freezer_db_path: None, + slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64, + } + } +} + +impl StoreConfig { + pub fn default_freezer_db_dir(&self) -> &'static str { + DEFAULT_FREEZER_DB_DIR + } +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 490f6c2d78..3ddf2c365a 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,30 +1,43 @@ use crate::chunked_vector::{ store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots, }; -use crate::iter::StateRootsIterator; +use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; use crate::{ leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem, }; use parking_lot::RwLock; -use slog::{info, trace, Logger}; +use slog::{debug, trace, warn, Logger}; use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use state_processing::{ + per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy, + SlotProcessingError, +}; use std::convert::TryInto; use std::path::Path; use std::sync::Arc; use types::*; -/// 32-byte key for accessing the `split_slot` of the freezer DB. -pub const SPLIT_SLOT_DB_KEY: &str = "FREEZERDBSPLITSLOTFREEZERDBSPLIT"; +/// 32-byte key for accessing the `split` of the freezer DB. +pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE"; +/// On-disk database that stores finalized states efficiently. +/// +/// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores +/// intermittent "restore point" states pre-finalization. pub struct HotColdDB { - /// The slot before which all data is stored in the cold database. + /// The slot and state root at the point where the database is split between hot and cold. 
/// - /// Data for slots less than `split_slot` is in the cold DB, while data for slots - /// greater than or equal is in the hot DB. - split_slot: RwLock, + /// States with slots less than `split.slot` are in the cold DB, while states with slots + /// greater than or equal are in the hot DB. + split: RwLock, + /// Number of slots per restore point state in the freezer database. + slots_per_restore_point: u64, /// Cold database containing compact historical data. cold_db: LevelDB, /// Hot database containing duplicated but quick-to-access recent data. + /// + /// The hot database also contains all blocks. hot_db: LevelDB, /// Chain spec. spec: ChainSpec, @@ -38,6 +51,24 @@ pub enum HotColdDbError { current_split_slot: Slot, proposed_split_slot: Slot, }, + MissingStateToFreeze(Hash256), + MissingRestorePointHash(u64), + MissingRestorePoint(Hash256), + MissingStateSlot(Hash256), + MissingSplitState(Hash256, Slot), + RestorePointDecodeError(ssz::DecodeError), + RestorePointReplayFailure { + expected_state_root: Hash256, + observed_state_root: Hash256, + }, + BlockReplayBeaconError(BeaconStateError), + BlockReplaySlotError(SlotProcessingError), + BlockReplayBlockError(BlockProcessingError), + InvalidSlotsPerRestorePoint { + slots_per_restore_point: u64, + slots_per_historical_root: u64, + }, + RestorePointBlockHashError(BeaconStateError), } impl Store for HotColdDB { @@ -79,24 +110,31 @@ impl Store for HotColdDB { ) -> Result>, Error> { if let Some(slot) = slot { if slot < self.get_split_slot() { - self.load_archive_state(state_root) + self.load_archive_state(state_root, slot).map(Some) } else { self.hot_db.get_state(state_root, None) } } else { match self.hot_db.get_state(state_root, None)? { Some(state) => Ok(Some(state)), - None => self.load_archive_state(state_root), + None => { + // Look-up the state in the freezer DB. We don't know the slot, so we must + // look it up separately and then use it to reconstruct the state from a + // restore point. 
+ let slot = self.load_state_slot(state_root)?; + self.load_archive_state(state_root, slot).map(Some) + } } } } + /// Advance the split point of the store, moving new finalized states to the freezer. fn freeze_to_state( store: Arc, - _frozen_head_root: Hash256, + frozen_head_root: Hash256, frozen_head: &BeaconState, ) -> Result<(), Error> { - info!( + debug!( store.log, "Freezer migration started"; "slot" => frozen_head.slot @@ -119,25 +157,28 @@ impl Store for HotColdDB { for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) { - trace!(store.log, "Freezing"; - "slot" => slot, - "state_root" => format!("{}", state_root)); + if slot % store.slots_per_restore_point == 0 { + let state: BeaconState = store + .hot_db + .get_state(&state_root, None)? + .ok_or_else(|| HotColdDbError::MissingStateToFreeze(state_root))?; - let state: BeaconState = match store.hot_db.get_state(&state_root, None)? { - Some(s) => s, - // If there's no state it could be a skip slot, which is fine, our job is just - // to move everything that was in the hot DB to the cold. - None => continue, - }; + store.store_archive_state(&state_root, &state)?; + } + + // Store a pointer from this state root to its slot, so we can later reconstruct states + // from their state root alone. + store.store_state_slot(&state_root, slot)?; to_delete.push(state_root); - - store.store_archive_state(&state_root, &state)?; } // 2. Update the split slot - *store.split_slot.write() = frozen_head.slot; - store.store_split_slot()?; + *store.split.write() = Split { + slot: frozen_head.slot, + state_root: frozen_head_root, + }; + store.store_split()?; // 3. 
Delete from the hot DB for state_root in to_delete { @@ -146,7 +187,7 @@ impl Store for HotColdDB { .key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?; } - info!( + debug!( store.log, "Freezer migration complete"; "slot" => frozen_head.slot @@ -157,38 +198,60 @@ impl Store for HotColdDB { } impl HotColdDB { - pub fn open( + /// Open a new or existing database, with the given paths to the hot and cold DBs. + /// + /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`. + pub fn open( hot_path: &Path, cold_path: &Path, + slots_per_restore_point: u64, spec: ChainSpec, log: Logger, ) -> Result { + Self::verify_slots_per_restore_point::(slots_per_restore_point)?; + let db = HotColdDB { - split_slot: RwLock::new(Slot::new(0)), + split: RwLock::new(Split::default()), + slots_per_restore_point, cold_db: LevelDB::open(cold_path)?, hot_db: LevelDB::open(hot_path)?, spec, log, }; + // Load the previous split slot from the database (if any). This ensures we can // stop and restart correctly. - if let Some(split_slot) = db.load_split_slot()? { - *db.split_slot.write() = split_slot; + if let Some(split) = db.load_split()? { + *db.split.write() = split; } Ok(db) } + /// Store a pre-finalization state in the freezer database. + /// + /// Logs a warning and stores nothing if the state does not lie on a restore point boundary. pub fn store_archive_state( &self, state_root: &Hash256, state: &BeaconState, ) -> Result<(), Error> { + if state.slot % self.slots_per_restore_point != 0 { + warn!( + self.log, + "Not storing non-restore point state in freezer"; + "slot" => state.slot.as_u64(), + "state_root" => format!("{:?}", state_root) + ); + return Ok(()); + } + trace!( self.log, - "Freezing state"; - "slot" => state.slot.as_u64(), + "Creating restore point"; + "slot" => state.slot, "state_root" => format!("{:?}", state_root) ); + // 1. Convert to PartialBeaconState and store that in the DB. 
let partial_state = PartialBeaconState::from_state_forgetful(state); partial_state.db_put(&self.cold_db, state_root)?; @@ -200,17 +263,35 @@ impl HotColdDB { store_updated_vector(HistoricalRoots, db, state, &self.spec)?; store_updated_vector(RandaoMixes, db, state, &self.spec)?; + // 3. Store restore point. + let restore_point_index = state.slot.as_u64() / self.slots_per_restore_point; + self.store_restore_point_hash(restore_point_index, *state_root)?; + Ok(()) } + /// Load a pre-finalization state from the freezer database. + /// + /// Will reconstruct the state if it lies between restore points. pub fn load_archive_state( &self, state_root: &Hash256, - ) -> Result>, Error> { - let mut partial_state = match PartialBeaconState::db_get(&self.cold_db, state_root)? { - Some(s) => s, - None => return Ok(None), - }; + slot: Slot, + ) -> Result, Error> { + if slot % self.slots_per_restore_point == 0 { + self.load_restore_point(state_root) + } else { + self.load_intermediate_state(state_root, slot) + } + } + + /// Load a restore point state by its `state_root`. + fn load_restore_point( + &self, + state_root: &Hash256, + ) -> Result, Error> { + let mut partial_state = PartialBeaconState::db_get(&self.cold_db, state_root)? + .ok_or_else(|| HotColdDbError::MissingRestorePoint(*state_root))?; // Fill in the fields of the partial state. partial_state.load_block_roots(&self.cold_db, &self.spec)?; @@ -218,43 +299,277 @@ impl HotColdDB { partial_state.load_historical_roots(&self.cold_db, &self.spec)?; partial_state.load_randao_mixes(&self.cold_db, &self.spec)?; - let state: BeaconState = partial_state.try_into()?; - - Ok(Some(state)) + Ok(partial_state.try_into()?) } + /// Load a restore point state by its `restore_point_index`. 
+ fn load_restore_point_by_index( + &self, + restore_point_index: u64, + ) -> Result, Error> { + let state_root = self.load_restore_point_hash(restore_point_index)?; + self.load_restore_point(&state_root) + } + + /// Load a state that lies between restore points. + fn load_intermediate_state( + &self, + state_root: &Hash256, + slot: Slot, + ) -> Result, Error> { + // 1. Load the restore points either side of the intermediate state. + let low_restore_point_idx = slot.as_u64() / self.slots_per_restore_point; + let high_restore_point_idx = low_restore_point_idx + 1; + + // Acquire the read lock, so that the split can't change while this is happening. + let split = self.split.read(); + + let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?; + // If the slot of the high point lies outside the freezer, use the split state + // as the upper restore point. + let high_restore_point = if high_restore_point_idx * self.slots_per_restore_point + >= split.slot.as_u64() + { + self.get_state::(&split.state_root, Some(split.slot))? + .ok_or_else(|| HotColdDbError::MissingSplitState(split.state_root, split.slot))? + } else { + self.load_restore_point_by_index(high_restore_point_idx)? + }; + + // 2. Load the blocks from the high restore point back to the low restore point. + let blocks = self.load_blocks_to_replay( + low_restore_point.slot, + slot, + self.get_high_restore_point_block_root(&high_restore_point, slot)?, + )?; + + // 3. Replay the blocks on top of the low restore point. + let mut state = self.replay_blocks(low_restore_point, blocks, slot)?; + + // 4. Check that the state root is correct (should be quick with the cache already built). + // TODO: we could optimise out *all* the tree hashing when replaying blocks, + // in which case we could also drop this check. 
+ let observed_state_root = state.update_tree_hash_cache()?; + + if observed_state_root == *state_root { + Ok(state) + } else { + Err(HotColdDbError::RestorePointReplayFailure { + expected_state_root: *state_root, + observed_state_root, + } + .into()) + } + } + + /// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`. + /// + /// Defaults to the block root for `slot`, which *should* be in range. + fn get_high_restore_point_block_root( + &self, + high_restore_point: &BeaconState, + slot: Slot, + ) -> Result { + high_restore_point + .get_block_root(slot) + .or_else(|_| high_restore_point.get_oldest_block_root()) + .map(|x| *x) + .map_err(HotColdDbError::RestorePointBlockHashError) + } + + /// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`. + /// + /// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot + /// equal to `start_slot`, to reach a state with slot equal to `end_slot`. + fn load_blocks_to_replay( + &self, + start_slot: Slot, + end_slot: Slot, + end_block_hash: Hash256, + ) -> Result>, Error> { + let mut blocks = ParentRootBlockIterator::new(self, end_block_hash) + // Include the block at the end slot (if any), it needs to be + // replayed in order to construct the canonical state at `end_slot`. + .filter(|block| block.slot <= end_slot) + // Exclude the block at the start slot (if any), because it has already + // been applied to the starting state. + .take_while(|block| block.slot > start_slot) + .collect::>(); + blocks.reverse(); + Ok(blocks) + } + + /// Replay `blocks` on top of `state` until `target_slot` is reached. + /// + /// Will skip slots as necessary. 
+ fn replay_blocks( + &self, + mut state: BeaconState, + blocks: Vec>, + target_slot: Slot, + ) -> Result, Error> { + state + .build_all_caches(&self.spec) + .map_err(HotColdDbError::BlockReplayBeaconError)?; + + for block in blocks { + while state.slot < block.slot { + per_slot_processing(&mut state, &self.spec) + .map_err(HotColdDbError::BlockReplaySlotError)?; + } + per_block_processing( + &mut state, + &block, + None, + BlockSignatureStrategy::NoVerification, + &self.spec, + ) + .map_err(HotColdDbError::BlockReplayBlockError)?; + } + + while state.slot < target_slot { + per_slot_processing(&mut state, &self.spec) + .map_err(HotColdDbError::BlockReplaySlotError)?; + } + + Ok(state) + } + + /// Fetch a copy of the current split slot from memory. pub fn get_split_slot(&self) -> Slot { - *self.split_slot.read() + self.split.read().slot } - fn load_split_slot(&self) -> Result, Error> { - let key = Hash256::from_slice(SPLIT_SLOT_DB_KEY.as_bytes()); - let split_slot: Option = self.hot_db.get(&key)?; - Ok(split_slot.map(|s| Slot::new(s.0))) + /// Load the split point from disk. + fn load_split(&self) -> Result, Error> { + let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes()); + let split: Option = self.hot_db.get(&key)?; + Ok(split) } - fn store_split_slot(&self) -> Result<(), Error> { - let key = Hash256::from_slice(SPLIT_SLOT_DB_KEY.as_bytes()); - self.hot_db - .put(&key, &SplitSlot(self.get_split_slot().as_u64()))?; + /// Store the split point on disk. + fn store_split(&self) -> Result<(), Error> { + let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes()); + self.hot_db.put(&key, &*self.split.read())?; Ok(()) } + + /// Load the state root of a restore point. + fn load_restore_point_hash(&self, restore_point_index: u64) -> Result { + let key = Self::restore_point_key(restore_point_index); + RestorePointHash::db_get(&self.cold_db, &key)? 
+ .map(|r| r.state_root) + .ok_or(HotColdDbError::MissingRestorePointHash(restore_point_index).into()) + } + + /// Store the state root of a restore point. + fn store_restore_point_hash( + &self, + restore_point_index: u64, + state_root: Hash256, + ) -> Result<(), Error> { + let key = Self::restore_point_key(restore_point_index); + RestorePointHash { state_root } + .db_put(&self.cold_db, &key) + .map_err(Into::into) + } + + /// Convert a `restore_point_index` into a database key. + fn restore_point_key(restore_point_index: u64) -> Hash256 { + Hash256::from_low_u64_be(restore_point_index) + } + + /// Load a frozen state's slot, given its root. + fn load_state_slot(&self, state_root: &Hash256) -> Result { + StateSlot::db_get(&self.cold_db, state_root)? + .map(|s| s.slot) + .ok_or_else(|| HotColdDbError::MissingStateSlot(*state_root).into()) + } + + /// Store the slot of a frozen state. + fn store_state_slot(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { + StateSlot { slot } + .db_put(&self.cold_db, state_root) + .map_err(Into::into) + } + + /// Check that the restore point frequency is a divisor of the slots per historical root. + /// + /// This ensures that we have at least one restore point within range of our state + /// root history when iterating backwards (and allows for more frequent restore points if + /// desired). + fn verify_slots_per_restore_point( + slots_per_restore_point: u64, + ) -> Result<(), HotColdDbError> { + let slots_per_historical_root = E::SlotsPerHistoricalRoot::to_u64(); + if slots_per_restore_point > 0 && slots_per_historical_root % slots_per_restore_point == 0 { + Ok(()) + } else { + Err(HotColdDbError::InvalidSlotsPerRestorePoint { + slots_per_restore_point, + slots_per_historical_root, + }) + } + } } -/// Struct for storing the split slot in the database. -#[derive(Clone, Copy)] -struct SplitSlot(u64); +/// Struct for storing the split slot and state root in the database. 
+#[derive(Clone, Copy, Default, Encode, Decode)] +struct Split { + slot: Slot, + state_root: Hash256, +} -impl SimpleStoreItem for SplitSlot { +impl SimpleStoreItem for Split { fn db_column() -> DBColumn { DBColumn::BeaconMeta } fn as_store_bytes(&self) -> Vec { - self.0.as_ssz_bytes() + self.as_ssz_bytes() } fn from_store_bytes(bytes: &[u8]) -> Result { - Ok(SplitSlot(u64::from_ssz_bytes(bytes)?)) + Ok(Self::from_ssz_bytes(bytes)?) + } +} + +/// Struct for storing the slot of a state root in the database. +#[derive(Clone, Copy, Default, Encode, Decode)] +struct StateSlot { + slot: Slot, +} + +impl SimpleStoreItem for StateSlot { + fn db_column() -> DBColumn { + DBColumn::BeaconStateSlot + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +} + +/// Struct for storing the state root of a restore point in the database. +#[derive(Clone, Copy, Default, Encode, Decode)] +struct RestorePointHash { + state_root: Hash256, +} + +impl SimpleStoreItem for RestorePointHash { + fn db_column() -> DBColumn { + DBColumn::BeaconRestorePoint + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) } } diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 6b2dc2d540..b4dfee774a 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -1,7 +1,10 @@ use crate::Store; use std::borrow::Cow; +use std::marker::PhantomData; use std::sync::Arc; -use types::{BeaconBlock, BeaconState, BeaconStateError, EthSpec, Hash256, Slot}; +use types::{ + typenum::Unsigned, BeaconBlock, BeaconState, BeaconStateError, EthSpec, Hash256, Slot, +}; /// Implemented for types that have ancestors (e.g., blocks, states) that may be iterated over. 
/// @@ -80,12 +83,9 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { match self.beacon_state.get_state_root(self.slot) { Ok(root) => Some((*root, self.slot)), Err(BeaconStateError::SlotOutOfBounds) => { - // Read a `BeaconState` from the store that has access to prior historical root. - let beacon_state: BeaconState = { - let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; - - self.store.get_state(&new_state_root, None).ok()? - }?; + // Read a `BeaconState` from the store that has access to prior historical roots. + let beacon_state = + next_historical_root_backtrack_state(&*self.store, &self.beacon_state)?; self.beacon_state = Cow::Owned(beacon_state); @@ -98,6 +98,39 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { } } +/// Block iterator that uses the `parent_root` of each block to backtrack. +pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store> { + store: &'a S, + next_block_root: Hash256, + _phantom: PhantomData, +} + +impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { + pub fn new(store: &'a S, start_block_root: Hash256) -> Self { + Self { + store, + next_block_root: start_block_root, + _phantom: PhantomData, + } + } +} + +impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> { + type Item = BeaconBlock; + + fn next(&mut self) -> Option { + // Stop once we reach the zero parent, otherwise we'll keep returning the genesis + // block forever. + if self.next_block_root.is_zero() { + None + } else { + let block: BeaconBlock = self.store.get(&self.next_block_root).ok()??; + self.next_block_root = block.parent_root; + Some(block) + } + } +} + #[derive(Clone)] /// Extends `BlockRootsIterator`, returning `BeaconBlock` instances, instead of their roots. 
pub struct BlockIterator<'a, T: EthSpec, U> { @@ -177,7 +210,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { type Item = (Hash256, Slot); fn next(&mut self) -> Option { - if (self.slot == 0) || (self.slot > self.beacon_state.slot) { + if self.slot == 0 || self.slot > self.beacon_state.slot { return None; } @@ -186,13 +219,9 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { match self.beacon_state.get_block_root(self.slot) { Ok(root) => Some((*root, self.slot)), Err(BeaconStateError::SlotOutOfBounds) => { - // Read a `BeaconState` from the store that has access to prior historical root. - let beacon_state: BeaconState = { - // Load the earliest state from disk. - let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; - - self.store.get_state(&new_state_root, None).ok()? - }?; + // Read a `BeaconState` from the store that has access to prior historical roots. + let beacon_state = + next_historical_root_backtrack_state(&*self.store, &self.beacon_state)?; self.beacon_state = Cow::Owned(beacon_state); @@ -205,6 +234,26 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { } } +/// Fetch the next state to use whilst backtracking in `*RootsIterator`. +fn next_historical_root_backtrack_state( + store: &S, + current_state: &BeaconState, +) -> Option> { + // For compatibility with the freezer database's restore points, we load a state at + // a restore point slot (thus avoiding replaying blocks). In the case where we're + // not frozen, this just means we might not jump back by the maximum amount on + // our first jump (i.e. at most 1 extra state load). + let new_state_slot = slot_of_prev_restore_point::(current_state.slot); + let new_state_root = current_state.get_state_root(new_state_slot).ok()?; + store.get_state(new_state_root, Some(new_state_slot)).ok()? +} + +/// Compute the slot of the last guaranteed restore point in the freezer database. 
+fn slot_of_prev_restore_point(current_slot: Slot) -> Slot { + let slots_per_historical_root = E::SlotsPerHistoricalRoot::to_u64(); + (current_slot - 1) / slots_per_historical_root * slots_per_historical_root +} + pub type ReverseBlockRootIterator<'a, E, S> = ReverseHashAndSlotIterator>; pub type ReverseStateRootIterator<'a, E, S> = diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index d33a2ab3d4..3698d2409f 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -12,6 +12,7 @@ extern crate lazy_static; mod block_at_slot; pub mod chunked_vector; +pub mod config; mod errors; mod hot_cold_store; mod impls; @@ -25,6 +26,7 @@ pub mod migrate; use std::sync::Arc; +pub use self::config::StoreConfig; pub use self::hot_cold_store::HotColdDB as DiskStore; pub use self::leveldb_store::LevelDB as SimpleDiskStore; pub use self::memory_store::MemoryStore; @@ -117,6 +119,10 @@ pub enum DBColumn { BeaconBlock, BeaconState, BeaconChain, + /// For the table mapping restore point numbers to state roots. + BeaconRestorePoint, + /// For the mapping from state roots to their slots. 
+ BeaconStateSlot, BeaconBlockRoots, BeaconStateRoots, BeaconHistoricalRoots, @@ -131,6 +137,8 @@ impl Into<&'static str> for DBColumn { DBColumn::BeaconBlock => "blk", DBColumn::BeaconState => "ste", DBColumn::BeaconChain => "bch", + DBColumn::BeaconRestorePoint => "brp", + DBColumn::BeaconStateSlot => "bss", DBColumn::BeaconBlockRoots => "bbr", DBColumn::BeaconStateRoots => "bsr", DBColumn::BeaconHistoricalRoots => "bhr", @@ -263,9 +271,17 @@ mod tests { let hot_dir = tempdir().unwrap(); let cold_dir = tempdir().unwrap(); + let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; let spec = MinimalEthSpec::default_spec(); let log = NullLoggerBuilder.build().unwrap(); - let store = DiskStore::open(&hot_dir.path(), &cold_dir.path(), spec, log).unwrap(); + let store = DiskStore::open::( + &hot_dir.path(), + &cold_dir.path(), + slots_per_restore_point, + spec, + log, + ) + .unwrap(); test_impl(store); } diff --git a/eth2/types/src/beacon_state.rs b/eth2/types/src/beacon_state.rs index 9cd949d877..63d2a1f31f 100644 --- a/eth2/types/src/beacon_state.rs +++ b/eth2/types/src/beacon_state.rs @@ -576,6 +576,14 @@ impl BeaconState { Ok(&self.state_roots[i]) } + /// Gets the oldest (earliest slot) block root. + /// + /// Spec v0.9.1 + pub fn get_oldest_block_root(&self) -> Result<&Hash256, Error> { + let i = self.get_latest_block_roots_index(self.slot - self.block_roots.len() as u64)?; + Ok(&self.block_roots[i]) + } + /// Sets the latest state root for slot. /// /// Spec v0.9.1