Load all states relative to finalized state

This commit is contained in:
Michael Sproul
2022-02-15 15:37:24 +11:00
parent b8709fdcab
commit e86cff2f8b
7 changed files with 58 additions and 119 deletions

View File

@@ -49,8 +49,8 @@ enum Error {
block_root: Hash256,
},
BadStateSlot {
_state_slot: Slot,
_block_slot: Slot,
state_slot: Slot,
current_slot: Slot,
},
}
@@ -225,9 +225,8 @@ fn advance_head<T: BeaconChainTypes>(
// Advancing more than one slot without storing the intermediate state would corrupt the
// database. Future works might store temporary, intermediate states inside this function.
return Err(Error::BadStateSlot {
// FIXME(sproul): wrong
_block_slot: state.slot(),
_state_slot: state.slot(),
state_slot: state.slot(),
current_slot: current_slot,
});
}

View File

@@ -2,7 +2,7 @@
name = "store"
version = "0.2.0"
authors = ["Paul Hauner <paul@paulhauner.com>"]
edition = "2018"
edition = "2021"
[dev-dependencies]
tempfile = "3.1.0"

View File

@@ -3,7 +3,6 @@ use crate::chunked_vector::{
};
use crate::config::{OnDiskStoreConfig, StoreConfig};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::hot_state_iter::ForwardsHotStateRootIter;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::BytesKey;
@@ -88,6 +87,7 @@ pub enum HotColdDBError {
MissingColdStateSummary(Hash256),
MissingHotStateSummary(Hash256),
MissingEpochBoundaryState(Hash256),
MissingPrevState(Hash256),
MissingSplitState(Hash256, Slot),
MissingAnchorInfo,
HotStateSummaryError(BeaconStateError),
@@ -712,6 +712,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let _timer = metrics::start_timer(&metrics::BEACON_HOT_STATE_READ_TIMES);
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
// If the state is the finalized state, load it from disk. This should only be necessary
// once during start-up, after which point the finalized state will be cached.
if *state_root == self.get_split_info().state_root {
let mut state = get_full_state(&self.hot_db, state_root, &self.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
state.apply_pending_mutations()?;
let latest_block_root = state.get_latest_block_root(*state_root);
return Ok(Some((state, latest_block_root)));
}
// If the state is marked as temporary, do not return it. It will become visible
// only once its transaction commits and deletes its temporary flag.
if self.load_state_temporary_flag(state_root)?.is_some() {
@@ -721,32 +731,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
if let Some(HotStateSummary {
slot,
latest_block_root,
epoch_boundary_state_root,
prev_state_root,
..
}) = self.load_hot_state_summary(state_root)?
{
let boundary_state =
get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;
// Load prior state, potentially from the cache.
//
// This can backtrack as far as the finalized state in extreme cases, but will prime
// the cache with every intermediate state while doing so, meaning that this work should
// be repeated infrequently.
let prev_state = self
.get_hot_state(&prev_state_root)?
.ok_or(HotColdDBError::MissingPrevState(prev_state_root))?;
// Optimization to avoid even *thinking* about replaying blocks if we're already
// on an epoch boundary.
let state = if slot % E::slots_per_epoch() == 0 {
boundary_state
} else {
let blocks =
self.load_blocks_to_replay(boundary_state.slot(), slot, latest_block_root)?;
let blocks = self.load_blocks_to_replay(slot, slot, latest_block_root)?;
let state_root_iter = ForwardsHotStateRootIter::new(
self,
boundary_state.slot(),
slot,
*state_root,
prev_state_root,
)?;
self.replay_blocks(boundary_state, blocks, slot, state_root_iter)?
};
let state_roots = [(prev_state_root, slot - 1), (*state_root, slot)];
let state_root_iter = state_roots.into_iter().map(Ok);
let mut state = self.replay_blocks(prev_state, blocks, slot, state_root_iter)?;
state.apply_pending_mutations()?;
Ok(Some((state, latest_block_root)))
} else {

View File

@@ -1,90 +0,0 @@
use crate::{hot_cold_store::HotColdDBError, Error, HotColdDB, ItemStore};
use itertools::process_results;
use std::iter;
use take_until::TakeUntilExt;
use types::{EthSpec, Hash256, Slot};
pub struct HotStateRootIter<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
store: &'a HotColdDB<E, Hot, Cold>,
next_slot: Slot,
next_state_root: Hash256,
}
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotStateRootIter<'a, E, Hot, Cold> {
pub fn new(
store: &'a HotColdDB<E, Hot, Cold>,
next_slot: Slot,
next_state_root: Hash256,
) -> Self {
Self {
store,
next_slot,
next_state_root,
}
}
fn do_next(&mut self) -> Result<Option<(Hash256, Slot)>, Error> {
if self.next_state_root.is_zero() {
return Ok(None);
}
let summary = self
.store
.load_hot_state_summary(&self.next_state_root)?
.ok_or_else(|| HotColdDBError::MissingHotStateSummary(self.next_state_root))?;
let slot = self.next_slot;
let state_root = self.next_state_root;
self.next_state_root = summary.prev_state_root;
self.next_slot -= 1;
Ok(Some((state_root, slot)))
}
}
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
for HotStateRootIter<'a, E, Hot, Cold>
{
type Item = Result<(Hash256, Slot), Error>;
fn next(&mut self) -> Option<Self::Item> {
self.do_next().transpose()
}
}
pub struct ForwardsHotStateRootIter {
// Values from the backwards iterator (in slot descending order)
values: Vec<(Hash256, Slot)>,
}
impl ForwardsHotStateRootIter {
pub fn new<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: &HotColdDB<E, Hot, Cold>,
start_slot: Slot,
end_slot: Slot,
last_state_root: Hash256,
second_last_state_root: Hash256,
) -> Result<Self, Error> {
process_results(
iter::once(Ok((last_state_root, end_slot))).chain(HotStateRootIter::new(
store,
end_slot - 1,
second_last_state_root,
)),
|iter| {
let values = iter.take_until(|(_, slot)| *slot == start_slot).collect();
Self { values }
},
)
}
}
impl Iterator for ForwardsHotStateRootIter {
type Item = Result<(Hash256, Slot), Error>;
fn next(&mut self) -> Option<Self::Item> {
// Pop from the end of the vector to get the state roots in slot-ascending order.
Ok(self.values.pop()).transpose()
}
}

View File

@@ -18,7 +18,6 @@ pub mod errors;
mod forwards_iter;
mod garbage_collection;
pub mod hot_cold_store;
mod hot_state_iter;
mod impls;
mod leveldb_store;
mod memory_store;

View File

@@ -92,6 +92,12 @@ impl<E: EthSpec> StateCache<E> {
return Ok(true);
}
// FIXME(sproul): remove zis
assert!(
!state.has_pending_mutations(),
"what are you doing putting these filthy states in here?"
);
// Insert the full state into the cache.
self.states.put(state_root, state.clone());

View File

@@ -1589,6 +1589,26 @@ impl<T: EthSpec> BeaconState<T> {
*self.pubkey_cache_mut() = PubkeyCache::default()
}
pub fn has_pending_mutations(&self) -> bool {
self.block_roots().has_pending_updates()
|| self.state_roots().has_pending_updates()
|| self.historical_roots().has_pending_updates()
|| self.eth1_data_votes().has_pending_updates()
|| self.validators().has_pending_updates()
|| self.balances().has_pending_updates()
|| self.randao_mixes().has_pending_updates()
|| self.slashings().has_pending_updates()
|| self
.inactivity_scores()
.map_or(false, VList::has_pending_updates)
|| self
.previous_epoch_participation()
.map_or(false, VList::has_pending_updates)
|| self
.current_epoch_participation()
.map_or(false, VList::has_pending_updates)
}
// FIXME(sproul): automate this somehow
pub fn apply_pending_mutations(&mut self) -> Result<(), Error> {
self.block_roots_mut().apply_updates()?;
@@ -1600,6 +1620,7 @@ impl<T: EthSpec> BeaconState<T> {
self.randao_mixes_mut().apply_updates()?;
self.slashings_mut().apply_updates()?;
// FIXME(sproul): phase0 fields
if let Ok(inactivity_scores) = self.inactivity_scores_mut() {
inactivity_scores.apply_updates()?;
}