Make state cache payload status aware

This commit is contained in:
Michael Sproul
2026-02-25 13:21:48 +11:00
parent a09839df1f
commit 984f0d70e0
13 changed files with 181 additions and 36 deletions

View File

@@ -2041,7 +2041,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} else {
let (advanced_state_root, mut state) = self
.store
.get_advanced_hot_state(beacon_block_root, request_slot, beacon_state_root)?
.get_advanced_hot_state(
beacon_block_root,
StatePayloadStatus::Pending,
request_slot,
beacon_state_root,
)?
.ok_or(Error::MissingBeaconState(beacon_state_root))?;
if state.current_epoch() < request_epoch {
partial_state_advance(
@@ -4710,7 +4715,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.ok_or(Error::MissingBeaconBlock(parent_block_root))?;
let (state_root, state) = self
.store
.get_advanced_hot_state(parent_block_root, proposal_slot, block.state_root())?
.get_advanced_hot_state(
parent_block_root,
StatePayloadStatus::Pending,
proposal_slot,
block.state_root(),
)?
.ok_or(Error::MissingBeaconState(block.state_root()))?;
(Cow::Owned(state), state_root)
};
@@ -6701,7 +6711,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
} else {
let (state_root, state) = self
.store
.get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)?
.get_advanced_hot_state(
head_block_root,
StatePayloadStatus::Pending,
target_slot,
head_block.state_root,
)?
.ok_or(Error::MissingBeaconState(head_block.state_root))?;
(state, state_root)
};

View File

