Store changes to persist data columns (#6073)

* Store changes to persist data columns.

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>

* Update to use `eip7594_fork_epoch` for data column slot in Store.

* Fix formatting.

* Merge branch 'unstable' into data-columns-store

# Conflicts:
#	beacon_node/store/src/lib.rs
#	consensus/types/src/chain_spec.rs

* Minor refactor.

* Merge branch 'unstable' into data-columns-store

# Conflicts:
#	beacon_node/store/src/metrics.rs

* Init data colum info at PeerDAS epoch instead of Deneb fork epoch. Address review comments.

* Remove Deneb-related comments
This commit is contained in:
Jimmy Chen
2024-08-02 16:58:37 +10:00
committed by GitHub
parent d9f8b13e36
commit 0e96d4f105
7 changed files with 348 additions and 18 deletions

View File

@@ -27,6 +27,8 @@ pub enum Error {
AnchorInfoConcurrentMutation,
/// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
BlobInfoConcurrentMutation,
/// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied.
DataColumnInfoConcurrentMutation,
/// The block or state is unavailable due to weak subjectivity sync.
HistoryUnavailable,
/// State reconstruction cannot commence because not all historic blocks are known.

View File

@@ -12,12 +12,13 @@ use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion,
ANCHOR_INFO_KEY, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
STATE_UPPER_LIMIT_NO_RETAIN,
};
use crate::metrics;
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{get_data_column_key, metrics, parse_data_column_key};
use crate::{
get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp,
@@ -35,11 +36,13 @@ use state_processing::{
SlotProcessingError,
};
use std::cmp::min;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidecarList};
use types::*;
/// On-disk database that stores finalized states efficiently.
@@ -57,6 +60,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
anchor_info: RwLock<Option<AnchorInfo>>,
/// The starting slots for the range of blobs stored in the database.
blob_info: RwLock<BlobInfo>,
/// The starting slots for the range of data columns stored in the database.
data_column_info: RwLock<DataColumnInfo>,
pub(crate) config: StoreConfig,
/// Cold database containing compact historical data.
pub cold_db: Cold,
@@ -86,6 +91,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
struct BlockCache<E: EthSpec> {
block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
}
impl<E: EthSpec> BlockCache<E> {
@@ -93,6 +99,7 @@ impl<E: EthSpec> BlockCache<E> {
Self {
block_cache: LruCache::new(size),
blob_cache: LruCache::new(size),
data_column_cache: LruCache::new(size),
}
}
pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
@@ -101,12 +108,26 @@ impl<E: EthSpec> BlockCache<E> {
pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList<E>) {
self.blob_cache.put(block_root, blobs);
}
pub fn put_data_column(&mut self, block_root: Hash256, data_column: Arc<DataColumnSidecar<E>>) {
self.data_column_cache
.get_or_insert_mut(block_root, Default::default)
.insert(data_column.index, data_column);
}
pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
self.block_cache.get(block_root)
}
pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList<E>> {
self.blob_cache.get(block_root)
}
pub fn get_data_column<'a>(
&'a mut self,
block_root: &Hash256,
column_index: &ColumnIndex,
) -> Option<&'a Arc<DataColumnSidecar<E>>> {
self.data_column_cache
.get(block_root)
.and_then(|map| map.get(column_index))
}
pub fn delete_block(&mut self, block_root: &Hash256) {
let _ = self.block_cache.pop(block_root);
}
@@ -180,6 +201,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()),
data_column_info: RwLock::new(DataColumnInfo::default()),
cold_db: MemoryStore::open(),
blobs_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
@@ -216,6 +238,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
split: RwLock::new(Split::default()),
anchor_info: RwLock::new(None),
blob_info: RwLock::new(BlobInfo::default()),
data_column_info: RwLock::new(DataColumnInfo::default()),
cold_db: LevelDB::open(cold_path)?,
blobs_db: LevelDB::open(blobs_db_path)?,
hot_db: LevelDB::open(hot_path)?,
@@ -294,11 +317,39 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
},
};
db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
let data_column_info = db.load_data_column_info()?;
let eip7594_fork_slot = db
.spec
.eip7594_fork_epoch
.map(|epoch| epoch.start_slot(E::slots_per_epoch()));
let new_data_column_info = match &data_column_info {
Some(data_column_info) => {
// Set the oldest data column slot to the fork slot if it is not yet set.
let oldest_data_column_slot = data_column_info
.oldest_data_column_slot
.or(eip7594_fork_slot);
DataColumnInfo {
oldest_data_column_slot,
}
}
// First start.
None => DataColumnInfo {
// Set the oldest data column slot to the fork slot if it is not yet set.
oldest_data_column_slot: eip7594_fork_slot,
},
};
db.compare_and_set_data_column_info_with_write(
<_>::default(),
new_data_column_info.clone(),
)?;
info!(
db.log,
"Blob DB initialized";
"path" => ?blobs_db_path,
"oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
"oldest_data_column_slot" => ?new_data_column_info.oldest_data_column_slot,
);
// Ensure that the schema version of the on-disk database matches the software.
@@ -626,6 +677,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
ops.push(KeyValueStoreOp::PutKeyValue(db_key, blobs.as_ssz_bytes()));
}
pub fn data_columns_as_kv_store_ops(
&self,
block_root: &Hash256,
data_columns: DataColumnSidecarList<E>,
ops: &mut Vec<KeyValueStoreOp>,
) {
for data_column in data_columns {
let db_key = get_key_for_col(
DBColumn::BeaconDataColumn.into(),
&get_data_column_key(block_root, &data_column.index),
);
ops.push(KeyValueStoreOp::PutKeyValue(
db_key,
data_column.as_ssz_bytes(),
));
}
}
pub fn put_state_summary(
&self,
state_root: &Hash256,
@@ -909,6 +978,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch);
}
StoreOp::PutDataColumns(block_root, data_columns) => {
self.data_columns_as_kv_store_ops(
&block_root,
data_columns,
&mut key_value_batch,
);
}
StoreOp::PutStateSummary(state_root, summary) => {
key_value_batch.push(summary.as_kv_store_op(state_root));
}
@@ -933,6 +1010,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
StoreOp::DeleteDataColumns(block_root, column_indices) => {
for index in column_indices {
let key = get_key_for_col(
DBColumn::BeaconDataColumn.into(),
&get_data_column_key(&block_root, &index),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
}
StoreOp::DeleteState(state_root, slot) => {
let state_summary_key =
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
@@ -963,9 +1050,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
batch: Vec<StoreOp<E>>,
) -> Result<(), Error> {
let mut blobs_to_delete = Vec::new();
let mut data_columns_to_delete = Vec::new();
let (blobs_ops, hot_db_ops): (Vec<StoreOp<E>>, Vec<StoreOp<E>>) =
batch.into_iter().partition(|store_op| match store_op {
StoreOp::PutBlobs(_, _) => true,
StoreOp::PutBlobs(_, _) | StoreOp::PutDataColumns(_, _) => true,
StoreOp::DeleteBlobs(block_root) => {
match self.get_blobs(block_root) {
Ok(Some(blob_sidecar_list)) => {
@@ -982,6 +1070,31 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
true
}
StoreOp::DeleteDataColumns(block_root, indices) => {
match indices
.iter()
.map(|index| self.get_data_column(block_root, index))
.collect::<Result<Vec<_>, _>>()
{
Ok(data_column_sidecar_list_opt) => {
let data_column_sidecar_list = data_column_sidecar_list_opt
.into_iter()
.flatten()
.collect::<Vec<_>>();
// Must push the same number of items as StoreOp::DeleteDataColumns items to
// prevent a `HotColdDBError::Rollback` error below in case of rollback
data_columns_to_delete.push((*block_root, data_column_sidecar_list));
}
Err(e) => {
error!(
self.log, "Error getting data columns";
"block_root" => %block_root,
"error" => ?e
);
}
}
true
}
StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false,
_ => false,
});
@@ -1013,10 +1126,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in blob_cache_ops.iter_mut() {
let reverse_op = match op {
StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
StoreOp::PutDataColumns(block_root, data_columns) => {
let indices = data_columns.iter().map(|c| c.index).collect();
StoreOp::DeleteDataColumns(*block_root, indices)
}
StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
None => return Err(HotColdDBError::Rollback.into()),
},
StoreOp::DeleteDataColumns(_, _) => match data_columns_to_delete.pop() {
Some((block_root, data_columns)) => {
StoreOp::PutDataColumns(block_root, data_columns)
}
None => return Err(HotColdDBError::Rollback.into()),
},
_ => return Err(HotColdDBError::Rollback.into()),
};
*op = reverse_op;
@@ -1034,6 +1157,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutBlobs(_, _) => (),
StoreOp::PutDataColumns(_, _) => (),
StoreOp::PutState(_, _) => (),
StoreOp::PutStateSummary(_, _) => (),
@@ -1053,6 +1178,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::DeleteBlobs(_) => (),
StoreOp::DeleteDataColumns(_, _) => (),
StoreOp::DeleteExecutionPayload(_) => (),
StoreOp::KeyValueOp(_) => (),
@@ -1552,6 +1679,45 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Fetch all keys in the data_column column with prefix `block_root`
pub fn get_data_column_keys(&self, block_root: Hash256) -> Result<Vec<ColumnIndex>, Error> {
self.blobs_db
.iter_raw_keys(DBColumn::BeaconDataColumn, block_root.as_bytes())
.map(|key| key.and_then(|key| parse_data_column_key(key).map(|key| key.1)))
.collect()
}
/// Fetch a single data_column for a given block from the store.
pub fn get_data_column(
&self,
block_root: &Hash256,
column_index: &ColumnIndex,
) -> Result<Option<Arc<DataColumnSidecar<E>>>, Error> {
// Check the cache.
if let Some(data_column) = self
.block_cache
.lock()
.get_data_column(block_root, column_index)
{
metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT);
return Ok(Some(data_column.clone()));
}
match self.blobs_db.get_bytes(
DBColumn::BeaconDataColumn.into(),
&get_data_column_key(block_root, column_index),
)? {
Some(ref data_column_bytes) => {
let data_column = Arc::new(DataColumnSidecar::from_ssz_bytes(data_column_bytes)?);
self.block_cache
.lock()
.put_data_column(*block_root, data_column.clone());
Ok(Some(data_column))
}
None => Ok(None),
}
}
/// Get a reference to the `ChainSpec` used by the database.
pub fn get_chain_spec(&self) -> &ChainSpec {
&self.spec
@@ -1748,6 +1914,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.blob_info.read_recursive().clone()
}
/// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint.
pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
let oldest_data_column_slot = self.spec.eip7594_fork_epoch.map(|fork_epoch| {
std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
});
let data_column_info = DataColumnInfo {
oldest_data_column_slot,
};
self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info)
}
/// Get a clone of the store's data column info.
///
/// To do mutations, use `compare_and_set_data_column_info`.
pub fn get_data_column_info(&self) -> DataColumnInfo {
self.data_column_info.read_recursive().clone()
}
/// Atomically update the blob info from `prev_value` to `new_value`.
///
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
@@ -1793,6 +1977,54 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blob_info.as_kv_store_op(BLOB_INFO_KEY)
}
/// Atomically update the data column info from `prev_value` to `new_value`.
///
/// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
/// values.
///
/// Return an `DataColumnInfoConcurrentMutation` error if the `prev_value` provided
/// is not correct.
pub fn compare_and_set_data_column_info(
&self,
prev_value: DataColumnInfo,
new_value: DataColumnInfo,
) -> Result<KeyValueStoreOp, Error> {
let mut data_column_info = self.data_column_info.write();
if *data_column_info == prev_value {
let kv_op = self.store_data_column_info_in_batch(&new_value);
*data_column_info = new_value;
Ok(kv_op)
} else {
Err(Error::DataColumnInfoConcurrentMutation)
}
}
/// As for `compare_and_set_data_column_info`, but also writes the blob info to disk immediately.
pub fn compare_and_set_data_column_info_with_write(
&self,
prev_value: DataColumnInfo,
new_value: DataColumnInfo,
) -> Result<(), Error> {
let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?;
self.hot_db.do_atomically(vec![kv_store_op])
}
/// Load the blob info from disk, but do not set `self.data_column_info`.
fn load_data_column_info(&self) -> Result<Option<DataColumnInfo>, Error> {
self.hot_db.get(&DATA_COLUMN_INFO_KEY)
}
/// Store the given `data_column_info` to disk.
///
/// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues
/// with recursive locking.
fn store_data_column_info_in_batch(
&self,
data_column_info: &DataColumnInfo,
) -> KeyValueStoreOp {
data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY)
}
/// Return the slot-window describing the available historic states.
///
/// Returns `(lower_limit, upper_limit)`.
@@ -2285,15 +2517,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
};
if Some(block_root) != last_pruned_block_root && self.blobs_exist(&block_root)? {
trace!(
self.log,
"Pruning blobs of block";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteBlobs(block_root));
if Some(block_root) != last_pruned_block_root {
if self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(E::slots_per_epoch()))
{
// data columns
let indices = self.get_data_column_keys(block_root)?;
if !indices.is_empty() {
trace!(
self.log,
"Pruning data columns of block";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteDataColumns(block_root, indices));
}
} else if self.blobs_exist(&block_root)? {
trace!(
self.log,
"Pruning blobs of block";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteBlobs(block_root));
}
}
if slot >= end_slot {

View File

@@ -270,6 +270,10 @@ impl db_key::Key for BytesKey {
}
impl BytesKey {
pub fn starts_with(&self, prefix: &Self) -> bool {
self.key.starts_with(&prefix.key)
}
/// Return `true` iff this `BytesKey` was created with the given `column`.
pub fn matches_column(&self, column: DBColumn) -> bool {
self.key.starts_with(column.as_bytes())

View File

@@ -44,6 +44,8 @@ use std::sync::Arc;
use strum::{EnumString, IntoStaticStr};
pub use types::*;
const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;
@@ -109,9 +111,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
Box::new(std::iter::empty())
}
fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter {
Box::new(std::iter::empty())
}
fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter;
/// Iterate through all keys in a particular column.
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K>;
@@ -150,6 +150,28 @@ pub fn get_col_from_key(key: &[u8]) -> Option<String> {
String::from_utf8(key[0..3].to_vec()).ok()
}
pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) -> Vec<u8> {
let mut result = block_root.as_bytes().to_vec();
result.extend_from_slice(&column_index.to_le_bytes());
result
}
pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
if data.len() != DBColumn::BeaconDataColumn.key_size() {
return Err(Error::InvalidKey);
}
// split_at panics if 32 < 40 which will never happen after the length check above
let (block_root_bytes, column_index_bytes) = data.split_at(32);
let block_root = Hash256::from_slice(block_root_bytes);
// column_index_bytes is asserted to be 8 bytes after the length check above
let column_index = ColumnIndex::from_le_bytes(
column_index_bytes
.try_into()
.map_err(|_| Error::InvalidKey)?,
);
Ok((block_root, column_index))
}
#[must_use]
#[derive(Clone)]
pub enum KeyValueStoreOp {
@@ -210,11 +232,13 @@ pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, BlobSidecarList<E>),
PutDataColumns(Hash256, DataColumnSidecarList<E>),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteBlobs(Hash256),
DeleteDataColumns(Hash256, Vec<ColumnIndex>),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
KeyValueOp(KeyValueStoreOp),
@@ -230,6 +254,8 @@ pub enum DBColumn {
BeaconBlock,
#[strum(serialize = "blb")]
BeaconBlob,
#[strum(serialize = "bdc")]
BeaconDataColumn,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
#[strum(serialize = "ste")]
BeaconState,
@@ -317,6 +343,7 @@ impl DBColumn {
| Self::BeaconHistoricalRoots
| Self::BeaconHistoricalSummaries
| Self::BeaconRandaoMixes => 8,
Self::BeaconDataColumn => DATA_COLUMN_DB_KEY_SIZE,
}
}
}

