Add db boilerplate for payload envelope

This commit is contained in:
Eitan Seri- Levi
2026-01-28 18:26:56 -08:00
parent f7b5c7ee3f
commit 3df2cf8f7e
7 changed files with 186 additions and 3 deletions

View File

@@ -1295,6 +1295,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(self.store.get_blinded_block(block_root)?) Ok(self.store.get_blinded_block(block_root)?)
} }
pub fn get_payload(
&self,
block_root: &Hash256,
) -> Result<Option<SignedExecutionPayloadEnvelope<T::EthSpec>>, Error> {
Ok(self.store.get_payload_envelope(block_root)?)
}
/// Return the status of a block as it progresses through the various caches of the beacon /// Return the status of a block as it progresses through the various caches of the beacon
/// chain. Used by sync to learn the status of a block and prevent repeated downloads / /// chain. Used by sync to learn the status of a block and prevent repeated downloads /
/// processing attempts. /// processing attempts.

View File

@@ -635,9 +635,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.hierarchy .hierarchy
.closest_layer_points(new_finalized_slot, store.hot_hdiff_start_slot()?); .closest_layer_points(new_finalized_slot, store.hot_hdiff_start_slot()?);
// We don't know which blocks are shared among abandoned chains, so we buffer and delete // We don't know which blocks/payloads are shared among abandoned chains, so we buffer and delete
// everything in one fell swoop. // everything in one fell swoop.
let mut blocks_to_prune: HashSet<Hash256> = HashSet::new(); let mut blocks_to_prune: HashSet<Hash256> = HashSet::new();
let mut payloads_to_prune: HashSet<Hash256> = HashSet::new();
let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new(); let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new();
let mut kept_summaries_for_hdiff = vec![]; let mut kept_summaries_for_hdiff = vec![];
@@ -728,6 +729,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
if should_prune { if should_prune {
blocks_to_prune.insert(block_root); blocks_to_prune.insert(block_root);
payloads_to_prune.insert(block_root);
} }
} }
@@ -748,6 +750,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
state_summaries_dag_roots = ?state_summaries_dag_roots, state_summaries_dag_roots = ?state_summaries_dag_roots,
finalized_and_descendant_state_roots_of_finalized_checkpoint = finalized_and_descendant_state_roots_of_finalized_checkpoint.len(), finalized_and_descendant_state_roots_of_finalized_checkpoint = finalized_and_descendant_state_roots_of_finalized_checkpoint.len(),
blocks_to_prune = blocks_to_prune.len(), blocks_to_prune = blocks_to_prune.len(),
payloads_to_prune = payloads_to_prune.len(),
states_to_prune = states_to_prune.len(), states_to_prune = states_to_prune.len(),
"Extra pruning information" "Extra pruning information"
); );
@@ -773,6 +776,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
StoreOp::DeleteBlock(block_root), StoreOp::DeleteBlock(block_root),
StoreOp::DeleteExecutionPayload(block_root), StoreOp::DeleteExecutionPayload(block_root),
StoreOp::DeleteBlobs(block_root), StoreOp::DeleteBlobs(block_root),
StoreOp::DeletePayloadEnvelope(block_root),
StoreOp::DeleteSyncCommitteeBranch(block_root), StoreOp::DeleteSyncCommitteeBranch(block_root),
] ]
}) })

View File

