Merge remote-tracking branch 'origin/unstable' into tree-states

This commit is contained in:
Michael Sproul
2023-09-13 11:25:18 +10:00
250 changed files with 13730 additions and 5455 deletions

View File

@@ -62,6 +62,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)?;
Ok(SimpleForwardsIterator { values })
}
fn freezer_upper_limit(&self, column: DBColumn) -> Option<Slot> {
let split_slot = self.get_split_slot();
if column == DBColumn::BeaconBlockRoots {
// Block roots are available up to the split slot.
Some(split_slot)
} else if column == DBColumn::BeaconStateRoots {
let anchor_info = self.get_anchor_info();
// There are no historic states stored if the state upper limit lies in the hot
// database. It hasn't been reached yet, and may never be.
if anchor_info.map_or(false, |a| a.state_upper_limit >= split_slot) {
None
} else {
// Otherwise if the state upper limit lies in the freezer or all states are
// reconstructed then state roots are available up to the split slot.
Some(split_slot)
}
} else {
None
}
}
}
/// Forwards root iterator that makes use of a flat field table in the freezer DB.
@@ -168,8 +189,8 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
///
/// The `get_state` closure should return a beacon state and final block/state root to backtrack
/// from in the case where the iterated range does not lie entirely within the frozen portion of
/// the database. If an `end_slot` is provided and it is before the database's latest restore
/// point slot then the `get_state` closure will not be called at all.
/// the database. If an `end_slot` is provided and it is before the database's freezer upper
/// limit for the field then the `get_state` closure will not be called at all.
///
/// It is OK for `get_state` to hold a lock while this function is evaluated, as the returned
/// iterator is as lazy as possible and won't do any work apart from calling `get_state`.
@@ -185,21 +206,27 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
) -> Result<Self> {
use HybridForwardsIterator::*;
let split_slot = store.get_split_slot();
// First slot at which this field is *not* available in the freezer. i.e. all slots less
// than this slot have their data available in the freezer.
let freezer_upper_limit = store.freezer_upper_limit(column).unwrap_or(Slot::new(0));
let result = if start_slot < split_slot {
let result = if start_slot < freezer_upper_limit {
let iter = Box::new(FrozenForwardsIterator::new(
store, column, start_slot, split_slot,
store,
column,
start_slot,
freezer_upper_limit,
));
// No continuation data is needed if the forwards iterator plans to halt before
// `end_slot`. If it tries to continue further a `NoContinuationData` error will be
// returned.
let continuation_data = if end_slot.map_or(false, |end_slot| end_slot < split_slot) {
None
} else {
Some(Box::new(get_state()))
};
let continuation_data =
if end_slot.map_or(false, |end_slot| end_slot < freezer_upper_limit) {
None
} else {
Some(Box::new(get_state()))
};
PreFinalization {
iter,
store,

View File

@@ -12,7 +12,7 @@ use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY,
SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::metrics;
use crate::state_cache::{PutStateOutcome, StateCache};
@@ -31,7 +31,7 @@ use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::{
block_replayer::PreSlotHook, AllCaches, BlockProcessingError, BlockReplayer,
SlotProcessingError,
SlotProcessingError, StateProcessingStrategy,
};
use std::cmp::min;
use std::collections::VecDeque;
@@ -129,10 +129,10 @@ pub enum HotColdDBError {
IterationError {
unexpected_key: BytesKey,
},
AttestationStateIsFinalized {
FinalizedStateNotInHotDatabase {
split_slot: Slot,
request_slot: Option<Slot>,
state_root: Hash256,
request_slot: Slot,
block_root: Hash256,
},
}
@@ -323,8 +323,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<(), Error> {
// Store on disk.
let mut ops = Vec::with_capacity(2);
let block = self.block_as_kv_store_ops(block_root, block, &mut ops)?;
self.hot_db.do_atomically(ops)?;
// Update cache.
self.block_cache.lock().put(*block_root, block);
Ok(())
@@ -467,8 +469,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
block_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<SignedBlindedBeaconBlock<E>>, Error> {
let split = self.get_split_info();
if let Some(slot) = slot {
if slot < self.get_split_slot() || slot == 0 {
if (slot < split.slot || slot == 0) && *block_root != split.block_root {
// To the freezer DB.
self.get_cold_blinded_block_by_slot(slot)
} else {
@@ -713,20 +716,98 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Get a state with `latest_block_root == block_root` advanced through to at most `slot`.
///
/// The `state_root` argument is used to look up the block's un-advanced state in case of a
/// cache miss.
pub fn get_advanced_state(
/// See `Self::get_advanced_hot_state` for information about `max_slot`.
///
/// ## Warning
///
/// The returned state **is not a valid beacon state**, it can only be used for obtaining
/// shuffling to process attestations. At least the following components of the state will be
/// broken/invalid:
///
/// - `state.state_roots`
/// - `state.block_roots`
pub fn get_inconsistent_state_for_attestation_verification_only(
&self,
block_root: Hash256,
slot: Slot,
block_root: &Hash256,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
if let Some(cached) = self.state_cache.lock().get_by_block_root(block_root, slot) {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
self.get_advanced_hot_state_with_strategy(
*block_root,
max_slot,
state_root,
StateProcessingStrategy::Inconsistent,
)
}
/// Get a state with `latest_block_root == block_root` advanced through to at most `max_slot`.
///
/// The `state_root` argument is used to look up the block's un-advanced state in case an
/// advanced state is not found.
///
/// Return the `(result_state_root, state)` satisfying:
///
/// - `result_state_root == state.canonical_root()`
/// - `state.slot() <= max_slot`
/// - `state.get_latest_block_root(result_state_root) == block_root`
///
/// Presently this is only used to avoid loading the un-advanced split state, but in future will
/// be expanded to return states from an in-memory cache.
pub fn get_advanced_hot_state(
&self,
block_root: Hash256,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
if let Some(cached) = self
.state_cache
.lock()
.get_by_block_root(block_root, max_slot)
{
return Ok(Some(cached));
}
Ok(self
.get_hot_state(&state_root)?
.map(|state| (state_root, state)))
self.get_advanced_hot_state_with_strategy(
block_root,
max_slot,
state_root,
StateProcessingStrategy::Accurate,
)
}
/// Same as `get_advanced_hot_state` but taking a `StateProcessingStrategy`.
// FIXME(sproul): delete the state processing strategy stuff again
pub fn get_advanced_hot_state_with_strategy(
&self,
block_root: Hash256,
max_slot: Slot,
state_root: Hash256,
_state_processing_strategy: StateProcessingStrategy,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
// Hold a read lock on the split point so it can't move while we're trying to load the
// state.
let split = self.split.read_recursive();
// Sanity check max-slot against the split slot.
if max_slot < split.slot {
return Err(HotColdDBError::FinalizedStateNotInHotDatabase {
split_slot: split.slot,
request_slot: max_slot,
block_root,
}
.into());
}
let state_root = if block_root == split.block_root && split.slot <= max_slot {
split.state_root
} else {
state_root
};
let opt_state = self
.load_hot_state(&state_root)?
.map(|(state, _block_root)| (state_root, state));
drop(split);
Ok(opt_state)
}
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
@@ -1711,8 +1792,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
*self.split.read_recursive()
}
pub fn set_split(&self, slot: Slot, state_root: Hash256) {
*self.split.write() = Split { slot, state_root };
pub fn set_split(&self, slot: Slot, state_root: Hash256, block_root: Hash256) {
*self.split.write() = Split {
slot,
state_root,
block_root,
};
}
/// Load the database schema version from disk.
@@ -1741,20 +1826,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Initialise the anchor info for checkpoint sync starting from `block`.
pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<KeyValueStoreOp, Error> {
pub fn init_anchor_info(
&self,
block: BeaconBlockRef<'_, E>,
retain_historic_states: bool,
) -> Result<KeyValueStoreOp, Error> {
let anchor_slot = block.slot();
// Set the `state_upper_limit` to the slot of the *next* checkpoint.
// See `get_state_upper_limit` for rationale.
let next_snapshot_slot = self.hierarchy.next_snapshot_slot(anchor_slot)?;
let anchor_info = AnchorInfo {
anchor_slot,
oldest_block_slot: anchor_slot,
oldest_block_parent: block.parent_root(),
state_upper_limit: next_snapshot_slot,
state_lower_limit: self.spec.genesis_slot,
let state_upper_limit = if !retain_historic_states {
STATE_UPPER_LIMIT_NO_RETAIN
} else {
next_snapshot_slot
};
self.compare_and_set_anchor_info(None, Some(anchor_info))
let anchor_info = if state_upper_limit == 0 && anchor_slot == 0 {
// Genesis archive node: no anchor because we *will* store all states.
None
} else {
Some(AnchorInfo {
anchor_slot,
oldest_block_slot: anchor_slot,
oldest_block_parent: block.parent_root(),
state_upper_limit,
state_lower_limit: self.spec.genesis_slot,
})
};
self.compare_and_set_anchor_info(None, anchor_info)
}
/// Get a clone of the store's anchor info.
@@ -1884,11 +1982,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.put(&CONFIG_KEY, &self.config.as_disk_config())
}
/// Load the split point from disk.
fn load_split(&self) -> Result<Option<Split>, Error> {
/// Load the split point from disk, sans block root.
fn load_split_partial(&self) -> Result<Option<Split>, Error> {
self.hot_db.get(&SPLIT_KEY)
}
/// Load the split point from disk, including block root.
fn load_split(&self) -> Result<Option<Split>, Error> {
match self.load_split_partial()? {
Some(mut split) => {
// Load the hot state summary to get the block root.
let summary = self.load_hot_state_summary(&split.state_root)?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?;
split.block_root = summary.latest_block_root;
Ok(Some(split))
}
None => Ok(None),
}
}
/// Stage the split for storage to disk.
pub fn store_split_in_batch(&self) -> Result<KeyValueStoreOp, Error> {
self.split.read_recursive().as_kv_store_op(SPLIT_KEY)
@@ -2012,6 +2125,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)
}
/// Update the linear array of frozen block roots with the block root for several skipped slots.
///
/// Write the block root at all slots from `start_slot` (inclusive) to `end_slot` (exclusive).
pub fn store_frozen_block_root_at_skip_slots(
&self,
start_slot: Slot,
end_slot: Slot,
block_root: Hash256,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut ops = vec![];
for slot in start_slot.as_u64()..end_slot.as_u64() {
ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
block_root.as_bytes().to_vec(),
));
}
Ok(ops)
}
/// Try to prune all execution payloads, returning early if there is no need to prune.
pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> {
let split = self.get_split_info();
@@ -2181,17 +2313,18 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
// Copy the blinded block from the hot database to the freezer.
// Move the blinded block from the hot database to the freezer.
// FIXME(sproul): make this load lazy
let blinded_block = store
.get_blinded_block(&block_root, None)?
.ok_or(Error::BlockNotFound(block_root))?;
if blinded_block.slot() == slot {
if blinded_block.slot() == slot || slot == current_split_slot {
store.blinded_block_as_cold_kv_store_ops(
&block_root,
&blinded_block,
&mut cold_db_block_ops,
)?;
hot_db_ops.push(StoreOp::DeleteBlock(block_root));
}
// Store the slot to block root mapping.
@@ -2206,12 +2339,15 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
// Do not try to store states if the first snapshot is yet to be stored.
if anchor_info
.as_ref()
.map_or(false, |anchor| slot < anchor.state_upper_limit)
// Do not try to store states if a restore point is yet to be stored, or will never be
// stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state
// which always needs to be copied from the hot DB to the freezer and should not be deleted.
if slot != 0
&& anchor_info
.as_ref()
.map_or(false, |anchor| slot < anchor.state_upper_limit)
{
debug!(store.log, "Skipping cold state storage"; "slot" => slot);
debug!(store.log, "Pruning finalized state"; "slot" => slot);
continue;
}
@@ -2277,6 +2413,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
let split = Split {
slot: finalized_state.slot(),
state_root: finalized_state_root,
block_root: finalized_block_root,
};
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
@@ -2310,6 +2447,14 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
pub struct Split {
pub slot: Slot,
pub state_root: Hash256,
/// The block root of the split state.
///
/// This is used to provide special handling for the split state in the case where there are
/// skipped slots. The split state will *always* be the advanced state, so callers
/// who only have the finalized block root should use `get_advanced_hot_state` to get this state,
/// rather than fetching `block.state_root()` (the unaligned state) which will have been pruned.
#[ssz(skip_serializing, skip_deserializing)]
pub block_root: Hash256,
}
impl StoreItem for Split {

View File

@@ -167,7 +167,7 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
)
};
for (start_key, end_key) in vec![
for (start_key, end_key) in [
endpoints(DBColumn::BeaconState),
endpoints(DBColumn::BeaconStateDiff),
endpoints(DBColumn::BeaconStateSummary),

View File

@@ -16,6 +16,9 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64);