Modularize beacon node backend (#4718)

#4669


  Modularize the beacon node backend to make it easier to add new database implementations
This commit is contained in:
Eitan Seri-Levi
2025-01-23 09:12:16 +07:00
committed by GitHub
parent 266b241123
commit a1b7d616b4
38 changed files with 1479 additions and 650 deletions

View File

@@ -1,10 +1,10 @@
use crate::config::{OnDiskStoreConfig, StoreConfig};
use crate::database::interface::BeaconNodeBackend;
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::hdiff::{HDiff, HDiffBuffer, HierarchyModuli, StorageStrategy};
use crate::historic_state_cache::HistoricStateCache;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::leveldb_store::{BytesKey, LevelDB};
use crate::memory_store::MemoryStore;
use crate::metadata::{
AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion,
@@ -14,12 +14,10 @@ use crate::metadata::{
};
use crate::state_cache::{PutStateOutcome, StateCache};
use crate::{
get_data_column_key, get_key_for_col, BlobSidecarListFromRoot, DBColumn, DatabaseBlock, Error,
ItemStore, KeyValueStoreOp, StoreItem, StoreOp,
get_data_column_key, metrics, parse_data_column_key, BlobSidecarListFromRoot, DBColumn,
DatabaseBlock, Error, ItemStore, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use crate::{metrics, parse_data_column_key};
use itertools::{process_results, Itertools};
use leveldb::iterator::LevelDBIterator;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use safe_arith::SafeArith;
@@ -231,7 +229,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
}
}
impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
/// Open a new or existing database, with the given paths to the hot and cold DBs.
///
/// The `migrate_schema` function is passed in so that the parent `BeaconChain` can provide
@@ -249,7 +247,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
let hierarchy = config.hierarchy_config.to_moduli()?;
let hot_db = LevelDB::open(hot_path)?;
let hot_db = BeaconNodeBackend::open(&config, hot_path)?;
let anchor_info = RwLock::new(Self::load_anchor_info(&hot_db)?);
let db = HotColdDB {
@@ -257,8 +255,8 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
anchor_info,
blob_info: RwLock::new(BlobInfo::default()),
data_column_info: RwLock::new(DataColumnInfo::default()),
cold_db: LevelDB::open(cold_path)?,
blobs_db: LevelDB::open(blobs_db_path)?,
blobs_db: BeaconNodeBackend::open(&config, blobs_db_path)?,
cold_db: BeaconNodeBackend::open(&config, cold_path)?,
hot_db,
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
@@ -408,23 +406,8 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
/// Return an iterator over the state roots of all temporary states.
pub fn iter_temporary_state_roots(&self) -> impl Iterator<Item = Result<Hash256, Error>> + '_ {
let column = DBColumn::BeaconStateTemporary;
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_slice()));
let keys_iter = self.hot_db.keys_iter();
keys_iter.seek(&start_key);
keys_iter
.take_while(move |key| key.matches_column(column))
.map(move |bytes_key| {
bytes_key.remove_column(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key,
}
.into()
})
})
self.hot_db
.iter_column_keys::<Hash256>(DBColumn::BeaconStateTemporary)
}
}
@@ -536,9 +519,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blinded_block: &SignedBeaconBlock<E, BlindedPayload<E>>,
ops: &mut Vec<KeyValueStoreOp>,
) {
let db_key = get_key_for_col(DBColumn::BeaconBlock.into(), key.as_slice());
ops.push(KeyValueStoreOp::PutKeyValue(
db_key,
DBColumn::BeaconBlock,
key.as_slice().into(),
blinded_block.as_ssz_bytes(),
));
}
@@ -660,7 +643,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
decoder: impl FnOnce(&[u8]) -> Result<SignedBeaconBlock<E, Payload>, ssz::DecodeError>,
) -> Result<Option<SignedBeaconBlock<E, Payload>>, Error> {
self.hot_db
.get_bytes(DBColumn::BeaconBlock.into(), block_root.as_slice())?
.get_bytes(DBColumn::BeaconBlock, block_root.as_slice())?
.map(|block_bytes| decoder(&block_bytes))
.transpose()
.map_err(|e| e.into())
@@ -673,10 +656,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
block_root: &Hash256,
fork_name: ForkName,
) -> Result<Option<ExecutionPayload<E>>, Error> {
let column = ExecutionPayload::<E>::db_column().into();
let key = block_root.as_slice();
match self.hot_db.get_bytes(column, key)? {
match self
.hot_db
.get_bytes(ExecutionPayload::<E>::db_column(), key)?
{
Some(bytes) => Ok(Some(ExecutionPayload::from_ssz_bytes(&bytes, fork_name)?)),
None => Ok(None),
}
@@ -705,10 +690,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<Option<MerkleProof>, Error> {
let column = DBColumn::SyncCommitteeBranch;
if let Some(bytes) = self
.hot_db
.get_bytes(column.into(), &block_root.as_ssz_bytes())?
{
if let Some(bytes) = self.hot_db.get_bytes(column, &block_root.as_ssz_bytes())? {
let sync_committee_branch = Vec::<Hash256>::from_ssz_bytes(&bytes)?;
return Ok(Some(sync_committee_branch));
}
@@ -725,7 +707,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
if let Some(bytes) = self
.hot_db
.get_bytes(column.into(), &sync_committee_period.as_ssz_bytes())?
.get_bytes(column, &sync_committee_period.as_ssz_bytes())?
{
let sync_committee: SyncCommittee<E> = SyncCommittee::from_ssz_bytes(&bytes)?;
return Ok(Some(sync_committee));
@@ -741,7 +723,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<(), Error> {
let column = DBColumn::SyncCommitteeBranch;
self.hot_db.put_bytes(
column.into(),
column,
&block_root.as_ssz_bytes(),
&sync_committee_branch.as_ssz_bytes(),
)?;
@@ -755,7 +737,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<(), Error> {
let column = DBColumn::SyncCommittee;
self.hot_db.put_bytes(
column.into(),
column,
&sync_committee_period.to_le_bytes(),
&sync_committee.as_ssz_bytes(),
)?;
@@ -767,10 +749,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
sync_committee_period: u64,
) -> Result<Option<LightClientUpdate<E>>, Error> {
let column = DBColumn::LightClientUpdate;
let res = self
.hot_db
.get_bytes(column.into(), &sync_committee_period.to_le_bytes())?;
let res = self.hot_db.get_bytes(
DBColumn::LightClientUpdate,
&sync_committee_period.to_le_bytes(),
)?;
if let Some(light_client_update_bytes) = res {
let epoch = sync_committee_period
@@ -822,10 +804,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
sync_committee_period: u64,
light_client_update: &LightClientUpdate<E>,
) -> Result<(), Error> {
let column = DBColumn::LightClientUpdate;
self.hot_db.put_bytes(
column.into(),
DBColumn::LightClientUpdate,
&sync_committee_period.to_le_bytes(),
&light_client_update.as_ssz_bytes(),
)?;
@@ -836,29 +816,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Check if the blobs for a block exists on disk.
pub fn blobs_exist(&self, block_root: &Hash256) -> Result<bool, Error> {
self.blobs_db
.key_exists(DBColumn::BeaconBlob.into(), block_root.as_slice())
.key_exists(DBColumn::BeaconBlob, block_root.as_slice())
}
/// Determine whether a block exists in the database.
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
self.hot_db
.key_exists(DBColumn::BeaconBlock.into(), block_root.as_slice())
.key_exists(DBColumn::BeaconBlock, block_root.as_slice())
}
/// Delete a block from the store and the block cache.
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.block_cache.lock().delete(block_root);
self.hot_db
.key_delete(DBColumn::BeaconBlock.into(), block_root.as_slice())?;
.key_delete(DBColumn::BeaconBlock, block_root.as_slice())?;
self.hot_db
.key_delete(DBColumn::ExecPayload.into(), block_root.as_slice())?;
.key_delete(DBColumn::ExecPayload, block_root.as_slice())?;
self.blobs_db
.key_delete(DBColumn::BeaconBlob.into(), block_root.as_slice())
.key_delete(DBColumn::BeaconBlob, block_root.as_slice())
}
pub fn put_blobs(&self, block_root: &Hash256, blobs: BlobSidecarList<E>) -> Result<(), Error> {
self.blobs_db.put_bytes(
DBColumn::BeaconBlob.into(),
DBColumn::BeaconBlob,
block_root.as_slice(),
&blobs.as_ssz_bytes(),
)?;
@@ -872,8 +852,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blobs: BlobSidecarList<E>,
ops: &mut Vec<KeyValueStoreOp>,
) {
let db_key = get_key_for_col(DBColumn::BeaconBlob.into(), key.as_slice());
ops.push(KeyValueStoreOp::PutKeyValue(db_key, blobs.as_ssz_bytes()));
ops.push(KeyValueStoreOp::PutKeyValue(
DBColumn::BeaconBlob,
key.as_slice().to_vec(),
blobs.as_ssz_bytes(),
));
}
pub fn data_columns_as_kv_store_ops(
@@ -883,12 +866,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
ops: &mut Vec<KeyValueStoreOp>,
) {
for data_column in data_columns {
let db_key = get_key_for_col(
DBColumn::BeaconDataColumn.into(),
&get_data_column_key(block_root, &data_column.index),
);
ops.push(KeyValueStoreOp::PutKeyValue(
db_key,
DBColumn::BeaconDataColumn,
get_data_column_key(block_root, &data_column.index),
data_column.as_ssz_bytes(),
));
}
@@ -1202,63 +1182,68 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
StoreOp::DeleteStateTemporaryFlag(state_root) => {
let db_key =
get_key_for_col(TemporaryFlag::db_column().into(), state_root.as_slice());
key_value_batch.push(KeyValueStoreOp::DeleteKey(db_key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
TemporaryFlag::db_column(),
state_root.as_slice().to_vec(),
));
}
StoreOp::DeleteBlock(block_root) => {
let key = get_key_for_col(DBColumn::BeaconBlock.into(), block_root.as_slice());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
DBColumn::BeaconBlock,
block_root.as_slice().to_vec(),
));
}
StoreOp::DeleteBlobs(block_root) => {
let key = get_key_for_col(DBColumn::BeaconBlob.into(), block_root.as_slice());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
DBColumn::BeaconBlob,
block_root.as_slice().to_vec(),
));
}
StoreOp::DeleteDataColumns(block_root, column_indices) => {
for index in column_indices {
let key = get_key_for_col(
DBColumn::BeaconDataColumn.into(),
&get_data_column_key(&block_root, &index),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
let key = get_data_column_key(&block_root, &index);
key_value_batch
.push(KeyValueStoreOp::DeleteKey(DBColumn::BeaconDataColumn, key));
}
}
StoreOp::DeleteState(state_root, slot) => {
// Delete the hot state summary.
let state_summary_key =
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice());
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
DBColumn::BeaconStateSummary,
state_root.as_slice().to_vec(),
));
// Delete the state temporary flag (if any). Temporary flags are commonly
// created by the state advance routine.
let state_temp_key = get_key_for_col(
DBColumn::BeaconStateTemporary.into(),
state_root.as_slice(),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_temp_key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
DBColumn::BeaconStateTemporary,
state_root.as_slice().to_vec(),
));
if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) {
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
DBColumn::BeaconState,
state_root.as_slice().to_vec(),
));
}
}
StoreOp::DeleteExecutionPayload(block_root) => {
let key = get_key_for_col(DBColumn::ExecPayload.into(), block_root.as_slice());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
DBColumn::ExecPayload,
block_root.as_slice().to_vec(),
));
}
StoreOp::DeleteSyncCommitteeBranch(block_root) => {
let key = get_key_for_col(
DBColumn::SyncCommitteeBranch.into(),
block_root.as_slice(),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
key_value_batch.push(KeyValueStoreOp::DeleteKey(
DBColumn::SyncCommitteeBranch,
block_root.as_slice().to_vec(),
));
}
StoreOp::KeyValueOp(kv_op) => {
@@ -1269,6 +1254,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(key_value_batch)
}
pub fn delete_batch(&self, col: DBColumn, ops: Vec<Hash256>) -> Result<(), Error> {
let new_ops: HashSet<&[u8]> = ops.iter().map(|v| v.as_slice()).collect();
self.hot_db.delete_batch(col, new_ops)
}
pub fn delete_if(
&self,
column: DBColumn,
f: impl Fn(&[u8]) -> Result<bool, Error>,
) -> Result<(), Error> {
self.hot_db.delete_if(column, f)
}
pub fn do_atomically_with_block_and_blobs_cache(
&self,
batch: Vec<StoreOp<E>>,
@@ -1608,10 +1606,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<(), Error> {
ops.push(ColdStateSummary { slot }.as_kv_store_op(*state_root));
ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(
DBColumn::BeaconStateRoots.into(),
&slot.as_u64().to_be_bytes(),
),
DBColumn::BeaconStateRoots,
slot.as_u64().to_be_bytes().to_vec(),
state_root.as_slice().to_vec(),
));
Ok(())
@@ -1678,19 +1674,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
out
};
let key = get_key_for_col(
DBColumn::BeaconStateSnapshot.into(),
&state.slot().as_u64().to_be_bytes(),
);
ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value));
ops.push(KeyValueStoreOp::PutKeyValue(
DBColumn::BeaconStateSnapshot,
state.slot().as_u64().to_be_bytes().to_vec(),
compressed_value,
));
Ok(())
}
fn load_cold_state_bytes_as_snapshot(&self, slot: Slot) -> Result<Option<Vec<u8>>, Error> {
match self.cold_db.get_bytes(
DBColumn::BeaconStateSnapshot.into(),
&slot.as_u64().to_be_bytes(),
)? {
match self
.cold_db
.get_bytes(DBColumn::BeaconStateSnapshot, &slot.as_u64().to_be_bytes())?
{
Some(bytes) => {
let _timer =
metrics::start_timer(&metrics::STORE_BEACON_STATE_FREEZER_DECOMPRESS_TIME);
@@ -1731,11 +1727,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
};
let diff_bytes = diff.as_ssz_bytes();
let key = get_key_for_col(
DBColumn::BeaconStateDiff.into(),
&state.slot().as_u64().to_be_bytes(),
);
ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes));
ops.push(KeyValueStoreOp::PutKeyValue(
DBColumn::BeaconStateDiff,
state.slot().as_u64().to_be_bytes().to_vec(),
diff_bytes,
));
Ok(())
}
@@ -1858,10 +1854,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let bytes = {
let _t = metrics::start_timer(&metrics::BEACON_HDIFF_READ_TIMES);
self.cold_db
.get_bytes(
DBColumn::BeaconStateDiff.into(),
&slot.as_u64().to_be_bytes(),
)?
.get_bytes(DBColumn::BeaconStateDiff, &slot.as_u64().to_be_bytes())?
.ok_or(HotColdDBError::MissingHDiff(slot))?
};
let hdiff = {
@@ -2054,7 +2047,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
match self
.blobs_db
.get_bytes(DBColumn::BeaconBlob.into(), block_root.as_slice())?
.get_bytes(DBColumn::BeaconBlob, block_root.as_slice())?
{
Some(ref blobs_bytes) => {
// We insert a VariableList of BlobSidecars into the db, but retrieve
@@ -2084,8 +2077,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Fetch all keys in the data_column column with prefix `block_root`
pub fn get_data_column_keys(&self, block_root: Hash256) -> Result<Vec<ColumnIndex>, Error> {
self.blobs_db
.iter_raw_keys(DBColumn::BeaconDataColumn, block_root.as_slice())
.map(|key| key.and_then(|key| parse_data_column_key(key).map(|key| key.1)))
.iter_column_from::<Vec<u8>>(DBColumn::BeaconDataColumn, block_root.as_slice())
.take_while(|res| {
let Ok((key, _)) = res else { return false };
if !key.starts_with(block_root.as_slice()) {
return false;
}
true
})
.map(|key| key.and_then(|(key, _)| parse_data_column_key(key).map(|key| key.1)))
.collect()
}
@@ -2106,7 +2108,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
match self.blobs_db.get_bytes(
DBColumn::BeaconDataColumn.into(),
DBColumn::BeaconDataColumn,
&get_data_column_key(block_root, column_index),
)? {
Some(ref data_column_bytes) => {
@@ -2164,10 +2166,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
schema_version: SchemaVersion,
mut ops: Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let column = SchemaVersion::db_column().into();
let key = SCHEMA_VERSION_KEY.as_slice();
let db_key = get_key_for_col(column, key);
let op = KeyValueStoreOp::PutKeyValue(db_key, schema_version.as_store_bytes());
let op = KeyValueStoreOp::PutKeyValue(
SchemaVersion::db_column(),
key.to_vec(),
schema_version.as_store_bytes(),
);
ops.push(op);
self.hot_db.do_atomically(ops)
@@ -2589,7 +2593,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut ops = vec![];
for slot in start_slot.as_u64()..end_slot.as_u64() {
ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(DBColumn::BeaconBlockRoots.into(), &slot.to_be_bytes()),
DBColumn::BeaconBlockRoots,
slot.to_be_bytes().to_vec(),
block_root.as_slice().to_vec(),
));
}
@@ -2811,77 +2816,62 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
"data_availability_boundary" => data_availability_boundary,
);
let mut ops = vec![];
let mut last_pruned_block_root = None;
// We collect block roots of deleted blobs in memory. Even for 10y of blob history this
// vec won't go beyond 1GB. We can probably optimise this out eventually.
let mut removed_block_roots = vec![];
for res in self.forwards_block_roots_iterator_until(oldest_blob_slot, end_slot, || {
let (_, split_state) = self
.get_advanced_hot_state(split.block_root, split.slot, split.state_root)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))?;
Ok((split_state, split.block_root))
})? {
let (block_root, slot) = match res {
Ok(tuple) => tuple,
Err(e) => {
warn!(
self.log,
"Stopping blob pruning early";
"error" => ?e,
);
break;
}
let remove_blob_if = |blobs_bytes: &[u8]| {
let blobs = Vec::from_ssz_bytes(blobs_bytes)?;
let Some(blob): Option<&Arc<BlobSidecar<E>>> = blobs.first() else {
return Ok(false);
};
if Some(block_root) != last_pruned_block_root {
if self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(E::slots_per_epoch()))
{
// data columns
let indices = self.get_data_column_keys(block_root)?;
if !indices.is_empty() {
trace!(
self.log,
"Pruning data columns of block";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteDataColumns(block_root, indices));
}
} else if self.blobs_exist(&block_root)? {
trace!(
self.log,
"Pruning blobs of block";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteBlobs(block_root));
}
}
if blob.slot() <= end_slot {
// Store the block root so we can delete from the blob cache
removed_block_roots.push(blob.block_root());
// Delete from the on-disk db
return Ok(true);
};
Ok(false)
};
if slot >= end_slot {
break;
}
self.blobs_db
.delete_if(DBColumn::BeaconBlob, remove_blob_if)?;
if self.spec.is_peer_das_enabled_for_epoch(start_epoch) {
let remove_data_column_if = |blobs_bytes: &[u8]| {
let data_column: DataColumnSidecar<E> =
DataColumnSidecar::from_ssz_bytes(blobs_bytes)?;
if data_column.slot() <= end_slot {
return Ok(true);
};
Ok(false)
};
self.blobs_db
.delete_if(DBColumn::BeaconDataColumn, remove_data_column_if)?;
}
let blob_lists_pruned = ops.len();
// Remove deleted blobs from the cache.
let mut block_cache = self.block_cache.lock();
for block_root in removed_block_roots {
block_cache.delete_blobs(&block_root);
}
drop(block_cache);
let new_blob_info = BlobInfo {
oldest_blob_slot: Some(end_slot + 1),
blobs_db: blob_info.blobs_db,
};
let update_blob_info = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
ops.push(StoreOp::KeyValueOp(update_blob_info));
self.do_atomically_with_block_and_blobs_cache(ops)?;
let op = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::KeyValueOp(op)])?;
debug!(
self.log,
"Blob pruning complete";
"blob_lists_pruned" => blob_lists_pruned,
);
Ok(())
@@ -2944,10 +2934,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for column in columns {
for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
let key = res?;
cold_ops.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
column.as_str(),
&key,
)));
cold_ops.push(KeyValueStoreOp::DeleteKey(column, key));
}
}
let delete_ops = cold_ops.len();
@@ -3085,10 +3072,8 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Store the slot to block root mapping.
cold_db_block_ops.push(KeyValueStoreOp::PutKeyValue(
get_key_for_col(
DBColumn::BeaconBlockRoots.into(),
&slot.as_u64().to_be_bytes(),
),
DBColumn::BeaconBlockRoots,
slot.as_u64().to_be_bytes().to_vec(),
block_root.as_slice().to_vec(),
));
@@ -3339,3 +3324,57 @@ impl StoreItem for TemporaryFlag {
Ok(TemporaryFlag)
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct BytesKey {
pub key: Vec<u8>,
}
impl db_key::Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
fn as_slice<T, F: Fn(&[u8]) -> T>(&self, f: F) -> T {
f(self.key.as_slice())
}
}
impl BytesKey {
pub fn starts_with(&self, prefix: &Self) -> bool {
self.key.starts_with(&prefix.key)
}
/// Return `true` iff this `BytesKey` was created with the given `column`.
pub fn matches_column(&self, column: DBColumn) -> bool {
self.key.starts_with(column.as_bytes())
}
/// Remove the column from a key, returning its `Hash256` portion.
pub fn remove_column(&self, column: DBColumn) -> Option<Hash256> {
if self.matches_column(column) {
let subkey = &self.key[column.as_bytes().len()..];
if subkey.len() == 32 {
return Some(Hash256::from_slice(subkey));
}
}
None
}
/// Remove the column from a key.
///
/// Will return `None` if the value doesn't match the column or has the wrong length.
pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> {
if self.matches_column(column) {
let subkey = &self.key[column.as_bytes().len()..];
if subkey.len() == column.key_size() {
return Some(subkey);
}
}
None
}
pub fn from_vec(key: Vec<u8>) -> Self {
Self { key }
}
}