mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-16 12:28:24 +00:00
Store all state roots on disk
This commit is contained in:
@@ -3,6 +3,7 @@ use crate::chunked_vector::{
|
||||
};
|
||||
use crate::config::{OnDiskStoreConfig, StoreConfig};
|
||||
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
|
||||
use crate::hot_state_iter::ForwardsHotStateRootIter;
|
||||
use crate::impls::beacon_state::{get_full_state, store_full_state};
|
||||
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
|
||||
use crate::leveldb_store::BytesKey;
|
||||
@@ -21,13 +22,12 @@ use crate::{
|
||||
use leveldb::iterator::LevelDBIterator;
|
||||
use lru::LruCache;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use safe_arith::SafeArith;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use slog::{debug, error, info, trace, warn, Logger};
|
||||
use slog::{debug, error, info, trace, Logger};
|
||||
use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use state_processing::{
|
||||
BlockProcessingError, BlockReplayer, SlotProcessingError, StateRootStrategy,
|
||||
};
|
||||
use state_processing::{BlockProcessingError, BlockReplayer, SlotProcessingError};
|
||||
use std::cmp::min;
|
||||
use std::convert::TryInto;
|
||||
use std::marker::PhantomData;
|
||||
@@ -362,10 +362,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
// chain. This way we avoid returning a state that doesn't match `state_root`.
|
||||
self.load_cold_state(state_root)
|
||||
} else {
|
||||
self.load_hot_state(state_root, StateRootStrategy::Accurate)
|
||||
self.load_hot_state(state_root)
|
||||
}
|
||||
} else {
|
||||
match self.load_hot_state(state_root, StateRootStrategy::Accurate)? {
|
||||
match self.load_hot_state(state_root)? {
|
||||
Some(state) => Ok(Some(state)),
|
||||
None => self.load_cold_state(state_root),
|
||||
}
|
||||
@@ -386,6 +386,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
///
|
||||
/// - `state.state_roots`
|
||||
/// - `state.block_roots`
|
||||
// FIXME(sproul): delete this whole function
|
||||
pub fn get_inconsistent_state_for_attestation_verification_only(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
@@ -403,7 +404,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
.into())
|
||||
} else {
|
||||
self.load_hot_state(state_root, StateRootStrategy::Inconsistent)
|
||||
self.load_hot_state(state_root)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -495,11 +496,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
//
|
||||
// `StateRootStrategy` should be irrelevant here since we never replay blocks for an epoch
|
||||
// boundary state in the hot DB.
|
||||
let state = self
|
||||
.load_hot_state(&epoch_boundary_state_root, StateRootStrategy::Accurate)?
|
||||
.ok_or(HotColdDBError::MissingEpochBoundaryState(
|
||||
epoch_boundary_state_root,
|
||||
))?;
|
||||
let state = self.load_hot_state(&epoch_boundary_state_root)?.ok_or(
|
||||
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
|
||||
)?;
|
||||
Ok(Some(state))
|
||||
} else {
|
||||
// Try the cold DB
|
||||
@@ -638,11 +637,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
/// Load a post-finalization state from the hot database.
|
||||
///
|
||||
/// Will replay blocks from the nearest epoch boundary.
|
||||
pub fn load_hot_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
state_root_strategy: StateRootStrategy,
|
||||
) -> Result<Option<BeaconState<E>>, Error> {
|
||||
pub fn load_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
|
||||
let _timer = metrics::start_timer(&metrics::BEACON_HOT_STATE_READ_TIMES);
|
||||
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
|
||||
|
||||
// If the state is marked as temporary, do not return it. It will become visible
|
||||
@@ -655,6 +651,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
slot,
|
||||
latest_block_root,
|
||||
epoch_boundary_state_root,
|
||||
prev_state_root,
|
||||
}) = self.load_hot_state_summary(state_root)?
|
||||
{
|
||||
let boundary_state =
|
||||
@@ -669,13 +666,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
} else {
|
||||
let blocks =
|
||||
self.load_blocks_to_replay(boundary_state.slot(), slot, latest_block_root)?;
|
||||
self.replay_blocks(
|
||||
boundary_state,
|
||||
blocks,
|
||||
|
||||
let state_root_iter = ForwardsHotStateRootIter::new(
|
||||
self,
|
||||
boundary_state.slot(),
|
||||
slot,
|
||||
no_state_root_iter(),
|
||||
state_root_strategy,
|
||||
)?
|
||||
*state_root,
|
||||
prev_state_root,
|
||||
)?;
|
||||
self.replay_blocks(boundary_state, blocks, slot, state_root_iter)?
|
||||
};
|
||||
|
||||
Ok(Some(state))
|
||||
@@ -813,13 +812,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
&self.spec,
|
||||
)?;
|
||||
|
||||
self.replay_blocks(
|
||||
low_restore_point,
|
||||
blocks,
|
||||
slot,
|
||||
Some(state_root_iter),
|
||||
StateRootStrategy::Accurate,
|
||||
)
|
||||
self.replay_blocks(low_restore_point, blocks, slot, state_root_iter)
|
||||
}
|
||||
|
||||
/// Get the restore point with the given index, or if it is out of bounds, the split state.
|
||||
@@ -905,31 +898,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
state: BeaconState<E>,
|
||||
blocks: Vec<SignedBeaconBlock<E>>,
|
||||
target_slot: Slot,
|
||||
state_root_iter: Option<impl Iterator<Item = Result<(Hash256, Slot), Error>>>,
|
||||
state_root_strategy: StateRootStrategy,
|
||||
state_root_iter: impl Iterator<Item = Result<(Hash256, Slot), Error>>,
|
||||
) -> Result<BeaconState<E>, Error> {
|
||||
let mut block_replayer = BlockReplayer::new(state, &self.spec)
|
||||
.state_root_strategy(state_root_strategy)
|
||||
BlockReplayer::new(state, &self.spec)
|
||||
.no_signature_verification()
|
||||
.minimal_block_root_verification();
|
||||
|
||||
let have_state_root_iterator = state_root_iter.is_some();
|
||||
if let Some(state_root_iter) = state_root_iter {
|
||||
block_replayer = block_replayer.state_root_iter(state_root_iter);
|
||||
}
|
||||
|
||||
block_replayer
|
||||
.minimal_block_root_verification()
|
||||
.state_root_iter(state_root_iter)
|
||||
.apply_blocks(blocks, Some(target_slot))
|
||||
.map(|block_replayer| {
|
||||
if have_state_root_iterator && block_replayer.state_root_miss() {
|
||||
warn!(
|
||||
self.log,
|
||||
"State root iterator miss";
|
||||
"slot" => target_slot,
|
||||
);
|
||||
.and_then(|block_replayer| {
|
||||
if block_replayer.state_root_miss() {
|
||||
Err(Error::MissingStateRoot(target_slot))
|
||||
} else {
|
||||
Ok(block_replayer.into_state())
|
||||
}
|
||||
|
||||
block_replayer.into_state()
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1411,19 +1392,17 @@ impl StoreItem for Split {
|
||||
}
|
||||
}
|
||||
|
||||
/// Type hint.
|
||||
fn no_state_root_iter() -> Option<std::iter::Empty<Result<(Hash256, Slot), Error>>> {
|
||||
None
|
||||
}
|
||||
|
||||
/// Struct for summarising a state in the hot database.
|
||||
///
|
||||
/// Allows full reconstruction by replaying blocks.
|
||||
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
|
||||
pub struct HotStateSummary {
|
||||
slot: Slot,
|
||||
latest_block_root: Hash256,
|
||||
epoch_boundary_state_root: Hash256,
|
||||
pub(crate) slot: Slot,
|
||||
pub(crate) latest_block_root: Hash256,
|
||||
pub(crate) epoch_boundary_state_root: Hash256,
|
||||
/// The state root of the state at the prior slot.
|
||||
// FIXME(sproul): migrate
|
||||
pub(crate) prev_state_root: Hash256,
|
||||
}
|
||||
|
||||
impl StoreItem for HotStateSummary {
|
||||
@@ -1445,20 +1424,29 @@ impl HotStateSummary {
|
||||
pub fn new<E: EthSpec>(state_root: &Hash256, state: &BeaconState<E>) -> Result<Self, Error> {
|
||||
// Fill in the state root on the latest block header if necessary (this happens on all
|
||||
// slots where there isn't a skip).
|
||||
let slot = state.slot();
|
||||
let latest_block_root = state.get_latest_block_root(*state_root);
|
||||
let epoch_boundary_slot = state.slot() / E::slots_per_epoch() * E::slots_per_epoch();
|
||||
let epoch_boundary_state_root = if epoch_boundary_slot == state.slot() {
|
||||
let epoch_boundary_slot = slot / E::slots_per_epoch() * E::slots_per_epoch();
|
||||
let epoch_boundary_state_root = if epoch_boundary_slot == slot {
|
||||
*state_root
|
||||
} else {
|
||||
*state
|
||||
.get_state_root(epoch_boundary_slot)
|
||||
.map_err(HotColdDBError::HotStateSummaryError)?
|
||||
};
|
||||
let prev_state_root = if let Ok(prev_slot) = slot.safe_sub(1) {
|
||||
*state
|
||||
.get_state_root(prev_slot)
|
||||
.map_err(HotColdDBError::HotStateSummaryError)?
|
||||
} else {
|
||||
Hash256::zero()
|
||||
};
|
||||
|
||||
Ok(HotStateSummary {
|
||||
slot: state.slot(),
|
||||
latest_block_root,
|
||||
epoch_boundary_state_root,
|
||||
prev_state_root,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user