@@ -93,6 +93,7 @@ struct BlockCache<E: EthSpec> {
block_cache: LruCache<Hash256, SignedBeaconBlock<E>>, block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
blob_cache: LruCache<Hash256, BlobSidecarList<E>>, blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>, data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
payload_envelope_cache: LruCache<Hash256, SignedExecutionPayloadEnvelope<E>>,
data_column_custody_info_cache: Option<DataColumnCustodyInfo>, data_column_custody_info_cache: Option<DataColumnCustodyInfo>,
} }
@@ -102,6 +103,7 @@ impl<E: EthSpec> BlockCache<E> {
block_cache: LruCache::new(size), block_cache: LruCache::new(size),
blob_cache: LruCache::new(size), blob_cache: LruCache::new(size),
data_column_cache: LruCache::new(size), data_column_cache: LruCache::new(size),
payload_envelope_cache: LruCache::new(size),
data_column_custody_info_cache: None, data_column_custody_info_cache: None,
} }
} }
@@ -116,6 +118,14 @@ impl<E: EthSpec> BlockCache<E> {
.get_or_insert_mut(block_root, Default::default) .get_or_insert_mut(block_root, Default::default)
.insert(*data_column.index(), data_column); .insert(*data_column.index(), data_column);
} }
pub fn put_payload_envelope(
&mut self,
block_root: Hash256,
payload_envelope: SignedExecutionPayloadEnvelope<E>,
) {
self.payload_envelope_cache
.put(block_root, payload_envelope);
}
pub fn put_data_column_custody_info( pub fn put_data_column_custody_info(
&mut self, &mut self,
data_column_custody_info: Option<DataColumnCustodyInfo>, data_column_custody_info: Option<DataColumnCustodyInfo>,
@@ -139,6 +149,12 @@ impl<E: EthSpec> BlockCache<E> {
.get(block_root) .get(block_root)
.and_then(|map| map.get(column_index).cloned()) .and_then(|map| map.get(column_index).cloned())
} }
pub fn get_payload_envelope<'a>(
&'a mut self,
block_root: &Hash256,
) -> Option<&'a SignedExecutionPayloadEnvelope<E>> {
self.payload_envelope_cache.get(block_root)
}
pub fn get_data_column_custody_info(&self) -> Option<DataColumnCustodyInfo> { pub fn get_data_column_custody_info(&self) -> Option<DataColumnCustodyInfo> {
self.data_column_custody_info_cache.clone() self.data_column_custody_info_cache.clone()
} }
@@ -151,10 +167,14 @@ impl<E: EthSpec> BlockCache<E> {
pub fn delete_data_columns(&mut self, block_root: &Hash256) { pub fn delete_data_columns(&mut self, block_root: &Hash256) {
let _ = self.data_column_cache.pop(block_root); let _ = self.data_column_cache.pop(block_root);
} }
pub fn delete_payload_envelope(&mut self, block_root: &Hash256) {
let _ = self.payload_envelope_cache.pop(block_root);
}
pub fn delete(&mut self, block_root: &Hash256) { pub fn delete(&mut self, block_root: &Hash256) {
self.delete_block(block_root); self.delete_block(block_root);
self.delete_blobs(block_root); self.delete_blobs(block_root);
self.delete_data_columns(block_root); self.delete_data_columns(block_root);
self.delete_payload_envelope(block_root);
} }
} }
@@ -508,6 +528,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&metrics::STORE_BEACON_BLOB_CACHE_SIZE, &metrics::STORE_BEACON_BLOB_CACHE_SIZE,
cache.blob_cache.len() as i64, cache.blob_cache.len() as i64,
); );
metrics::set_gauge(
&metrics::STORE_BEACON_PAYLOAD_ENVELOPE_CACHE_SIZE,
cache.payload_envelope_cache.len() as i64,
);
} }
let state_cache = self.state_cache.lock(); let state_cache = self.state_cache.lock();
metrics::set_gauge( metrics::set_gauge(
@@ -745,6 +769,57 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
pub fn get_payload_envelope(
&self,
block_root: &Hash256,
) -> Result<Option<SignedExecutionPayloadEnvelope<E>>, Error> {
// Check the cache.
if let Some(envelope) = self
.block_cache
.as_ref()
.and_then(|cache| cache.lock().get_payload_envelope(block_root).cloned())
{
metrics::inc_counter(&metrics::BEACON_PAYLOAD_ENVELOPE_CACHE_HIT_COUNT);
return Ok(Some(envelope));
}
let key = block_root.as_slice();
match self
.hot_db
.get_bytes(SignedExecutionPayloadEnvelope::<E>::db_column(), key)?
{
Some(bytes) => {
let envelope = SignedExecutionPayloadEnvelope::from_ssz_bytes(&bytes)?;
self.block_cache.as_ref().inspect(|cache| {
cache
.lock()
.put_payload_envelope(*block_root, envelope.clone())
});
Ok(Some(envelope))
}
None => Ok(None),
}
}
/// Check if the payload envelope for a block exists on disk or in cache.
pub fn payload_envelope_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
// Check the cache first.
if self
.block_cache
.as_ref()
.and_then(|cache| cache.lock().get_payload_envelope(block_root).cloned())
.is_some()
{
return Ok(true);
}
self.hot_db.key_exists(
SignedExecutionPayloadEnvelope::<E>::db_column(),
block_root.as_slice(),
)
}
/// Load the execution payload for a block from disk. /// Load the execution payload for a block from disk.
/// This method deserializes with the proper fork. /// This method deserializes with the proper fork.
pub fn get_execution_payload( pub fn get_execution_payload(
@@ -1027,6 +1102,39 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
// TODO(gloas) we should store the execution payload separately like we do for blocks.
/// Prepare a signed execution payload envelope for storage in the database.
pub fn payload_envelope_as_kv_store_ops(
&self,
key: &Hash256,
payload: &SignedExecutionPayloadEnvelope<E>,
ops: &mut Vec<KeyValueStoreOp>,
) {
ops.push(KeyValueStoreOp::PutKeyValue(
SignedExecutionPayloadEnvelope::<E>::db_column(),
key.as_slice().into(),
payload.as_ssz_bytes(),
));
}
pub fn put_payload_envelope(
&self,
block_root: &Hash256,
payload_envelope: SignedExecutionPayloadEnvelope<E>,
) -> Result<(), Error> {
self.hot_db.put_bytes(
SignedExecutionPayloadEnvelope::<E>::db_column(),
block_root.as_slice(),
&payload_envelope.as_ssz_bytes(),
)?;
self.block_cache.as_ref().inspect(|cache| {
cache
.lock()
.put_payload_envelope(*block_root, payload_envelope)
});
Ok(())
}
/// Store a state in the store. /// Store a state in the store.
pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> { pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
let mut ops: Vec<KeyValueStoreOp> = Vec::new(); let mut ops: Vec<KeyValueStoreOp> = Vec::new();
@@ -1283,6 +1391,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
); );
} }
StoreOp::PutPayloadEnvelope(block_root, payload_envelope) => {
self.payload_envelope_as_kv_store_ops(
&block_root,
&payload_envelope,
&mut key_value_batch,
);
}
StoreOp::PutStateSummary(state_root, summary) => { StoreOp::PutStateSummary(state_root, summary) => {
key_value_batch.push(summary.as_kv_store_op(state_root)); key_value_batch.push(summary.as_kv_store_op(state_root));
} }
@@ -1309,6 +1425,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} }
} }
StoreOp::DeletePayloadEnvelope(block_root) => {
key_value_batch.push(KeyValueStoreOp::DeleteKey(
SignedExecutionPayloadEnvelope::<E>::db_column(),
block_root.as_slice().to_vec(),
))
}
StoreOp::DeleteState(state_root, slot) => { StoreOp::DeleteState(state_root, slot) => {
// Delete the hot state summary. // Delete the hot state summary.
key_value_batch.push(KeyValueStoreOp::DeleteKey( key_value_batch.push(KeyValueStoreOp::DeleteKey(
@@ -1528,6 +1651,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutDataColumns(_, _) => (), StoreOp::PutDataColumns(_, _) => (),
StoreOp::PutPayloadEnvelope(block_root, payload_envelope) => {
guard.put_payload_envelope(block_root, (*payload_envelope).clone());
}
StoreOp::PutState(_, _) => (), StoreOp::PutState(_, _) => (),
StoreOp::PutStateSummary(_, _) => (), StoreOp::PutStateSummary(_, _) => (),
@@ -1536,6 +1663,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
guard.delete_block(&block_root); guard.delete_block(&block_root);
} }
StoreOp::DeletePayloadEnvelope(block_root) => {
guard.delete_payload_envelope(&block_root);
}
StoreOp::DeleteState(_, _) => (), StoreOp::DeleteState(_, _) => (),
StoreOp::DeleteBlobs(_) => (), StoreOp::DeleteBlobs(_) => (),

View File

@@ -1 +1,2 @@
pub mod execution_payload; pub mod execution_payload;
mod signed_execution_payload_envelope;

View File

@@ -0,0 +1,18 @@
use ssz::{Decode, Encode};
use types::{EthSpec, SignedExecutionPayloadEnvelope};
use crate::{DBColumn, Error, StoreItem};
impl<E: EthSpec> StoreItem for SignedExecutionPayloadEnvelope<E> {
fn db_column() -> DBColumn {
DBColumn::PayloadEnvelope
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

View File

@@ -234,12 +234,14 @@ pub enum StoreOp<'a, E: EthSpec> {
PutState(Hash256, &'a BeaconState<E>), PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, BlobSidecarList<E>), PutBlobs(Hash256, BlobSidecarList<E>),
PutDataColumns(Hash256, DataColumnSidecarList<E>), PutDataColumns(Hash256, DataColumnSidecarList<E>),
PutPayloadEnvelope(Hash256, Arc<SignedExecutionPayloadEnvelope<E>>),
PutStateSummary(Hash256, HotStateSummary), PutStateSummary(Hash256, HotStateSummary),
DeleteBlock(Hash256), DeleteBlock(Hash256),
DeleteBlobs(Hash256), DeleteBlobs(Hash256),
DeleteDataColumns(Hash256, Vec<ColumnIndex>, ForkName), DeleteDataColumns(Hash256, Vec<ColumnIndex>, ForkName),
DeleteState(Hash256, Option<Slot>), DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256), DeleteExecutionPayload(Hash256),
DeletePayloadEnvelope(Hash256),
DeleteSyncCommitteeBranch(Hash256), DeleteSyncCommitteeBranch(Hash256),
KeyValueOp(KeyValueStoreOp), KeyValueOp(KeyValueStoreOp),
} }
@@ -307,9 +309,14 @@ pub enum DBColumn {
/// non-temporary by the deletion of their state root from this column. /// non-temporary by the deletion of their state root from this column.
#[strum(serialize = "bst")] #[strum(serialize = "bst")]
BeaconStateTemporary, BeaconStateTemporary,
/// Execution payloads for blocks more recent than the finalized checkpoint. /// Pre-gloas execution payloads for blocks more recent than the finalized checkpoint.
#[strum(serialize = "exp")] #[strum(serialize = "exp")]
ExecPayload, ExecPayload,
// TODO(gloas) once finalized envelope pruning is implemented this comment should be updated
// "Post-gloas execution payload envlopes for payloads more recent than the finalized checkpoint"
/// Post-gloas execution payload envelopes.
#[strum(serialize = "pay")]
PayloadEnvelope,
/// For persisting in-memory state to the database. /// For persisting in-memory state to the database.
#[strum(serialize = "bch")] #[strum(serialize = "bch")]
BeaconChain, BeaconChain,
@@ -421,7 +428,8 @@ impl DBColumn {
| Self::BeaconRestorePoint | Self::BeaconRestorePoint
| Self::DhtEnrs | Self::DhtEnrs
| Self::CustodyContext | Self::CustodyContext
| Self::OptimisticTransitionBlock => 32, | Self::OptimisticTransitionBlock
| Self::PayloadEnvelope => 32,
Self::BeaconBlockRoots Self::BeaconBlockRoots
| Self::BeaconDataColumnCustodyInfo | Self::BeaconDataColumnCustodyInfo
| Self::BeaconBlockRootsChunked | Self::BeaconBlockRootsChunked

View File

@@ -251,6 +251,13 @@ pub static BEACON_BLOBS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock
"Number of hits to the store's blob cache", "Number of hits to the store's blob cache",
) )
}); });
pub static BEACON_PAYLOAD_ENVELOPE_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
"store_beacon_payload_envelope_cache_hit_total",
"Number of hits to the store's payload envelope cache",
)
});
pub static STORE_BEACON_BLOCK_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| { pub static STORE_BEACON_BLOCK_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge( try_create_int_gauge(
"store_beacon_block_cache_size", "store_beacon_block_cache_size",
@@ -263,6 +270,13 @@ pub static STORE_BEACON_BLOB_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::
"Current count of items in beacon store blob cache", "Current count of items in beacon store blob cache",
) )
}); });
pub static STORE_BEACON_PAYLOAD_ENVELOPE_CACHE_SIZE: LazyLock<Result<IntGauge>> =
LazyLock::new(|| {
try_create_int_gauge(
"store_beacon_payload_envelope_cache_size",
"Current count of items in beacon store payload envelope cache",
)
});
pub static STORE_BEACON_STATE_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| { pub static STORE_BEACON_STATE_CACHE_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge( try_create_int_gauge(
"store_beacon_state_cache_size", "store_beacon_state_cache_size",