mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-03 04:44:28 +00:00
New design for blob/column pruning (#8266)
We are seeing some crazy IO utilisation on Holesky now that data columns have started to expire. Our previous approach of _iterating the entire blobs DB_ doesn't seem to be scaling. This change introduces a new blob pruning algorithm that uses a backwards block iterator starting from the epoch we want to prune, stopping early if an already-pruned slot is encountered. Co-Authored-By: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
@@ -146,9 +146,13 @@ impl<E: EthSpec> BlockCache<E> {
|
|||||||
pub fn delete_blobs(&mut self, block_root: &Hash256) {
|
pub fn delete_blobs(&mut self, block_root: &Hash256) {
|
||||||
let _ = self.blob_cache.pop(block_root);
|
let _ = self.blob_cache.pop(block_root);
|
||||||
}
|
}
|
||||||
|
pub fn delete_data_columns(&mut self, block_root: &Hash256) {
|
||||||
|
let _ = self.data_column_cache.pop(block_root);
|
||||||
|
}
|
||||||
pub fn delete(&mut self, block_root: &Hash256) {
|
pub fn delete(&mut self, block_root: &Hash256) {
|
||||||
let _ = self.block_cache.pop(block_root);
|
self.delete_block(block_root);
|
||||||
let _ = self.blob_cache.pop(block_root);
|
self.delete_blobs(block_root);
|
||||||
|
self.delete_data_columns(block_root);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2553,6 +2557,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fetch all possible data column keys for a given `block_root`.
|
||||||
|
///
|
||||||
|
/// Unlike `get_data_column_keys`, these keys are not necessarily all present in the database,
|
||||||
|
/// because under the node's custody requirements it may store only a subset.
|
||||||
|
pub fn get_all_data_column_keys(&self, block_root: Hash256) -> Vec<Vec<u8>> {
|
||||||
|
(0..E::number_of_columns() as u64)
|
||||||
|
.map(|column_index| get_data_column_key(&block_root, &column_index))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
/// Fetch a single data_column for a given block from the store.
|
/// Fetch a single data_column for a given block from the store.
|
||||||
pub fn get_data_column(
|
pub fn get_data_column(
|
||||||
&self,
|
&self,
|
||||||
@@ -3228,13 +3242,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
return Err(HotColdDBError::BlobPruneLogicError.into());
|
return Err(HotColdDBError::BlobPruneLogicError.into());
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start pruning from the epoch of the oldest blob stored.
|
// The start epoch is not necessarily iterated back to, but is used for deciding whether we
|
||||||
// The start epoch is inclusive (blobs in this epoch will be pruned).
|
// should attempt pruning. We could probably refactor it out eventually (while reducing our
|
||||||
|
// dependence on BlobInfo).
|
||||||
let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch());
|
let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch());
|
||||||
|
|
||||||
// Prune blobs up until the `data_availability_boundary - margin` or the split
|
// Prune blobs up until the `data_availability_boundary - margin` or the split
|
||||||
// slot's epoch, whichever is older. We can't prune blobs newer than the split.
|
// slot's epoch, whichever is older. We can't prune blobs newer than the split.
|
||||||
// The end epoch is also inclusive (blobs in this epoch will be pruned).
|
// The end epoch is inclusive (blobs in this epoch will be pruned).
|
||||||
let split = self.get_split_info();
|
let split = self.get_split_info();
|
||||||
let end_epoch = std::cmp::min(
|
let end_epoch = std::cmp::min(
|
||||||
data_availability_boundary - margin_epochs - 1,
|
data_availability_boundary - margin_epochs - 1,
|
||||||
@@ -3257,20 +3272,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sanity checks.
|
// Iterate blocks backwards from the `end_epoch` (usually the data availability boundary).
|
||||||
let anchor = self.get_anchor_info();
|
let Some((end_block_root, _)) = self
|
||||||
if oldest_blob_slot < anchor.oldest_block_slot {
|
.forwards_block_roots_iterator_until(end_slot, end_slot, || {
|
||||||
error!(
|
self.get_hot_state(&split.state_root, true)?
|
||||||
%oldest_blob_slot,
|
.ok_or(HotColdDBError::MissingSplitState(
|
||||||
oldest_block_slot = %anchor.oldest_block_slot,
|
split.state_root,
|
||||||
"Oldest blob is older than oldest block"
|
split.slot,
|
||||||
);
|
))
|
||||||
return Err(HotColdDBError::BlobPruneLogicError.into());
|
.map(|state| (state, split.state_root))
|
||||||
}
|
.map_err(Into::into)
|
||||||
|
})?
|
||||||
// Iterate block roots forwards from the oldest blob slot.
|
.next()
|
||||||
|
.transpose()?
|
||||||
|
else {
|
||||||
|
// Can't prune blobs if we don't know the block at `end_slot`. This is expected if we
|
||||||
|
// have checkpoint synced and haven't backfilled to the DA boundary yet.
|
||||||
|
debug!(
|
||||||
|
%end_epoch,
|
||||||
|
%data_availability_boundary,
|
||||||
|
"No blobs to prune"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
debug!(
|
debug!(
|
||||||
%start_epoch,
|
|
||||||
%end_epoch,
|
%end_epoch,
|
||||||
%data_availability_boundary,
|
%data_availability_boundary,
|
||||||
"Pruning blobs"
|
"Pruning blobs"
|
||||||
@@ -3279,48 +3304,77 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
// We collect block roots of deleted blobs in memory. Even for 10y of blob history this
|
// We collect block roots of deleted blobs in memory. Even for 10y of blob history this
|
||||||
// vec won't go beyond 1GB. We can probably optimise this out eventually.
|
// vec won't go beyond 1GB. We can probably optimise this out eventually.
|
||||||
let mut removed_block_roots = vec![];
|
let mut removed_block_roots = vec![];
|
||||||
|
let mut blobs_db_ops = vec![];
|
||||||
|
|
||||||
let remove_blob_if = |blobs_bytes: &[u8]| {
|
// Iterate blocks backwards until we reach a block for which we've already pruned
|
||||||
let blobs = Vec::from_ssz_bytes(blobs_bytes)?;
|
// blobs/columns.
|
||||||
let Some(blob): Option<&Arc<BlobSidecar<E>>> = blobs.first() else {
|
for tuple in ParentRootBlockIterator::new(self, end_block_root) {
|
||||||
return Ok(false);
|
let (block_root, blinded_block) = tuple?;
|
||||||
|
let slot = blinded_block.slot();
|
||||||
|
|
||||||
|
// If the block has no blobs we can't tell if they've been pruned, and there is nothing
|
||||||
|
// to prune, so we just skip.
|
||||||
|
if !blinded_block.message().body().has_blobs() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have blobs or columns stored. If not, we assume pruning has already
|
||||||
|
// reached this point.
|
||||||
|
let (db_column, db_keys) = if blinded_block.fork_name_unchecked().fulu_enabled() {
|
||||||
|
(
|
||||||
|
DBColumn::BeaconDataColumn,
|
||||||
|
self.get_all_data_column_keys(block_root),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
(DBColumn::BeaconBlob, vec![block_root.as_slice().to_vec()])
|
||||||
};
|
};
|
||||||
|
|
||||||
if blob.slot() <= end_slot {
|
// For data columns, consider a block pruned if ALL column indices are absent.
|
||||||
// Store the block root so we can delete from the blob cache
|
// In future we might want to refactor this to read the data column indices that *exist*
|
||||||
removed_block_roots.push(blob.block_root());
|
// from the DB, which could be slightly more efficient than checking existence for every
|
||||||
// Delete from the on-disk db
|
// possible column.
|
||||||
return Ok(true);
|
let mut data_stored_for_block = false;
|
||||||
};
|
for db_key in db_keys {
|
||||||
Ok(false)
|
if self.blobs_db.key_exists(db_column, &db_key)? {
|
||||||
};
|
data_stored_for_block = true;
|
||||||
|
blobs_db_ops.push(KeyValueStoreOp::DeleteKey(db_column, db_key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.blobs_db
|
if data_stored_for_block {
|
||||||
.delete_if(DBColumn::BeaconBlob, remove_blob_if)?;
|
debug!(
|
||||||
|
?block_root,
|
||||||
if self.spec.is_peer_das_enabled_for_epoch(start_epoch) {
|
%slot,
|
||||||
let remove_data_column_if = |blobs_bytes: &[u8]| {
|
"Pruning blobs or columns for block"
|
||||||
let data_column: DataColumnSidecar<E> =
|
);
|
||||||
DataColumnSidecar::from_ssz_bytes(blobs_bytes)?;
|
removed_block_roots.push(block_root);
|
||||||
|
} else {
|
||||||
if data_column.slot() <= end_slot {
|
debug!(
|
||||||
return Ok(true);
|
%slot,
|
||||||
};
|
?block_root,
|
||||||
|
"Reached slot with blobs or columns already pruned"
|
||||||
Ok(false)
|
);
|
||||||
};
|
break;
|
||||||
|
}
|
||||||
self.blobs_db
|
|
||||||
.delete_if(DBColumn::BeaconDataColumn, remove_data_column_if)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove deleted blobs from the cache.
|
// Remove deleted blobs from the cache.
|
||||||
if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) {
|
if let Some(mut block_cache) = self.block_cache.as_ref().map(|cache| cache.lock()) {
|
||||||
for block_root in removed_block_roots {
|
for block_root in removed_block_roots {
|
||||||
block_cache.delete_blobs(&block_root);
|
block_cache.delete_blobs(&block_root);
|
||||||
|
block_cache.delete_data_columns(&block_root);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove from disk.
|
||||||
|
if !blobs_db_ops.is_empty() {
|
||||||
|
debug!(
|
||||||
|
num_deleted = blobs_db_ops.len(),
|
||||||
|
"Deleting blobs and data columns from disk"
|
||||||
|
);
|
||||||
|
self.blobs_db.do_atomically(blobs_db_ops)?;
|
||||||
|
}
|
||||||
|
|
||||||
self.update_blob_or_data_column_info(start_epoch, end_slot, blob_info, data_column_info)?;
|
self.update_blob_or_data_column_info(start_epoch, end_slot, blob_info, data_column_info)?;
|
||||||
|
|
||||||
debug!("Blob pruning complete");
|
debug!("Blob pruning complete");
|
||||||
|
|||||||
Reference in New Issue
Block a user