From 7245161fc25365139da7bebd9e5ce64e52b5eb13 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 25 Jan 2022 12:22:55 +1100 Subject: [PATCH] Store all state roots on disk --- Cargo.lock | 8 ++ beacon_node/beacon_chain/src/test_utils.rs | 7 +- beacon_node/store/Cargo.toml | 2 + beacon_node/store/src/errors.rs | 1 + beacon_node/store/src/hot_cold_store.rs | 110 ++++++++---------- beacon_node/store/src/hot_state_iter.rs | 90 ++++++++++++++ beacon_node/store/src/lib.rs | 1 + beacon_node/store/src/metadata.rs | 2 +- beacon_node/store/src/metrics.rs | 14 +-- .../state_processing/src/block_replayer.rs | 28 +---- consensus/state_processing/src/lib.rs | 2 +- 11 files changed, 161 insertions(+), 104 deletions(-) create mode 100644 beacon_node/store/src/hot_state_iter.rs diff --git a/Cargo.lock b/Cargo.lock index e16f4996ff..f259934ad8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5783,11 +5783,13 @@ dependencies = [ "lighthouse_metrics", "lru", "parking_lot", + "safe_arith", "serde", "serde_derive", "slog", "sloggers", "state_processing", + "take-until", "tempfile", "types", ] @@ -5876,6 +5878,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "take-until" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4e17d8598067a8c134af59cd33c1c263470e089924a11ab61cf61690919fe3b" + [[package]] name = "take_mut" version = "0.2.2" diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 574895296d..770ce19c75 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -31,7 +31,7 @@ use rayon::prelude::*; use sensitive_url::SensitiveUrl; use slog::Logger; use slot_clock::TestingSlotClock; -use state_processing::{state_advance::complete_state_advance, StateRootStrategy}; +use state_processing::state_advance::complete_state_advance; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::str::FromStr; @@ -525,10 +525,7 @@ where } pub fn get_hot_state(&self, state_hash: BeaconStateHash) -> Option> { - self.chain - .store - .load_hot_state(&state_hash.into(), StateRootStrategy::Accurate) - .unwrap() + self.chain.store.load_hot_state(&state_hash.into()).unwrap() } pub fn get_cold_state(&self, state_hash: BeaconStateHash) -> Option> { diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 66a6cf5d28..7c7686ad3f 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -16,6 +16,7 @@ itertools = "0.10.0" eth2_ssz = "0.4.1" eth2_ssz_derive = "0.3.0" types = { path = "../../consensus/types" } +safe_arith = { path = "../../consensus/safe_arith" } state_processing = { path = "../../consensus/state_processing" } slog = "2.5.2" serde = "1.0.116" @@ -25,3 +26,4 @@ lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lru = "0.7.1" sloggers = { version = "2.1.1", features = ["json"] } directory = { path = "../../common/directory" } +take-until = "0.1.0" diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 1147d52c43..f57f186876 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -40,6 +40,7 @@ pub enum Error { expected: Hash256, computed: Hash256, }, + MissingStateRoot(Slot), BlockReplayError(BlockReplayError), } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 62441ce0f2..eee4aa9eb7 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -3,6 +3,7 @@ use crate::chunked_vector::{ }; use crate::config::{OnDiskStoreConfig, StoreConfig}; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; +use crate::hot_state_iter::ForwardsHotStateRootIter; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; use crate::leveldb_store::BytesKey; @@ -21,13 +22,12 @@ use crate::{ use leveldb::iterator::LevelDBIterator; use lru::LruCache; use parking_lot::{Mutex, RwLock}; +use safe_arith::SafeArith; use serde_derive::{Deserialize, Serialize}; -use slog::{debug, error, info, trace, warn, Logger}; +use slog::{debug, error, info, trace, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use state_processing::{ - BlockProcessingError, BlockReplayer, SlotProcessingError, StateRootStrategy, -}; +use state_processing::{BlockProcessingError, BlockReplayer, SlotProcessingError}; use std::cmp::min; use std::convert::TryInto; use std::marker::PhantomData; @@ -362,10 +362,10 @@ impl, Cold: ItemStore> HotColdDB // chain. This way we avoid returning a state that doesn't match `state_root`. self.load_cold_state(state_root) } else { - self.load_hot_state(state_root, StateRootStrategy::Accurate) + self.load_hot_state(state_root) } } else { - match self.load_hot_state(state_root, StateRootStrategy::Accurate)? { + match self.load_hot_state(state_root)? { Some(state) => Ok(Some(state)), None => self.load_cold_state(state_root), } @@ -386,6 +386,7 @@ impl, Cold: ItemStore> HotColdDB /// /// - `state.state_roots` /// - `state.block_roots` + // FIXME(sproul): delete this whole function pub fn get_inconsistent_state_for_attestation_verification_only( &self, state_root: &Hash256, @@ -403,7 +404,7 @@ impl, Cold: ItemStore> HotColdDB } .into()) } else { - self.load_hot_state(state_root, StateRootStrategy::Inconsistent) + self.load_hot_state(state_root) } } @@ -495,11 +496,9 @@ impl, Cold: ItemStore> HotColdDB // // `StateRootStrategy` should be irrelevant here since we never replay blocks for an epoch // boundary state in the hot DB. - let state = self - .load_hot_state(&epoch_boundary_state_root, StateRootStrategy::Accurate)? - .ok_or(HotColdDBError::MissingEpochBoundaryState( - epoch_boundary_state_root, - ))?; + let state = self.load_hot_state(&epoch_boundary_state_root)?.ok_or( + HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), + )?; Ok(Some(state)) } else { // Try the cold DB @@ -638,11 +637,8 @@ impl, Cold: ItemStore> HotColdDB /// Load a post-finalization state from the hot database. /// /// Will replay blocks from the nearest epoch boundary. - pub fn load_hot_state( - &self, - state_root: &Hash256, - state_root_strategy: StateRootStrategy, - ) -> Result>, Error> { + pub fn load_hot_state(&self, state_root: &Hash256) -> Result>, Error> { + let _timer = metrics::start_timer(&metrics::BEACON_HOT_STATE_READ_TIMES); metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); // If the state is marked as temporary, do not return it. It will become visible @@ -655,6 +651,7 @@ impl, Cold: ItemStore> HotColdDB slot, latest_block_root, epoch_boundary_state_root, + prev_state_root, }) = self.load_hot_state_summary(state_root)? { let boundary_state = @@ -669,13 +666,15 @@ impl, Cold: ItemStore> HotColdDB } else { let blocks = self.load_blocks_to_replay(boundary_state.slot(), slot, latest_block_root)?; - self.replay_blocks( - boundary_state, - blocks, + + let state_root_iter = ForwardsHotStateRootIter::new( + self, + boundary_state.slot(), slot, - no_state_root_iter(), - state_root_strategy, - )? + *state_root, + prev_state_root, + )?; + self.replay_blocks(boundary_state, blocks, slot, state_root_iter)? }; Ok(Some(state)) @@ -813,13 +812,7 @@ impl, Cold: ItemStore> HotColdDB &self.spec, )?; - self.replay_blocks( - low_restore_point, - blocks, - slot, - Some(state_root_iter), - StateRootStrategy::Accurate, - ) + self.replay_blocks(low_restore_point, blocks, slot, state_root_iter) } /// Get the restore point with the given index, or if it is out of bounds, the split state. @@ -905,31 +898,19 @@ impl, Cold: ItemStore> HotColdDB state: BeaconState, blocks: Vec>, target_slot: Slot, - state_root_iter: Option>>, - state_root_strategy: StateRootStrategy, + state_root_iter: impl Iterator>, ) -> Result, Error> { - let mut block_replayer = BlockReplayer::new(state, &self.spec) - .state_root_strategy(state_root_strategy) + BlockReplayer::new(state, &self.spec) .no_signature_verification() - .minimal_block_root_verification(); - - let have_state_root_iterator = state_root_iter.is_some(); - if let Some(state_root_iter) = state_root_iter { - block_replayer = block_replayer.state_root_iter(state_root_iter); - } - - block_replayer + .minimal_block_root_verification() + .state_root_iter(state_root_iter) .apply_blocks(blocks, Some(target_slot)) - .map(|block_replayer| { - if have_state_root_iterator && block_replayer.state_root_miss() { - warn!( - self.log, - "State root iterator miss"; - "slot" => target_slot, - ); + .and_then(|block_replayer| { + if block_replayer.state_root_miss() { + Err(Error::MissingStateRoot(target_slot)) + } else { + Ok(block_replayer.into_state()) } - - block_replayer.into_state() }) } @@ -1411,19 +1392,17 @@ impl StoreItem for Split { } } -/// Type hint. -fn no_state_root_iter() -> Option>> { - None -} - /// Struct for summarising a state in the hot database. /// /// Allows full reconstruction by replaying blocks. #[derive(Debug, Clone, Copy, Default, Encode, Decode)] pub struct HotStateSummary { - slot: Slot, - latest_block_root: Hash256, - epoch_boundary_state_root: Hash256, + pub(crate) slot: Slot, + pub(crate) latest_block_root: Hash256, + pub(crate) epoch_boundary_state_root: Hash256, + /// The state root of the state at the prior slot. + // FIXME(sproul): migrate + pub(crate) prev_state_root: Hash256, } impl StoreItem for HotStateSummary { @@ -1445,20 +1424,29 @@ impl HotStateSummary { pub fn new(state_root: &Hash256, state: &BeaconState) -> Result { // Fill in the state root on the latest block header if necessary (this happens on all // slots where there isn't a skip). + let slot = state.slot(); let latest_block_root = state.get_latest_block_root(*state_root); - let epoch_boundary_slot = state.slot() / E::slots_per_epoch() * E::slots_per_epoch(); - let epoch_boundary_state_root = if epoch_boundary_slot == state.slot() { + let epoch_boundary_slot = slot / E::slots_per_epoch() * E::slots_per_epoch(); + let epoch_boundary_state_root = if epoch_boundary_slot == slot { *state_root } else { *state .get_state_root(epoch_boundary_slot) .map_err(HotColdDBError::HotStateSummaryError)? }; + let prev_state_root = if let Ok(prev_slot) = slot.safe_sub(1) { + *state + .get_state_root(prev_slot) + .map_err(HotColdDBError::HotStateSummaryError)? + } else { + Hash256::zero() + }; Ok(HotStateSummary { slot: state.slot(), latest_block_root, epoch_boundary_state_root, + prev_state_root, }) } } diff --git a/beacon_node/store/src/hot_state_iter.rs b/beacon_node/store/src/hot_state_iter.rs new file mode 100644 index 0000000000..c6f571df41 --- /dev/null +++ b/beacon_node/store/src/hot_state_iter.rs @@ -0,0 +1,90 @@ +use crate::{hot_cold_store::HotColdDBError, Error, HotColdDB, ItemStore}; +use itertools::process_results; +use std::iter; +use take_until::TakeUntilExt; +use types::{EthSpec, Hash256, Slot}; + +pub struct HotStateRootIter<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { + store: &'a HotColdDB, + next_slot: Slot, + next_state_root: Hash256, +} + +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> HotStateRootIter<'a, E, Hot, Cold> { + pub fn new( + store: &'a HotColdDB, + next_slot: Slot, + next_state_root: Hash256, + ) -> Self { + Self { + store, + next_slot, + next_state_root, + } + } + + fn do_next(&mut self) -> Result, Error> { + if self.next_state_root.is_zero() { + return Ok(None); + } + + let summary = self + .store + .load_hot_state_summary(&self.next_state_root)? + .ok_or_else(|| HotColdDBError::MissingHotStateSummary(self.next_state_root))?; + + let slot = self.next_slot; + let state_root = self.next_state_root; + + self.next_state_root = summary.prev_state_root; + self.next_slot -= 1; + + Ok(Some((state_root, slot))) + } +} + +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for HotStateRootIter<'a, E, Hot, Cold> +{ + type Item = Result<(Hash256, Slot), Error>; + + fn next(&mut self) -> Option { + self.do_next().transpose() + } +} + +pub struct ForwardsHotStateRootIter { + // Values from the backwards iterator (in slot descending order) + values: Vec<(Hash256, Slot)>, +} + +impl ForwardsHotStateRootIter { + pub fn new, Cold: ItemStore>( + store: &HotColdDB, + start_slot: Slot, + end_slot: Slot, + last_state_root: Hash256, + second_last_state_root: Hash256, + ) -> Result { + process_results( + iter::once(Ok((last_state_root, end_slot))).chain(HotStateRootIter::new( + store, + end_slot - 1, + second_last_state_root, + )), + |iter| { + let values = iter.take_until(|(_, slot)| *slot == start_slot).collect(); + Self { values } + }, + ) + } +} + +impl Iterator for ForwardsHotStateRootIter { + type Item = Result<(Hash256, Slot), Error>; + + fn next(&mut self) -> Option { + // Pop from the end of the vector to get the state roots in slot-ascending order. + Ok(self.values.pop()).transpose() + } +} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 8d1993f461..d56d85328c 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -18,6 +18,7 @@ pub mod errors; mod forwards_iter; mod garbage_collection; pub mod hot_cold_store; +mod hot_state_iter; mod impls; mod leveldb_store; mod memory_store; diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 78c02a02e1..9053247186 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(8); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(9000); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 72c5e61969..6b3445463c 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -54,17 +54,13 @@ lazy_static! { "store_beacon_state_hot_get_total", "Total number of hot beacon states requested from the store (cache or DB)" ); - pub static ref BEACON_STATE_CACHE_HIT_COUNT: Result = try_create_int_counter( - "store_beacon_state_cache_hit_total", - "Number of hits to the store's state cache" - ); - pub static ref BEACON_STATE_CACHE_CLONE_TIME: Result = try_create_histogram( - "store_beacon_state_cache_clone_time", - "Time to load a beacon block from the block cache" - ); pub static ref BEACON_STATE_READ_TIMES: Result = try_create_histogram( "store_beacon_state_read_seconds", - "Total time required to read a BeaconState from the database" + "Total time required to read a full BeaconState from the database" + ); + pub static ref BEACON_HOT_STATE_READ_TIMES: Result = try_create_histogram( + "store_beacon_hot_state_read_seconds", + "Total time required to read a hot BeaconState from the database" ); pub static ref BEACON_STATE_READ_OVERHEAD_TIMES: Result = try_create_histogram( "store_beacon_state_read_overhead_seconds", diff --git a/consensus/state_processing/src/block_replayer.rs b/consensus/state_processing/src/block_replayer.rs index 937348263b..c018a2fab0 100644 --- a/consensus/state_processing/src/block_replayer.rs +++ b/consensus/state_processing/src/block_replayer.rs @@ -26,7 +26,6 @@ pub struct BlockReplayer< > { state: BeaconState, spec: &'a ChainSpec, - state_root_strategy: StateRootStrategy, block_sig_strategy: BlockSignatureStrategy, verify_block_root: Option, pre_block_hook: Option>, @@ -57,16 +56,6 @@ impl From for BlockReplayError { } } -/// Defines how state roots should be computed during block replay. -#[derive(PartialEq)] -pub enum StateRootStrategy { - /// Perform all transitions faithfully to the specification. - Accurate, - /// Don't compute state roots, eventually computing an invalid beacon state that can only be - /// used for obtaining shuffling. - Inconsistent, -} - impl<'a, E, Error, StateRootIter> BlockReplayer<'a, E, Error, StateRootIter> where E: EthSpec, @@ -84,7 +73,6 @@ where Self { state, spec, - state_root_strategy: StateRootStrategy::Accurate, block_sig_strategy: BlockSignatureStrategy::VerifyBulk, verify_block_root: Some(VerifyBlockRoot::True), pre_block_hook: None, @@ -97,15 +85,6 @@ where } } - /// Set the replayer's state root strategy different from the default. - pub fn state_root_strategy(mut self, state_root_strategy: StateRootStrategy) -> Self { - if state_root_strategy == StateRootStrategy::Inconsistent { - self.verify_block_root = None; - } - self.state_root_strategy = state_root_strategy; - self - } - /// Set the replayer's block signature verification strategy. pub fn block_signature_strategy(mut self, block_sig_strategy: BlockSignatureStrategy) -> Self { self.block_sig_strategy = block_sig_strategy; @@ -178,11 +157,6 @@ where blocks: &[SignedBeaconBlock], i: usize, ) -> Result, Error> { - // If we don't care about state roots then return immediately. - if self.state_root_strategy == StateRootStrategy::Inconsistent { - return Ok(Some(Hash256::zero())); - } - // If a state root iterator is configured, use it to find the root. if let Some(ref mut state_root_iter) = self.state_root_iter { let opt_root = state_root_iter @@ -246,7 +220,7 @@ where // If no explicit policy is set, verify only the first 1 or 2 block roots if using // accurate state roots. Inaccurate state roots require block root verification to // be off. - if i <= 1 && self.state_root_strategy == StateRootStrategy::Accurate { + if i <= 1 { VerifyBlockRoot::True } else { VerifyBlockRoot::False diff --git a/consensus/state_processing/src/lib.rs b/consensus/state_processing/src/lib.rs index cb4ffee780..1c90b4355d 100644 --- a/consensus/state_processing/src/lib.rs +++ b/consensus/state_processing/src/lib.rs @@ -26,7 +26,7 @@ pub mod state_advance; pub mod upgrade; pub mod verify_operation; -pub use block_replayer::{BlockReplayError, BlockReplayer, StateRootStrategy}; +pub use block_replayer::{BlockReplayError, BlockReplayer}; pub use genesis::{ eth2_genesis_time, initialize_beacon_state_from_eth1, is_valid_genesis_state, process_activations,