Improvements to Deneb store upon review (#4693)

* Start testing blob pruning

* Get rid of unnecessary orphaned blob column

* Make random blob tests deterministic

* Test for pruning being blocked by finality

* Fix bugs and test fork boundary

* A few more tweaks to pruning conditions

* Tweak oldest_blob_slot semantics

* Test margin pruning

* Clean up some terminology and lints

* Schema migrations for v18

* Remove FIXME

* Prune blobs on finalization not every slot

* Fix more bugs + tests

* Address review comments
This commit is contained in:
Michael Sproul
2023-09-26 04:21:54 +10:00
committed by GitHub
parent 5c5afafc0d
commit 9244f7f7bc
16 changed files with 700 additions and 246 deletions

View File

@@ -396,6 +396,11 @@ where
.init_anchor_info(genesis.beacon_block.message(), retain_historic_states)
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_blob_info(genesis.beacon_block.slot())
.map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?,
);
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
@@ -519,6 +524,11 @@ where
.init_anchor_info(weak_subj_block.message(), retain_historic_states)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);
self.pending_io_batch.push(
store
.init_blob_info(weak_subj_block.slot())
.map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
);
// Store pruning checkpoint to prevent attempting to prune before the anchor state.
self.pending_io_batch
@@ -982,7 +992,7 @@ where
);
}
// Prune blobs sidecars older than the blob data availability boundary in the background.
// Prune blobs older than the blob data availability boundary in the background.
if let Some(data_availability_boundary) = beacon_chain.data_availability_boundary() {
beacon_chain
.store_migrator

View File

@@ -762,12 +762,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Drop the old cache head nice and early to try and free the memory as soon as possible.
drop(old_cached_head);
// Prune blobs in the background.
if let Some(data_availability_boundary) = self.data_availability_boundary() {
self.store_migrator
.process_prune_blobs(data_availability_boundary);
}
// If the finalized checkpoint changed, perform some updates.
//
// The `after_finalization` function will take a write-lock on `fork_choice`, therefore it
@@ -1064,6 +1058,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.head_tracker.clone(),
)?;
// Prune blobs in the background.
if let Some(data_availability_boundary) = self.data_availability_boundary() {
self.store_migrator
.process_prune_blobs(data_availability_boundary);
}
// Take a write-lock on the canonical head and signal for it to prune.
self.canonical_head.fork_choice_write_lock().prune()?;

View File

@@ -293,7 +293,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(|current_epoch| {
std::cmp::max(
fork_epoch,
current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
)
})
})
@@ -466,7 +466,7 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
let cutoff_epoch = std::cmp::max(
finalized_epoch + 1,
std::cmp::max(
current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS),
deneb_fork_epoch,
),
);

View File

@@ -9,7 +9,7 @@ use state_processing::{
use std::borrow::Cow;
use std::iter;
use std::time::Duration;
use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore};
use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore};
use types::{Hash256, Slot};
/// Use a longer timeout on the pubkey cache.
@@ -65,6 +65,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.store
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let blob_info = self.store.get_blob_info();
// Take all blocks with slots less than the oldest block slot.
let num_relevant = blocks.partition_point(|available_block| {
@@ -98,6 +99,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut prev_block_slot = anchor_info.oldest_block_slot;
let mut chunk_writer =
ChunkWriter::<BlockRoots, _, _>::new(&self.store.cold_db, prev_block_slot.as_usize())?;
let mut new_oldest_blob_slot = blob_info.oldest_blob_slot;
let mut cold_batch = Vec::with_capacity(blocks_to_import.len());
let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_lists_to_import);
@@ -123,6 +125,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.blinded_block_as_kv_store_ops(&block_root, &blinded_block, &mut hot_batch);
// Store the blobs too
if let Some(blobs) = maybe_blobs {
new_oldest_blob_slot = Some(block.slot());
self.store
.blobs_as_kv_store_ops(&block_root, blobs, &mut hot_batch);
}
@@ -206,6 +209,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.store.hot_db.do_atomically(hot_batch)?;
self.store.cold_db.do_atomically(cold_batch)?;
let mut anchor_and_blob_batch = Vec::with_capacity(2);
// Update the blob info.
if new_oldest_blob_slot != blob_info.oldest_blob_slot {
if let Some(oldest_blob_slot) = new_oldest_blob_slot {
let new_blob_info = BlobInfo {
oldest_blob_slot: Some(oldest_blob_slot),
..blob_info.clone()
};
anchor_and_blob_batch.push(
self.store
.compare_and_set_blob_info(blob_info, new_blob_info)?,
);
}
}
// Update the anchor.
let new_anchor = AnchorInfo {
oldest_block_slot: prev_block_slot,
@@ -213,8 +232,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
..anchor_info
};
let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot);
self.store
.compare_and_set_anchor_info_with_write(Some(anchor_info), Some(new_anchor))?;
anchor_and_blob_batch.push(
self.store
.compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?,
);
self.store.hot_db.do_atomically(anchor_and_blob_batch)?;
// If backfill has completed and the chain is configured to reconstruct historic states,
// send a message to the background migrator instructing it to begin reconstruction.

