From 70850fe58d5676d9072d418f04f4db7257c1513b Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Mon, 7 Apr 2025 01:23:52 -0300 Subject: [PATCH] Drop head tracker for summaries DAG (#6744) The head tracker is a persisted piece of state that must be kept in sync with the fork-choice. It has been a source of pruning issues in the past, so we want to remove it - see https://github.com/sigp/lighthouse/issues/1785 When implementing tree-states in the hot DB we have to change the pruning routine (more details below) so we want to do those changes first in isolation. - see https://github.com/sigp/lighthouse/issues/6580 - If you want to see the full tree-states-hot feature, see https://github.com/dapplion/lighthouse/pull/39 Closes https://github.com/sigp/lighthouse/issues/1785 **Current DB migration routine** - Locate abandoned heads with head tracker - Use a roots iterator to collect the ancestors of those heads that can be pruned - Delete those abandoned blocks / states - Migrate the newly finalized chain to the freezer In summary, it computes what it has to delete and keeps the rest. Then it migrates data to the freezer. If the abandoned forks routine has a bug it can break the freezer migration. **Proposed migration routine (this PR)** - Migrate the newly finalized chain to the freezer - Load all state summaries from disk - From those, just knowing the head and finalized block compute two sets: (1) descendants of finalized (2) newly finalized chain - Iterate all summaries; if a summary does not belong to set (1) or (2), delete it This strategy is more sound as it just checks what's there in the hot DB, computes what it has to keep and deletes the rest. Because it does not rely on third-party pieces of data we can drop the head tracker and pruning checkpoint. Since the DB migration happens **first** now, as long as the computation of the sets to keep is correct we won't have pruning issues. 
--- beacon_node/beacon_chain/src/beacon_chain.rs | 92 +-- .../beacon_chain/src/block_verification.rs | 44 +- .../src/block_verification_types.rs | 2 - beacon_node/beacon_chain/src/builder.rs | 18 +- .../beacon_chain/src/canonical_head.rs | 13 +- .../overflow_lru_cache.rs | 2 - .../state_lru_cache.rs | 10 +- beacon_node/beacon_chain/src/head_tracker.rs | 214 ------- beacon_node/beacon_chain/src/lib.rs | 2 +- beacon_node/beacon_chain/src/metrics.rs | 6 - beacon_node/beacon_chain/src/migrate.rs | 575 +++++++++--------- .../src/persisted_beacon_chain.rs | 13 - beacon_node/beacon_chain/src/schema_change.rs | 9 + .../src/schema_change/migration_schema_v23.rs | 147 +++++ .../beacon_chain/src/state_advance_timer.rs | 22 +- beacon_node/beacon_chain/src/summaries_dag.rs | 464 ++++++++++++++ beacon_node/beacon_chain/src/test_utils.rs | 22 + .../tests/payload_invalidation.rs | 4 +- beacon_node/beacon_chain/tests/store_tests.rs | 113 ++-- .../store/src/database/leveldb_impl.rs | 1 - beacon_node/store/src/errors.rs | 2 +- beacon_node/store/src/garbage_collection.rs | 36 -- beacon_node/store/src/hot_cold_store.rs | 251 ++------ beacon_node/store/src/lib.rs | 9 +- beacon_node/store/src/metadata.rs | 2 +- consensus/proto_array/src/proto_array.rs | 15 + .../src/proto_array_fork_choice.rs | 5 + 27 files changed, 1110 insertions(+), 983 deletions(-) delete mode 100644 beacon_node/beacon_chain/src/head_tracker.rs create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs create mode 100644 beacon_node/beacon_chain/src/summaries_dag.rs delete mode 100644 beacon_node/store/src/garbage_collection.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 668da4f0fa..d9ac2fa6ea 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -33,7 +33,6 @@ use crate::events::ServerSentEventHandler; use crate::execution_payload::{get_execution_payload, 
NotifyExecutionLayer, PreparePayloadHandle}; use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::graffiti_calculator::GraffitiCalculator; -use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker}; use crate::kzg_utils::reconstruct_blobs; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, @@ -57,7 +56,7 @@ use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_data_sidecars::ObservedDataSidecars; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; use crate::observed_slashable::ObservedSlashable; -use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; +use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; @@ -454,8 +453,6 @@ pub struct BeaconChain { /// A handler for events generated by the beacon chain. This is only initialized when the /// HTTP server is enabled. pub event_handler: Option>, - /// Used to track the heads of the beacon chain. - pub(crate) head_tracker: Arc, /// Caches the attester shuffling for a given epoch and shuffling key root. pub shuffling_cache: RwLock, /// A cache of eth1 deposit data at epoch boundaries for deposit finalization @@ -607,57 +604,13 @@ impl BeaconChain { }) } - /// Persists the head tracker and fork choice. + /// Return a database operation for writing the `PersistedBeaconChain` to disk. /// - /// We do it atomically even though no guarantees need to be made about blocks from - /// the head tracker also being present in fork choice. 
- pub fn persist_head_and_fork_choice(&self) -> Result<(), Error> { - let mut batch = vec![]; - - let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD); - - // Hold a lock to head_tracker until it has been persisted to disk. Otherwise there's a race - // condition with the pruning thread which can result in a block present in the head tracker - // but absent in the DB. This inconsistency halts pruning and dramastically increases disk - // size. Ref: https://github.com/sigp/lighthouse/issues/4773 - let head_tracker = self.head_tracker.0.read(); - batch.push(self.persist_head_in_batch(&head_tracker)); - - let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); - batch.push(self.persist_fork_choice_in_batch()); - - self.store.hot_db.do_atomically(batch)?; - drop(head_tracker); - - Ok(()) - } - - /// Return a `PersistedBeaconChain` without reference to a `BeaconChain`. - pub fn make_persisted_head( - genesis_block_root: Hash256, - head_tracker_reader: &HeadTrackerReader, - ) -> PersistedBeaconChain { - PersistedBeaconChain { - _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT, - genesis_block_root, - ssz_head_tracker: SszHeadTracker::from_map(head_tracker_reader), - } - } - - /// Return a database operation for writing the beacon chain head to disk. - pub fn persist_head_in_batch( - &self, - head_tracker_reader: &HeadTrackerReader, - ) -> KeyValueStoreOp { - Self::persist_head_in_batch_standalone(self.genesis_block_root, head_tracker_reader) - } - - pub fn persist_head_in_batch_standalone( - genesis_block_root: Hash256, - head_tracker_reader: &HeadTrackerReader, - ) -> KeyValueStoreOp { - Self::make_persisted_head(genesis_block_root, head_tracker_reader) - .as_kv_store_op(BEACON_CHAIN_DB_KEY) + /// These days the `PersistedBeaconChain` is only used to store the genesis block root, so it + /// should only ever be written once at startup. It used to be written more frequently, but + /// this is no longer necessary. 
+ pub fn persist_head_in_batch_standalone(genesis_block_root: Hash256) -> KeyValueStoreOp { + PersistedBeaconChain { genesis_block_root }.as_kv_store_op(BEACON_CHAIN_DB_KEY) } /// Load fork choice from disk, returning `None` if it isn't found. @@ -1450,12 +1403,13 @@ impl BeaconChain { /// /// Returns `(block_root, block_slot)`. pub fn heads(&self) -> Vec<(Hash256, Slot)> { - self.head_tracker.heads() - } - - /// Only used in tests. - pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool { - self.head_tracker.contains_head((*block_hash).into()) + self.canonical_head + .fork_choice_read_lock() + .proto_array() + .heads_descended_from_finalization::() + .iter() + .map(|node| (node.root, node.slot)) + .collect() } /// Returns the `BeaconState` at the given slot. @@ -1735,8 +1689,6 @@ impl BeaconChain { let notif = ManualFinalizationNotification { state_root: state_root.into(), checkpoint, - head_tracker: self.head_tracker.clone(), - genesis_block_root: self.genesis_block_root, }; self.store_migrator.process_manual_finalization(notif); @@ -3762,7 +3714,6 @@ impl BeaconChain { state, parent_block, parent_eth1_finalization_data, - confirmed_state_roots, consensus_context, } = import_data; @@ -3786,7 +3737,6 @@ impl BeaconChain { block, block_root, state, - confirmed_state_roots, payload_verification_outcome.payload_verification_status, parent_block, parent_eth1_finalization_data, @@ -3824,7 +3774,6 @@ impl BeaconChain { signed_block: AvailableBlock, block_root: Hash256, mut state: BeaconState, - confirmed_state_roots: Vec, payload_verification_status: PayloadVerificationStatus, parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, @@ -4012,11 +3961,6 @@ impl BeaconChain { let block = signed_block.message(); let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); - ops.extend( - confirmed_state_roots - .into_iter() - .map(StoreOp::DeleteStateTemporaryFlag), - ); 
ops.push(StoreOp::PutBlock(block_root, signed_block.clone())); ops.push(StoreOp::PutState(block.state_root(), &state)); @@ -4043,9 +3987,6 @@ impl BeaconChain { // about it. let block_time_imported = timestamp_now(); - let parent_root = block.parent_root(); - let slot = block.slot(); - let current_eth1_finalization_data = Eth1FinalizationData { eth1_data: state.eth1_data().clone(), eth1_deposit_index: state.eth1_deposit_index(), @@ -4066,9 +4007,6 @@ impl BeaconChain { }); } - self.head_tracker - .register_block(block_root, parent_root, slot); - metrics::stop_timer(db_write_timer); metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES); @@ -7208,7 +7146,7 @@ impl BeaconChain { impl Drop for BeaconChain { fn drop(&mut self) { let drop = || -> Result<(), Error> { - self.persist_head_and_fork_choice()?; + self.persist_fork_choice()?; self.persist_op_pool()?; self.persist_eth1_cache() }; diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0a0ffab7fa..39bad34cd6 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1453,22 +1453,8 @@ impl ExecutionPendingBlock { let catchup_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_CATCHUP_STATE); - // Stage a batch of operations to be completed atomically if this block is imported - // successfully. If there is a skipped slot, we include the state root of the pre-state, - // which may be an advanced state that was stored in the DB with a `temporary` flag. let mut state = parent.pre_state; - let mut confirmed_state_roots = - if block.slot() > state.slot() && state.slot() > parent.beacon_block.slot() { - // Advanced pre-state. Delete its temporary flag. - let pre_state_root = state.update_tree_hash_cache()?; - vec![pre_state_root] - } else { - // Pre state is either unadvanced, or should not be stored long-term because there - // is no skipped slot between `parent` and `block`. 
- vec![] - }; - // The block must have a higher slot than its parent. if block.slot() <= parent.beacon_block.slot() { return Err(BlockError::BlockIsNotLaterThanParent { @@ -1515,38 +1501,29 @@ impl ExecutionPendingBlock { // processing, but we get early access to it. let state_root = state.update_tree_hash_cache()?; - // Store the state immediately, marking it as temporary, and staging the deletion - // of its temporary status as part of the larger atomic operation. + // Store the state immediately. let txn_lock = chain.store.hot_db.begin_rw_transaction(); let state_already_exists = chain.store.load_hot_state_summary(&state_root)?.is_some(); let state_batch = if state_already_exists { - // If the state exists, it could be temporary or permanent, but in neither case - // should we rewrite it or store a new temporary flag for it. We *will* stage - // the temporary flag for deletion because it's OK to double-delete the flag, - // and we don't mind if another thread gets there first. + // If the state exists, we do not need to re-write it. 
vec![] } else { - vec![ - if state.slot() % T::EthSpec::slots_per_epoch() == 0 { - StoreOp::PutState(state_root, &state) - } else { - StoreOp::PutStateSummary( - state_root, - HotStateSummary::new(&state_root, &state)?, - ) - }, - StoreOp::PutStateTemporaryFlag(state_root), - ] + vec![if state.slot() % T::EthSpec::slots_per_epoch() == 0 { + StoreOp::PutState(state_root, &state) + } else { + StoreOp::PutStateSummary( + state_root, + HotStateSummary::new(&state_root, &state)?, + ) + }] }; chain .store .do_atomically_with_block_and_blobs_cache(state_batch)?; drop(txn_lock); - confirmed_state_roots.push(state_root); - state_root }; @@ -1713,7 +1690,6 @@ impl ExecutionPendingBlock { state, parent_block: parent.beacon_block, parent_eth1_finalization_data, - confirmed_state_roots, consensus_context, }, payload_verification_handle, diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index d3a6e93862..aa7418646f 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -358,7 +358,6 @@ pub struct BlockImportData { pub state: BeaconState, pub parent_block: SignedBeaconBlock>, pub parent_eth1_finalization_data: Eth1FinalizationData, - pub confirmed_state_roots: Vec, pub consensus_context: ConsensusContext, } @@ -376,7 +375,6 @@ impl BlockImportData { eth1_data: <_>::default(), eth1_deposit_index: 0, }, - confirmed_state_roots: vec![], consensus_context: ConsensusContext::new(Slot::new(0)), } } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index f6d18c3705..6f8a0dcb7c 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -8,7 +8,6 @@ use crate::eth1_finalization_cache::Eth1FinalizationCache; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; use 
crate::graffiti_calculator::{GraffitiCalculator, GraffitiOrigin}; -use crate::head_tracker::HeadTracker; use crate::kzg_utils::blobs_to_data_column_sidecars; use crate::light_client_server_cache::LightClientServerCache; use crate::migrate::{BackgroundMigrator, MigratorConfig}; @@ -93,7 +92,6 @@ pub struct BeaconChainBuilder { slot_clock: Option, shutdown_sender: Option>, light_client_server_tx: Option>>, - head_tracker: Option, validator_pubkey_cache: Option>, spec: Arc, chain_config: ChainConfig, @@ -136,7 +134,6 @@ where slot_clock: None, shutdown_sender: None, light_client_server_tx: None, - head_tracker: None, validator_pubkey_cache: None, spec: Arc::new(E::default_spec()), chain_config: ChainConfig::default(), @@ -314,10 +311,6 @@ where self.genesis_block_root = Some(chain.genesis_block_root); self.genesis_state_root = Some(genesis_block.state_root()); - self.head_tracker = Some( - HeadTracker::from_ssz_container(&chain.ssz_head_tracker) - .map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?, - ); self.validator_pubkey_cache = Some(pubkey_cache); self.fork_choice = Some(fork_choice); @@ -729,7 +722,6 @@ where .genesis_state_root .ok_or("Cannot build without a genesis state root")?; let validator_monitor_config = self.validator_monitor_config.unwrap_or_default(); - let head_tracker = Arc::new(self.head_tracker.unwrap_or_default()); let beacon_proposer_cache: Arc> = <_>::default(); let mut validator_monitor = @@ -769,8 +761,6 @@ where &self.spec, )?; - // Update head tracker. 
- head_tracker.register_block(block_root, block.parent_root(), block.slot()); (block_root, block, true) } Err(e) => return Err(descriptive_db_error("head block", &e)), @@ -846,8 +836,7 @@ where })?; let migrator_config = self.store_migrator_config.unwrap_or_default(); - let store_migrator = - BackgroundMigrator::new(store.clone(), migrator_config, genesis_block_root); + let store_migrator = BackgroundMigrator::new(store.clone(), migrator_config); if let Some(slot) = slot_clock.now() { validator_monitor.process_valid_state( @@ -872,11 +861,10 @@ where // // This *must* be stored before constructing the `BeaconChain`, so that its `Drop` instance // doesn't write a `PersistedBeaconChain` without the rest of the batch. - let head_tracker_reader = head_tracker.0.read(); self.pending_io_batch.push(BeaconChain::< Witness, >::persist_head_in_batch_standalone( - genesis_block_root, &head_tracker_reader + genesis_block_root )); self.pending_io_batch.push(BeaconChain::< Witness, @@ -887,7 +875,6 @@ where .hot_db .do_atomically(self.pending_io_batch) .map_err(|e| format!("Error writing chain & metadata to disk: {:?}", e))?; - drop(head_tracker_reader); let genesis_validators_root = head_snapshot.beacon_state.genesis_validators_root(); let genesis_time = head_snapshot.beacon_state.genesis_time(); @@ -968,7 +955,6 @@ where fork_choice_signal_tx, fork_choice_signal_rx, event_handler: self.event_handler, - head_tracker, shuffling_cache: RwLock::new(ShufflingCache::new( shuffling_cache_size, head_shuffling_ids, diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index d99c6038d3..a6f5179fdc 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -53,7 +53,7 @@ use slot_clock::SlotClock; use state_processing::AllCaches; use std::sync::Arc; use std::time::Duration; -use store::{iter::StateRootsIterator, KeyValueStoreOp, StoreItem}; +use 
store::{iter::StateRootsIterator, KeyValueStore, KeyValueStoreOp, StoreItem}; use task_executor::{JoinHandle, ShutdownReason}; use tracing::{debug, error, info, warn}; use types::*; @@ -840,7 +840,7 @@ impl BeaconChain { ); if is_epoch_transition || reorg_distance.is_some() { - self.persist_head_and_fork_choice()?; + self.persist_fork_choice()?; self.op_pool.prune_attestations(self.epoch()?); } @@ -983,7 +983,6 @@ impl BeaconChain { self.store_migrator.process_finalization( new_finalized_state_root.into(), new_view.finalized_checkpoint, - self.head_tracker.clone(), )?; // Prune blobs in the background. @@ -998,6 +997,14 @@ impl BeaconChain { Ok(()) } + /// Persist fork choice to disk, writing immediately. + pub fn persist_fork_choice(&self) -> Result<(), Error> { + let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); + let batch = vec![self.persist_fork_choice_in_batch()]; + self.store.hot_db.do_atomically(batch)?; + Ok(()) + } + /// Return a database operation for writing fork choice to disk. 
pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp { Self::persist_fork_choice_in_batch_standalone(&self.canonical_head.fork_choice_read_lock()) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index f38a3b8b9c..4359d7fbdb 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -920,7 +920,6 @@ mod test { state, parent_block, parent_eth1_finalization_data, - confirmed_state_roots: vec![], consensus_context, }; @@ -1305,7 +1304,6 @@ mod pending_components_tests { eth1_data: Default::default(), eth1_deposit_index: 0, }, - confirmed_state_roots: vec![], consensus_context: ConsensusContext::new(Slot::new(0)), }, payload_verification_outcome: PayloadVerificationOutcome { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 09d0563a4a..5fe674f30c 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -7,26 +7,21 @@ use crate::{ }; use lru::LruCache; use parking_lot::RwLock; -use ssz_derive::{Decode, Encode}; use state_processing::BlockReplayer; use std::sync::Arc; use store::OnDiskConsensusContext; use types::beacon_block_body::KzgCommitments; -use types::{ssz_tagged_signed_beacon_block, ssz_tagged_signed_beacon_block_arc}; use types::{BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; /// This mirrors everything in the `AvailabilityPendingExecutedBlock`, except /// that it is much smaller because it contains only a state root instead of /// a full `BeaconState`. 
-#[derive(Encode, Decode, Clone)] +#[derive(Clone)] pub struct DietAvailabilityPendingExecutedBlock { - #[ssz(with = "ssz_tagged_signed_beacon_block_arc")] block: Arc>, state_root: Hash256, - #[ssz(with = "ssz_tagged_signed_beacon_block")] parent_block: SignedBeaconBlock>, parent_eth1_finalization_data: Eth1FinalizationData, - confirmed_state_roots: Vec, consensus_context: OnDiskConsensusContext, payload_verification_outcome: PayloadVerificationOutcome, custody_columns_count: usize, @@ -108,7 +103,6 @@ impl StateLRUCache { state_root, parent_block: executed_block.import_data.parent_block, parent_eth1_finalization_data: executed_block.import_data.parent_eth1_finalization_data, - confirmed_state_roots: executed_block.import_data.confirmed_state_roots, consensus_context: OnDiskConsensusContext::from_consensus_context( executed_block.import_data.consensus_context, ), @@ -138,7 +132,6 @@ impl StateLRUCache { state, parent_block: diet_executed_block.parent_block, parent_eth1_finalization_data: diet_executed_block.parent_eth1_finalization_data, - confirmed_state_roots: diet_executed_block.confirmed_state_roots, consensus_context: diet_executed_block .consensus_context .into_consensus_context(), @@ -227,7 +220,6 @@ impl From> state_root: value.import_data.state.canonical_root().unwrap(), parent_block: value.import_data.parent_block, parent_eth1_finalization_data: value.import_data.parent_eth1_finalization_data, - confirmed_state_roots: value.import_data.confirmed_state_roots, consensus_context: OnDiskConsensusContext::from_consensus_context( value.import_data.consensus_context, ), diff --git a/beacon_node/beacon_chain/src/head_tracker.rs b/beacon_node/beacon_chain/src/head_tracker.rs deleted file mode 100644 index 9c06ef33a1..0000000000 --- a/beacon_node/beacon_chain/src/head_tracker.rs +++ /dev/null @@ -1,214 +0,0 @@ -use parking_lot::{RwLock, RwLockReadGuard}; -use ssz_derive::{Decode, Encode}; -use std::collections::HashMap; -use types::{Hash256, Slot}; - 
-#[derive(Debug, PartialEq)] -pub enum Error { - MismatchingLengths { roots_len: usize, slots_len: usize }, -} - -/// Maintains a list of `BeaconChain` head block roots and slots. -/// -/// Each time a new block is imported, it should be applied to the `Self::register_block` function. -/// In order for this struct to be effective, every single block that is imported must be -/// registered here. -#[derive(Default, Debug)] -pub struct HeadTracker(pub RwLock>); - -pub type HeadTrackerReader<'a> = RwLockReadGuard<'a, HashMap>; - -impl HeadTracker { - /// Register a block with `Self`, so it may or may not be included in a `Self::heads` call. - /// - /// This function assumes that no block is imported without its parent having already been - /// imported. It cannot detect an error if this is not the case, it is the responsibility of - /// the upstream user. - pub fn register_block(&self, block_root: Hash256, parent_root: Hash256, slot: Slot) { - let mut map = self.0.write(); - map.remove(&parent_root); - map.insert(block_root, slot); - } - - /// Returns true iff `block_root` is a recognized head. - pub fn contains_head(&self, block_root: Hash256) -> bool { - self.0.read().contains_key(&block_root) - } - - /// Returns the list of heads in the chain. - pub fn heads(&self) -> Vec<(Hash256, Slot)> { - self.0 - .read() - .iter() - .map(|(root, slot)| (*root, *slot)) - .collect() - } - - /// Returns a `SszHeadTracker`, which contains all necessary information to restore the state - /// of `Self` at some later point. - /// - /// Should ONLY be used for tests, due to the potential for database races. - /// - /// See - #[cfg(test)] - pub fn to_ssz_container(&self) -> SszHeadTracker { - SszHeadTracker::from_map(&self.0.read()) - } - - /// Creates a new `Self` from the given `SszHeadTracker`, restoring `Self` to the same state of - /// the `Self` that created the `SszHeadTracker`. 
- pub fn from_ssz_container(ssz_container: &SszHeadTracker) -> Result { - let roots_len = ssz_container.roots.len(); - let slots_len = ssz_container.slots.len(); - - if roots_len != slots_len { - Err(Error::MismatchingLengths { - roots_len, - slots_len, - }) - } else { - let map = ssz_container - .roots - .iter() - .zip(ssz_container.slots.iter()) - .map(|(root, slot)| (*root, *slot)) - .collect::>(); - - Ok(Self(RwLock::new(map))) - } - } -} - -impl PartialEq for HeadTracker { - fn eq(&self, other: &HeadTracker) -> bool { - *self.0.read() == *other.0.read() - } -} - -/// Helper struct that is used to encode/decode the state of the `HeadTracker` as SSZ bytes. -/// -/// This is used when persisting the state of the `BeaconChain` to disk. -#[derive(Encode, Decode, Clone)] -pub struct SszHeadTracker { - roots: Vec, - slots: Vec, -} - -impl SszHeadTracker { - pub fn from_map(map: &HashMap) -> Self { - let (roots, slots) = map.iter().map(|(hash, slot)| (*hash, *slot)).unzip(); - SszHeadTracker { roots, slots } - } -} - -#[cfg(test)] -mod test { - use super::*; - use ssz::{Decode, Encode}; - use types::{BeaconBlock, EthSpec, FixedBytesExtended, MainnetEthSpec}; - - type E = MainnetEthSpec; - - #[test] - fn block_add() { - let spec = &E::default_spec(); - - let head_tracker = HeadTracker::default(); - - for i in 0..16 { - let mut block: BeaconBlock = BeaconBlock::empty(spec); - let block_root = Hash256::from_low_u64_be(i); - - *block.slot_mut() = Slot::new(i); - *block.parent_root_mut() = if i == 0 { - Hash256::random() - } else { - Hash256::from_low_u64_be(i - 1) - }; - - head_tracker.register_block(block_root, block.parent_root(), block.slot()); - } - - assert_eq!( - head_tracker.heads(), - vec![(Hash256::from_low_u64_be(15), Slot::new(15))], - "should only have one head" - ); - - let mut block: BeaconBlock = BeaconBlock::empty(spec); - let block_root = Hash256::from_low_u64_be(42); - *block.slot_mut() = Slot::new(15); - *block.parent_root_mut() = 
Hash256::from_low_u64_be(14); - head_tracker.register_block(block_root, block.parent_root(), block.slot()); - - let heads = head_tracker.heads(); - - assert_eq!(heads.len(), 2, "should only have two heads"); - assert!( - heads - .iter() - .any(|(root, slot)| *root == Hash256::from_low_u64_be(15) && *slot == Slot::new(15)), - "should contain first head" - ); - assert!( - heads - .iter() - .any(|(root, slot)| *root == Hash256::from_low_u64_be(42) && *slot == Slot::new(15)), - "should contain second head" - ); - } - - #[test] - fn empty_round_trip() { - let non_empty = HeadTracker::default(); - for i in 0..16 { - non_empty.0.write().insert(Hash256::random(), Slot::new(i)); - } - let bytes = non_empty.to_ssz_container().as_ssz_bytes(); - - assert_eq!( - HeadTracker::from_ssz_container( - &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") - ), - Ok(non_empty), - "non_empty should pass round trip" - ); - } - - #[test] - fn non_empty_round_trip() { - let non_empty = HeadTracker::default(); - for i in 0..16 { - non_empty.0.write().insert(Hash256::random(), Slot::new(i)); - } - let bytes = non_empty.to_ssz_container().as_ssz_bytes(); - - assert_eq!( - HeadTracker::from_ssz_container( - &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") - ), - Ok(non_empty), - "non_empty should pass round trip" - ); - } - - #[test] - fn bad_length() { - let container = SszHeadTracker { - roots: vec![Hash256::random()], - slots: vec![], - }; - let bytes = container.as_ssz_bytes(); - - assert_eq!( - HeadTracker::from_ssz_container( - &SszHeadTracker::from_ssz_bytes(&bytes).expect("should decode") - ), - Err(Error::MismatchingLengths { - roots_len: 1, - slots_len: 0 - }), - "should fail decoding with bad lengths" - ); - } -} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 48168aeb02..5b79312d37 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -33,7 +33,6 @@ pub mod 
fork_choice_signal; pub mod fork_revert; pub mod fulu_readiness; pub mod graffiti_calculator; -mod head_tracker; pub mod historical_blocks; pub mod kzg_utils; pub mod light_client_finality_update_verification; @@ -56,6 +55,7 @@ pub mod schema_change; pub mod shuffling_cache; pub mod single_attestation; pub mod state_advance_timer; +pub mod summaries_dag; pub mod sync_committee_rewards; pub mod sync_committee_verification; pub mod test_utils; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 463319a1f5..871721b4d8 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -601,12 +601,6 @@ pub static BALANCES_CACHE_MISSES: LazyLock> = LazyLock::new(| /* * Persisting BeaconChain components to disk */ -pub static PERSIST_HEAD: LazyLock> = LazyLock::new(|| { - try_create_histogram( - "beacon_persist_head", - "Time taken to persist the canonical head", - ) -}); pub static PERSIST_OP_POOL: LazyLock> = LazyLock::new(|| { try_create_histogram( "beacon_persist_op_pool", diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index cda5b34103..94fa0a1890 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -1,22 +1,16 @@ -use crate::beacon_chain::BEACON_CHAIN_DB_KEY; use crate::errors::BeaconChainError; -use crate::head_tracker::{HeadTracker, SszHeadTracker}; -use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; +use crate::summaries_dag::{DAGStateSummaryV22, Error as SummariesDagError, StateSummariesDAG}; use parking_lot::Mutex; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::mem; use std::sync::{mpsc, Arc}; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::{migrate_database, HotColdDBError}; -use store::iter::RootsIterator; -use store::{Error, ItemStore, StoreItem, StoreOp}; 
+use store::{Error, ItemStore, StoreOp}; pub use store::{HotColdDB, MemoryStore}; use tracing::{debug, error, info, warn}; -use types::{ - BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, Epoch, EthSpec, FixedBytesExtended, - Hash256, SignedBeaconBlockHash, Slot, -}; +use types::{BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, Slot}; /// Compact at least this frequently, finalization permitting (7 days). const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800; @@ -42,8 +36,6 @@ pub struct BackgroundMigrator, Cold: ItemStore> prev_migration: Arc>, #[allow(clippy::type_complexity)] tx_thread: Option, thread::JoinHandle<()>)>>, - /// Genesis block root, for persisting the `PersistedBeaconChain`. - genesis_block_root: Hash256, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -89,7 +81,7 @@ pub struct PrevMigration { pub enum PruningOutcome { /// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`. Successful { - old_finalized_checkpoint: Checkpoint, + old_finalized_checkpoint_epoch: Epoch, }, /// The run was aborted because the new finalized checkpoint is older than the previous one. OutOfOrderFinalization { @@ -116,6 +108,11 @@ pub enum PruningError { }, UnexpectedEqualStateRoots, UnexpectedUnequalStateRoots, + MissingSummaryForFinalizedCheckpoint(Hash256), + MissingBlindedBlock(Hash256), + SummariesDagError(&'static str, SummariesDagError), + EmptyFinalizedStates, + EmptyFinalizedBlocks, } /// Message sent to the migration thread containing the information it needs to run. 
@@ -130,25 +127,17 @@ pub enum Notification { pub struct ManualFinalizationNotification { pub state_root: BeaconStateHash, pub checkpoint: Checkpoint, - pub head_tracker: Arc, - pub genesis_block_root: Hash256, } pub struct FinalizationNotification { pub finalized_state_root: BeaconStateHash, pub finalized_checkpoint: Checkpoint, - pub head_tracker: Arc, pub prev_migration: Arc>, - pub genesis_block_root: Hash256, } impl, Cold: ItemStore> BackgroundMigrator { /// Create a new `BackgroundMigrator` and spawn its thread if necessary. - pub fn new( - db: Arc>, - config: MigratorConfig, - genesis_block_root: Hash256, - ) -> Self { + pub fn new(db: Arc>, config: MigratorConfig) -> Self { // Estimate last migration run from DB split slot. let prev_migration = Arc::new(Mutex::new(PrevMigration { epoch: db.get_split_slot().epoch(E::slots_per_epoch()), @@ -163,7 +152,6 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator, ) -> Result<(), BeaconChainError> { let notif = FinalizationNotification { finalized_state_root, finalized_checkpoint, - head_tracker, prev_migration: self.prev_migration.clone(), - genesis_block_root: self.genesis_block_root, }; // Send to background thread if configured, otherwise run in foreground. 
@@ -314,9 +299,7 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator {} + Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { + debug!( + slot = slot.as_u64(), + "Database migration postponed, unaligned finalized block" + ); + } + Err(e) => { + warn!(error = ?e, "Database migration failed"); + return; + } + }; + + let old_finalized_checkpoint_epoch = match Self::prune_hot_db( + db.clone(), + finalized_state_root.into(), &finalized_state, notif.finalized_checkpoint, - notif.genesis_block_root, ) { Ok(PruningOutcome::Successful { - old_finalized_checkpoint, - }) => old_finalized_checkpoint, + old_finalized_checkpoint_epoch, + }) => old_finalized_checkpoint_epoch, Ok(PruningOutcome::DeferredConcurrentHeadTrackerMutation) => { warn!( message = "this is expected only very rarely!", @@ -391,26 +391,10 @@ impl, Cold: ItemStore> BackgroundMigrator { - warn!(error = ?e,"Block pruning failed"); - return; - } - }; - - match migrate_database( - db.clone(), - finalized_state_root.into(), - finalized_block_root, - &finalized_state, - ) { - Ok(()) => {} - Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { - debug!( - slot = slot.as_u64(), - "Database migration postponed, unaligned finalized block" + warn!( + error = ?e, + "Hot DB pruning failed" ); - } - Err(e) => { - warn!(error = ?e, "Database migration failed"); return; } }; @@ -418,7 +402,7 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator>, - head_tracker: Arc, - new_finalized_state_hash: BeaconStateHash, + new_finalized_state_root: Hash256, new_finalized_state: &BeaconState, new_finalized_checkpoint: Checkpoint, - genesis_block_root: Hash256, ) -> Result { - let old_finalized_checkpoint = - store - .load_pruning_checkpoint()? 
- .unwrap_or_else(|| Checkpoint { - epoch: Epoch::new(0), - root: Hash256::zero(), - }); - - let old_finalized_slot = old_finalized_checkpoint - .epoch - .start_slot(E::slots_per_epoch()); let new_finalized_slot = new_finalized_checkpoint .epoch .start_slot(E::slots_per_epoch()); - let new_finalized_block_hash = new_finalized_checkpoint.root.into(); // The finalized state must be for the epoch boundary slot, not the slot of the finalized // block. @@ -549,200 +518,220 @@ impl, Cold: ItemStore> BackgroundMigrator new_finalized_slot { - return Ok(PruningOutcome::OutOfOrderFinalization { - old_finalized_checkpoint, - new_finalized_checkpoint, - }); - } - debug!( - old_finalized_epoch = %old_finalized_checkpoint.epoch, - new_finalized_epoch = %new_finalized_checkpoint.epoch, + new_finalized_checkpoint = ?new_finalized_checkpoint, + new_finalized_state_root = %new_finalized_state_root, "Starting database pruning" ); - // For each slot between the new finalized checkpoint and the old finalized checkpoint, - // collect the beacon block root and state root of the canonical chain. - let newly_finalized_chain: HashMap = - std::iter::once(Ok(( - new_finalized_slot, - (new_finalized_block_hash, new_finalized_state_hash), - ))) - .chain(RootsIterator::new(&store, new_finalized_state).map(|res| { - res.map(|(block_root, state_root, slot)| { - (slot, (block_root.into(), state_root.into())) + + let state_summaries_dag = { + let state_summaries = store + .load_hot_state_summaries()? + .into_iter() + .map(|(state_root, summary)| { + let block_root = summary.latest_block_root; + // This error should never happen unless we break a DB invariant + let block = store + .get_blinded_block(&block_root)? 
+ .ok_or(PruningError::MissingBlindedBlock(block_root))?; + Ok(( + state_root, + DAGStateSummaryV22 { + slot: summary.slot, + latest_block_root: summary.latest_block_root, + block_slot: block.slot(), + block_parent_root: block.parent_root(), + }, + )) }) - })) - .take_while(|res| { - res.as_ref() - .map_or(true, |(slot, _)| *slot >= old_finalized_slot) - }) - .collect::>()?; + .collect::, BeaconChainError>>()?; + + // De-duplicate block roots to reduce block reads below + let summary_block_roots = HashSet::::from_iter( + state_summaries + .iter() + .map(|(_, summary)| summary.latest_block_root), + ); + + // Sanity check, there is at least one summary with the new finalized block root + if !summary_block_roots.contains(&new_finalized_checkpoint.root) { + return Err(BeaconChainError::PruningError( + PruningError::MissingSummaryForFinalizedCheckpoint( + new_finalized_checkpoint.root, + ), + )); + } + + StateSummariesDAG::new_from_v22(state_summaries) + .map_err(|e| PruningError::SummariesDagError("new StateSumariesDAG", e))? + }; + + // To debug faulty trees log if we unexpectedly have more than one root. These trees may not + // result in an error, as they may not be queried in the codepaths below. + let state_summaries_dag_roots = state_summaries_dag.tree_roots(); + if state_summaries_dag_roots.len() > 1 { + warn!( + state_summaries_dag_roots = ?state_summaries_dag_roots, + "Prune state summaries dag found more than one root" + ); + } + + // `new_finalized_state_root` is the *state at the slot of the finalized epoch*, + // rather than the state of the latest finalized block. These two values will only + // differ when the first slot of the finalized epoch is a skip slot. 
+ let finalized_and_descendant_state_roots_of_finalized_checkpoint = + HashSet::::from_iter( + std::iter::once(new_finalized_state_root).chain( + state_summaries_dag + .descendants_of(&new_finalized_state_root) + .map_err(|e| PruningError::SummariesDagError("descendants of", e))?, + ), + ); + + // Collect all `latest_block_roots` of the + // finalized_and_descendant_state_roots_of_finalized_checkpoint set. Includes the finalized + // block as `new_finalized_state_root` always has a latest block root equal to the finalized + // block. + let finalized_and_descendant_block_roots_of_finalized_checkpoint = + HashSet::::from_iter( + state_summaries_dag + .blocks_of_states( + finalized_and_descendant_state_roots_of_finalized_checkpoint.iter(), + ) + // should never error, we just constructed + // finalized_and_descendant_state_roots_of_finalized_checkpoint from the + // state_summaries_dag + .map_err(|e| PruningError::SummariesDagError("blocks of descendant", e))? + .into_iter() + .map(|(block_root, _)| block_root), + ); + + // Note: ancestors_of includes the finalized state root + let newly_finalized_state_summaries = state_summaries_dag + .ancestors_of(new_finalized_state_root) + .map_err(|e| PruningError::SummariesDagError("ancestors of", e))?; + let newly_finalized_state_roots = newly_finalized_state_summaries + .iter() + .map(|(root, _)| *root) + .collect::>(); + let newly_finalized_states_min_slot = *newly_finalized_state_summaries + .iter() + .map(|(_, slot)| slot) + .min() + .ok_or(PruningError::EmptyFinalizedStates)?; + + // Note: ancestors_of includes the finalized block + let newly_finalized_blocks = state_summaries_dag + .blocks_of_states(newly_finalized_state_roots.iter()) + .map_err(|e| PruningError::SummariesDagError("blocks of newly finalized", e))?; // We don't know which blocks are shared among abandoned chains, so we buffer and delete // everything in one fell swoop. 
- let mut abandoned_blocks: HashSet = HashSet::new(); - let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new(); - let mut abandoned_heads: HashSet = HashSet::new(); + let mut blocks_to_prune: HashSet = HashSet::new(); + let mut states_to_prune: HashSet<(Slot, Hash256)> = HashSet::new(); - let heads = head_tracker.heads(); - debug!( - old_finalized_root = ?old_finalized_checkpoint.root, - new_finalized_root = ?new_finalized_checkpoint.root, - head_count = heads.len(), - "Extra pruning information" - ); + // Consider the following block tree where we finalize block `[0]` at the checkpoint `(f)`. + // There's a block `[3]` that descendends from the finalized block but NOT from the + // finalized checkpoint. The block tree rooted in `[3]` conflicts with finality and must be + // pruned. Therefore we collect all state summaries descendant of `(f)`. + // + // finalize epoch boundary + // | /-------[2]----- + // [0]-------|--(f)--[1]---------- + // \---[3]--|-----------------[4] + // | - for (head_hash, head_slot) in heads { - // Load head block. If it fails with a decode error, it's likely a reverted block, - // so delete it from the head tracker but leave it and its states in the database - // This is suboptimal as it wastes disk space, but it's difficult to fix. A re-sync - // can be used to reclaim the space. 
- let head_state_root = match store.get_blinded_block(&head_hash) { - Ok(Some(block)) => block.state_root(), - Ok(None) => { - return Err(BeaconStateError::MissingBeaconBlock(head_hash.into()).into()) + for (_, summaries) in state_summaries_dag.summaries_by_slot_ascending() { + for (state_root, summary) in summaries { + let should_prune = if finalized_and_descendant_state_roots_of_finalized_checkpoint + .contains(&state_root) + { + // This state is a viable descendant of the finalized checkpoint, so does not + // conflict with finality and can be built on or become a head + false + } else { + // Everything else, prune + true + }; + + if should_prune { + // States are migrated into the cold DB in the migrate step. All hot states + // prior to finalized can be pruned from the hot DB columns + states_to_prune.insert((summary.slot, state_root)); } - Err(Error::SszDecodeError(e)) => { - warn!( - block_root = ?head_hash, - error = ?e, - "Forgetting invalid head block" - ); - abandoned_heads.insert(head_hash); - continue; - } - Err(e) => return Err(e.into()), + } + } + + for (block_root, slot) in state_summaries_dag.iter_blocks() { + // Blocks both finalized and unfinalized are in the same DB column. We must only + // prune blocks from abandoned forks. Note that block pruning and state pruning differ. + // The blocks DB column is shared for hot and cold data, while the states have different + // columns. Thus, we only prune unviable blocks or from abandoned forks. + let should_prune = if finalized_and_descendant_block_roots_of_finalized_checkpoint + .contains(&block_root) + { + // Keep unfinalized blocks descendant of finalized checkpoint + finalized block + // itself Note that we anchor this set on the finalized checkpoint instead of the + // finalized block. A diagram above shows a relevant example. 
+ false + } else if newly_finalized_blocks.contains(&(block_root, slot)) { + // Keep recently finalized blocks + false + } else if slot < newly_finalized_states_min_slot { + // Keep recently finalized blocks that we know are canonical. Blocks with slots < + // that `newly_finalized_blocks_min_slot` we don't have canonical information so we + // assume they are part of the finalized pruned chain + // + // Pruning these would risk breaking the DB by deleting canonical blocks once the + // HDiff grid advances. If the pruning routine is correct this condition should + // never be hit. + false + } else { + // Everything else, prune + true }; - let mut potentially_abandoned_head = Some(head_hash); - let mut potentially_abandoned_blocks = vec![]; - - // Iterate backwards from this head, staging blocks and states for deletion. - let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot))) - .chain(RootsIterator::from_block(&store, head_hash)?); - - for maybe_tuple in iter { - let (block_root, state_root, slot) = maybe_tuple?; - let block_root = SignedBeaconBlockHash::from(block_root); - let state_root = BeaconStateHash::from(state_root); - - match newly_finalized_chain.get(&slot) { - // If there's no information about a slot on the finalized chain, then - // it should be because it's ahead of the new finalized slot. Stage - // the fork's block and state for possible deletion. - None => { - if slot > new_finalized_slot { - potentially_abandoned_blocks.push(( - slot, - Some(block_root), - Some(state_root), - )); - } else if slot >= old_finalized_slot { - return Err(PruningError::MissingInfoForCanonicalChain { slot }.into()); - } else { - // We must assume here any candidate chains include the old finalized - // checkpoint, i.e. there aren't any forks starting at a block that is a - // strict ancestor of old_finalized_checkpoint. 
- warn!( - head_block_root = ?head_hash, - %head_slot, - "Found a chain that should already have been pruned" - ); - potentially_abandoned_head.take(); - break; - } - } - Some((finalized_block_root, finalized_state_root)) => { - // This fork descends from a newly finalized block, we can stop. - if block_root == *finalized_block_root { - // Sanity check: if the slot and block root match, then the - // state roots should match too. - if state_root != *finalized_state_root { - return Err(PruningError::UnexpectedUnequalStateRoots.into()); - } - - // If the fork descends from the whole finalized chain, - // do not prune it. Otherwise continue to delete all - // of the blocks and states that have been staged for - // deletion so far. - if slot == new_finalized_slot { - potentially_abandoned_blocks.clear(); - potentially_abandoned_head.take(); - } - // If there are skipped slots on the fork to be pruned, then - // we will have just staged the common block for deletion. - // Unstage it. - else { - for (_, block_root, _) in - potentially_abandoned_blocks.iter_mut().rev() - { - if block_root.as_ref() == Some(finalized_block_root) { - *block_root = None; - } else { - break; - } - } - } - break; - } else { - if state_root == *finalized_state_root { - return Err(PruningError::UnexpectedEqualStateRoots.into()); - } - potentially_abandoned_blocks.push(( - slot, - Some(block_root), - Some(state_root), - )); - } - } - } - } - - if let Some(abandoned_head) = potentially_abandoned_head { - debug!( - head_block_root = ?abandoned_head, - %head_slot, - "Pruning head" - ); - abandoned_heads.insert(abandoned_head); - abandoned_blocks.extend( - potentially_abandoned_blocks - .iter() - .filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash), - ); - abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map( - |(slot, _, maybe_state_hash)| maybe_state_hash.map(|sr| (*slot, sr)), - )); + if should_prune { + blocks_to_prune.insert(block_root); } } - // Update the head tracker 
before the database, so that we maintain the invariant - // that a block present in the head tracker is present in the database. - // See https://github.com/sigp/lighthouse/issues/1557 - let mut head_tracker_lock = head_tracker.0.write(); + // Sort states to prune to make it more readable + let mut states_to_prune = states_to_prune.into_iter().collect::>(); + states_to_prune.sort_by_key(|(slot, _)| *slot); - // Check that all the heads to be deleted are still present. The absence of any - // head indicates a race, that will likely resolve itself, so we defer pruning until - // later. - for head_hash in &abandoned_heads { - if !head_tracker_lock.contains_key(head_hash) { - return Ok(PruningOutcome::DeferredConcurrentHeadTrackerMutation); - } + debug!( + new_finalized_checkpoint = ?new_finalized_checkpoint, + newly_finalized_blocks = newly_finalized_blocks.len(), + newly_finalized_state_roots = newly_finalized_state_roots.len(), + newly_finalized_states_min_slot = %newly_finalized_states_min_slot, + state_summaries_count = state_summaries_dag.summaries_count(), + state_summaries_dag_roots = ?state_summaries_dag_roots, + finalized_and_descendant_state_roots_of_finalized_checkpoint = finalized_and_descendant_state_roots_of_finalized_checkpoint.len(), + finalized_and_descendant_state_roots_of_finalized_checkpoint = finalized_and_descendant_state_roots_of_finalized_checkpoint.len(), + blocks_to_prune = blocks_to_prune.len(), + states_to_prune = states_to_prune.len(), + "Extra pruning information" + ); + // Don't log the full `states_to_prune` in the log statement above as it can result in a + // single log line of +1Kb and break logging setups. + for block_root in &blocks_to_prune { + debug!( + block_root = ?block_root, + "Pruning block" + ); + } + for (slot, state_root) in &states_to_prune { + debug!( + ?state_root, + %slot, + "Pruning hot state" + ); } - // Then remove them for real. 
- for head_hash in abandoned_heads { - head_tracker_lock.remove(&head_hash); - } - - let mut batch: Vec> = abandoned_blocks + let mut batch: Vec> = blocks_to_prune .into_iter() - .map(Into::into) - .flat_map(|block_root: Hash256| { + .flat_map(|block_root| { [ StoreOp::DeleteBlock(block_root), StoreOp::DeleteExecutionPayload(block_root), @@ -750,43 +739,87 @@ impl, Cold: ItemStore> BackgroundMigrator>, + ) { + for (block_root, slot) in finalized_blocks { + // Delete the execution payload if payload pruning is enabled. At a skipped slot we may + // delete the payload for the finalized block itself, but that's OK as we only guarantee + // that payloads are present for slots >= the split slot. + if *slot < new_finalized_slot { + hot_db_ops.push(StoreOp::DeleteExecutionPayload(*block_root)); + } + } + } + + fn prune_non_checkpoint_sync_committee_branches( + finalized_blocks_desc: &[(Hash256, Slot)], + hot_db_ops: &mut Vec>, + ) { + let mut epoch_boundary_blocks = HashSet::new(); + let mut non_checkpoint_block_roots = HashSet::new(); + + // Then, iterate states in slot ascending order, as they are stored wrt previous states. + for (block_root, slot) in finalized_blocks_desc.iter().rev() { + // At a missed slot, `state_root_iter` will return the block root + // from the previous non-missed slot. This ensures that the block root at an + // epoch boundary is always a checkpoint block root. We keep track of block roots + // at epoch boundaries by storing them in the `epoch_boundary_blocks` hash set. + // We then ensure that block roots at the epoch boundary aren't included in the + // `non_checkpoint_block_roots` hash set. + if *slot % E::slots_per_epoch() == 0 { + epoch_boundary_blocks.insert(block_root); + } else { + non_checkpoint_block_roots.insert(block_root); + } + + if epoch_boundary_blocks.contains(&block_root) { + non_checkpoint_block_roots.remove(&block_root); + } + } + + // Prune sync committee branch data for all non checkpoint block roots. 
+ // Note that `non_checkpoint_block_roots` should only contain non checkpoint block roots + // as long as `finalized_state.slot()` is at an epoch boundary. If this were not the case + // we risk the chance of pruning a `sync_committee_branch` for a checkpoint block root. + // E.g. if `current_split_slot` = (Epoch A slot 0) and `finalized_state.slot()` = (Epoch C slot 31) + // and (Epoch D slot 0) is a skipped slot, we will have pruned a `sync_committee_branch` + // for a checkpoint block root. + non_checkpoint_block_roots + .into_iter() + .for_each(|block_root| { + hot_db_ops.push(StoreOp::DeleteSyncCommitteeBranch(*block_root)); + }); + } + /// Compact the database if it has been more than `COMPACTION_PERIOD_SECONDS` since it /// was last compacted. pub fn run_compaction( diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index adb68def0d..83affb0dcd 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -1,24 +1,11 @@ -use crate::head_tracker::SszHeadTracker; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use store::{DBColumn, Error as StoreError, StoreItem}; use types::Hash256; -/// Dummy value to use for the canonical head block root, see below. 
-pub const DUMMY_CANONICAL_HEAD_BLOCK_ROOT: Hash256 = Hash256::repeat_byte(0xff); - #[derive(Clone, Encode, Decode)] pub struct PersistedBeaconChain { - /// This value is ignored to resolve the issue described here: - /// - /// https://github.com/sigp/lighthouse/pull/1639 - /// - /// Its removal is tracked here: - /// - /// https://github.com/sigp/lighthouse/issues/1784 - pub _canonical_head_block_root: Hash256, pub genesis_block_root: Hash256, - pub ssz_head_tracker: SszHeadTracker, } impl StoreItem for PersistedBeaconChain { diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index ccfae1b182..49aa116f6c 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -2,6 +2,7 @@ mod migration_schema_v20; mod migration_schema_v21; mod migration_schema_v22; +mod migration_schema_v23; use crate::beacon_chain::BeaconChainTypes; use std::sync::Arc; @@ -57,6 +58,14 @@ pub fn migrate_schema( // bumped inside the upgrade_to_v22 fn migration_schema_v22::upgrade_to_v22::(db.clone(), genesis_state_root) } + (SchemaVersion(22), SchemaVersion(23)) => { + let ops = migration_schema_v23::upgrade_to_v23::(db.clone())?; + db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(23), SchemaVersion(22)) => { + let ops = migration_schema_v23::downgrade_from_v23::(db.clone())?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. 
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs new file mode 100644 index 0000000000..e66178df53 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs @@ -0,0 +1,147 @@ +use crate::beacon_chain::BeaconChainTypes; +use crate::persisted_fork_choice::PersistedForkChoice; +use crate::schema_change::StoreError; +use crate::test_utils::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY, FORK_CHOICE_DB_KEY}; +use crate::BeaconForkChoiceStore; +use fork_choice::{ForkChoice, ResetPayloadStatuses}; +use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; +use std::sync::Arc; +use store::{DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem}; +use types::{Hash256, Slot}; + +/// Dummy value to use for the canonical head block root, see below. +pub const DUMMY_CANONICAL_HEAD_BLOCK_ROOT: Hash256 = Hash256::repeat_byte(0xff); + +pub fn upgrade_to_v23( + db: Arc>, +) -> Result, Error> { + // 1) Set the head-tracker to empty + let Some(persisted_beacon_chain_v22) = + db.get_item::(&BEACON_CHAIN_DB_KEY)? + else { + return Err(Error::MigrationError( + "No persisted beacon chain found in DB. Datadir could be incorrect or DB could be corrupt".to_string() + )); + }; + + let persisted_beacon_chain = PersistedBeaconChain { + genesis_block_root: persisted_beacon_chain_v22.genesis_block_root, + }; + + let mut ops = vec![persisted_beacon_chain.as_kv_store_op(BEACON_CHAIN_DB_KEY)]; + + // 2) Wipe out all state temporary flags. While un-used in V23, if there's a rollback we could + // end-up with an inconsistent DB. 
+ for state_root_result in db + .hot_db + .iter_column_keys::(DBColumn::BeaconStateTemporary) + { + ops.push(KeyValueStoreOp::DeleteKey( + DBColumn::BeaconStateTemporary, + state_root_result?.as_slice().to_vec(), + )); + } + + Ok(ops) +} + +pub fn downgrade_from_v23( + db: Arc>, +) -> Result, Error> { + let Some(persisted_beacon_chain) = db.get_item::(&BEACON_CHAIN_DB_KEY)? + else { + // The `PersistedBeaconChain` must exist if fork choice exists. + return Err(Error::MigrationError( + "No persisted beacon chain found in DB. Datadir could be incorrect or DB could be corrupt".to_string(), + )); + }; + + // Recreate head-tracker from fork choice. + let Some(persisted_fork_choice) = db.get_item::(&FORK_CHOICE_DB_KEY)? + else { + // Fork choice should exist if the database exists. + return Err(Error::MigrationError( + "No fork choice found in DB".to_string(), + )); + }; + + let fc_store = + BeaconForkChoiceStore::from_persisted(persisted_fork_choice.fork_choice_store, db.clone()) + .map_err(|e| { + Error::MigrationError(format!( + "Error loading fork choise store from persisted: {e:?}" + )) + })?; + + // Doesn't matter what policy we use for invalid payloads, as our head calculation just + // considers descent from finalization. 
+ let reset_payload_statuses = ResetPayloadStatuses::OnlyWithInvalidPayload; + let fork_choice = ForkChoice::from_persisted( + persisted_fork_choice.fork_choice, + reset_payload_statuses, + fc_store, + &db.spec, + ) + .map_err(|e| { + Error::MigrationError(format!("Error loading fork choice from persisted: {e:?}")) + })?; + + let heads = fork_choice + .proto_array() + .heads_descended_from_finalization::(); + + let head_roots = heads.iter().map(|node| node.root).collect(); + let head_slots = heads.iter().map(|node| node.slot).collect(); + + let persisted_beacon_chain_v22 = PersistedBeaconChainV22 { + _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT, + genesis_block_root: persisted_beacon_chain.genesis_block_root, + ssz_head_tracker: SszHeadTracker { + roots: head_roots, + slots: head_slots, + }, + }; + + let ops = vec![persisted_beacon_chain_v22.as_kv_store_op(BEACON_CHAIN_DB_KEY)]; + + Ok(ops) +} + +/// Helper struct that is used to encode/decode the state of the `HeadTracker` as SSZ bytes. +/// +/// This is used when persisting the state of the `BeaconChain` to disk. 
+#[derive(Encode, Decode, Clone)] +pub struct SszHeadTracker { + roots: Vec, + slots: Vec, +} + +#[derive(Clone, Encode, Decode)] +pub struct PersistedBeaconChainV22 { + /// This value is ignored to resolve the issue described here: + /// + /// https://github.com/sigp/lighthouse/pull/1639 + /// + /// Its removal is tracked here: + /// + /// https://github.com/sigp/lighthouse/issues/1784 + pub _canonical_head_block_root: Hash256, + pub genesis_block_root: Hash256, + /// DEPRECATED + pub ssz_head_tracker: SszHeadTracker, +} + +impl StoreItem for PersistedBeaconChainV22 { + fn db_column() -> DBColumn { + DBColumn::BeaconChain + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Self::from_ssz_bytes(bytes).map_err(Into::into) + } +} diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index f4216ef76d..9135c3ce88 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -23,7 +23,6 @@ use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; -use store::KeyValueStore; use task_executor::TaskExecutor; use tokio::time::{sleep, sleep_until, Instant}; use tracing::{debug, error, warn}; @@ -297,7 +296,7 @@ fn advance_head(beacon_chain: &Arc>) -> Resu // Protect against advancing a state more than a single slot. // // Advancing more than one slot without storing the intermediate state would corrupt the - // database. Future works might store temporary, intermediate states inside this function. + // database. Future works might store intermediate states inside this function. 
match state.slot().cmp(&state.latest_block_header().slot) { std::cmp::Ordering::Equal => (), std::cmp::Ordering::Greater => { @@ -432,20 +431,13 @@ fn advance_head(beacon_chain: &Arc>) -> Resu ); } - // Write the advanced state to the database with a temporary flag that will be deleted when - // a block is imported on top of this state. We should delete this once we bring in the DB - // changes from tree-states that allow us to prune states without temporary flags. + // Write the advanced state to the database. + // We no longer use a transaction lock here when checking whether the state exists, because + // even if we race with the deletion of this state by the finalization pruning code, the worst + // case is we end up with a finalized state stored, that will get pruned the next time pruning + // runs. let advanced_state_root = state.update_tree_hash_cache()?; - let txn_lock = beacon_chain.store.hot_db.begin_rw_transaction(); - let state_already_exists = beacon_chain - .store - .load_hot_state_summary(&advanced_state_root)? 
- .is_some(); - let temporary = !state_already_exists; - beacon_chain - .store - .put_state_possibly_temporary(&advanced_state_root, &state, temporary)?; - drop(txn_lock); + beacon_chain.store.put_state(&advanced_state_root, &state)?; debug!( ?head_block_root, diff --git a/beacon_node/beacon_chain/src/summaries_dag.rs b/beacon_node/beacon_chain/src/summaries_dag.rs new file mode 100644 index 0000000000..ab379d1eb2 --- /dev/null +++ b/beacon_node/beacon_chain/src/summaries_dag.rs @@ -0,0 +1,464 @@ +use itertools::Itertools; +use std::{ + cmp::Ordering, + collections::{btree_map::Entry, BTreeMap, HashMap}, +}; +use types::{Hash256, Slot}; + +#[derive(Debug, Clone, Copy)] +pub struct DAGStateSummary { + pub slot: Slot, + pub latest_block_root: Hash256, + pub latest_block_slot: Slot, + pub previous_state_root: Hash256, +} + +#[derive(Debug, Clone, Copy)] +pub struct DAGStateSummaryV22 { + pub slot: Slot, + pub latest_block_root: Hash256, + pub block_slot: Slot, + pub block_parent_root: Hash256, +} + +pub struct StateSummariesDAG { + // state_root -> state_summary + state_summaries_by_state_root: HashMap, + // block_root -> state slot -> (state_root, state summary) + state_summaries_by_block_root: HashMap>, + // parent_state_root -> Vec + // cached value to prevent having to recompute in each recursive call into `descendants_of` + child_state_roots: HashMap>, +} + +#[derive(Debug)] +pub enum Error { + DuplicateStateSummary { + block_root: Hash256, + existing_state_summary: Box<(Slot, Hash256)>, + new_state_summary: (Slot, Hash256), + }, + MissingStateSummary(Hash256), + MissingStateSummaryByBlockRoot { + state_root: Hash256, + latest_block_root: Hash256, + }, + StateSummariesNotContiguous { + state_root: Hash256, + state_slot: Slot, + latest_block_root: Hash256, + parent_block_root: Box, + parent_block_latest_state_summary: Box>, + }, + MissingChildStateRoot(Hash256), + RequestedSlotAboveSummary { + starting_state_root: Hash256, + ancestor_slot: Slot, + state_root: 
Hash256, + state_slot: Slot, + }, + RootUnknownPreviousStateRoot(Slot, Hash256), + RootUnknownAncestorStateRoot { + starting_state_root: Hash256, + ancestor_slot: Slot, + root_state_root: Hash256, + root_state_slot: Slot, + }, +} + +impl StateSummariesDAG { + pub fn new(state_summaries: Vec<(Hash256, DAGStateSummary)>) -> Result { + // Group them by latest block root, and sorted state slot + let mut state_summaries_by_state_root = HashMap::new(); + let mut state_summaries_by_block_root = HashMap::<_, BTreeMap<_, _>>::new(); + let mut child_state_roots = HashMap::<_, Vec<_>>::new(); + + for (state_root, summary) in state_summaries.into_iter() { + let summaries = state_summaries_by_block_root + .entry(summary.latest_block_root) + .or_default(); + + // Sanity check to ensure no duplicate summaries for the tuple (block_root, state_slot) + match summaries.entry(summary.slot) { + Entry::Vacant(entry) => { + entry.insert((state_root, summary)); + } + Entry::Occupied(existing) => { + return Err(Error::DuplicateStateSummary { + block_root: summary.latest_block_root, + existing_state_summary: (summary.slot, state_root).into(), + new_state_summary: (*existing.key(), existing.get().0), + }) + } + } + + state_summaries_by_state_root.insert(state_root, summary); + + child_state_roots + .entry(summary.previous_state_root) + .or_default() + .push(state_root); + // Add empty entry for the child state + child_state_roots.entry(state_root).or_default(); + } + + Ok(Self { + state_summaries_by_state_root, + state_summaries_by_block_root, + child_state_roots, + }) + } + + /// Computes a DAG from a sequence of state summaries, including their parent block + /// relationships. + /// + /// - Expects summaries to be contiguous per slot: there must exist a summary at every slot + /// of each tree branch + /// - Maybe include multiple disjoint trees. The root of each tree will have a ZERO parent state + /// root, which will error later when calling `previous_state_root`. 
+ pub fn new_from_v22( + state_summaries_v22: Vec<(Hash256, DAGStateSummaryV22)>, + ) -> Result { + // Group them by latest block root, and sorted state slot + let mut state_summaries_by_block_root = HashMap::<_, BTreeMap<_, _>>::new(); + for (state_root, summary) in state_summaries_v22.iter() { + let summaries = state_summaries_by_block_root + .entry(summary.latest_block_root) + .or_default(); + + // Sanity check to ensure no duplicate summaries for the tuple (block_root, state_slot) + match summaries.entry(summary.slot) { + Entry::Vacant(entry) => { + entry.insert((state_root, summary)); + } + Entry::Occupied(existing) => { + return Err(Error::DuplicateStateSummary { + block_root: summary.latest_block_root, + existing_state_summary: (summary.slot, *state_root).into(), + new_state_summary: (*existing.key(), *existing.get().0), + }) + } + } + } + + let state_summaries = state_summaries_v22 + .iter() + .map(|(state_root, summary)| { + let previous_state_root = if summary.slot == 0 { + Hash256::ZERO + } else { + let previous_slot = summary.slot - 1; + + // Check the set of states in the same state's block root + let same_block_root_summaries = state_summaries_by_block_root + .get(&summary.latest_block_root) + // Should never error: we construct the HashMap here and must have at least + // one entry per block root + .ok_or(Error::MissingStateSummaryByBlockRoot { + state_root: *state_root, + latest_block_root: summary.latest_block_root, + })?; + if let Some((state_root, _)) = same_block_root_summaries.get(&previous_slot) { + // Skipped slot: block root at previous slot is the same as latest block root. + **state_root + } else { + // Common case: not a skipped slot. 
+ let parent_block_root = summary.block_parent_root; + if let Some(parent_block_summaries) = + state_summaries_by_block_root.get(&parent_block_root) + { + *parent_block_summaries + .get(&previous_slot) + // Should never error: summaries are contiguous, so if there's an + // entry it must contain at least one summary at the previous slot. + .ok_or(Error::StateSummariesNotContiguous { + state_root: *state_root, + state_slot: summary.slot, + latest_block_root: summary.latest_block_root, + parent_block_root: parent_block_root.into(), + parent_block_latest_state_summary: parent_block_summaries + .iter() + .max_by(|a, b| a.0.cmp(b.0)) + .map(|(slot, (state_root, _))| (*slot, **state_root)) + .into(), + })? + .0 + } else { + // We don't know of any summary with this parent block root. We'll + // consider this summary to be a root of `state_summaries_v22` + // collection and mark it as zero. + // The test store_tests::finalizes_non_epoch_start_slot manages to send two + // disjoint trees on its second migration. 
+ Hash256::ZERO + } + } + }; + + Ok(( + *state_root, + DAGStateSummary { + slot: summary.slot, + latest_block_root: summary.latest_block_root, + latest_block_slot: summary.block_slot, + previous_state_root, + }, + )) + }) + .collect::, _>>()?; + + Self::new(state_summaries) + } + + // Returns all non-unique latest block roots of a given set of states + pub fn blocks_of_states<'a, I: Iterator>( + &self, + state_roots: I, + ) -> Result, Error> { + state_roots + .map(|state_root| { + let summary = self + .state_summaries_by_state_root + .get(state_root) + .ok_or(Error::MissingStateSummary(*state_root))?; + Ok((summary.latest_block_root, summary.latest_block_slot)) + }) + .collect() + } + + // Returns all unique latest blocks of this DAG's summaries + pub fn iter_blocks(&self) -> impl Iterator + '_ { + self.state_summaries_by_state_root + .values() + .map(|summary| (summary.latest_block_root, summary.latest_block_slot)) + .unique() + } + + /// Returns a vec of state summaries that have an unknown parent when forming the DAG tree + pub fn tree_roots(&self) -> Vec<(Hash256, DAGStateSummary)> { + self.state_summaries_by_state_root + .iter() + .filter_map(|(state_root, summary)| { + if self + .state_summaries_by_state_root + .contains_key(&summary.previous_state_root) + { + // Summaries with a known parent are not roots + None + } else { + Some((*state_root, *summary)) + } + }) + .collect() + } + + pub fn summaries_count(&self) -> usize { + self.state_summaries_by_block_root + .values() + .map(|s| s.len()) + .sum() + } + + pub fn summaries_by_slot_ascending(&self) -> BTreeMap> { + let mut summaries = BTreeMap::>::new(); + for (state_root, summary) in self.state_summaries_by_state_root.iter() { + summaries + .entry(summary.slot) + .or_default() + .push((*state_root, *summary)); + } + summaries + } + + pub fn previous_state_root(&self, state_root: Hash256) -> Result { + let summary = self + .state_summaries_by_state_root + .get(&state_root) + 
.ok_or(Error::MissingStateSummary(state_root))?; + if summary.previous_state_root == Hash256::ZERO { + Err(Error::RootUnknownPreviousStateRoot( + summary.slot, + state_root, + )) + } else { + Ok(summary.previous_state_root) + } + } + + pub fn ancestor_state_root_at_slot( + &self, + starting_state_root: Hash256, + ancestor_slot: Slot, + ) -> Result { + let mut state_root = starting_state_root; + // Walk backwards until we reach the state at `ancestor_slot`. + loop { + let summary = self + .state_summaries_by_state_root + .get(&state_root) + .ok_or(Error::MissingStateSummary(state_root))?; + + // Assumes all summaries are contiguous + match summary.slot.cmp(&ancestor_slot) { + Ordering::Less => { + return Err(Error::RequestedSlotAboveSummary { + starting_state_root, + ancestor_slot, + state_root, + state_slot: summary.slot, + }) + } + Ordering::Equal => { + return Ok(state_root); + } + Ordering::Greater => { + if summary.previous_state_root == Hash256::ZERO { + return Err(Error::RootUnknownAncestorStateRoot { + starting_state_root, + ancestor_slot, + root_state_root: state_root, + root_state_slot: summary.slot, + }); + } else { + state_root = summary.previous_state_root; + } + } + } + } + } + + /// Returns all ancestors of `state_root` INCLUDING `state_root` until the next parent is not + /// known. + pub fn ancestors_of(&self, mut state_root: Hash256) -> Result, Error> { + // Sanity check that the first summary exists + if !self.state_summaries_by_state_root.contains_key(&state_root) { + return Err(Error::MissingStateSummary(state_root)); + } + + let mut ancestors = vec![]; + loop { + if let Some(summary) = self.state_summaries_by_state_root.get(&state_root) { + ancestors.push((state_root, summary.slot)); + state_root = summary.previous_state_root + } else { + return Ok(ancestors); + } + } + } + + /// Returns the descendant state summary roots given an initial state root.
+ pub fn descendants_of(&self, query_state_root: &Hash256) -> Result, Error> { + let mut descendants = vec![]; + for child_root in self + .child_state_roots + .get(query_state_root) + .ok_or(Error::MissingChildStateRoot(*query_state_root))? + { + descendants.push(*child_root); + descendants.extend(self.descendants_of(child_root)?); + } + Ok(descendants) + } +} + +#[cfg(test)] +mod tests { + use super::{DAGStateSummaryV22, Error, StateSummariesDAG}; + use bls::FixedBytesExtended; + use types::{Hash256, Slot}; + + fn root(n: u64) -> Hash256 { + Hash256::from_low_u64_le(n) + } + + #[test] + fn new_from_v22_empty() { + StateSummariesDAG::new_from_v22(vec![]).unwrap(); + } + + fn assert_previous_state_root_is_zero(dag: &StateSummariesDAG, root: Hash256) { + assert!(matches!( + dag.previous_state_root(root).unwrap_err(), + Error::RootUnknownPreviousStateRoot { .. } + )); + } + + #[test] + fn new_from_v22_one_state() { + let root_a = root(0xa); + let root_1 = root(1); + let root_2 = root(2); + let summary_1 = DAGStateSummaryV22 { + slot: Slot::new(1), + latest_block_root: root_1, + block_parent_root: root_2, + block_slot: Slot::new(1), + }; + + let dag = StateSummariesDAG::new_from_v22(vec![(root_a, summary_1)]).unwrap(); + + // The parent of the root summary is ZERO + assert_previous_state_root_is_zero(&dag, root_a); + } + + #[test] + fn new_from_v22_multiple_states() { + let dag = StateSummariesDAG::new_from_v22(vec![ + ( + root(0xa), + DAGStateSummaryV22 { + slot: Slot::new(3), + latest_block_root: root(3), + block_parent_root: root(1), + block_slot: Slot::new(3), + }, + ), + ( + root(0xb), + DAGStateSummaryV22 { + slot: Slot::new(4), + latest_block_root: root(4), + block_parent_root: root(3), + block_slot: Slot::new(4), + }, + ), + // fork 1 + ( + root(0xc), + DAGStateSummaryV22 { + slot: Slot::new(5), + latest_block_root: root(5), + block_parent_root: root(4), + block_slot: Slot::new(5), + }, + ), + // fork 2 + // skipped slot + ( + root(0xd), + DAGStateSummaryV22 { 
+ slot: Slot::new(5), + latest_block_root: root(4), + block_parent_root: root(3), + block_slot: Slot::new(4), + }, + ), + // normal slot + ( + root(0xe), + DAGStateSummaryV22 { + slot: Slot::new(6), + latest_block_root: root(6), + block_parent_root: root(4), + block_slot: Slot::new(6), + }, + ), + ]) + .unwrap(); + + // The parent of the root summary is ZERO + assert_previous_state_root_is_zero(&dag, root(0xa)); + assert_eq!(dag.previous_state_root(root(0xc)).unwrap(), root(0xb)); + assert_eq!(dag.previous_state_root(root(0xd)).unwrap(), root(0xb)); + assert_eq!(dag.previous_state_root(root(0xe)).unwrap(), root(0xd)); + } +} diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index beff95eb77..fe78d83c03 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -893,6 +893,28 @@ where state.get_block_root(slot).unwrap() == state.get_block_root(slot - 1).unwrap() } + pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool { + self.chain + .heads() + .iter() + .any(|(head, _)| *head == Hash256::from(*block_hash)) + } + + pub fn assert_knows_head(&self, head_block_root: Hash256) { + let heads = self.chain.heads(); + if !heads.iter().any(|(head, _)| *head == head_block_root) { + let fork_choice = self.chain.canonical_head.fork_choice_read_lock(); + if heads.is_empty() { + let nodes = &fork_choice.proto_array().core_proto_array().nodes; + panic!("Expected to know head block root {head_block_root:?}, but heads is empty. 
Nodes: {nodes:#?}"); + } else { + panic!( + "Expected to know head block root {head_block_root:?}, known heads {heads:#?}" + ); + } + } + } + pub async fn make_blinded_block( &self, state: BeaconState, diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index ac7627b0b1..4c4f0d8c6a 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -1419,8 +1419,8 @@ async fn recover_from_invalid_head_after_persist_and_reboot() { let slot_clock = rig.harness.chain.slot_clock.clone(); - // Forcefully persist the head and fork choice. - rig.harness.chain.persist_head_and_fork_choice().unwrap(); + // Forcefully persist fork choice. + rig.harness.chain.persist_fork_choice().unwrap(); let resumed = BeaconChainHarness::builder(MainnetEthSpec) .default_spec() diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 38ff87d0c8..e41f547fb5 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -31,7 +31,6 @@ use store::{ BlobInfo, DBColumn, HotColdDB, StoreConfig, }; use tempfile::{tempdir, TempDir}; -use tokio::time::sleep; use types::test_utils::{SeedableRng, XorShiftRng}; use types::*; @@ -120,6 +119,17 @@ fn get_harness_generic( harness } +fn count_states_descendant_of_block( + store: &HotColdDB, BeaconNodeBackend>, + block_root: Hash256, +) -> usize { + let summaries = store.load_hot_state_summaries().unwrap(); + summaries + .iter() + .filter(|(_, s)| s.latest_block_root == block_root) + .count() +} + #[tokio::test] async fn light_client_bootstrap_test() { let spec = test_spec::(); @@ -1225,7 +1235,7 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { assert_eq!(rig.get_finalized_checkpoints(), hashset! 
{}); - assert!(rig.chain.knows_head(&stray_head)); + rig.assert_knows_head(stray_head.into()); // Trigger finalization let finalization_slots: Vec = ((canonical_chain_slot + 1) @@ -1273,7 +1283,7 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { ); } - assert!(!rig.chain.knows_head(&stray_head)); + assert!(!rig.knows_head(&stray_head)); } #[tokio::test] @@ -1399,7 +1409,7 @@ async fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() { ); } - assert!(!rig.chain.knows_head(&stray_head)); + assert!(!rig.knows_head(&stray_head)); let chain_dump = rig.chain.chain_dump().unwrap(); assert!(get_blocks(&chain_dump).contains(&shared_head)); } @@ -1492,7 +1502,7 @@ async fn pruning_does_not_touch_blocks_prior_to_finalization() { ); } - assert!(rig.chain.knows_head(&stray_head)); + rig.assert_knows_head(stray_head.into()); } #[tokio::test] @@ -1576,7 +1586,7 @@ async fn prunes_fork_growing_past_youngest_finalized_checkpoint() { // Precondition: Nothing is finalized yet assert_eq!(rig.get_finalized_checkpoints(), hashset! {},); - assert!(rig.chain.knows_head(&stray_head)); + rig.assert_knows_head(stray_head.into()); // Trigger finalization let canonical_slots: Vec = (rig.epoch_start_slot(2)..=rig.epoch_start_slot(6)) @@ -1631,7 +1641,7 @@ async fn prunes_fork_growing_past_youngest_finalized_checkpoint() { ); } - assert!(!rig.chain.knows_head(&stray_head)); + assert!(!rig.knows_head(&stray_head)); } // This is to check if state outside of normal block processing are pruned correctly. @@ -2150,64 +2160,6 @@ async fn pruning_test( check_no_blocks_exist(&harness, stray_blocks.values()); } -#[tokio::test] -async fn garbage_collect_temp_states_from_failed_block_on_startup() { - let db_path = tempdir().unwrap(); - - // Wrap these functions to ensure the variables are dropped before we try to open another - // instance of the store. 
- let mut store = { - let store = get_store(&db_path); - let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); - - let slots_per_epoch = E::slots_per_epoch(); - - let genesis_state = harness.get_current_state(); - let block_slot = Slot::new(2 * slots_per_epoch); - let ((signed_block, _), state) = harness.make_block(genesis_state, block_slot).await; - - let (mut block, _) = (*signed_block).clone().deconstruct(); - - // Mutate the block to make it invalid, and re-sign it. - *block.state_root_mut() = Hash256::repeat_byte(0xff); - let proposer_index = block.proposer_index() as usize; - let block = Arc::new(block.sign( - &harness.validator_keypairs[proposer_index].sk, - &state.fork(), - state.genesis_validators_root(), - &harness.spec, - )); - - // The block should be rejected, but should store a bunch of temporary states. - harness.set_current_slot(block_slot); - harness - .process_block_result((block, None)) - .await - .unwrap_err(); - - assert_eq!( - store.iter_temporary_state_roots().count(), - block_slot.as_usize() - 1 - ); - store - }; - - // Wait until all the references to the store have been dropped, this helps ensure we can - // re-open the store later. - loop { - store = if let Err(store_arc) = Arc::try_unwrap(store) { - sleep(Duration::from_millis(500)).await; - store_arc - } else { - break; - } - } - - // On startup, the store should garbage collect all the temporary states. - let store = get_store(&db_path); - assert_eq!(store.iter_temporary_state_roots().count(), 0); -} - #[tokio::test] async fn garbage_collect_temp_states_from_failed_block_on_finalization() { let db_path = tempdir().unwrap(); @@ -2222,6 +2174,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() { let ((signed_block, _), state) = harness.make_block(genesis_state, block_slot).await; let (mut block, _) = (*signed_block).clone().deconstruct(); + let bad_block_parent_root = block.parent_root(); // Mutate the block to make it invalid, and re-sign it. 
*block.state_root_mut() = Hash256::repeat_byte(0xff); @@ -2240,9 +2193,11 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() { .await .unwrap_err(); + // The bad block parent root is the genesis block root. There's `block_slot - 1` temporary + // states to remove + the genesis state = block_slot. assert_eq!( - store.iter_temporary_state_roots().count(), - block_slot.as_usize() - 1 + count_states_descendant_of_block(&store, bad_block_parent_root), + block_slot.as_usize(), ); // Finalize the chain without the block, which should result in pruning of all temporary states. @@ -2259,8 +2214,12 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() { // Check that the finalization migration ran. assert_ne!(store.get_split_slot(), 0); - // Check that temporary states have been pruned. - assert_eq!(store.iter_temporary_state_roots().count(), 0); + // Check that temporary states have been pruned. The genesis block is not a descendant of the + // latest finalized checkpoint, so all its states have been pruned from the hot DB, = 0. + assert_eq!( + count_states_descendant_of_block(&store, bad_block_parent_root), + 0 + ); } #[tokio::test] @@ -2785,8 +2744,8 @@ async fn finalizes_after_resuming_from_db() { harness .chain - .persist_head_and_fork_choice() - .expect("should persist the head and fork choice"); + .persist_fork_choice() + .expect("should persist fork choice"); harness .chain .persist_op_pool() @@ -2999,11 +2958,13 @@ async fn revert_minority_fork_on_resume() { resumed_harness.chain.recompute_head_at_current_slot().await; assert_eq!(resumed_harness.head_slot(), fork_slot - 1); - // Head track should know the canonical head and the rogue head. - assert_eq!(resumed_harness.chain.heads().len(), 2); - assert!(resumed_harness - .chain - .knows_head(&resumed_harness.head_block_root().into())); + // Fork choice should only know the canonical head. 
When we reverted the head we also should + // have called `reset_fork_choice_to_finalization` which rebuilds fork choice from scratch + // without the reverted block. + assert_eq!( + resumed_harness.chain.heads(), + vec![(resumed_harness.head_block_root(), fork_slot - 1)] + ); // Apply blocks from the majority chain and trigger finalization. let initial_split_slot = resumed_harness.chain.store.get_split_slot(); diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs index 3d8bbe1473..81d6d1d4bd 100644 --- a/beacon_node/store/src/database/leveldb_impl.rs +++ b/beacon_node/store/src/database/leveldb_impl.rs @@ -195,7 +195,6 @@ impl LevelDB { }; for (start_key, end_key) in [ - endpoints(DBColumn::BeaconStateTemporary), endpoints(DBColumn::BeaconState), endpoints(DBColumn::BeaconStateSummary), ] { diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 41fd17ef43..ed6154da80 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -25,7 +25,7 @@ pub enum Error { NoContinuationData, SplitPointModified(Slot, Slot), ConfigError(StoreConfigError), - SchemaMigrationError(String), + MigrationError(String), /// The store's `anchor_info` was mutated concurrently, the latest modification wasn't applied. AnchorInfoConcurrentMutation, /// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied. diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs deleted file mode 100644 index 586db44c89..0000000000 --- a/beacon_node/store/src/garbage_collection.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! Garbage collection process that runs at start-up to clean up the database. 
-use crate::database::interface::BeaconNodeBackend; -use crate::hot_cold_store::HotColdDB; -use crate::{DBColumn, Error}; -use tracing::debug; -use types::EthSpec; - -impl HotColdDB, BeaconNodeBackend> -where - E: EthSpec, -{ - /// Clean up the database by performing one-off maintenance at start-up. - pub fn remove_garbage(&self) -> Result<(), Error> { - self.delete_temp_states()?; - Ok(()) - } - - /// Delete the temporary states that were leftover by failed block imports. - pub fn delete_temp_states(&self) -> Result<(), Error> { - let mut ops = vec![]; - self.iter_temporary_state_roots().for_each(|state_root| { - if let Ok(state_root) = state_root { - ops.push(state_root); - } - }); - if !ops.is_empty() { - debug!("Garbage collecting {} temporary states", ops.len()); - - self.delete_batch(DBColumn::BeaconState, ops.clone())?; - self.delete_batch(DBColumn::BeaconStateSummary, ops.clone())?; - self.delete_batch(DBColumn::BeaconStateTemporary, ops)?; - } - - Ok(()) - } -} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6a30d8a428..362c5d8014 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -14,8 +14,8 @@ use crate::metadata::{ }; use crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ - get_data_column_key, metrics, parse_data_column_key, BlobSidecarListFromRoot, ColumnKeyIter, - DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, + get_data_column_key, metrics, parse_data_column_key, BlobSidecarListFromRoot, DBColumn, + DatabaseBlock, Error, ItemStore, KeyValueStoreOp, StoreItem, StoreOp, }; use itertools::{process_results, Itertools}; use lru::LruCache; @@ -36,7 +36,7 @@ use std::num::NonZeroUsize; use std::path::Path; use std::sync::Arc; use std::time::Duration; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; use types::data_column_sidecar::{ColumnIndex, 
DataColumnSidecar, DataColumnSidecarList}; use types::*; use zstd::{Decoder, Encoder}; @@ -80,7 +80,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// HTTP API. historic_state_cache: Mutex>, /// Chain spec. - pub(crate) spec: Arc, + pub spec: Arc, /// Mere vessel for E. _phantom: PhantomData, } @@ -161,7 +161,7 @@ pub enum HotColdDBError { MissingRestorePoint(Hash256), MissingColdStateSummary(Hash256), MissingHotStateSummary(Hash256), - MissingEpochBoundaryState(Hash256), + MissingEpochBoundaryState(Hash256, Hash256), MissingPrevState(Hash256), MissingSplitState(Hash256, Slot), MissingStateDiff(Hash256), @@ -390,8 +390,11 @@ impl HotColdDB, BeaconNodeBackend> { } db.store_config()?; - // Run a garbage collection pass. - db.remove_garbage()?; + // TODO(tree-states): Here we can choose to prune advanced states to reclaim disk space. As + // it's a foreground task there's no risk of race condition that can corrupt the DB. + // Advanced states for invalid blocks that were never written to the DB, or descendants of + // heads can be safely pruned at the expense of potentially having to recompute them in the + // future. However this would require a new dedicated pruning routine. // If configured, run a foreground compaction pass. if db.config.compact_on_init { @@ -402,12 +405,6 @@ impl HotColdDB, BeaconNodeBackend> { Ok(db) } - - /// Return an iterator over the state roots of all temporary states. - pub fn iter_temporary_state_roots(&self) -> ColumnKeyIter { - self.hot_db - .iter_column_keys::(DBColumn::BeaconStateTemporary) - } } impl, Cold: ItemStore> HotColdDB { @@ -903,26 +900,11 @@ impl, Cold: ItemStore> HotColdDB /// Store a state in the store. pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { - self.put_state_possibly_temporary(state_root, state, false) - } - - /// Store a state in the store. - /// - /// The `temporary` flag indicates whether this state should be considered canonical. 
- pub fn put_state_possibly_temporary( - &self, - state_root: &Hash256, - state: &BeaconState, - temporary: bool, - ) -> Result<(), Error> { let mut ops: Vec = Vec::new(); if state.slot() < self.get_split_slot() { self.store_cold_state(state_root, state, &mut ops)?; self.cold_db.do_atomically(ops) } else { - if temporary { - ops.push(TemporaryFlag.as_kv_store_op(*state_root)); - } self.store_hot_state(state_root, state, &mut ops)?; self.hot_db.do_atomically(ops) } @@ -1138,6 +1120,7 @@ impl, Cold: ItemStore> HotColdDB .load_hot_state(&epoch_boundary_state_root, true)? .ok_or(HotColdDBError::MissingEpochBoundaryState( epoch_boundary_state_root, + *state_root, ))?; Ok(Some(state)) } else { @@ -1201,17 +1184,6 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(summary.as_kv_store_op(state_root)); } - StoreOp::PutStateTemporaryFlag(state_root) => { - key_value_batch.push(TemporaryFlag.as_kv_store_op(state_root)); - } - - StoreOp::DeleteStateTemporaryFlag(state_root) => { - key_value_batch.push(KeyValueStoreOp::DeleteKey( - TemporaryFlag::db_column(), - state_root.as_slice().to_vec(), - )); - } - StoreOp::DeleteBlock(block_root) => { key_value_batch.push(KeyValueStoreOp::DeleteKey( DBColumn::BeaconBlock, @@ -1241,13 +1213,6 @@ impl, Cold: ItemStore> HotColdDB state_root.as_slice().to_vec(), )); - // Delete the state temporary flag (if any). Temporary flags are commonly - // created by the state advance routine. 
- key_value_batch.push(KeyValueStoreOp::DeleteKey( - DBColumn::BeaconStateTemporary, - state_root.as_slice().to_vec(), - )); - if slot.is_none_or(|slot| slot % E::slots_per_epoch() == 0) { key_value_batch.push(KeyValueStoreOp::DeleteKey( DBColumn::BeaconState, @@ -1408,10 +1373,6 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutStateSummary(_, _) => (), - StoreOp::PutStateTemporaryFlag(_) => (), - - StoreOp::DeleteStateTemporaryFlag(_) => (), - StoreOp::DeleteBlock(block_root) => { guard.delete_block(&block_root); self.state_cache.lock().delete_block_states(&block_root); @@ -1492,8 +1453,8 @@ impl, Cold: ItemStore> HotColdDB // On the epoch boundary, store the full state. if state.slot() % E::slots_per_epoch() == 0 { - trace!( - slot = %state.slot().as_u64(), + debug!( + slot = %state.slot(), ?state_root, "Storing full state on epoch boundary" ); @@ -1571,12 +1532,6 @@ impl, Cold: ItemStore> HotColdDB ) -> Result, Hash256)>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); - // If the state is marked as temporary, do not return it. It will become visible - // only once its transaction commits and deletes its temporary flag. - if self.load_state_temporary_flag(state_root)?.is_some() { - return Ok(None); - } - if let Some(HotStateSummary { slot, latest_block_root, @@ -1585,7 +1540,10 @@ impl, Cold: ItemStore> HotColdDB { let mut boundary_state = get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), + HotColdDBError::MissingEpochBoundaryState( + epoch_boundary_state_root, + *state_root, + ), )?; // Immediately rebase the state from disk on the finalized state so that we can reuse @@ -2545,15 +2503,16 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.get(state_root) } - /// Load the temporary flag for a state root, if one exists. 
- /// - /// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not - /// exist -- you should call `load_hot_state_summary` to find out which. - pub fn load_state_temporary_flag( - &self, - state_root: &Hash256, - ) -> Result, Error> { - self.hot_db.get(state_root) + /// Load all hot state summaries present in the hot DB + pub fn load_hot_state_summaries(&self) -> Result, Error> { + self.hot_db + .iter_column::(DBColumn::BeaconStateSummary) + .map(|res| { + let (state_root, value) = res?; + let summary = HotStateSummary::from_ssz_bytes(&value)?; + Ok((state_root, summary)) + }) + .collect() } /// Run a compaction pass to free up space used by deleted states. @@ -2985,54 +2944,13 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - - /// Prune states from the hot database which are prior to the split. - /// - /// This routine is important for cleaning up advanced states which are stored in the database - /// with a temporary flag. - pub fn prune_old_hot_states(&self) -> Result<(), Error> { - let split = self.get_split_info(); - debug!( - %split.slot, - "Database state pruning started" - ); - let mut state_delete_batch = vec![]; - for res in self - .hot_db - .iter_column::(DBColumn::BeaconStateSummary) - { - let (state_root, summary_bytes) = res?; - let summary = HotStateSummary::from_ssz_bytes(&summary_bytes)?; - - if summary.slot <= split.slot { - let old = summary.slot < split.slot; - let non_canonical = summary.slot == split.slot - && state_root != split.state_root - && !split.state_root.is_zero(); - if old || non_canonical { - let reason = if old { - "old dangling state" - } else { - "non-canonical" - }; - debug!( - ?state_root, - slot = %summary.slot, - %reason, - "Deleting state" - ); - state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot))); - } - } - } - let num_deleted_states = state_delete_batch.len(); - self.do_atomically_with_block_and_blobs_cache(state_delete_batch)?; - debug!(%num_deleted_states, 
"Database state pruning complete"); - Ok(()) - } } -/// Advance the split point of the store, moving new finalized states to the freezer. +/// Advance the split point of the store, copying new finalized states to the freezer. +/// +/// This function previously did a combination of freezer migration alongside pruning. Now it is +/// *just* responsible for copying relevant data to the freezer, while pruning is implemented +/// in `prune_hot_db`. pub fn migrate_database, Cold: ItemStore>( store: Arc>, finalized_state_root: Hash256, @@ -3064,29 +2982,17 @@ pub fn migrate_database, Cold: ItemStore>( return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into()); } - let mut hot_db_ops = vec![]; let mut cold_db_block_ops = vec![]; - let mut epoch_boundary_blocks = HashSet::new(); - let mut non_checkpoint_block_roots = HashSet::new(); // Iterate in descending order until the current split slot - let state_roots = RootsIterator::new(&store, finalized_state) - .take_while(|result| match result { - Ok((_, _, slot)) => *slot >= current_split_slot, - Err(_) => true, - }) - .collect::, _>>()?; + let state_roots: Vec<_> = + process_results(RootsIterator::new(&store, finalized_state), |iter| { + iter.take_while(|(_, _, slot)| *slot >= current_split_slot) + .collect() + })?; // Then, iterate states in slot ascending order, as they are stored wrt previous states. for (block_root, state_root, slot) in state_roots.into_iter().rev() { - // Delete the execution payload if payload pruning is enabled. At a skipped slot we may - // delete the payload for the finalized block itself, but that's OK as we only guarantee - // that payloads are present for slots >= the split slot. The payload fetching code is also - // forgiving of missing payloads. - if store.config.prune_payloads { - hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root)); - } - // Store the slot to block root mapping. 
cold_db_block_ops.push(KeyValueStoreOp::PutKeyValue( DBColumn::BeaconBlockRoots, @@ -3094,44 +3000,27 @@ pub fn migrate_database, Cold: ItemStore>( block_root.as_slice().to_vec(), )); - // At a missed slot, `state_root_iter` will return the block root - // from the previous non-missed slot. This ensures that the block root at an - // epoch boundary is always a checkpoint block root. We keep track of block roots - // at epoch boundaries by storing them in the `epoch_boundary_blocks` hash set. - // We then ensure that block roots at the epoch boundary aren't included in the - // `non_checkpoint_block_roots` hash set. - if slot % E::slots_per_epoch() == 0 { - epoch_boundary_blocks.insert(block_root); - } else { - non_checkpoint_block_roots.insert(block_root); - } - - if epoch_boundary_blocks.contains(&block_root) { - non_checkpoint_block_roots.remove(&block_root); - } - - // Delete the old summary, and the full state if we lie on an epoch boundary. - hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot))); - // Do not try to store states if a restore point is yet to be stored, or will never be // stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state // which always needs to be copied from the hot DB to the freezer and should not be deleted. if slot != 0 && slot < anchor_info.state_upper_limit { - debug!(%slot, "Pruning finalized state"); continue; } - let mut cold_db_ops = vec![]; + let mut cold_db_state_ops = vec![]; // Only store the cold state if it's on a diff boundary. // Calling `store_cold_state_summary` instead of `store_cold_state` for those allows us // to skip loading many hot states. - if matches!( - store.hierarchy.storage_strategy(slot)?, - StorageStrategy::ReplayFrom(..) - ) { + if let StorageStrategy::ReplayFrom(from) = store.hierarchy.storage_strategy(slot)? { // Store slot -> state_root and state_root -> slot mappings. 
- store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?; + debug!( + strategy = "replay", + from_slot = %from, + %slot, + "Storing cold state" + ); + store.store_cold_state_summary(&state_root, slot, &mut cold_db_state_ops)?; } else { // This is some state that we want to migrate to the freezer db. // There is no reason to cache this state. @@ -3139,36 +3028,22 @@ pub fn migrate_database, Cold: ItemStore>( .get_hot_state(&state_root, false)? .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?; - store.store_cold_state(&state_root, &state, &mut cold_db_ops)?; + store.store_cold_state(&state_root, &state, &mut cold_db_state_ops)?; } // Cold states are diffed with respect to each other, so we need to finish writing previous // states before storing new ones. - store.cold_db.do_atomically(cold_db_ops)?; + store.cold_db.do_atomically(cold_db_state_ops)?; } - // Prune sync committee branch data for all non checkpoint block roots. - // Note that `non_checkpoint_block_roots` should only contain non checkpoint block roots - // as long as `finalized_state.slot()` is at an epoch boundary. If this were not the case - // we risk the chance of pruning a `sync_committee_branch` for a checkpoint block root. - // E.g. if `current_split_slot` = (Epoch A slot 0) and `finalized_state.slot()` = (Epoch C slot 31) - // and (Epoch D slot 0) is a skipped slot, we will have pruned a `sync_committee_branch` - // for a checkpoint block root. - non_checkpoint_block_roots - .into_iter() - .for_each(|block_root| { - hot_db_ops.push(StoreOp::DeleteSyncCommitteeBranch(block_root)); - }); - - // Warning: Critical section. We have to take care not to put any of the two databases in an + // Warning: Critical section. We have to take care not to put any of the two databases in an // inconsistent state if the OS process dies at any point during the freezing // procedure. 
// // Since it is pretty much impossible to be atomic across more than one database, we trade - // losing track of states to delete, for consistency. In other words: We should be safe to die - // at any point below but it may happen that some states won't be deleted from the hot database - // and will remain there forever. Since dying in these particular few lines should be an - // exceedingly rare event, this should be an acceptable tradeoff. + // potentially re-doing the migration to copy data to the freezer, for consistency. If we crash + // after writing all new block & state data to the freezer but before updating the split, then + // in the worst case we will restart with the old split and re-run the migration. store.cold_db.do_atomically(cold_db_block_ops)?; store.cold_db.sync()?; { @@ -3181,7 +3056,7 @@ pub fn migrate_database, Cold: ItemStore>( error!( previous_split_slot = %current_split_slot, current_split_slot = %latest_split_slot, - "Race condition detected: Split point changed while moving states to the freezer" + "Race condition detected: Split point changed while copying states to the freezer" ); // Assume the freezing procedure will be retried in case this happens. @@ -3206,9 +3081,6 @@ pub fn migrate_database, Cold: ItemStore>( *split_guard = split; } - // Delete the blocks and states from the hot database if we got this far. - store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?; - // Update the cache's view of the finalized state. 
store.update_finalized_state( finalized_state_root, @@ -3325,23 +3197,6 @@ impl StoreItem for ColdStateSummary { } } -#[derive(Debug, Clone, Copy, Default)] -pub struct TemporaryFlag; - -impl StoreItem for TemporaryFlag { - fn db_column() -> DBColumn { - DBColumn::BeaconStateTemporary - } - - fn as_store_bytes(&self) -> Vec { - vec![] - } - - fn from_store_bytes(_: &[u8]) -> Result { - Ok(TemporaryFlag) - } -} - #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct BytesKey { pub key: Vec, diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 2b5be03489..5b30971fd8 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -14,7 +14,6 @@ pub mod config; pub mod consensus_context; pub mod errors; mod forwards_iter; -mod garbage_collection; pub mod hdiff; pub mod historic_state_cache; pub mod hot_cold_store; @@ -241,8 +240,6 @@ pub enum StoreOp<'a, E: EthSpec> { PutBlobs(Hash256, BlobSidecarList), PutDataColumns(Hash256, DataColumnSidecarList), PutStateSummary(Hash256, HotStateSummary), - PutStateTemporaryFlag(Hash256), - DeleteStateTemporaryFlag(Hash256), DeleteBlock(Hash256), DeleteBlobs(Hash256), DeleteDataColumns(Hash256, Vec), @@ -287,8 +284,10 @@ pub enum DBColumn { /// Mapping from state root to `ColdStateSummary` in the cold DB. #[strum(serialize = "bcs")] BeaconColdStateSummary, - /// For the list of temporary states stored during block import, - /// and then made non-temporary by the deletion of their state root from this column. + /// DEPRECATED. + /// + /// Previously used for the list of temporary states stored during block import, and then made + /// non-temporary by the deletion of their state root from this column. #[strum(serialize = "bst")] BeaconStateTemporary, /// Execution payloads for blocks more recent than the finalized checkpoint. 
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 1d70e105b9..55c64bf850 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(23); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index cf6ebb3b00..cbae54bd36 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -1041,6 +1041,21 @@ impl ProtoArray { }) .map(|node| node.root) } + + /// Returns all nodes that have zero children and are descended from the finalized checkpoint. + /// + /// For informational purposes like the beacon HTTP API, we use this as the list of known heads, + /// even though some of them might not be viable. We do this to maintain consistency between the + /// definition of "head" used by pruning (which does not consider viability) and fork choice. + pub fn heads_descended_from_finalization<E: EthSpec>(&self) -> Vec<&ProtoNode> { + self.nodes + .iter() + .filter(|node| { + node.best_child.is_none() + && self.is_finalized_checkpoint_or_descendant::<E>(node.root) + }) + .collect() + } } /// A helper method to calculate the proposer boost based on the given `justified_balances`. 
diff --git a/consensus/proto_array/src/proto_array_fork_choice.rs b/consensus/proto_array/src/proto_array_fork_choice.rs index 4da632bf58..880c93d5c9 100644 --- a/consensus/proto_array/src/proto_array_fork_choice.rs +++ b/consensus/proto_array/src/proto_array_fork_choice.rs @@ -885,6 +885,11 @@ impl ProtoArrayForkChoice { pub fn core_proto_array_mut(&mut self) -> &mut ProtoArray { &mut self.proto_array } + + /// Returns all nodes that have zero children and are descended from the finalized checkpoint. + pub fn heads_descended_from_finalization<E: EthSpec>(&self) -> Vec<&ProtoNode> { + self.proto_array.heads_descended_from_finalization::<E>() + } } /// Returns a list of `deltas`, where there is one delta for each of the indices in