From aaebf72835e73b1173d9ebc852a9f0da322d11ca Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 27 May 2022 16:05:55 +1000 Subject: [PATCH] Remove recursion from DB state lookup --- beacon_node/beacon_chain/src/builder.rs | 14 +- .../src/schema_change/migration_schema_v20.rs | 2 +- beacon_node/beacon_chain/tests/store_tests.rs | 2 +- .../http_api/src/block_packing_efficiency.rs | 2 +- beacon_node/store/src/errors.rs | 4 +- beacon_node/store/src/hot_cold_store.rs | 154 ++++++++++++++---- beacon_node/store/src/hot_state_iter.rs | 51 ++++++ beacon_node/store/src/lib.rs | 1 + beacon_node/store/src/state_cache.rs | 44 ++--- .../state_processing/src/block_replayer.rs | 21 ++- 10 files changed, 222 insertions(+), 73 deletions(-) create mode 100644 beacon_node/store/src/hot_state_iter.rs diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 6e66796a16..58f1063fdb 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -29,8 +29,8 @@ use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::{ShutdownReason, TaskExecutor}; use types::{ - BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, - PublicKeyBytes, Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes, + Signature, SignedBeaconBlock, Slot, }; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing @@ -313,12 +313,7 @@ where let beacon_block_root = beacon_block.canonical_root(); store - .update_finalized_state( - beacon_state_root, - beacon_block_root, - Epoch::new(0), - beacon_state.clone(), - ) + .update_finalized_state(beacon_state_root, beacon_block_root, beacon_state.clone()) .map_err(|e| format!("Failed to set genesis state as finalized state: {:?}", e))?; store @@ -447,12 +442,11 @@ where .update_finalized_state( weak_subj_state_root, weak_subj_block_root, - weak_subj_slot.epoch(TEthSpec::slots_per_epoch()), weak_subj_state.clone(), ) .map_err(|e| format!("Failed to set checkpoint state as finalized state: {:?}", e))?; store - .store_full_state(&weak_subj_state_root, &weak_subj_state) + .put_state(&weak_subj_state_root, &weak_subj_state) .map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?; store .put_block(&weak_subj_block_root, weak_subj_block.clone()) diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs index 252f79baea..e494bb8764 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs @@ -38,7 +38,7 @@ fn get_state_by_replay( // Replay blocks to reach the target state. let blocks = db.load_blocks_to_replay(epoch_boundary_state.slot(), slot, latest_block_root)?; - db.replay_blocks(epoch_boundary_state, blocks, slot, std::iter::empty()) + db.replay_blocks(epoch_boundary_state, blocks, slot, std::iter::empty(), None) } pub fn upgrade_to_v20( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 326b77e085..ab179cfd6e 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -445,7 +445,7 @@ fn block_replayer_hooks() { let mut post_block_slots = vec![]; let mut replay_state = BlockReplayer::::new(state, &chain.spec) - .pre_slot_hook(Box::new(|state| { + .pre_slot_hook(Box::new(|_, state| { pre_slots.push(state.slot()); Ok(()) })) diff --git a/beacon_node/http_api/src/block_packing_efficiency.rs b/beacon_node/http_api/src/block_packing_efficiency.rs index 1b924f3828..e2295fc202 100644 --- a/beacon_node/http_api/src/block_packing_efficiency.rs +++ b/beacon_node/http_api/src/block_packing_efficiency.rs @@ -277,7 +277,7 @@ pub fn get_block_packing_efficiency( )); let pre_slot_hook = - |state: &mut BeaconState| -> Result<(), PackingEfficiencyError> { + |_, state: &mut BeaconState| -> Result<(), PackingEfficiencyError> { // Add attestations to `available_attestations`. handler.lock().add_attestations(state.slot())?; Ok(()) diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 2e4a866821..fd0b2ebb3c 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -42,6 +42,7 @@ pub enum Error { }, MissingStateRoot(Slot), MissingState(Hash256), + NoBaseStateFound(Hash256), BlockReplayError(BlockReplayError), MilhouseError(milhouse::Error), Compression(std::io::Error), @@ -49,7 +50,8 @@ pub enum Error { SlotIsBeforeSplit { slot: Slot, }, - FinalizedStateDecreasingEpoch, + FinalizedStateDecreasingSlot, + FinalizedStateUnaligned, StateForCacheHasPendingUpdates { state_root: Hash256, slot: Slot, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index c94070142f..4b85daffc0 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -6,10 +6,10 @@ use crate::config::{ PREV_DEFAULT_SLOTS_PER_RESTORE_POINT, }; use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator}; +use crate::hot_state_iter::HotStateRootIter; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; -use crate::leveldb_store::BytesKey; -use crate::leveldb_store::LevelDB; +use crate::leveldb_store::{BytesKey, LevelDB}; use crate::memory_store::MemoryStore; use crate::metadata::{ AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY, @@ -17,7 +17,7 @@ use crate::metadata::{ SCHEMA_VERSION_KEY, SPLIT_KEY, }; use crate::metrics; -use crate::state_cache::StateCache; +use crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, @@ -31,7 +31,9 @@ use serde_derive::{Deserialize, Serialize}; use slog::{debug, error, info, trace, warn, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use state_processing::{BlockProcessingError, BlockReplayer, SlotProcessingError}; +use state_processing::{ + block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError, +}; use std::cmp::min; use std::convert::TryInto; use std::marker::PhantomData; @@ -41,6 +43,8 @@ use std::time::Duration; use types::*; use types::{beacon_state::BeaconStateDiff, EthSpec}; +pub const MAX_PARENT_STATES_TO_CACHE: u64 = 32; + /// On-disk database that stores finalized states efficiently. /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores @@ -273,12 +277,11 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: Hash256, block_root: Hash256, - epoch: Epoch, state: BeaconState, ) -> Result<(), Error> { self.state_cache .lock() - .update_finalized_state(state_root, block_root, epoch, state) + .update_finalized_state(state_root, block_root, state) } pub fn state_cache_len(&self) -> usize { @@ -737,12 +740,14 @@ impl, Cold: ItemStore> HotColdDB // FIXME(sproul): could optimise out the block root let block_root = state.get_latest_block_root(*state_root); - if self - .state_cache - .lock() - .put_state(*state_root, block_root, state)? + // Avoid storing states in the database if they already exist in the state cache. + // The exception to this is the finalized state, which must exist in the cache before it + // is stored on disk. + if let PutStateOutcome::Duplicate = + self.state_cache + .lock() + .put_state(*state_root, block_root, state)? { - // Already exists in database. return Ok(()); } @@ -756,11 +761,10 @@ impl, Cold: ItemStore> HotColdDB // On the epoch boundary, store a diff from the previous epoch boundary state -- unless // we're at a fork boundary in which case the full state must be stored. if state.slot() % E::slots_per_epoch() == 0 { - if let Some(fork) = self.spec.fork_activated_at_slot::(state.slot()) { + if self.is_stored_as_full_state(*state_root, state.slot())? { info!( self.log, - "Storing fork transition state"; - "fork" => %fork, + "Storing full state on epoch boundary"; "slot" => state.slot(), "state_root" => ?state_root, ); @@ -852,7 +856,7 @@ impl, Cold: ItemStore> HotColdDB slot, latest_block_root, epoch_boundary_state_root, - prev_state_root, + prev_state_root: _, }) = self.load_hot_state_summary(state_root)? { // Load the latest block, and use it to confirm the validity of this state. @@ -882,21 +886,105 @@ impl, Cold: ItemStore> HotColdDB .map(Some); } - // Otherwise try to load the prior state and replay the `latest_block` on top of it as - // necessary (if it's not a skip slot). - let prev_state = self - .get_hot_state(&prev_state_root)? - .ok_or(HotColdDBError::MissingPrevState(prev_state_root))?; - let blocks = if latest_block.slot() == slot { - vec![latest_block] + // Backtrack until we reach a state that is in the cache, or in the worst case + // the finalized state (this should only be reachable on first start-up). + let mut state_root_iter = HotStateRootIter::new(self, slot, *state_root); + let mut state_roots = Vec::with_capacity(32); + let mut state = None; + + while let Some(res) = state_root_iter.next() { + let (prior_state_root, prior_slot) = res?; + + state_roots.push(Ok((prior_state_root, prior_slot))); + + // Check if this state is in the cache. + if let Some(base_state) = + self.state_cache.lock().get_by_state_root(prior_state_root) + { + debug!( + self.log, + "Found cached base state for replay"; + "base_state_root" => ?prior_state_root, + "base_slot" => prior_slot, + "target_state_root" => ?state_root, + "target_slot" => slot, + ); + state = Some(base_state); + break; + } + + // If the prior state is the split state and it isn't cached then load it in + // entirety from disk. This should only happen on first start up. + if prior_state_root == self.get_split_info().state_root { + debug!( + self.log, + "Using split state as base state for replay"; + "base_state_root" => ?prior_state_root, + "base_slot" => prior_slot, + "target_state_root" => ?state_root, + "target_slot" => slot, + ); + let (split_state, _) = self.load_hot_state_full(&prior_state_root)?; + state = Some(split_state); + break; + } + } + + let base_state = state.ok_or(Error::NoBaseStateFound(*state_root))?; + + // Reverse the collected state roots so that they are in slot ascending order. + state_roots.reverse(); + + // Collect the blocks to replay. + // We already have the latest block loaded, which is sufficient if the base state is + // just one slot behind the state to be constructed. + let mut blocks = if base_state.slot() + 1 == slot { + Vec::with_capacity(1) } else { - vec![] + self.load_blocks_to_replay(base_state.slot(), slot - 1, latest_block.parent_root())? }; + blocks.push(latest_block); - let state_roots = [(prev_state_root, slot - 1), (*state_root, slot)]; - let state_root_iter = state_roots.into_iter().map(Ok); + let state_cacher_hook: PreSlotHook<_, _> = Box::new(|opt_state_root, state| { + // Ensure all caches are built before attempting to cache. + state.update_tree_hash_cache()?; + state.build_all_caches(&self.spec)?; + + if let Some(state_root) = opt_state_root { + // Cache + if state.slot() + MAX_PARENT_STATES_TO_CACHE > slot + || state.slot() % E::slots_per_epoch() == 0 + { + debug!( + self.log, + "Caching ancestor state"; + "state_root" => ?state_root, + "slot" => state.slot(), + ); + // FIXME(sproul): this block root could be optimized out + let latest_block_root = state.get_latest_block_root(state_root); + self.state_cache + .lock() + .put_state(state_root, latest_block_root, state)?; + } + } else { + debug!( + self.log, + "Block replay state root miss"; + "slot" => state.slot(), + ); + } + Ok(()) + }); + + let mut state = self.replay_blocks( + base_state, + blocks, + slot, + state_roots.into_iter(), + Some(state_cacher_hook), + )?; - let mut state = self.replay_blocks(prev_state, blocks, slot, state_root_iter)?; state.update_tree_hash_cache()?; state.build_all_caches(&self.spec)?; @@ -1087,7 +1175,7 @@ impl, Cold: ItemStore> HotColdDB &self.spec, )?; - self.replay_blocks(low_restore_point, blocks, slot, state_root_iter) + self.replay_blocks(low_restore_point, blocks, slot, state_root_iter, None) } /// Get the restore point with the given index, or if it is out of bounds, the split state. @@ -1173,11 +1261,18 @@ impl, Cold: ItemStore> HotColdDB blocks: Vec>>, target_slot: Slot, state_root_iter: impl Iterator>, + pre_slot_hook: Option>, ) -> Result, Error> { - BlockReplayer::new(state, &self.spec) + let mut block_replayer = BlockReplayer::new(state, &self.spec) .no_signature_verification() .minimal_block_root_verification() - .state_root_iter(state_root_iter) + .state_root_iter(state_root_iter); + + if let Some(pre_slot_hook) = pre_slot_hook { + block_replayer = block_replayer.pre_slot_hook(pre_slot_hook); + } + + block_replayer .apply_blocks(blocks, Some(target_slot)) .map(|block_replayer| { // FIXME(sproul): tweak state miss condition @@ -1677,7 +1772,6 @@ pub fn migrate_database, Cold: ItemStore>( store.update_finalized_state( finalized_state_root, finalized_block_root, - finalized_state.slot().epoch(E::slots_per_epoch()), finalized_state.clone(), )?; diff --git a/beacon_node/store/src/hot_state_iter.rs b/beacon_node/store/src/hot_state_iter.rs new file mode 100644 index 0000000000..51dc917a5c --- /dev/null +++ b/beacon_node/store/src/hot_state_iter.rs @@ -0,0 +1,51 @@ +use crate::{hot_cold_store::HotColdDBError, Error, HotColdDB, ItemStore}; +use types::{EthSpec, Hash256, Slot}; + +pub struct HotStateRootIter<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> { + store: &'a HotColdDB, + next_slot: Slot, + next_state_root: Hash256, +} + +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> HotStateRootIter<'a, E, Hot, Cold> { + pub fn new( + store: &'a HotColdDB, + next_slot: Slot, + next_state_root: Hash256, + ) -> Self { + Self { + store, + next_slot, + next_state_root, + } + } + + fn do_next(&mut self) -> Result, Error> { + if self.next_state_root.is_zero() { + return Ok(None); + } + + let summary = self + .store + .load_hot_state_summary(&self.next_state_root)? + .ok_or_else(|| HotColdDBError::MissingHotStateSummary(self.next_state_root))?; + + let slot = self.next_slot; + let state_root = self.next_state_root; + + self.next_state_root = summary.prev_state_root; + self.next_slot -= 1; + + Ok(Some((state_root, slot))) + } +} + +impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> Iterator + for HotStateRootIter<'a, E, Hot, Cold> +{ + type Item = Result<(Hash256, Slot), Error>; + + fn next(&mut self) -> Option { + self.do_next().transpose() + } +} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 75ba1fcf0d..e3f8b31e1d 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -18,6 +18,7 @@ pub mod errors; mod forwards_iter; mod garbage_collection; pub mod hot_cold_store; +mod hot_state_iter; mod impls; mod leveldb_store; mod memory_store; diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 545fcbd04d..7ebbc4dd52 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -1,12 +1,11 @@ use crate::Error; use lru::LruCache; use std::collections::{BTreeMap, HashMap, HashSet}; -use types::{BeaconState, Epoch, EthSpec, Hash256, Slot}; +use types::{BeaconState, EthSpec, Hash256, Slot}; #[derive(Debug)] pub struct FinalizedState { state_root: Hash256, - epoch: Epoch, state: BeaconState, } @@ -29,6 +28,13 @@ pub struct StateCache { block_map: BlockMap, } +#[derive(Debug)] +pub enum PutStateOutcome { + Finalized, + Duplicate, + New, +} + impl StateCache { pub fn new(capacity: usize) -> Self { StateCache { @@ -46,25 +52,27 @@ impl StateCache { &mut self, state_root: Hash256, block_root: Hash256, - epoch: Epoch, state: BeaconState, ) -> Result<(), Error> { + if state.slot() % E::slots_per_epoch() != 0 { + return Err(Error::FinalizedStateUnaligned); + } + if self .finalized_state .as_ref() - .map_or(false, |finalized_state| epoch < finalized_state.epoch) + .map_or(false, |finalized_state| { + state.slot() < finalized_state.state.slot() + }) { - return Err(Error::FinalizedStateDecreasingEpoch); + return Err(Error::FinalizedStateDecreasingSlot); } - let finalized_slot = epoch.start_slot(E::slots_per_epoch()); - // Add to block map. - self.block_map - .insert(block_root, finalized_slot, state_root); + self.block_map.insert(block_root, state.slot(), state_root); // Prune block map. - let state_roots_to_prune = self.block_map.prune(finalized_slot); + let state_roots_to_prune = self.block_map.prune(state.slot()); // Delete states. for state_root in state_roots_to_prune { @@ -72,21 +80,17 @@ impl StateCache { } // Update finalized state. - self.finalized_state = Some(FinalizedState { - state_root, - epoch, - state, - }); + self.finalized_state = Some(FinalizedState { state_root, state }); Ok(()) } - /// Return a bool indicating whether the state already existed in the cache. + /// Return a status indicating whether the state already existed in the cache. pub fn put_state( &mut self, state_root: Hash256, block_root: Hash256, state: &BeaconState, - ) -> Result { + ) -> Result { if self .finalized_state .as_ref() @@ -94,11 +98,11 @@ impl StateCache { finalized_state.state_root == state_root }) { - return Ok(true); + return Ok(PutStateOutcome::Finalized); } if self.states.peek(&state_root).is_some() { - return Ok(true); + return Ok(PutStateOutcome::Duplicate); } // Refuse states with pending mutations: we want cached states to be as small as possible @@ -117,7 +121,7 @@ impl StateCache { let slot = state.slot(); self.block_map.insert(block_root, slot, state_root); - Ok(false) + Ok(PutStateOutcome::New) } pub fn get_by_state_root(&mut self, state_root: Hash256) -> Option> { diff --git a/consensus/state_processing/src/block_replayer.rs b/consensus/state_processing/src/block_replayer.rs index b31e2d6c33..7fd2d98d8c 100644 --- a/consensus/state_processing/src/block_replayer.rs +++ b/consensus/state_processing/src/block_replayer.rs @@ -6,17 +6,18 @@ use crate::{ use std::marker::PhantomData; use types::{BeaconState, BlindedPayload, ChainSpec, EthSpec, Hash256, SignedBeaconBlock, Slot}; -type PreBlockHook<'a, E, Error> = Box< +pub type PreBlockHook<'a, E, Error> = Box< dyn FnMut(&mut BeaconState, &SignedBeaconBlock>) -> Result<(), Error> + 'a, >; -type PostBlockHook<'a, E, Error> = PreBlockHook<'a, E, Error>; -type PreSlotHook<'a, E, Error> = Box) -> Result<(), Error> + 'a>; -type PostSlotHook<'a, E, Error> = Box< +pub type PostBlockHook<'a, E, Error> = PreBlockHook<'a, E, Error>; +pub type PreSlotHook<'a, E, Error> = + Box, &mut BeaconState) -> Result<(), Error> + 'a>; +pub type PostSlotHook<'a, E, Error> = Box< dyn FnMut(&mut BeaconState, Option>, bool) -> Result<(), Error> + 'a, >; -type StateRootIterDefault = std::iter::Empty>; +pub type StateRootIterDefault = std::iter::Empty>; /// Efficiently apply blocks to a state while configuring various parameters. /// @@ -201,11 +202,12 @@ where } while self.state.slot() < block.slot() { + let state_root = self.get_state_root(self.state.slot(), &blocks, i)?; + if let Some(ref mut pre_slot_hook) = self.pre_slot_hook { - pre_slot_hook(&mut self.state)?; + pre_slot_hook(state_root, &mut self.state)?; } - let state_root = self.get_state_root(self.state.slot(), &blocks, i)?; let summary = per_slot_processing(&mut self.state, state_root, self.spec) .map_err(BlockReplayError::from)?; @@ -243,11 +245,12 @@ where if let Some(target_slot) = target_slot { while self.state.slot() < target_slot { + let state_root = self.get_state_root(self.state.slot(), &blocks, blocks.len())?; + if let Some(ref mut pre_slot_hook) = self.pre_slot_hook { - pre_slot_hook(&mut self.state)?; + pre_slot_hook(state_root, &mut self.state)?; } - let state_root = self.get_state_root(self.state.slot(), &blocks, blocks.len())?; let summary = per_slot_processing(&mut self.state, state_root, self.spec) .map_err(BlockReplayError::from)?;