diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a31a364cce..e649497e97 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -396,6 +396,11 @@ where .init_anchor_info(genesis.beacon_block.message(), retain_historic_states) .map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?, ); + self.pending_io_batch.push( + store + .init_blob_info(genesis.beacon_block.slot()) + .map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?, + ); let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis) .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?; @@ -519,6 +524,11 @@ where .init_anchor_info(weak_subj_block.message(), retain_historic_states) .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?, ); + self.pending_io_batch.push( + store + .init_blob_info(weak_subj_block.slot()) + .map_err(|e| format!("Failed to initialize blob info: {:?}", e))?, + ); // Store pruning checkpoint to prevent attempting to prune before the anchor state. self.pending_io_batch @@ -982,7 +992,7 @@ where ); } - // Prune blobs sidecars older than the blob data availability boundary in the background. + // Prune blobs older than the blob data availability boundary in the background. if let Some(data_availability_boundary) = beacon_chain.data_availability_boundary() { beacon_chain .store_migrator diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index a0b4b5a20b..35355754bd 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -762,12 +762,6 @@ impl BeaconChain { // Drop the old cache head nice and early to try and free the memory as soon as possible. drop(old_cached_head); - // Prune blobs in the background. - if let Some(data_availability_boundary) = self.data_availability_boundary() { - self.store_migrator - .process_prune_blobs(data_availability_boundary); - } - // If the finalized checkpoint changed, perform some updates. // // The `after_finalization` function will take a write-lock on `fork_choice`, therefore it @@ -1064,6 +1058,12 @@ impl BeaconChain { self.head_tracker.clone(), )?; + // Prune blobs in the background. + if let Some(data_availability_boundary) = self.data_availability_boundary() { + self.store_migrator + .process_prune_blobs(data_availability_boundary); + } + // Take a write-lock on the canonical head and signal for it to prune. 
self.canonical_head.fork_choice_write_lock().prune()?; diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 080addb3a7..08c98a5490 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -293,7 +293,7 @@ impl DataAvailabilityChecker { .map(|current_epoch| { std::cmp::max( fork_epoch, - current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), + current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), ) }) }) @@ -466,7 +466,7 @@ async fn availability_cache_maintenance_service( let cutoff_epoch = std::cmp::max( finalized_epoch + 1, std::cmp::max( - current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), + current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), deneb_fork_epoch, ), ); diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 98aaf7015b..b40f6e7250 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -9,7 +9,7 @@ use state_processing::{ use std::borrow::Cow; use std::iter; use std::time::Duration; -use store::{chunked_vector::BlockRoots, AnchorInfo, ChunkWriter, KeyValueStore}; +use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore}; use types::{Hash256, Slot}; /// Use a longer timeout on the pubkey cache. @@ -65,6 +65,7 @@ impl BeaconChain { .store .get_anchor_info() .ok_or(HistoricalBlockError::NoAnchorInfo)?; + let blob_info = self.store.get_blob_info(); // Take all blocks with slots less than the oldest block slot. let num_relevant = blocks.partition_point(|available_block| { @@ -98,6 +99,7 @@ impl BeaconChain { let mut prev_block_slot = anchor_info.oldest_block_slot; let mut chunk_writer = ChunkWriter::::new(&self.store.cold_db, prev_block_slot.as_usize())?; + let mut new_oldest_blob_slot = blob_info.oldest_blob_slot; let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); let mut hot_batch = Vec::with_capacity(blocks_to_import.len() + n_blobs_lists_to_import); @@ -123,6 +125,7 @@ impl BeaconChain { .blinded_block_as_kv_store_ops(&block_root, &blinded_block, &mut hot_batch); // Store the blobs too if let Some(blobs) = maybe_blobs { + new_oldest_blob_slot = Some(block.slot()); self.store .blobs_as_kv_store_ops(&block_root, blobs, &mut hot_batch); } @@ -206,6 +209,22 @@ impl BeaconChain { self.store.hot_db.do_atomically(hot_batch)?; self.store.cold_db.do_atomically(cold_batch)?; + let mut anchor_and_blob_batch = Vec::with_capacity(2); + + // Update the blob info. + if new_oldest_blob_slot != blob_info.oldest_blob_slot { + if let Some(oldest_blob_slot) = new_oldest_blob_slot { + let new_blob_info = BlobInfo { + oldest_blob_slot: Some(oldest_blob_slot), + ..blob_info.clone() + }; + anchor_and_blob_batch.push( + self.store + .compare_and_set_blob_info(blob_info, new_blob_info)?, + ); + } + } + // Update the anchor. 
let new_anchor = AnchorInfo { oldest_block_slot: prev_block_slot, @@ -213,8 +232,11 @@ impl BeaconChain { ..anchor_info }; let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot); - self.store - .compare_and_set_anchor_info_with_write(Some(anchor_info), Some(new_anchor))?; + anchor_and_blob_batch.push( + self.store + .compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?, + ); + self.store.hot_db.do_atomically(anchor_and_blob_batch)?; // If backfill has completed and the chain is configured to reconstruct historic states, // send a message to the background migrator instructing it to begin reconstruction. diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 32c13ccb04..ad597bf92a 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -218,7 +218,7 @@ impl, Cold: ItemStore> BackgroundMigrator ?e, ); } @@ -390,39 +390,44 @@ impl, Cold: ItemStore> BackgroundMigrator Notification::Reconstruction, - ( - Notification::Finalization(fin1), - Notification::Finalization(fin2), - ) => { - if fin2.finalized_checkpoint.epoch > fin1.finalized_checkpoint.epoch - { - other - } else { - best - } - } - (Notification::Finalization(_), Notification::PruneBlobs(_)) => best, - (Notification::PruneBlobs(_), Notification::Finalization(_)) => other, - (Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => { - if dab2 > dab1 { - other - } else { - best - } - } - }); - + let mut reconstruction_notif = None; + let mut finalization_notif = None; + let mut prune_blobs_notif = None; match notif { - Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log), - Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log), - Notification::PruneBlobs(dab) => Self::run_prune_blobs(db.clone(), dab, &log), + Notification::Reconstruction => reconstruction_notif = Some(notif), + Notification::Finalization(fin) => finalization_notif = Some(fin), + Notification::PruneBlobs(dab) => prune_blobs_notif = Some(dab), + } + // Read the rest of the messages in the channel, taking the best of each type. + for notif in rx.try_iter() { + match notif { + Notification::Reconstruction => reconstruction_notif = Some(notif), + Notification::Finalization(fin) => { + if let Some(current) = finalization_notif.as_mut() { + if fin.finalized_checkpoint.epoch + > current.finalized_checkpoint.epoch + { + *current = fin; + } + } else { + finalization_notif = Some(fin); + } + } + Notification::PruneBlobs(dab) => { + prune_blobs_notif = std::cmp::max(prune_blobs_notif, Some(dab)); + } + } + } + // If reconstruction is on-going, ignore finalization migration and blob pruning. + if reconstruction_notif.is_some() { + Self::run_reconstruction(db.clone(), &log); + } else { + if let Some(fin) = finalization_notif { + Self::run_migration(db.clone(), fin, &log); + } + if let Some(dab) = prune_blobs_notif { + Self::run_prune_blobs(db.clone(), dab, &log); + } } } }); @@ -663,22 +668,15 @@ impl, Cold: ItemStore> BackgroundMigrator> = abandoned_blocks + let mut batch: Vec> = abandoned_blocks .into_iter() .map(Into::into) .flat_map(|block_root: Hash256| { - let mut store_ops = vec![ + [ StoreOp::DeleteBlock(block_root), StoreOp::DeleteExecutionPayload(block_root), - ]; - if store.blobs_sidecar_exists(&block_root).unwrap_or(false) { - // Keep track of non-empty orphaned blobs sidecars. 
- store_ops.extend([ - StoreOp::DeleteBlobs(block_root), - StoreOp::PutOrphanedBlobsKey(block_root), - ]); - } - store_ops + StoreOp::DeleteBlobs(block_root), + ] }) .chain( abandoned_states @@ -687,8 +685,6 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator( let ops = migration_schema_v17::downgrade_from_v17::(db.clone(), log)?; db.store_schema_version_atomically(to, ops) } + (SchemaVersion(17), SchemaVersion(18)) => { + let ops = migration_schema_v18::upgrade_to_v18::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(18), SchemaVersion(17)) => { + let ops = migration_schema_v18::downgrade_from_v18::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs new file mode 100644 index 0000000000..7a6409a343 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v18.rs @@ -0,0 +1,120 @@ +use crate::beacon_chain::BeaconChainTypes; +use slog::{error, info, warn, Logger}; +use slot_clock::SlotClock; +use std::sync::Arc; +use std::time::Duration; +use store::{ + get_key_for_col, metadata::BLOB_INFO_KEY, DBColumn, Error, HotColdDB, KeyValueStoreOp, +}; +use types::{Epoch, EthSpec, Hash256, Slot}; + +/// The slot clock isn't usually available before the database is initialized, so we construct a +/// temporary slot clock by reading the genesis state. It should always exist if the database is +/// initialized at a prior schema version, however we still handle the lack of genesis state +/// gracefully. +fn get_slot_clock( + db: &HotColdDB, + log: &Logger, +) -> Result, Error> { + let spec = db.get_chain_spec(); + let genesis_block = if let Some(block) = db.get_blinded_block(&Hash256::zero())? { + block + } else { + error!(log, "Missing genesis block"); + return Ok(None); + }; + let genesis_state = + if let Some(state) = db.get_state(&genesis_block.state_root(), Some(Slot::new(0)))? { + state + } else { + error!(log, "Missing genesis state"; "state_root" => ?genesis_block.state_root()); + return Ok(None); + }; + Ok(Some(T::SlotClock::new( + spec.genesis_slot, + Duration::from_secs(genesis_state.genesis_time()), + Duration::from_secs(spec.seconds_per_slot), + ))) +} + +fn get_current_epoch( + db: &Arc>, + log: &Logger, +) -> Result { + get_slot_clock::(db, log)? + .and_then(|clock| clock.now()) + .map(|slot| slot.epoch(T::EthSpec::slots_per_epoch())) + .ok_or(Error::SlotClockUnavailableForMigration) +} + +pub fn upgrade_to_v18( + db: Arc>, + log: Logger, +) -> Result, Error> { + // No-op, even if Deneb has already occurred. The database is probably borked in this case, but + // *maybe* the fork recovery will revert the minority fork and succeed. 
+ if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch { + let current_epoch = get_current_epoch::(&db, &log)?; + if current_epoch >= deneb_fork_epoch { + warn!( + log, + "Attempting upgrade to v18 schema"; + "info" => "this may not work as Deneb has already been activated" + ); + } else { + info!( + log, + "Upgrading to v18 schema"; + "info" => "ready for Deneb", + "epochs_until_deneb" => deneb_fork_epoch - current_epoch + ); + } + } else { + info!( + log, + "Upgrading to v18 schema"; + "info" => "ready for Deneb once it is scheduled" + ); + } + Ok(vec![]) +} + +pub fn downgrade_from_v18( + db: Arc>, + log: Logger, +) -> Result, Error> { + // We cannot downgrade from V18 once the Deneb fork has been activated, because there will + // be blobs and blob metadata in the database that aren't understood by the V17 schema. + if let Some(deneb_fork_epoch) = db.get_chain_spec().deneb_fork_epoch { + let current_epoch = get_current_epoch::(&db, &log)?; + if current_epoch >= deneb_fork_epoch { + error!( + log, + "Deneb already active: v18+ is mandatory"; + "current_epoch" => current_epoch, + "deneb_fork_epoch" => deneb_fork_epoch, + ); + return Err(Error::UnableToDowngrade); + } else { + info!( + log, + "Downgrading to v17 schema"; + "info" => "you will need to upgrade before Deneb", + "epochs_until_deneb" => deneb_fork_epoch - current_epoch + ); + } + } else { + info!( + log, + "Downgrading to v17 schema"; + "info" => "you need to upgrade before Deneb", + ); + } + + let ops = vec![KeyValueStoreOp::DeleteKey(get_key_for_col( + DBColumn::BeaconMeta.into(), + BLOB_INFO_KEY.as_bytes(), + ))]; + + Ok(ops) +} diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 6de21763c5..cb5b98fa2c 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -51,16 +51,16 @@ type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { - get_store_with_spec(db_path, test_spec::()) + get_store_generic(db_path, StoreConfig::default(), test_spec::()) } -fn get_store_with_spec( +fn get_store_generic( db_path: &TempDir, + config: StoreConfig, spec: ChainSpec, ) -> Arc, LevelDB>> { let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); - let config = StoreConfig::default(); let log = test_logger(); HotColdDB::open( @@ -93,7 +93,7 @@ fn get_harness_generic( chain_config: ChainConfig, ) -> TestHarness { let harness = TestHarness::builder(MinimalEthSpec) - .default_spec() + .spec(store.get_chain_spec().clone()) .keypairs(KEYPAIRS[0..validator_count].to_vec()) .logger(store.logger().clone()) .fresh_disk_store(store) @@ -1091,7 +1091,7 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { ); } - assert_eq!(rig.get_finalized_checkpoints(), hashset! {},); + assert_eq!(rig.get_finalized_checkpoints(), hashset! 
{}); assert!(rig.chain.knows_head(&stray_head)); @@ -1118,8 +1118,11 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { for &block_hash in stray_blocks.values() { assert!( !rig.block_exists(block_hash), - "abandoned block {} should have been pruned", - block_hash + "abandoned block {block_hash:?} should have been pruned", + ); + assert!( + !rig.chain.store.blobs_exist(&block_hash.into()).unwrap(), + "blobs for abandoned block {block_hash:?} should have been pruned" ); } @@ -1808,6 +1811,10 @@ fn check_no_blocks_exist<'a>( "did not expect block {:?} to be in the DB", block_hash ); + assert!( + !harness.chain.store.blobs_exist(&block_hash.into()).unwrap(), + "blobs for abandoned block {block_hash:?} should have been pruned" + ); } } @@ -2590,7 +2597,7 @@ async fn revert_minority_fork_on_resume() { // Chain with no fork epoch configured. let db_path1 = tempdir().unwrap(); - let store1 = get_store_with_spec(&db_path1, spec1.clone()); + let store1 = get_store_generic(&db_path1, StoreConfig::default(), spec1.clone()); let harness1 = BeaconChainHarness::builder(MinimalEthSpec) .spec(spec1) .keypairs(KEYPAIRS[0..validator_count].to_vec()) @@ -2600,7 +2607,7 @@ async fn revert_minority_fork_on_resume() { // Chain with fork epoch configured. let db_path2 = tempdir().unwrap(); - let store2 = get_store_with_spec(&db_path2, spec2.clone()); + let store2 = get_store_generic(&db_path2, StoreConfig::default(), spec2.clone()); let harness2 = BeaconChainHarness::builder(MinimalEthSpec) .spec(spec2.clone()) .keypairs(KEYPAIRS[0..validator_count].to_vec()) @@ -2695,7 +2702,7 @@ async fn revert_minority_fork_on_resume() { // We have to do some hackery with the `slot_clock` so that the correct slot is set when // the beacon chain builder loads the head block. drop(harness1); - let resume_store = get_store_with_spec(&db_path1, spec2.clone()); + let resume_store = get_store_generic(&db_path1, StoreConfig::default(), spec2.clone()); let resumed_harness = TestHarness::builder(MinimalEthSpec) .spec(spec2) @@ -2770,9 +2777,11 @@ async fn schema_downgrade_to_min_version() { ) .await; - let min_version = if harness.spec.capella_fork_epoch.is_some() { - // Can't downgrade beyond V14 once Capella is reached, for simplicity don't test that - // at all if Capella is enabled. + let min_version = if harness.spec.deneb_fork_epoch.is_some() { + // Can't downgrade beyond V18 once Deneb is reached, for simplicity don't test that + // at all if Deneb is enabled. + SchemaVersion(18) + } else if harness.spec.capella_fork_epoch.is_some() { SchemaVersion(14) } else { SchemaVersion(11) @@ -2812,15 +2821,6 @@ async fn schema_downgrade_to_min_version() { .expect("schema upgrade from minimum version should work"); // Recreate the harness. - /* - let slot_clock = TestingSlotClock::new( - Slot::new(0), - Duration::from_secs(harness.chain.genesis_time), - Duration::from_secs(spec.seconds_per_slot), - ); - slot_clock.set_slot(harness.get_current_slot().as_u64()); - */ - let harness = BeaconChainHarness::builder(MinimalEthSpec) .default_spec() .keypairs(KEYPAIRS[0..LOW_VALIDATOR_COUNT].to_vec()) @@ -2848,6 +2848,278 @@ async fn schema_downgrade_to_min_version() { .expect_err("should not downgrade below minimum version"); } +/// Check that blob pruning prunes blobs older than the data availability boundary. 
+#[tokio::test] +async fn deneb_prune_blobs_happy_case() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + + let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { + // No-op prior to Deneb. + return; + }; + let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch()); + + let num_blocks_produced = E::slots_per_epoch() * 8; + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Prior to manual pruning with an artificially low data availability boundary, all blobs should + // be stored. + assert_eq!( + store.get_blob_info().oldest_blob_slot, + Some(deneb_fork_slot) + ); + check_blob_existence(&harness, Slot::new(1), harness.head_slot(), true); + + // Trigger blob pruning of blobs older than epoch 2. + let data_availability_boundary = Epoch::new(2); + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest blob slot is updated accordingly and prior blobs have been deleted. + let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap(); + assert_eq!( + oldest_blob_slot, + data_availability_boundary.start_slot(E::slots_per_epoch()) + ); + check_blob_existence(&harness, Slot::new(0), oldest_blob_slot - 1, false); + check_blob_existence(&harness, oldest_blob_slot, harness.head_slot(), true); +} + +/// Check that blob pruning does not prune without finalization. +#[tokio::test] +async fn deneb_prune_blobs_no_finalization() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + + let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { + // No-op prior to Deneb. + return; + }; + let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch()); + + let initial_num_blocks = E::slots_per_epoch() * 5; + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + // Finalize to epoch 3. + harness + .extend_chain( + initial_num_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Extend the chain for another few epochs without attestations. + let unfinalized_num_blocks = E::slots_per_epoch() * 3; + harness.advance_slot(); + harness + .extend_chain( + unfinalized_num_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::SomeValidators(vec![]), + ) + .await; + + // Finalization should be at epoch 3. + let finalized_slot = Slot::new(E::slots_per_epoch() * 3); + assert_eq!(harness.get_current_state().finalized_checkpoint().epoch, 3); + assert_eq!(store.get_split_slot(), finalized_slot); + + // All blobs should still be available. + assert_eq!( + store.get_blob_info().oldest_blob_slot, + Some(deneb_fork_slot) + ); + check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true); + + // Attempt blob pruning of blobs older than epoch 4, which is newer than finalization. + let data_availability_boundary = Epoch::new(4); + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest blob slot is only updated to finalization, and NOT to the DAB.
+ let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap(); + assert_eq!(oldest_blob_slot, finalized_slot); + check_blob_existence(&harness, Slot::new(0), finalized_slot - 1, false); + check_blob_existence(&harness, finalized_slot, harness.head_slot(), true); +} + +/// Check that blob pruning does not fail trying to prune across the fork boundary. +#[tokio::test] +async fn deneb_prune_blobs_fork_boundary() { + let deneb_fork_epoch = Epoch::new(4); + let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec()); + spec.deneb_fork_epoch = Some(deneb_fork_epoch); + let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch()); + + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + let num_blocks = E::slots_per_epoch() * 7; + + // Finalize to epoch 5. + harness + .extend_chain( + num_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Finalization should be at epoch 5. + let finalized_epoch = Epoch::new(5); + let finalized_slot = finalized_epoch.start_slot(E::slots_per_epoch()); + assert_eq!( + harness.get_current_state().finalized_checkpoint().epoch, + finalized_epoch + ); + assert_eq!(store.get_split_slot(), finalized_slot); + + // All blobs should still be available. + assert_eq!( + store.get_blob_info().oldest_blob_slot, + Some(deneb_fork_slot) + ); + check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true); + + // Attempt pruning with data availability epochs that precede the fork epoch. + // No pruning should occur. + assert!(deneb_fork_epoch < finalized_epoch); + for data_availability_boundary in [Epoch::new(0), Epoch::new(3), deneb_fork_epoch] { + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest blob slot is not updated. + assert_eq!( + store.get_blob_info().oldest_blob_slot, + Some(deneb_fork_slot) + ); + } + // All blobs should still be available. + check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true); + + // Prune one epoch past the fork. + let pruned_slot = (deneb_fork_epoch + 1).start_slot(E::slots_per_epoch()); + store.try_prune_blobs(true, deneb_fork_epoch + 1).unwrap(); + assert_eq!(store.get_blob_info().oldest_blob_slot, Some(pruned_slot)); + check_blob_existence(&harness, Slot::new(0), pruned_slot - 1, false); + check_blob_existence(&harness, pruned_slot, harness.head_slot(), true); +} + +/// Check that blob pruning prunes blobs older than the data availability boundary with margin +/// applied. +#[tokio::test] +async fn deneb_prune_blobs_margin1() { + deneb_prune_blobs_margin_test(1).await; +} + +#[tokio::test] +async fn deneb_prune_blobs_margin3() { + deneb_prune_blobs_margin_test(3).await; +} + +#[tokio::test] +async fn deneb_prune_blobs_margin4() { + deneb_prune_blobs_margin_test(4).await; +} + +async fn deneb_prune_blobs_margin_test(margin: u64) { + let config = StoreConfig { + blob_prune_margin_epochs: margin, + ..StoreConfig::default() + }; + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, config, test_spec::()); + + let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { + // No-op prior to Deneb. 
+ return; + }; + let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch()); + + let num_blocks_produced = E::slots_per_epoch() * 8; + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Prior to manual pruning with an artificially low data availability boundary, all blobs should + // be stored. + assert_eq!( + store.get_blob_info().oldest_blob_slot, + Some(deneb_fork_slot) + ); + check_blob_existence(&harness, Slot::new(1), harness.head_slot(), true); + + // Trigger blob pruning of blobs older than epoch 6 - margin (6 is the minimum, due to + // finalization). + let data_availability_boundary = Epoch::new(6); + let effective_data_availability_boundary = + data_availability_boundary - store.get_config().blob_prune_margin_epochs; + assert!( + effective_data_availability_boundary > 0, + "must be > 0 because epoch 0 won't get pruned alone" + ); + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest blob slot is updated accordingly and prior blobs have been deleted. + let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap(); + assert_eq!( + oldest_blob_slot, + effective_data_availability_boundary.start_slot(E::slots_per_epoch()) + ); + check_blob_existence(&harness, Slot::new(0), oldest_blob_slot - 1, false); + check_blob_existence(&harness, oldest_blob_slot, harness.head_slot(), true); +} + +/// Check that there are blob sidecars (or not) at every slot in the range. +fn check_blob_existence( + harness: &TestHarness, + start_slot: Slot, + end_slot: Slot, + should_exist: bool, +) { + let mut blobs_seen = 0; + for (block_root, slot) in harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap() + .map(Result::unwrap) + { + if let Some(blobs) = harness.chain.store.get_blobs(&block_root).unwrap() { + assert!(should_exist, "blobs at slot {slot} exist but should not"); + blobs_seen += blobs.len(); + } else { + // We don't actually store empty blobs, so unfortunately we can't assert anything + // meaningful here (like asserting that the blob should not exist). + } + } + if should_exist { + assert_ne!(blobs_seen, 0, "expected non-zero number of blobs"); + } +} + /// Checks that two chains are the same, for the purpose of these tests. /// /// Several fields that are hard/impossible to check are ignored (e.g., the store). diff --git a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs index c839d5da6c..baef278620 100644 --- a/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs +++ b/beacon_node/execution_layer/src/test_utils/execution_block_generator.rs @@ -10,7 +10,8 @@ use crate::{ }; use eth2::types::BlobsBundle; use kzg::Kzg; -use rand::thread_rng; +use parking_lot::Mutex; +use rand::{rngs::StdRng, Rng, SeedableRng}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -131,6 +132,13 @@ pub struct ExecutionBlockGenerator { */ pub blobs_bundles: HashMap>, pub kzg: Option>>, + rng: Arc>, +} + +fn make_rng() -> Arc> { + // Nondeterminism in tests is a highly undesirable thing. Seed the RNG to some arbitrary + // but fixed value for reproducibility.
+ Arc::new(Mutex::new(StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64))) } impl ExecutionBlockGenerator { @@ -157,6 +165,7 @@ impl ExecutionBlockGenerator { cancun_time, blobs_bundles: <_>::default(), kzg: kzg.map(Arc::new), + rng: make_rng(), }; gen.insert_pow_block(0).unwrap(); @@ -614,9 +623,10 @@ impl ExecutionBlockGenerator { ForkName::Base | ForkName::Altair | ForkName::Merge | ForkName::Capella => {} ForkName::Deneb => { // get random number between 0 and Max Blobs - let num_blobs = rand::random::() % (T::max_blobs_per_block() + 1); + let mut rng = self.rng.lock(); + let num_blobs = rng.gen::() % (T::max_blobs_per_block() + 1); let kzg = self.kzg.as_ref().ok_or("kzg not initialized")?; - let (bundle, transactions) = generate_random_blobs(num_blobs, kzg)?; + let (bundle, transactions) = generate_random_blobs(num_blobs, kzg, &mut *rng)?; for tx in Vec::from(transactions) { execution_payload .transactions_mut() @@ -633,14 +643,15 @@ impl ExecutionBlockGenerator { } } -pub fn generate_random_blobs( +pub fn generate_random_blobs( n_blobs: usize, kzg: &Kzg, + rng: &mut R, ) -> Result<(BlobsBundle, Transactions), String> { let mut bundle = BlobsBundle::::default(); let mut transactions = vec![]; for blob_index in 0..n_blobs { - let random_valid_sidecar = BlobSidecar::::random_valid(&mut thread_rng(), kzg)?; + let random_valid_sidecar = BlobSidecar::::random_valid(rng, kzg)?; let BlobSidecar { blob, diff --git a/beacon_node/http_api/src/database.rs b/beacon_node/http_api/src/database.rs index 37bf7958ad..aa8b0e8ffc 100644 --- a/beacon_node/http_api/src/database.rs +++ b/beacon_node/http_api/src/database.rs @@ -10,11 +10,13 @@ pub fn info( let split = store.get_split_info(); let config = store.get_config().clone(); let anchor = store.get_anchor_info(); + let blob_info = store.get_blob_info(); Ok(DatabaseInfo { schema_version: CURRENT_SCHEMA_VERSION.as_u64(), config, split, anchor, + blob_info, }) } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 422de8e0c5..5aa8d0d2c5 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -14,6 +14,7 @@ use beacon_chain::test_utils::{build_log, BeaconChainHarness, EphemeralHarnessTy use beacon_processor::WorkEvent; use lighthouse_network::rpc::RPCResponseErrorCode; use lighthouse_network::{NetworkGlobals, Request}; +use rand::Rng; use slot_clock::{ManualSlotClock, SlotClock, TestingSlotClock}; use store::MemoryStore; use tokio::sync::mpsc; @@ -104,20 +105,16 @@ impl TestRig { // get random number between 0 and Max Blobs let payload: &mut FullPayloadDeneb = &mut message.body.execution_payload; let num_blobs = match num_blobs { - NumBlobs::Random => { - let mut num_blobs = rand::random::() % E::max_blobs_per_block(); - if num_blobs == 0 { - num_blobs += 1; - } - num_blobs - } + NumBlobs::Random => 1 + self.rng.gen::() % E::max_blobs_per_block(), NumBlobs::None => 0, }; - let (bundle, transactions) = execution_layer::test_utils::generate_random_blobs::( - num_blobs, - self.harness.chain.kzg.as_ref().unwrap(), - ) - .unwrap(); + let (bundle, transactions) = + execution_layer::test_utils::generate_random_blobs::( + num_blobs, + self.harness.chain.kzg.as_ref().unwrap(), + &mut self.rng, + ) + .unwrap(); payload.execution_payload.transactions = <_>::default(); for tx in Vec::from(transactions) { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 56ff8fdab0..3f6d6b8867 100644 
--- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -97,10 +97,7 @@ impl BlockCache { pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList) { self.blob_cache.put(block_root, blobs); } - pub fn get_block<'a>( - &'a mut self, - block_root: &Hash256, - ) -> Option<&'a SignedBeaconBlock>> { + pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock> { self.block_cache.get(block_root) } pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList> { @@ -155,6 +152,7 @@ pub enum HotColdDBError { slots_per_epoch: u64, }, ZeroEpochsPerBlobPrune, + BlobPruneLogicError, RestorePointBlockHashError(BeaconStateError), IterationError { unexpected_key: BytesKey, @@ -265,47 +263,47 @@ impl HotColdDB, LevelDB> { // Open separate blobs directory if configured and same configuration was used on previous // run. let blob_info = db.load_blob_info()?; - let new_blob_info = { - match (&blob_info, &blobs_db_path) { - (Some(blob_info), Some(_)) => { - if !blob_info.blobs_db { + let deneb_fork_slot = db + .spec + .deneb_fork_epoch + .map(|epoch| epoch.start_slot(E::slots_per_epoch())); + let new_blob_info = match &blob_info { + Some(blob_info) => { + // If the oldest block slot is already set do not allow the blob DB path to be + // changed (require manual migration). + if blob_info.oldest_blob_slot.is_some() { + if blobs_db_path.is_some() && !blob_info.blobs_db { return Err(HotColdDBError::BlobsPreviouslyInDefaultStore.into()); - } - BlobInfo { - oldest_blob_slot: blob_info.oldest_blob_slot, - blobs_db: true, - } - } - (Some(blob_info), None) => { - if blob_info.blobs_db { + } else if blobs_db_path.is_none() && blob_info.blobs_db { return Err(HotColdDBError::MissingPathToBlobsDatabase.into()); } - BlobInfo { - oldest_blob_slot: blob_info.oldest_blob_slot, - blobs_db: false, - } } - (None, Some(_)) => BlobInfo { - oldest_blob_slot: None, - blobs_db: true, - }, // first time starting up node - (None, None) => BlobInfo { - oldest_blob_slot: None, - blobs_db: false, - }, // first time starting up node + // Set the oldest blob slot to the Deneb fork slot if it is not yet set. + let oldest_blob_slot = blob_info.oldest_blob_slot.or(deneb_fork_slot); + BlobInfo { + oldest_blob_slot, + blobs_db: blobs_db_path.is_some(), + } } + // First start. + None => BlobInfo { + // Set the oldest blob slot to the Deneb fork slot if it is not yet set. + oldest_blob_slot: deneb_fork_slot, + blobs_db: blobs_db_path.is_some(), + }, }; if new_blob_info.blobs_db { if let Some(path) = &blobs_db_path { db.blobs_db = Some(LevelDB::open(path.as_path())?); } } - db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info)?; + db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?; info!( db.log, - "Blobs DB initialized"; - "use separate blobs db" => db.get_blob_info().blobs_db, - "path" => ?blobs_db_path + "Blob DB initialized"; + "separate_db" => new_blob_info.blobs_db, + "path" => ?blobs_db_path, + "oldest_blob_slot" => ?new_blob_info.oldest_blob_slot, ); // Ensure that the schema version of the on-disk database matches the software. @@ -323,17 +321,6 @@ impl HotColdDB, LevelDB> { db.store_schema_version(CURRENT_SCHEMA_VERSION)?; } - if let Some(blob_info) = db.load_blob_info()? 
{ - let oldest_blob_slot = blob_info.oldest_blob_slot; - *db.blob_info.write() = blob_info; - - info!( - db.log, - "Blob info loaded from disk"; - "oldest_blob_slot" => ?oldest_blob_slot, - ); - } - // Ensure that any on-disk config is compatible with the supplied config. if let Some(disk_config) = db.load_config()? { db.config.check_compatibility(&disk_config)?; @@ -587,10 +574,10 @@ impl, Cold: ItemStore> HotColdDB .map(|payload| payload.is_some()) } - /// Check if the blobs sidecar for a block exists on disk. - pub fn blobs_sidecar_exists(&self, block_root: &Hash256) -> Result { - self.get_item::>(block_root) - .map(|blobs| blobs.is_some()) + /// Check if the blobs for a block exists on disk. + pub fn blobs_exist(&self, block_root: &Hash256) -> Result { + let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); + blobs_db.key_exists(DBColumn::BeaconBlob.into(), block_root.as_bytes()) } /// Determine whether a block exists in the database. @@ -961,12 +948,6 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); } - StoreOp::PutOrphanedBlobsKey(block_root) => { - let db_key = - get_key_for_col(DBColumn::BeaconBlobOrphan.into(), block_root.as_bytes()); - key_value_batch.push(KeyValueStoreOp::PutKeyValue(db_key, [].into())); - } - StoreOp::KeyValueOp(kv_op) => { key_value_batch.push(kv_op); } @@ -985,8 +966,8 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutBlobs(_, _) => true, StoreOp::DeleteBlobs(block_root) => { match self.get_blobs(block_root) { - Ok(Some(blobs_sidecar_list)) => { - blobs_to_delete.push((*block_root, blobs_sidecar_list)); + Ok(Some(blob_sidecar_list)) => { + blobs_to_delete.push((*block_root, blob_sidecar_list)); } Err(e) => { error!( @@ -1020,6 +1001,12 @@ impl, Cold: ItemStore> HotColdDB }; // Rollback on failure if let Err(e) = tx_res { + error!( + self.log, + "Database write failed"; + "error" => ?e, + "action" => "reverting blob DB changes" + ); let mut blob_cache_ops = blob_cache_ops; for op in blob_cache_ops.iter_mut() { let reverse_op = match op { @@ -1062,8 +1049,6 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteExecutionPayload(_) => (), - StoreOp::PutOrphanedBlobsKey(_) => (), - StoreOp::KeyValueOp(_) => (), } } @@ -1450,7 +1435,7 @@ impl, Cold: ItemStore> HotColdDB }) } - /// Fetch a blobs sidecar from the store. + /// Fetch blobs for a given block from the store. pub fn get_blobs(&self, block_root: &Hash256) -> Result>, Error> { let blobs_db = self.blobs_db.as_ref().unwrap_or(&self.cold_db); @@ -1642,6 +1627,18 @@ impl, Cold: ItemStore> HotColdDB .map(|a| a.anchor_slot) } + /// Initialize the `BlobInfo` when starting from genesis or a checkpoint. + pub fn init_blob_info(&self, anchor_slot: Slot) -> Result { + let oldest_blob_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| { + std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch())) + }); + let blob_info = BlobInfo { + oldest_blob_slot, + blobs_db: self.blobs_db.is_some(), + }; + self.compare_and_set_blob_info(self.get_blob_info(), blob_info) + } + /// Get a clone of the store's blob info. /// /// To do mutations, use `compare_and_set_blob_info`. @@ -1656,7 +1653,7 @@ impl, Cold: ItemStore> HotColdDB /// /// Return an `BlobInfoConcurrentMutation` error if the `prev_value` provided /// is not correct. 
- fn compare_and_set_blob_info( + pub fn compare_and_set_blob_info( &self, prev_value: BlobInfo, new_value: BlobInfo, @@ -1672,7 +1669,7 @@ impl, Cold: ItemStore> HotColdDB } /// As for `compare_and_set_blob_info`, but also writes the blob info to disk immediately. - fn compare_and_set_blob_info_with_write( + pub fn compare_and_set_blob_info_with_write( &self, prev_value: BlobInfo, new_value: BlobInfo, @@ -1829,7 +1826,7 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.get(state_root) } - /// Verify that a parsed config. + /// Verify that a parsed config is valid. fn verify_config(config: &StoreConfig) -> Result<(), HotColdDBError> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; Self::verify_epochs_per_blob_prune(config.epochs_per_blob_prune) @@ -2047,107 +2044,133 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Try to prune blobs, approximating the current epoch from lower epoch numbers end (older - /// end) and is useful when the data availability boundary is not at hand. + /// Try to prune blobs, approximating the current epoch from the split slot. pub fn try_prune_most_blobs(&self, force: bool) -> Result<(), Error> { - let deneb_fork = match self.spec.deneb_fork_epoch { + let deneb_fork_epoch = match self.spec.deneb_fork_epoch { Some(epoch) => epoch, None => { debug!(self.log, "Deneb fork is disabled"); return Ok(()); } }; - // At best, current_epoch = split_epoch + 2. However, if finalization doesn't advance, the - // `split.slot` is not updated and current_epoch > split_epoch + 2. - let min_current_epoch = self.get_split_slot().epoch(E::slots_per_epoch()) + Epoch::new(2); + // The current epoch is >= split_epoch + 2. It could be greater if the database is + // configured to delay updating the split or finalization has ceased. In this instance we + // choose to also delay the pruning of blobs (we never prune without finalization anyway). + let min_current_epoch = self.get_split_slot().epoch(E::slots_per_epoch()) + 2; let min_data_availability_boundary = std::cmp::max( - deneb_fork, - min_current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), + deneb_fork_epoch, + min_current_epoch.saturating_sub(MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS), ); self.try_prune_blobs(force, min_data_availability_boundary) } /// Try to prune blobs older than the data availability boundary. + /// + /// Blobs from the epoch `data_availability_boundary - blob_prune_margin_epochs` are retained. + /// This epoch is an _exclusive_ endpoint for the pruning process. + /// + /// This function only supports pruning blobs older than the split point, which is older than + /// (or equal to) finalization. Pruning blobs newer than finalization is not supported. + /// + /// This function also assumes that the split is stationary while it runs. It should only be + /// run from the migrator thread (where `migrate_database` runs) or the database manager. 
pub fn try_prune_blobs( &self, force: bool, data_availability_boundary: Epoch, ) -> Result<(), Error> { - let deneb_fork = match self.spec.deneb_fork_epoch { - Some(epoch) => epoch, - None => { - debug!(self.log, "Deneb fork is disabled"); - return Ok(()); - } - }; + if self.spec.deneb_fork_epoch.is_none() { + debug!(self.log, "Deneb fork is disabled"); + return Ok(()); + } - let should_prune_blobs = self.get_config().prune_blobs; - if !should_prune_blobs && !force { + let pruning_enabled = self.get_config().prune_blobs; + let margin_epochs = self.get_config().blob_prune_margin_epochs; + let epochs_per_blob_prune = self.get_config().epochs_per_blob_prune; + + if !force && !pruning_enabled { debug!( self.log, "Blob pruning is disabled"; - "prune_blobs" => should_prune_blobs + "prune_blobs" => pruning_enabled ); return Ok(()); } let blob_info = self.get_blob_info(); - let oldest_blob_slot = blob_info - .oldest_blob_slot - .unwrap_or_else(|| deneb_fork.start_slot(E::slots_per_epoch())); + let Some(oldest_blob_slot) = blob_info.oldest_blob_slot else { + error!(self.log, "Slot of oldest blob is not known"); + return Err(HotColdDBError::BlobPruneLogicError.into()); + }; - // The last entirely pruned epoch, blobs sidecar pruning may have stopped early in the - // middle of an epoch otherwise the oldest blob slot is a start slot. - let last_pruned_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()) - 1; + // Start pruning from the epoch of the oldest blob stored. + // The start epoch is inclusive (blobs in this epoch will be pruned). + let start_epoch = oldest_blob_slot.epoch(E::slots_per_epoch()); - // At most prune blobs up until the data availability boundary epoch, leaving at least - // blobs of the data availability boundary epoch and younger. - let earliest_prunable_epoch = data_availability_boundary - 1; - // Stop pruning before reaching the data availability boundary if a margin is configured. - let margin_epochs = self.get_config().blob_prune_margin_epochs; - let end_epoch = earliest_prunable_epoch - margin_epochs; + // Prune blobs up until the `data_availability_boundary - margin` or the split + // slot's epoch, whichever is older. We can't prune blobs newer than the split. + // The end epoch is also inclusive (blobs in this epoch will be pruned). + let split = self.get_split_info(); + let end_epoch = std::cmp::min( + data_availability_boundary - margin_epochs - 1, + split.slot.epoch(E::slots_per_epoch()) - 1, + ); + let end_slot = end_epoch.end_slot(E::slots_per_epoch()); - if !force - && last_pruned_epoch.as_u64() + self.get_config().epochs_per_blob_prune - > end_epoch.as_u64() - { - debug!(self.log, "Blobs sidecars are pruned"); + let can_prune = end_epoch != 0 && start_epoch <= end_epoch; + let should_prune = start_epoch + epochs_per_blob_prune <= end_epoch + 1; + + if !force && !should_prune || !can_prune { + debug!( + self.log, + "Blobs are pruned"; + "oldest_blob_slot" => oldest_blob_slot, + "data_availability_boundary" => data_availability_boundary, + "split_slot" => split.slot, + "end_epoch" => end_epoch, + "start_epoch" => start_epoch, + ); return Ok(()); } + // Sanity checks. + if let Some(anchor) = self.get_anchor_info() { + if oldest_blob_slot < anchor.oldest_block_slot { + error!( + self.log, + "Oldest blob is older than oldest block"; + "oldest_blob_slot" => oldest_blob_slot, + "oldest_block_slot" => anchor.oldest_block_slot + ); + return Err(HotColdDBError::BlobPruneLogicError.into()); + } + } + // Iterate block roots forwards from the oldest blob slot. 
debug!( self.log, - "Pruning blobs sidecars stored longer than data availability boundary"; + "Pruning blobs"; + "start_epoch" => start_epoch, + "end_epoch" => end_epoch, + "data_availability_boundary" => data_availability_boundary, ); - // todo(emhane): If we notice degraded I/O for users switching modes (prune_blobs=true to - // prune_blobs=false) we could add a warning that only fires on a threshold, e.g. more - // than 2x epochs_per_blob_prune epochs without a prune. let mut ops = vec![]; let mut last_pruned_block_root = None; - let end_slot = end_epoch.end_slot(E::slots_per_epoch()); for res in self.forwards_block_roots_iterator_until( oldest_blob_slot, end_slot, || { - // todo(emhane): In the future, if the data availability boundary is more recent - // than the split (finalized) epoch, this code will have to change to decide what - // to do with pruned blobs in our not-yet-finalized canonical chain and - // not-yet-orphaned forks (see DBColumn::BeaconBlobOrphan). - // - // Related to review and the spec PRs linked in it: - // https://github.com/sigp/lighthouse/pull/3852#pullrequestreview-1244785136 - let split = self.get_split_info(); + let (_, split_state) = self + .get_advanced_hot_state(split.block_root, split.slot, split.state_root)? + .ok_or(HotColdDBError::MissingSplitState( + split.state_root, + split.slot, + ))?; - let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( - HotColdDBError::MissingSplitState(split.state_root, split.slot), - )?; - let split_block_root = split_state.get_latest_block_root(split.state_root); - - Ok((split_state, split_block_root)) + Ok((split_state, split.block_root)) }, &self.spec, )? { @@ -2156,19 +2179,17 @@ impl, Cold: ItemStore> HotColdDB Err(e) => { warn!( self.log, - "Stopping blobs sidecar pruning early"; + "Stopping blob pruning early"; "error" => ?e, ); break; } }; - if Some(block_root) != last_pruned_block_root - && self.blobs_sidecar_exists(&block_root)? - { - debug!( + if Some(block_root) != last_pruned_block_root && self.blobs_exist(&block_root)? { + trace!( self.log, - "Pruning blobs sidecar"; + "Pruning blobs of block"; "slot" => slot, "block_root" => ?block_root, ); @@ -2177,15 +2198,10 @@ impl, Cold: ItemStore> HotColdDB } if slot >= end_slot { - info!( - self.log, - "Blobs sidecar pruning reached earliest available blobs sidecar"; - "slot" => slot - ); break; } } - let blobs_sidecars_pruned = ops.len(); + let blob_lists_pruned = ops.len(); let new_blob_info = BlobInfo { oldest_blob_slot: Some(end_slot + 1), blobs_db: blob_info.blobs_db, @@ -2196,8 +2212,8 @@ impl, Cold: ItemStore> HotColdDB self.do_atomically_with_block_and_blobs_cache(ops)?; info!( self.log, - "Blobs sidecar pruning complete"; - "blobs_sidecars_pruned" => blobs_sidecars_pruned, + "Blob pruning complete"; + "blob_lists_pruned" => blob_lists_pruned, ); Ok(()) diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 074c05a9e1..85de9697c5 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -170,7 +170,6 @@ pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), PutBlobs(Hash256, BlobSidecarList), - PutOrphanedBlobsKey(Hash256), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), DeleteStateTemporaryFlag(Hash256), @@ -191,9 +190,6 @@ pub enum DBColumn { BeaconBlock, #[strum(serialize = "blb")] BeaconBlob, - /// Block roots of orphaned beacon blobs. 
- #[strum(serialize = "blo")] - BeaconBlobOrphan, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). #[strum(serialize = "ste")] BeaconState, diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 6405aff1bc..59a607aaf7 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(17); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(18); // All the keys that get stored under the `BeaconMeta` column. // @@ -127,7 +127,13 @@ impl StoreItem for AnchorInfo { /// Database parameters relevant to blob sync. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)] pub struct BlobInfo { - /// The slot after which blobs are available (>=). + /// The slot after which blobs are or *will be* available (>=). + /// + /// If this slot is in the future, then it is the first slot of the Deneb fork, from which blobs + /// will be available. + /// + /// If the `oldest_blob_slot` is `None` then this means that the Deneb fork epoch is not yet + /// known. pub oldest_blob_slot: Option, /// A separate blobs database is in use. pub blobs_db: bool, diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index dfc19db492..cd405386b8 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -20,7 +20,7 @@ use reqwest::IntoUrl; use serde::{Deserialize, Serialize}; use ssz::four_byte_option_impl; use ssz_derive::{Decode, Encode}; -use store::{AnchorInfo, Split, StoreConfig}; +use store::{AnchorInfo, BlobInfo, Split, StoreConfig}; pub use attestation_performance::{ AttestationPerformance, AttestationPerformanceQuery, AttestationPerformanceStatistics, @@ -364,6 +364,7 @@ pub struct DatabaseInfo { pub config: StoreConfig, pub split: Split, pub anchor: Option, + pub blob_info: BlobInfo, } impl BeaconNodeHttpClient { diff --git a/consensus/types/src/consts.rs b/consensus/types/src/consts.rs index 7fa03dc5f8..f93c75ee8d 100644 --- a/consensus/types/src/consts.rs +++ b/consensus/types/src/consts.rs @@ -23,18 +23,10 @@ pub mod merge { pub const INTERVALS_PER_SLOT: u64 = 3; } pub mod deneb { - use crate::{Epoch, Uint256}; + use crate::Epoch; - use lazy_static::lazy_static; - - lazy_static! { - pub static ref BLS_MODULUS: Uint256 = Uint256::from_dec_str( - "52435875175126190479447740508185965837690552500527637822603658699938581184513" - ) - .expect("should initialize BLS_MODULUS"); - pub static ref MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: Epoch = Epoch::from(4096_u64); - } pub const VERSIONED_HASH_VERSION_KZG: u8 = 1; pub const BLOB_SIDECAR_SUBNET_COUNT: u64 = 6; pub const MAX_BLOBS_PER_BLOCK: u64 = BLOB_SIDECAR_SUBNET_COUNT; + pub const MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS: Epoch = Epoch::new(4096); }
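
For intuition, here is a minimal standalone sketch of the pruning-boundary arithmetic that the new `try_prune_blobs` uses. It is illustrative only and not part of the patch: the `prune_range` helper is hypothetical, types are simplified to plain `u64` instead of `Slot`/`Epoch`, mainnet's 32 slots per epoch is assumed, and the `epochs_per_blob_prune` batching check is omitted.

fn prune_range(
    oldest_blob_slot: u64,
    data_availability_boundary: u64, // epoch
    margin_epochs: u64,
    split_slot: u64,
) -> Option<(u64, u64)> {
    // Simplified stand-in for the `Slot`/`Epoch` arithmetic in the real code; assumes the
    // boundary and the split are both past epoch 0 so the subtractions cannot underflow.
    let slots_per_epoch = 32;
    // Inclusive start: the epoch of the oldest blob still on disk.
    let start_epoch = oldest_blob_slot / slots_per_epoch;
    // Inclusive end: one epoch before `data_availability_boundary - margin`, but never
    // newer than the epoch before the split (finalized) slot.
    let end_epoch = std::cmp::min(
        data_availability_boundary - margin_epochs - 1,
        split_slot / slots_per_epoch - 1,
    );
    (end_epoch != 0 && start_epoch <= end_epoch).then_some((start_epoch, end_epoch))
}

fn main() {
    // Data availability boundary at epoch 6, no margin, split at slot 224 (epoch 7):
    // epochs 0..=5 are pruned and `oldest_blob_slot` advances to slot 192 (epoch 6).
    assert_eq!(prune_range(0, 6, 0, 224), Some((0, 5)));
    // A 3-epoch margin retains three extra epochs: only epochs 0..=2 are pruned.
    assert_eq!(prune_range(0, 6, 3, 224), Some((0, 2)));
    // Nothing newer than the split is ever pruned: with the split at epoch 4 the end
    // epoch is capped at 3 even though the boundary alone would allow epoch 5.
    assert_eq!(prune_range(0, 6, 0, 128), Some((0, 3)));
}

Under these assumptions the range is pruned inclusively at both ends and the new `oldest_blob_slot` becomes the first slot after the end epoch, which is the behaviour exercised by the `deneb_prune_blobs_*` tests above.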