mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-04 13:24:39 +00:00
Overhaul state diffing
This commit is contained in:
@@ -1,3 +1,6 @@
|
|||||||
|
// FIXME(sproul): implement migration
|
||||||
|
#![allow(unused)]
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
beacon_chain::{BeaconChainTypes, BEACON_CHAIN_DB_KEY},
|
beacon_chain::{BeaconChainTypes, BEACON_CHAIN_DB_KEY},
|
||||||
persisted_beacon_chain::PersistedBeaconChain,
|
persisted_beacon_chain::PersistedBeaconChain,
|
||||||
@@ -25,6 +28,7 @@ fn get_state_by_replay<T: BeaconChainTypes>(
|
|||||||
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
|
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
|
||||||
state_root: Hash256,
|
state_root: Hash256,
|
||||||
) -> Result<BeaconState<T::EthSpec>, Error> {
|
) -> Result<BeaconState<T::EthSpec>, Error> {
|
||||||
|
/* FIXME(sproul): fix migration
|
||||||
// Load state summary.
|
// Load state summary.
|
||||||
let HotStateSummaryV1 {
|
let HotStateSummaryV1 {
|
||||||
slot,
|
slot,
|
||||||
@@ -39,12 +43,15 @@ fn get_state_by_replay<T: BeaconChainTypes>(
|
|||||||
let blocks = db.load_blocks_to_replay(epoch_boundary_state.slot(), slot, latest_block_root)?;
|
let blocks = db.load_blocks_to_replay(epoch_boundary_state.slot(), slot, latest_block_root)?;
|
||||||
|
|
||||||
db.replay_blocks(epoch_boundary_state, blocks, slot, std::iter::empty(), None)
|
db.replay_blocks(epoch_boundary_state, blocks, slot, std::iter::empty(), None)
|
||||||
|
*/
|
||||||
|
panic!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn upgrade_to_v20<T: BeaconChainTypes>(
|
pub fn upgrade_to_v20<T: BeaconChainTypes>(
|
||||||
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
/* FIXME(sproul): fix this
|
||||||
let mut ops = vec![];
|
let mut ops = vec![];
|
||||||
|
|
||||||
// Translate hot state summaries to new format:
|
// Translate hot state summaries to new format:
|
||||||
@@ -191,12 +198,15 @@ pub fn upgrade_to_v20<T: BeaconChainTypes>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
db.store_schema_version_atomically(SchemaVersion(20), ops)
|
db.store_schema_version_atomically(SchemaVersion(20), ops)
|
||||||
|
*/
|
||||||
|
panic!()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn downgrade_from_v20<T: BeaconChainTypes>(
|
pub fn downgrade_from_v20<T: BeaconChainTypes>(
|
||||||
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
/* FIXME(sproul): broken
|
||||||
let slots_per_epoch = T::EthSpec::slots_per_epoch();
|
let slots_per_epoch = T::EthSpec::slots_per_epoch();
|
||||||
|
|
||||||
// Iterate hot state summaries and re-write them so that:
|
// Iterate hot state summaries and re-write them so that:
|
||||||
@@ -258,4 +268,6 @@ pub fn downgrade_from_v20<T: BeaconChainTypes>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
db.store_schema_version_atomically(SchemaVersion(8), ops)
|
db.store_schema_version_atomically(SchemaVersion(8), ops)
|
||||||
|
*/
|
||||||
|
panic!()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use types::{EthSpec, MinimalEthSpec};
|
|||||||
|
|
||||||
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
|
pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
|
||||||
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
|
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
|
||||||
|
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 4;
|
||||||
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
|
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
|
||||||
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
|
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
|
||||||
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
||||||
@@ -18,6 +19,8 @@ pub struct StoreConfig {
|
|||||||
pub slots_per_restore_point: u64,
|
pub slots_per_restore_point: u64,
|
||||||
/// Flag indicating whether the `slots_per_restore_point` was set explicitly by the user.
|
/// Flag indicating whether the `slots_per_restore_point` was set explicitly by the user.
|
||||||
pub slots_per_restore_point_set_explicitly: bool,
|
pub slots_per_restore_point_set_explicitly: bool,
|
||||||
|
/// Number of epochs between state diffs in the hot database.
|
||||||
|
pub epochs_per_state_diff: u64,
|
||||||
/// Maximum number of blocks to store in the in-memory block cache.
|
/// Maximum number of blocks to store in the in-memory block cache.
|
||||||
pub block_cache_size: usize,
|
pub block_cache_size: usize,
|
||||||
/// Maximum number of states to store in the in-memory state cache.
|
/// Maximum number of states to store in the in-memory state cache.
|
||||||
@@ -38,9 +41,9 @@ pub struct StoreConfig {
|
|||||||
|
|
||||||
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
|
/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
|
||||||
|
// FIXME(sproul): schema migration
|
||||||
pub struct OnDiskStoreConfig {
|
pub struct OnDiskStoreConfig {
|
||||||
pub slots_per_restore_point: u64,
|
pub slots_per_restore_point: u64,
|
||||||
// FIXME(sproul): schema migration
|
|
||||||
pub linear_blocks: bool,
|
pub linear_blocks: bool,
|
||||||
pub linear_restore_points: bool,
|
pub linear_restore_points: bool,
|
||||||
}
|
}
|
||||||
@@ -57,6 +60,7 @@ impl Default for StoreConfig {
|
|||||||
// Safe default for tests, shouldn't ever be read by a CLI node.
|
// Safe default for tests, shouldn't ever be read by a CLI node.
|
||||||
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
|
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
|
||||||
slots_per_restore_point_set_explicitly: false,
|
slots_per_restore_point_set_explicitly: false,
|
||||||
|
epochs_per_state_diff: DEFAULT_EPOCHS_PER_STATE_DIFF,
|
||||||
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
|
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
|
||||||
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
|
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
|
||||||
compression_level: DEFAULT_COMPRESSION_LEVEL,
|
compression_level: DEFAULT_COMPRESSION_LEVEL,
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ use state_processing::{
|
|||||||
block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError,
|
block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError,
|
||||||
};
|
};
|
||||||
use std::cmp::min;
|
use std::cmp::min;
|
||||||
|
use std::collections::VecDeque;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
@@ -48,7 +49,7 @@ use types::*;
|
|||||||
use types::{beacon_state::BeaconStateDiff, EthSpec};
|
use types::{beacon_state::BeaconStateDiff, EthSpec};
|
||||||
use zstd::{Decoder, Encoder};
|
use zstd::{Decoder, Encoder};
|
||||||
|
|
||||||
pub const MAX_PARENT_STATES_TO_CACHE: u64 = 32;
|
pub const MAX_PARENT_STATES_TO_CACHE: u64 = 1;
|
||||||
|
|
||||||
/// On-disk database that stores finalized states efficiently.
|
/// On-disk database that stores finalized states efficiently.
|
||||||
///
|
///
|
||||||
@@ -891,12 +892,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
// Store a summary of the state.
|
// Store a summary of the state.
|
||||||
// We store one even for the epoch boundary states, as we may need their slots
|
// We store one even for the epoch boundary states, as we may need their slots
|
||||||
// when doing a look up by state root.
|
// when doing a look up by state root.
|
||||||
let hot_state_summary = HotStateSummary::new(state_root, state)?;
|
let diff_base_slot = self.state_diff_slot(state.slot());
|
||||||
|
|
||||||
|
let hot_state_summary = HotStateSummary::new(state_root, state, diff_base_slot)?;
|
||||||
let op = hot_state_summary.as_kv_store_op(*state_root)?;
|
let op = hot_state_summary.as_kv_store_op(*state_root)?;
|
||||||
ops.push(op);
|
ops.push(op);
|
||||||
|
|
||||||
// On the epoch boundary, store a diff from the previous epoch boundary state -- unless
|
// On an epoch boundary, consider storing:
|
||||||
// we're at a fork boundary in which case the full state must be stored.
|
//
|
||||||
|
// 1. A full state, if the state is the split state or a fork boundary state.
|
||||||
|
// 2. A state diff, if the state is a multiple of `epochs_per_state_diff` after the
|
||||||
|
// split state.
|
||||||
if state.slot() % E::slots_per_epoch() == 0 {
|
if state.slot() % E::slots_per_epoch() == 0 {
|
||||||
if self.is_stored_as_full_state(*state_root, state.slot())? {
|
if self.is_stored_as_full_state(*state_root, state.slot())? {
|
||||||
info!(
|
info!(
|
||||||
@@ -906,25 +912,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
"state_root" => ?state_root,
|
"state_root" => ?state_root,
|
||||||
);
|
);
|
||||||
self.store_full_state_in_batch(state_root, state, ops)?;
|
self.store_full_state_in_batch(state_root, state, ops)?;
|
||||||
} else {
|
} else if let Some(base_slot) = diff_base_slot {
|
||||||
/* FIXME(sproul): disabling this biz
|
|
||||||
debug!(
|
debug!(
|
||||||
self.log,
|
self.log,
|
||||||
"Storing state diff on epoch boundary";
|
"Storing state diff on boundary";
|
||||||
"slot" => state.slot(),
|
"slot" => state.slot(),
|
||||||
|
"base_slot" => base_slot,
|
||||||
"state_root" => ?state_root,
|
"state_root" => ?state_root,
|
||||||
);
|
);
|
||||||
let prev_epoch_state_root = hot_state_summary.epoch_boundary_state_root;
|
let diff_base_state_root = hot_state_summary.diff_base_state_root;
|
||||||
let prev_boundary_state = self.get_hot_state(&prev_epoch_state_root)?.ok_or(
|
let diff_base_state = self.get_hot_state(&diff_base_state_root)?.ok_or(
|
||||||
HotColdDBError::MissingEpochBoundaryState(prev_epoch_state_root),
|
HotColdDBError::MissingEpochBoundaryState(diff_base_state_root),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let compute_diff_timer =
|
let compute_diff_timer =
|
||||||
metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPUTE_TIME);
|
metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPUTE_TIME);
|
||||||
let diff = BeaconStateDiff::compute_diff(&prev_boundary_state, state)?;
|
let diff = BeaconStateDiff::compute_diff(&diff_base_state, state)?;
|
||||||
drop(compute_diff_timer);
|
drop(compute_diff_timer);
|
||||||
ops.push(self.state_diff_as_kv_store_op(state_root, &diff)?);
|
ops.push(self.state_diff_as_kv_store_op(state_root, &diff)?);
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -975,7 +980,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
|
|
||||||
/// Load a post-finalization state from the hot database.
|
/// Load a post-finalization state from the hot database.
|
||||||
///
|
///
|
||||||
/// Will replay blocks from the nearest epoch boundary.
|
/// Use a combination of state diffs and replayed blocks as appropriate.
|
||||||
///
|
///
|
||||||
/// Return the `(state, latest_block_root)` if found.
|
/// Return the `(state, latest_block_root)` if found.
|
||||||
pub fn load_hot_state(
|
pub fn load_hot_state(
|
||||||
@@ -991,156 +996,223 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
return self.load_hot_state_full(state_root).map(Some);
|
return self.load_hot_state_full(state_root).map(Some);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(HotStateSummary {
|
let target_summary = if let Some(summary) = self.load_hot_state_summary(state_root)? {
|
||||||
slot,
|
summary
|
||||||
latest_block_root,
|
} else {
|
||||||
epoch_boundary_state_root,
|
return Ok(None);
|
||||||
prev_state_root: _,
|
};
|
||||||
}) = self.load_hot_state_summary(state_root)?
|
|
||||||
|
let target_slot = target_summary.slot;
|
||||||
|
let target_latest_block_root = target_summary.latest_block_root;
|
||||||
|
|
||||||
|
// Load the latest block, and use it to confirm the validity of this state.
|
||||||
|
if self
|
||||||
|
.get_blinded_block(&target_summary.latest_block_root, None)?
|
||||||
|
.is_none()
|
||||||
{
|
{
|
||||||
// Load the latest block, and use it to confirm the validity of this state.
|
// Dangling state, will be deleted fully once finalization advances past it.
|
||||||
let latest_block =
|
debug!(
|
||||||
if let Some(block) = self.get_blinded_block(&latest_block_root, None)? {
|
self.log,
|
||||||
block
|
"Ignoring state load for dangling state";
|
||||||
} else {
|
"state_root" => ?state_root,
|
||||||
// Dangling state, will be deleted fully once finalization advances past it.
|
"slot" => target_slot,
|
||||||
debug!(
|
"latest_block_root" => ?target_summary.latest_block_root,
|
||||||
self.log,
|
);
|
||||||
"Ignoring state load for dangling state";
|
return Ok(None);
|
||||||
"state_root" => ?state_root,
|
}
|
||||||
"slot" => slot,
|
|
||||||
"latest_block_root" => ?latest_block_root,
|
|
||||||
);
|
|
||||||
return Ok(None);
|
|
||||||
};
|
|
||||||
|
|
||||||
// On a fork boundary slot load a full state from disk.
|
// Backtrack until we reach a state that is in the cache, or in the worst case
|
||||||
if self.spec.fork_activated_at_slot::<E>(slot).is_some() {
|
// the finalized state (this should only be reachable on first start-up).
|
||||||
return self.load_hot_state_full(state_root).map(Some);
|
let state_summary_iter = HotStateRootIter::new(self, target_slot, *state_root);
|
||||||
|
|
||||||
|
// State and state root of the state upon which blocks and diffs will be replayed.
|
||||||
|
let mut base_state = None;
|
||||||
|
|
||||||
|
// State diffs to be replayed on top of `base_state`.
|
||||||
|
// Each element is `(summary, state_root, diff)` such that applying `diff` to the
|
||||||
|
// state with `summary.diff_base_state_root` yields the state with `state_root`.
|
||||||
|
let mut state_diffs = VecDeque::new();
|
||||||
|
|
||||||
|
// State roots for all slots between `base_state` and the `target_slot`. Depending on how
|
||||||
|
// the diffs fall, some of these roots may not be needed.
|
||||||
|
let mut state_roots = VecDeque::new();
|
||||||
|
|
||||||
|
for res in state_summary_iter {
|
||||||
|
let (prior_state_root, prior_summary) = res?;
|
||||||
|
|
||||||
|
state_roots.push_front(Ok((prior_state_root, prior_summary.slot)));
|
||||||
|
|
||||||
|
// Check if this state is in the cache.
|
||||||
|
if let Some(state) = self.state_cache.lock().get_by_state_root(prior_state_root) {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Found cached base state for replay";
|
||||||
|
"base_state_root" => ?prior_state_root,
|
||||||
|
"base_slot" => prior_summary.slot,
|
||||||
|
"target_state_root" => ?state_root,
|
||||||
|
"target_slot" => target_slot,
|
||||||
|
);
|
||||||
|
base_state = Some((prior_state_root, state));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// On any other epoch boundary load and apply a diff.
|
// If the prior state is the split state and it isn't cached then load it in
|
||||||
/* FIXME(sproul): disabled temporarily
|
// entirety from disk. This should only happen on first start up.
|
||||||
if slot % E::slots_per_epoch() == 0 {
|
if prior_state_root == self.get_split_info().state_root {
|
||||||
return self
|
debug!(
|
||||||
.load_state_from_diff(*state_root, epoch_boundary_state_root)
|
self.log,
|
||||||
.map(Some);
|
"Using split state as base state for replay";
|
||||||
}
|
"base_state_root" => ?prior_state_root,
|
||||||
*/
|
"base_slot" => prior_summary.slot,
|
||||||
|
"target_state_root" => ?state_root,
|
||||||
// Backtrack until we reach a state that is in the cache, or in the worst case
|
"target_slot" => target_slot,
|
||||||
// the finalized state (this should only be reachable on first start-up).
|
);
|
||||||
let state_root_iter = HotStateRootIter::new(self, slot, *state_root);
|
let (split_state, _) = self.load_hot_state_full(&prior_state_root)?;
|
||||||
let mut state_roots = Vec::with_capacity(32);
|
base_state = Some((prior_state_root, split_state));
|
||||||
let mut state = None;
|
break;
|
||||||
|
|
||||||
for res in state_root_iter {
|
|
||||||
let (prior_state_root, prior_slot) = res?;
|
|
||||||
|
|
||||||
state_roots.push(Ok((prior_state_root, prior_slot)));
|
|
||||||
|
|
||||||
// Check if this state is in the cache.
|
|
||||||
if let Some(base_state) =
|
|
||||||
self.state_cache.lock().get_by_state_root(prior_state_root)
|
|
||||||
{
|
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Found cached base state for replay";
|
|
||||||
"base_state_root" => ?prior_state_root,
|
|
||||||
"base_slot" => prior_slot,
|
|
||||||
"target_state_root" => ?state_root,
|
|
||||||
"target_slot" => slot,
|
|
||||||
);
|
|
||||||
state = Some(base_state);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the prior state is the split state and it isn't cached then load it in
|
|
||||||
// entirety from disk. This should only happen on first start up.
|
|
||||||
if prior_state_root == self.get_split_info().state_root {
|
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Using split state as base state for replay";
|
|
||||||
"base_state_root" => ?prior_state_root,
|
|
||||||
"base_slot" => prior_slot,
|
|
||||||
"target_state_root" => ?state_root,
|
|
||||||
"target_slot" => slot,
|
|
||||||
);
|
|
||||||
let (split_state, _) = self.load_hot_state_full(&prior_state_root)?;
|
|
||||||
state = Some(split_state);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let base_state = state.ok_or(Error::NoBaseStateFound(*state_root))?;
|
// If there's a state diff stored at this slot, load it and store it for application.
|
||||||
|
if !prior_summary.diff_base_state_root.is_zero() {
|
||||||
|
let diff = self.load_state_diff(prior_state_root)?;
|
||||||
|
state_diffs.push_front((prior_summary, prior_state_root, diff));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Reverse the collected state roots so that they are in slot ascending order.
|
let (_, mut state) = base_state.ok_or(Error::NoBaseStateFound(*state_root))?;
|
||||||
state_roots.reverse();
|
|
||||||
|
|
||||||
// Collect the blocks to replay.
|
// Construct a mutable iterator for the state roots, which will be iterated through
|
||||||
// We already have the latest block loaded, which is sufficient if the base state is
|
// consecutive calls to `replay_blocks`.
|
||||||
// just one slot behind the state to be constructed.
|
let mut state_roots_iter = state_roots.into_iter();
|
||||||
let mut blocks = if base_state.slot() + 1 == slot {
|
|
||||||
Vec::with_capacity(1)
|
|
||||||
} else {
|
|
||||||
self.load_blocks_to_replay(base_state.slot(), slot - 1, latest_block.parent_root())?
|
|
||||||
};
|
|
||||||
blocks.push(latest_block);
|
|
||||||
|
|
||||||
let state_cacher_hook: PreSlotHook<_, _> = Box::new(|opt_state_root, state| {
|
|
||||||
// Ensure all caches are built before attempting to cache.
|
|
||||||
state.update_tree_hash_cache()?;
|
|
||||||
state.build_all_caches(&self.spec)?;
|
|
||||||
|
|
||||||
if let Some(state_root) = opt_state_root {
|
|
||||||
// Cache
|
|
||||||
if state.slot() + MAX_PARENT_STATES_TO_CACHE > slot
|
|
||||||
|| state.slot() % E::slots_per_epoch() == 0
|
|
||||||
{
|
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Caching ancestor state";
|
|
||||||
"state_root" => ?state_root,
|
|
||||||
"slot" => state.slot(),
|
|
||||||
);
|
|
||||||
// FIXME(sproul): this block root could be optimized out
|
|
||||||
let latest_block_root = state.get_latest_block_root(state_root);
|
|
||||||
self.state_cache
|
|
||||||
.lock()
|
|
||||||
.put_state(state_root, latest_block_root, state)?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
debug!(
|
|
||||||
self.log,
|
|
||||||
"Block replay state root miss";
|
|
||||||
"slot" => state.slot(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut state = self.replay_blocks(
|
|
||||||
base_state,
|
|
||||||
blocks,
|
|
||||||
slot,
|
|
||||||
state_roots.into_iter(),
|
|
||||||
Some(state_cacher_hook),
|
|
||||||
)?;
|
|
||||||
|
|
||||||
|
// This hook caches states from block replay so that they may be reused.
|
||||||
|
let state_cacher_hook = |opt_state_root: Option<Hash256>, state: &mut BeaconState<_>| {
|
||||||
|
// Ensure all caches are built before attempting to cache.
|
||||||
state.update_tree_hash_cache()?;
|
state.update_tree_hash_cache()?;
|
||||||
state.build_all_caches(&self.spec)?;
|
state.build_all_caches(&self.spec)?;
|
||||||
|
|
||||||
Ok(Some((state, latest_block_root)))
|
if let Some(state_root) = opt_state_root {
|
||||||
} else {
|
// Cache
|
||||||
Ok(None)
|
if state.slot() + MAX_PARENT_STATES_TO_CACHE >= target_slot
|
||||||
|
|| state.slot() % E::slots_per_epoch() == 0
|
||||||
|
{
|
||||||
|
let slot = state.slot();
|
||||||
|
let latest_block_root = state.get_latest_block_root(state_root);
|
||||||
|
if let PutStateOutcome::New =
|
||||||
|
self.state_cache
|
||||||
|
.lock()
|
||||||
|
.put_state(state_root, latest_block_root, state)?
|
||||||
|
{
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Cached ancestor state";
|
||||||
|
"state_root" => ?state_root,
|
||||||
|
"slot" => slot,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Block replay state root miss";
|
||||||
|
"slot" => state.slot(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
};
|
||||||
|
|
||||||
|
// Apply the diffs, and replay blocks atop the base state to reach the target state.
|
||||||
|
while state.slot() < target_slot {
|
||||||
|
// Drop unncessary diffs.
|
||||||
|
state_diffs.retain(|(summary, diff_root, _)| {
|
||||||
|
let keep = summary.diff_base_slot >= state.slot();
|
||||||
|
if !keep {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Ignoring irrelevant state diff";
|
||||||
|
"diff_state_root" => ?diff_root,
|
||||||
|
"diff_base_slot" => summary.diff_base_slot,
|
||||||
|
"current_state_slot" => state.slot(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
keep
|
||||||
|
});
|
||||||
|
|
||||||
|
// Get the next diff that will be applicable, taking the highest slot diff in case of
|
||||||
|
// multiple diffs which are applicable at the same base slot, which can happen if the
|
||||||
|
// diff frequency has changed.
|
||||||
|
let mut next_state_diff: Option<(HotStateSummary, Hash256, BeaconStateDiff<_>)> = None;
|
||||||
|
while let Some((summary, _, _)) = state_diffs.front() {
|
||||||
|
if next_state_diff.as_ref().map_or(true, |(current, _, _)| {
|
||||||
|
summary.diff_base_slot == current.diff_base_slot
|
||||||
|
}) {
|
||||||
|
next_state_diff = state_diffs.pop_front();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replay blocks to get to the next diff's base state, or to the target state if there
|
||||||
|
// is no next diff to apply.
|
||||||
|
if next_state_diff
|
||||||
|
.as_ref()
|
||||||
|
.map_or(true, |(next_summary, _, _)| {
|
||||||
|
next_summary.diff_base_slot != state.slot()
|
||||||
|
})
|
||||||
|
{
|
||||||
|
let (next_slot, latest_block_root) = next_state_diff
|
||||||
|
.as_ref()
|
||||||
|
.map(|(summary, _, _)| (summary.diff_base_slot, summary.latest_block_root))
|
||||||
|
.unwrap_or_else(|| (target_summary.slot, target_latest_block_root));
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Replaying blocks";
|
||||||
|
"from_slot" => state.slot(),
|
||||||
|
"to_slot" => next_slot,
|
||||||
|
"latest_block_root" => ?latest_block_root,
|
||||||
|
);
|
||||||
|
let blocks =
|
||||||
|
self.load_blocks_to_replay(state.slot(), next_slot, latest_block_root)?;
|
||||||
|
|
||||||
|
state = self.replay_blocks(
|
||||||
|
state,
|
||||||
|
blocks,
|
||||||
|
next_slot,
|
||||||
|
&mut state_roots_iter,
|
||||||
|
Some(Box::new(state_cacher_hook)),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
state.update_tree_hash_cache()?;
|
||||||
|
state.build_all_caches(&self.spec)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply state diff. Block replay should have ensured that the diff is now applicable.
|
||||||
|
if let Some((summary, to_root, diff)) = next_state_diff {
|
||||||
|
debug!(
|
||||||
|
self.log,
|
||||||
|
"Applying state diff";
|
||||||
|
"from_root" => ?summary.diff_base_state_root,
|
||||||
|
"from_slot" => summary.diff_base_slot,
|
||||||
|
"to_root" => ?to_root,
|
||||||
|
"to_slot" => summary.slot,
|
||||||
|
);
|
||||||
|
debug_assert_eq!(summary.diff_base_slot, state.slot());
|
||||||
|
|
||||||
|
diff.apply_diff(&mut state)?;
|
||||||
|
|
||||||
|
state.update_tree_hash_cache()?;
|
||||||
|
state.build_all_caches(&self.spec)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(Some((state, target_latest_block_root)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Determine if the `state_root` at `slot` should be stored as a full state.
|
/// Determine if the `state_root` at `slot` should be stored as a full state.
|
||||||
///
|
///
|
||||||
/// This is dependent on the database's current split point, so may change from `false` to
|
/// This is dependent on the database's current split point, so may change from `false` to
|
||||||
/// `true` after a finalization update. It cannot change from `true` to `false` for a state in
|
/// `true` after a finalization update. It cannot change from `true` to `false` for a state in
|
||||||
/// the hot database as the split state will be migrated to
|
/// the hot database as the split state will be migrated to the freezer.
|
||||||
///
|
///
|
||||||
/// All fork boundary states are also stored as full states.
|
/// All fork boundary states are also stored as full states.
|
||||||
pub fn is_stored_as_full_state(&self, state_root: Hash256, slot: Slot) -> Result<bool, Error> {
|
pub fn is_stored_as_full_state(&self, state_root: Hash256, slot: Slot) -> Result<bool, Error> {
|
||||||
@@ -1154,6 +1226,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Determine if a state diff should be stored at `slot`.
|
||||||
|
///
|
||||||
|
/// If `Some(base_slot)` is returned then a state diff should be constructed for the state
|
||||||
|
/// at `slot` based on the ancestor state at `base_slot`. The frequency of state diffs stored
|
||||||
|
/// on disk is determined by the `epochs_per_state_diff` parameter.
|
||||||
|
pub fn state_diff_slot(&self, slot: Slot) -> Option<Slot> {
|
||||||
|
let split = self.get_split_info();
|
||||||
|
let slots_per_epoch = E::slots_per_epoch();
|
||||||
|
|
||||||
|
if slot % slots_per_epoch != 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let epochs_since_split = slot.saturating_sub(split.slot).epoch(slots_per_epoch);
|
||||||
|
|
||||||
|
(epochs_since_split > 0 && epochs_since_split % self.config.epochs_per_state_diff == 0)
|
||||||
|
.then(|| slot.saturating_sub(self.config.epochs_per_state_diff * slots_per_epoch))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn load_hot_state_full(
|
pub fn load_hot_state_full(
|
||||||
&self,
|
&self,
|
||||||
state_root: &Hash256,
|
state_root: &Hash256,
|
||||||
@@ -1177,25 +1268,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
Ok((state, latest_block_root))
|
Ok((state, latest_block_root))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_state_from_diff(
|
|
||||||
&self,
|
|
||||||
state_root: Hash256,
|
|
||||||
prev_epoch_state_root: Hash256,
|
|
||||||
) -> Result<(BeaconState<E>, Hash256), Error> {
|
|
||||||
let diff = self.load_state_diff(state_root)?;
|
|
||||||
let mut state = self.get_hot_state(&prev_epoch_state_root)?.ok_or(
|
|
||||||
HotColdDBError::MissingEpochBoundaryState(prev_epoch_state_root),
|
|
||||||
)?;
|
|
||||||
diff.apply_diff(&mut state)?;
|
|
||||||
|
|
||||||
// Do a tree hash here so that the cache is fully built.
|
|
||||||
state.update_tree_hash_cache()?;
|
|
||||||
state.build_all_caches(&self.spec)?;
|
|
||||||
|
|
||||||
let latest_block_root = state.get_latest_block_root(state_root);
|
|
||||||
Ok((state, latest_block_root))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Store a pre-finalization state in the freezer database.
|
/// Store a pre-finalization state in the freezer database.
|
||||||
///
|
///
|
||||||
/// If the state doesn't lie on a restore point boundary then just its summary will be stored.
|
/// If the state doesn't lie on a restore point boundary then just its summary will be stored.
|
||||||
@@ -2112,8 +2184,15 @@ impl StoreItem for Split {
|
|||||||
pub struct HotStateSummary {
|
pub struct HotStateSummary {
|
||||||
pub slot: Slot,
|
pub slot: Slot,
|
||||||
pub latest_block_root: Hash256,
|
pub latest_block_root: Hash256,
|
||||||
/// The state root of the state at the prior epoch boundary.
|
/// The state root of a state prior to this state with respect to which this state's diff is
|
||||||
pub epoch_boundary_state_root: Hash256,
|
/// stored.
|
||||||
|
///
|
||||||
|
/// Set to 0 if this state *is not* stored as a diff.
|
||||||
|
///
|
||||||
|
/// Formerly known as the `epoch_boundary_state_root`.
|
||||||
|
pub diff_base_state_root: Hash256,
|
||||||
|
/// The slot of the state with `diff_base_state_root`, or 0 if no diff is stored.
|
||||||
|
pub diff_base_slot: Slot,
|
||||||
/// The state root of the state at the prior slot.
|
/// The state root of the state at the prior slot.
|
||||||
#[superstruct(only(V10))]
|
#[superstruct(only(V10))]
|
||||||
pub prev_state_root: Hash256,
|
pub prev_state_root: Hash256,
|
||||||
@@ -2143,19 +2222,25 @@ impl_store_item_summary!(HotStateSummaryV10);
|
|||||||
|
|
||||||
impl HotStateSummary {
|
impl HotStateSummary {
|
||||||
/// Construct a new summary of the given state.
|
/// Construct a new summary of the given state.
|
||||||
pub fn new<E: EthSpec>(state_root: &Hash256, state: &BeaconState<E>) -> Result<Self, Error> {
|
pub fn new<E: EthSpec>(
|
||||||
|
state_root: &Hash256,
|
||||||
|
state: &BeaconState<E>,
|
||||||
|
diff_base_slot: Option<Slot>,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
// Fill in the state root on the latest block header if necessary (this happens on all
|
// Fill in the state root on the latest block header if necessary (this happens on all
|
||||||
// slots where there isn't a skip).
|
// slots where there isn't a skip).
|
||||||
let slot = state.slot();
|
let slot = state.slot();
|
||||||
let latest_block_root = state.get_latest_block_root(*state_root);
|
let latest_block_root = state.get_latest_block_root(*state_root);
|
||||||
let epoch_boundary_slot = (slot - 1) / E::slots_per_epoch() * E::slots_per_epoch();
|
|
||||||
let epoch_boundary_state_root = if epoch_boundary_slot == slot {
|
// Set the diff state root as appropriate.
|
||||||
*state_root
|
let diff_base_state_root = if let Some(base_slot) = diff_base_slot {
|
||||||
} else {
|
|
||||||
*state
|
*state
|
||||||
.get_state_root(epoch_boundary_slot)
|
.get_state_root(base_slot)
|
||||||
.map_err(HotColdDBError::HotStateSummaryError)?
|
.map_err(HotColdDBError::HotStateSummaryError)?
|
||||||
|
} else {
|
||||||
|
Hash256::zero()
|
||||||
};
|
};
|
||||||
|
|
||||||
let prev_state_root = if let Ok(prev_slot) = slot.safe_sub(1) {
|
let prev_state_root = if let Ok(prev_slot) = slot.safe_sub(1) {
|
||||||
*state
|
*state
|
||||||
.get_state_root(prev_slot)
|
.get_state_root(prev_slot)
|
||||||
@@ -2165,9 +2250,10 @@ impl HotStateSummary {
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(HotStateSummary {
|
Ok(HotStateSummary {
|
||||||
slot: state.slot(),
|
slot,
|
||||||
latest_block_root,
|
latest_block_root,
|
||||||
epoch_boundary_state_root,
|
diff_base_state_root,
|
||||||
|
diff_base_slot: diff_base_slot.unwrap_or(Slot::new(0)),
|
||||||
prev_state_root,
|
prev_state_root,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use crate::{hot_cold_store::HotColdDBError, Error, HotColdDB, ItemStore};
|
use crate::{hot_cold_store::HotColdDBError, Error, HotColdDB, HotStateSummary, ItemStore};
|
||||||
use types::{EthSpec, Hash256, Slot};
|
use types::{EthSpec, Hash256, Slot};
|
||||||
|
|
||||||
pub struct HotStateRootIter<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
pub struct HotStateRootIter<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||||
@@ -20,7 +20,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotStateRootIter<'a,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn do_next(&mut self) -> Result<Option<(Hash256, Slot)>, Error> {
|
fn do_next(&mut self) -> Result<Option<(Hash256, HotStateSummary)>, Error> {
|
||||||
if self.next_state_root.is_zero() {
|
if self.next_state_root.is_zero() {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
@@ -30,20 +30,19 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotStateRootIter<'a,
|
|||||||
.load_hot_state_summary(&self.next_state_root)?
|
.load_hot_state_summary(&self.next_state_root)?
|
||||||
.ok_or(HotColdDBError::MissingHotStateSummary(self.next_state_root))?;
|
.ok_or(HotColdDBError::MissingHotStateSummary(self.next_state_root))?;
|
||||||
|
|
||||||
let slot = self.next_slot;
|
|
||||||
let state_root = self.next_state_root;
|
let state_root = self.next_state_root;
|
||||||
|
|
||||||
self.next_state_root = summary.prev_state_root;
|
self.next_state_root = summary.prev_state_root;
|
||||||
self.next_slot -= 1;
|
self.next_slot -= 1;
|
||||||
|
|
||||||
Ok(Some((state_root, slot)))
|
Ok(Some((state_root, summary)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||||
for HotStateRootIter<'a, E, Hot, Cold>
|
for HotStateRootIter<'a, E, Hot, Cold>
|
||||||
{
|
{
|
||||||
type Item = Result<(Hash256, Slot), Error>;
|
type Item = Result<(Hash256, HotStateSummary), Error>;
|
||||||
|
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
self.do_next().transpose()
|
self.do_next().transpose()
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ use std::collections::{hash_map::Entry, HashMap};
|
|||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use tree_hash::TreeHash;
|
use tree_hash::TreeHash;
|
||||||
use types::{
|
use types::{
|
||||||
Attestation, AttestationData, BeaconState, BeaconStateError, BitList, ChainSpec, EthSpec,
|
Attestation, AttestationData, BeaconState, BeaconStateError, BitList, ChainSpec, Epoch,
|
||||||
ExecPayload, Hash256, IndexedAttestation, SignedBeaconBlock, Slot,
|
EthSpec, ExecPayload, Hash256, IndexedAttestation, SignedBeaconBlock, Slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -31,6 +31,7 @@ pub enum ContextError {
|
|||||||
BeaconState(BeaconStateError),
|
BeaconState(BeaconStateError),
|
||||||
EpochCache(EpochCacheError),
|
EpochCache(EpochCacheError),
|
||||||
SlotMismatch { slot: Slot, expected: Slot },
|
SlotMismatch { slot: Slot, expected: Slot },
|
||||||
|
EpochMismatch { epoch: Epoch, expected: Epoch },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<BeaconStateError> for ContextError {
|
impl From<BeaconStateError> for ContextError {
|
||||||
@@ -62,12 +63,13 @@ impl<T: EthSpec> ConsensusContext<T> {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FIXME(sproul): extra safety checks?
|
||||||
pub fn get_proposer_index(
|
pub fn get_proposer_index(
|
||||||
&mut self,
|
&mut self,
|
||||||
state: &BeaconState<T>,
|
state: &BeaconState<T>,
|
||||||
spec: &ChainSpec,
|
spec: &ChainSpec,
|
||||||
) -> Result<u64, ContextError> {
|
) -> Result<u64, ContextError> {
|
||||||
self.check_slot(state.slot())?;
|
self.check_epoch(state.current_epoch())?;
|
||||||
|
|
||||||
if let Some(proposer_index) = self.proposer_index {
|
if let Some(proposer_index) = self.proposer_index {
|
||||||
return Ok(proposer_index);
|
return Ok(proposer_index);
|
||||||
@@ -109,6 +111,15 @@ impl<T: EthSpec> ConsensusContext<T> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn check_epoch(&self, epoch: Epoch) -> Result<(), ContextError> {
|
||||||
|
let expected = self.slot.epoch(T::slots_per_epoch());
|
||||||
|
if epoch == expected {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(ContextError::EpochMismatch { epoch, expected })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn set_epoch_cache(mut self, epoch_cache: EpochCache) -> Self {
|
pub fn set_epoch_cache(mut self, epoch_cache: EpochCache) -> Self {
|
||||||
self.epoch_cache = Some(epoch_cache);
|
self.epoch_cache = Some(epoch_cache);
|
||||||
self
|
self
|
||||||
|
|||||||
Reference in New Issue
Block a user