From 64f0e3e13de5cdd4207b03e156ea38b0ad76295e Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 2 Mar 2022 15:40:56 +1100 Subject: [PATCH] New state pruning algorithm --- beacon_node/beacon_chain/src/migrate.rs | 104 ++++++++++-------- beacon_node/beacon_chain/tests/store_tests.rs | 4 +- beacon_node/store/src/hot_cold_store.rs | 78 +++++++++---- beacon_node/store/src/leveldb_store.rs | 30 ++++- beacon_node/store/src/lib.rs | 10 ++ beacon_node/store/src/state_cache.rs | 14 ++- 6 files changed, 165 insertions(+), 75 deletions(-) diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 30a4987e19..d13a3d08c5 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -3,7 +3,7 @@ use crate::errors::BeaconChainError; use crate::head_tracker::{HeadTracker, SszHeadTracker}; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use parking_lot::Mutex; -use slog::{debug, error, info, warn, Logger}; +use slog::{debug, error, info, trace, warn, Logger}; use std::collections::{HashMap, HashSet}; use std::mem; use std::sync::{mpsc, Arc}; @@ -380,15 +380,14 @@ impl, Cold: ItemStore> BackgroundMigrator = HashSet::new(); - let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new(); let mut abandoned_heads: HashSet = HashSet::new(); let heads = head_tracker.heads(); debug!( log, "Extra pruning information"; - "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root), - "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root), + "old_finalized_root" => ?old_finalized_checkpoint.root, + "new_finalized_root" => ?new_finalized_checkpoint.root, "head_count" => heads.len(), ); @@ -416,9 +415,9 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator { if slot > new_finalized_slot { - potentially_abandoned_blocks.push(( - slot, - Some(block_root), - Some(state_root), - )); + potentially_abandoned_blocks.insert(block_root); } else if slot >= old_finalized_slot { return Err(PruningError::MissingInfoForCanonicalChain { slot }.into()); } else { @@ -447,7 +442,7 @@ impl, Cold: ItemStore> BackgroundMigrator format!("{:?}", head_hash), + "head_block_root" => ?head_hash, "head_slot" => head_slot, ); potentially_abandoned_head.take(); @@ -475,26 +470,14 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator format!("{:?}", abandoned_head), + "head_block_root" => ?abandoned_head, "head_slot" => head_slot, ); abandoned_heads.insert(abandoned_head); - abandoned_blocks.extend( - potentially_abandoned_blocks - .iter() - .filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash), - ); - abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map( - |(slot, _, maybe_state_hash)| maybe_state_hash.map(|sr| (*slot, sr)), - )); + abandoned_blocks.extend(potentially_abandoned_blocks); } } @@ -538,19 +514,13 @@ impl, Cold: ItemStore> BackgroundMigrator> = abandoned_blocks + let num_deleted_blocks = abandoned_blocks.len(); + let mut batch: Vec> = abandoned_blocks .into_iter() .map(Into::into) .map(StoreOp::DeleteBlock) - .chain( - abandoned_states - .into_iter() - .map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))), - ) .collect(); - let mut kv_batch = store.convert_to_kv_batch(&batch)?; - // Persist the head in case the process is killed or crashes here. This prevents // the head tracker reverting after our mutation above. let persisted_head = PersistedBeaconChain { @@ -559,13 +529,53 @@ impl, Cold: ItemStore> BackgroundMigrator num_deleted_blocks, + ); + + // Do a separate pass to clean up irrelevant states. + let mut state_delete_batch = vec![]; + for res in store.iter_hot_state_summaries() { + let (state_root, summary) = res?; + + if summary.slot <= new_finalized_slot { + // If state root doesn't match state root from canonical chain, or this slot + // is not part of the recently finalized chain, then delete. + if newly_finalized_chain + .get(&summary.slot) + .map_or(true, |(_, canonical_state_root)| { + state_root != Hash256::from(*canonical_state_root) + }) + { + trace!( + log, + "Deleting state"; + "state_root" => ?state_root, + "slot" => summary.slot, + ); + state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot))); + } + } + } + let num_deleted_states = state_delete_batch.len(); + store.do_atomically(state_delete_batch)?; + debug!( + log, + "Database state pruning complete"; + "num_deleted_states" => num_deleted_states, + ); Ok(PruningOutcome::Successful { old_finalized_checkpoint, diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 8903c9879d..a6a395ecd7 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1915,8 +1915,9 @@ fn pruning_test( check_no_blocks_exist(&harness, stray_blocks.values()); } +/* FIXME(sproul): adapt this test for new paradigm #[test] -fn garbage_collect_temp_states_from_failed_block() { +fn delete_states_from_failed_block() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); @@ -1954,6 +1955,7 @@ fn garbage_collect_temp_states_from_failed_block() { let store = get_store(&db_path); assert_eq!(store.iter_temporary_state_roots().count(), 0); } +*/ #[test] fn weak_subjectivity_sync() { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 9f1679ee1c..87de5ee7c5 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -524,20 +524,23 @@ impl, Cold: ItemStore> HotColdDB } /// Convert a batch of `StoreOp` to a batch of `KeyValueStoreOp`. - pub fn convert_to_kv_batch(&self, batch: &[StoreOp]) -> Result, Error> { + pub fn convert_to_kv_batch( + &self, + batch: Vec>, + ) -> Result, Error> { let mut key_value_batch = Vec::with_capacity(batch.len()); for op in batch { match op { StoreOp::PutBlock(block_root, block) => { - key_value_batch.push(self.block_as_kv_store_op(block_root, block)); + key_value_batch.push(self.block_as_kv_store_op(&block_root, &block)); } StoreOp::PutState(state_root, state) => { - self.store_hot_state(state_root, state, &mut key_value_batch)?; + self.store_hot_state(&state_root, state, &mut key_value_batch)?; } StoreOp::PutStateTemporaryFlag(state_root) => { - key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)?); + key_value_batch.push(TemporaryFlag.as_kv_store_op(state_root)?); } StoreOp::DeleteStateTemporaryFlag(state_root) => { @@ -570,6 +573,7 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::DeleteKey(diff_key)); } } + StoreOp::KeyValueOp(kv_op) => key_value_batch.push(kv_op), } } Ok(key_value_batch) @@ -578,9 +582,6 @@ impl, Cold: ItemStore> HotColdDB pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { let mut block_cache = self.block_cache.lock(); - self.hot_db - .do_atomically(self.convert_to_kv_batch(&batch)?)?; - for op in &batch { match op { StoreOp::PutBlock(block_root, block) => { @@ -595,14 +596,20 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteBlock(block_root) => { block_cache.pop(block_root); + self.state_cache.lock().delete_block_states(block_root); } StoreOp::DeleteState(state_root, _) => { - // FIXME(sproul): atomics are a bit sketchy here - self.state_cache.lock().delete(state_root) + self.state_cache.lock().delete_state(state_root) } + + StoreOp::KeyValueOp(_) => (), } } + self.hot_db + .do_atomically(self.convert_to_kv_batch(batch)?)?; + drop(block_cache); + Ok(()) } @@ -731,13 +738,6 @@ impl, Cold: ItemStore> HotColdDB return self.load_hot_state_full(state_root).map(Some); } - // If the state is marked as temporary, do not return it. It will become visible - // only once its transaction commits and deletes its temporary flag. - // FIXME(sproul): reconsider - if self.load_state_temporary_flag(state_root)?.is_some() { - return Ok(None); - } - if let Some(HotStateSummary { slot, latest_block_root, @@ -745,6 +745,21 @@ impl, Cold: ItemStore> HotColdDB prev_state_root, }) = self.load_hot_state_summary(state_root)? { + // Load the latest block, and use it to confirm the validity of this state. + let latest_block = if let Some(block) = self.get_block(&latest_block_root)? { + block + } else { + // Dangling state, will be deleted fully once finalization advances past it. + debug!( + self.log, + "Ignoring state load for dangling state"; + "state_root" => ?state_root, + "slot" => slot, + "latest_block_root" => ?latest_block_root, + ); + return Ok(None); + }; + // On a fork boundary slot load a full state from disk. if self.spec.fork_activated_at_slot::(slot).is_some() { return self.load_hot_state_full(state_root).map(Some); @@ -757,13 +772,16 @@ impl, Cold: ItemStore> HotColdDB .map(Some); } - // Otherwise load the prior state, potentially from the cache, and replay a single block - // on top of it. + // Otherwise try to load the prior state and replay the `latest_block` on top of it as + // necessary (if it's not a skip slot). let prev_state = self .get_hot_state(&prev_state_root)? .ok_or(HotColdDBError::MissingPrevState(prev_state_root))?; - - let blocks = self.load_blocks_to_replay(slot, slot, latest_block_root)?; + let blocks = if latest_block.slot() == slot { + vec![latest_block] + } else { + vec![] + }; let state_roots = [(prev_state_root, slot - 1), (*state_root, slot)]; let state_root_iter = state_roots.into_iter().map(Ok); @@ -1287,6 +1305,18 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.get(state_root) } + /// Iterate all hot state summaries in the database. + pub fn iter_hot_state_summaries( + &self, + ) -> impl Iterator> + '_ { + self.hot_db + .iter_column(DBColumn::BeaconStateSummary) + .map(|res| { + let (key, value_bytes) = res?; + Ok((key, HotStateSummary::from_store_bytes(&value_bytes)?)) + }) + } + /// Load the temporary flag for a state root, if one exists. /// /// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not @@ -1549,12 +1579,12 @@ impl StoreItem for Split { /// Allows full reconstruction by replaying blocks. #[derive(Debug, Clone, Copy, Default, Encode, Decode)] pub struct HotStateSummary { - pub(crate) slot: Slot, - pub(crate) latest_block_root: Hash256, - pub(crate) epoch_boundary_state_root: Hash256, + pub slot: Slot, + pub latest_block_root: Hash256, + pub epoch_boundary_state_root: Hash256, /// The state root of the state at the prior slot. // FIXME(sproul): migrate - pub(crate) prev_state_root: Hash256, + pub prev_state_root: Hash256, } impl StoreItem for HotStateSummary { diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 5727351c95..3478048975 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,4 +1,5 @@ use super::*; +use crate::hot_cold_store::HotColdDBError; use crate::metrics; use db_key::Key; use leveldb::compaction::Compaction; @@ -6,7 +7,7 @@ use leveldb::database::batch::{Batch, Writebatch}; use leveldb::database::kv::KV; use leveldb::database::Database; use leveldb::error::Error as LevelDBError; -use leveldb::iterator::{Iterable, KeyIterator}; +use leveldb::iterator::{Iterable, KeyIterator, LevelDBIterator}; use leveldb::options::{Options, ReadOptions, WriteOptions}; use parking_lot::{Mutex, MutexGuard}; use std::marker::PhantomData; @@ -167,13 +168,38 @@ impl KeyValueStore for LevelDB { }; for (start_key, end_key) in vec![ - endpoints(DBColumn::BeaconStateTemporary), endpoints(DBColumn::BeaconState), + endpoints(DBColumn::BeaconStateDiff), + endpoints(DBColumn::BeaconStateSummary), ] { self.db.compact(&start_key, &end_key); } Ok(()) } + + /// Iterate through all keys and values in a particular column. + fn iter_column<'a>( + &'a self, + column: DBColumn, + ) -> Box), Error>> + 'a> { + let start_key = + BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + + let iter = self.db.iter(self.read_options()); + iter.seek(&start_key); + + Box::new( + iter.take_while(move |(key, _)| key.matches_column(column)) + .map(move |(bytes_key, value)| { + let key = bytes_key.remove_column(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key, + } + })?; + Ok((key, value)) + }), + ) + } } impl ItemStore for LevelDB {} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index d683432579..960a4624d4 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -75,6 +75,15 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { /// Compact the database, freeing space used by deleted items. fn compact(&self) -> Result<(), Error>; + + /// Iterate through all values in a particular column. + fn iter_column<'a>( + &'a self, + _column: DBColumn, + ) -> Box), Error>> + 'a> { + // Default impl for non LevelDB databases + Box::new(std::iter::empty()) + } } pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { @@ -144,6 +153,7 @@ pub enum StoreOp<'a, E: EthSpec> { DeleteStateTemporaryFlag(Hash256), DeleteBlock(Hash256), DeleteState(Hash256, Option), + KeyValueOp(KeyValueStoreOp), } /// A unique column identifier. diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index fbdf83196c..14dd92c8cd 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -146,10 +146,18 @@ impl StateCache { Some((state_root, state)) } - pub fn delete(&mut self, state_root: &Hash256) { + pub fn delete_state(&mut self, state_root: &Hash256) { self.states.pop(state_root); self.block_map.delete(state_root); } + + pub fn delete_block_states(&mut self, block_root: &Hash256) { + if let Some(slot_map) = self.block_map.delete_block_states(block_root) { + for state_root in slot_map.slots.values() { + self.states.pop(state_root); + } + } + } } impl BlockMap { @@ -188,6 +196,10 @@ impl BlockMap { !slot_map.slots.is_empty() }); } + + fn delete_block_states(&mut self, block_root: &Hash256) -> Option { + self.blocks.remove(block_root) + } } #[cfg(test)]