View File

@@ -218,7 +218,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
if let Err(e) = db.try_prune_blobs(false, data_availability_boundary) {
error!(
log,
"Blobs pruning failed";
"Blob pruning failed";
"error" => ?e,
);
}
@@ -390,39 +390,44 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let (tx, rx) = mpsc::channel();
let thread = thread::spawn(move || {
while let Ok(notif) = rx.recv() {
// Read the rest of the messages in the channel, preferring any reconstruction
// notification, or the finalization notification with the greatest finalized epoch.
let notif =
rx.try_iter()
.fold(notif, |best, other: Notification| match (&best, &other) {
(Notification::Reconstruction, _)
| (_, Notification::Reconstruction) => Notification::Reconstruction,
(
Notification::Finalization(fin1),
Notification::Finalization(fin2),
) => {
if fin2.finalized_checkpoint.epoch > fin1.finalized_checkpoint.epoch
{
other
} else {
best
}
}
(Notification::Finalization(_), Notification::PruneBlobs(_)) => best,
(Notification::PruneBlobs(_), Notification::Finalization(_)) => other,
(Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => {
if dab2 > dab1 {
other
} else {
best
}
}
});
let mut reconstruction_notif = None;
let mut finalization_notif = None;
let mut prune_blobs_notif = None;
match notif {
Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
Notification::PruneBlobs(dab) => Self::run_prune_blobs(db.clone(), dab, &log),
Notification::Reconstruction => reconstruction_notif = Some(notif),
Notification::Finalization(fin) => finalization_notif = Some(fin),
Notification::PruneBlobs(dab) => prune_blobs_notif = Some(dab),
}
// Read the rest of the messages in the channel, taking the best of each type.
for notif in rx.try_iter() {
match notif {
Notification::Reconstruction => reconstruction_notif = Some(notif),
Notification::Finalization(fin) => {
if let Some(current) = finalization_notif.as_mut() {
if fin.finalized_checkpoint.epoch
> current.finalized_checkpoint.epoch
{
*current = fin;
}
} else {
finalization_notif = Some(fin);
}
}
Notification::PruneBlobs(dab) => {
prune_blobs_notif = std::cmp::max(prune_blobs_notif, Some(dab));
}
}
}
// If reconstruction is on-going, ignore finalization migration and blob pruning.
if reconstruction_notif.is_some() {
Self::run_reconstruction(db.clone(), &log);
} else {
if let Some(fin) = finalization_notif {
Self::run_migration(db.clone(), fin, &log);
}
if let Some(dab) = prune_blobs_notif {
Self::run_prune_blobs(db.clone(), dab, &log);
}
}
}
});
@@ -663,22 +668,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
head_tracker_lock.remove(&head_hash);
}
let batch: Vec<StoreOp<E>> = abandoned_blocks
let mut batch: Vec<StoreOp<E>> = abandoned_blocks
.into_iter()
.map(Into::into)
.flat_map(|block_root: Hash256| {
let mut store_ops = vec![
[
StoreOp::DeleteBlock(block_root),
StoreOp::DeleteExecutionPayload(block_root),
];
if store.blobs_sidecar_exists(&block_root).unwrap_or(false) {
// Keep track of non-empty orphaned blobs sidecars.
store_ops.extend([
StoreOp::DeleteBlobs(block_root),
StoreOp::PutOrphanedBlobsKey(block_root),
]);
}
store_ops
StoreOp::DeleteBlobs(block_root),
]
})
.chain(
abandoned_states
@@ -687,8 +685,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
)
.collect();
let mut kv_batch = store.convert_to_kv_batch(batch)?;
// Persist the head in case the process is killed or crashes here. This prevents
// the head tracker reverting after our mutation above.
let persisted_head = PersistedBeaconChain {
@@ -697,12 +693,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
ssz_head_tracker: SszHeadTracker::from_map(&head_tracker_lock),
};
drop(head_tracker_lock);
kv_batch.push(persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY));
batch.push(StoreOp::KeyValueOp(
persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY),
));
// Persist the new finalized checkpoint as the pruning checkpoint.
kv_batch.push(store.pruning_checkpoint_store_op(new_finalized_checkpoint));
batch.push(StoreOp::KeyValueOp(
store.pruning_checkpoint_store_op(new_finalized_checkpoint),
));
store.hot_db.do_atomically(kv_batch)?;
store.do_atomically_with_block_and_blobs_cache(batch)?;
debug!(log, "Database pruning complete");
Ok(PruningOutcome::Successful {

View File

@@ -5,6 +5,7 @@ mod migration_schema_v14;
mod migration_schema_v15;
mod migration_schema_v16;
mod migration_schema_v17;
mod migration_schema_v18;
use crate::beacon_chain::{BeaconChainTypes, ETH1_CACHE_DB_KEY};
use crate::eth1_chain::SszEth1;
@@ -150,6 +151,14 @@ pub fn migrate_schema<T: BeaconChainTypes>(
let ops = migration_schema_v17::downgrade_from_v17::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(17), SchemaVersion(18)) => {
let ops = migration_schema_v18::upgrade_to_v18::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(18), SchemaVersion(17)) => {
let ops = migration_schema_v18::downgrade_from_v18::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,

View File

@@ -0,0 +1,120 @@
use crate::beacon_chain::BeaconChainTypes;
use slog::{error, info, warn, Logger};
use slot_clock::SlotClock;
use std::sync::Arc;
use std::time::Duration;
use store::{
get_key_for_col, metadata::BLOB_INFO_KEY, DBColumn, Error, HotColdDB, KeyValueStoreOp,
};
use types::{Epoch, EthSpec, Hash256, Slot};
/// The slot clock isn't usually available before the database is initialized, so we construct a
/// temporary slot clock by reading the genesis state. It should always exist if the database is
/// initialized at a prior schema version, however we still handle the lack of genesis state
/// gracefully.
fn get_slot_clock<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
log: &Logger,
) -> Result<Option<T::SlotClock>, Error> {
let spec = db.get_chain_spec();
let genesis_block = if let Some(block) = db.get_blinded_block(&Hash256::zero())? {
block
} else {
error!(log, "Missing genesis block");
return Ok(None);
};
let genesis_state =
if let Some(state) = db.get_state(&genesis_block.state_root(), Some(Slot::new(0)))? {
state
} else {
error!(log, "Missing genesis state"; "state_root" => ?genesis_block.state_root());
return Ok(None);
};
Ok(Some(T::SlotClock::new(
spec.genesis_slot,
Duration::from_secs(genesis_state.genesis_time()),
Duration::from_secs(spec.seconds_per_slot),
)))
}
fn get_current_epoch<T: BeaconChainTypes>(
db: &Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: &Logger,
) -> Result<Epoch, Error> {
get_slot_clock::<T>(db, log)?
.and_then(|clock| clock.now())
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
.ok_or(Error::SlotClockUnavailableForMigration)
}
pub fn upgrade_to_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// No-op, even if Deneb has already occurred. The database is probably borked in this case, but
// *maybe* the fork recovery will revert the minority fork and succeed.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
let current_epoch = get_current_epoch::<T>(&db, &log)?;
if current_epoch >= deneb_fork_epoch {
warn!(
log,
"Attempting upgrade to v18 schema";
"info" => "this may not work as Deneb has already been activated"
);
} else {
info!(
log,
"Upgrading to v18 schema";
"info" => "ready for Deneb",
"epochs_until_deneb" => deneb_fork_epoch - current_epoch
);
}
} else {
info!(
log,
"Upgrading to v18 schema";
"info" => "ready for Deneb once it is scheduled"
);
}
Ok(vec![])
}
pub fn downgrade_from_v18<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
// We cannot downgrade from V18 once the Deneb fork has been activated, because there will
// be blobs and blob metadata in the database that aren't understood by the V17 schema.
if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch {
let current_epoch = get_current_epoch::<T>(&db, &log)?;
if current_epoch >= deneb_fork_epoch {
error!(
log,
"Deneb already active: v18+ is mandatory";
"current_epoch" => current_epoch,
"deneb_fork_epoch" => deneb_fork_epoch,
);
return Err(Error::UnableToDowngrade);
} else {
info!(
log,
"Downgrading to v17 schema";
"info" => "you will need to upgrade before Deneb",
"epochs_until_deneb" => deneb_fork_epoch - current_epoch
);
}
} else {
info!(
log,
"Downgrading to v17 schema";
"info" => "you need to upgrade before Deneb",
);
}
let ops = vec![KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconMeta.into(),
BLOB_INFO_KEY.as_bytes(),
))];
Ok(ops)
}

