Connect up DB replay_blocks/load_blocks

This commit is contained in:
Michael Sproul
2026-02-23 15:43:19 +11:00
parent a959c5f640
commit afc6fb137c
6 changed files with 99 additions and 27 deletions

View File

@@ -186,6 +186,7 @@ pub enum HotColdDBError {
MissingHotHDiff(Hash256),
MissingHDiff(Slot),
MissingExecutionPayload(Hash256),
MissingExecutionPayloadEnvelope(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo,
MissingFrozenBlockSlot(Hash256),
@@ -2020,7 +2021,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(base_state);
}
let blocks = self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?;
let (blocks, envelopes) =
self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?;
let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME);
// If replaying blocks, and `update_cache` is true, also cache the epoch boundary
@@ -2053,6 +2055,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.replay_blocks(
base_state,
blocks,
envelopes,
slot,
no_state_root_iter(),
Some(Box::new(state_cache_hook)),
@@ -2357,6 +2360,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
let blocks = self.load_cold_blocks(base_state.slot() + 1, slot)?;
// TODO(gloas): load payload envelopes
let envelopes = vec![];
// Include state root for base state as it is required by block processing to not
// have to hash the state.
@@ -2365,7 +2370,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.forwards_state_roots_iterator_until(base_state.slot(), slot, || {
Err(Error::StateShouldNotBeRequired(slot))
})?;
let state = self.replay_blocks(base_state, blocks, slot, Some(state_root_iter), None)?;
let state = self.replay_blocks(
base_state,
blocks,
envelopes,
slot,
Some(state_root_iter),
None,
)?;
debug!(
target_slot = %slot,
replay_time_ms = metrics::stop_timer_with_duration(replay_timer).as_millis(),
@@ -2480,18 +2492,31 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})?
}
/// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`.
/// Load the blocks & envelopes between `start_slot` and `end_slot` by backtracking from
/// `end_block_root`.
///
/// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot
/// equal to `start_slot`, to reach a state with slot equal to `end_slot`.
///
/// Payloads are also returned in slot-ascending order, but only payloads forming part of
/// the chain are loaded (payloads for EMPTY slots are omitted). Prior to Gloas, an empty
/// vec of payloads will be returned.
// TODO(gloas): handle last payload
#[allow(clippy::type_complexity)]
pub fn load_blocks_to_replay(
&self,
start_slot: Slot,
end_slot: Slot,
end_block_hash: Hash256,
) -> Result<Vec<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
end_block_root: Hash256,
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
Vec<SignedExecutionPayloadEnvelope<E>>,
),
Error,
> {
let _t = metrics::start_timer(&metrics::STORE_BEACON_LOAD_HOT_BLOCKS_TIME);
let mut blocks = ParentRootBlockIterator::new(self, end_block_hash)
let mut blocks = ParentRootBlockIterator::new(self, end_block_root)
.map(|result| result.map(|(_, block)| block))
// Include the block at the end slot (if any), it needs to be
// replayed in order to construct the canonical state at `end_slot`.
@@ -2518,7 +2543,35 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
.collect::<Result<Vec<_>, _>>()?;
blocks.reverse();
Ok(blocks)
// If Gloas is not enabled for any slots in the range, just return `blocks`.
if !self.spec.fork_name_at_slot::<E>(start_slot).gloas_enabled()
&& !self.spec.fork_name_at_slot::<E>(end_slot).gloas_enabled()
{
return Ok((blocks, vec![]));
}
// Load envelopes.
let mut envelopes = vec![];
for (block, next_block) in blocks.iter().tuple_windows() {
if block.fork_name_unchecked().gloas_enabled() {
// Check next block to see if this block's payload is canonical on this chain.
let block_hash = block.payload_bid_block_hash()?;
if !next_block.is_parent_block_full(block_hash) {
// No payload at this slot (empty), nothing to load.
continue;
}
// Using `parent_root` avoids computation.
let block_root = next_block.parent_root();
let envelope = self
.get_payload_envelope(&block_root)?
.ok_or(HotColdDBError::MissingExecutionPayloadEnvelope(block_root))?;
envelopes.push(envelope);
}
}
Ok((blocks, envelopes))
}
/// Replay `blocks` on top of `state` until `target_slot` is reached.
@@ -2528,7 +2581,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn replay_blocks(
&self,
state: BeaconState<E>,
blocks: Vec<SignedBeaconBlock<E, BlindedPayload<E>>>,
blocks: Vec<SignedBlindedBeaconBlock<E>>,
envelopes: Vec<SignedExecutionPayloadEnvelope<E>>,
target_slot: Slot,
state_root_iter: Option<impl Iterator<Item = Result<(Hash256, Slot), Error>>>,
pre_slot_hook: Option<PreSlotHook<E, Error>>,
@@ -2548,9 +2602,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
block_replayer = block_replayer.pre_slot_hook(pre_slot_hook);
}
// TODO(gloas): plumb through payloads here
block_replayer
.apply_blocks(blocks, vec![], Some(target_slot))
.apply_blocks(blocks, envelopes, Some(target_slot))
.map(|block_replayer| {
if have_state_root_iterator && block_replayer.state_root_miss() {
warn!(