diff --git a/Cargo.lock b/Cargo.lock
index 18e9e8b479..a5009e06c5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,7 +2,7 @@
 # It is not intended for manual editing.
 [[package]]
 name = "account_manager"
-version = "0.2.6"
+version = "0.2.7"
 dependencies = [
  "account_utils",
  "bls",
@@ -373,7 +373,7 @@ dependencies = [
 
 [[package]]
 name = "beacon_node"
-version = "0.2.6"
+version = "0.2.7"
 dependencies = [
  "beacon_chain",
  "clap",
@@ -530,7 +530,7 @@ dependencies = [
 
 [[package]]
 name = "boot_node"
-version = "0.2.6"
+version = "0.2.7"
 dependencies = [
  "beacon_node",
  "clap",
@@ -2537,7 +2537,7 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
 [[package]]
 name = "lcli"
-version = "0.2.6"
+version = "0.2.7"
 dependencies = [
  "bls",
  "clap",
@@ -2894,7 +2894,7 @@ dependencies = [
 
 [[package]]
 name = "lighthouse"
-version = "0.2.6"
+version = "0.2.7"
 dependencies = [
  "account_manager",
  "account_utils",
@@ -6036,7 +6036,7 @@ dependencies = [
 
 [[package]]
 name = "validator_client"
-version = "0.2.6"
+version = "0.2.7"
 dependencies = [
  "account_utils",
  "bls",
diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index ef94274fab..0f7ed21b4b 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -426,6 +426,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         Ok(iter)
     }
 
+    /// As for `rev_iter_state_roots` but starting from an arbitrary `BeaconState`.
+    pub fn rev_iter_state_roots_from<'a>(
+        &self,
+        state_root: Hash256,
+        state: &'a BeaconState<T::EthSpec>,
+    ) -> impl Iterator<Item = Result<(Hash256, Slot), Error>> + 'a {
+        std::iter::once(Ok((state_root, state.slot)))
+            .chain(StateRootsIterator::new(self.store.clone(), state))
+            .map(|result| result.map_err(Into::into))
+    }
+
     /// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
     ///
     /// ## Errors
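The new `rev_iter_state_roots_from` is a small iterator-composition trick: emit the starting state's own `(state_root, slot)` pair first, then defer to the store's backwards iterator, converting the store error into the chain error on the way out. A minimal self-contained sketch of the same pattern, with `u64` standing in for `Hash256`/`Slot` and `String` for both error types (all names here are illustrative, not from the codebase):

    // Stand-in for a backwards store iterator that can fail mid-stream,
    // like `StateRootsIterator` (roots and slots are u64 for brevity).
    fn ancestor_roots() -> impl Iterator<Item = Result<(u64, u64), String>> {
        vec![Ok((0xbb, 9)), Ok((0xcc, 8))].into_iter()
    }

    // Prepend the starting state's own entry, then chain the ancestors,
    // mapping the inner error type, the shape of `rev_iter_state_roots_from`.
    fn rev_iter_from(state_root: u64, slot: u64) -> impl Iterator<Item = Result<(u64, u64), String>> {
        std::iter::once(Ok((state_root, slot)))
            .chain(ancestor_roots())
            .map(|result| result.map_err(Into::into))
    }

    fn main() {
        let roots: Vec<_> = rev_iter_from(0xaa, 10).collect();
        assert_eq!(roots[0], Ok((0xaa, 10))); // the starting state leads the walk
    }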
@@ -479,30 +490,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// is the state as it was when the head block was received, which could be some slots prior to
     /// now.
     pub fn head(&self) -> Result<BeaconSnapshot<T::EthSpec>, Error> {
-        self.canonical_head
+        self.with_head(|head| Ok(head.clone_with_only_committee_caches()))
+    }
+
+    /// Apply a function to the canonical head without cloning it.
+    pub fn with_head<U>(
+        &self,
+        f: impl FnOnce(&BeaconSnapshot<T::EthSpec>) -> Result<U, Error>,
+    ) -> Result<U, Error> {
+        let head_lock = self
+            .canonical_head
             .try_read_for(HEAD_LOCK_TIMEOUT)
-            .ok_or_else(|| Error::CanonicalHeadLockTimeout)
-            .map(|v| v.clone_with_only_committee_caches())
+            .ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
+        f(&head_lock)
     }
 
     /// Returns info representing the head block and state.
     ///
     /// A summarized version of `Self::head` that involves less cloning.
     pub fn head_info(&self) -> Result<HeadInfo, Error> {
-        let head = self
-            .canonical_head
-            .try_read_for(HEAD_LOCK_TIMEOUT)
-            .ok_or_else(|| Error::CanonicalHeadLockTimeout)?;
-
-        Ok(HeadInfo {
-            slot: head.beacon_block.slot(),
-            block_root: head.beacon_block_root,
-            state_root: head.beacon_state_root,
-            current_justified_checkpoint: head.beacon_state.current_justified_checkpoint,
-            finalized_checkpoint: head.beacon_state.finalized_checkpoint,
-            fork: head.beacon_state.fork,
-            genesis_time: head.beacon_state.genesis_time,
-            genesis_validators_root: head.beacon_state.genesis_validators_root,
+        self.with_head(|head| {
+            Ok(HeadInfo {
+                slot: head.beacon_block.slot(),
+                block_root: head.beacon_block_root,
+                state_root: head.beacon_state_root,
+                current_justified_checkpoint: head.beacon_state.current_justified_checkpoint,
+                finalized_checkpoint: head.beacon_state.finalized_checkpoint,
+                fork: head.beacon_state.fork,
+                genesis_time: head.beacon_state.genesis_time,
+                genesis_validators_root: head.beacon_state.genesis_validators_root,
+            })
         })
     }
 
@@ -1746,7 +1763,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let beacon_block_root = self.fork_choice.write().get_head(self.slot()?)?;
         let current_head = self.head_info()?;
-        let old_finalized_root = current_head.finalized_checkpoint.root;
+        let old_finalized_checkpoint = current_head.finalized_checkpoint;
 
         if beacon_block_root == current_head.block_root {
             return Ok(());
         }
@@ -1826,15 +1843,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             );
         };
 
-        let old_finalized_epoch = current_head.finalized_checkpoint.epoch;
-        let new_finalized_epoch = new_head.beacon_state.finalized_checkpoint.epoch;
-        let finalized_root = new_head.beacon_state.finalized_checkpoint.root;
+        let new_finalized_checkpoint = new_head.beacon_state.finalized_checkpoint;
+        // State root of the finalized state on the epoch boundary, NOT the state root of the
+        // finalized block. We need to use an iterator in case the state is beyond the reach of
+        // the new head's `state_roots` array.
+        let new_finalized_slot = new_finalized_checkpoint
+            .epoch
+            .start_slot(T::EthSpec::slots_per_epoch());
+        let new_finalized_state_root = process_results(
+            StateRootsIterator::new(self.store.clone(), &new_head.beacon_state),
+            |mut iter| {
+                iter.find_map(|(state_root, slot)| {
+                    if slot == new_finalized_slot {
+                        Some(state_root)
+                    } else {
+                        None
+                    }
+                })
+            },
+        )?
+        .ok_or_else(|| Error::MissingFinalizedStateRoot(new_finalized_slot))?;
 
         // It is an error to try to update to a head with a lesser finalized epoch.
-        if new_finalized_epoch < old_finalized_epoch {
+        if new_finalized_checkpoint.epoch < old_finalized_checkpoint.epoch {
             return Err(Error::RevertedFinalizedEpoch {
-                previous_epoch: old_finalized_epoch,
-                new_epoch: new_finalized_epoch,
+                previous_epoch: old_finalized_checkpoint.epoch,
+                new_epoch: new_finalized_checkpoint.epoch,
             });
         }
 
@@ -1873,11 +1907,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             );
         });
 
-        if new_finalized_epoch != old_finalized_epoch {
+        if new_finalized_checkpoint.epoch != old_finalized_checkpoint.epoch {
             self.after_finalization(
-                old_finalized_epoch,
-                finalized_root,
-                old_finalized_root.into(),
+                old_finalized_checkpoint,
+                new_finalized_checkpoint,
+                new_finalized_state_root,
            )?;
         }
 
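Finding the epoch-boundary state root leans on `process_results` (the itertools helper), which hands a fallible iterator to a closure as an infallible one and short-circuits on the first stream error, so the `find_map` inside stays simple. A runnable sketch of the same search with integer stand-ins for `(Hash256, Slot)` (the `find_root_at_slot` helper is hypothetical):

    use itertools::process_results;

    // Search a fallible iterator of (root, slot) pairs for the root at a
    // target slot; any stream error aborts the whole search.
    fn find_root_at_slot(
        iter: impl Iterator<Item = Result<(u64, u64), String>>,
        target_slot: u64,
    ) -> Result<Option<u64>, String> {
        process_results(iter, |mut iter| {
            iter.find_map(|(root, slot)| if slot == target_slot { Some(root) } else { None })
        })
    }

    fn main() {
        let chain = vec![Ok((0xaa, 5)), Ok((0xbb, 4)), Ok((0xcc, 3))];
        assert_eq!(find_root_at_slot(chain.into_iter(), 4), Ok(Some(0xbb)));
    }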
@@ -1905,68 +1939,53 @@
     /// Performs pruning and finality-based optimizations.
     fn after_finalization(
         &self,
-        old_finalized_epoch: Epoch,
-        finalized_block_root: Hash256,
-        old_finalized_root: SignedBeaconBlockHash,
+        old_finalized_checkpoint: Checkpoint,
+        new_finalized_checkpoint: Checkpoint,
+        new_finalized_state_root: Hash256,
     ) -> Result<(), Error> {
-        let finalized_block = self
-            .store
-            .get_block(&finalized_block_root)?
-            .ok_or_else(|| Error::MissingBeaconBlock(finalized_block_root))?
-            .message;
+        self.fork_choice.write().prune()?;
 
-        let new_finalized_epoch = finalized_block.slot.epoch(T::EthSpec::slots_per_epoch());
+        self.observed_block_producers.prune(
+            new_finalized_checkpoint
+                .epoch
+                .start_slot(T::EthSpec::slots_per_epoch()),
+        );
 
-        if new_finalized_epoch < old_finalized_epoch {
-            Err(Error::RevertedFinalizedEpoch {
-                previous_epoch: old_finalized_epoch,
-                new_epoch: new_finalized_epoch,
+        self.snapshot_cache
+            .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
+            .map(|mut snapshot_cache| {
+                snapshot_cache.prune(new_finalized_checkpoint.epoch);
             })
-        } else {
-            self.fork_choice.write().prune()?;
-
-            self.observed_block_producers
-                .prune(new_finalized_epoch.start_slot(T::EthSpec::slots_per_epoch()));
-
-            self.snapshot_cache
-                .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
-                .map(|mut snapshot_cache| {
-                    snapshot_cache.prune(new_finalized_epoch);
-                })
-                .unwrap_or_else(|| {
-                    error!(
-                        self.log,
-                        "Failed to obtain cache write lock";
-                        "lock" => "snapshot_cache",
-                        "task" => "prune"
-                    );
-                });
-
-            let finalized_state = self
-                .get_state(&finalized_block.state_root, Some(finalized_block.slot))?
-                .ok_or_else(|| Error::MissingBeaconState(finalized_block.state_root))?;
-
-            self.op_pool
-                .prune_all(&finalized_state, self.head_info()?.fork);
-
-            // TODO: configurable max finality distance
-            let max_finality_distance = 0;
-            self.store_migrator.process_finalization(
-                finalized_block.state_root,
-                finalized_state,
-                max_finality_distance,
-                Arc::clone(&self.head_tracker),
-                old_finalized_root,
-                finalized_block_root.into(),
-            );
-
-            let _ = self.event_handler.register(EventKind::BeaconFinalization {
-                epoch: new_finalized_epoch,
-                root: finalized_block_root,
+            .unwrap_or_else(|| {
+                error!(
+                    self.log,
+                    "Failed to obtain cache write lock";
+                    "lock" => "snapshot_cache",
+                    "task" => "prune"
+                );
             });
 
-            Ok(())
-        }
+        let finalized_state = self
+            .get_state(&new_finalized_state_root, None)?
+            .ok_or_else(|| Error::MissingBeaconState(new_finalized_state_root))?;
+
+        self.op_pool
+            .prune_all(&finalized_state, self.head_info()?.fork);
+
+        self.store_migrator.process_finalization(
+            new_finalized_state_root.into(),
+            finalized_state,
+            self.head_tracker.clone(),
+            old_finalized_checkpoint,
+            new_finalized_checkpoint,
+        );
+
+        let _ = self.event_handler.register(EventKind::BeaconFinalization {
+            epoch: new_finalized_checkpoint.epoch,
+            root: new_finalized_checkpoint.root,
+        });
+
+        Ok(())
     }
 
     /// Returns `true` if the given block root has not been processed.
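`after_finalization` now receives `Checkpoint`s, and everything downstream keys off the checkpoint epoch's boundary slot rather than the finalized block's slot. A small sketch of that distinction, assuming this repo's `types` crate (the concrete slot numbers are invented for illustration):

    use types::{Checkpoint, Epoch, Hash256, Slot};

    fn main() {
        // Hypothetical numbers: with 8 slots per epoch (MinimalEthSpec), a
        // checkpoint at epoch 10 always maps to boundary slot 80...
        let checkpoint = Checkpoint {
            epoch: Epoch::new(10),
            root: Hash256::zero(), // root of the finalized *block*
        };
        let boundary_slot: Slot = checkpoint.epoch.start_slot(8);
        assert_eq!(boundary_slot, Slot::new(80));
        // ...even if the finalized block itself sits at slot 77 because slots
        // 78..=80 were skipped. The finalized *state* is the canonical state
        // at slot 80, hence the iterator search for its root above.
    }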
diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs
index 5c017a3e8e..96f1c9a841 100644
--- a/beacon_node/beacon_chain/src/errors.rs
+++ b/beacon_node/beacon_chain/src/errors.rs
@@ -1,5 +1,6 @@
 use crate::beacon_chain::ForkChoiceError;
 use crate::eth1_chain::Error as Eth1ChainError;
+use crate::migrate::PruningError;
 use crate::naive_aggregation_pool::Error as NaiveAggregationError;
 use crate::observed_attestations::Error as ObservedAttestationsError;
 use crate::observed_attesters::Error as ObservedAttestersError;
@@ -61,6 +62,7 @@ pub enum BeaconChainError {
         requested_slot: Slot,
         max_task_runtime: Duration,
     },
+    MissingFinalizedStateRoot(Slot),
     /// Returned when an internal check fails, indicating corrupt data.
     InvariantViolated(String),
     SszTypesError(SszTypesError),
@@ -79,6 +81,7 @@ pub enum BeaconChainError {
     ObservedAttestationsError(ObservedAttestationsError),
     ObservedAttestersError(ObservedAttestersError),
     ObservedBlockProducersError(ObservedBlockProducersError),
+    PruningError(PruningError),
     ArithError(ArithError),
 }
 
@@ -94,6 +97,7 @@ easy_from_to!(ObservedAttestationsError, BeaconChainError);
 easy_from_to!(ObservedAttestersError, BeaconChainError);
 easy_from_to!(ObservedBlockProducersError, BeaconChainError);
 easy_from_to!(BlockSignatureVerifierError, BeaconChainError);
+easy_from_to!(PruningError, BeaconChainError);
 easy_from_to!(ArithError, BeaconChainError);
 
 #[derive(Debug)]
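The new `MissingFinalizedStateRoot` and `PruningError` variants hook into `?`-conversion through the crate's `easy_from_to!` macro. A self-contained sketch of the `From` impl such a macro plausibly expands to (the macro body below is an assumption, re-created rather than copied from the crate):

    #[derive(Debug)]
    pub enum PruningError {
        UnexpectedEqualStateRoots,
    }

    #[derive(Debug)]
    pub enum BeaconChainError {
        PruningError(PruningError),
    }

    // Assumed shape of easy_from_to!: a From impl wrapping the source error
    // in the identically named variant of the target enum.
    macro_rules! easy_from_to {
        ($from:ident, $to:ident) => {
            impl From<$from> for $to {
                fn from(e: $from) -> $to {
                    $to::$from(e)
                }
            }
        };
    }
    easy_from_to!(PruningError, BeaconChainError);

    fn prune() -> Result<(), BeaconChainError> {
        let check: Result<(), PruningError> = Err(PruningError::UnexpectedEqualStateRoots);
        check?; // `?` lifts PruningError into BeaconChainError via the From impl
        Ok(())
    }

    fn main() {
        assert!(matches!(prune(), Err(BeaconChainError::PruningError(_))));
    }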
diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 83eae0ae71..47c5ef97e9 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -1,18 +1,34 @@
 use crate::errors::BeaconChainError;
 use crate::head_tracker::HeadTracker;
 use parking_lot::Mutex;
-use slog::{debug, warn, Logger};
+use slog::{debug, error, warn, Logger};
 use std::collections::{HashMap, HashSet};
 use std::mem;
 use std::sync::mpsc;
 use std::sync::Arc;
 use std::thread;
 use store::hot_cold_store::{process_finalization, HotColdDBError};
-use store::iter::{ParentRootBlockIterator, RootsIterator};
+use store::iter::RootsIterator;
 use store::{Error, ItemStore, StoreOp};
 pub use store::{HotColdDB, MemoryStore};
-use types::*;
-use types::{BeaconState, EthSpec, Hash256, Slot};
+use types::{
+    BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, EthSpec, Hash256,
+    SignedBeaconBlockHash, Slot,
+};
+
+/// Logic errors that can occur during pruning; none of these should ever happen.
+#[derive(Debug)]
+pub enum PruningError {
+    IncorrectFinalizedState {
+        state_slot: Slot,
+        new_finalized_slot: Slot,
+    },
+    MissingInfoForCanonicalChain {
+        slot: Slot,
+    },
+    UnexpectedEqualStateRoots,
+    UnexpectedUnequalStateRoots,
+}
 
 /// Trait for migration processes that update the database upon finalization.
 pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
@@ -22,17 +38,16 @@
     fn process_finalization(
         &self,
-        _state_root: Hash256,
+        _finalized_state_root: BeaconStateHash,
         _new_finalized_state: BeaconState<E>,
-        _max_finality_distance: u64,
         _head_tracker: Arc<HeadTracker>,
-        _old_finalized_block_hash: SignedBeaconBlockHash,
-        _new_finalized_block_hash: SignedBeaconBlockHash,
+        _old_finalized_checkpoint: Checkpoint,
+        _new_finalized_checkpoint: Checkpoint,
     ) {
     }
 
     /// Traverses live heads and prunes blocks and states of chains that we know can't be built
-    /// upon because finalization would prohibit it. This is an optimisation intended to save disk
+    /// upon because finalization would prohibit it. This is an optimisation intended to save disk
     /// space.
     ///
     /// Assumptions:
@@ -40,37 +55,63 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
     fn prune_abandoned_forks(
         store: Arc<HotColdDB<E, Hot, Cold>>,
         head_tracker: Arc<HeadTracker>,
-        old_finalized_block_hash: SignedBeaconBlockHash,
-        new_finalized_block_hash: SignedBeaconBlockHash,
-        new_finalized_slot: Slot,
+        new_finalized_state_hash: BeaconStateHash,
+        new_finalized_state: &BeaconState<E>,
+        old_finalized_checkpoint: Checkpoint,
+        new_finalized_checkpoint: Checkpoint,
+        log: &Logger,
     ) -> Result<(), BeaconChainError> {
         // There will never be any blocks to prune if there is only a single head in the chain.
         if head_tracker.heads().len() == 1 {
             return Ok(());
         }
 
-        let old_finalized_slot = store
-            .get_block(&old_finalized_block_hash.into())?
-            .ok_or_else(|| BeaconChainError::MissingBeaconBlock(old_finalized_block_hash.into()))?
-            .slot();
+        let old_finalized_slot = old_finalized_checkpoint
+            .epoch
+            .start_slot(E::slots_per_epoch());
+        let new_finalized_slot = new_finalized_checkpoint
+            .epoch
+            .start_slot(E::slots_per_epoch());
+        let new_finalized_block_hash = new_finalized_checkpoint.root.into();
 
-        // Collect hashes from new_finalized_block back to old_finalized_block (inclusive)
-        let mut found_block = false; // hack for `take_until`
-        let newly_finalized_blocks: HashMap<SignedBeaconBlockHash, Slot> =
-            ParentRootBlockIterator::new(&*store, new_finalized_block_hash.into())
-                .take_while(|result| match result {
-                    Ok((block_hash, _)) => {
-                        if found_block {
-                            false
-                        } else {
-                            found_block |= *block_hash == old_finalized_block_hash.into();
-                            true
-                        }
-                    }
-                    Err(_) => true,
-                })
-                .map(|result| result.map(|(block_hash, block)| (block_hash.into(), block.slot())))
-                .collect::<Result<_, _>>()?;
+        // The finalized state must be for the epoch boundary slot, not the slot of the finalized
+        // block.
+        if new_finalized_state.slot != new_finalized_slot {
+            return Err(PruningError::IncorrectFinalizedState {
+                state_slot: new_finalized_state.slot,
+                new_finalized_slot,
+            }
+            .into());
+        }
+
+        debug!(
+            log,
+            "Starting database pruning";
+            "old_finalized_epoch" => old_finalized_checkpoint.epoch,
+            "old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
+            "new_finalized_epoch" => new_finalized_checkpoint.epoch,
+            "new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
+        );
+
+        // For each slot between the new finalized checkpoint and the old finalized checkpoint,
+        // collect the beacon block root and state root of the canonical chain.
+        let newly_finalized_chain: HashMap<Slot, (SignedBeaconBlockHash, BeaconStateHash)> =
+            std::iter::once(Ok((
+                new_finalized_slot,
+                (new_finalized_block_hash, new_finalized_state_hash),
+            )))
+            .chain(
+                RootsIterator::new(store.clone(), new_finalized_state).map(|res| {
+                    res.map(|(block_root, state_root, slot)| {
+                        (slot, (block_root.into(), state_root.into()))
+                    })
+                }),
+            )
+            .take_while(|res| {
+                res.as_ref()
+                    .map_or(true, |(slot, _)| *slot >= old_finalized_slot)
+            })
+            .collect::<Result<_, _>>()?;
 
         // We don't know which blocks are shared among abandoned chains, so we buffer and delete
         // everything in one fell swoop.
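The `newly_finalized_chain` map is built by prepending the new finalized `(slot, roots)` entry to a backwards roots iterator and cutting the walk off once it passes the old finalized slot; errors flow through the `take_while` untouched and surface at `collect`. A runnable sketch with `u64` stand-ins (the function and tuple layout are illustrative):

    use std::collections::HashMap;

    // Build a slot -> (block_root, state_root) map of the newly finalized
    // chain: prepend the finalized entry, walk backwards, stop below the old
    // finalized slot, and let any iterator error surface at `collect`.
    fn canonical_chain_map(
        new_finalized: (u64, (u64, u64)),
        ancestors: impl Iterator<Item = Result<(u64, u64, u64), String>>, // (block, state, slot)
        old_finalized_slot: u64,
    ) -> Result<HashMap<u64, (u64, u64)>, String> {
        std::iter::once(Ok(new_finalized))
            .chain(ancestors.map(|res| res.map(|(block, state, slot)| (slot, (block, state)))))
            .take_while(|res| {
                res.as_ref()
                    .map_or(true, |(slot, _)| *slot >= old_finalized_slot)
            })
            .collect()
    }

    fn main() {
        let ancestors = vec![Ok((0xb1, 0xc1, 7)), Ok((0xb0, 0xc0, 6)), Ok((0xaf, 0xbf, 5))];
        let map = canonical_chain_map((8, (0xb2, 0xc2)), ancestors.into_iter(), 6).unwrap();
        assert_eq!(map.len(), 3); // slots 8, 7 and 6; slot 5 is cut off
    }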
@@ -79,75 +120,110 @@
         let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
 
         for (head_hash, head_slot) in head_tracker.heads() {
-            let mut potentially_abandoned_head: Option<Hash256> = Some(head_hash);
-            let mut potentially_abandoned_blocks: Vec<(
-                Slot,
-                Option<SignedBeaconBlockHash>,
-                Option<BeaconStateHash>,
-            )> = Vec::new();
+            let mut potentially_abandoned_head = Some(head_hash);
+            let mut potentially_abandoned_blocks = vec![];
 
             let head_state_hash = store
                 .get_block(&head_hash)?
                 .ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))?
                 .state_root();
 
+            // Iterate backwards from this head, staging blocks and states for deletion.
             let iter = std::iter::once(Ok((head_hash, head_state_hash, head_slot)))
-                .chain(RootsIterator::from_block(Arc::clone(&store), head_hash)?);
+                .chain(RootsIterator::from_block(store.clone(), head_hash)?);
+
             for maybe_tuple in iter {
-                let (block_hash, state_hash, slot) = maybe_tuple?;
-                if slot < old_finalized_slot {
-                    // We must assume here any candidate chains include old_finalized_block_hash,
-                    // i.e. there aren't any forks starting at a block that is a strict ancestor of
-                    // old_finalized_block_hash.
-                    break;
-                }
-                match newly_finalized_blocks.get(&block_hash.into()).copied() {
-                    // Block is not finalized, mark it and its state for deletion
+                let (block_root, state_root, slot) = maybe_tuple?;
+                let block_root = SignedBeaconBlockHash::from(block_root);
+                let state_root = BeaconStateHash::from(state_root);
+
+                match newly_finalized_chain.get(&slot) {
+                    // If there's no information about a slot on the finalized chain, then
+                    // it should be because it's ahead of the new finalized slot. Stage
+                    // the fork's block and state for possible deletion.
                     None => {
-                        potentially_abandoned_blocks.push((
-                            slot,
-                            Some(block_hash.into()),
-                            Some(state_hash.into()),
-                        ));
+                        if slot > new_finalized_slot {
+                            potentially_abandoned_blocks.push((
+                                slot,
+                                Some(block_root),
+                                Some(state_root),
+                            ));
+                        } else if slot >= old_finalized_slot {
+                            return Err(PruningError::MissingInfoForCanonicalChain { slot }.into());
+                        } else {
+                            // We must assume here any candidate chains include the old finalized
+                            // checkpoint, i.e. there aren't any forks starting at a block that is a
+                            // strict ancestor of old_finalized_checkpoint.
+                            warn!(
+                                log,
+                                "Found a chain that should already have been pruned";
+                                "head_block_root" => format!("{:?}", head_hash),
+                                "head_slot" => head_slot,
+                            );
+                            break;
+                        }
                     }
-                    Some(finalized_slot) => {
-                        // Block root is finalized, and we have reached the slot it was finalized
-                        // at: we've hit a shared part of the chain.
-                        if finalized_slot == slot {
-                            // The first finalized block of a candidate chain lies after (in terms
-                            // of slots order) the newly finalized block. It's not a candidate for
-                            // prunning.
-                            if finalized_slot == new_finalized_slot {
+                    Some((finalized_block_root, finalized_state_root)) => {
+                        // This fork descends from a newly finalized block; we can stop.
+                        if block_root == *finalized_block_root {
+                            // Sanity check: if the slot and block root match, then the
+                            // state roots should match too.
+                            if state_root != *finalized_state_root {
+                                return Err(PruningError::UnexpectedUnequalStateRoots.into());
+                            }
+
+                            // If the fork descends from the whole finalized chain,
+                            // do not prune it. Otherwise continue to delete all
+                            // of the blocks and states that have been staged for
+                            // deletion so far.
+                            if slot == new_finalized_slot {
                                 potentially_abandoned_blocks.clear();
                                 potentially_abandoned_head.take();
                             }
-
+                            // If there are skipped slots on the fork to be pruned, then
+                            // we will have just staged the common block for deletion.
+                            // Unstage it.
+                            else {
+                                for (_, block_root, _) in
+                                    potentially_abandoned_blocks.iter_mut().rev()
+                                {
+                                    if block_root.as_ref() == Some(finalized_block_root) {
+                                        *block_root = None;
+                                    } else {
+                                        break;
+                                    }
+                                }
+                            }
                             break;
-                        }
-                        // Block root is finalized, but we're at a skip slot: delete the state only.
-                        else {
+                        } else {
+                            if state_root == *finalized_state_root {
+                                return Err(PruningError::UnexpectedEqualStateRoots.into());
+                            }
                             potentially_abandoned_blocks.push((
                                 slot,
-                                None,
-                                Some(state_hash.into()),
+                                Some(block_root),
+                                Some(state_root),
                             ));
                         }
                     }
                 }
             }
 
-            abandoned_heads.extend(potentially_abandoned_head.into_iter());
-            if !potentially_abandoned_blocks.is_empty() {
+            if let Some(abandoned_head) = potentially_abandoned_head {
+                debug!(
+                    log,
+                    "Pruning head";
+                    "head_block_root" => format!("{:?}", abandoned_head),
+                    "head_slot" => head_slot,
+                );
+                abandoned_heads.insert(abandoned_head);
                 abandoned_blocks.extend(
                     potentially_abandoned_blocks
                         .iter()
                         .filter_map(|(_, maybe_block_hash, _)| *maybe_block_hash),
                 );
                 abandoned_states.extend(potentially_abandoned_blocks.iter().filter_map(
-                    |(slot, _, maybe_state_hash)| match maybe_state_hash {
-                        None => None,
-                        Some(state_hash) => Some((*slot, *state_hash)),
-                    },
+                    |(slot, _, maybe_state_hash)| maybe_state_hash.map(|sr| (*slot, sr)),
                ));
            }
        }
@@ -166,6 +242,8 @@
             head_tracker.remove_head(head_hash);
         }
 
+        debug!(log, "Database pruning complete");
+
         Ok(())
     }
 }
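One subtlety in the loop above: when the pruned fork merely skipped slots beyond the common block, the common block itself was staged for deletion along the way and must be unstaged, while the fork's skip-slot states above it still get deleted. A sketch of that reverse walk, with `u64` roots standing in for the hash types:

    // Walk the staged deletions backwards and blank out the shared block's
    // root: the block survives, the fork's skip-slot states do not.
    fn unstage_common_block(
        staged: &mut [(u64, Option<u64>, Option<u64>)], // (slot, block_root, state_root)
        common_block_root: u64,
    ) {
        for (_, block_root, _) in staged.iter_mut().rev() {
            if *block_root == Some(common_block_root) {
                *block_root = None;
            } else {
                break;
            }
        }
    }

    fn main() {
        // Slots 9 and 8 on the fork were skips, so both staged entries carry
        // the common block root 0xaa; afterwards only their states are staged.
        let mut staged = vec![(9, Some(0xaa), Some(0x91)), (8, Some(0xaa), Some(0x90))];
        unstage_common_block(&mut staged, 0xaa);
        assert!(staged.iter().all(|(_, block, state)| block.is_none() && state.is_some()));
    }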
@@ -184,48 +262,52 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold> fo
 /// Mostly useful for tests.
 pub struct BlockingMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     db: Arc<HotColdDB<E, Hot, Cold>>,
+    log: Logger,
 }
 
 impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
     for BlockingMigrator<E, Hot, Cold>
 {
-    fn new(db: Arc<HotColdDB<E, Hot, Cold>>, _: Logger) -> Self {
-        BlockingMigrator { db }
+    fn new(db: Arc<HotColdDB<E, Hot, Cold>>, log: Logger) -> Self {
+        BlockingMigrator { db, log }
     }
 
     fn process_finalization(
         &self,
-        state_root: Hash256,
+        finalized_state_root: BeaconStateHash,
         new_finalized_state: BeaconState<E>,
-        _max_finality_distance: u64,
         head_tracker: Arc<HeadTracker>,
-        old_finalized_block_hash: SignedBeaconBlockHash,
-        new_finalized_block_hash: SignedBeaconBlockHash,
+        old_finalized_checkpoint: Checkpoint,
+        new_finalized_checkpoint: Checkpoint,
     ) {
         if let Err(e) = Self::prune_abandoned_forks(
             self.db.clone(),
             head_tracker,
-            old_finalized_block_hash,
-            new_finalized_block_hash,
-            new_finalized_state.slot,
+            finalized_state_root,
+            &new_finalized_state,
+            old_finalized_checkpoint,
+            new_finalized_checkpoint,
+            &self.log,
         ) {
-            eprintln!("Pruning error: {:?}", e);
+            error!(&self.log, "Pruning error"; "error" => format!("{:?}", e));
         }
 
-        if let Err(e) = process_finalization(self.db.clone(), state_root, &new_finalized_state) {
-            // This migrator is only used for testing, so we just log to stderr without a logger.
-            eprintln!("Migration error: {:?}", e);
+        if let Err(e) = process_finalization(
+            self.db.clone(),
+            finalized_state_root.into(),
+            &new_finalized_state,
+        ) {
+            error!(&self.log, "Migration error"; "error" => format!("{:?}", e));
         }
     }
 }
 
 type MpscSender<E> = mpsc::Sender<(
-    Hash256,
+    BeaconStateHash,
     BeaconState<E>,
     Arc<HeadTracker>,
-    SignedBeaconBlockHash,
-    SignedBeaconBlockHash,
-    Slot,
+    Checkpoint,
+    Checkpoint,
 )>;
 
 /// Migrator that runs a background thread to migrate state from the hot to the cold database.
@@ -243,34 +325,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
         Self { db, tx_thread, log }
     }
 
-    /// Perform the freezing operation on the database,
     fn process_finalization(
         &self,
-        finalized_state_root: Hash256,
+        finalized_state_root: BeaconStateHash,
         new_finalized_state: BeaconState<E>,
-        max_finality_distance: u64,
         head_tracker: Arc<HeadTracker>,
-        old_finalized_block_hash: SignedBeaconBlockHash,
-        new_finalized_block_hash: SignedBeaconBlockHash,
+        old_finalized_checkpoint: Checkpoint,
+        new_finalized_checkpoint: Checkpoint,
     ) {
-        if !self.needs_migration(new_finalized_state.slot, max_finality_distance) {
-            return;
-        }
-
         let (ref mut tx, ref mut thread) = *self.tx_thread.lock();
 
-        let new_finalized_slot = new_finalized_state.slot;
         if let Err(tx_err) = tx.send((
             finalized_state_root,
             new_finalized_state,
             head_tracker,
-            old_finalized_block_hash,
-            new_finalized_block_hash,
-            new_finalized_slot,
+            old_finalized_checkpoint,
+            new_finalized_checkpoint,
         )) {
             let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());
 
-            drop(mem::replace(tx, new_tx));
+            *tx = new_tx;
             let old_thread = mem::replace(thread, new_thread);
 
             // Join the old thread, which will probably have panicked, or may have
@@ -290,53 +364,37 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
 }
 
 impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
-    /// Return true if a migration needs to be performed, given a new `finalized_slot`.
-    fn needs_migration(&self, finalized_slot: Slot, max_finality_distance: u64) -> bool {
-        let finality_distance = finalized_slot - self.db.get_split_slot();
-        finality_distance > max_finality_distance
-    }
-
-    #[allow(clippy::type_complexity)]
     /// Spawn a new child thread to run the migration process.
     ///
     /// Return a channel handle for sending new finalized states to the thread.
     fn spawn_thread(
         db: Arc<HotColdDB<E, Hot, Cold>>,
         log: Logger,
-    ) -> (
-        mpsc::Sender<(
-            Hash256,
-            BeaconState<E>,
-            Arc<HeadTracker>,
-            SignedBeaconBlockHash,
-            SignedBeaconBlockHash,
-            Slot,
-        )>,
-        thread::JoinHandle<()>,
-    ) {
+    ) -> (MpscSender<E>, thread::JoinHandle<()>) {
         let (tx, rx) = mpsc::channel();
         let thread = thread::spawn(move || {
             while let Ok((
                 state_root,
                 state,
                 head_tracker,
-                old_finalized_block_hash,
-                new_finalized_block_hash,
-                new_finalized_slot,
+                old_finalized_checkpoint,
+                new_finalized_checkpoint,
             )) = rx.recv()
             {
                 match Self::prune_abandoned_forks(
                     db.clone(),
                     head_tracker,
-                    old_finalized_block_hash,
-                    new_finalized_block_hash,
-                    new_finalized_slot,
+                    state_root,
+                    &state,
+                    old_finalized_checkpoint,
+                    new_finalized_checkpoint,
+                    &log,
                 ) {
                     Ok(()) => {}
                     Err(e) => warn!(log, "Block pruning failed: {:?}", e),
                 }
 
-                match process_finalization(db.clone(), state_root, &state) {
+                match process_finalization(db.clone(), state_root.into(), &state) {
                     Ok(()) => {}
                     Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
                         debug!(
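The `BackgroundMigrator` keeps its worker alive across panics: if `send` fails, the receiving thread must have died, so a fresh channel and thread are swapped in and the old handle is joined. A minimal runnable sketch of that pattern (jobs simplified to `u64`):

    use std::sync::mpsc;
    use std::{mem, thread};

    // Spawn a worker that consumes jobs until its channel is dropped.
    fn spawn_worker() -> (mpsc::Sender<u64>, thread::JoinHandle<()>) {
        let (tx, rx) = mpsc::channel();
        let handle = thread::spawn(move || while let Ok(_job) = rx.recv() { /* do the work */ });
        (tx, handle)
    }

    // If the worker died (send fails), swap in a fresh channel + thread and
    // join the old handle, swallowing any panic it carried.
    fn send_or_respawn(tx: &mut mpsc::Sender<u64>, worker: &mut thread::JoinHandle<()>, job: u64) {
        if tx.send(job).is_err() {
            let (new_tx, new_worker) = spawn_worker();
            *tx = new_tx;
            let old_worker = mem::replace(worker, new_worker);
            let _ = old_worker.join();
        }
    }

    fn main() {
        let (mut tx, mut worker) = spawn_worker();
        send_or_respawn(&mut tx, &mut worker, 42);
        drop(tx);
        worker.join().unwrap();
    }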
Always uses @@ -368,12 +368,9 @@ where self.chain.head().unwrap().beacon_state } - /// Adds a single block (synchronously) onto either the canonical chain (block_strategy == - /// OnCanonicalHead) or a fork (block_strategy == ForkCanonicalChainAt). pub fn add_block( &self, state: &BeaconState, - block_strategy: BlockStrategy, slot: Slot, validators: &[usize], ) -> (SignedBeaconBlockHash, BeaconState) { @@ -381,7 +378,7 @@ where self.advance_slot(); } - let (block, new_state) = self.build_block(state.clone(), slot, block_strategy); + let (block, new_state) = self.build_block(state.clone(), slot); let block_root = self .chain @@ -395,15 +392,14 @@ where (block_root.into(), new_state) } - #[allow(clippy::type_complexity)] /// `add_block()` repeated `num_blocks` times. + #[allow(clippy::type_complexity)] pub fn add_blocks( &self, mut state: BeaconState, mut slot: Slot, num_blocks: usize, attesting_validators: &[usize], - block_strategy: BlockStrategy, ) -> ( HashMap, HashMap, @@ -414,8 +410,7 @@ where let mut blocks: HashMap = HashMap::with_capacity(num_blocks); let mut states: HashMap = HashMap::with_capacity(num_blocks); for _ in 0..num_blocks { - let (new_root_hash, new_state) = - self.add_block(&state, block_strategy, slot, attesting_validators); + let (new_root_hash, new_state) = self.add_block(&state, slot, attesting_validators); blocks.insert(slot, new_root_hash); states.insert(slot, new_state.tree_hash_root().into()); state = new_state; @@ -426,7 +421,6 @@ where } #[allow(clippy::type_complexity)] - /// A wrapper on `add_blocks()` to avoid passing enums explicitly. pub fn add_canonical_chain_blocks( &self, state: BeaconState, @@ -440,18 +434,10 @@ where SignedBeaconBlockHash, BeaconState, ) { - let block_strategy = BlockStrategy::OnCanonicalHead; - self.add_blocks( - state, - slot, - num_blocks, - attesting_validators, - block_strategy, - ) + self.add_blocks(state, slot, num_blocks, attesting_validators) } #[allow(clippy::type_complexity)] - /// A wrapper on `add_blocks()` to avoid passing enums explicitly. pub fn add_stray_blocks( &self, state: BeaconState, @@ -465,17 +451,7 @@ where SignedBeaconBlockHash, BeaconState, ) { - let block_strategy = BlockStrategy::ForkCanonicalChainAt { - previous_slot: slot, - first_slot: slot + 2, - }; - self.add_blocks( - state, - slot + 2, - num_blocks, - attesting_validators, - block_strategy, - ) + self.add_blocks(state, slot + 2, num_blocks, attesting_validators) } /// Returns a newly created block, signed by the proposer for the given slot. 
@@ -483,8 +459,8 @@ where
         &self,
         mut state: BeaconState<E>,
         slot: Slot,
-        block_strategy: BlockStrategy,
     ) -> (SignedBeaconBlock<E>, BeaconState<E>) {
+        assert_ne!(slot, 0);
         if slot < state.slot {
             panic!("produce slot cannot be prior to the state slot");
         }
@@ -498,15 +474,9 @@ where
             .build_all_caches(&self.spec)
             .expect("should build caches");
 
-        let proposer_index = match block_strategy {
-            BlockStrategy::OnCanonicalHead => self
-                .chain
-                .block_proposer(slot)
-                .expect("should get block proposer from chain"),
-            _ => state
-                .get_beacon_proposer_index(slot, &self.spec)
-                .expect("should get block proposer from state"),
-        };
+        let proposer_index = state
+            .get_beacon_proposer_index(slot, &self.spec)
+            .expect("should get block proposer from state");
 
         let sk = &self.keypairs[proposer_index].sk;
         let fork = &state.fork;
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index bafef37882..8d322f83e8 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -1292,6 +1292,226 @@ fn prunes_skipped_slots_states() {
     }
 }
 
+fn check_all_blocks_exist<'a>(
+    harness: &TestHarness,
+    blocks: impl Iterator<Item = &'a SignedBeaconBlockHash>,
+) {
+    for &block_hash in blocks {
+        let block = harness.chain.get_block(&block_hash.into()).unwrap();
+        assert!(
+            block.is_some(),
+            "expected block {:?} to be in DB",
+            block_hash
+        );
+    }
+}
+
+fn check_all_states_exist<'a>(
+    harness: &TestHarness,
+    states: impl Iterator<Item = &'a BeaconStateHash>,
+) {
+    for &state_hash in states {
+        let state = harness.chain.get_state(&state_hash.into(), None).unwrap();
+        assert!(
+            state.is_some(),
+            "expected state {:?} to be in DB",
+            state_hash,
+        );
+    }
+}
+
+// Check that none of the given states exist in the database.
+fn check_no_states_exist<'a>(
+    harness: &TestHarness,
+    states: impl Iterator<Item = &'a BeaconStateHash>,
+) {
+    for &state_root in states {
+        assert!(
+            harness
+                .chain
+                .get_state(&state_root.into(), None)
+                .unwrap()
+                .is_none(),
+            "state {:?} should not be in the DB",
+            state_root
+        );
+    }
+}
+
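These helpers all take `impl Iterator<Item = &…Hash>`, so one function serves `HashMap::values`, `HashSet::iter`, and `HashSet::difference` alike. A toy sketch of the shape, with a `HashSet<u64>` standing in for the store (everything here is illustrative):

    use std::collections::{HashMap, HashSet};

    // A HashSet stands in for the database; the helper only asks "is it present?".
    fn check_all_exist<'a>(db: &HashSet<u64>, items: impl Iterator<Item = &'a u64>) {
        for &item in items {
            assert!(db.contains(&item), "expected {:?} to be in DB", item);
        }
    }

    fn main() {
        let db: HashSet<u64> = [1, 2, 3].iter().copied().collect();
        let blocks: HashMap<u64, u64> = [(10, 1), (11, 2)].iter().copied().collect();
        check_all_exist(&db, blocks.values()); // HashMap::values, as in the tests
        let canonical: HashSet<u64> = [1, 2, 3].iter().copied().collect();
        check_all_exist(&db, canonical.iter()); // HashSet::iter works unchanged
    }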
+// Check that none of the given blocks exist in the database.
+fn check_no_blocks_exist<'a>(
+    harness: &TestHarness,
+    blocks: impl Iterator<Item = &'a SignedBeaconBlockHash>,
+) {
+    for &block_hash in blocks {
+        let block = harness.chain.get_block(&block_hash.into()).unwrap();
+        assert!(
+            block.is_none(),
+            "did not expect block {:?} to be in the DB",
+            block_hash
+        );
+    }
+}
+
+#[test]
+fn prune_single_block_fork() {
+    let slots_per_epoch = E::slots_per_epoch() as usize;
+    pruning_test(3 * slots_per_epoch, 1, slots_per_epoch, 0, 1);
+}
+
+#[test]
+fn prune_single_block_long_skip() {
+    let slots_per_epoch = E::slots_per_epoch() as usize;
+    pruning_test(
+        2 * slots_per_epoch,
+        1,
+        slots_per_epoch,
+        2 * slots_per_epoch as u64,
+        1,
+    );
+}
+
+#[test]
+fn prune_shared_skip_states_mid_epoch() {
+    let slots_per_epoch = E::slots_per_epoch() as usize;
+    pruning_test(
+        slots_per_epoch + slots_per_epoch / 2,
+        1,
+        slots_per_epoch,
+        2,
+        slots_per_epoch - 1,
+    );
+}
+
+#[test]
+fn prune_shared_skip_states_epoch_boundaries() {
+    let slots_per_epoch = E::slots_per_epoch() as usize;
+    pruning_test(slots_per_epoch - 1, 1, slots_per_epoch, 2, slots_per_epoch);
+    pruning_test(slots_per_epoch - 1, 2, slots_per_epoch, 1, slots_per_epoch);
+    pruning_test(
+        2 * slots_per_epoch + slots_per_epoch / 2,
+        slots_per_epoch as u64 / 2,
+        slots_per_epoch,
+        slots_per_epoch as u64 / 2 + 1,
+        slots_per_epoch,
+    );
+    pruning_test(
+        2 * slots_per_epoch + slots_per_epoch / 2,
+        slots_per_epoch as u64 / 2,
+        slots_per_epoch,
+        slots_per_epoch as u64 / 2 + 1,
+        slots_per_epoch,
+    );
+    pruning_test(
+        2 * slots_per_epoch - 1,
+        slots_per_epoch as u64,
+        1,
+        0,
+        2 * slots_per_epoch,
+    );
+}
+
+/// Generic harness for pruning tests.
+fn pruning_test(
+    // Number of blocks to start the chain with before forking.
+    num_initial_blocks: usize,
+    // Number of skip slots on the main chain after the initial blocks.
+    num_canonical_skips: u64,
+    // Number of blocks on the main chain after the skip, but before the finalisation-triggering
+    // blocks.
+    num_canonical_middle_blocks: usize,
+    // Number of skip slots on the fork chain after the initial blocks.
+    num_fork_skips: u64,
+    // Number of blocks on the fork chain after the skips.
+    num_fork_blocks: usize,
+) {
+    const VALIDATOR_COUNT: usize = 24;
+    const VALIDATOR_SUPERMAJORITY: usize = (VALIDATOR_COUNT / 3) * 2;
+    const HONEST_VALIDATOR_COUNT: usize = VALIDATOR_SUPERMAJORITY;
+
+    let db_path = tempdir().unwrap();
+    let store = get_store(&db_path);
+    let harness = get_harness(store.clone(), VALIDATOR_COUNT);
+    let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
+    let faulty_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
+    let slots_per_epoch = MinimalEthSpec::slots_per_epoch() as usize;
+
+    let (_, _, divergence_slot, _, divergence_state) = harness.add_blocks(
+        harness.get_head_state(),
+        harness.get_chain_slot(),
+        num_initial_blocks,
+        &honest_validators,
+    );
+
+    let (_, _, canonical_slot, _, canonical_state) = harness.add_blocks(
+        divergence_state.clone(),
+        divergence_slot + num_canonical_skips,
+        num_canonical_middle_blocks,
+        &honest_validators,
+    );
+
+    let (stray_blocks, stray_states, stray_slot, _, stray_head_state) = harness.add_blocks(
+        divergence_state.clone(),
+        divergence_slot + num_fork_skips,
+        num_fork_blocks,
+        &faulty_validators,
+    );
+
+    let stray_head_state_root = stray_states[&(stray_slot - 1)];
+    let stray_states = harness
+        .chain
+        .rev_iter_state_roots_from(stray_head_state_root.into(), &stray_head_state)
+        .map(Result::unwrap)
+        .map(|(state_root, _)| state_root.into())
+        .collect::<HashSet<_>>();
+
+    check_all_blocks_exist(&harness, stray_blocks.values());
+    check_all_states_exist(&harness, stray_states.iter());
+
+    let chain_dump = harness.chain.chain_dump().unwrap();
+    assert_eq!(
+        get_finalized_epoch_boundary_blocks(&chain_dump),
+        vec![Hash256::zero().into()].into_iter().collect(),
+    );
+
+    // Trigger finalization
+    let num_finalization_blocks = 4 * slots_per_epoch;
+    let (_, _, _, _, _) = harness.add_blocks(
+        canonical_state,
+        canonical_slot,
+        num_finalization_blocks,
+        &honest_validators,
+    );
+
+    // Check that finalization has advanced past the divergence slot.
+    assert!(
+        harness
+            .chain
+            .head_info()
+            .unwrap()
+            .finalized_checkpoint
+            .epoch
+            .start_slot(E::slots_per_epoch())
+            > divergence_slot
+    );
+    check_chain_dump(
+        &harness,
+        (num_initial_blocks + num_canonical_middle_blocks + num_finalization_blocks + 1) as u64,
+    );
+
+    let all_canonical_states = harness
+        .chain
+        .rev_iter_state_roots()
+        .unwrap()
+        .map(Result::unwrap)
+        .map(|(state_root, _)| state_root.into())
+        .collect::<HashSet<_>>();
+
+    check_all_states_exist(&harness, all_canonical_states.iter());
+    check_no_states_exist(&harness, stray_states.difference(&all_canonical_states));
+    check_no_blocks_exist(&harness, stray_blocks.values());
+}
+
 /// Check that the head state's slot matches `expected_slot`.
 fn check_slot(harness: &TestHarness, expected_slot: u64) {
     let state = &harness.chain.head().expect("should get head").beacon_state;
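The final `check_chain_dump` expectation counts only the canonical blocks plus one extra entry, which from the `+ 1` in the call above appears to account for the genesis block; stray-fork blocks contribute nothing. Restated as a small runnable sketch (parameter values taken from `prune_single_block_fork` with 8 slots per epoch):

    // Expected canonical chain length at the end of pruning_test: every block
    // added on the canonical side, plus one (inferred to be genesis).
    fn expected_chain_dump_len(
        num_initial_blocks: usize,
        num_canonical_middle_blocks: usize,
        num_finalization_blocks: usize,
    ) -> u64 {
        (num_initial_blocks + num_canonical_middle_blocks + num_finalization_blocks + 1) as u64
    }

    fn main() {
        // 3 epochs of initial blocks, 1 epoch mid-chain, 4 epochs to finalize.
        assert_eq!(expected_chain_dump_len(24, 8, 32), 65);
    }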
@@ -1396,18 +1616,31 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
     }
 }
 
-/// Check that state and block root iterators can reach genesis
+/// Check that every state from the canonical chain is in the database, and that the
+/// reverse state and block root iterators reach genesis.
 fn check_iterators(harness: &TestHarness) {
-    assert_eq!(
-        harness
-            .chain
-            .rev_iter_state_roots()
-            .expect("should get iter")
-            .last()
-            .map(Result::unwrap)
-            .map(|(_, slot)| slot),
-        Some(Slot::new(0))
-    );
+    let mut min_slot = None;
+    for (state_root, slot) in harness
+        .chain
+        .rev_iter_state_roots()
+        .expect("should get iter")
+        .map(Result::unwrap)
+    {
+        assert!(
+            harness
+                .chain
+                .store
+                .get_state(&state_root, Some(slot))
+                .unwrap()
+                .is_some(),
+            "state {:?} from canonical chain should be in DB",
+            state_root
+        );
+        min_slot = Some(slot);
+    }
+    // Assert that we reached genesis.
+    assert_eq!(min_slot, Some(Slot::new(0)));
+    // Assert that the block root iterator reaches genesis.
     assert_eq!(
         harness
             .chain
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 692e747d79..3378d43daa 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -14,7 +14,7 @@ use crate::{
 };
 use lru::LruCache;
 use parking_lot::{Mutex, RwLock};
-use slog::{debug, error, trace, warn, Logger};
+use slog::{debug, error, info, trace, warn, Logger};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use state_processing::{
@@ -147,6 +147,12 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
         // Load the previous split slot from the database (if any). This ensures we can
         // stop and restart correctly.
         if let Some(split) = db.load_split()? {
+            info!(
+                db.log,
+                "Hot-Cold DB initialized";
+                "split_slot" => split.slot,
+                "split_state" => format!("{:?}", split.state_root)
+            );
             *db.split.write() = split;
         }
         Ok(db)
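The new `info!` line fires when a persisted split point is found at startup, so operators can see where the hot/cold boundary resumed after a restart. A toy sketch of that restore-then-log pattern (all types and values are stand-ins, assuming the `parking_lot` crate):

    use parking_lot::RwLock;

    #[derive(Clone, Copy, Default)]
    struct Split {
        slot: u64,       // stand-in for `Slot`
        state_root: u64, // stand-in for `Hash256`
    }

    struct Db {
        split: RwLock<Split>,
    }

    impl Db {
        // Hypothetical loader standing in for the store's `load_split`.
        fn load_split(&self) -> Option<Split> {
            Some(Split { slot: 64, state_root: 0xab })
        }

        fn init(&self) {
            // Restore the persisted hot/cold boundary and announce it, so the
            // restart is visible in the logs before freezing resumes.
            if let Some(split) = self.load_split() {
                println!("Hot-Cold DB initialized; split_slot={}", split.slot);
                *self.split.write() = split;
            }
        }
    }

    fn main() {
        let db = Db { split: RwLock::new(Split::default()) };
        db.init();
        assert_eq!(db.split.read().slot, 64);
    }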