View File

@@ -51,16 +51,16 @@ type E = MinimalEthSpec;
type TestHarness = BeaconChainHarness<DiskHarnessType<E>>;
fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
get_store_with_spec(db_path, test_spec::<E>())
get_store_generic(db_path, StoreConfig::default(), test_spec::<E>())
}
fn get_store_with_spec(
fn get_store_generic(
db_path: &TempDir,
config: StoreConfig,
spec: ChainSpec,
) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
let hot_path = db_path.path().join("hot_db");
let cold_path = db_path.path().join("cold_db");
let config = StoreConfig::default();
let log = test_logger();
HotColdDB::open(
@@ -93,7 +93,7 @@ fn get_harness_generic(
chain_config: ChainConfig,
) -> TestHarness {
let harness = TestHarness::builder(MinimalEthSpec)
.default_spec()
.spec(store.get_chain_spec().clone())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.logger(store.logger().clone())
.fresh_disk_store(store)
@@ -1091,7 +1091,7 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() {
);
}
assert_eq!(rig.get_finalized_checkpoints(), hashset! {},);
assert_eq!(rig.get_finalized_checkpoints(), hashset! {});
assert!(rig.chain.knows_head(&stray_head));
@@ -1118,8 +1118,11 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() {
for &block_hash in stray_blocks.values() {
assert!(
!rig.block_exists(block_hash),
"abandoned block {} should have been pruned",
block_hash
"abandoned block {block_hash:?} should have been pruned",
);
assert!(
!rig.chain.store.blobs_exist(&block_hash.into()).unwrap(),
"blobs for abandoned block {block_hash:?} should have been pruned"
);
}
@@ -1808,6 +1811,10 @@ fn check_no_blocks_exist<'a>(
"did not expect block {:?} to be in the DB",
block_hash
);
assert!(
!harness.chain.store.blobs_exist(&block_hash.into()).unwrap(),
"blobs for abandoned block {block_hash:?} should have been pruned"
);
}
}
@@ -2590,7 +2597,7 @@ async fn revert_minority_fork_on_resume() {
// Chain with no fork epoch configured.
let db_path1 = tempdir().unwrap();
let store1 = get_store_with_spec(&db_path1, spec1.clone());
let store1 = get_store_generic(&db_path1, StoreConfig::default(), spec1.clone());
let harness1 = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec1)
.keypairs(KEYPAIRS[0..validator_count].to_vec())
@@ -2600,7 +2607,7 @@ async fn revert_minority_fork_on_resume() {
// Chain with fork epoch configured.
let db_path2 = tempdir().unwrap();
let store2 = get_store_with_spec(&db_path2, spec2.clone());
let store2 = get_store_generic(&db_path2, StoreConfig::default(), spec2.clone());
let harness2 = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec2.clone())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
@@ -2695,7 +2702,7 @@ async fn revert_minority_fork_on_resume() {
// We have to do some hackery with the `slot_clock` so that the correct slot is set when
// the beacon chain builder loads the head block.
drop(harness1);
let resume_store = get_store_with_spec(&db_path1, spec2.clone());
let resume_store = get_store_generic(&db_path1, StoreConfig::default(), spec2.clone());
let resumed_harness = TestHarness::builder(MinimalEthSpec)
.spec(spec2)
@@ -2770,9 +2777,11 @@ async fn schema_downgrade_to_min_version() {
)
.await;
let min_version = if harness.spec.capella_fork_epoch.is_some() {
// Can't downgrade beyond V14 once Capella is reached, for simplicity don't test that
// at all if Capella is enabled.
let min_version = if harness.spec.deneb_fork_epoch.is_some() {
// Can't downgrade beyond V18 once Deneb is reached, for simplicity don't test that
// at all if Deneb is enabled.
SchemaVersion(18)
} else if harness.spec.capella_fork_epoch.is_some() {
SchemaVersion(14)
} else {
SchemaVersion(11)
@@ -2812,15 +2821,6 @@ async fn schema_downgrade_to_min_version() {
.expect("schema upgrade from minimum version should work");
// Recreate the harness.
/*
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(harness.chain.genesis_time),
Duration::from_secs(spec.seconds_per_slot),
);
slot_clock.set_slot(harness.get_current_slot().as_u64());
*/
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.keypairs(KEYPAIRS[0..LOW_VALIDATOR_COUNT].to_vec())
@@ -2848,6 +2848,278 @@ async fn schema_downgrade_to_min_version() {
.expect_err("should not downgrade below minimum version");
}
/// Check that blob pruning prunes blobs older than the data availability boundary.
#[tokio::test]
async fn deneb_prune_blobs_happy_case() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else {
// No-op prior to Deneb.
return;
};
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let num_blocks_produced = E::slots_per_epoch() * 8;
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Prior to manual pruning with an artifically low data availability boundary all blobs should
// be stored.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(1), harness.head_slot(), true);
// Trigger blob pruning of blobs older than epoch 2.
let data_availability_boundary = Epoch::new(2);
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is updated accordingly and prior blobs have been deleted.
let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap();
assert_eq!(
oldest_blob_slot,
data_availability_boundary.start_slot(E::slots_per_epoch())
);
check_blob_existence(&harness, Slot::new(0), oldest_blob_slot - 1, false);
check_blob_existence(&harness, oldest_blob_slot, harness.head_slot(), true);
}
/// Check that blob pruning does not prune without finalization.
#[tokio::test]
async fn deneb_prune_blobs_no_finalization() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else {
// No-op prior to Deneb.
return;
};
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let initial_num_blocks = E::slots_per_epoch() * 5;
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
// Finalize to epoch 3.
harness
.extend_chain(
initial_num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Extend the chain for another few epochs without attestations.
let unfinalized_num_blocks = E::slots_per_epoch() * 3;
harness.advance_slot();
harness
.extend_chain(
unfinalized_num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(vec![]),
)
.await;
// Finalization should be at epoch 3.
let finalized_slot = Slot::new(E::slots_per_epoch() * 3);
assert_eq!(harness.get_current_state().finalized_checkpoint().epoch, 3);
assert_eq!(store.get_split_slot(), finalized_slot);
// All blobs should still be available.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true);
// Attempt blob pruning of blobs older than epoch 4, which is newer than finalization.
let data_availability_boundary = Epoch::new(4);
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is only updated to finalization, and NOT to the DAB.
let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap();
assert_eq!(oldest_blob_slot, finalized_slot);
check_blob_existence(&harness, Slot::new(0), finalized_slot - 1, false);
check_blob_existence(&harness, finalized_slot, harness.head_slot(), true);
}
/// Check that blob pruning does not fail trying to prune across the fork boundary.
#[tokio::test]
async fn deneb_prune_blobs_fork_boundary() {
let deneb_fork_epoch = Epoch::new(4);
let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec());
spec.deneb_fork_epoch = Some(deneb_fork_epoch);
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, StoreConfig::default(), spec);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let num_blocks = E::slots_per_epoch() * 7;
// Finalize to epoch 5.
harness
.extend_chain(
num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Finalization should be at epoch 5.
let finalized_epoch = Epoch::new(5);
let finalized_slot = finalized_epoch.start_slot(E::slots_per_epoch());
assert_eq!(
harness.get_current_state().finalized_checkpoint().epoch,
finalized_epoch
);
assert_eq!(store.get_split_slot(), finalized_slot);
// All blobs should still be available.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true);
// Attempt pruning with data availability epochs that precede the fork epoch.
// No pruning should occur.
assert!(deneb_fork_epoch < finalized_epoch);
for data_availability_boundary in [Epoch::new(0), Epoch::new(3), deneb_fork_epoch] {
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is not updated.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
}
// All blobs should still be available.
check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true);
// Prune one epoch past the fork.
let pruned_slot = (deneb_fork_epoch + 1).start_slot(E::slots_per_epoch());
store.try_prune_blobs(true, deneb_fork_epoch + 1).unwrap();
assert_eq!(store.get_blob_info().oldest_blob_slot, Some(pruned_slot));
check_blob_existence(&harness, Slot::new(0), pruned_slot - 1, false);
check_blob_existence(&harness, pruned_slot, harness.head_slot(), true);
}
/// Check that blob pruning prunes blobs older than the data availability boundary with margin
/// applied.
#[tokio::test]
async fn deneb_prune_blobs_margin1() {
deneb_prune_blobs_margin_test(1).await;
}
#[tokio::test]
async fn deneb_prune_blobs_margin3() {
deneb_prune_blobs_margin_test(3).await;
}
#[tokio::test]
async fn deneb_prune_blobs_margin4() {
deneb_prune_blobs_margin_test(4).await;
}
async fn deneb_prune_blobs_margin_test(margin: u64) {
let config = StoreConfig {
blob_prune_margin_epochs: margin,
..StoreConfig::default()
};
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, config, test_spec::<E>());
let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else {
// No-op prior to Deneb.
return;
};
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let num_blocks_produced = E::slots_per_epoch() * 8;
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Prior to manual pruning with an artifically low data availability boundary all blobs should
// be stored.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(1), harness.head_slot(), true);
// Trigger blob pruning of blobs older than epoch 6 - margin (6 is the minimum, due to
// finalization).
let data_availability_boundary = Epoch::new(6);
let effective_data_availability_boundary =
data_availability_boundary - store.get_config().blob_prune_margin_epochs;
assert!(
effective_data_availability_boundary > 0,
"must be > 0 because epoch 0 won't get pruned alone"
);
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is updated accordingly and prior blobs have been deleted.
let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap();
assert_eq!(
oldest_blob_slot,
effective_data_availability_boundary.start_slot(E::slots_per_epoch())
);
check_blob_existence(&harness, Slot::new(0), oldest_blob_slot - 1, false);
check_blob_existence(&harness, oldest_blob_slot, harness.head_slot(), true);
}
/// Check that there are blob sidecars (or not) at every slot in the range.
fn check_blob_existence(
harness: &TestHarness,
start_slot: Slot,
end_slot: Slot,
should_exist: bool,
) {
let mut blobs_seen = 0;
for (block_root, slot) in harness
.chain
.forwards_iter_block_roots_until(start_slot, end_slot)
.unwrap()
.map(Result::unwrap)
{
if let Some(blobs) = harness.chain.store.get_blobs(&block_root).unwrap() {
assert!(should_exist, "blobs at slot {slot} exist but should not");
blobs_seen += blobs.len();
} else {
// We don't actually store empty blobs, so unfortunately we can't assert anything
// meaningful here (like asserting that the blob should not exist).
}
}
if should_exist {
assert_ne!(blobs_seen, 0, "expected non-zero number of blobs");
}
}
/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).