mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-27 01:33:33 +00:00
Remove checkpoint alignment requirements and enable historic state pruning (#4610)
## Issue Addressed Closes #3210 Closes #3211 ## Proposed Changes - Checkpoint sync from the latest finalized state regardless of its alignment. - Add the `block_root` to the database's split point. This is _only_ added to the in-memory split in order to avoid a schema migration. See `load_split`. - Add a new method to the DB called `get_advanced_state`, which looks up a state _by block root_, with a `state_root` as fallback. Using this method prevents accidental accesses of the split's unadvanced state, which does not exist in the hot DB and is not guaranteed to exist in the freezer DB at all. Previously Lighthouse would look up this state _from the freezer DB_, even if it was required for block/attestation processing, which was suboptimal. - Replace several state look-ups in block and attestation processing with `get_advanced_state` so that they can't hit the split block's unadvanced state. - Do not store any states in the freezer database by default. All states will be deleted upon being evicted from the hot database unless `--reconstruct-historic-states` is set. The anchor info which was previously used for checkpoint sync is used to implement this, including when syncing from genesis. ## Additional Info Needs further testing. I want to stress-test the pruned database under Hydra. The `get_advanced_state` method is intended to become more relevant over time: `tree-states` includes an identically named method that returns advanced states from its in-memory cache. Co-authored-by: realbigsean <seananderson33@gmail.com>
This commit is contained in:
@@ -14,7 +14,7 @@ use crate::memory_store::MemoryStore;
|
||||
use crate::metadata::{
|
||||
AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
|
||||
COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
|
||||
SCHEMA_VERSION_KEY, SPLIT_KEY,
|
||||
SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
|
||||
};
|
||||
use crate::metrics;
|
||||
use crate::{
|
||||
@@ -110,10 +110,10 @@ pub enum HotColdDBError {
|
||||
IterationError {
|
||||
unexpected_key: BytesKey,
|
||||
},
|
||||
AttestationStateIsFinalized {
|
||||
FinalizedStateNotInHotDatabase {
|
||||
split_slot: Slot,
|
||||
request_slot: Option<Slot>,
|
||||
state_root: Hash256,
|
||||
request_slot: Slot,
|
||||
block_root: Hash256,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -545,7 +545,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
/// upon that state (e.g., state roots). Additionally, only states from the hot store are
|
||||
/// returned.
|
||||
///
|
||||
/// See `Self::get_state` for information about `slot`.
|
||||
/// See `Self::get_advanced_hot_state` for information about `max_slot`.
|
||||
///
|
||||
/// ## Warning
|
||||
///
|
||||
@@ -557,23 +557,78 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
/// - `state.block_roots`
|
||||
pub fn get_inconsistent_state_for_attestation_verification_only(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
slot: Option<Slot>,
|
||||
) -> Result<Option<BeaconState<E>>, Error> {
|
||||
block_root: &Hash256,
|
||||
max_slot: Slot,
|
||||
state_root: Hash256,
|
||||
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
|
||||
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
|
||||
self.get_advanced_hot_state_with_strategy(
|
||||
*block_root,
|
||||
max_slot,
|
||||
state_root,
|
||||
StateProcessingStrategy::Inconsistent,
|
||||
)
|
||||
}
|
||||
|
||||
let split_slot = self.get_split_slot();
|
||||
/// Get a state with `latest_block_root == block_root` advanced through to at most `max_slot`.
|
||||
///
|
||||
/// The `state_root` argument is used to look up the block's un-advanced state in case an
|
||||
/// advanced state is not found.
|
||||
///
|
||||
/// Return the `(result_state_root, state)` satisfying:
|
||||
///
|
||||
/// - `result_state_root == state.canonical_root()`
|
||||
/// - `state.slot() <= max_slot`
|
||||
/// - `state.get_latest_block_root(result_state_root) == block_root`
|
||||
///
|
||||
/// Presently this is only used to avoid loading the un-advanced split state, but in future will
|
||||
/// be expanded to return states from an in-memory cache.
|
||||
pub fn get_advanced_hot_state(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
max_slot: Slot,
|
||||
state_root: Hash256,
|
||||
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
|
||||
self.get_advanced_hot_state_with_strategy(
|
||||
block_root,
|
||||
max_slot,
|
||||
state_root,
|
||||
StateProcessingStrategy::Accurate,
|
||||
)
|
||||
}
|
||||
|
||||
if slot.map_or(false, |slot| slot < split_slot) {
|
||||
Err(HotColdDBError::AttestationStateIsFinalized {
|
||||
split_slot,
|
||||
request_slot: slot,
|
||||
state_root: *state_root,
|
||||
/// Same as `get_advanced_hot_state` but taking a `StateProcessingStrategy`.
|
||||
pub fn get_advanced_hot_state_with_strategy(
|
||||
&self,
|
||||
block_root: Hash256,
|
||||
max_slot: Slot,
|
||||
state_root: Hash256,
|
||||
state_processing_strategy: StateProcessingStrategy,
|
||||
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
|
||||
// Hold a read lock on the split point so it can't move while we're trying to load the
|
||||
// state.
|
||||
let split = self.split.read_recursive();
|
||||
|
||||
// Sanity check max-slot against the split slot.
|
||||
if max_slot < split.slot {
|
||||
return Err(HotColdDBError::FinalizedStateNotInHotDatabase {
|
||||
split_slot: split.slot,
|
||||
request_slot: max_slot,
|
||||
block_root,
|
||||
}
|
||||
.into())
|
||||
} else {
|
||||
self.load_hot_state(state_root, StateProcessingStrategy::Inconsistent)
|
||||
.into());
|
||||
}
|
||||
|
||||
let state_root = if block_root == split.block_root && split.slot <= max_slot {
|
||||
split.state_root
|
||||
} else {
|
||||
state_root
|
||||
};
|
||||
let state = self
|
||||
.load_hot_state(&state_root, state_processing_strategy)?
|
||||
.map(|state| (state_root, state));
|
||||
drop(split);
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
|
||||
@@ -1180,8 +1235,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
*self.split.read_recursive()
|
||||
}
|
||||
|
||||
pub fn set_split(&self, slot: Slot, state_root: Hash256) {
|
||||
*self.split.write() = Split { slot, state_root };
|
||||
pub fn set_split(&self, slot: Slot, state_root: Hash256, block_root: Hash256) {
|
||||
*self.split.write() = Split {
|
||||
slot,
|
||||
state_root,
|
||||
block_root,
|
||||
};
|
||||
}
|
||||
|
||||
/// Fetch the slot of the most recently stored restore point.
|
||||
@@ -1216,25 +1275,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
}
|
||||
|
||||
/// Initialise the anchor info for checkpoint sync starting from `block`.
|
||||
pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<KeyValueStoreOp, Error> {
|
||||
pub fn init_anchor_info(
|
||||
&self,
|
||||
block: BeaconBlockRef<'_, E>,
|
||||
retain_historic_states: bool,
|
||||
) -> Result<KeyValueStoreOp, Error> {
|
||||
let anchor_slot = block.slot();
|
||||
let slots_per_restore_point = self.config.slots_per_restore_point;
|
||||
|
||||
// Set the `state_upper_limit` to the slot of the *next* restore point.
|
||||
// See `get_state_upper_limit` for rationale.
|
||||
let next_restore_point_slot = if anchor_slot % slots_per_restore_point == 0 {
|
||||
let state_upper_limit = if !retain_historic_states {
|
||||
STATE_UPPER_LIMIT_NO_RETAIN
|
||||
} else if anchor_slot % slots_per_restore_point == 0 {
|
||||
anchor_slot
|
||||
} else {
|
||||
// Set the `state_upper_limit` to the slot of the *next* restore point.
|
||||
// See `get_state_upper_limit` for rationale.
|
||||
(anchor_slot / slots_per_restore_point + 1) * slots_per_restore_point
|
||||
};
|
||||
let anchor_info = AnchorInfo {
|
||||
anchor_slot,
|
||||
oldest_block_slot: anchor_slot,
|
||||
oldest_block_parent: block.parent_root(),
|
||||
state_upper_limit: next_restore_point_slot,
|
||||
state_lower_limit: self.spec.genesis_slot,
|
||||
let anchor_info = if state_upper_limit == 0 && anchor_slot == 0 {
|
||||
// Genesis archive node: no anchor because we *will* store all states.
|
||||
None
|
||||
} else {
|
||||
Some(AnchorInfo {
|
||||
anchor_slot,
|
||||
oldest_block_slot: anchor_slot,
|
||||
oldest_block_parent: block.parent_root(),
|
||||
state_upper_limit,
|
||||
state_lower_limit: self.spec.genesis_slot,
|
||||
})
|
||||
};
|
||||
self.compare_and_set_anchor_info(None, Some(anchor_info))
|
||||
self.compare_and_set_anchor_info(None, anchor_info)
|
||||
}
|
||||
|
||||
/// Get a clone of the store's anchor info.
|
||||
@@ -1361,11 +1431,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.hot_db.put(&CONFIG_KEY, &self.config.as_disk_config())
|
||||
}
|
||||
|
||||
/// Load the split point from disk.
|
||||
fn load_split(&self) -> Result<Option<Split>, Error> {
|
||||
/// Load the split point from disk, sans block root.
|
||||
fn load_split_partial(&self) -> Result<Option<Split>, Error> {
|
||||
self.hot_db.get(&SPLIT_KEY)
|
||||
}
|
||||
|
||||
/// Load the split point from disk, including block root.
|
||||
fn load_split(&self) -> Result<Option<Split>, Error> {
|
||||
match self.load_split_partial()? {
|
||||
Some(mut split) => {
|
||||
// Load the hot state summary to get the block root.
|
||||
let summary = self.load_hot_state_summary(&split.state_root)?.ok_or(
|
||||
HotColdDBError::MissingSplitState(split.state_root, split.slot),
|
||||
)?;
|
||||
split.block_root = summary.latest_block_root;
|
||||
Ok(Some(split))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Stage the split for storage to disk.
|
||||
pub fn store_split_in_batch(&self) -> KeyValueStoreOp {
|
||||
self.split.read_recursive().as_kv_store_op(SPLIT_KEY)
|
||||
@@ -1611,42 +1696,40 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||
pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
frozen_head_root: Hash256,
|
||||
frozen_head: &BeaconState<E>,
|
||||
finalized_state_root: Hash256,
|
||||
finalized_block_root: Hash256,
|
||||
finalized_state: &BeaconState<E>,
|
||||
) -> Result<(), Error> {
|
||||
debug!(
|
||||
store.log,
|
||||
"Freezer migration started";
|
||||
"slot" => frozen_head.slot()
|
||||
"slot" => finalized_state.slot()
|
||||
);
|
||||
|
||||
// 0. Check that the migration is sensible.
|
||||
// The new frozen head must increase the current split slot, and lie on an epoch
|
||||
// The new finalized state must increase the current split slot, and lie on an epoch
|
||||
// boundary (in order for the hot state summary scheme to work).
|
||||
let current_split_slot = store.split.read_recursive().slot;
|
||||
let anchor_slot = store
|
||||
.anchor_info
|
||||
.read_recursive()
|
||||
.as_ref()
|
||||
.map(|a| a.anchor_slot);
|
||||
let anchor_info = store.anchor_info.read_recursive().clone();
|
||||
let anchor_slot = anchor_info.as_ref().map(|a| a.anchor_slot);
|
||||
|
||||
if frozen_head.slot() < current_split_slot {
|
||||
if finalized_state.slot() < current_split_slot {
|
||||
return Err(HotColdDBError::FreezeSlotError {
|
||||
current_split_slot,
|
||||
proposed_split_slot: frozen_head.slot(),
|
||||
proposed_split_slot: finalized_state.slot(),
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
||||
if frozen_head.slot() % E::slots_per_epoch() != 0 {
|
||||
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot()).into());
|
||||
if finalized_state.slot() % E::slots_per_epoch() != 0 {
|
||||
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
|
||||
}
|
||||
|
||||
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
|
||||
|
||||
// 1. Copy all of the states between the head and the split slot, from the hot DB
|
||||
// 1. Copy all of the states between the new finalized state and the split slot, from the hot DB
|
||||
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
|
||||
let state_root_iter = RootsIterator::new(&store, frozen_head);
|
||||
let state_root_iter = RootsIterator::new(&store, finalized_state);
|
||||
for maybe_tuple in state_root_iter.take_while(|result| match result {
|
||||
Ok((_, _, slot)) => {
|
||||
slot >= ¤t_split_slot
|
||||
@@ -1656,6 +1739,29 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
}) {
|
||||
let (block_root, state_root, slot) = maybe_tuple?;
|
||||
|
||||
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
|
||||
// delete the payload for the finalized block itself, but that's OK as we only guarantee
|
||||
// that payloads are present for slots >= the split slot. The payload fetching code is also
|
||||
// forgiving of missing payloads.
|
||||
if store.config.prune_payloads {
|
||||
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
|
||||
}
|
||||
|
||||
// Delete the old summary, and the full state if we lie on an epoch boundary.
|
||||
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
|
||||
|
||||
// Do not try to store states if a restore point is yet to be stored, or will never be
|
||||
// stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state
|
||||
// which always needs to be copied from the hot DB to the freezer and should not be deleted.
|
||||
if slot != 0
|
||||
&& anchor_info
|
||||
.as_ref()
|
||||
.map_or(false, |anchor| slot < anchor.state_upper_limit)
|
||||
{
|
||||
debug!(store.log, "Pruning finalized state"; "slot" => slot);
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
|
||||
|
||||
if slot % store.config.slots_per_restore_point == 0 {
|
||||
@@ -1674,17 +1780,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
// There are data dependencies between calls to `store_cold_state()` that prevent us from
|
||||
// doing one big call to `store.cold_db.do_atomically()` at end of the loop.
|
||||
store.cold_db.do_atomically(cold_db_ops)?;
|
||||
|
||||
// Delete the old summary, and the full state if we lie on an epoch boundary.
|
||||
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
|
||||
|
||||
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
|
||||
// delete the payload for the finalized block itself, but that's OK as we only guarantee
|
||||
// that payloads are present for slots >= the split slot. The payload fetching code is also
|
||||
// forgiving of missing payloads.
|
||||
if store.config.prune_payloads {
|
||||
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
|
||||
}
|
||||
}
|
||||
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
@@ -1724,8 +1819,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
// Before updating the in-memory split value, we flush it to disk first, so that should the
|
||||
// OS process die at this point, we pick up from the right place after a restart.
|
||||
let split = Split {
|
||||
slot: frozen_head.slot(),
|
||||
state_root: frozen_head_root,
|
||||
slot: finalized_state.slot(),
|
||||
state_root: finalized_state_root,
|
||||
block_root: finalized_block_root,
|
||||
};
|
||||
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
|
||||
|
||||
@@ -1741,7 +1837,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
debug!(
|
||||
store.log,
|
||||
"Freezer migration complete";
|
||||
"slot" => frozen_head.slot()
|
||||
"slot" => finalized_state.slot()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
@@ -1750,8 +1846,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
/// Struct for storing the split slot and state root in the database.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Default, Encode, Decode, Deserialize, Serialize)]
|
||||
pub struct Split {
|
||||
pub(crate) slot: Slot,
|
||||
pub(crate) state_root: Hash256,
|
||||
pub slot: Slot,
|
||||
pub state_root: Hash256,
|
||||
/// The block root of the split state.
|
||||
///
|
||||
/// This is used to provide special handling for the split state in the case where there are
|
||||
/// skipped slots. The split state will *always* be the advanced state, so callers
|
||||
/// who only have the finalized block root should use `get_advanced_hot_state` to get this state,
|
||||
/// rather than fetching `block.state_root()` (the unaligned state) which will have been pruned.
|
||||
#[ssz(skip_serializing, skip_deserializing)]
|
||||
pub block_root: Hash256,
|
||||
}
|
||||
|
||||
impl StoreItem for Split {
|
||||
|
||||
@@ -16,6 +16,9 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
|
||||
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
|
||||
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
|
||||
|
||||
/// State upper limit value used to indicate that a node is not storing historic states.
|
||||
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct SchemaVersion(pub u64);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user