@@ -20,6 +20,7 @@ use tree_hash::TreeHash;
use types::data::BlobIdentifier;
use types::{
BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
StatePayloadStatus,
};
/// An error occurred while validating a gossip blob.
@@ -510,7 +511,12 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes, O: ObservationStrat
);
chain
.store
.get_advanced_hot_state(block_parent_root, blob_slot, parent_block.state_root)
.get_advanced_hot_state(
block_parent_root,
StatePayloadStatus::Pending,
blob_slot,
parent_block.state_root,
)
.map_err(|e| GossipBlobError::BeaconChainError(Box::new(e.into())))?
.ok_or_else(|| {
GossipBlobError::BeaconChainError(Box::new(BeaconChainError::DBInconsistent(

View File

@@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use proto_array::ProposerHeadError;
use slot_clock::SlotClock;
use tracing::{debug, error, info, instrument, warn};
use types::{BeaconState, Hash256, Slot};
use types::{BeaconState, Hash256, Slot, StatePayloadStatus};
use crate::{
BeaconChain, BeaconChainTypes, BlockProductionError, StateSkipConfig,
@@ -51,7 +51,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// state cache thanks to the state advance timer.
let (state_root, state) = self
.store
.get_advanced_hot_state(head_block_root, slot, head_state_root)
.get_advanced_hot_state(
head_block_root,
StatePayloadStatus::Pending,
slot,
head_state_root,
)
.map_err(BlockProductionError::FailedToLoadState)?
.ok_or(BlockProductionError::UnableToProduceAtSlot(slot))?;
(state, Some(state_root))
@@ -204,7 +209,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (state_root, state) = self
.store
.get_advanced_hot_state_from_cache(re_org_parent_block, slot)
.get_advanced_hot_state_from_cache(
re_org_parent_block,
StatePayloadStatus::Pending,
slot,
)
.or_else(|| {
warn!(reason = "no state in cache", "Not attempting re-org");
None

View File

@@ -100,7 +100,8 @@ use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument}
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, KzgProofs,
RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, data::DataColumnSidecarError,
RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, StatePayloadStatus,
data::DataColumnSidecarError,
};
pub const POS_PANDA_BANNER: &str = r#"
@@ -1992,9 +1993,16 @@ fn load_parent<T: BeaconChainTypes, B: AsBlock<T::EthSpec>>(
// Retrieve any state that is advanced through to at most `block.slot()`: this is
// particularly important if `block` descends from the finalized/split block, but at a slot
// prior to the finalized slot (which is invalid and inaccessible in our DB schema).
// TODO(gloas): use correct payload_status based on block
let payload_status = StatePayloadStatus::Pending;
let (parent_state_root, state) = chain
.store
.get_advanced_hot_state(root, block.slot(), parent_block.state_root())?
.get_advanced_hot_state(
root,
payload_status,
block.slot(),
parent_block.state_root(),
)?
.ok_or_else(|| {
BeaconChainError::DBInconsistent(
format!("Missing state for parent block {root:?}",),

View File

@@ -45,7 +45,7 @@ use tree_hash::TreeHash;
use types::data::CustodyIndex;
use types::{
BeaconBlock, BeaconState, BlobSidecarList, ChainSpec, ColumnIndex, DataColumnSidecarList,
Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, StatePayloadStatus,
};
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -783,8 +783,16 @@ where
.map_err(|e| descriptive_db_error("head block", &e))?
.ok_or("Head block not found in store")?;
// TODO(gloas): update head loading to load Full block once fork choice works
let payload_status = StatePayloadStatus::Pending;
let (_head_state_root, head_state) = store
.get_advanced_hot_state(head_block_root, current_slot, head_block.state_root())
.get_advanced_hot_state(
head_block_root,
payload_status,
current_slot,
head_block.state_root(),
)
.map_err(|e| descriptive_db_error("head state", &e))?
.ok_or("Head state not found in store")?;

View File

@@ -305,8 +305,16 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
.get_full_block(&beacon_block_root)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let current_slot = fork_choice.fc_store().get_current_slot();
// TODO(gloas): pass a better payload status once fork choice is implemented
let payload_status = StatePayloadStatus::Pending;
let (_, beacon_state) = store
.get_advanced_hot_state(beacon_block_root, current_slot, beacon_block.state_root())?
.get_advanced_hot_state(
beacon_block_root,
payload_status,
current_slot,
beacon_block.state_root(),
)?
.ok_or(Error::MissingBeaconState(beacon_block.state_root()))?;
let snapshot = BeaconSnapshot {
@@ -673,10 +681,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_full_block(&new_view.head_block_root)?
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?;
// TODO(gloas): update once we have fork choice
let payload_status = StatePayloadStatus::Pending;
let (_, beacon_state) = self
.store
.get_advanced_hot_state(
new_view.head_block_root,
payload_status,
current_slot,
beacon_block.state_root(),
)?

View File

@@ -20,7 +20,7 @@ use tracing::{debug, instrument};
use types::data::ColumnIndex;
use types::{
BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId,
EthSpec, Hash256, Slot,
EthSpec, Hash256, Slot, StatePayloadStatus,
};
/// An error occurred while validating a gossip data column.
@@ -708,7 +708,12 @@ fn verify_proposer_and_signature<T: BeaconChainTypes>(
);
chain
.store
.get_advanced_hot_state(block_parent_root, column_slot, parent_block.state_root)
.get_advanced_hot_state(
block_parent_root,
StatePayloadStatus::Pending,
column_slot,
parent_block.state_root,
)
.map_err(|e| GossipDataColumnError::BeaconChainError(Box::new(e.into())))?
.ok_or_else(|| {
GossipDataColumnError::BeaconChainError(Box::new(

View File

@@ -26,7 +26,10 @@ use std::sync::{
use task_executor::TaskExecutor;
use tokio::time::{Instant, sleep, sleep_until};
use tracing::{Instrument, debug, debug_span, error, instrument, warn};
use types::{AttestationShufflingId, BeaconStateError, EthSpec, Hash256, RelativeEpoch, Slot};
use types::{
AttestationShufflingId, BeaconStateError, EthSpec, Hash256, RelativeEpoch, Slot,
StatePayloadStatus,
};
/// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform
/// the state advancement.
@@ -277,9 +280,16 @@ fn advance_head<T: BeaconChainTypes>(beacon_chain: &Arc<BeaconChain<T>>) -> Resu
(snapshot.beacon_block_root, snapshot.beacon_state_root())
};
// TODO(gloas): do better once we have fork choice
let payload_status = StatePayloadStatus::Pending;
let (head_state_root, mut state) = beacon_chain
.store
.get_advanced_hot_state(head_block_root, current_slot, head_block_state_root)?
.get_advanced_hot_state(
head_block_root,
payload_status,
current_slot,
head_block_state_root,
)?
.ok_or(Error::HeadMissingFromSnapshotCache(head_block_root))?;
let initial_slot = state.slot();

View File

@@ -27,7 +27,7 @@ use bls::{
use eth2::types::{GraffitiPolicy, SignedBlockContentsTuple};
use execution_layer::test_utils::generate_genesis_header;
use execution_layer::{
ExecutionLayer,
ExecutionLayer, NewPayloadRequest, NewPayloadRequestGloas,
auth::JwtKey,
test_utils::{
DEFAULT_JWT_SECRET, DEFAULT_TERMINAL_BLOCK, ExecutionBlockGenerator, MockBuilder,
@@ -53,6 +53,7 @@ use sensitive_url::SensitiveUrl;
use slot_clock::{SlotClock, TestingSlotClock};
use ssz_types::{RuntimeVariableList, VariableList};
use state_processing::per_block_processing::compute_timestamp_at_slot;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
use state_processing::state_advance::complete_state_advance;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
@@ -2559,7 +2560,7 @@ where
}
/// Process an execution payload envelope for a Gloas block.
pub fn process_envelope(
pub async fn process_envelope(
&self,
block_root: Hash256,
signed_envelope: SignedExecutionPayloadEnvelope<E>,
@@ -2585,6 +2586,42 @@ where
)
.expect("should process envelope");
// Notify the EL of the new payload so forkchoiceUpdated can reference it.
let block = self
.chain
.store
.get_blinded_block(&block_root)
.expect("should read block from store")
.expect("block should exist in store");
let bid = &block
.message()
.body()
.signed_execution_payload_bid()
.expect("Gloas block should have a payload bid")
.message;
let versioned_hashes = bid
.blob_kzg_commitments
.iter()
.map(kzg_commitment_to_versioned_hash)
.collect();
let request = NewPayloadRequest::Gloas(NewPayloadRequestGloas {
execution_payload: &signed_envelope.message.payload,
versioned_hashes,
parent_beacon_block_root: block.message().parent_root(),
execution_requests: &signed_envelope.message.execution_requests,
});
self.chain
.execution_layer
.as_ref()
.expect("harness should have execution layer")
.notify_new_payload(request)
.await
.expect("newPayload should succeed");
// Store the envelope.
self.chain
.store

View File

@@ -3803,7 +3803,12 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
let (split_state_root, mut advanced_split_state) = harness
.chain
.store
.get_advanced_hot_state(split.block_root, split.slot, split.state_root)
.get_advanced_hot_state(
split.block_root,
StatePayloadStatus::Pending,
split.slot,
split.state_root,
)
.unwrap()
.unwrap();
complete_state_advance(
@@ -5472,7 +5477,9 @@ async fn test_gloas_block_and_envelope_storage() {
// Process the envelope.
let envelope = envelope.expect("Gloas block should have envelope");
let mut full_state = pending_state.clone();
let full_state_root = harness.process_envelope(block_root, envelope, &mut full_state);
let full_state_root = harness
.process_envelope(block_root, envelope, &mut full_state)
.await;
full_state_roots.push(full_state_root);
block_roots.push(block_root);
@@ -5574,7 +5581,9 @@ async fn test_gloas_state_payload_status() {
// Process the envelope and verify the full state has correct payload status.
let envelope = envelope.expect("Gloas block should have envelope");
let mut full_state = pending_state;
let full_state_root = harness.process_envelope(block_root, envelope, &mut full_state);
let full_state_root = harness
.process_envelope(block_root, envelope, &mut full_state)
.await;
assert_eq!(
full_state.payload_status_with_skipped_pending(),
@@ -5636,7 +5645,9 @@ async fn test_gloas_block_replay_with_envelopes() {
let envelope = envelope.expect("Gloas block should have envelope");
let mut full_state = pending_state;
let full_state_root = harness.process_envelope(block_root, envelope, &mut full_state);
let full_state_root = harness
.process_envelope(block_root, envelope, &mut full_state)
.await;
full_states.insert(slot, (full_state_root, full_state.clone()));
last_block_root = block_root;
@@ -5775,7 +5786,9 @@ async fn test_gloas_hot_state_hierarchy() {
let envelope = envelope.expect("Gloas block should have envelope");
let mut full_state = pending_state;
harness.process_envelope(block_root, envelope, &mut full_state);
harness
.process_envelope(block_root, envelope, &mut full_state)
.await;
last_block_root = block_root;
state = full_state;

View File

@@ -1144,10 +1144,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_advanced_hot_state(
&self,
block_root: Hash256,
payload_status: StatePayloadStatus,
max_slot: Slot,
state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
if let Some(cached) = self.get_advanced_hot_state_from_cache(block_root, max_slot) {
if let Some(cached) =
self.get_advanced_hot_state_from_cache(block_root, payload_status, max_slot)
{
return Ok(Some(cached));
}
@@ -1169,7 +1172,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.into());
}
let state_root = if block_root == split.block_root && split.slot <= max_slot {
// Split state should always be `Pending`.
let state_root = if block_root == split.block_root
&& let StatePayloadStatus::Pending = payload_status
&& split.slot <= max_slot
{
split.state_root
} else {
state_root
@@ -1216,11 +1223,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_advanced_hot_state_from_cache(
&self,
block_root: Hash256,
payload_status: StatePayloadStatus,
max_slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
self.state_cache
.lock()
.get_by_block_root(block_root, max_slot)
.get_by_block_root(block_root, payload_status, max_slot)
}
/// Delete a state, ensuring it is removed from the LRU cache, as well as from on-disk.

View File

@@ -7,7 +7,7 @@ use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use tracing::instrument;
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot};
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot, execution::StatePayloadStatus};
/// Fraction of the LRU cache to leave intact during culling.
const CULL_EXEMPT_NUMERATOR: usize = 1;
@@ -23,10 +23,10 @@ pub struct FinalizedState<E: EthSpec> {
state: BeaconState<E>,
}
/// Map from block_root -> slot -> state_root.
/// Map from (block_root, payload_status) -> slot -> state_root.
#[derive(Debug, Default)]
pub struct BlockMap {
blocks: HashMap<Hash256, SlotMap>,
blocks: HashMap<(Hash256, StatePayloadStatus), SlotMap>,
}
/// Map from slot -> state_root.
@@ -130,8 +130,11 @@ impl<E: EthSpec> StateCache<E> {
return Err(Error::FinalizedStateDecreasingSlot);
}
let payload_status = state.payload_status();
// Add to block map.
self.block_map.insert(block_root, state.slot(), state_root);
self.block_map
.insert(block_root, payload_status, state.slot(), state_root);
// Prune block map.
let state_roots_to_prune = self.block_map.prune(state.slot());
@@ -254,7 +257,9 @@ impl<E: EthSpec> StateCache<E> {
// Record the connection from block root and slot to this state.
let slot = state.slot();
self.block_map.insert(block_root, slot, state_root);
let payload_status = state.payload_status();
self.block_map
.insert(block_root, payload_status, slot, state_root);
Ok(PutStateOutcome::New(deleted_states))
}
@@ -303,9 +308,10 @@ impl<E: EthSpec> StateCache<E> {
pub fn get_by_block_root(
&mut self,
block_root: Hash256,
payload_status: StatePayloadStatus,
slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
let slot_map = self.block_map.blocks.get(&block_root)?;
let slot_map = self.block_map.blocks.get(&(block_root, payload_status))?;
// Find the state at `slot`, or failing that the most recent ancestor.
let state_root = slot_map
@@ -399,8 +405,14 @@ impl<E: EthSpec> StateCache<E> {
}
impl BlockMap {
fn insert(&mut self, block_root: Hash256, slot: Slot, state_root: Hash256) {
let slot_map = self.blocks.entry(block_root).or_default();
fn insert(
&mut self,
block_root: Hash256,
payload_status: StatePayloadStatus,
slot: Slot,
state_root: Hash256,
) {
let slot_map = self.blocks.entry((block_root, payload_status)).or_default();
slot_map.slots.insert(slot, state_root);
}
@@ -432,7 +444,10 @@ impl BlockMap {
}
fn delete_block_states(&mut self, block_root: &Hash256) -> Option<SlotMap> {
self.blocks.remove(block_root)
// TODO(gloas): update return type
self.blocks
.remove(&(*block_root, StatePayloadStatus::Pending));
self.blocks.remove(&(*block_root, StatePayloadStatus::Full))
}
}

View File

@@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
///
/// Note that states at skipped slots could be either `Pending` or `Full`, depending on whether
/// the payload for the most-recently applied block was also applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StatePayloadStatus {
/// For states produced by `process_block` executed on a `BeaconBlock`.