Gloas spec v1.7.0-alpha.5 and beacon_chain tests (#8998)

Fix database pruning post-Gloas


- Fix DB pruning logic (and state summaries DAG)
- Get the `beacon_chain` tests running with `FORK_NAME=gloas` 🎉


Co-Authored-By: Michael Sproul <michael@sigmaprime.io>

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com>

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>
This commit is contained in:
Michael Sproul
2026-04-21 16:29:15 +10:00
committed by GitHub
parent c028bac28d
commit cf3d5e285e
82 changed files with 1513 additions and 1391 deletions

View File

@@ -1064,7 +1064,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn put_payload_envelope(
&self,
block_root: &Hash256,
payload_envelope: SignedExecutionPayloadEnvelope<E>,
payload_envelope: &SignedExecutionPayloadEnvelope<E>,
) -> Result<(), Error> {
self.hot_db.put_bytes(
SignedExecutionPayloadEnvelope::<E>::db_column(),
@@ -1133,13 +1133,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_advanced_hot_state(
&self,
block_root: Hash256,
payload_status: StatePayloadStatus,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
if let Some(cached) =
self.get_advanced_hot_state_from_cache(block_root, payload_status, max_slot)
{
if let Some(cached) = self.get_advanced_hot_state_from_cache(block_root, max_slot) {
return Ok(Some(cached));
}
@@ -1161,11 +1158,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.into());
}
// Split state should always be `Pending`.
let state_root = if block_root == split.block_root
&& let StatePayloadStatus::Pending = payload_status
&& split.slot <= max_slot
{
let state_root = if block_root == split.block_root && split.slot <= max_slot {
split.state_root
} else {
state_root
@@ -1212,12 +1205,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_advanced_hot_state_from_cache(
&self,
block_root: Hash256,
payload_status: StatePayloadStatus,
max_slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
self.state_cache
.lock()
.get_by_block_root(block_root, payload_status, max_slot)
.get_by_block_root(block_root, max_slot)
}
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
@@ -1857,100 +1849,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Compute the `StatePayloadStatus` for a stored state based on its summary.
///
/// In future this might become a field of the summary, but this would require a whole DB
/// migration. For now we use an extra read from the DB to determine it.
fn get_hot_state_summary_payload_status(
&self,
summary: &HotStateSummary,
) -> Result<StatePayloadStatus, Error> {
// Treat pre-Gloas states as `Pending`.
if !self
.spec
.fork_name_at_slot::<E>(summary.slot)
.gloas_enabled()
{
return Ok(StatePayloadStatus::Pending);
}
// Treat genesis state as `Pending` (`BeaconBlock` state).
let previous_state_root = summary.previous_state_root;
if previous_state_root.is_zero() {
return Ok(StatePayloadStatus::Pending);
}
// Load the hot state summary for the previous state.
//
// If it has the same slot as this summary then we know this summary is for a `Full` state
// (payload state), because they are always diffed against their same-slot `Pending` state.
//
// If the previous summary has a different slot AND the latest block is from `summary.slot`,
// then this state *must* be `Pending` (it is the summary for latest block itself).
//
// Otherwise, we are at a skipped slot and must traverse the graph of state summaries
// backwards until we reach a summary for the latest block. This recursion could be quite
// far in the case of a long skip. We could optimise this in future using the
// `diff_base_state` (like in `get_ancestor_state_root`), or by doing a proper DB
// migration.
let previous_state_summary = self
.load_hot_state_summary(&previous_state_root)?
.ok_or(Error::MissingHotStateSummary(previous_state_root))?;
if previous_state_summary.slot == summary.slot {
Ok(StatePayloadStatus::Full)
} else if summary.slot == summary.latest_block_slot {
Ok(StatePayloadStatus::Pending)
} else {
self.get_hot_state_summary_payload_status(&previous_state_summary)
}
}
/// Recompute the payload status for a state at `slot` that is stored in the cold DB.
///
/// This function returns an error for any `slot` that is outside the range of slots stored in
/// the freezer DB.
///
/// For all slots prior to Gloas, it returns `Pending`.
///
/// For post-Gloas slots the algorithm is:
///
/// 1. Load the most recently applied block at `slot` (may not be from `slot` in case of a skip)
/// 2. Load the canonical `state_root` at the slot of the block. If this `state_root` matches
/// the one in the block then we know the state at *that* slot is canonically empty (no
/// payload). Conversely, if it is different, we know that the block's slot is full (assuming
/// no database corruption).
/// 3. The payload status of `slot` is the same as the payload status of `block.slot()`, because
/// we only care about whether a beacon block or payload was applied most recently, and
/// `block` is by definition the most-recently-applied block.
///
/// All of this mucking around could be avoided if we do a schema migration to record the
/// payload status in the database. For now, this is simpler.
fn get_cold_state_payload_status(&self, slot: Slot) -> Result<StatePayloadStatus, Error> {
// Pre-Gloas states are always `Pending`.
if !self.spec.fork_name_at_slot::<E>(slot).gloas_enabled() {
return Ok(StatePayloadStatus::Pending);
}
let block_root = self
.get_cold_block_root(slot)?
.ok_or(HotColdDBError::MissingFrozenBlock(slot))?;
let block = self
.get_blinded_block(&block_root)?
.ok_or(Error::MissingBlock(block_root))?;
let state_root = self
.get_cold_state_root(block.slot())?
.ok_or(HotColdDBError::MissingRestorePointState(block.slot()))?;
if block.state_root() != state_root {
Ok(StatePayloadStatus::Full)
} else {
Ok(StatePayloadStatus::Pending)
}
}
fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result<HDiffBuffer, Error> {
if let Some(buffer) = self
.state_cache
@@ -2046,20 +1944,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
if let Some(
summary @ HotStateSummary {
slot,
latest_block_root,
diff_base_state,
..
},
) = self.load_hot_state_summary(state_root)?
if let Some(HotStateSummary {
slot,
latest_block_root,
diff_base_state,
..
}) = self.load_hot_state_summary(state_root)?
{
let payload_status = self.get_hot_state_summary_payload_status(&summary)?;
debug!(
%slot,
?state_root,
?payload_status,
"Loading hot state"
);
let mut state = match self.hot_storage_strategy(slot)? {
@@ -2113,7 +2007,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
base_state,
slot,
latest_block_root,
payload_status,
update_cache,
)?
}
@@ -2131,26 +2024,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
base_state: BeaconState<E>,
slot: Slot,
latest_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
update_cache: bool,
) -> Result<BeaconState<E>, Error> {
if base_state.slot() == slot && base_state.payload_status() == desired_payload_status {
if base_state.slot() == slot {
return Ok(base_state);
}
let (blocks, envelopes) = self.load_blocks_to_replay(
base_state.slot(),
slot,
latest_block_root,
desired_payload_status,
)?;
let blocks = self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?;
let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME);
// If replaying blocks, and `update_cache` is true, also cache the epoch boundary
// state that this state is based on. It may be useful as the basis of more states
// in the same epoch.
let state_cache_hook = |state_root, state: &mut BeaconState<E>| {
// TODO(gloas): prevent caching of the payload_status=Full state?
if !update_cache || state.slot() % E::slots_per_epoch() != 0 {
return Ok(());
}
@@ -2177,16 +2063,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
debug!(
%slot,
blocks = ?blocks.iter().map(|block| block.slot()).collect::<Vec<_>>(),
envelopes = ?envelopes.iter().map(|e| e.message.slot).collect::<Vec<_>>(),
payload_status = ?desired_payload_status,
"Replaying blocks and envelopes"
"Replaying blocks"
);
self.replay_blocks(
base_state,
blocks,
envelopes,
desired_payload_status,
slot,
no_state_root_iter(),
Some(Box::new(state_cache_hook)),
@@ -2490,7 +2372,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(base_state);
}
let (blocks, envelopes) = self.load_cold_blocks(base_state.slot() + 1, slot)?;
let base_slot = base_state.slot();
let blocks = self.load_cold_blocks(base_slot + 1, slot)?;
// Include state root for base state as it is required by block processing to not
// have to hash the state.
@@ -2499,16 +2382,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.forwards_state_roots_iterator_until(base_state.slot(), slot, || {
Err(Error::StateShouldNotBeRequired(slot))
})?;
let payload_status = self.get_cold_state_payload_status(slot)?;
let state = self.replay_blocks(
base_state,
blocks,
envelopes,
payload_status,
slot,
Some(state_root_iter),
None,
)?;
let state = self.replay_blocks(base_state, blocks, slot, Some(state_root_iter), None)?;
debug!(
target_slot = %slot,
replay_time_ms = metrics::stop_timer_with_duration(replay_timer).as_millis(),
@@ -2601,76 +2475,39 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Load cold blocks and payload envelopes between `start_slot` and `end_slot` inclusive.
#[allow(clippy::type_complexity)]
/// Load cold blocks between `start_slot` and `end_slot` inclusive.
pub fn load_cold_blocks(
&self,
start_slot: Slot,
end_slot: Slot,
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
Vec<SignedExecutionPayloadEnvelope<E>>,
),
Error,
> {
) -> Result<Vec<SignedBlindedBeaconBlock<E>>, Error> {
let _t = metrics::start_timer(&metrics::STORE_BEACON_LOAD_COLD_BLOCKS_TIME);
let block_root_iter =
self.forwards_block_roots_iterator_until(start_slot, end_slot, || {
Err(Error::StateShouldNotBeRequired(end_slot))
})?;
let blocks = process_results(block_root_iter, |iter| {
process_results(block_root_iter, |iter| {
iter.map(|(block_root, _slot)| block_root)
.dedup()
.map(|block_root| {
self.get_blinded_block(&block_root)?
.ok_or(Error::MissingBlock(block_root))
})
.collect::<Result<Vec<_>, Error>>()
})??;
// If Gloas is not enabled for any slots in the range, just return `blocks`.
if !self.spec.fork_name_at_slot::<E>(start_slot).gloas_enabled()
&& !self.spec.fork_name_at_slot::<E>(end_slot).gloas_enabled()
{
return Ok((blocks, vec![]));
}
let end_block_root = self
.get_cold_block_root(end_slot)?
.ok_or(HotColdDBError::MissingFrozenBlock(end_slot))?;
let desired_payload_status = self.get_cold_state_payload_status(end_slot)?;
let envelopes = self.load_payload_envelopes_for_blocks(
&blocks,
end_block_root,
desired_payload_status,
)?;
Ok((blocks, envelopes))
.collect()
})?
}
/// Load the blocks & envelopes between `start_slot` and `end_slot` by backtracking from
/// Load the blocks between `start_slot` and `end_slot` by backtracking from
/// `end_block_root`.
///
/// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot
/// equal to `start_slot`, to reach a state with slot equal to `end_slot`.
///
/// Payloads are also returned in slot-ascending order, but only payloads forming part of
/// the chain are loaded (payloads for EMPTY slots are omitted). Prior to Gloas, an empty
/// vec of payloads will be returned.
#[allow(clippy::type_complexity)]
pub fn load_blocks_to_replay(
&self,
start_slot: Slot,
end_slot: Slot,
end_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
Vec<SignedExecutionPayloadEnvelope<E>>,
),
Error,
> {
) -> Result<Vec<SignedBlindedBeaconBlock<E>>, Error> {
let _t = metrics::start_timer(&metrics::STORE_BEACON_LOAD_HOT_BLOCKS_TIME);
let mut blocks = ParentRootBlockIterator::new(self, end_block_root)
.map(|result| result.map(|(_, block)| block))
@@ -2699,70 +2536,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
.collect::<Result<Vec<_>, _>>()?;
blocks.reverse();
// If Gloas is not enabled for any slots in the range, just return `blocks`.
if !self.spec.fork_name_at_slot::<E>(start_slot).gloas_enabled()
&& !self.spec.fork_name_at_slot::<E>(end_slot).gloas_enabled()
{
return Ok((blocks, vec![]));
}
let envelopes = self.load_payload_envelopes_for_blocks(
&blocks,
end_block_root,
desired_payload_status,
)?;
Ok((blocks, envelopes))
}
pub fn load_payload_envelopes_for_blocks(
&self,
blocks: &[SignedBlindedBeaconBlock<E>],
end_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
) -> Result<Vec<SignedExecutionPayloadEnvelope<E>>, Error> {
let mut envelopes = vec![];
for (block, next_block) in blocks.iter().tuple_windows() {
if block.fork_name_unchecked().gloas_enabled() {
// Check next block to see if this block's payload is canonical on this chain.
let block_hash = block.payload_bid_block_hash()?;
if !next_block.is_parent_block_full(block_hash) {
// No payload at this slot (empty), nothing to load.
continue;
}
// Using `parent_root` avoids computation.
let block_root = next_block.parent_root();
let envelope = self
.get_payload_envelope(&block_root)?
.ok_or(HotColdDBError::MissingExecutionPayloadEnvelope(block_root))?;
envelopes.push(envelope);
}
}
// Load the payload for the last block if desired.
if let StatePayloadStatus::Full = desired_payload_status {
let envelope = self.get_payload_envelope(&end_block_root)?.ok_or(
HotColdDBError::MissingExecutionPayloadEnvelope(end_block_root),
)?;
envelopes.push(envelope);
}
Ok(envelopes)
Ok(blocks)
}
/// Replay `blocks` on top of `state` until `target_slot` is reached.
///
/// Will skip slots as necessary. The returned state is not guaranteed
/// to have any caches built, beyond those immediately required by block processing.
#[allow(clippy::too_many_arguments)]
pub fn replay_blocks(
&self,
state: BeaconState<E>,
blocks: Vec<SignedBlindedBeaconBlock<E>>,
envelopes: Vec<SignedExecutionPayloadEnvelope<E>>,
desired_payload_status: StatePayloadStatus,
target_slot: Slot,
state_root_iter: Option<impl Iterator<Item = Result<(Hash256, Slot), Error>>>,
pre_slot_hook: Option<PreSlotHook<E, Error>>,
@@ -2771,8 +2555,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut block_replayer = BlockReplayer::new(state, &self.spec)
.no_signature_verification()
.minimal_block_root_verification()
.desired_state_payload_status(desired_payload_status);
.minimal_block_root_verification();
let have_state_root_iterator = state_root_iter.is_some();
if let Some(state_root_iter) = state_root_iter {
@@ -2784,7 +2567,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
block_replayer
.apply_blocks(blocks, envelopes, Some(target_slot))
.apply_blocks(blocks, Some(target_slot))
.map(|block_replayer| {
if have_state_root_iterator && block_replayer.state_root_miss() {
warn!(
@@ -3800,6 +3583,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
) -> Result<SplitChange, Error> {
debug!(
slot = %finalized_state.slot(),
state_root = ?finalized_state_root,
"Freezer migration started"
);
@@ -4219,12 +4003,8 @@ impl HotStateSummary {
// slots where there isn't a skip).
let latest_block_root = state.get_latest_block_root(state_root);
// Payload status of the state determines a lot about how it is stored.
let payload_status = state.payload_status();
let get_state_root = |slot| {
if slot == state.slot() {
// TODO(gloas): I think we can remove this case
Ok::<_, Error>(state_root)
} else {
Ok::<_, Error>(get_ancestor_state_root(store, state, slot).map_err(|e| {
@@ -4247,12 +4027,6 @@ impl HotStateSummary {
let previous_state_root = if state.slot() == 0 {
// Set to 0x0 for genesis state to prevent any sort of circular reference.
Hash256::zero()
} else if let StatePayloadStatus::Full = payload_status
&& state.slot() == state.latest_block_header().slot
{
// A Full state at a non-skipped slot builds off the Pending state of the same slot,
// i.e. the state with the same `state_root` as its `BeaconBlock`
state.latest_block_header().state_root
} else {
get_state_root(state.slot().safe_sub(1_u64)?)?
};

View File

@@ -67,7 +67,6 @@ where
state.build_caches(&self.spec)?;
// TODO(gloas): handle payload envelope replay
process_results(block_root_iter, |iter| -> Result<(), Error> {
let mut io_batch = vec![];

View File

@@ -7,7 +7,7 @@ use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use tracing::instrument;
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot, execution::StatePayloadStatus};
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot};
/// Fraction of the LRU cache to leave intact during culling.
const CULL_EXEMPT_NUMERATOR: usize = 1;
@@ -23,10 +23,10 @@ pub struct FinalizedState<E: EthSpec> {
state: BeaconState<E>,
}
/// Map from (block_root, payload_status) -> slot -> state_root.
/// Map from block_root -> slot -> state_root.
#[derive(Debug, Default)]
pub struct BlockMap {
blocks: HashMap<(Hash256, StatePayloadStatus), SlotMap>,
blocks: HashMap<Hash256, SlotMap>,
}
/// Map from slot -> state_root.
@@ -143,11 +143,8 @@ impl<E: EthSpec> StateCache<E> {
return Err(Error::FinalizedStateDecreasingSlot);
}
let payload_status = state.payload_status();
// Add to block map.
self.block_map
.insert(block_root, payload_status, state.slot(), state_root);
self.block_map.insert(block_root, state.slot(), state_root);
// Prune block map.
let state_roots_to_prune = self.block_map.prune(state.slot());
@@ -270,9 +267,7 @@ impl<E: EthSpec> StateCache<E> {
// Record the connection from block root and slot to this state.
let slot = state.slot();
let payload_status = state.payload_status();
self.block_map
.insert(block_root, payload_status, slot, state_root);
self.block_map.insert(block_root, slot, state_root);
Ok(PutStateOutcome::New(deleted_states))
}
@@ -321,10 +316,9 @@ impl<E: EthSpec> StateCache<E> {
pub fn get_by_block_root(
&mut self,
block_root: Hash256,
payload_status: StatePayloadStatus,
slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
let slot_map = self.block_map.blocks.get(&(block_root, payload_status))?;
let slot_map = self.block_map.blocks.get(&block_root)?;
// Find the state at `slot`, or failing that the most recent ancestor.
let state_root = slot_map
@@ -345,12 +339,7 @@ impl<E: EthSpec> StateCache<E> {
}
pub fn delete_block_states(&mut self, block_root: &Hash256) {
let (pending_state_roots, full_state_roots) =
self.block_map.delete_block_states(block_root);
for slot_map in [pending_state_roots, full_state_roots]
.into_iter()
.flatten()
{
if let Some(slot_map) = self.block_map.delete_block_states(block_root) {
for state_root in slot_map.slots.values() {
self.states.pop(state_root);
}
@@ -423,14 +412,8 @@ impl<E: EthSpec> StateCache<E> {
}
impl BlockMap {
fn insert(
&mut self,
block_root: Hash256,
payload_status: StatePayloadStatus,
slot: Slot,
state_root: Hash256,
) {
let slot_map = self.blocks.entry((block_root, payload_status)).or_default();
fn insert(&mut self, block_root: Hash256, slot: Slot, state_root: Hash256) {
let slot_map = self.blocks.entry(block_root).or_default();
slot_map.slots.insert(slot, state_root);
}
@@ -461,12 +444,8 @@ impl BlockMap {
});
}
fn delete_block_states(&mut self, block_root: &Hash256) -> (Option<SlotMap>, Option<SlotMap>) {
let pending_state_roots = self
.blocks
.remove(&(*block_root, StatePayloadStatus::Pending));
let full_state_roots = self.blocks.remove(&(*block_root, StatePayloadStatus::Full));
(pending_state_roots, full_state_roots)
fn delete_block_states(&mut self, block_root: &Hash256) -> Option<SlotMap> {
self.blocks.remove(block_root)
}
}