Update database and block replayer to handle payload envelopes (#8886)

Closes:

- https://github.com/sigp/lighthouse/issues/8869


- Update `BlockReplayer` to support replay of execution payload envelopes.
- Update `HotColdDB` to load payload envelopes and feed them to the `BlockReplayer` for both hot + cold states. However the cold DB code is not fully working yet (see: https://github.com/sigp/lighthouse/issues/8958).
- Add `StatePayloadStatus` to allow callers to specify whether they want a state with a payload applied, or not.
- Fix the state cache to key by `StatePayloadStatus`.
- Lots of fixes to block production and block processing regarding state management.
- Initial test harness support for producing+processing Gloas blocks+envelopes
- A few new tests to cover Gloas DB operations


Co-Authored-By: Eitan Seri-Levi <eserilev@gmail.com>

Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: Michael Sproul <michael@sigmaprime.io>

Co-Authored-By: Michael Sproul <michaelsproul@users.noreply.github.com>

Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
Michael Sproul
2026-03-12 10:06:25 +11:00
committed by GitHub
parent 6350a27031
commit bff72a920d
30 changed files with 1243 additions and 84 deletions

View File

@@ -654,6 +654,12 @@ impl HierarchyModuli {
/// layer 2 diff will point to the start snapshot instead of the layer 1 diff at
/// 2998272.
pub fn storage_strategy(&self, slot: Slot, start_slot: Slot) -> Result<StorageStrategy, Error> {
// Initially had the idea of using different storage strategies for full and pending states,
// but it was very complex. However without this concept we end up storing two diffs/two
// snapshots at full slots. The complexity of managing skipped slots was the main impetus
// for reverting the payload-status sensitive design: a Full skipped slot has no same-slot
// Pending state to replay from, so has to be handled differently from Full non-skipped
// slots.
match slot.cmp(&start_slot) {
Ordering::Less => return Err(Error::LessThanStart(slot, start_slot)),
Ordering::Equal => return Ok(StorageStrategy::Snapshot),

View File

@@ -186,6 +186,7 @@ pub enum HotColdDBError {
MissingHotHDiff(Hash256),
MissingHDiff(Slot),
MissingExecutionPayload(Hash256),
MissingExecutionPayloadEnvelope(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo,
MissingFrozenBlockSlot(Hash256),
@@ -1132,10 +1133,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_advanced_hot_state(
&self,
block_root: Hash256,
payload_status: StatePayloadStatus,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
if let Some(cached) = self.get_advanced_hot_state_from_cache(block_root, max_slot) {
if let Some(cached) =
self.get_advanced_hot_state_from_cache(block_root, payload_status, max_slot)
{
return Ok(Some(cached));
}
@@ -1157,7 +1161,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.into());
}
let state_root = if block_root == split.block_root && split.slot <= max_slot {
// Split state should always be `Pending`.
let state_root = if block_root == split.block_root
&& let StatePayloadStatus::Pending = payload_status
&& split.slot <= max_slot
{
split.state_root
} else {
state_root
@@ -1204,11 +1212,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_advanced_hot_state_from_cache(
&self,
block_root: Hash256,
payload_status: StatePayloadStatus,
max_slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
self.state_cache
.lock()
.get_by_block_root(block_root, max_slot)
.get_by_block_root(block_root, payload_status, max_slot)
}
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.
@@ -1379,6 +1388,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// NOTE: `hot_storage_strategy` can error if there are states in the database
// prior to the `anchor_slot`. This can happen if checkpoint sync has been
// botched and left some states in the database prior to completing.
// Use `Pending` status here because snapshots and diffs are only stored for
// `Pending` states.
if let Some(slot) = slot
&& let Ok(strategy) = self.hot_storage_strategy(slot)
{
@@ -1846,6 +1857,55 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Compute the `StatePayloadStatus` for a stored state based on its summary.
///
/// In future this might become a field of the summary, but this would require a whole DB
/// migration. For now we use an extra read from the DB to determine it.
fn get_hot_state_summary_payload_status(
&self,
summary: &HotStateSummary,
) -> Result<StatePayloadStatus, Error> {
// Treat pre-Gloas states as `Pending`.
if !self
.spec
.fork_name_at_slot::<E>(summary.slot)
.gloas_enabled()
{
return Ok(StatePayloadStatus::Pending);
}
// Treat genesis state as `Pending` (`BeaconBlock` state).
let previous_state_root = summary.previous_state_root;
if previous_state_root.is_zero() {
return Ok(StatePayloadStatus::Pending);
}
// Load the hot state summary for the previous state.
//
// If it has the same slot as this summary then we know this summary is for a `Full` state
// (payload state), because they are always diffed against their same-slot `Pending` state.
//
// If the previous summary has a different slot AND the latest block is from `summary.slot`,
// then this state *must* be `Pending` (it is the summary for latest block itself).
//
// Otherwise, we are at a skipped slot and must traverse the graph of state summaries
// backwards until we reach a summary for the latest block. This recursion could be quite
// far in the case of a long skip. We could optimise this in future using the
// `diff_base_state` (like in `get_ancestor_state_root`), or by doing a proper DB
// migration.
let previous_state_summary = self
.load_hot_state_summary(&previous_state_root)?
.ok_or(Error::MissingHotStateSummary(previous_state_root))?;
if previous_state_summary.slot == summary.slot {
Ok(StatePayloadStatus::Full)
} else if summary.slot == summary.latest_block_slot {
Ok(StatePayloadStatus::Pending)
} else {
self.get_hot_state_summary_payload_status(&previous_state_summary)
}
}
fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result<HDiffBuffer, Error> {
if let Some(buffer) = self
.state_cache
@@ -1941,13 +2001,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
if let Some(HotStateSummary {
slot,
latest_block_root,
diff_base_state,
..
}) = self.load_hot_state_summary(state_root)?
if let Some(
summary @ HotStateSummary {
slot,
latest_block_root,
diff_base_state,
..
},
) = self.load_hot_state_summary(state_root)?
{
let payload_status = self.get_hot_state_summary_payload_status(&summary)?;
debug!(
%slot,
?state_root,
?payload_status,
"Loading hot state"
);
let mut state = match self.hot_storage_strategy(slot)? {
strat @ StorageStrategy::Snapshot | strat @ StorageStrategy::DiffFrom(_) => {
let buffer_timer = metrics::start_timer_vec(
@@ -1999,6 +2068,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
base_state,
slot,
latest_block_root,
payload_status,
update_cache,
)?
}
@@ -2016,19 +2086,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
base_state: BeaconState<E>,
slot: Slot,
latest_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
update_cache: bool,
) -> Result<BeaconState<E>, Error> {
if base_state.slot() == slot {
if base_state.slot() == slot && base_state.payload_status() == desired_payload_status {
return Ok(base_state);
}
let blocks = self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?;
let (blocks, envelopes) = self.load_blocks_to_replay(
base_state.slot(),
slot,
latest_block_root,
desired_payload_status,
)?;
let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME);
// If replaying blocks, and `update_cache` is true, also cache the epoch boundary
// state that this state is based on. It may be useful as the basis of more states
// in the same epoch.
let state_cache_hook = |state_root, state: &mut BeaconState<E>| {
// TODO(gloas): prevent caching of the payload_status=Full state?
if !update_cache || state.slot() % E::slots_per_epoch() != 0 {
return Ok(());
}
@@ -2052,9 +2129,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
};
debug!(
%slot,
blocks = ?blocks.iter().map(|block| block.slot()).collect::<Vec<_>>(),
envelopes = ?envelopes.iter().map(|e| e.message.slot).collect::<Vec<_>>(),
payload_status = ?desired_payload_status,
"Replaying blocks and envelopes"
);
self.replay_blocks(
base_state,
blocks,
envelopes,
desired_payload_status,
slot,
no_state_root_iter(),
Some(Box::new(state_cache_hook)),
@@ -2358,7 +2445,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(base_state);
}
let blocks = self.load_cold_blocks(base_state.slot() + 1, slot)?;
let (blocks, envelopes) = self.load_cold_blocks(base_state.slot() + 1, slot)?;
// Include state root for base state as it is required by block processing to not
// have to hash the state.
@@ -2367,7 +2454,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.forwards_state_roots_iterator_until(base_state.slot(), slot, || {
Err(Error::StateShouldNotBeRequired(slot))
})?;
let state = self.replay_blocks(base_state, blocks, slot, Some(state_root_iter), None)?;
// TODO(gloas): calculate correct payload status for cold states
let payload_status = StatePayloadStatus::Pending;
let state = self.replay_blocks(
base_state,
blocks,
envelopes,
payload_status,
slot,
Some(state_root_iter),
None,
)?;
debug!(
target_slot = %slot,
replay_time_ms = metrics::stop_timer_with_duration(replay_timer).as_millis(),
@@ -2460,40 +2557,77 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Load cold blocks between `start_slot` and `end_slot` inclusive.
/// Load cold blocks and payload envelopes between `start_slot` and `end_slot` inclusive.
#[allow(clippy::type_complexity)]
pub fn load_cold_blocks(
&self,
start_slot: Slot,
end_slot: Slot,
) -> Result<Vec<SignedBlindedBeaconBlock<E>>, Error> {
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
Vec<SignedExecutionPayloadEnvelope<E>>,
),
Error,
> {
let _t = metrics::start_timer(&metrics::STORE_BEACON_LOAD_COLD_BLOCKS_TIME);
let block_root_iter =
self.forwards_block_roots_iterator_until(start_slot, end_slot, || {
Err(Error::StateShouldNotBeRequired(end_slot))
})?;
process_results(block_root_iter, |iter| {
let blocks = process_results(block_root_iter, |iter| {
iter.map(|(block_root, _slot)| block_root)
.dedup()
.map(|block_root| {
self.get_blinded_block(&block_root)?
.ok_or(Error::MissingBlock(block_root))
})
.collect()
})?
.collect::<Result<Vec<_>, Error>>()
})??;
// If Gloas is not enabled for any slots in the range, just return `blocks`.
if !self.spec.fork_name_at_slot::<E>(start_slot).gloas_enabled()
&& !self.spec.fork_name_at_slot::<E>(end_slot).gloas_enabled()
{
return Ok((blocks, vec![]));
}
// TODO(gloas): wire this up
let end_block_root = Hash256::ZERO;
let desired_payload_status = StatePayloadStatus::Pending;
let envelopes = self.load_payload_envelopes_for_blocks(
&blocks,
end_block_root,
desired_payload_status,
)?;
Ok((blocks, envelopes))
}
/// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`.
/// Load the blocks & envelopes between `start_slot` and `end_slot` by backtracking from
/// `end_block_root`.
///
/// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot
/// equal to `start_slot`, to reach a state with slot equal to `end_slot`.
///
/// Payloads are also returned in slot-ascending order, but only payloads forming part of
/// the chain are loaded (payloads for EMPTY slots are omitted). Prior to Gloas, an empty
/// vec of payloads will be returned.
#[allow(clippy::type_complexity)]
pub fn load_blocks_to_replay(
&self,
start_slot: Slot,
end_slot: Slot,
end_block_hash: Hash256,
) -> Result<Vec<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
end_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
Vec<SignedExecutionPayloadEnvelope<E>>,
),
Error,
> {
let _t = metrics::start_timer(&metrics::STORE_BEACON_LOAD_HOT_BLOCKS_TIME);
let mut blocks = ParentRootBlockIterator::new(self, end_block_hash)
let mut blocks = ParentRootBlockIterator::new(self, end_block_root)
.map(|result| result.map(|(_, block)| block))
// Include the block at the end slot (if any), it needs to be
// replayed in order to construct the canonical state at `end_slot`.
@@ -2520,17 +2654,70 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
.collect::<Result<Vec<_>, _>>()?;
blocks.reverse();
Ok(blocks)
// If Gloas is not enabled for any slots in the range, just return `blocks`.
if !self.spec.fork_name_at_slot::<E>(start_slot).gloas_enabled()
&& !self.spec.fork_name_at_slot::<E>(end_slot).gloas_enabled()
{
return Ok((blocks, vec![]));
}
let envelopes = self.load_payload_envelopes_for_blocks(
&blocks,
end_block_root,
desired_payload_status,
)?;
Ok((blocks, envelopes))
}
pub fn load_payload_envelopes_for_blocks(
&self,
blocks: &[SignedBlindedBeaconBlock<E>],
end_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
) -> Result<Vec<SignedExecutionPayloadEnvelope<E>>, Error> {
let mut envelopes = vec![];
for (block, next_block) in blocks.iter().tuple_windows() {
if block.fork_name_unchecked().gloas_enabled() {
// Check next block to see if this block's payload is canonical on this chain.
let block_hash = block.payload_bid_block_hash()?;
if !next_block.is_parent_block_full(block_hash) {
// No payload at this slot (empty), nothing to load.
continue;
}
// Using `parent_root` avoids computation.
let block_root = next_block.parent_root();
let envelope = self
.get_payload_envelope(&block_root)?
.ok_or(HotColdDBError::MissingExecutionPayloadEnvelope(block_root))?;
envelopes.push(envelope);
}
}
// Load the payload for the last block if desired.
if let StatePayloadStatus::Full = desired_payload_status {
let envelope = self.get_payload_envelope(&end_block_root)?.ok_or(
HotColdDBError::MissingExecutionPayloadEnvelope(end_block_root),
)?;
envelopes.push(envelope);
}
Ok(envelopes)
}
/// Replay `blocks` on top of `state` until `target_slot` is reached.
///
/// Will skip slots as necessary. The returned state is not guaranteed
/// to have any caches built, beyond those immediately required by block processing.
#[allow(clippy::too_many_arguments)]
pub fn replay_blocks(
&self,
state: BeaconState<E>,
blocks: Vec<SignedBeaconBlock<E, BlindedPayload<E>>>,
blocks: Vec<SignedBlindedBeaconBlock<E>>,
envelopes: Vec<SignedExecutionPayloadEnvelope<E>>,
desired_payload_status: StatePayloadStatus,
target_slot: Slot,
state_root_iter: Option<impl Iterator<Item = Result<(Hash256, Slot), Error>>>,
pre_slot_hook: Option<PreSlotHook<E, Error>>,
@@ -2539,7 +2726,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut block_replayer = BlockReplayer::new(state, &self.spec)
.no_signature_verification()
.minimal_block_root_verification();
.minimal_block_root_verification()
.desired_state_payload_status(desired_payload_status);
let have_state_root_iterator = state_root_iter.is_some();
if let Some(state_root_iter) = state_root_iter {
@@ -2551,7 +2739,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
block_replayer
.apply_blocks(blocks, Some(target_slot))
.apply_blocks(blocks, envelopes, Some(target_slot))
.map(|block_replayer| {
if have_state_root_iterator && block_replayer.state_root_miss() {
warn!(
@@ -4006,11 +4194,15 @@ impl HotStateSummary {
// slots where there isn't a skip).
let latest_block_root = state.get_latest_block_root(state_root);
// Payload status of the state determines a lot about how it is stored.
let payload_status = state.payload_status();
let get_state_root = |slot| {
if slot == state.slot() {
// TODO(gloas): I think we can remove this case
Ok::<_, Error>(state_root)
} else {
Ok(get_ancestor_state_root(store, state, slot).map_err(|e| {
Ok::<_, Error>(get_ancestor_state_root(store, state, slot).map_err(|e| {
Error::StateSummaryIteratorError {
error: e,
from_state_root: state_root,
@@ -4030,6 +4222,12 @@ impl HotStateSummary {
let previous_state_root = if state.slot() == 0 {
// Set to 0x0 for genesis state to prevent any sort of circular reference.
Hash256::zero()
} else if let StatePayloadStatus::Full = payload_status
&& state.slot() == state.latest_block_header().slot
{
// A Full state at a non-skipped slot builds off the Pending state of the same slot,
// i.e. the state with the same `state_root` as its `BeaconBlock`
state.latest_block_header().state_root
} else {
get_state_root(state.slot().safe_sub(1_u64)?)?
};

View File

@@ -67,6 +67,7 @@ where
state.build_caches(&self.spec)?;
// TODO(gloas): handle payload envelope replay
process_results(block_root_iter, |iter| -> Result<(), Error> {
let mut io_batch = vec![];

View File

@@ -7,7 +7,7 @@ use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use tracing::instrument;
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot};
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot, execution::StatePayloadStatus};
/// Fraction of the LRU cache to leave intact during culling.
const CULL_EXEMPT_NUMERATOR: usize = 1;
@@ -23,10 +23,10 @@ pub struct FinalizedState<E: EthSpec> {
state: BeaconState<E>,
}
/// Map from block_root -> slot -> state_root.
/// Map from (block_root, payload_status) -> slot -> state_root.
#[derive(Debug, Default)]
pub struct BlockMap {
blocks: HashMap<Hash256, SlotMap>,
blocks: HashMap<(Hash256, StatePayloadStatus), SlotMap>,
}
/// Map from slot -> state_root.
@@ -143,8 +143,11 @@ impl<E: EthSpec> StateCache<E> {
return Err(Error::FinalizedStateDecreasingSlot);
}
let payload_status = state.payload_status();
// Add to block map.
self.block_map.insert(block_root, state.slot(), state_root);
self.block_map
.insert(block_root, payload_status, state.slot(), state_root);
// Prune block map.
let state_roots_to_prune = self.block_map.prune(state.slot());
@@ -267,7 +270,9 @@ impl<E: EthSpec> StateCache<E> {
// Record the connection from block root and slot to this state.
let slot = state.slot();
self.block_map.insert(block_root, slot, state_root);
let payload_status = state.payload_status();
self.block_map
.insert(block_root, payload_status, slot, state_root);
Ok(PutStateOutcome::New(deleted_states))
}
@@ -316,9 +321,10 @@ impl<E: EthSpec> StateCache<E> {
pub fn get_by_block_root(
&mut self,
block_root: Hash256,
payload_status: StatePayloadStatus,
slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
let slot_map = self.block_map.blocks.get(&block_root)?;
let slot_map = self.block_map.blocks.get(&(block_root, payload_status))?;
// Find the state at `slot`, or failing that the most recent ancestor.
let state_root = slot_map
@@ -339,7 +345,12 @@ impl<E: EthSpec> StateCache<E> {
}
pub fn delete_block_states(&mut self, block_root: &Hash256) {
if let Some(slot_map) = self.block_map.delete_block_states(block_root) {
let (pending_state_roots, full_state_roots) =
self.block_map.delete_block_states(block_root);
for slot_map in [pending_state_roots, full_state_roots]
.into_iter()
.flatten()
{
for state_root in slot_map.slots.values() {
self.states.pop(state_root);
}
@@ -412,8 +423,14 @@ impl<E: EthSpec> StateCache<E> {
}
impl BlockMap {
fn insert(&mut self, block_root: Hash256, slot: Slot, state_root: Hash256) {
let slot_map = self.blocks.entry(block_root).or_default();
fn insert(
&mut self,
block_root: Hash256,
payload_status: StatePayloadStatus,
slot: Slot,
state_root: Hash256,
) {
let slot_map = self.blocks.entry((block_root, payload_status)).or_default();
slot_map.slots.insert(slot, state_root);
}
@@ -444,8 +461,12 @@ impl BlockMap {
});
}
fn delete_block_states(&mut self, block_root: &Hash256) -> Option<SlotMap> {
self.blocks.remove(block_root)
fn delete_block_states(&mut self, block_root: &Hash256) -> (Option<SlotMap>, Option<SlotMap>) {
let pending_state_roots = self
.blocks
.remove(&(*block_root, StatePayloadStatus::Pending));
let full_state_roots = self.blocks.remove(&(*block_root, StatePayloadStatus::Full));
(pending_state_roots, full_state_roots)
}
}