Merge remote-tracking branch 'upstream/unstable' into gloas-containers

This commit is contained in:
Mark Mackey
2025-10-22 16:05:06 -05:00
78 changed files with 5569 additions and 615 deletions

View File

@@ -282,7 +282,8 @@ impl<E: EthSpec> LevelDB<E> {
) -> Result<(), Error> {
let mut leveldb_batch = Writebatch::new();
let iter = self.db.iter(self.read_options());
let start_key = BytesKey::from_vec(column.as_bytes().to_vec());
iter.seek(&start_key);
iter.take_while(move |(key, _)| key.matches_column(column))
.for_each(|(key, value)| {
if f(&value).unwrap_or(false) {

View File

@@ -949,6 +949,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
));
}
/// Convert a data column sidecar into a pending database write.
///
/// Appends one `PutKeyValue` op to `ops` that stores the SSZ-encoded
/// sidecar in the `BeaconDataColumn` column, keyed by the block root and
/// the sidecar's column index. Nothing touches the database here; the
/// caller is responsible for committing `ops` atomically.
pub fn data_column_as_kv_store_ops(
    &self,
    block_root: &Hash256,
    data_column: Arc<DataColumnSidecar<E>>,
    ops: &mut Vec<KeyValueStoreOp>,
) {
    let db_key = get_data_column_key(block_root, &data_column.index);
    let ssz_bytes = data_column.as_ssz_bytes();
    ops.push(KeyValueStoreOp::PutKeyValue(
        DBColumn::BeaconDataColumn,
        db_key,
        ssz_bytes,
    ));
}
pub fn put_data_column_custody_info(
&self,
earliest_data_column_slot: Option<Slot>,
@@ -3178,13 +3191,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.try_prune_blobs(force, min_data_availability_boundary)
}
/// Try to prune blobs older than the data availability boundary.
/// Try to prune blobs and data columns older than the data availability boundary.
///
/// Blobs from the epoch `data_availability_boundary - blob_prune_margin_epochs` are retained.
/// This epoch is an _exclusive_ endpoint for the pruning process.
///
/// This function only supports pruning blobs older than the split point, which is older than
/// (or equal to) finalization. Pruning blobs newer than finalization is not supported.
/// This function only supports pruning blobs and data columns older than the split point,
/// which is older than (or equal to) finalization. Pruning blobs and data columns newer than
/// finalization is not supported.
///
/// This function also assumes that the split is stationary while it runs. It should only be
/// run from the migrator thread (where `migrate_database` runs) or the database manager.
@@ -3208,6 +3222,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
let blob_info = self.get_blob_info();
let data_column_info = self.get_data_column_info();
let Some(oldest_blob_slot) = blob_info.oldest_blob_slot else {
error!("Slot of oldest blob is not known");
return Err(HotColdDBError::BlobPruneLogicError.into());
@@ -3306,13 +3321,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
let new_blob_info = BlobInfo {
oldest_blob_slot: Some(end_slot + 1),
blobs_db: blob_info.blobs_db,
};
let op = self.compare_and_set_blob_info(blob_info, new_blob_info)?;
self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::KeyValueOp(op)])?;
self.update_blob_or_data_column_info(start_epoch, end_slot, blob_info, data_column_info)?;
debug!("Blob pruning complete");
@@ -3379,6 +3388,31 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
/// Record the new oldest retained slot after a pruning pass completes.
///
/// If PeerDAS is enabled for `start_epoch`, the data-column info is advanced
/// to `end_slot + 1`; otherwise the blob info is advanced instead. The
/// compare-and-set yields a key-value op that is then committed atomically
/// via the block-and-blobs cache.
fn update_blob_or_data_column_info(
    &self,
    start_epoch: Epoch,
    end_slot: Slot,
    blob_info: BlobInfo,
    data_column_info: DataColumnInfo,
) -> Result<(), Error> {
    let peer_das_active = self.spec.is_peer_das_enabled_for_epoch(start_epoch);
    let kv_op = if peer_das_active {
        self.compare_and_set_data_column_info(
            data_column_info,
            DataColumnInfo {
                oldest_data_column_slot: Some(end_slot + 1),
            },
        )?
    } else {
        let updated_blob_info = BlobInfo {
            oldest_blob_slot: Some(end_slot + 1),
            blobs_db: blob_info.blobs_db,
        };
        self.compare_and_set_blob_info(blob_info, updated_blob_info)?
    };
    self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::KeyValueOp(kv_op)])?;
    Ok(())
}
}
/// Advance the split point of the store, copying new finalized states to the freezer.