Thread more payload status

This commit is contained in:
Michael Sproul
2026-02-24 15:33:43 +11:00
parent a3f31835ab
commit 295aaf982c
6 changed files with 125 additions and 131 deletions

View File

@@ -1,13 +1,7 @@
use crate::{BeaconForkChoiceStore, BeaconSnapshot};
use fork_choice::{ForkChoice, PayloadVerificationStatus};
use crate::BeaconForkChoiceStore;
use fork_choice::ForkChoice;
use itertools::process_results;
use state_processing::state_advance::complete_state_advance;
use state_processing::{
ConsensusContext, VerifyBlockRoot, per_block_processing,
per_block_processing::BlockSignatureStrategy,
};
use std::sync::Arc;
use std::time::Duration;
use store::{HotColdDB, ItemStore, iter::ParentRootBlockIterator};
use tracing::{info, warn};
use types::{BeaconState, ChainSpec, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot};
@@ -92,114 +86,11 @@ pub fn revert_to_fork_boundary<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>
/// chains other than the chain leading to `head_block_root`. It should only be used in extreme
/// circumstances when there is no better alternative.
pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
head_block_root: Hash256,
head_state: &BeaconState<E>,
store: Arc<HotColdDB<E, Hot, Cold>>,
current_slot: Option<Slot>,
spec: &ChainSpec,
_head_block_root: Hash256,
_head_state: &BeaconState<E>,
_store: Arc<HotColdDB<E, Hot, Cold>>,
_current_slot: Option<Slot>,
_spec: &ChainSpec,
) -> Result<ForkChoice<BeaconForkChoiceStore<E, Hot, Cold>, E>, String> {
// Fetch finalized block.
let finalized_checkpoint = head_state.finalized_checkpoint();
let finalized_block_root = finalized_checkpoint.root;
let finalized_block = store
.get_full_block(&finalized_block_root)
.map_err(|e| format!("Error loading finalized block: {:?}", e))?
.ok_or_else(|| {
format!(
"Finalized block missing for revert: {:?}",
finalized_block_root
)
})?;
// Advance finalized state to finalized epoch (to handle skipped slots).
let finalized_state_root = finalized_block.state_root();
// The enshrined finalized state should be in the state cache.
let mut finalized_state = store
.get_state(&finalized_state_root, Some(finalized_block.slot()), true)
.map_err(|e| format!("Error loading finalized state: {:?}", e))?
.ok_or_else(|| {
format!(
"Finalized block state missing from database: {:?}",
finalized_state_root
)
})?;
let finalized_slot = finalized_checkpoint.epoch.start_slot(E::slots_per_epoch());
complete_state_advance(
&mut finalized_state,
Some(finalized_state_root),
finalized_slot,
spec,
)
.map_err(|e| {
format!(
"Error advancing finalized state to finalized epoch: {:?}",
e
)
})?;
let finalized_snapshot = BeaconSnapshot {
beacon_block_root: finalized_block_root,
beacon_block: Arc::new(finalized_block),
beacon_state: finalized_state,
};
let fc_store =
BeaconForkChoiceStore::get_forkchoice_store(store.clone(), finalized_snapshot.clone())
.map_err(|e| format!("Unable to reset fork choice store for revert: {e:?}"))?;
let mut fork_choice = ForkChoice::from_anchor(
fc_store,
finalized_block_root,
&finalized_snapshot.beacon_block,
&finalized_snapshot.beacon_state,
current_slot,
spec,
)
.map_err(|e| format!("Unable to reset fork choice for revert: {:?}", e))?;
// Replay blocks from finalized checkpoint back to head.
// We do not replay attestations presently, relying on the absence of other blocks
// to guarantee `head_block_root` as the head.
// TODO(gloas): this code doesn't work anyway, could just delete all of it
let (blocks, _envelopes) = store
.load_blocks_to_replay(finalized_slot + 1, head_state.slot(), head_block_root)
.map_err(|e| format!("Error loading blocks to replay for fork choice: {:?}", e))?;
let mut state = finalized_snapshot.beacon_state;
for block in blocks {
complete_state_advance(&mut state, None, block.slot(), spec)
.map_err(|e| format!("State advance failed: {:?}", e))?;
let mut ctxt = ConsensusContext::new(block.slot())
.set_proposer_index(block.message().proposer_index());
per_block_processing(
&mut state,
&block,
BlockSignatureStrategy::NoVerification,
VerifyBlockRoot::True,
&mut ctxt,
spec,
)
.map_err(|e| format!("Error replaying block: {:?}", e))?;
// Setting this to unverified is the safest solution, since we don't have a way to
// retro-actively determine if they were valid or not.
//
// This scenario is so rare that it seems OK to double-verify some blocks.
let payload_verification_status = PayloadVerificationStatus::Optimistic;
fork_choice
.on_block(
block.slot(),
block.message(),
block.canonical_root(),
// Reward proposer boost. We are reinforcing the canonical chain.
Duration::from_secs(0),
&state,
payload_verification_status,
spec,
)
.map_err(|e| format!("Error applying replayed block to fork choice: {:?}", e))?;
}
Ok(fork_choice)
Err("broken".into())
}

