diff --git a/Cargo.lock b/Cargo.lock index 3038e29dff..6972343fe0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3968,6 +3968,7 @@ dependencies = [ "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "leveldb 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "lighthouse_metrics 0.1.0", + "lru 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "rayon 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 95cc86b99b..281375c7a4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1,5 +1,4 @@ use crate::checkpoint::CheckPoint; -use crate::checkpoint_cache::CheckPointCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; @@ -22,7 +21,6 @@ use state_processing::per_block_processing::{ use state_processing::{ per_block_processing, per_slot_processing, BlockProcessingError, BlockSignatureStrategy, }; -use std::borrow::Cow; use std::cmp::Ordering; use std::fs; use std::io::prelude::*; @@ -31,7 +29,7 @@ use std::time::{Duration, Instant}; use store::iter::{ BlockRootsIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator, }; -use store::{Error as DBError, Migrate, Store}; +use store::{Error as DBError, Migrate, StateBatch, Store}; use tree_hash::TreeHash; use types::*; @@ -149,8 +147,6 @@ pub struct BeaconChain { pub event_handler: T::EventHandler, /// Used to track the heads of the beacon chain. pub(crate) head_tracker: HeadTracker, - /// Provides a small cache of `BeaconState` and `BeaconBlock`. - pub(crate) checkpoint_cache: CheckPointCache, /// Logging to CLI, etc. pub(crate) log: Logger, } @@ -168,11 +164,11 @@ impl BeaconChain { let beacon_block_root = canonical_head.beacon_state.finalized_checkpoint.root; let beacon_block = self .store - .get::>(&beacon_block_root)? + .get_block(&beacon_block_root)? .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; let beacon_state_root = beacon_block.state_root; let beacon_state = self - .get_state_caching(&beacon_state_root, Some(beacon_block.slot))? + .get_state(&beacon_state_root, Some(beacon_block.slot))? .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; CheckPoint { @@ -306,10 +302,10 @@ impl BeaconChain { block_root: Hash256, ) -> Result, Error> { let block = self - .get_block_caching(&block_root)? + .get_block(&block_root)? .ok_or_else(|| Error::MissingBeaconBlock(block_root))?; let state = self - .get_state_caching(&block.state_root, Some(block.slot))? + .get_state(&block.state_root, Some(block.slot))? .ok_or_else(|| Error::MissingBeaconState(block.state_root))?; let iter = BlockRootsIterator::owned(self.store.clone(), state); Ok(ReverseBlockRootIterator::new( @@ -392,7 +388,7 @@ impl BeaconChain { &self, block_root: &Hash256, ) -> Result>, Error> { - Ok(self.store.get(block_root)?) + Ok(self.store.get_block(block_root)?) } /// Returns the state at the given root, if any. @@ -408,44 +404,11 @@ impl BeaconChain { Ok(self.store.get_state(state_root, slot)?) } - /// Returns the block at the given root, if any. - /// - /// ## Errors - /// - /// May return a database error. 
- pub fn get_block_caching( - &self, - block_root: &Hash256, - ) -> Result>, Error> { - if let Some(block) = self.checkpoint_cache.get_block(block_root) { - Ok(Some(block)) - } else { - Ok(self.store.get(block_root)?) - } - } - - /// Returns the state at the given root, if any. - /// - /// ## Errors - /// - /// May return a database error. - pub fn get_state_caching( - &self, - state_root: &Hash256, - slot: Option, - ) -> Result>, Error> { - if let Some(state) = self.checkpoint_cache.get_state(state_root) { - Ok(Some(state)) - } else { - Ok(self.store.get_state(state_root, slot)?) - } - } - /// Returns the state at the given root, if any. /// /// The return state does not contain any caches other than the committee caches. This method - /// is much faster than `Self::get_state_caching` because it does not clone the tree hash cache - /// when the state is found in the checkpoint cache. + /// is much faster than `Self::get_state` because it does not clone the tree hash cache + /// when the state is found in the cache. /// /// ## Errors /// @@ -455,14 +418,11 @@ impl BeaconChain { state_root: &Hash256, slot: Option, ) -> Result>, Error> { - if let Some(state) = self - .checkpoint_cache - .get_state_only_with_committee_cache(state_root) - { - Ok(Some(state)) - } else { - Ok(self.store.get_state(state_root, slot)?) - } + Ok(self.store.get_state_with( + state_root, + slot, + types::beacon_state::CloneConfig::committee_caches_only(), + )?) } /// Returns a `Checkpoint` representing the head block and state. Contains the "best block"; @@ -568,7 +528,7 @@ impl BeaconChain { .ok_or_else(|| Error::NoStateForSlot(slot))?; Ok(self - .get_state_caching(&state_root, Some(slot))? + .get_state(&state_root, Some(slot))? .ok_or_else(|| Error::NoStateForSlot(slot))?) } } @@ -890,7 +850,7 @@ impl BeaconChain { // An honest validator would have set this block to be the head of the chain (i.e., the // result of running fork choice). let result = if let Some(attestation_head_block) = - self.get_block_caching(&attestation.data.beacon_block_root)? + self.get_block(&attestation.data.beacon_block_root)? { // If the attestation points to a block in the same epoch in which it was made, // then it is sufficient to load the state from that epoch's boundary, because @@ -1274,22 +1234,21 @@ impl BeaconChain { // Load the blocks parent block from the database, returning invalid if that block is not // found. - let parent_block: BeaconBlock = - match self.get_block_caching(&block.parent_root)? { - Some(block) => block, - None => { - return Ok(BlockProcessingOutcome::ParentUnknown { - parent: block.parent_root, - reference_location: "database", - }); - } - }; + let parent_block = match self.get_block(&block.parent_root)? { + Some(block) => block, + None => { + return Ok(BlockProcessingOutcome::ParentUnknown { + parent: block.parent_root, + reference_location: "database", + }); + } + }; // Load the parent blocks state from the database, returning an error if it is not found. // It is an error because if we know the parent block we should also know the parent state. let parent_state_root = parent_block.state_root; let parent_state = self - .get_state_caching(&parent_state_root, Some(parent_block.slot))? + .get_state(&parent_state_root, Some(parent_block.slot))? 
.ok_or_else(|| { Error::DBInconsistent(format!("Missing state {:?}", parent_state_root)) })?; @@ -1300,25 +1259,26 @@ impl BeaconChain { let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE); - // Keep a list of any states that were "skipped" (block-less) in between the parent state - // slot and the block slot. These will need to be stored in the database. - let mut intermediate_states = vec![]; + // Keep a batch of any states that were "skipped" (block-less) in between the parent state + // slot and the block slot. These will be stored in the database. + let mut intermediate_states = StateBatch::new(); // Transition the parent state to the block slot. let mut state: BeaconState = parent_state; let distance = block.slot.as_u64().saturating_sub(state.slot.as_u64()); for i in 0..distance { - if i > 0 { - intermediate_states.push(state.clone()); - } - let state_root = if i == 0 { - Some(parent_block.state_root) + parent_block.state_root } else { - None + // This is a new state we've reached, so stage it for storage in the DB. + // Computing the state root here is time-equivalent to computing it during slot + // processing, but we get early access to it. + let state_root = state.update_tree_hash_cache()?; + intermediate_states.add_state(state_root, &state)?; + state_root }; - per_slot_processing(&mut state, state_root, &self.spec)?; + per_slot_processing(&mut state, Some(state_root), &self.spec)?; } metrics::stop_timer(catchup_timer); @@ -1393,23 +1353,17 @@ impl BeaconChain { metrics::stop_timer(fork_choice_register_timer); + self.head_tracker.register_block(block_root, &block); + metrics::observe( + &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, + block.body.attestations.len() as f64, + ); + let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); - // Store all the states between the parent block state and this blocks slot before storing + // Store all the states between the parent block state and this block's slot before storing // the final state. - for (i, intermediate_state) in intermediate_states.iter().enumerate() { - // To avoid doing an unnecessary tree hash, use the following (slot + 1) state's - // state_roots field to find the root. - let following_state = match intermediate_states.get(i + 1) { - Some(following_state) => following_state, - None => &state, - }; - let intermediate_state_root = - following_state.get_state_root(intermediate_state.slot)?; - - self.store - .put_state(&intermediate_state_root, intermediate_state)?; - } + intermediate_states.commit(&*self.store)?; // Store the block and state. // NOTE: we store the block *after* the state to guard against inconsistency in the event of @@ -1417,29 +1371,12 @@ impl BeaconChain { // solution would be to use a database transaction (once our choice of database and API // settles down). // See: https://github.com/sigp/lighthouse/issues/692 - self.store.put_state(&state_root, &state)?; - self.store.put(&block_root, &block)?; + self.store.put_state(&state_root, state)?; + self.store.put_block(&block_root, block)?; metrics::stop_timer(db_write_timer); - self.head_tracker.register_block(block_root, &block); - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); - metrics::observe( - &metrics::OPERATIONS_PER_BLOCK_ATTESTATION, - block.body.attestations.len() as f64, - ); - - // Store the block in the checkpoint cache. - // - // A block that was just imported is likely to be referenced by the next block that we - // import. 
- self.checkpoint_cache.insert(Cow::Owned(CheckPoint { - beacon_block_root: block_root, - beacon_block: block, - beacon_state_root: state_root, - beacon_state: state, - })); metrics::stop_timer(full_timer); @@ -1575,13 +1512,13 @@ impl BeaconChain { let result = if beacon_block_root != self.head_info()?.block_root { metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD); - let beacon_block: BeaconBlock = self - .get_block_caching(&beacon_block_root)? + let beacon_block = self + .get_block(&beacon_block_root)? .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; let beacon_state_root = beacon_block.state_root; let beacon_state: BeaconState = self - .get_state_caching(&beacon_state_root, Some(beacon_block.slot))? + .get_state(&beacon_state_root, Some(beacon_block.slot))? .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; let previous_slot = self.head_info()?.slot; @@ -1650,11 +1587,6 @@ impl BeaconChain { let timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES); - // Store the head in the checkpoint cache. - // - // The head block is likely to be referenced by the next imported block. - self.checkpoint_cache.insert(Cow::Borrowed(&new_head)); - // Update the checkpoint that stores the head of the chain at the time it received the // block. *self @@ -1703,7 +1635,7 @@ impl BeaconChain { ) -> Result<(), Error> { let finalized_block = self .store - .get::>(&finalized_block_root)? + .get_block(&finalized_block_root)? .ok_or_else(|| Error::MissingBeaconBlock(finalized_block_root))?; let new_finalized_epoch = finalized_block.slot.epoch(T::EthSpec::slots_per_epoch()); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 03cefefb06..331d4f5f8a 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -1,4 +1,3 @@ -use crate::checkpoint_cache::CheckPointCache; use crate::eth1_chain::CachingEth1Backend; use crate::events::NullEventHandler; use crate::head_tracker::HeadTracker; @@ -219,7 +218,7 @@ where self.genesis_block_root = Some(beacon_block_root); store - .put_state(&beacon_state_root, &beacon_state) + .put_state(&beacon_state_root, beacon_state.clone()) .map_err(|e| format!("Failed to store genesis state: {:?}", e))?; store .put(&beacon_block_root, &beacon_block) @@ -334,7 +333,6 @@ where .event_handler .ok_or_else(|| "Cannot build without an event handler".to_string())?, head_tracker: self.head_tracker.unwrap_or_default(), - checkpoint_cache: CheckPointCache::default(), log: log.clone(), }; diff --git a/beacon_node/beacon_chain/src/checkpoint_cache.rs b/beacon_node/beacon_chain/src/checkpoint_cache.rs deleted file mode 100644 index 3832182e9b..0000000000 --- a/beacon_node/beacon_chain/src/checkpoint_cache.rs +++ /dev/null @@ -1,125 +0,0 @@ -use crate::checkpoint::CheckPoint; -use crate::metrics; -use parking_lot::RwLock; -use std::borrow::Cow; -use types::{BeaconBlock, BeaconState, EthSpec, Hash256}; - -const CACHE_SIZE: usize = 4; - -struct Inner { - oldest: usize, - limit: usize, - checkpoints: Vec>, -} - -impl Default for Inner { - fn default() -> Self { - Self { - oldest: 0, - limit: CACHE_SIZE, - checkpoints: vec![], - } - } -} - -pub struct CheckPointCache { - inner: RwLock>, -} - -impl Default for CheckPointCache { - fn default() -> Self { - Self { - inner: RwLock::new(Inner::default()), - } - } -} - -impl CheckPointCache { - pub fn insert(&self, checkpoint: Cow>) { - if self - .inner - .read() - .checkpoints - .iter() - // This is `O(n)` but whilst `n == 4` it ain't no 
thing. - .any(|local| local.beacon_state_root == checkpoint.beacon_state_root) - { - // Adding a known checkpoint to the cache should be a no-op. - return; - } - - let mut inner = self.inner.write(); - - if inner.checkpoints.len() < inner.limit { - inner.checkpoints.push(checkpoint.into_owned()) - } else { - let i = inner.oldest; // to satisfy the borrow checker. - inner.checkpoints[i] = checkpoint.into_owned(); - inner.oldest += 1; - inner.oldest %= inner.limit; - } - } - - pub fn get_state(&self, state_root: &Hash256) -> Option> { - self.inner - .read() - .checkpoints - .iter() - // Also `O(n)`. - .find(|checkpoint| checkpoint.beacon_state_root == *state_root) - .map(|checkpoint| { - metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS); - - checkpoint.beacon_state.clone() - }) - .or_else(|| { - metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES); - - None - }) - } - - pub fn get_state_only_with_committee_cache( - &self, - state_root: &Hash256, - ) -> Option> { - self.inner - .read() - .checkpoints - .iter() - // Also `O(n)`. - .find(|checkpoint| checkpoint.beacon_state_root == *state_root) - .map(|checkpoint| { - metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS); - - let mut state = checkpoint.beacon_state.clone_without_caches(); - state.committee_caches = checkpoint.beacon_state.committee_caches.clone(); - - state - }) - .or_else(|| { - metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES); - - None - }) - } - - pub fn get_block(&self, block_root: &Hash256) -> Option> { - self.inner - .read() - .checkpoints - .iter() - // Also `O(n)`. - .find(|checkpoint| checkpoint.beacon_block_root == *block_root) - .map(|checkpoint| { - metrics::inc_counter(&metrics::CHECKPOINT_CACHE_HITS); - - checkpoint.beacon_block.clone() - }) - .or_else(|| { - metrics::inc_counter(&metrics::CHECKPOINT_CACHE_MISSES); - - None - }) - } -} diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index e521923978..d1227e1f82 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -883,7 +883,7 @@ mod test { &state .get_state_root(prev_state.slot) .expect("should find state root"), - &prev_state, + prev_state, ) .expect("should store state"); @@ -953,7 +953,7 @@ mod test { &state .get_state_root(Slot::new(0)) .expect("should find state root"), - &prev_state, + prev_state, ) .expect("should store state"); diff --git a/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs b/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs index 5441f4aa78..14cb634b1c 100644 --- a/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs +++ b/beacon_node/beacon_chain/src/fork_choice/checkpoint_manager.rs @@ -302,7 +302,7 @@ impl CheckpointManager { metrics::inc_counter(&metrics::BALANCES_CACHE_MISSES); let block = chain - .get_block_caching(&block_root)? + .get_block(&block_root)? 
.ok_or_else(|| Error::UnknownJustifiedBlock(block_root))?; let state = chain diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 3a02745b7b..d9b4a28d16 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -5,7 +5,6 @@ extern crate lazy_static; mod beacon_chain; pub mod builder; mod checkpoint; -mod checkpoint_cache; mod errors; pub mod eth1_chain; pub mod events; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 31579d86c8..107666396a 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -149,14 +149,6 @@ lazy_static! { pub static ref PERSIST_CHAIN: Result = try_create_histogram("beacon_persist_chain", "Time taken to update the canonical head"); - /* - * Checkpoint cache - */ - pub static ref CHECKPOINT_CACHE_HITS: Result = - try_create_int_counter("beacon_checkpoint_cache_hits_total", "Count of times checkpoint cache fulfils request"); - pub static ref CHECKPOINT_CACHE_MISSES: Result = - try_create_int_counter("beacon_checkpoint_cache_misses_total", "Count of times checkpoint cache fulfils request"); - /* * Eth1 */ diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index 8afe570c17..73d399b23c 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -9,7 +9,7 @@ use beacon_chain::{ }; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; -use store::DiskStore; +use store::{DiskStore, StoreConfig}; use tempfile::{tempdir, TempDir}; use types::{EthSpec, Keypair, MinimalEthSpec}; @@ -27,10 +27,10 @@ fn get_store(db_path: &TempDir) -> Arc> { let spec = E::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); - let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; + let config = StoreConfig::default(); let log = NullLoggerBuilder.build().expect("logger should build"); Arc::new( - DiskStore::open(&hot_path, &cold_path, slots_per_restore_point, spec, log) + DiskStore::open(&hot_path, &cold_path, config, spec, log) .expect("disk store should initialize"), ) } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 06d096c9b4..d4bb7a6422 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -10,7 +10,7 @@ use beacon_chain::AttestationProcessingOutcome; use rand::Rng; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; -use store::{DiskStore, Store}; +use store::{DiskStore, Store, StoreConfig}; use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; use types::test_utils::{SeedableRng, XorShiftRng}; @@ -31,10 +31,10 @@ fn get_store(db_path: &TempDir) -> Arc> { let spec = MinimalEthSpec::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); - let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; + let config = StoreConfig::default(); let log = NullLoggerBuilder.build().expect("logger should build"); Arc::new( - DiskStore::open(&hot_path, &cold_path, slots_per_restore_point, spec, log) + DiskStore::open(&hot_path, &cold_path, config, spec, log) .expect("disk store should initialize"), ) } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 
6519906713..d5de42d3f6 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -7,7 +7,7 @@ use beacon_chain::{ slot_clock::{SlotClock, SystemTimeSlotClock}, store::{ migrate::{BackgroundMigrator, Migrate, NullMigrator}, - DiskStore, MemoryStore, SimpleDiskStore, Store, + DiskStore, MemoryStore, SimpleDiskStore, Store, StoreConfig, }, BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler, }; @@ -478,7 +478,7 @@ where mut self, hot_path: &Path, cold_path: &Path, - slots_per_restore_point: u64, + config: StoreConfig, ) -> Result { let context = self .runtime_context @@ -490,14 +490,8 @@ where .clone() .ok_or_else(|| "disk_store requires a chain spec".to_string())?; - let store = DiskStore::open( - hot_path, - cold_path, - slots_per_restore_point, - spec, - context.log, - ) - .map_err(|e| format!("Unable to open database: {:?}", e))?; + let store = DiskStore::open(hot_path, cold_path, config, spec, context.log) + .map_err(|e| format!("Unable to open database: {:?}", e))?; self.store = Some(Arc::new(store)); Ok(self) } diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index d08af28223..a855d78ab6 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -6,6 +6,9 @@ use std::path::PathBuf; /// The number initial validators when starting the `Minimal`. const TESTNET_SPEC_CONSTANTS: &str = "minimal"; +/// Default directory name for the freezer database under the top-level data dir. +const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; + /// Defines how the client should initialize the `BeaconChain` and other components. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ClientGenesis { @@ -41,6 +44,10 @@ impl Default for ClientGenesis { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { pub data_dir: PathBuf, + /// Name of the directory inside the data directory where the main "hot" DB is located. + pub db_name: String, + /// Path where the freezer database will be located. + pub freezer_db_path: Option, pub testnet_dir: Option, pub log_file: PathBuf, pub spec_constants: String, @@ -64,6 +71,8 @@ impl Default for Config { fn default() -> Self { Self { data_dir: PathBuf::from(".lighthouse"), + db_name: "chain_db".to_string(), + freezer_db_path: None, testnet_dir: None, log_file: PathBuf::from(""), genesis: <_>::default(), @@ -83,7 +92,7 @@ impl Config { /// Get the database path without initialising it. pub fn get_db_path(&self) -> Option { self.get_data_dir() - .map(|data_dir| data_dir.join(&self.store.db_name)) + .map(|data_dir| data_dir.join(&self.db_name)) } /// Get the database path, creating it if necessary. @@ -97,7 +106,7 @@ impl Config { /// Fetch default path to use for the freezer database. fn default_freezer_db_path(&self) -> Option { self.get_data_dir() - .map(|data_dir| data_dir.join(self.store.default_freezer_db_dir())) + .map(|data_dir| data_dir.join(DEFAULT_FREEZER_DB_DIR)) } /// Returns the path to which the client may initialize the on-disk freezer database. @@ -105,8 +114,7 @@ impl Config { /// Will attempt to use the user-supplied path from e.g. the CLI, or will default /// to a directory in the data_dir if no path is provided. 
pub fn get_freezer_db_path(&self) -> Option { - self.store - .freezer_db_path + self.freezer_db_path .clone() .or_else(|| self.default_freezer_db_path()) } diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 741e9045fd..0a32e864fd 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -199,6 +199,20 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { DO NOT DECREASE AFTER INITIALIZATION. [default: 2048 (mainnet) or 64 (minimal)]") .takes_value(true) ) + .arg( + Arg::with_name("block-cache-size") + .long("block-cache-size") + .value_name("SIZE") + .help("Specifies how many blocks the database should cache in memory [default: 5]") + .takes_value(true) + ) + .arg( + Arg::with_name("state-cache-size") + .long("state-cache-size") + .value_name("SIZE") + .help("Specifies how many states the database should cache in memory [default: 5]") + .takes_value(true) + ) /* * The "testnet" sub-command. * diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 814e2f9728..1ce2edb2f5 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -252,7 +252,7 @@ pub fn get_configs( }; if let Some(freezer_dir) = cli_args.value_of("freezer-dir") { - client_config.store.freezer_db_path = Some(PathBuf::from(freezer_dir)); + client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } if let Some(slots_per_restore_point) = cli_args.value_of("slots-per-restore-point") { @@ -266,6 +266,18 @@ ); } + if let Some(block_cache_size) = cli_args.value_of("block-cache-size") { + client_config.store.block_cache_size = block_cache_size + .parse() + .map_err(|_| "block-cache-size is not a valid integer".to_string())?; + } + + if let Some(state_cache_size) = cli_args.value_of("state-cache-size") { + client_config.store.state_cache_size = state_cache_size + .parse() + .map_err(|_| "state-cache-size is not a valid integer".to_string())?; + } + if eth2_config.spec_constants != client_config.spec_constants { crit!(log, "Specification constants do not match."; "client_config" => client_config.spec_constants.to_string(), diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 2baef22962..36df8ce1ed 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -90,11 +90,7 @@ impl ProductionBeaconNode { Ok(ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec) - .disk_store( - &db_path, - &freezer_db_path_res?, - store_config.slots_per_restore_point, - )? + .disk_store(&db_path, &freezer_db_path_res?, store_config)? .background_migrator()?) }) .and_then(move |builder| { diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index d36cd28f25..ebb5cf4995 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -29,3 +29,4 @@ serde = "1.0" serde_derive = "1.0.102" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } +lru = "0.4.3" diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 36e2934a4a..8249cda2c9 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -1,36 +1,28 @@ use serde_derive::{Deserialize, Serialize}; -use std::path::PathBuf; use types::{EthSpec, MinimalEthSpec}; -/// Default directory name for the freezer database under the top-level data dir. -const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; - -/// Default value for the freezer DB's restore point frequency.
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; +pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5; +pub const DEFAULT_STATE_CACHE_SIZE: usize = 5; -#[derive(Debug, Clone, Serialize, Deserialize)] +/// Database configuration parameters. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct StoreConfig { - /// Name of the directory inside the data directory where the main "hot" DB is located. - pub db_name: String, - /// Path where the freezer database will be located. - pub freezer_db_path: Option, /// Number of slots to wait between storing restore points in the freezer database. pub slots_per_restore_point: u64, + /// Maximum number of blocks to store in the in-memory block cache. + pub block_cache_size: usize, + /// Maximum number of states to store in the in-memory state cache. + pub state_cache_size: usize, } impl Default for StoreConfig { fn default() -> Self { Self { - db_name: "chain_db".to_string(), - freezer_db_path: None, // Safe default for tests, shouldn't ever be read by a CLI node. slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64, + block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, + state_cache_size: DEFAULT_STATE_CACHE_SIZE, } } } - -impl StoreConfig { - pub fn default_freezer_db_dir(&self) -> &'static str { - DEFAULT_FREEZER_DB_DIR - } -} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 90d9b74413..1832360f2e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,12 +1,16 @@ use crate::chunked_vector::{ store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots, }; +use crate::config::StoreConfig; use crate::forwards_iter::HybridForwardsBlockRootsIterator; +use crate::impls::beacon_state::store_full_state; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; +use crate::metrics; use crate::{ leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem, }; -use parking_lot::RwLock; +use lru::LruCache; +use parking_lot::{Mutex, RwLock}; use slog::{debug, trace, warn, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; @@ -18,6 +22,7 @@ use std::convert::TryInto; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; +use types::beacon_state::CloneConfig; use types::*; /// 32-byte key for accessing the `split` of the freezer DB. @@ -33,14 +38,17 @@ pub struct HotColdDB { /// States with slots less than `split.slot` are in the cold DB, while states with slots /// greater than or equal are in the hot DB. split: RwLock, - /// Number of slots per restore point state in the freezer database. - slots_per_restore_point: u64, + config: StoreConfig, /// Cold database containing compact historical data. pub(crate) cold_db: LevelDB, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. pub(crate) hot_db: LevelDB, + /// LRU cache of deserialized blocks. Updated whenever a block is loaded. + block_cache: Mutex>>, + /// LRU cache of deserialized states. Updated whenever a state is loaded. + state_cache: Mutex>>, /// Chain spec. spec: ChainSpec, /// Logger. @@ -98,10 +106,42 @@ impl Store for HotColdDB { self.hot_db.key_delete(column, key) } + /// Store a block and update the LRU cache. + fn put_block(&self, block_root: &Hash256, block: BeaconBlock) -> Result<(), Error> { + // Store on disk. + self.put(block_root, &block)?; + + // Update cache. 
+ self.block_cache.lock().put(*block_root, block); + + Ok(()) + } + + /// Fetch a block from the store. + fn get_block(&self, block_root: &Hash256) -> Result>, Error> { + metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT); + + // Check the cache. + if let Some(block) = self.block_cache.lock().get(block_root) { + metrics::inc_counter(&metrics::BEACON_BLOCK_CACHE_HIT_COUNT); + return Ok(Some(block.clone())); + } + + // Fetch from database. + match self.get::>(block_root)? { + Some(block) => { + // Add to cache. + self.block_cache.lock().put(*block_root, block.clone()); + Ok(Some(block)) + } + None => Ok(None), + } + } + /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { + fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { - self.store_cold_state(state_root, state) + self.store_cold_state(state_root, &state) } else { self.store_hot_state(state_root, state) } @@ -113,14 +153,28 @@ impl Store for HotColdDB { state_root: &Hash256, slot: Option, ) -> Result>, Error> { + self.get_state_with(state_root, slot, CloneConfig::all()) + } + + /// Get a state from the store. + /// + /// Fetch a state from the store, controlling which cache fields are cloned. + fn get_state_with( + &self, + state_root: &Hash256, + slot: Option, + clone_config: CloneConfig, + ) -> Result>, Error> { + metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT); + if let Some(slot) = slot { if slot < self.get_split_slot() { self.load_cold_state_by_slot(slot).map(Some) } else { - self.load_hot_state(state_root) + self.load_hot_state(state_root, clone_config) } } else { - match self.load_hot_state(state_root)? { + match self.load_hot_state(state_root, clone_config)? { Some(state) => Ok(Some(state)), None => self.load_cold_state(state_root), } @@ -164,7 +218,7 @@ impl Store for HotColdDB { for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) { - if slot % store.slots_per_restore_point == 0 { + if slot % store.config.slots_per_restore_point == 0 { let state: BeaconState = store .hot_db .get_state(&state_root, None)? @@ -229,9 +283,12 @@ impl Store for HotColdDB { .. }) = self.load_hot_state_summary(state_root)? { + // NOTE: minor inefficiency here because we load an unnecessary hot state summary let state = self - .hot_db - .get_state(&epoch_boundary_state_root, None)? + .load_hot_state( + &epoch_boundary_state_root, + CloneConfig::committee_caches_only(), + )? .ok_or_else(|| { HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root) })?; @@ -257,17 +314,19 @@ impl HotColdDB { pub fn open( hot_path: &Path, cold_path: &Path, - slots_per_restore_point: u64, + config: StoreConfig, spec: ChainSpec, log: Logger, ) -> Result { - Self::verify_slots_per_restore_point(slots_per_restore_point)?; + Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; let db = HotColdDB { split: RwLock::new(Split::default()), - slots_per_restore_point, cold_db: LevelDB::open(cold_path)?, hot_db: LevelDB::open(hot_path)?, + block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + state_cache: Mutex::new(LruCache::new(config.state_cache_size)), + config, spec, log, _phantom: PhantomData, @@ -288,7 +347,7 @@ impl HotColdDB { pub fn store_hot_state( &self, state_root: &Hash256, - state: &BeaconState, + state: BeaconState, ) -> Result<(), Error> { // On the epoch boundary, store the full state. 
if state.slot % E::slots_per_epoch() == 0 { @@ -298,13 +357,16 @@ impl HotColdDB { "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); - self.hot_db.put_state(state_root, state)?; + store_full_state(&self.hot_db, state_root, &state)?; } // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. - self.store_hot_state_summary(state_root, state)?; + self.put_state_summary(state_root, HotStateSummary::new(state_root, &state)?)?; + + // Store the state in the cache. + self.state_cache.lock().put(*state_root, state); Ok(()) } @@ -312,14 +374,31 @@ impl HotColdDB { /// Load a post-finalization state from the hot database. /// /// Will replay blocks from the nearest epoch boundary. - pub fn load_hot_state(&self, state_root: &Hash256) -> Result>, Error> { + pub fn load_hot_state( + &self, + state_root: &Hash256, + clone_config: CloneConfig, + ) -> Result>, Error> { + metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); + + // Check the cache. + if let Some(state) = self.state_cache.lock().get(state_root) { + metrics::inc_counter(&metrics::BEACON_STATE_CACHE_HIT_COUNT); + + let timer = metrics::start_timer(&metrics::BEACON_STATE_CACHE_CLONE_TIME); + let state = state.clone_with(clone_config); + metrics::stop_timer(timer); + + return Ok(Some(state)); + } + if let Some(HotStateSummary { slot, latest_block_root, epoch_boundary_state_root, }) = self.load_hot_state_summary(state_root)? { - let state: BeaconState = self + let boundary_state = self .hot_db .get_state(&epoch_boundary_state_root, None)? .ok_or_else(|| { @@ -328,12 +407,18 @@ impl HotColdDB { // Optimization to avoid even *thinking* about replaying blocks if we're already // on an epoch boundary. - if slot % E::slots_per_epoch() == 0 { - Ok(Some(state)) + let state = if slot % E::slots_per_epoch() == 0 { + boundary_state } else { - let blocks = self.load_blocks_to_replay(state.slot, slot, latest_block_root)?; - self.replay_blocks(state, blocks, slot).map(Some) - } + let blocks = + self.load_blocks_to_replay(boundary_state.slot, slot, latest_block_root)?; + self.replay_blocks(boundary_state, blocks, slot)? + }; + + // Update the LRU cache. + self.state_cache.lock().put(*state_root, state.clone()); + + Ok(Some(state)) } else { Ok(None) } @@ -348,7 +433,7 @@ impl HotColdDB { state_root: &Hash256, state: &BeaconState, ) -> Result<(), Error> { - if state.slot % self.slots_per_restore_point != 0 { + if state.slot % self.config.slots_per_restore_point != 0 { warn!( self.log, "Not storing non-restore point state in freezer"; @@ -377,7 +462,7 @@ impl HotColdDB { store_updated_vector(RandaoMixes, db, state, &self.spec)?; // 3. Store restore point. - let restore_point_index = state.slot.as_u64() / self.slots_per_restore_point; + let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point; self.store_restore_point_hash(restore_point_index, *state_root)?; Ok(()) @@ -397,8 +482,8 @@ impl HotColdDB { /// /// Will reconstruct the state if it lies between restore points. 
pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result, Error> { - if slot % self.slots_per_restore_point == 0 { - let restore_point_idx = slot.as_u64() / self.slots_per_restore_point; + if slot % self.config.slots_per_restore_point == 0 { + let restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point; self.load_restore_point_by_index(restore_point_idx) } else { self.load_cold_intermediate_state(slot) @@ -431,7 +516,7 @@ impl HotColdDB { /// Load a frozen state that lies between restore points. fn load_cold_intermediate_state(&self, slot: Slot) -> Result, Error> { // 1. Load the restore points either side of the intermediate state. - let low_restore_point_idx = slot.as_u64() / self.slots_per_restore_point; + let low_restore_point_idx = slot.as_u64() / self.config.slots_per_restore_point; let high_restore_point_idx = low_restore_point_idx + 1; // Acquire the read lock, so that the split can't change while this is happening. @@ -440,7 +525,7 @@ impl HotColdDB { let low_restore_point = self.load_restore_point_by_index(low_restore_point_idx)?; // If the slot of the high point lies outside the freezer, use the split state // as the upper restore point. - let high_restore_point = if high_restore_point_idx * self.slots_per_restore_point + let high_restore_point = if high_restore_point_idx * self.config.slots_per_restore_point >= split.slot.as_u64() { self.get_state(&split.state_root, Some(split.slot))? @@ -553,7 +638,8 @@ impl HotColdDB { /// Fetch the slot of the most recently stored restore point. pub fn get_latest_restore_point_slot(&self) -> Slot { - (self.get_split_slot() - 1) / self.slots_per_restore_point * self.slots_per_restore_point + (self.get_split_slot() - 1) / self.config.slots_per_restore_point + * self.config.slots_per_restore_point } /// Load the split point from disk. @@ -615,33 +701,6 @@ impl HotColdDB { HotStateSummary::db_get(&self.hot_db, state_root) } - /// Store a summary of a hot database state. - fn store_hot_state_summary( - &self, - state_root: &Hash256, - state: &BeaconState, - ) -> Result<(), Error> { - // Fill in the state root on the latest block header if necessary (this happens on all - // slots where there isn't a skip). - let latest_block_root = state.get_latest_block_root(*state_root); - let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch(); - let epoch_boundary_state_root = if epoch_boundary_slot == state.slot { - *state_root - } else { - *state - .get_state_root(epoch_boundary_slot) - .map_err(HotColdDBError::HotStateSummaryError)? - }; - - HotStateSummary { - slot: state.slot, - latest_block_root, - epoch_boundary_state_root, - } - .db_put(&self.hot_db, state_root) - .map_err(Into::into) - } - /// Check that the restore point frequency is valid. /// /// Specifically, check that it is: @@ -718,6 +777,29 @@ impl SimpleStoreItem for HotStateSummary { } } +impl HotStateSummary { + /// Construct a new summary of the given state. + pub fn new(state_root: &Hash256, state: &BeaconState) -> Result { + // Fill in the state root on the latest block header if necessary (this happens on all + // slots where there isn't a skip). + let latest_block_root = state.get_latest_block_root(*state_root); + let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch(); + let epoch_boundary_state_root = if epoch_boundary_slot == state.slot { + *state_root + } else { + *state + .get_state_root(epoch_boundary_slot) + .map_err(HotColdDBError::HotStateSummaryError)? 
+ }; + + Ok(HotStateSummary { + slot: state.slot, + latest_block_root, + epoch_boundary_state_root, + }) + } +} + /// Struct for summarising a state in the freezer database. #[derive(Debug, Clone, Copy, Default, Encode, Decode)] struct ColdStateSummary { diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 553227747b..da42b59c04 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -2,7 +2,7 @@ use crate::*; use ssz::{Decode, DecodeError, Encode}; use ssz_derive::{Decode, Encode}; use std::convert::TryInto; -use types::beacon_state::{CommitteeCache, CACHED_EPOCHS}; +use types::beacon_state::{CloneConfig, CommitteeCache, CACHED_EPOCHS}; pub fn store_full_state, E: EthSpec>( store: &S, @@ -58,7 +58,7 @@ impl StorageContainer { /// Create a new instance for storing a `BeaconState`. pub fn new(state: &BeaconState) -> Self { Self { - state: state.clone_without_caches(), + state: state.clone_with(CloneConfig::none()), committee_caches: state.committee_caches.to_vec(), } } diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index c17ee70d74..199d24aa99 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -343,7 +343,7 @@ mod test { let state_a_root = hashes.next().unwrap(); state_b.state_roots[0] = state_a_root; - store.put_state(&state_a_root, &state_a).unwrap(); + store.put_state(&state_a_root, state_a).unwrap(); let iter = BlockRootsIterator::new(store, &state_b); @@ -391,8 +391,8 @@ mod test { let state_a_root = Hash256::from_low_u64_be(slots_per_historical_root as u64); let state_b_root = Hash256::from_low_u64_be(slots_per_historical_root as u64 * 2); - store.put_state(&state_a_root, &state_a).unwrap(); - store.put_state(&state_b_root, &state_b).unwrap(); + store.put_state(&state_a_root, state_a).unwrap(); + store.put_state(&state_b_root, state_b.clone()).unwrap(); let iter = StateRootsIterator::new(store, &state_b); diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 051d091706..9ee296e620 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -123,8 +123,8 @@ impl Store for LevelDB { } /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { - store_full_state(self, state_root, state) + fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error> { + store_full_state(self, state_root, &state) } /// Fetch a state from the store. 
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index e3b3e7926d..eaeb1992e5 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -22,6 +22,7 @@ mod leveldb_store; mod memory_store; mod metrics; mod partial_beacon_state; +mod state_batch; pub mod iter; pub mod migrate; @@ -29,7 +30,7 @@ pub mod migrate; use std::sync::Arc; pub use self::config::StoreConfig; -pub use self::hot_cold_store::HotColdDB as DiskStore; +pub use self::hot_cold_store::{HotColdDB as DiskStore, HotStateSummary}; pub use self::leveldb_store::LevelDB as SimpleDiskStore; pub use self::memory_store::MemoryStore; pub use self::migrate::Migrate; @@ -37,6 +38,8 @@ pub use self::partial_beacon_state::PartialBeaconState; pub use errors::Error; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metrics::scrape_for_metrics; +pub use state_batch::StateBatch; +pub use types::beacon_state::CloneConfig; pub use types::*; /// An object capable of storing and retrieving objects implementing `StoreItem`. @@ -79,8 +82,29 @@ pub trait Store: Sync + Send + Sized + 'static { I::db_delete(self, key) } + /// Store a block in the store. + fn put_block(&self, block_root: &Hash256, block: BeaconBlock) -> Result<(), Error> { + self.put(block_root, &block) + } + + /// Fetch a block from the store. + fn get_block(&self, block_root: &Hash256) -> Result>, Error> { + self.get(block_root) + } + /// Store a state in the store. - fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error>; + fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error>; + + /// Store a state summary in the store. + // NOTE: this is a hack for the HotColdDb, we could consider splitting this + // trait and removing the generic `S: Store` types everywhere? + fn put_state_summary( + &self, + state_root: &Hash256, + summary: HotStateSummary, + ) -> Result<(), Error> { + summary.db_put(self, state_root).map_err(Into::into) + } /// Fetch a state from the store. fn get_state( @@ -89,6 +113,17 @@ pub trait Store: Sync + Send + Sized + 'static { slot: Option, ) -> Result>, Error>; + /// Fetch a state from the store, controlling which cache fields are cloned. + fn get_state_with( + &self, + state_root: &Hash256, + slot: Option, + _clone_config: CloneConfig, + ) -> Result>, Error> { + // Default impl ignores config. Overriden in `HotColdDb`. + self.get_state(state_root, slot) + } + /// Given the root of an existing block in the store (`start_block_root`), return a parent /// block with the specified `slot`. /// @@ -315,13 +350,12 @@ mod tests { let hot_dir = tempdir().unwrap(); let cold_dir = tempdir().unwrap(); - let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; let spec = MinimalEthSpec::default_spec(); let log = NullLoggerBuilder.build().unwrap(); let store = DiskStore::open( &hot_dir.path(), &cold_dir.path(), - slots_per_restore_point, + StoreConfig::default(), spec, log, ) diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 59caf9eaad..81826b31f0 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -76,8 +76,8 @@ impl Store for MemoryStore { } /// Store a state in the store. 
- fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { - store_full_state(self, state_root, state) + fn put_state(&self, state_root: &Hash256, state: BeaconState) -> Result<(), Error> { + store_full_state(self, state_root, &state) } /// Fetch a state from the store. diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index e401193594..2f821f0333 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -46,6 +46,22 @@ lazy_static! { /* * Beacon State */ + pub static ref BEACON_STATE_GET_COUNT: Result = try_create_int_counter( + "store_beacon_state_get_total", + "Total number of beacon states requested from the store (cache or DB)" + ); + pub static ref BEACON_STATE_HOT_GET_COUNT: Result = try_create_int_counter( + "store_beacon_state_hot_get_total", + "Total number of hot beacon states requested from the store (cache or DB)" + ); + pub static ref BEACON_STATE_CACHE_HIT_COUNT: Result = try_create_int_counter( + "store_beacon_state_cache_hit_total", + "Number of hits to the store's state cache" + ); + pub static ref BEACON_STATE_CACHE_CLONE_TIME: Result = try_create_histogram( + "store_beacon_state_cache_clone_time", + "Time to clone a beacon state from the state cache" + ); pub static ref BEACON_STATE_READ_TIMES: Result = try_create_histogram( "store_beacon_state_read_seconds", "Total time required to read a BeaconState from the database" @@ -81,6 +97,14 @@ lazy_static! { /* * Beacon Block */ + pub static ref BEACON_BLOCK_GET_COUNT: Result = try_create_int_counter( + "store_beacon_block_get_total", + "Total number of beacon blocks requested from the store (cache or DB)" + ); + pub static ref BEACON_BLOCK_CACHE_HIT_COUNT: Result = try_create_int_counter( + "store_beacon_block_cache_hit_total", + "Number of hits to the store's block cache" + ); pub static ref BEACON_BLOCK_READ_TIMES: Result = try_create_histogram( "store_beacon_block_read_overhead_seconds", "Overhead on reading a beacon block from the DB (e.g., decoding)" diff --git a/beacon_node/store/src/state_batch.rs b/beacon_node/store/src/state_batch.rs new file mode 100644 index 0000000000..a33e072254 --- /dev/null +++ b/beacon_node/store/src/state_batch.rs @@ -0,0 +1,47 @@ +use crate::{Error, HotStateSummary, Store}; +use types::{BeaconState, EthSpec, Hash256}; + +/// A collection of states to be stored in the database. +/// +/// Consumes minimal space in memory by not storing states between epoch boundaries. +#[derive(Debug, Clone, Default)] +pub struct StateBatch { + items: Vec>, +} + +#[derive(Debug, Clone)] +#[allow(clippy::large_enum_variant)] +enum BatchItem { + Full(Hash256, BeaconState), + Summary(Hash256, HotStateSummary), +} + +impl StateBatch { + /// Create a new empty batch. + pub fn new() -> Self { + Self::default() + } + + /// Stage a `BeaconState` to be stored. + pub fn add_state(&mut self, state_root: Hash256, state: &BeaconState) -> Result<(), Error> { + let item = if state.slot % E::slots_per_epoch() == 0 { + BatchItem::Full(state_root, state.clone()) + } else { + BatchItem::Summary(state_root, HotStateSummary::new(&state_root, state)?) + }; + self.items.push(item); + Ok(()) + } + + /// Write the batch to the database. + /// + /// May fail to write the full batch if any of the items error (i.e. not atomic!)
+ pub fn commit>(self, store: &S) -> Result<(), Error> { + self.items.into_iter().try_for_each(|item| match item { + BatchItem::Full(state_root, state) => store.put_state(&state_root, state), + BatchItem::Summary(state_root, summary) => { + store.put_state_summary(&state_root, summary) + } + }) + } +} diff --git a/eth2/types/examples/tree_hash_state.rs b/eth2/types/examples/tree_hash_state.rs index 8c2764e094..03811c3083 100644 --- a/eth2/types/examples/tree_hash_state.rs +++ b/eth2/types/examples/tree_hash_state.rs @@ -1,7 +1,6 @@ //! These examples only really exist so we can use them for flamegraph. If they get annoying to //! maintain, feel free to delete. -use ssz::{Decode, Encode}; use types::{ test_utils::generate_deterministic_keypair, BeaconState, Eth1Data, EthSpec, Hash256, MinimalEthSpec, Validator, diff --git a/eth2/types/src/beacon_state.rs b/eth2/types/src/beacon_state.rs index a9f66f171b..b612f4d4a0 100644 --- a/eth2/types/src/beacon_state.rs +++ b/eth2/types/src/beacon_state.rs @@ -17,11 +17,13 @@ use tree_hash::TreeHash; use tree_hash_derive::TreeHash; pub use self::committee_cache::CommitteeCache; +pub use clone_config::CloneConfig; pub use eth_spec::*; pub use tree_hash_cache::BeaconTreeHashCache; #[macro_use] mod committee_cache; +mod clone_config; mod exit_cache; mod pubkey_cache; mod tests; @@ -948,7 +950,8 @@ impl BeaconState { }) } - pub fn clone_without_caches(&self) -> Self { + /// Clone the state whilst preserving only the selected caches. + pub fn clone_with(&self, config: CloneConfig) -> Self { BeaconState { genesis_time: self.genesis_time, slot: self.slot, @@ -970,21 +973,35 @@ impl BeaconState { previous_justified_checkpoint: self.previous_justified_checkpoint.clone(), current_justified_checkpoint: self.current_justified_checkpoint.clone(), finalized_checkpoint: self.finalized_checkpoint.clone(), - committee_caches: [ - CommitteeCache::default(), - CommitteeCache::default(), - CommitteeCache::default(), - ], - pubkey_cache: PubkeyCache::default(), - exit_cache: ExitCache::default(), - tree_hash_cache: None, + committee_caches: if config.committee_caches { + self.committee_caches.clone() + } else { + [ + CommitteeCache::default(), + CommitteeCache::default(), + CommitteeCache::default(), + ] + }, + pubkey_cache: if config.pubkey_cache { + self.pubkey_cache.clone() + } else { + PubkeyCache::default() + }, + exit_cache: if config.exit_cache { + self.exit_cache.clone() + } else { + ExitCache::default() + }, + tree_hash_cache: if config.tree_hash_cache { + self.tree_hash_cache.clone() + } else { + None + }, } } pub fn clone_with_only_committee_caches(&self) -> Self { - let mut state = self.clone_without_caches(); - state.committee_caches = self.committee_caches.clone(); - state + self.clone_with(CloneConfig::committee_caches_only()) } } diff --git a/eth2/types/src/beacon_state/clone_config.rs b/eth2/types/src/beacon_state/clone_config.rs new file mode 100644 index 0000000000..e5f050aee6 --- /dev/null +++ b/eth2/types/src/beacon_state/clone_config.rs @@ -0,0 +1,43 @@ +/// Configuration struct for controlling which caches of a `BeaconState` should be cloned. 
+#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] +pub struct CloneConfig { + pub committee_caches: bool, + pub pubkey_cache: bool, + pub exit_cache: bool, + pub tree_hash_cache: bool, +} + +impl CloneConfig { + pub fn all() -> Self { + Self { + committee_caches: true, + pubkey_cache: true, + exit_cache: true, + tree_hash_cache: true, + } + } + + pub fn none() -> Self { + Self::default() + } + + pub fn committee_caches_only() -> Self { + Self { + committee_caches: true, + ..Self::none() + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn sanity() { + assert!(CloneConfig::all().pubkey_cache); + assert!(!CloneConfig::none().tree_hash_cache); + assert!(CloneConfig::committee_caches_only().committee_caches); + assert!(!CloneConfig::committee_caches_only().exit_cache); + } +} diff --git a/eth2/types/src/beacon_state/tests.rs b/eth2/types/src/beacon_state/tests.rs index a41a46900e..e5d85c25bc 100644 --- a/eth2/types/src/beacon_state/tests.rs +++ b/eth2/types/src/beacon_state/tests.rs @@ -125,6 +125,75 @@ fn cache_initialization() { test_cache_initialization(&mut state, RelativeEpoch::Next, &spec); } +fn test_clone_config(base_state: &BeaconState, clone_config: CloneConfig) { + let state = base_state.clone_with(clone_config.clone()); + if clone_config.committee_caches { + state + .committee_cache(RelativeEpoch::Previous) + .expect("committee cache exists"); + state + .committee_cache(RelativeEpoch::Current) + .expect("committee cache exists"); + state + .committee_cache(RelativeEpoch::Next) + .expect("committee cache exists"); + } else { + state + .committee_cache(RelativeEpoch::Previous) + .expect_err("shouldn't exist"); + state + .committee_cache(RelativeEpoch::Current) + .expect_err("shouldn't exist"); + state + .committee_cache(RelativeEpoch::Next) + .expect_err("shouldn't exist"); + } + if clone_config.pubkey_cache { + assert_ne!(state.pubkey_cache.len(), 0); + } else { + assert_eq!(state.pubkey_cache.len(), 0); + } + if clone_config.exit_cache { + state + .exit_cache + .check_initialized() + .expect("exit cache exists"); + } else { + state + .exit_cache + .check_initialized() + .expect_err("exit cache doesn't exist"); + } + if clone_config.tree_hash_cache { + assert!(state.tree_hash_cache.is_some()); + } else { + assert!(state.tree_hash_cache.is_none(), "{:?}", clone_config); + } +} + +#[test] +fn clone_config() { + let spec = MinimalEthSpec::default_spec(); + + let builder: TestingBeaconStateBuilder = + TestingBeaconStateBuilder::from_default_keypairs_file_if_exists(16, &spec); + let (mut state, _keypairs) = builder.build(); + + state.build_all_caches(&spec).unwrap(); + + let num_caches = 4; + let all_configs = (0..2u8.pow(num_caches)).map(|i| CloneConfig { + committee_caches: (i & 1) != 0, + pubkey_cache: ((i >> 1) & 1) != 0, + exit_cache: ((i >> 2) & 1) != 0, + tree_hash_cache: ((i >> 3) & 1) != 0, + }); + + for config in all_configs { + test_clone_config(&state, config); + } +} + #[test] fn tree_hash_cache() { use crate::test_utils::{SeedableRng, TestRandom, XorShiftRng};