View File

@@ -1,6 +1,6 @@
use crate::{
get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error,
ItemStore, Key, KeyValueStore, KeyValueStoreOp,
ItemStore, Key, KeyValueStore, KeyValueStoreOp, RawKeyIter,
};
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::BTreeMap;
@@ -100,6 +100,18 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
}))
}
fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter {
let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), prefix));
let keys = self
.db
.read()
.range(start_key.clone()..)
.take_while(|(k, _)| k.starts_with(&start_key))
.filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
.collect::<Vec<_>>();
Box::new(keys.into_iter().map(Ok))
}
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
}

View File

@@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -152,3 +153,30 @@ impl StoreItem for BlobInfo {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {
/// The slot after which data columns are or *will be* available (>=).
///
/// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which
/// data columns will be available.
///
/// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is
/// not yet known.
pub oldest_data_column_slot: Option<Slot>,
}
impl StoreItem for DataColumnInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

View File

@@ -151,6 +151,13 @@ pub static BEACON_BLOBS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock
"Number of hits to the store's blob cache",
)
});
pub static BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
"store_beacon_data_columns_cache_hit_total",
"Number of hits to the store's data column cache",
)
});
/// Updates the global metrics registry with store-related information.
pub fn scrape_for_metrics(db_path: &Path, freezer_db_path: &Path) {