diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 221d380a86..11b0d6af2b 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3021,6 +3021,22 @@ impl BeaconChain { info!(self.log, "Writing blobs to store"; "block_root" => ?block_root); ops.push(StoreOp::PutBlobs(block_root, blobs)); } + + // Update db's metadata for blobs pruning. + if current_slot == current_epoch.start_slot(T::EthSpec::slots_per_epoch()) { + if let Some(mut blob_info) = self.store.get_blob_info() { + let next_epoch_to_prune = + blob_info.last_pruned_epoch + *MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS; + + if current_epoch > next_epoch_to_prune { + blob_info.availability_breakpoint = block_root; + self.store.compare_and_set_blob_info_with_write( + self.store.get_blob_info(), + Some(blob_info), + )?; + } + } + } }; let txn_lock = self.store.hot_db.begin_rw_transaction(); diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 345d06841e..bbb74ef5cd 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -94,6 +94,7 @@ pub enum HotColdDBError { MissingHotStateSummary(Hash256), MissingEpochBoundaryState(Hash256), MissingSplitState(Hash256, Slot), + MissingStateToPruneBlobs(Hash256, Slot), MissingExecutionPayload(Hash256), MissingFullBlockExecutionPayloadPruned(Hash256, Slot), MissingAnchorInfo, @@ -1687,41 +1688,50 @@ impl, Cold: ItemStore> HotColdDB } pub fn try_prune_blobs(&self, force: bool) -> Result<(), Error> { - let split = self.get_split_info(); + let mut blob_info: BlobInfo; - if split.slot == 0 { + if let Some(old_blob_info) = self.get_blob_info() { + blob_info = old_blob_info; + } else { return Ok(()); } - let eip4844_fork_slot = if let Some(epoch) = self.spec.eip4844_fork_epoch { - epoch.start_slot(E::slots_per_epoch()) - } else { + if blob_info.availability_breakpoint == 
blob_info.oldest_blob_parent { return Ok(()); - }; + } - // Load the split state so we can backtrack to find blobs sidecars. - // todo(emhane): MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS - let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( - HotColdDBError::MissingSplitState(split.state_root, split.slot), - )?; + // Load the state from which to prune blobs so we can backtrack. + let erase_state = self + .get_state( + &blob_info.availability_breakpoint, + Some(blob_info.last_pruned_epoch.end_slot(E::slots_per_epoch())), + )? + .ok_or(HotColdDBError::MissingStateToPruneBlobs( + blob_info.availability_breakpoint, + blob_info.oldest_blob_slot, + ))?; + + // The data availability breakpoint is set at the start of an epoch indicating the epoch + // before can be pruned. + let erase_epoch = erase_state.current_epoch() - 1; + let erase_slot = erase_epoch.end_slot(E::slots_per_epoch()); // The finalized block may or may not have its blobs sidecar stored, depending on // whether it was at a skipped slot. However for a fully pruned database its parent - // should *always* have been pruned. In case of a long split (no parent found) we - // continue as if the payloads are pruned, as the node probably has other things to worry - // about. - let split_block_root = split_state.get_latest_block_root(split.state_root); + // should *always* have been pruned. In the case of blobs sidecars we look at the next + // parent block with at least one kzg commitment. 
- let already_pruned = - process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| { - iter.find(|(_, block_root)| { + let already_pruned = process_results( + BlockRootsIter::new(&erase_state, blob_info.oldest_blob_slot), + |mut iter| { + iter.find(|(slot, block_root)| { move || -> bool { - if *block_root != split_block_root { - if let Ok(Some(split_parent_block)) = + if *slot <= erase_slot { + if let Ok(Some(erase_parent_block)) = self.get_blinded_block(&block_root) { if let Ok(expected_kzg_commitments) = - split_parent_block.message().body().blob_kzg_commitments() + erase_parent_block.message().body().blob_kzg_commitments() { if expected_kzg_commitments.len() > 0 { return true; @@ -1736,28 +1746,25 @@ impl, Cold: ItemStore> HotColdDB self.blobs_sidecar_exists(&split_parent_root) .map(|exists| !exists) }) - })??; + }, + )??; if already_pruned && !force { info!(self.log, "Blobs sidecars are pruned"); return Ok(()); } - // Iterate block roots backwards to the Eip48444 fork or the latest blob slot, whichever - // comes first. + // Iterate block roots backwards to oldest blob slot. warn!( self.log, - "Pruning finalized blobs sidecars"; + "Pruning blobs sidecars stored longer than data availability boundary"; "info" => "you may notice degraded I/O performance while this runs" ); - let latest_blob_slot = self.get_blob_info().map(|info| info.latest_blob_slot); let mut ops = vec![]; let mut last_pruned_block_root = None; - for res in std::iter::once(Ok((split_block_root, split.slot))) - .chain(BlockRootsIterator::new(self, &split_state)) - { + for res in BlockRootsIterator::new(self, &erase_state) { let (block_root, slot) = match res { Ok(tuple) => tuple, Err(e) => { @@ -1770,14 +1777,6 @@ impl, Cold: ItemStore> HotColdDB } }; - if slot < eip4844_fork_slot { - info!( - self.log, - "Blobs sidecar pruning reached Eip4844 boundary"; - ); - break; - } - if Some(block_root) != last_pruned_block_root && self.blobs_sidecar_exists(&block_root)? 
{ @@ -1791,15 +1790,16 @@ impl, Cold: ItemStore> HotColdDB ops.push(StoreOp::DeleteBlobs(block_root)); } - if Some(slot) == latest_blob_slot { + if slot <= erase_slot { info!( self.log, - "Blobs sidecar pruning reached anchor state"; + "Blobs sidecar pruning reached earliest available blob state"; "slot" => slot ); break; } } + let blobs_sidecars_pruned = ops.len(); self.do_atomically(ops)?; info!( @@ -1807,6 +1807,11 @@ impl, Cold: ItemStore> HotColdDB "Blobs sidecar pruning complete"; "blobs_sidecars_pruned" => blobs_sidecars_pruned, ); + + blob_info.last_pruned_epoch = erase_epoch; + blob_info.oldest_blob_parent = blob_info.availability_breakpoint; + self.compare_and_set_blob_info_with_write(self.get_blob_info(), Some(blob_info))?; + Ok(()) } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 8998d56baa..e1b2c948a9 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -35,6 +35,7 @@ pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split}; pub use self::leveldb_store::LevelDB; pub use self::memory_store::MemoryStore; pub use self::partial_beacon_state::PartialBeaconState; +pub use crate::metadata::BlobInfo; pub use errors::Error; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metadata::AnchorInfo; diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 15bcaf1bb0..c2e72fac35 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -2,7 +2,7 @@ use crate::{DBColumn, Error, StoreItem}; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use types::{Checkpoint, Hash256, Slot}; +use types::{Checkpoint, Epoch, Hash256, Slot}; pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(15); @@ -122,6 +122,10 @@ impl StoreItem for AnchorInfo { /// Database parameters relevant to blob sync. 
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize)] pub struct BlobInfo { + /// The most recent epoch for which blobs have been pruned. + pub last_pruned_epoch: Epoch, + /// The block root from which the next round of blob pruning starts. + pub availability_breakpoint: Hash256, /// The block root of the next blob that needs to be added to fill in the history. pub oldest_blob_parent: Hash256, /// The slot before which blobs are available.