View File

@@ -16,6 +16,7 @@ use store::{
use tracing::{debug, info, warn};
use types::{
BeaconState, CACHED_EPOCHS, ChainSpec, Checkpoint, CommitteeCache, EthSpec, Hash256, Slot,
execution::StatePayloadStatus,
};
/// We stopped using the pruning checkpoint in schema v23 but never explicitly deleted it.
@@ -58,6 +59,7 @@ pub fn get_state_v22<T: BeaconChainTypes>(
base_state,
summary.slot,
summary.latest_block_root,
StatePayloadStatus::Pending,
update_cache,
)
.map(Some)

View File

@@ -689,7 +689,12 @@ async fn block_replayer_hooks() {
.await;
let (blocks, envelopes) = store
.load_blocks_to_replay(Slot::new(0), max_slot, end_block_root.into())
.load_blocks_to_replay(
Slot::new(0),
max_slot,
end_block_root.into(),
StatePayloadStatus::Pending,
)
.unwrap();
let mut pre_slots = vec![];

View File

@@ -6,6 +6,7 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::{debug, warn};
use types::block::BlindedBeaconBlock;
use types::execution::StatePayloadStatus;
use types::new_non_zero_usize;
use warp_utils::reject::{beacon_state_error, custom_bad_request, unhandled_error};
@@ -34,7 +35,12 @@ pub fn get_block_rewards<T: BeaconChainTypes>(
let (blocks, envelopes) = chain
.store
.load_blocks_to_replay(start_slot, end_slot, end_block_root)
.load_blocks_to_replay(
start_slot,
end_slot,
end_block_root,
StatePayloadStatus::Pending,
)
.map_err(|e| unhandled_error(BeaconChainError::from(e)))?;
let state_root = chain

View File

@@ -1845,6 +1845,44 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Compute the `StatePayloadStatus` for a stored state based on its summary.
///
/// In future this might become a field of the summary, but this would require a whole DB
/// migration. For now we use an extra read from the DB to determine it.
fn get_hot_state_summary_payload_status(
&self,
summary: &HotStateSummary,
) -> Result<StatePayloadStatus, Error> {
// Treat pre-Gloas states as `Pending`.
if !self
.spec
.fork_name_at_slot::<E>(summary.slot)
.gloas_enabled()
{
return Ok(StatePayloadStatus::Pending);
}
// Treat genesis state as `Pending` (`BeaconBlock` state).
let previous_state_root = summary.previous_state_root;
if previous_state_root.is_zero() {
return Ok(StatePayloadStatus::Pending);
}
// Load the hot state summary for the previous state. If it has the same slot as this
// summary then we know this summary is for a `Full` block (payload state).
// NOTE: We treat any and all skipped-slot states as `Pending` by this definition, which is
// perhaps a bit strange (they could have a payload most-recently applied).
let previous_state_summary = self
.load_hot_state_summary(&previous_state_root)?
.ok_or(Error::MissingHotStateSummary(previous_state_root))?;
if previous_state_summary.slot == summary.slot {
Ok(StatePayloadStatus::Full)
} else {
Ok(StatePayloadStatus::Pending)
}
}
fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result<HDiffBuffer, Error> {
if let Some(buffer) = self
.state_cache
@@ -1940,12 +1978,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
if let Some(HotStateSummary {
slot,
latest_block_root,
diff_base_state,
..
}) = self.load_hot_state_summary(state_root)?
if let Some(
summary @ HotStateSummary {
slot,
latest_block_root,
diff_base_state,
..
},
) = self.load_hot_state_summary(state_root)?
{
let mut state = match self.hot_storage_strategy(slot)? {
strat @ StorageStrategy::Snapshot | strat @ StorageStrategy::DiffFrom(_) => {
@@ -1994,10 +2034,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.lock()
.rebase_on_finalized(&mut base_state, &self.spec)?;
let payload_status = self.get_hot_state_summary_payload_status(&summary)?;
self.load_hot_state_using_replay(
base_state,
slot,
latest_block_root,
payload_status,
update_cache,
)?
}
@@ -2015,20 +2058,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
base_state: BeaconState<E>,
slot: Slot,
latest_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
update_cache: bool,
) -> Result<BeaconState<E>, Error> {
if base_state.slot() == slot {
if base_state.slot() == slot && base_state.payload_status() == desired_payload_status {
return Ok(base_state);
}
let (blocks, envelopes) =
self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?;
let (blocks, envelopes) = self.load_blocks_to_replay(
base_state.slot(),
slot,
latest_block_root,
desired_payload_status,
)?;
let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME);
// If replaying blocks, and `update_cache` is true, also cache the epoch boundary
// state that this state is based on. It may be useful as the basis of more states
// in the same epoch.
let state_cache_hook = |state_root, state: &mut BeaconState<E>| {
// TODO(gloas): prevent caching of the payload_status=Full state?
if !update_cache || state.slot() % E::slots_per_epoch() != 0 {
return Ok(());
}
@@ -2502,7 +2551,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
{
return Ok((blocks, vec![]));
}
let envelopes = self.load_payload_envelopes_for_blocks(&blocks)?;
// TODO(gloas): wire this up
let end_block_root = Hash256::ZERO;
let desired_payload_status = StatePayloadStatus::Pending;
let envelopes = self.load_payload_envelopes_for_blocks(
&blocks,
end_block_root,
desired_payload_status,
)?;
Ok((blocks, envelopes))
}
@@ -2523,6 +2579,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
start_slot: Slot,
end_slot: Slot,
end_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
@@ -2566,7 +2623,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok((blocks, vec![]));
}
let envelopes = self.load_payload_envelopes_for_blocks(&blocks)?;
let envelopes = self.load_payload_envelopes_for_blocks(
&blocks,
end_block_root,
desired_payload_status,
)?;
Ok((blocks, envelopes))
}
@@ -2574,6 +2635,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn load_payload_envelopes_for_blocks(
&self,
blocks: &[SignedBlindedBeaconBlock<E>],
end_block_root: Hash256,
desired_payload_status: StatePayloadStatus,
) -> Result<Vec<SignedExecutionPayloadEnvelope<E>>, Error> {
let mut envelopes = vec![];
@@ -2593,6 +2656,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
envelopes.push(envelope);
}
}
// Load the payload for the last block if desired.
if let StatePayloadStatus::Full = desired_payload_status {
let envelope = self.get_payload_envelope(&end_block_root)?.ok_or(
HotColdDBError::MissingExecutionPayloadEnvelope(end_block_root),
)?;
envelopes.push(envelope);
}
Ok(envelopes)
}

View File

@@ -36,7 +36,7 @@ use crate::{
execution::{
Eth1Data, ExecutionPayloadHeaderBellatrix, ExecutionPayloadHeaderCapella,
ExecutionPayloadHeaderDeneb, ExecutionPayloadHeaderElectra, ExecutionPayloadHeaderFulu,
ExecutionPayloadHeaderRef, ExecutionPayloadHeaderRefMut,
ExecutionPayloadHeaderRef, ExecutionPayloadHeaderRefMut, StatePayloadStatus,
},
fork::{Fork, ForkName, ForkVersionDecode, InconsistentFork, map_fork_name},
light_client::consts::{
@@ -1265,6 +1265,24 @@ impl<E: EthSpec> BeaconState<E> {
}
}
/// Determine the payload status of this state.
///
/// Prior to Gloas this is always `Pending`.
///
/// Post-Gloas, the definition of the `StatePayloadStatus` is:
///
/// - `Full` if this state is the result of envelope processing.
/// - `Pending` if this state is the result of block processing.
pub fn payload_status(&self) -> StatePayloadStatus {
if !self.fork_name_unchecked().gloas_enabled() {
StatePayloadStatus::Pending
} else if self.is_parent_block_full() {
StatePayloadStatus::Full
} else {
StatePayloadStatus::Pending
}
}
/// Return `true` if the validator who produced `slot_signature` is eligible to aggregate.
///
/// Spec v0.12.1