Merge branch 'unstable' into merge-unstable-to-deneb-20230822

# Conflicts:
#	beacon_node/beacon_chain/src/builder.rs
#	beacon_node/beacon_chain/tests/store_tests.rs
#	beacon_node/client/src/builder.rs
#	beacon_node/src/config.rs
#	beacon_node/store/src/hot_cold_store.rs
#	lighthouse/tests/beacon_node.rs
This commit is contained in:
Jimmy Chen
2023-08-22 21:20:47 +10:00
41 changed files with 729 additions and 375 deletions

View File

@@ -14,7 +14,7 @@ use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::metrics;
use crate::{
@@ -159,10 +159,10 @@ pub enum HotColdDBError {
IterationError {
unexpected_key: BytesKey,
},
AttestationStateIsFinalized {
FinalizedStateNotInHotDatabase {
split_slot: Slot,
request_slot: Option<Slot>,
state_root: Hash256,
request_slot: Slot,
block_root: Hash256,
},
Rollback,
}
@@ -688,7 +688,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// upon that state (e.g., state roots). Additionally, only states from the hot store are
/// returned.
///
/// See `Self::get_state` for information about `slot`.
/// See `Self::get_advanced_hot_state` for information about `max_slot`.
///
/// ## Warning
///
@@ -700,23 +700,78 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// - `state.block_roots`
pub fn get_inconsistent_state_for_attestation_verification_only(
&self,
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<E>>, Error> {
block_root: &Hash256,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
self.get_advanced_hot_state_with_strategy(
*block_root,
max_slot,
state_root,
StateProcessingStrategy::Inconsistent,
)
}
let split_slot = self.get_split_slot();
/// Get a state with `latest_block_root == block_root` advanced through to at most `max_slot`.
///
/// The `state_root` argument is used to look up the block's un-advanced state in case an
/// advanced state is not found.
///
/// Return the `(result_state_root, state)` satisfying:
///
/// - `result_state_root == state.canonical_root()`
/// - `state.slot() <= max_slot`
/// - `state.get_latest_block_root(result_state_root) == block_root`
///
/// Presently this is only used to avoid loading the un-advanced split state, but in future will
/// be expanded to return states from an in-memory cache.
pub fn get_advanced_hot_state(
&self,
block_root: Hash256,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
self.get_advanced_hot_state_with_strategy(
block_root,
max_slot,
state_root,
StateProcessingStrategy::Accurate,
)
}
if slot.map_or(false, |slot| slot < split_slot) {
Err(HotColdDBError::AttestationStateIsFinalized {
split_slot,
request_slot: slot,
state_root: *state_root,
/// Same as `get_advanced_hot_state` but taking a `StateProcessingStrategy`.
pub fn get_advanced_hot_state_with_strategy(
&self,
block_root: Hash256,
max_slot: Slot,
state_root: Hash256,
state_processing_strategy: StateProcessingStrategy,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
// Hold a read lock on the split point so it can't move while we're trying to load the
// state.
let split = self.split.read_recursive();
// Sanity check max-slot against the split slot.
if max_slot < split.slot {
return Err(HotColdDBError::FinalizedStateNotInHotDatabase {
split_slot: split.slot,
request_slot: max_slot,
block_root,
}
.into())
} else {
self.load_hot_state(state_root, StateProcessingStrategy::Inconsistent)
.into());
}
let state_root = if block_root == split.block_root && split.slot <= max_slot {
split.state_root
} else {
state_root
};
let state = self
.load_hot_state(&state_root, state_processing_strategy)?
.map(|state| (state_root, state));
drop(split);
Ok(state)
}
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
@@ -1434,8 +1489,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
*self.split.read_recursive()
}
pub fn set_split(&self, slot: Slot, state_root: Hash256) {
*self.split.write() = Split { slot, state_root };
pub fn set_split(&self, slot: Slot, state_root: Hash256, block_root: Hash256) {
*self.split.write() = Split {
slot,
state_root,
block_root,
};
}
/// Fetch the slot of the most recently stored restore point.
@@ -1470,25 +1529,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Initialise the anchor info for checkpoint sync starting from `block`.
pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<KeyValueStoreOp, Error> {
pub fn init_anchor_info(
&self,
block: BeaconBlockRef<'_, E>,
retain_historic_states: bool,
) -> Result<KeyValueStoreOp, Error> {
let anchor_slot = block.slot();
let slots_per_restore_point = self.config.slots_per_restore_point;
// Set the `state_upper_limit` to the slot of the *next* restore point.
// See `get_state_upper_limit` for rationale.
let next_restore_point_slot = if anchor_slot % slots_per_restore_point == 0 {
let state_upper_limit = if !retain_historic_states {
STATE_UPPER_LIMIT_NO_RETAIN
} else if anchor_slot % slots_per_restore_point == 0 {
anchor_slot
} else {
// Set the `state_upper_limit` to the slot of the *next* restore point.
// See `get_state_upper_limit` for rationale.
(anchor_slot / slots_per_restore_point + 1) * slots_per_restore_point
};
let anchor_info = AnchorInfo {
anchor_slot,
oldest_block_slot: anchor_slot,
oldest_block_parent: block.parent_root(),
state_upper_limit: next_restore_point_slot,
state_lower_limit: self.spec.genesis_slot,
let anchor_info = if state_upper_limit == 0 && anchor_slot == 0 {
// Genesis archive node: no anchor because we *will* store all states.
None
} else {
Some(AnchorInfo {
anchor_slot,
oldest_block_slot: anchor_slot,
oldest_block_parent: block.parent_root(),
state_upper_limit,
state_lower_limit: self.spec.genesis_slot,
})
};
self.compare_and_set_anchor_info(None, Some(anchor_info))
self.compare_and_set_anchor_info(None, anchor_info)
}
/// Get a clone of the store's anchor info.
@@ -1667,11 +1737,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.put(&CONFIG_KEY, &self.config.as_disk_config())
}
/// Load the split point from disk.
fn load_split(&self) -> Result<Option<Split>, Error> {
/// Load the split point from disk, sans block root.
fn load_split_partial(&self) -> Result<Option<Split>, Error> {
self.hot_db.get(&SPLIT_KEY)
}
/// Load the split point from disk, including block root.
fn load_split(&self) -> Result<Option<Split>, Error> {
match self.load_split_partial()? {
Some(mut split) => {
// Load the hot state summary to get the block root.
let summary = self.load_hot_state_summary(&split.state_root)?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?;
split.block_root = summary.latest_block_root;
Ok(Some(split))
}
None => Ok(None),
}
}
/// Stage the split for storage to disk.
pub fn store_split_in_batch(&self) -> KeyValueStoreOp {
self.split.read_recursive().as_kv_store_op(SPLIT_KEY)
@@ -2089,43 +2174,40 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Advance the split point of the store, moving new finalized states to the freezer.
pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
frozen_head_root: Hash256,
frozen_head: &BeaconState<E>,
finalized_state_root: Hash256,
finalized_block_root: Hash256,
finalized_state: &BeaconState<E>,
) -> Result<(), Error> {
debug!(
store.log,
"Freezer migration started";
"slot" => frozen_head.slot()
"slot" => finalized_state.slot()
);
// 0. Check that the migration is sensible.
// The new frozen head must increase the current split slot, and lie on an epoch
// The new finalized state must increase the current split slot, and lie on an epoch
// boundary (in order for the hot state summary scheme to work).
let current_split_slot = store.split.read_recursive().slot;
let anchor_slot = store
.anchor_info
.read_recursive()
.as_ref()
.map(|a| a.anchor_slot);
let anchor_info = store.anchor_info.read_recursive().clone();
let anchor_slot = anchor_info.as_ref().map(|a| a.anchor_slot);
if frozen_head.slot() < current_split_slot {
if finalized_state.slot() < current_split_slot {
return Err(HotColdDBError::FreezeSlotError {
current_split_slot,
proposed_split_slot: frozen_head.slot(),
proposed_split_slot: finalized_state.slot(),
}
.into());
}
if frozen_head.slot() % E::slots_per_epoch() != 0 {
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot()).into());
if finalized_state.slot() % E::slots_per_epoch() != 0 {
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
}
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
// 1. Copy all of the states between the head and the split slot, from the hot DB
// 1. Copy all of the states between the new finalized state and the split slot, from the hot DB
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
let state_root_iter = RootsIterator::new(&store, frozen_head);
let state_root_iter = RootsIterator::new(&store, finalized_state);
for maybe_tuple in state_root_iter.take_while(|result| match result {
Ok((_, _, slot)) => {
slot >= &current_split_slot
@@ -2135,6 +2217,29 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
}) {
let (block_root, state_root, slot) = maybe_tuple?;
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
// delete the payload for the finalized block itself, but that's OK as we only guarantee
// that payloads are present for slots >= the split slot. The payload fetching code is also
// forgiving of missing payloads.
if store.config.prune_payloads {
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
// Do not try to store states if a restore point is yet to be stored, or will never be
// stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state
// which always needs to be copied from the hot DB to the freezer and should not be deleted.
if slot != 0
&& anchor_info
.as_ref()
.map_or(false, |anchor| slot < anchor.state_upper_limit)
{
debug!(store.log, "Pruning finalized state"; "slot" => slot);
continue;
}
let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
if slot % store.config.slots_per_restore_point == 0 {
@@ -2153,17 +2258,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// There are data dependencies between calls to `store_cold_state()` that prevent us from
// doing one big call to `store.cold_db.do_atomically()` at end of the loop.
store.cold_db.do_atomically(cold_db_ops)?;
// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
// delete the payload for the finalized block itself, but that's OK as we only guarantee
// that payloads are present for slots >= the split slot. The payload fetching code is also
// forgiving of missing payloads.
if store.config.prune_payloads {
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
}
// Warning: Critical section. We have to take care not to put any of the two databases in an
@@ -2203,8 +2297,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Before updating the in-memory split value, we flush it to disk first, so that should the
// OS process die at this point, we pick up from the right place after a restart.
let split = Split {
slot: frozen_head.slot(),
state_root: frozen_head_root,
slot: finalized_state.slot(),
state_root: finalized_state_root,
block_root: finalized_block_root,
};
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
@@ -2220,7 +2315,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
debug!(
store.log,
"Freezer migration complete";
"slot" => frozen_head.slot()
"slot" => finalized_state.slot()
);
Ok(())
@@ -2229,8 +2324,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
/// Struct for storing the split slot and state root in the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Encode, Decode, Deserialize, Serialize)]
pub struct Split {
pub(crate) slot: Slot,
pub(crate) state_root: Hash256,
pub slot: Slot,
pub state_root: Hash256,
/// The block root of the split state.
///
/// This is used to provide special handling for the split state in the case where there are
/// skipped slots. The split state will *always* be the advanced state, so callers
/// who only have the finalized block root should use `get_advanced_hot_state` to get this state,
/// rather than fetching `block.state_root()` (the unaligned state) which will have been pruned.
#[ssz(skip_serializing, skip_deserializing)]
pub block_root: Hash256,
}
impl StoreItem for Split {

View File

@@ -17,6 +17,9 @@ pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SchemaVersion(pub u64);