Drop head tracker for summaries DAG (#6744)

The head tracker is a persisted piece of state that must be kept in sync with the fork-choice. It has been a source of pruning issues in the past, so we want to remove it
- see https://github.com/sigp/lighthouse/issues/1785

When implementing tree-states in the hot DB we have to change the pruning routine (more details below) so we want to do those changes first in isolation.
- see https://github.com/sigp/lighthouse/issues/6580
- If you want to see the full feature of tree-states hot https://github.com/dapplion/lighthouse/pull/39

Closes https://github.com/sigp/lighthouse/issues/1785


  **Current DB migration routine**

- Locate abandoned heads with head tracker
- Use a roots iterator to collect the ancestors of those heads can be pruned
- Delete those abandoned blocks / states
- Migrate the newly finalized chain to the freezer

In summary, it computes what it has to delete and keeps the rest. Then it migrates data to the freezer. If the abandoned forks routine has a bug it can break the freezer migration.

**Proposed migration routine (this PR)**

- Migrate the newly finalized chain to the freezer
- Load all state summaries from disk
- From those, just knowing the head and finalized block compute two sets: (1) descendants of finalized (2) newly finalized chain
- Iterate all summaries, if a summary does not belong to set (1) or (2), delete

This strategy is more sound as it just checks what's there in the hot DB, computes what it has to keep and deletes the rest. Because it does not rely and 3rd pieces of data we can drop the head tracker and pruning checkpoint. Since the DB migration happens **first** now, as long as the computation of the sets to keep is correct we won't have pruning issues.
This commit is contained in:
Lion - dapplion
2025-04-07 01:23:52 -03:00
committed by GitHub
parent b5d40e3db0
commit 70850fe58d
27 changed files with 1110 additions and 983 deletions

View File

@@ -1,22 +1,16 @@
use crate::beacon_chain::BEACON_CHAIN_DB_KEY;
use crate::errors::BeaconChainError;
use crate::head_tracker::{HeadTracker, SszHeadTracker};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use crate::summaries_dag::{DAGStateSummaryV22, Error as SummariesDagError, StateSummariesDAG};
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::mem;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::{migrate_database, HotColdDBError};
use store::iter::RootsIterator;
use store::{Error, ItemStore, StoreItem, StoreOp};
use store::{Error, ItemStore, StoreOp};
pub use store::{HotColdDB, MemoryStore};
use tracing::{debug, error, info, warn};
use types::{
BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, Epoch, EthSpec, FixedBytesExtended,
Hash256, SignedBeaconBlockHash, Slot,
};
use types::{BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, Slot};
/// Compact at least this frequently, finalization permitting (7 days).
const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
@@ -42,8 +36,6 @@ pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
prev_migration: Arc<Mutex<PrevMigration>>,
#[allow(clippy::type_complexity)]
tx_thread: Option<Mutex<(mpsc::Sender<Notification>, thread::JoinHandle<()>)>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
genesis_block_root: Hash256,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -89,7 +81,7 @@ pub struct PrevMigration {
pub enum PruningOutcome {
/// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`.
Successful {
old_finalized_checkpoint: Checkpoint,
old_finalized_checkpoint_epoch: Epoch,
},
/// The run was aborted because the new finalized checkpoint is older than the previous one.
OutOfOrderFinalization {
@@ -116,6 +108,11 @@ pub enum PruningError {
},
UnexpectedEqualStateRoots,
UnexpectedUnequalStateRoots,
MissingSummaryForFinalizedCheckpoint(Hash256),
MissingBlindedBlock(Hash256),
SummariesDagError(&'static str, SummariesDagError),
EmptyFinalizedStates,
EmptyFinalizedBlocks,
}
/// Message sent to the migration thread containing the information it needs to run.
@@ -130,25 +127,17 @@ pub enum Notification {
pub struct ManualFinalizationNotification {
pub state_root: BeaconStateHash,
pub checkpoint: Checkpoint,
pub head_tracker: Arc<HeadTracker>,
pub genesis_block_root: Hash256,
}
pub struct FinalizationNotification {
pub finalized_state_root: BeaconStateHash,
pub finalized_checkpoint: Checkpoint,
pub head_tracker: Arc<HeadTracker>,
pub prev_migration: Arc<Mutex<PrevMigration>>,
pub genesis_block_root: Hash256,
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
/// Create a new `BackgroundMigrator` and spawn its thread if necessary.
pub fn new(
db: Arc<HotColdDB<E, Hot, Cold>>,
config: MigratorConfig,
genesis_block_root: Hash256,
) -> Self {
pub fn new(db: Arc<HotColdDB<E, Hot, Cold>>, config: MigratorConfig) -> Self {
// Estimate last migration run from DB split slot.
let prev_migration = Arc::new(Mutex::new(PrevMigration {
epoch: db.get_split_slot().epoch(E::slots_per_epoch()),
@@ -163,7 +152,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
db,
tx_thread,
prev_migration,
genesis_block_root,
}
}
@@ -176,14 +164,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
&self,
finalized_state_root: BeaconStateHash,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
) -> Result<(), BeaconChainError> {
let notif = FinalizationNotification {
finalized_state_root,
finalized_checkpoint,
head_tracker,
prev_migration: self.prev_migration.clone(),
genesis_block_root: self.genesis_block_root,
};
// Send to background thread if configured, otherwise run in foreground.
@@ -314,9 +299,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
let notif = FinalizationNotification {
finalized_state_root: notif.state_root,
finalized_checkpoint: notif.checkpoint,
head_tracker: notif.head_tracker,
prev_migration: Arc::new(prev_migration.into()),
genesis_block_root: notif.genesis_block_root,
};
Self::run_migration(db, notif);
}
@@ -360,17 +343,34 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
};
let old_finalized_checkpoint = match Self::prune_abandoned_forks(
match migrate_database(
db.clone(),
notif.head_tracker,
finalized_state_root,
finalized_state_root.into(),
finalized_block_root,
&finalized_state,
) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
slot = slot.as_u64(),
"Database migration postponed, unaligned finalized block"
);
}
Err(e) => {
warn!(error = ?e, "Database migration failed");
return;
}
};
let old_finalized_checkpoint_epoch = match Self::prune_hot_db(
db.clone(),
finalized_state_root.into(),
&finalized_state,
notif.finalized_checkpoint,
notif.genesis_block_root,
) {
Ok(PruningOutcome::Successful {
old_finalized_checkpoint,
}) => old_finalized_checkpoint,
old_finalized_checkpoint_epoch,
}) => old_finalized_checkpoint_epoch,
Ok(PruningOutcome::DeferredConcurrentHeadTrackerMutation) => {
warn!(
message = "this is expected only very rarely!",
@@ -391,26 +391,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
return;
}
Err(e) => {
warn!(error = ?e,"Block pruning failed");
return;
}
};
match migrate_database(
db.clone(),
finalized_state_root.into(),
finalized_block_root,
&finalized_state,
) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
slot = slot.as_u64(),
"Database migration postponed, unaligned finalized block"
warn!(
error = ?e,
"Hot DB pruning failed"
);
}
Err(e) => {
warn!(error = ?e, "Database migration failed");
return;
}
};
@@ -418,7 +402,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Finally, compact the database so that new free space is properly reclaimed.
if let Err(e) = Self::run_compaction(
db,
old_finalized_checkpoint.epoch,
old_finalized_checkpoint_epoch,
notif.finalized_checkpoint.epoch,
) {
warn!(error = ?e, "Database compaction failed");
@@ -514,30 +498,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
/// Traverses live heads and prunes blocks and states of chains that we know can't be built
/// upon because finalization would prohibit it. This is an optimisation intended to save disk
/// space.
#[allow(clippy::too_many_arguments)]
fn prune_abandoned_forks(
fn prune_hot_db(
store: Arc<HotColdDB<E, Hot, Cold>>,
head_tracker: Arc<HeadTracker>,
new_finalized_state_hash: BeaconStateHash,
new_finalized_state_root: Hash256,
new_finalized_state: &BeaconState<E>,
new_finalized_checkpoint: Checkpoint,
genesis_block_root: Hash256,
) -> Result<PruningOutcome, BeaconChainError> {
let old_finalized_checkpoint =
store
.load_pruning_checkpoint()?
.unwrap_or_else(|| Checkpoint {
epoch: Epoch::new(0),
root: Hash256::zero(),
});
let old_finalized_slot = old_finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
let new_finalized_slot = new_finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
let new_finalized_block_hash = new_finalized_checkpoint.root.into();
// The finalized state must be for the epoch boundary slot, not the slot of the finalized
// block.
@@ -549,200 +518,220 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.into());
}
// The new finalized state must be newer than the previous finalized state.
// I think this can happen sometimes currently due to `fork_choice` running in parallel
// with itself and sending us notifications out of order.
if old_finalized_slot > new_finalized_slot {
return Ok(PruningOutcome::OutOfOrderFinalization {
old_finalized_checkpoint,
new_finalized_checkpoint,
});
}
debug!(
old_finalized_epoch = %old_finalized_checkpoint.epoch,
new_finalized_epoch = %new_finalized_checkpoint.epoch,
new_finalized_checkpoint = ?new_finalized_checkpoint,
new_finalized_state_root = %new_finalized_state_root,
"Starting database pruning"
);
// For each slot between the new finalized checkpoint and the old finalized checkpoint,
// collect the beacon block root and state root of the canonical chain.
let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
std::iter::once(Ok((
new_finalized_slot,
(new_finalized_block_hash, new_finalized_state_hash),
)))
.chain(RootsIterator::new(&store, new_finalized_state).map(|res| {
res.map(|(block_root, state_root, slot)| {
(slot, (block_root.into(), state_root.into()))
let state_summaries_dag = {
let state_summaries = store
.load_hot_state_summaries()?
.into_iter()
.map(|(state_root, summary)| {
let block_root = summary.latest_block_root;
// This error should never happen unless we break a DB invariant
let block = store
.get_blinded_block(&block_root)?
.ok_or(PruningError::MissingBlindedBlock(block_root))?;
Ok((
state_root,
DAGStateSummaryV22 {
slot: summary.slot,
latest_block_root: summary.latest_block_root,
block_slot: block.slot(),
block_parent_root: block.parent_root(),
},
))
})
}))
.take_while(|res| {
res.as_ref()
.map_or(true, |(slot, _)| *slot >= old_finalized_slot)
})
.collect::<Result<_, _>>()?;
.collect::<Result<Vec<(Hash256, DAGStateSummaryV22)>, BeaconChainError>>()?;
// De-duplicate block roots to reduce block reads below
let summary_block_roots = HashSet::<Hash256>::from_iter(
state_summaries
.iter()
.map(|(_, summary)| summary.latest_block_root),
);
// Sanity check, there is at least one summary with the new finalized block root
if !summary_block_roots.contains(&new_finalized_checkpoint.root) {
return Err(BeaconChainError::PruningError(
PruningError::MissingSummaryForFinalizedCheckpoint(
new_finalized_checkpoint.root,
),
));
}
StateSummariesDAG::new_from_v22(state_summaries)
.map_err(|e| PruningError::SummariesDagError("new StateSumariesDAG", e))?
};
// To debug faulty trees log if we unexpectedly have more than one root. These trees may not
// result in an error, as they may not be queried in the codepaths below.
let state_summaries_dag_roots = state_summaries_dag.tree_roots();
if state_summaries_dag_roots.len() > 1 {
warn!(
state_summaries_dag_roots = ?state_summaries_dag_roots,
"Prune state summaries dag found more than one root"
);
}
// `new_finalized_state_root` is the *state at the slot of the finalized epoch*,
// rather than the state of the latest finalized block. These two values will only
// differ when the first slot of the finalized epoch is a skip slot.
let finalized_and_descendant_state_roots_of_finalized_checkpoint =
HashSet::<Hash256>::from_iter(
std::iter::once(new_finalized_state_root).chain(
state_summaries_dag
.descendants_of(&new_finalized_state_root)
.map_err(|e| PruningError::SummariesDagError("descendants of", e))?,
),
);
// Collect all `latest_block_roots` of the
// finalized_and_descendant_state_roots_of_finalized_checkpoint set. Includes the finalized
// block as `new_finalized_state_root` always has a latest block root equal to the finalized
// block.
let finalized_and_descendant_block_roots_of_finalized_checkpoint =
HashSet::<Hash256>::from_iter(
state_summaries_dag
.blocks_of_states(
finalized_and_descendant_state_roots_of_finalized_checkpoint.iter(),
)
// should never error, we just constructed
// finalized_and_descendant_state_roots_of_finalized_checkpoint from the
// state_summaries_dag
.map_err(|e| PruningError::SummariesDagError("blocks of descendant", e))?
.into_iter()
.map(|(block_root, _)| block_root),
);
// Note: ancestors_of includes the finalized state root
let newly_finalized_state_summaries = state_summaries_dag
.ancestors_of(new_finalized_state_root)
.map_err(|e| PruningError::SummariesDagError("ancestors of", e))?;
let newly_finalized_state_roots = newly_finalized_state_summaries
.iter()
.map(|(root, _)| *root)
.collect::<HashSet<Hash256>>();
let newly_finalized_states_min_slot = *newly_finalized_state_summaries
.iter()
.map(|(_, slot)| slot)
.min()
.ok_or(PruningError::EmptyFinalizedStates)?;
// Note: ancestors_of includes the finalized block
let newly_finalized_blocks = state_summaries_dag
.blocks_of_states(newly_finalized_state_roots.iter())
.map_err(|e| PruningError::SummariesDagError("blocks of newly finalized", e))?;
// We don't know which blocks are shared among abandoned chains, so we buffer and delete
// everything in one fell swoop.
let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new();
let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
let mut blocks_to_prune: HashSet<Hash256> = HashSet::new();
let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new();
let heads = head_tracker.heads();
debug!(
old_finalized_root = ?old_finalized_checkpoint.root,
new_finalized_root = ?new_finalized_checkpoint.root,
head_count = heads.len(),
"Extra pruning information"
);
// Consider the following block tree where we finalize block `[0]` at the checkpoint `(f)`.
// There's a block `[3]` that descendends from the finalized block but NOT from the
// finalized checkpoint. The block tree rooted in `[3]` conflicts with finality and must be
// pruned. Therefore we collect all state summaries descendant of `(f)`.
//
// finalize epoch boundary
// | /-------[2]-----
// [0]-------|--(f)--[1]----------
// \---[3]--|-----------------[4]
// |
for (head_hash, head_slot) in heads {
// Load head block. If it fails with a decode error, it's likely a reverted block,
// so delete it from the head tracker but leave it and its states in the database
// This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync
// can be used to reclaim the space.
let head_state_root = match store.get_blinded_block(&head_hash) {
Ok(Some(block)) => block.state_root(),
Ok(None) => {
return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into())
for (_, summaries) in state_summaries_dag.summaries_by_slot_ascending() {
for (state_root, summary) in summaries {
let should_prune = if finalized_and_descendant_state_roots_of_finalized_checkpoint
.contains(&state_root)
{
// This state is a viable descendant of the finalized checkpoint, so does not
// conflict with finality and can be built on or become a head
false
} else {
// Everything else, prune
true
};
if should_prune {
// States are migrated into the cold DB in the migrate step. All hot states
// prior to finalized can be pruned from the hot DB columns
states_to_prune.insert((summary.slot, state_root));
}
Err(Error::SszDecodeError(e)) => {
warn!(
block_root = ?head_hash,
error = ?e,
"Forgetting invalid head block"
);
abandoned_heads.insert(head_hash);
continue;
}
Err(e) => return Err(e.into()),
}
}
for (block_root, slot) in state_summaries_dag.iter_blocks() {
// Blocks both finalized and unfinalized are in the same DB column. We must only
// prune blocks from abandoned forks. Note that block pruning and state pruning differ.
// The blocks DB column is shared for hot and cold data, while the states have different
// columns. Thus, we only prune unviable blocks or from abandoned forks.
let should_prune = if finalized_and_descendant_block_roots_of_finalized_checkpoint
.contains(&block_root)
{
// Keep unfinalized blocks descendant of finalized checkpoint + finalized block
// itself Note that we anchor this set on the finalized checkpoint instead of the
// finalized block. A diagram above shows a relevant example.
false
} else if newly_finalized_blocks.contains(&(block_root, slot)) {
// Keep recently finalized blocks
false
} else if slot < newly_finalized_states_min_slot {
// Keep recently finalized blocks that we know are canonical. Blocks with slots <
// that `newly_finalized_blocks_min_slot` we don't have canonical information so we
// assume they are part of the finalized pruned chain
//
// Pruning these would risk breaking the DB by deleting canonical blocks once the
// HDiff grid advances. If the pruning routine is correct this condition should
// never be hit.
false
} else {
// Everything else, prune
true
};
let mut potentially_abandoned_head = Some(head_hash);
let mut potentially_abandoned_blocks = vec![];
// Iterate backwards from this head, staging blocks and states for deletion.
let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot)))
.chain(RootsIterator::from_block(&store, head_hash)?);
for maybe_tuple in iter {
let (block_root, state_root, slot) = maybe_tuple?;
let block_root = SignedBeaconBlockHash::from(block_root);
let state_root = BeaconStateHash::from(state_root);
match newly_finalized_chain.get(&slot) {
// If there's no information about a slot on the finalized chain, then
// it should be because it's ahead of the new finalized slot. Stage
// the fork's block and state for possible deletion.
None => {
if slot > new_finalized_slot {
potentially_abandoned_blocks.push((
slot,
Some(block_root),
Some(state_root),
));
} else if slot >= old_finalized_slot {
return Err(PruningError::MissingInfoForCanonicalChain { slot }.into());
} else {
// We must assume here any candidate chains include the old finalized
// checkpoint, i.e. there aren't any forks starting at a block that is a
// strict ancestor of old_finalized_checkpoint.
warn!(
head_block_root = ?head_hash,
%head_slot,
"Found a chain that should already have been pruned"
);
potentially_abandoned_head.take();
break;
}
}
Some((finalized_block_root, finalized_state_root)) => {
// This fork descends from a newly finalized block, we can stop.
if block_root == *finalized_block_root {
// Sanity check: if the slot and block root match, then the
// state roots should match too.
if state_root != *finalized_state_root {
return Err(PruningError::UnexpectedUnequalStateRoots.into());
}
// If the fork descends from the whole finalized chain,
// do not prune it. Otherwise continue to delete all
// of the blocks and states that have been staged for
// deletion so far.
if slot == new_finalized_slot {
potentially_abandoned_blocks.clear();
potentially_abandoned_head.take();
}
// If there are skipped slots on the fork to be pruned, then
// we will have just staged the common block for deletion.
// Unstage it.
else {
for (_, block_root, _) in
potentially_abandoned_blocks.iter_mut().rev()
{
if block_root.as_ref() == Some(finalized_block_root) {
*block_root = None;
} else {
break;
}
}
}
break;
} else {
if state_root == *finalized_state_root {
return Err(PruningError::UnexpectedEqualStateRoots.into());
}
potentially_abandoned_blocks.push((
slot,
Some(block_root),
Some(state_root),
));
}
}
}
}
if let Some(abandoned_head) = potentially_abandoned_head {
debug!(
head_block_root = ?abandoned_head,
%head_slot,
"Pruning head"
);
abandoned_heads.insert(abandoned_head);
abandoned_blocks.extend(
potentially_abandoned_blocks
.iter()
.filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash),
);
abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map(
|(slot, _, maybe_state_hash)| maybe_state_hash.map(|sr| (*slot, sr)),
));
if should_prune {
blocks_to_prune.insert(block_root);
}
}
// Update the head tracker before the database, so that we maintain the invariant
// that a block present in the head tracker is present in the database.
// See https://github.com/sigp/lighthouse/issues/1557
let mut head_tracker_lock = head_tracker.0.write();
// Sort states to prune to make it more readable
let mut states_to_prune = states_to_prune.into_iter().collect::<Vec<_>>();
states_to_prune.sort_by_key(|(slot, _)| *slot);
// Check that all the heads to be deleted are still present. The absence of any
// head indicates a race, that will likely resolve itself, so we defer pruning until
// later.
for head_hash in &abandoned_heads {
if !head_tracker_lock.contains_key(head_hash) {
return Ok(PruningOutcome::DeferredConcurrentHeadTrackerMutation);
}
debug!(
new_finalized_checkpoint = ?new_finalized_checkpoint,
newly_finalized_blocks = newly_finalized_blocks.len(),
newly_finalized_state_roots = newly_finalized_state_roots.len(),
newly_finalized_states_min_slot = %newly_finalized_states_min_slot,
state_summaries_count = state_summaries_dag.summaries_count(),
state_summaries_dag_roots = ?state_summaries_dag_roots,
finalized_and_descendant_state_roots_of_finalized_checkpoint = finalized_and_descendant_state_roots_of_finalized_checkpoint.len(),
finalized_and_descendant_state_roots_of_finalized_checkpoint = finalized_and_descendant_state_roots_of_finalized_checkpoint.len(),
blocks_to_prune = blocks_to_prune.len(),
states_to_prune = states_to_prune.len(),
"Extra pruning information"
);
// Don't log the full `states_to_prune` in the log statement above as it can result in a
// single log line of +1Kb and break logging setups.
for block_root in &blocks_to_prune {
debug!(
block_root = ?block_root,
"Pruning block"
);
}
for (slot, state_root) in &states_to_prune {
debug!(
?state_root,
%slot,
"Pruning hot state"
);
}
// Then remove them for real.
for head_hash in abandoned_heads {
head_tracker_lock.remove(&head_hash);
}
let mut batch: Vec<StoreOp<E>> = abandoned_blocks
let mut batch: Vec<StoreOp<E>> = blocks_to_prune
.into_iter()
.map(Into::into)
.flat_map(|block_root: Hash256| {
.flat_map(|block_root| {
[
StoreOp::DeleteBlock(block_root),
StoreOp::DeleteExecutionPayload(block_root),
@@ -750,43 +739,87 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
StoreOp::DeleteSyncCommitteeBranch(block_root),
]
})
.chain(
abandoned_states
.into_iter()
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))),
)
.chain(states_to_prune.into_iter().flat_map(|(slot, state_hash)| {
// Hot state diffs necessary for the HDiff grid are never added to `states_to_prune`
[StoreOp::DeleteState(state_hash, Some(slot))]
}))
.collect();
// Persist the head in case the process is killed or crashes here. This prevents
// the head tracker reverting after our mutation above.
let persisted_head = PersistedBeaconChain {
_canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
genesis_block_root,
ssz_head_tracker: SszHeadTracker::from_map(&head_tracker_lock),
};
drop(head_tracker_lock);
batch.push(StoreOp::KeyValueOp(
persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY),
));
// Prune sync committee branches of non-checkpoint canonical finalized blocks
Self::prune_non_checkpoint_sync_committee_branches(&newly_finalized_blocks, &mut batch);
// Persist the new finalized checkpoint as the pruning checkpoint.
batch.push(StoreOp::KeyValueOp(
store.pruning_checkpoint_store_op(new_finalized_checkpoint),
));
// Prune all payloads of the canonical finalized blocks
if store.get_config().prune_payloads {
Self::prune_finalized_payloads(new_finalized_slot, &newly_finalized_blocks, &mut batch);
}
store.do_atomically_with_block_and_blobs_cache(batch)?;
// Do a quick separate pass to delete obsoleted hot states, usually pre-states from the state
// advance which are not canonical due to blocks being applied on top.
store.prune_old_hot_states()?;
debug!("Database pruning complete");
Ok(PruningOutcome::Successful {
old_finalized_checkpoint,
// Approximation of the previous finalized checkpoint. Only used in the compaction to
// compute time since last compaction.
old_finalized_checkpoint_epoch: newly_finalized_states_min_slot
.epoch(E::slots_per_epoch()),
})
}
fn prune_finalized_payloads(
new_finalized_slot: Slot,
finalized_blocks: &[(Hash256, Slot)],
hot_db_ops: &mut Vec<StoreOp<E>>,
) {
for (block_root, slot) in finalized_blocks {
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
// delete the payload for the finalized block itself, but that's OK as we only guarantee
// that payloads are present for slots >= the split slot.
if *slot < new_finalized_slot {
hot_db_ops.push(StoreOp::DeleteExecutionPayload(*block_root));
}
}
}
fn prune_non_checkpoint_sync_committee_branches(
finalized_blocks_desc: &[(Hash256, Slot)],
hot_db_ops: &mut Vec<StoreOp<E>>,
) {
let mut epoch_boundary_blocks = HashSet::new();
let mut non_checkpoint_block_roots = HashSet::new();
// Then, iterate states in slot ascending order, as they are stored wrt previous states.
for (block_root, slot) in finalized_blocks_desc.iter().rev() {
// At a missed slot, `state_root_iter` will return the block root
// from the previous non-missed slot. This ensures that the block root at an
// epoch boundary is always a checkpoint block root. We keep track of block roots
// at epoch boundaries by storing them in the `epoch_boundary_blocks` hash set.
// We then ensure that block roots at the epoch boundary aren't included in the
// `non_checkpoint_block_roots` hash set.
if *slot % E::slots_per_epoch() == 0 {
epoch_boundary_blocks.insert(block_root);
} else {
non_checkpoint_block_roots.insert(block_root);
}
if epoch_boundary_blocks.contains(&block_root) {
non_checkpoint_block_roots.remove(&block_root);
}
}
// Prune sync committee branch data for all non checkpoint block roots.
// Note that `non_checkpoint_block_roots` should only contain non checkpoint block roots
// as long as `finalized_state.slot()` is at an epoch boundary. If this were not the case
// we risk the chance of pruning a `sync_committee_branch` for a checkpoint block root.
// E.g. if `current_split_slot` = (Epoch A slot 0) and `finalized_state.slot()` = (Epoch C slot 31)
// and (Epoch D slot 0) is a skipped slot, we will have pruned a `sync_committee_branch`
// for a checkpoint block root.
non_checkpoint_block_roots
.into_iter()
.for_each(|block_root| {
hot_db_ops.push(StoreOp::DeleteSyncCommitteeBranch(*block_root));
});
}
/// Compact the database if it has been more than `COMPACTION_PERIOD_SECONDS` since it
/// was last compacted.
pub fn run_compaction(