Remove recursion from DB state lookup

This commit is contained in:
Michael Sproul
2022-05-27 16:05:55 +10:00
parent f30f17bf36
commit aaebf72835
10 changed files with 222 additions and 73 deletions

View File

@@ -42,6 +42,7 @@ pub enum Error {
},
MissingStateRoot(Slot),
MissingState(Hash256),
NoBaseStateFound(Hash256),
BlockReplayError(BlockReplayError),
MilhouseError(milhouse::Error),
Compression(std::io::Error),
@@ -49,7 +50,8 @@ pub enum Error {
SlotIsBeforeSplit {
slot: Slot,
},
FinalizedStateDecreasingEpoch,
FinalizedStateDecreasingSlot,
FinalizedStateUnaligned,
StateForCacheHasPendingUpdates {
state_root: Hash256,
slot: Slot,

View File

@@ -6,10 +6,10 @@ use crate::config::{
PREV_DEFAULT_SLOTS_PER_RESTORE_POINT,
};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::hot_state_iter::HotStateRootIter;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
use crate::leveldb_store::{BytesKey, LevelDB};
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
@@ -17,7 +17,7 @@ use crate::metadata::{
SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::state_cache::StateCache;
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{
get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp,
@@ -31,7 +31,9 @@ use serde_derive::{Deserialize, Serialize};
use slog::{debug, error, info, trace, warn, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::{BlockProcessingError, BlockReplayer, SlotProcessingError};
use state_processing::{
block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError,
};
use std::cmp::min;
use std::convert::TryInto;
use std::marker::PhantomData;
@@ -41,6 +43,8 @@ use std::time::Duration;
use types::*;
use types::{beacon_state::BeaconStateDiff, EthSpec};
/// Size (in slots) of the window below a replay target slot within which intermediate states
/// produced during block replay are added to the state cache.
pub const MAX_PARENT_STATES_TO_CACHE: u64 = 32;
/// On-disk database that stores finalized states efficiently.
///
/// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
@@ -273,12 +277,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
state_root: Hash256,
block_root: Hash256,
epoch: Epoch,
state: BeaconState<E>,
) -> Result<(), Error> {
self.state_cache
.lock()
.update_finalized_state(state_root, block_root, epoch, state)
.update_finalized_state(state_root, block_root, state)
}
pub fn state_cache_len(&self) -> usize {
@@ -737,12 +740,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// FIXME(sproul): could optimise out the block root
let block_root = state.get_latest_block_root(*state_root);
if self
.state_cache
.lock()
.put_state(*state_root, block_root, state)?
// Avoid storing states in the database if they already exist in the state cache.
// The exception to this is the finalized state, which must exist in the cache before it
// is stored on disk.
if let PutStateOutcome::Duplicate =
self.state_cache
.lock()
.put_state(*state_root, block_root, state)?
{
// Already exists in database.
return Ok(());
}
@@ -756,11 +761,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// On the epoch boundary, store a diff from the previous epoch boundary state -- unless
// we're at a fork boundary in which case the full state must be stored.
if state.slot() % E::slots_per_epoch() == 0 {
if let Some(fork) = self.spec.fork_activated_at_slot::<E>(state.slot()) {
if self.is_stored_as_full_state(*state_root, state.slot())? {
info!(
self.log,
"Storing fork transition state";
"fork" => %fork,
"Storing full state on epoch boundary";
"slot" => state.slot(),
"state_root" => ?state_root,
);
@@ -852,7 +856,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
slot,
latest_block_root,
epoch_boundary_state_root,
prev_state_root,
prev_state_root: _,
}) = self.load_hot_state_summary(state_root)?
{
// Load the latest block, and use it to confirm the validity of this state.
@@ -882,21 +886,105 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(Some);
}
// Otherwise try to load the prior state and replay the `latest_block` on top of it as
// necessary (if it's not a skip slot).
let prev_state = self
.get_hot_state(&prev_state_root)?
.ok_or(HotColdDBError::MissingPrevState(prev_state_root))?;
let blocks = if latest_block.slot() == slot {
vec![latest_block]
// Backtrack until we reach a state that is in the cache, or in the worst case
// the finalized state (this should only be reachable on first start-up).
let mut state_root_iter = HotStateRootIter::new(self, slot, *state_root);
let mut state_roots = Vec::with_capacity(32);
let mut state = None;
while let Some(res) = state_root_iter.next() {
let (prior_state_root, prior_slot) = res?;
state_roots.push(Ok((prior_state_root, prior_slot)));
// Check if this state is in the cache.
if let Some(base_state) =
self.state_cache.lock().get_by_state_root(prior_state_root)
{
debug!(
self.log,
"Found cached base state for replay";
"base_state_root" => ?prior_state_root,
"base_slot" => prior_slot,
"target_state_root" => ?state_root,
"target_slot" => slot,
);
state = Some(base_state);
break;
}
// If the prior state is the split state and it isn't cached then load it in its
// entirety from disk. This should only happen on first start-up.
if prior_state_root == self.get_split_info().state_root {
debug!(
self.log,
"Using split state as base state for replay";
"base_state_root" => ?prior_state_root,
"base_slot" => prior_slot,
"target_state_root" => ?state_root,
"target_slot" => slot,
);
let (split_state, _) = self.load_hot_state_full(&prior_state_root)?;
state = Some(split_state);
break;
}
}
let base_state = state.ok_or(Error::NoBaseStateFound(*state_root))?;
// Reverse the collected state roots so that they are in slot ascending order.
state_roots.reverse();
// Collect the blocks to replay.
// We already have the latest block loaded, which is sufficient if the base state is
// just one slot behind the state to be constructed.
let mut blocks = if base_state.slot() + 1 == slot {
Vec::with_capacity(1)
} else {
vec![]
self.load_blocks_to_replay(base_state.slot(), slot - 1, latest_block.parent_root())?
};
blocks.push(latest_block);
let state_roots = [(prev_state_root, slot - 1), (*state_root, slot)];
let state_root_iter = state_roots.into_iter().map(Ok);
let state_cacher_hook: PreSlotHook<_, _> = Box::new(|opt_state_root, state| {
// Ensure all caches are built before attempting to cache.
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
if let Some(state_root) = opt_state_root {
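// Cache states within `MAX_PARENT_STATES_TO_CACHE` slots of the target slot, plus any
// state on an epoch boundary.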
if state.slot() + MAX_PARENT_STATES_TO_CACHE > slot
|| state.slot() % E::slots_per_epoch() == 0
{
debug!(
self.log,
"Caching ancestor state";
"state_root" => ?state_root,
"slot" => state.slot(),
);
// FIXME(sproul): this block root could be optimized out
let latest_block_root = state.get_latest_block_root(state_root);
self.state_cache
.lock()
.put_state(state_root, latest_block_root, state)?;
}
} else {
debug!(
self.log,
"Block replay state root miss";
"slot" => state.slot(),
);
}
Ok(())
});
let mut state = self.replay_blocks(
base_state,
blocks,
slot,
state_roots.into_iter(),
Some(state_cacher_hook),
)?;
let mut state = self.replay_blocks(prev_state, blocks, slot, state_root_iter)?;
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
@@ -1087,7 +1175,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self.spec,
)?;
self.replay_blocks(low_restore_point, blocks, slot, state_root_iter)
self.replay_blocks(low_restore_point, blocks, slot, state_root_iter, None)
}
/// Get the restore point with the given index, or if it is out of bounds, the split state.
@@ -1173,11 +1261,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blocks: Vec<SignedBeaconBlock<E, BlindedPayload<E>>>,
target_slot: Slot,
state_root_iter: impl Iterator<Item = Result<(Hash256, Slot), Error>>,
pre_slot_hook: Option<PreSlotHook<E, Error>>,
) -> Result<BeaconState<E>, Error> {
BlockReplayer::new(state, &self.spec)
let mut block_replayer = BlockReplayer::new(state, &self.spec)
.no_signature_verification()
.minimal_block_root_verification()
.state_root_iter(state_root_iter)
.state_root_iter(state_root_iter);
if let Some(pre_slot_hook) = pre_slot_hook {
block_replayer = block_replayer.pre_slot_hook(pre_slot_hook);
}
block_replayer
.apply_blocks(blocks, Some(target_slot))
.map(|block_replayer| {
// FIXME(sproul): tweak state miss condition
@@ -1677,7 +1772,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store.update_finalized_state(
finalized_state_root,
finalized_block_root,
finalized_state.slot().epoch(E::slots_per_epoch()),
finalized_state.clone(),
)?;

View File

@@ -0,0 +1,51 @@
use crate::{hot_cold_store::HotColdDBError, Error, HotColdDB, ItemStore};
use types::{EthSpec, Hash256, Slot};
/// Backwards iterator over hot state roots.
///
/// Starting from a given `(state_root, slot)` pair, each step loads the hot state summary for
/// the current root from `store` and moves on to that summary's `prev_state_root`, yielding
/// `(state_root, slot)` pairs along the way. Iteration stops once the next root is zero.
pub struct HotStateRootIter<'a, E, Hot, Cold>
where
    E: EthSpec,
    Hot: ItemStore<E>,
    Cold: ItemStore<E>,
{
    /// Database from which hot state summaries are loaded.
    store: &'a HotColdDB<E, Hot, Cold>,
    /// Slot that will be paired with `next_state_root` in the next yielded item.
    next_slot: Slot,
    /// State root that will be yielded next; a zero root means the iterator is exhausted.
    next_state_root: Hash256,
}
impl<'a, E, Hot, Cold> HotStateRootIter<'a, E, Hot, Cold>
where
    E: EthSpec,
    Hot: ItemStore<E>,
    Cold: ItemStore<E>,
{
    /// Create an iterator whose first yielded item is `(next_state_root, next_slot)`.
    pub fn new(
        store: &'a HotColdDB<E, Hot, Cold>,
        next_slot: Slot,
        next_state_root: Hash256,
    ) -> Self {
        Self {
            store,
            next_slot,
            next_state_root,
        }
    }

    /// Advance the iterator by one step.
    ///
    /// Returns `Ok(None)` when the pending state root is zero. Otherwise the pending
    /// `(state_root, slot)` pair is returned and the iterator moves on to the summary's
    /// `prev_state_root`, with the slot decremented by one.
    ///
    /// # Errors
    ///
    /// Propagates any error from loading the summary, and fails with
    /// `HotColdDBError::MissingHotStateSummary` if no summary is stored for the pending root.
    fn do_next(&mut self) -> Result<Option<(Hash256, Slot)>, Error> {
        let current_root = self.next_state_root;
        if current_root.is_zero() {
            return Ok(None);
        }
        let current_slot = self.next_slot;

        let summary = match self.store.load_hot_state_summary(&current_root)? {
            Some(summary) => summary,
            None => return Err(HotColdDBError::MissingHotStateSummary(current_root).into()),
        };

        // Only advance once the summary lookup has succeeded, so a failed step leaves the
        // iterator state untouched.
        self.next_state_root = summary.prev_state_root;
        self.next_slot -= 1;

        Ok(Some((current_root, current_slot)))
    }
}
impl<'a, E, Hot, Cold> Iterator for HotStateRootIter<'a, E, Hot, Cold>
where
    E: EthSpec,
    Hot: ItemStore<E>,
    Cold: ItemStore<E>,
{
    type Item = Result<(Hash256, Slot), Error>;

    /// Yield the next `(state_root, slot)` pair, surfacing lookup failures as `Some(Err(_))`
    /// and ending iteration with `None` once `do_next` reports there is nothing left.
    fn next(&mut self) -> Option<Self::Item> {
        match self.do_next() {
            Ok(Some(item)) => Some(Ok(item)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}

View File

@@ -18,6 +18,7 @@ pub mod errors;
mod forwards_iter;
mod garbage_collection;
pub mod hot_cold_store;
mod hot_state_iter;
mod impls;
mod leveldb_store;
mod memory_store;

View File

@@ -1,12 +1,11 @@
use crate::Error;
use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet};
use types::{BeaconState, Epoch, EthSpec, Hash256, Slot};
use types::{BeaconState, EthSpec, Hash256, Slot};
#[derive(Debug)]
pub struct FinalizedState<E: EthSpec> {
state_root: Hash256,
epoch: Epoch,
state: BeaconState<E>,
}
@@ -29,6 +28,13 @@ pub struct StateCache<E: EthSpec> {
block_map: BlockMap,
}
/// Result of offering a state to `StateCache::put_state`.
///
/// This is a fieldless status value, so it is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PutStateOutcome {
    /// The offered state root is the root of the cache's current finalized state.
    Finalized,
    /// A state with the offered root was already held in the cache.
    Duplicate,
    /// The offered state was not already present in the cache.
    New,
}
impl<E: EthSpec> StateCache<E> {
pub fn new(capacity: usize) -> Self {
StateCache {
@@ -46,25 +52,27 @@ impl<E: EthSpec> StateCache<E> {
&mut self,
state_root: Hash256,
block_root: Hash256,
epoch: Epoch,
state: BeaconState<E>,
) -> Result<(), Error> {
if state.slot() % E::slots_per_epoch() != 0 {
return Err(Error::FinalizedStateUnaligned);
}
if self
.finalized_state
.as_ref()
.map_or(false, |finalized_state| epoch < finalized_state.epoch)
.map_or(false, |finalized_state| {
state.slot() < finalized_state.state.slot()
})
{
return Err(Error::FinalizedStateDecreasingEpoch);
return Err(Error::FinalizedStateDecreasingSlot);
}
let finalized_slot = epoch.start_slot(E::slots_per_epoch());
// Add to block map.
self.block_map
.insert(block_root, finalized_slot, state_root);
self.block_map.insert(block_root, state.slot(), state_root);
// Prune block map.
let state_roots_to_prune = self.block_map.prune(finalized_slot);
let state_roots_to_prune = self.block_map.prune(state.slot());
// Delete states.
for state_root in state_roots_to_prune {
@@ -72,21 +80,17 @@ impl<E: EthSpec> StateCache<E> {
}
// Update finalized state.
self.finalized_state = Some(FinalizedState {
state_root,
epoch,
state,
});
self.finalized_state = Some(FinalizedState { state_root, state });
Ok(())
}
/// Return a bool indicating whether the state already existed in the cache.
/// Return a status indicating whether the state already existed in the cache.
pub fn put_state(
&mut self,
state_root: Hash256,
block_root: Hash256,
state: &BeaconState<E>,
) -> Result<bool, Error> {
) -> Result<PutStateOutcome, Error> {
if self
.finalized_state
.as_ref()
@@ -94,11 +98,11 @@ impl<E: EthSpec> StateCache<E> {
finalized_state.state_root == state_root
})
{
return Ok(true);
return Ok(PutStateOutcome::Finalized);
}
if self.states.peek(&state_root).is_some() {
return Ok(true);
return Ok(PutStateOutcome::Duplicate);
}
// Refuse states with pending mutations: we want cached states to be as small as possible
@@ -117,7 +121,7 @@ impl<E: EthSpec> StateCache<E> {
let slot = state.slot();
self.block_map.insert(block_root, slot, state_root);
Ok(false)
Ok(PutStateOutcome::New)
}
pub fn get_by_state_root(&mut self, state_root: Hash256) -> Option<BeaconState<E>> {