In-memory tree states (#5533)

* Consensus changes

* EF tests

* lcli

* common and watch

* account manager

* cargo

* fork choice

* promise cache

* beacon chain

* interop genesis

* http api

* lighthouse

* op pool

* beacon chain misc

* parallel state cache

* store

* fix issues in store

* IT COMPILES

* Remove some unnecessary module qualification

* Revert Arced pubkey optimization (#5536)

* Merge remote-tracking branch 'origin/unstable' into tree-states-memory

* Fix caching, rebasing and some tests

* Remove unused deps

* Merge remote-tracking branch 'origin/unstable' into tree-states-memory

* Small cleanups

* Revert shuffling cache/promise cache changes

* Fix state advance bugs

* Fix shuffling tests

* Remove some resolved FIXMEs

* Remove StateProcessingStrategy

* Optimise withdrawals calculation

* Don't reorg if state cache is missed

* Remove inconsistent state func

* Fix beta compiler

* Rebase early, rebase often

* Fix state caching behaviour

* Update to milhouse release

* Fix on-disk consensus context format

* Merge remote-tracking branch 'origin/unstable' into tree-states-memory

* Squashed commit of the following:

commit 3a16649023
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Thu Apr 18 14:26:09 2024 +1000

    Fix on-disk consensus context format

* Keep indexed attestations, thanks Sean

* Merge branch 'on-disk-consensus-context' into tree-states-memory

* Merge branch 'unstable' into tree-states-memory

* Address half of Sean's review

* More simplifications from Sean's review

* Cache state after get_advanced_hot_state
This commit is contained in:
Michael Sproul
2024-04-24 11:22:36 +10:00
committed by GitHub
parent 4cad1fcbbe
commit 61962898e2
108 changed files with 2038 additions and 2762 deletions

View File

@@ -17,6 +17,7 @@ use crate::metadata::{
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::metrics;
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{
get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp,
@@ -30,7 +31,8 @@ use slog::{debug, error, info, trace, warn, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::{
BlockProcessingError, BlockReplayer, SlotProcessingError, StateProcessingStrategy,
block_replayer::PreSlotHook, AllCaches, BlockProcessingError, BlockReplayer,
SlotProcessingError,
};
use std::cmp::min;
use std::marker::PhantomData;
@@ -66,12 +68,16 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
pub hot_db: Hot,
/// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
block_cache: Mutex<BlockCache<E>>,
/// Cache of beacon states.
///
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
state_cache: Mutex<StateCache<E>>,
/// LRU cache of replayed states.
state_cache: Mutex<LruCache<Slot, BeaconState<E>>>,
historic_state_cache: Mutex<LruCache<Slot, BeaconState<E>>>,
/// Chain spec.
pub(crate) spec: ChainSpec,
/// Logger.
pub(crate) log: Logger,
pub log: Logger,
/// Mere vessel for E.
_phantom: PhantomData<E>,
}
@@ -178,7 +184,8 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
blobs_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
historic_state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)),
config,
spec,
log,
@@ -192,8 +199,6 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
/// Open a new or existing database, with the given paths to the hot and cold DBs.
///
/// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
///
/// The `migrate_schema` function is passed in so that the parent `BeaconChain` can provide
/// context and access `BeaconChain`-level code without creating a circular dependency.
pub fn open(
@@ -215,7 +220,8 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
blobs_db: LevelDB::open(blobs_db_path)?,
hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
historic_state_cache: Mutex::new(LruCache::new(config.historic_state_cache_size)),
config,
spec,
log,
@@ -352,6 +358,21 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
pub fn update_finalized_state(
&self,
state_root: Hash256,
block_root: Hash256,
state: BeaconState<E>,
) -> Result<(), Error> {
self.state_cache
.lock()
.update_finalized_state(state_root, block_root, state)
}
pub fn state_cache_len(&self) -> usize {
self.state_cache.lock().len()
}
/// Store a block and update the LRU cache.
pub fn put_block(
&self,
@@ -615,11 +636,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Store a state in the store.
pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
self.put_state_possibly_temporary(state_root, state, false)
}
/// Store a state in the store.
///
/// The `temporary` flag indicates whether this state should be considered canonical.
pub fn put_state_possibly_temporary(
&self,
state_root: &Hash256,
state: &BeaconState<E>,
temporary: bool,
) -> Result<(), Error> {
let mut ops: Vec<KeyValueStoreOp> = Vec::new();
if state.slot() < self.get_split_slot() {
self.store_cold_state(state_root, state, &mut ops)?;
self.cold_db.do_atomically(ops)
} else {
if temporary {
ops.push(TemporaryFlag.as_kv_store_op(*state_root));
}
self.store_hot_state(state_root, state, &mut ops)?;
self.hot_db.do_atomically(ops)
}
@@ -648,45 +684,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// chain. This way we avoid returning a state that doesn't match `state_root`.
self.load_cold_state(state_root)
} else {
self.load_hot_state(state_root, StateProcessingStrategy::Accurate)
self.get_hot_state(state_root)
}
} else {
match self.load_hot_state(state_root, StateProcessingStrategy::Accurate)? {
match self.get_hot_state(state_root)? {
Some(state) => Ok(Some(state)),
None => self.load_cold_state(state_root),
}
}
}
/// Fetch a state from the store, but don't compute all of the values when replaying blocks
/// upon that state (e.g., state roots). Additionally, only states from the hot store are
/// returned.
///
/// See `Self::get_advanced_hot_state` for information about `max_slot`.
///
/// ## Warning
///
/// The returned state **is not a valid beacon state**, it can only be used for obtaining
/// shuffling to process attestations. At least the following components of the state will be
/// broken/invalid:
///
/// - `state.state_roots`
/// - `state.block_roots`
pub fn get_inconsistent_state_for_attestation_verification_only(
&self,
block_root: &Hash256,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
self.get_advanced_hot_state_with_strategy(
*block_root,
max_slot,
state_root,
StateProcessingStrategy::Inconsistent,
)
}
/// Get a state with `latest_block_root == block_root` advanced through to at most `max_slot`.
///
/// The `state_root` argument is used to look up the block's un-advanced state in case an
@@ -697,35 +704,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// - `result_state_root == state.canonical_root()`
/// - `state.slot() <= max_slot`
/// - `state.get_latest_block_root(result_state_root) == block_root`
///
/// Presently this is only used to avoid loading the un-advanced split state, but in future will
/// be expanded to return states from an in-memory cache.
pub fn get_advanced_hot_state(
&self,
block_root: Hash256,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
self.get_advanced_hot_state_with_strategy(
block_root,
max_slot,
state_root,
StateProcessingStrategy::Accurate,
)
}
if let Some(cached) = self.get_advanced_hot_state_from_cache(block_root, max_slot) {
return Ok(Some(cached));
}
/// Same as `get_advanced_hot_state` but taking a `StateProcessingStrategy`.
pub fn get_advanced_hot_state_with_strategy(
&self,
block_root: Hash256,
max_slot: Slot,
state_root: Hash256,
state_processing_strategy: StateProcessingStrategy,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
// Hold a read lock on the split point so it can't move while we're trying to load the
// state.
let split = self.split.read_recursive();
if state_root != split.state_root {
warn!(
self.log,
"State cache missed";
"state_root" => ?state_root,
"block_root" => ?block_root,
);
}
// Sanity check max-slot against the split slot.
if max_slot < split.slot {
return Err(HotColdDBError::FinalizedStateNotInHotDatabase {
@@ -741,11 +742,40 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} else {
state_root
};
let state = self
.load_hot_state(&state_root, state_processing_strategy)?
.map(|state| (state_root, state));
let mut opt_state = self
.load_hot_state(&state_root)?
.map(|(state, _block_root)| (state_root, state));
if let Some((state_root, state)) = opt_state.as_mut() {
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
self.state_cache
.lock()
.put_state(*state_root, block_root, state)?;
debug!(
self.log,
"Cached state";
"state_root" => ?state_root,
"slot" => state.slot(),
);
}
drop(split);
Ok(state)
Ok(opt_state)
}
/// Same as `get_advanced_hot_state` but will return `None` if no compatible state is cached.
///
/// If this function returns `Some(state)` then that `state` will always have
/// `latest_block_header` matching `block_root` but may not be advanced all the way through to
/// `max_slot`.
pub fn get_advanced_hot_state_from_cache(
&self,
block_root: Hash256,
max_slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
self.state_cache
.lock()
.get_by_block_root(block_root, max_slot)
}
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
@@ -755,17 +785,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint
/// (which will be deleted by this function but shouldn't be).
pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
// Delete the state summary.
self.hot_db
.key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?;
// Delete the full state if it lies on an epoch boundary.
if slot % E::slots_per_epoch() == 0 {
self.hot_db
.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?;
}
Ok(())
self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::DeleteState(
*state_root,
Some(slot),
)])
}
pub fn forwards_block_roots_iterator(
@@ -833,17 +856,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}) = self.load_hot_state_summary(state_root)?
{
// NOTE: minor inefficiency here because we load an unnecessary hot state summary
//
// `StateProcessingStrategy` should be irrelevant here since we never replay blocks for an epoch
// boundary state in the hot DB.
let state = self
.load_hot_state(
&epoch_boundary_state_root,
StateProcessingStrategy::Accurate,
)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(
epoch_boundary_state_root,
))?;
let (state, _) = self.load_hot_state(&epoch_boundary_state_root)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;
Ok(Some(state))
} else {
// Try the cold DB
@@ -1029,12 +1044,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::DeleteBlock(block_root) => {
guard.delete_block(&block_root);
self.state_cache.lock().delete_block_states(&block_root);
}
StoreOp::DeleteState(state_root, _) => {
self.state_cache.lock().delete_state(&state_root)
}
StoreOp::DeleteBlobs(_) => (),
StoreOp::DeleteState(_, _) => (),
StoreOp::DeleteExecutionPayload(_) => (),
StoreOp::KeyValueOp(_) => (),
@@ -1070,6 +1088,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// Put the state in the cache.
let block_root = state.get_latest_block_root(*state_root);
// Avoid storing states in the database if they already exist in the state cache.
// The exception to this is the finalized state, which must exist in the cache before it
// is stored on disk.
if let PutStateOutcome::Duplicate =
self.state_cache
.lock()
.put_state(*state_root, block_root, state)?
{
debug!(
self.log,
"Skipping storage of cached state";
"slot" => state.slot(),
"state_root" => ?state_root
);
return Ok(());
}
// On the epoch boundary, store the full state.
if state.slot() % E::slots_per_epoch() == 0 {
trace!(
@@ -1091,14 +1129,51 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
/// Get a post-finalization state from the database or store.
pub fn get_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) {
return Ok(Some(state));
}
if *state_root != self.get_split_info().state_root {
// Do not warn on start up when loading the split state.
warn!(
self.log,
"State cache missed";
"state_root" => ?state_root,
);
}
let state_from_disk = self.load_hot_state(state_root)?;
if let Some((mut state, block_root)) = state_from_disk {
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
self.state_cache
.lock()
.put_state(*state_root, block_root, &state)?;
debug!(
self.log,
"Cached state";
"state_root" => ?state_root,
"slot" => state.slot(),
);
Ok(Some(state))
} else {
Ok(None)
}
}
/// Load a post-finalization state from the hot database.
///
/// Will replay blocks from the nearest epoch boundary.
///
/// Return the `(state, latest_block_root)` where `latest_block_root` is the root of the last
/// block applied to `state`.
pub fn load_hot_state(
&self,
state_root: &Hash256,
state_processing_strategy: StateProcessingStrategy,
) -> Result<Option<BeaconState<E>>, Error> {
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
// If the state is marked as temporary, do not return it. It will become visible
@@ -1113,16 +1188,47 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
epoch_boundary_state_root,
}) = self.load_hot_state_summary(state_root)?
{
let boundary_state =
let mut boundary_state =
get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;
// Immediately rebase the state from disk on the finalized state so that we can reuse
// parts of the tree for state root calculation in `replay_blocks`.
self.state_cache
.lock()
.rebase_on_finalized(&mut boundary_state, &self.spec)?;
// Optimization to avoid even *thinking* about replaying blocks if we're already
// on an epoch boundary.
let state = if slot % E::slots_per_epoch() == 0 {
let mut state = if slot % E::slots_per_epoch() == 0 {
boundary_state
} else {
// Cache ALL intermediate states that are reached during block replay. We may want
// to restrict this in future to only cache epoch boundary states. At worst we will
// cache up to 32 states for each state loaded, which should not flush out the cache
// entirely.
let state_cache_hook = |state_root, state: &mut BeaconState<E>| {
// Ensure all caches are built before attempting to cache.
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
let latest_block_root = state.get_latest_block_root(state_root);
let state_slot = state.slot();
if let PutStateOutcome::New =
self.state_cache
.lock()
.put_state(state_root, latest_block_root, state)?
{
debug!(
self.log,
"Cached ancestor state";
"state_root" => ?state_root,
"slot" => state_slot,
);
}
Ok(())
};
let blocks =
self.load_blocks_to_replay(boundary_state.slot(), slot, latest_block_root)?;
self.replay_blocks(
@@ -1130,11 +1236,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blocks,
slot,
no_state_root_iter(),
state_processing_strategy,
Some(Box::new(state_cache_hook)),
)?
};
state.apply_pending_mutations()?;
Ok(Some(state))
Ok(Some((state, latest_block_root)))
} else {
Ok(None)
}
@@ -1233,7 +1340,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
partial_state.load_randao_mixes(&self.cold_db, &self.spec)?;
partial_state.load_historical_summaries(&self.cold_db, &self.spec)?;
partial_state.try_into()
let mut state: BeaconState<E> = partial_state.try_into()?;
state.apply_pending_mutations()?;
Ok(state)
}
/// Load a restore point state by its `restore_point_index`.
@@ -1247,7 +1356,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Load a frozen state that lies between restore points.
fn load_cold_intermediate_state(&self, slot: Slot) -> Result<BeaconState<E>, Error> {
if let Some(state) = self.state_cache.lock().get(&slot) {
if let Some(state) = self.historic_state_cache.lock().get(&slot) {
return Ok(state.clone());
}
@@ -1261,7 +1370,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut low_state: Option<BeaconState<E>> = None;
// Try to get a more recent state from the cache to avoid massive blocks replay.
for (s, state) in self.state_cache.lock().iter() {
for (s, state) in self.historic_state_cache.lock().iter() {
if s.as_u64() / self.config.slots_per_restore_point == low_restore_point_idx
&& *s < slot
&& low_slot < *s
@@ -1299,16 +1408,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self.spec,
)?;
let state = self.replay_blocks(
low_state,
blocks,
slot,
Some(state_root_iter),
StateProcessingStrategy::Accurate,
)?;
let mut state = self.replay_blocks(low_state, blocks, slot, Some(state_root_iter), None)?;
state.apply_pending_mutations()?;
// If state is not error, put it in the cache.
self.state_cache.lock().put(slot, state.clone());
self.historic_state_cache.lock().put(slot, state.clone());
Ok(state)
}
@@ -1390,16 +1494,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
///
/// Will skip slots as necessary. The returned state is not guaranteed
/// to have any caches built, beyond those immediately required by block processing.
fn replay_blocks(
pub fn replay_blocks(
&self,
state: BeaconState<E>,
blocks: Vec<SignedBeaconBlock<E, BlindedPayload<E>>>,
target_slot: Slot,
state_root_iter: Option<impl Iterator<Item = Result<(Hash256, Slot), Error>>>,
state_processing_strategy: StateProcessingStrategy,
pre_slot_hook: Option<PreSlotHook<E, Error>>,
) -> Result<BeaconState<E>, Error> {
let mut block_replayer = BlockReplayer::new(state, &self.spec)
.state_processing_strategy(state_processing_strategy)
.no_signature_verification()
.minimal_block_root_verification();
@@ -1408,17 +1511,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
block_replayer = block_replayer.state_root_iter(state_root_iter);
}
if let Some(pre_slot_hook) = pre_slot_hook {
block_replayer = block_replayer.pre_slot_hook(pre_slot_hook);
}
block_replayer
.apply_blocks(blocks, Some(target_slot))
.map(|block_replayer| {
if have_state_root_iterator && block_replayer.state_root_miss() {
warn!(
self.log,
"State root iterator miss";
"State root cache miss during block replay";
"slot" => target_slot,
);
}
block_replayer.into_state()
})
}
@@ -2213,7 +2319,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// This function fills in missing block roots between last restore point slot and split
/// slot, if any.
pub fn heal_freezer_block_roots_at_split(&self) -> Result<(), Error> {
let split = self.get_split_info();
let last_restore_point_slot = (split.slot - 1) / self.config.slots_per_restore_point
@@ -2528,15 +2634,22 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
};
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
// Split point is now persisted in the hot database on disk. The in-memory split point
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
// the in-memory split point now.
*split_guard = split;
}
// Delete the states from the hot database if we got this far.
// Delete the blocks and states from the hot database if we got this far.
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
// Update the cache's view of the finalized state.
store.update_finalized_state(
finalized_state_root,
finalized_block_root,
finalized_state.clone(),
)?;
debug!(
store.log,
"Freezer migration complete";