From f36a5a15d62c67ffd8dac7af14ee39a03c550af8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Wed, 8 Jan 2020 13:58:01 +1100 Subject: [PATCH] Store states efficiently in the hot database (#746) * Sparse hot DB and block root tree * Fix store_tests * Ensure loads of hot states on boundaries are fast * Milder error for unaligned finalized blocks --- Cargo.lock | 1 + beacon_node/beacon_chain/src/beacon_chain.rs | 102 +++-- beacon_node/beacon_chain/src/builder.rs | 22 +- beacon_node/beacon_chain/src/errors.rs | 3 + beacon_node/beacon_chain/src/fork_choice.rs | 29 +- .../src/persisted_beacon_chain.rs | 3 +- beacon_node/beacon_chain/tests/store_tests.rs | 81 ++++ beacon_node/store/Cargo.toml | 1 + beacon_node/store/src/block_root_tree.rs | 364 ++++++++++++++++++ beacon_node/store/src/errors.rs | 10 +- beacon_node/store/src/hot_cold_store.rs | 329 ++++++++++++---- beacon_node/store/src/iter.rs | 7 +- beacon_node/store/src/lib.rs | 31 +- beacon_node/store/src/migrate.rs | 28 +- eth2/lmd_ghost/src/lib.rs | 12 +- eth2/lmd_ghost/src/reduced_tree.rs | 140 ++++--- eth2/lmd_ghost/tests/test.rs | 1 + eth2/types/src/beacon_state.rs | 15 + 18 files changed, 953 insertions(+), 226 deletions(-) create mode 100644 beacon_node/store/src/block_root_tree.rs diff --git a/Cargo.lock b/Cargo.lock index 080508580f..6d6038a449 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3948,6 +3948,7 @@ dependencies = [ "db-key 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "eth2_ssz 0.1.2", "eth2_ssz_derive 0.1.0", + "itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "leveldb 0.8.4 (registry+https://github.com/rust-lang/crates.io-index)", "lighthouse_metrics 0.1.0", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c5df4fe83e..a383ea4580 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -31,7 +31,7 @@ use std::time::{Duration, Instant}; use store::iter::{ BlockRootsIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator, }; -use store::{Error as DBError, Migrate, Store}; +use store::{BlockRootTree, Error as DBError, Migrate, Store}; use tree_hash::TreeHash; use types::*; @@ -149,6 +149,8 @@ pub struct BeaconChain { pub(crate) head_tracker: HeadTracker, /// Provides a small cache of `BeaconState` and `BeaconBlock`. pub(crate) checkpoint_cache: CheckPointCache, + /// Cache of block roots for all known forks post-finalization. + pub block_root_tree: Arc, /// Logging to CLI, etc. pub(crate) log: Logger, } @@ -170,8 +172,7 @@ impl BeaconChain { .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; let beacon_state_root = beacon_block.state_root; let beacon_state = self - .store - .get_state(&beacon_state_root, Some(beacon_block.slot))? + .get_state_caching(&beacon_state_root, Some(beacon_block.slot))? .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; CheckPoint { @@ -189,6 +190,7 @@ impl BeaconChain { genesis_block_root: self.genesis_block_root, ssz_head_tracker: self.head_tracker.to_ssz_container(), fork_choice: self.fork_choice.as_ssz_container(), + block_root_tree: self.block_root_tree.as_ssz_container(), }; let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); @@ -411,7 +413,7 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - fn get_block_caching( + pub fn get_block_caching( &self, block_root: &Hash256, ) -> Result>, Error> { @@ -427,7 +429,7 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - fn get_state_caching( + pub fn get_state_caching( &self, state_root: &Hash256, slot: Option, @@ -448,7 +450,7 @@ impl BeaconChain { /// ## Errors /// /// May return a database error. - fn get_state_caching_only_with_committee_caches( + pub fn get_state_caching_only_with_committee_caches( &self, state_root: &Hash256, slot: Option, @@ -888,37 +890,40 @@ impl BeaconChain { let result = if let Some(attestation_head_block) = self.get_block_caching(&attestation.data.beacon_block_root)? { - // Use the `data.beacon_block_root` to load the state from the latest non-skipped - // slot preceding the attestation's creation. - // - // This state is guaranteed to be in the same chain as the attestation, but it's - // not guaranteed to be from the same slot or epoch as the attestation. - let mut state: BeaconState = self - .get_state_caching_only_with_committee_caches( - &attestation_head_block.state_root, - Some(attestation_head_block.slot), - )? - .ok_or_else(|| Error::MissingBeaconState(attestation_head_block.state_root))?; - - // Ensure the state loaded from the database matches the state of the attestation - // head block. - // - // The state needs to be advanced from the current slot through to the epoch in - // which the attestation was created in. It would be an error to try and use - // `state.get_attestation_data_slot(..)` because the state matching the - // `data.beacon_block_root` isn't necessarily in a nearby epoch to the attestation - // (e.g., if there were lots of skip slots since the head of the chain and the - // epoch creation epoch). - for _ in state.slot.as_u64() - ..attestation - .data - .target - .epoch - .start_slot(T::EthSpec::slots_per_epoch()) - .as_u64() + // If the attestation points to a block in the same epoch in which it was made, + // then it is sufficient to load the state from that epoch's boundary, because + // the epoch-variable fields like the justified checkpoints cannot have changed + // between the epoch boundary and when the attestation was made. If conversely, + // the attestation points to a block in a prior epoch, then it is necessary to + // load the full state corresponding to its block, and transition it to the + // attestation's epoch. + let attestation_epoch = attestation.data.target.epoch; + let slots_per_epoch = T::EthSpec::slots_per_epoch(); + let mut state = if attestation_epoch + == attestation_head_block.slot.epoch(slots_per_epoch) { - per_slot_processing(&mut state, None, &self.spec)?; - } + self.store + .load_epoch_boundary_state(&attestation_head_block.state_root)? + .ok_or_else(|| Error::MissingBeaconState(attestation_head_block.state_root))? + } else { + let mut state = self + .store + .get_state( + &attestation_head_block.state_root, + Some(attestation_head_block.slot), + )? + .ok_or_else(|| Error::MissingBeaconState(attestation_head_block.state_root))?; + + // Fastforward the state to the epoch in which the attestation was made. + // NOTE: this looks like a potential DoS vector, we should probably limit + // the amount we're willing to fastforward without a valid signature. + for _ in state.slot.as_u64()..attestation_epoch.start_slot(slots_per_epoch).as_u64() + { + per_slot_processing(&mut state, None, &self.spec)?; + } + + state + }; state.build_committee_cache(RelativeEpoch::Current, &self.spec)?; @@ -1242,14 +1247,16 @@ impl BeaconChain { }); } + // Check if the block is already known. We know it is post-finalization, so it is + // sufficient to check the block root tree. + if self.block_root_tree.is_known_block_root(&block_root) { + return Ok(BlockProcessingOutcome::BlockIsAlreadyKnown); + } + // Records the time taken to load the block and state from the database during block // processing. let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ); - if self.store.exists::>(&block_root)? { - return Ok(BlockProcessingOutcome::BlockIsAlreadyKnown); - } - // Load the blocks parent block from the database, returning invalid if that block is not // found. let parent_block: BeaconBlock = @@ -1381,6 +1388,9 @@ impl BeaconChain { metrics::stop_timer(db_write_timer); + self.block_root_tree + .add_block_root(block_root, block.parent_root, block.slot)?; + self.head_tracker.register_block(block_root, &block); let fork_choice_register_timer = @@ -1692,8 +1702,10 @@ impl BeaconChain { .process_finalization(&finalized_block, finalized_block_root)?; let finalized_state = self - .store - .get_state(&finalized_block.state_root, Some(finalized_block.slot))? + .get_state_caching_only_with_committee_caches( + &finalized_block.state_root, + Some(finalized_block.slot), + )? .ok_or_else(|| Error::MissingBeaconState(finalized_block.state_root))?; self.op_pool.prune_all(&finalized_state, &self.spec); @@ -1706,6 +1718,12 @@ impl BeaconChain { max_finality_distance, ); + // Prune in-memory block root tree. + self.block_root_tree.prune_to( + finalized_block_root, + self.heads().into_iter().map(|(block_root, _)| block_root), + ); + let _ = self.event_handler.register(EventKind::BeaconFinalization { epoch: new_finalized_epoch, root: finalized_block_root, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 3305d66553..25201c0910 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -16,7 +16,7 @@ use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use store::Store; +use store::{BlockRootTree, Store}; use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot}; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing @@ -92,6 +92,7 @@ pub struct BeaconChainBuilder { slot_clock: Option, persisted_beacon_chain: Option>, head_tracker: Option, + block_root_tree: Option>, spec: ChainSpec, log: Option, } @@ -134,6 +135,7 @@ where slot_clock: None, persisted_beacon_chain: None, head_tracker: None, + block_root_tree: None, spec: TEthSpec::default_spec(), log: None, } @@ -224,6 +226,7 @@ where HeadTracker::from_ssz_container(&p.ssz_head_tracker) .map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?, ); + self.block_root_tree = Some(Arc::new(p.block_root_tree.clone().into())); self.persisted_beacon_chain = Some(p); Ok(self) @@ -266,6 +269,11 @@ where ) })?; + self.block_root_tree = Some(Arc::new(BlockRootTree::new( + beacon_block_root, + beacon_block.slot, + ))); + self.finalized_checkpoint = Some(CheckPoint { beacon_block_root, beacon_block, @@ -375,6 +383,9 @@ where .event_handler .ok_or_else(|| "Cannot build without an event handler".to_string())?, head_tracker: self.head_tracker.unwrap_or_default(), + block_root_tree: self + .block_root_tree + .ok_or_else(|| "Cannot build without a block root tree".to_string())?, checkpoint_cache: CheckPointCache::default(), log: log.clone(), }; @@ -425,10 +436,16 @@ where .clone() .ok_or_else(|| "reduced_tree_fork_choice requires a store")?; + let block_root_tree = self + .block_root_tree + .clone() + .ok_or_else(|| "reduced_tree_fork_choice requires a block root tree")?; + let fork_choice = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain { ForkChoice::from_ssz_container( persisted_beacon_chain.fork_choice.clone(), store.clone(), + block_root_tree, ) .map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))? } else { @@ -442,11 +459,12 @@ where let backend = ThreadSafeReducedTree::new( store.clone(), + block_root_tree, &finalized_checkpoint.beacon_block, finalized_checkpoint.beacon_block_root, ); - ForkChoice::new(store, backend, genesis_block_root, self.spec.genesis_slot) + ForkChoice::new(backend, genesis_block_root, self.spec.genesis_slot) }; self.fork_choice = Some(fork_choice); diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 1a977393ad..809fb1d600 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -5,6 +5,7 @@ use state_processing::per_block_processing::errors::AttestationValidationError; use state_processing::BlockProcessingError; use state_processing::SlotProcessingError; use std::time::Duration; +use store::block_root_tree::BlockRootTreeError; use types::*; macro_rules! easy_from_to { @@ -49,11 +50,13 @@ pub enum BeaconChainError { InvariantViolated(String), SszTypesError(SszTypesError), CanonicalHeadLockTimeout, + BlockRootTreeError(BlockRootTreeError), } easy_from_to!(SlotProcessingError, BeaconChainError); easy_from_to!(AttestationValidationError, BeaconChainError); easy_from_to!(SszTypesError, BeaconChainError); +easy_from_to!(BlockRootTreeError, BeaconChainError); #[derive(Debug, PartialEq)] pub enum BlockProductionError { diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index d62f9f8fb3..b8d0e4a460 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -4,7 +4,7 @@ use parking_lot::RwLock; use ssz_derive::{Decode, Encode}; use state_processing::{common::get_attesting_indices, per_slot_processing}; use std::sync::Arc; -use store::{Error as StoreError, Store}; +use store::{BlockRootTree, Error as StoreError, Store}; use types::{ Attestation, BeaconBlock, BeaconState, BeaconStateError, Checkpoint, EthSpec, Hash256, Slot, }; @@ -22,7 +22,6 @@ pub enum Error { } pub struct ForkChoice { - store: Arc, backend: T::LmdGhost, /// Used for resolving the `0x00..00` alias back to genesis. /// @@ -36,7 +35,6 @@ pub struct ForkChoice { } impl PartialEq for ForkChoice { - /// This implementation ignores the `store`. fn eq(&self, other: &Self) -> bool { self.backend == other.backend && self.genesis_block_root == other.genesis_block_root @@ -50,18 +48,12 @@ impl ForkChoice { /// /// "Genesis" does not necessarily need to be the absolute genesis, it can be some finalized /// block. - pub fn new( - store: Arc, - backend: T::LmdGhost, - genesis_block_root: Hash256, - genesis_slot: Slot, - ) -> Self { + pub fn new(backend: T::LmdGhost, genesis_block_root: Hash256, genesis_slot: Slot) -> Self { let justified_checkpoint = Checkpoint { epoch: genesis_slot.epoch(T::EthSpec::slots_per_epoch()), root: genesis_block_root, }; Self { - store: store.clone(), backend, genesis_block_root, justified_checkpoint: RwLock::new(justified_checkpoint.clone()), @@ -149,8 +141,7 @@ impl ForkChoice { }; let mut state: BeaconState = chain - .store - .get_state(&block.state_root, Some(block.slot))? + .get_state_caching_only_with_committee_caches(&block.state_root, Some(block.slot))? .ok_or_else(|| Error::MissingState(block.state_root))?; // Fast-forward the state to the start slot of the epoch where it was justified. @@ -201,10 +192,7 @@ impl ForkChoice { for attestation in &block.body.attestations { // If the `data.beacon_block_root` block is not known to us, simply ignore the latest // vote. - if let Some(block) = self - .store - .get::>(&attestation.data.beacon_block_root)? - { + if let Some(block) = chain.get_block_caching(&attestation.data.beacon_block_root)? { self.process_attestation(state, attestation, &block)?; } } @@ -316,11 +304,14 @@ impl ForkChoice { /// Instantiates `Self` from a prior `SszForkChoice`. /// /// The created `Self` will have the same state as the `Self` that created the `SszForkChoice`. - pub fn from_ssz_container(ssz_container: SszForkChoice, store: Arc) -> Result { - let backend = LmdGhost::from_bytes(&ssz_container.backend_bytes, store.clone())?; + pub fn from_ssz_container( + ssz_container: SszForkChoice, + store: Arc, + block_root_tree: Arc, + ) -> Result { + let backend = LmdGhost::from_bytes(&ssz_container.backend_bytes, store, block_root_tree)?; Ok(Self { - store, backend, genesis_block_root: ssz_container.genesis_block_root, justified_checkpoint: RwLock::new(ssz_container.justified_checkpoint), diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index 08b75420d2..7f42466e22 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -4,7 +4,7 @@ use crate::{BeaconChainTypes, CheckPoint}; use operation_pool::PersistedOperationPool; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; -use store::{DBColumn, Error as StoreError, SimpleStoreItem}; +use store::{DBColumn, Error as StoreError, SimpleStoreItem, SszBlockRootTree}; use types::Hash256; /// 32-byte key for accessing the `PersistedBeaconChain`. @@ -18,6 +18,7 @@ pub struct PersistedBeaconChain { pub genesis_block_root: Hash256, pub ssz_head_tracker: SszHeadTracker, pub fork_choice: SszForkChoice, + pub block_root_tree: SszBlockRootTree, } impl SimpleStoreItem for PersistedBeaconChain { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 983b761f9d..06d096c9b4 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -6,6 +6,7 @@ extern crate lazy_static; use beacon_chain::test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, }; +use beacon_chain::AttestationProcessingOutcome; use rand::Rng; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; @@ -239,6 +240,86 @@ fn split_slot_restore() { assert_eq!(store.get_split_slot(), split_slot); } +// Check attestation processing and `load_epoch_boundary_state` in the presence of a split DB. +// This is a bit of a monster test in that it tests lots of different things, but until they're +// tested elsewhere, this is as good a place as any. +#[test] +fn epoch_boundary_state_attestation_processing() { + let num_blocks_produced = E::slots_per_epoch() * 5; + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), VALIDATOR_COUNT); + + let late_validators = vec![0, 1]; + let timely_validators = (2..VALIDATOR_COUNT).collect::>(); + + let mut late_attestations = vec![]; + + for _ in 0..num_blocks_produced { + harness.extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::SomeValidators(timely_validators.clone()), + ); + + let head = harness.chain.head().expect("head ok"); + late_attestations.extend(harness.get_free_attestations( + &AttestationStrategy::SomeValidators(late_validators.clone()), + &head.beacon_state, + head.beacon_block_root, + head.beacon_block.slot, + )); + + harness.advance_slot(); + } + + check_finalization(&harness, num_blocks_produced); + check_split_slot(&harness, store.clone()); + check_chain_dump(&harness, num_blocks_produced + 1); + check_iterators(&harness); + + let mut checked_pre_fin = false; + + for attestation in late_attestations { + // load_epoch_boundary_state is idempotent! + let block_root = attestation.data.beacon_block_root; + let block: BeaconBlock = store.get(&block_root).unwrap().expect("block exists"); + let epoch_boundary_state = store + .load_epoch_boundary_state(&block.state_root) + .expect("no error") + .expect("epoch boundary state exists"); + let ebs_of_ebs = store + .load_epoch_boundary_state(&epoch_boundary_state.canonical_root()) + .expect("no error") + .expect("ebs of ebs exists"); + assert_eq!(epoch_boundary_state, ebs_of_ebs); + + // If the attestation is pre-finalization it should be rejected. + let finalized_epoch = harness + .chain + .head_info() + .expect("head ok") + .finalized_checkpoint + .epoch; + let res = harness + .chain + .process_attestation_internal(attestation.clone()); + if attestation.data.slot <= finalized_epoch.start_slot(E::slots_per_epoch()) { + checked_pre_fin = true; + assert_eq!( + res, + Ok(AttestationProcessingOutcome::FinalizedSlot { + attestation: attestation.data.target.epoch, + finalized: finalized_epoch, + }) + ); + } else { + assert_eq!(res, Ok(AttestationProcessingOutcome::Processed)); + } + } + assert!(checked_pre_fin); +} + /// Check that the head state's slot matches `expected_slot`. fn check_slot(harness: &TestHarness, expected_slot: u64) { let state = &harness.chain.head().expect("should get head").beacon_state; diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index bc4b684c0e..d36cd28f25 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -18,6 +18,7 @@ rayon = "1.2.0" db-key = "0.0.5" leveldb = "0.8.4" parking_lot = "0.9.0" +itertools = "0.8" eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" tree_hash = "0.1.0" diff --git a/beacon_node/store/src/block_root_tree.rs b/beacon_node/store/src/block_root_tree.rs new file mode 100644 index 0000000000..76950069c9 --- /dev/null +++ b/beacon_node/store/src/block_root_tree.rs @@ -0,0 +1,364 @@ +use itertools::Itertools; +use parking_lot::RwLock; +use ssz_derive::{Decode, Encode}; +use std::collections::{HashMap, HashSet}; +use std::iter::{self, FromIterator}; +use types::{Hash256, Slot}; + +/// In-memory cache of all block roots post-finalization. Includes short-lived forks. +/// +/// Used by fork choice to avoid reconstructing hot states just for their block roots. +// NOTE: could possibly be streamlined by combining with the head tracker and/or fork choice +#[derive(Debug)] +pub struct BlockRootTree { + nodes: RwLock>, +} + +impl Clone for BlockRootTree { + fn clone(&self) -> Self { + Self { + nodes: RwLock::new(self.nodes.read().clone()), + } + } +} + +#[derive(Debug, PartialEq)] +pub enum BlockRootTreeError { + PrevUnknown(Hash256), +} + +/// Data for a single `block_root` in the tree. +#[derive(Debug, Clone, Encode, Decode)] +struct Node { + /// Hash of the preceding block (should be the parent block). + /// + /// A `previous` of `Hash256::zero` indicates the root of the tree. + previous: Hash256, + /// Slot of this node's block. + slot: Slot, +} + +impl BlockRootTree { + /// Create a new block root tree where `(root_hash, root_slot)` is considered finalized. + /// + /// All subsequent blocks added should descend from the root block. + pub fn new(root_hash: Hash256, root_slot: Slot) -> Self { + Self { + nodes: RwLock::new(HashMap::from_iter(iter::once(( + root_hash, + Node { + previous: Hash256::zero(), + slot: root_slot, + }, + )))), + } + } + + /// Check if `block_root` exists in the tree. + pub fn is_known_block_root(&self, block_root: &Hash256) -> bool { + self.nodes.read().contains_key(block_root) + } + + /// Add a new `block_root` to the tree. + /// + /// Will return an error if `prev_block_root` doesn't exist in the tree. + pub fn add_block_root( + &self, + block_root: Hash256, + prev_block_root: Hash256, + block_slot: Slot, + ) -> Result<(), BlockRootTreeError> { + let mut nodes = self.nodes.write(); + if nodes.contains_key(&prev_block_root) { + nodes.insert( + block_root, + Node { + previous: prev_block_root, + slot: block_slot, + }, + ); + Ok(()) + } else { + Err(BlockRootTreeError::PrevUnknown(prev_block_root)) + } + } + + /// Create a reverse iterator from `block_root` (inclusive). + /// + /// Will skip slots, see `every_slot_iter_from` for a non-skipping variant. + pub fn iter_from(&self, block_root: Hash256) -> BlockRootTreeIter { + BlockRootTreeIter { + tree: self, + current_block_root: block_root, + } + } + + /// Create a reverse iterator that yields a block root for every slot. + /// + /// E.g. if slot 6 is skipped, this iterator will return the block root from slot 5 at slot 6. + pub fn every_slot_iter_from<'a>( + &'a self, + block_root: Hash256, + ) -> impl Iterator + 'a { + let mut block_roots = self.iter_from(block_root).peekable(); + + // Include the value for the first `block_root` if any, then fill in the skipped slots + // between each pair of previous block roots by duplicating the older root. + block_roots + .peek() + .cloned() + .into_iter() + .chain(block_roots.tuple_windows().flat_map( + |((_, high_slot), (low_hash, low_slot))| { + (low_slot.as_u64()..high_slot.as_u64()) + .rev() + .map(move |slot| (low_hash, Slot::new(slot))) + }, + )) + } + + /// Prune the tree. + /// + /// Only keep block roots descended from `finalized_root`, which lie on a chain leading + /// to one of the heads contained in `heads`. + pub fn prune_to(&self, finalized_root: Hash256, heads: impl IntoIterator) { + let mut keep = HashSet::new(); + keep.insert(finalized_root); + + for head_block_root in heads.into_iter() { + // Iterate backwards until we reach a portion of the chain that we've already decided + // to keep. This also discards the pre-finalization block roots. + let mut keep_head = false; + + let head_blocks = self + .iter_from(head_block_root) + .map(|(block_root, _)| block_root) + .inspect(|block_root| { + if block_root == &finalized_root { + keep_head = true; + } + }) + .take_while(|block_root| !keep.contains(&block_root)) + .collect::>(); + + // If the head descends from the finalized root, keep it. Else throw it out. + if keep_head { + keep.extend(head_blocks); + } + } + + self.nodes + .write() + .retain(|block_root, _| keep.contains(block_root)); + } + + pub fn as_ssz_container(&self) -> SszBlockRootTree { + SszBlockRootTree { + nodes: Vec::from_iter(self.nodes.read().clone()), + } + } +} + +/// Simple (skipping) iterator for `BlockRootTree`. +#[derive(Debug)] +pub struct BlockRootTreeIter<'a> { + tree: &'a BlockRootTree, + current_block_root: Hash256, +} + +impl<'a> Iterator for BlockRootTreeIter<'a> { + type Item = (Hash256, Slot); + + fn next(&mut self) -> Option { + // Genesis + if self.current_block_root.is_zero() { + None + } else { + let block_root = self.current_block_root; + self.tree.nodes.read().get(&block_root).map(|node| { + self.current_block_root = node.previous; + (block_root, node.slot) + }) + } + } +} + +/// Serializable version of `BlockRootTree` that can be persisted to disk. +#[derive(Debug, Clone, Encode, Decode)] +pub struct SszBlockRootTree { + nodes: Vec<(Hash256, Node)>, +} + +impl Into for SszBlockRootTree { + fn into(self) -> BlockRootTree { + BlockRootTree { + nodes: RwLock::new(HashMap::from_iter(self.nodes)), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + fn int_hash(x: u64) -> Hash256 { + Hash256::from_low_u64_be(x) + } + + fn check_iter_from( + block_tree: &BlockRootTree, + start_block_root: Hash256, + expected: &[(Hash256, Slot)], + ) { + assert_eq!( + &block_tree.iter_from(start_block_root).collect::>()[..], + expected + ); + } + + fn check_every_slot_iter_from( + block_tree: &BlockRootTree, + start_block_root: Hash256, + expected: &[(Hash256, Slot)], + ) { + assert_eq!( + &block_tree + .every_slot_iter_from(start_block_root) + .collect::>()[..], + expected + ); + } + + #[test] + fn single_chain() { + let block_tree = BlockRootTree::new(int_hash(1), Slot::new(1)); + for i in 2..100 { + block_tree + .add_block_root(int_hash(i), int_hash(i - 1), Slot::new(i)) + .expect("add_block_root ok"); + + let expected = (1..i + 1) + .rev() + .map(|j| (int_hash(j), Slot::new(j))) + .collect::>(); + + check_iter_from(&block_tree, int_hash(i), &expected); + check_every_slot_iter_from(&block_tree, int_hash(i), &expected); + + // Still OK after pruning. + block_tree.prune_to(int_hash(1), vec![int_hash(i)]); + + check_iter_from(&block_tree, int_hash(i), &expected); + check_every_slot_iter_from(&block_tree, int_hash(i), &expected); + } + } + + #[test] + fn skips_of_2() { + let block_tree = BlockRootTree::new(int_hash(1), Slot::new(1)); + let step_length = 2u64; + for i in (1 + step_length..100).step_by(step_length as usize) { + block_tree + .add_block_root(int_hash(i), int_hash(i - step_length), Slot::new(i)) + .expect("add_block_root ok"); + + let sparse_expected = (1..i + 1) + .rev() + .step_by(step_length as usize) + .map(|j| (int_hash(j), Slot::new(j))) + .collect_vec(); + let every_slot_expected = (1..i + 1) + .rev() + .map(|j| { + let nearest = 1 + (j - 1) / step_length * step_length; + (int_hash(nearest), Slot::new(j)) + }) + .collect_vec(); + + check_iter_from(&block_tree, int_hash(i), &sparse_expected); + check_every_slot_iter_from(&block_tree, int_hash(i), &every_slot_expected); + + // Still OK after pruning. + block_tree.prune_to(int_hash(1), vec![int_hash(i)]); + + check_iter_from(&block_tree, int_hash(i), &sparse_expected); + check_every_slot_iter_from(&block_tree, int_hash(i), &every_slot_expected); + } + } + + #[test] + fn prune_small_fork() { + let tree = BlockRootTree::new(int_hash(1), Slot::new(1)); + // Space between fork hash values + let offset = 1000; + let num_blocks = 50; + + let fork1_start = 2; + let fork2_start = 2 + offset; + + tree.add_block_root(int_hash(fork1_start), int_hash(1), Slot::new(2)) + .expect("add first block of left fork"); + tree.add_block_root(int_hash(fork2_start), int_hash(1), Slot::new(2)) + .expect("add first block of right fork"); + + for i in 3..num_blocks { + tree.add_block_root(int_hash(i), int_hash(i - 1), Slot::new(i)) + .expect("add block to left fork"); + tree.add_block_root(int_hash(i + offset), int_hash(i + offset - 1), Slot::new(i)) + .expect("add block to right fork"); + } + + let root = (int_hash(1), Slot::new(1)); + + let (all_fork1_blocks, all_fork2_blocks): (Vec<_>, Vec<_>) = (2..num_blocks) + .rev() + .map(|i| { + ( + (int_hash(i), Slot::new(i)), + (int_hash(i + offset), Slot::new(i)), + ) + }) + .chain(iter::once((root, root))) + .unzip(); + + let fork1_head = int_hash(num_blocks - 1); + let fork2_head = int_hash(num_blocks + offset - 1); + + // Check that pruning with both heads preserves both chains. + let both_tree = tree.clone(); + both_tree.prune_to(root.0, vec![fork1_head, fork2_head]); + check_iter_from(&both_tree, fork1_head, &all_fork1_blocks); + check_iter_from(&both_tree, fork2_head, &all_fork2_blocks); + + // Check that pruning to either of the single chains leaves just that chain in the tree. + let fork1_tree = tree.clone(); + fork1_tree.prune_to(root.0, vec![fork1_head]); + check_iter_from(&fork1_tree, fork1_head, &all_fork1_blocks); + check_iter_from(&fork1_tree, fork2_head, &[]); + + let fork2_tree = tree.clone(); + fork2_tree.prune_to(root.0, vec![fork2_head]); + check_iter_from(&fork2_tree, fork1_head, &[]); + check_iter_from(&fork2_tree, fork2_head, &all_fork2_blocks); + + // Check that advancing the finalized root onto one side completely removes the other + // side. + let fin_tree = tree.clone(); + let prune_point = num_blocks / 2; + let remaining_fork1_blocks = all_fork1_blocks + .clone() + .into_iter() + .take_while(|(_, slot)| *slot >= prune_point) + .collect_vec(); + fin_tree.prune_to(int_hash(prune_point), vec![fork1_head, fork2_head]); + check_iter_from(&fin_tree, fork1_head, &remaining_fork1_blocks); + check_iter_from(&fin_tree, fork2_head, &[]); + } + + #[test] + fn iter_zero() { + let block_tree = BlockRootTree::new(int_hash(0), Slot::new(0)); + assert_eq!(block_tree.iter_from(int_hash(0)).count(), 0); + assert_eq!(block_tree.every_slot_iter_from(int_hash(0)).count(), 0); + } +} diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 70cc327a78..8a03d00c3c 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -1,5 +1,5 @@ use crate::chunked_vector::ChunkError; -use crate::hot_cold_store::HotColdDbError; +use crate::hot_cold_store::HotColdDBError; use ssz::DecodeError; use types::BeaconStateError; @@ -9,7 +9,7 @@ pub enum Error { VectorChunkError(ChunkError), BeaconStateError(BeaconStateError), PartialBeaconStateError, - HotColdDbError(HotColdDbError), + HotColdDBError(HotColdDBError), DBError { message: String }, } @@ -25,9 +25,9 @@ impl From for Error { } } -impl From for Error { - fn from(e: HotColdDbError) -> Error { - Error::HotColdDbError(e) +impl From for Error { + fn from(e: HotColdDBError) -> Error { + Error::HotColdDBError(e) } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index a07810bb0f..834f83cde7 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -50,7 +50,10 @@ pub struct HotColdDB { } #[derive(Debug, PartialEq)] -pub enum HotColdDbError { +pub enum HotColdDBError { + /// Recoverable error indicating that the database freeze point couldn't be updated + /// due to the finalized block not lying on an epoch boundary (should be infrequent). + FreezeSlotUnaligned(Slot), FreezeSlotError { current_split_slot: Slot, proposed_split_slot: Slot, @@ -58,13 +61,12 @@ pub enum HotColdDbError { MissingStateToFreeze(Hash256), MissingRestorePointHash(u64), MissingRestorePoint(Hash256), - MissingStateSlot(Hash256), + MissingColdStateSummary(Hash256), + MissingHotStateSummary(Hash256), + MissingEpochBoundaryState(Hash256), MissingSplitState(Hash256, Slot), + HotStateSummaryError(BeaconStateError), RestorePointDecodeError(ssz::DecodeError), - RestorePointReplayFailure { - expected_state_root: Hash256, - observed_state_root: Hash256, - }, BlockReplayBeaconError(BeaconStateError), BlockReplaySlotError(SlotProcessingError), BlockReplayBlockError(BlockProcessingError), @@ -98,9 +100,9 @@ impl Store for HotColdDB { /// Store a state in the store. fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { - self.store_archive_state(state_root, state) + self.store_cold_state(state_root, state) } else { - self.hot_db.put_state(state_root, state) + self.store_hot_state(state_root, state) } } @@ -112,20 +114,14 @@ impl Store for HotColdDB { ) -> Result>, Error> { if let Some(slot) = slot { if slot < self.get_split_slot() { - self.load_archive_state(state_root, slot).map(Some) + self.load_cold_state_by_slot(slot).map(Some) } else { - self.hot_db.get_state(state_root, None) + self.load_hot_state(state_root) } } else { - match self.hot_db.get_state(state_root, None)? { + match self.load_hot_state(state_root)? { Some(state) => Ok(Some(state)), - None => { - // Look-up the state in the freezer DB. We don't know the slot, so we must - // look it up separately and then use it to reconstruct the state from a - // restore point. - let slot = self.load_state_slot(state_root)?; - self.load_archive_state(state_root, slot).map(Some) - } + None => self.load_cold_state(state_root), } } } @@ -142,17 +138,24 @@ impl Store for HotColdDB { "slot" => frozen_head.slot ); - // 1. Copy all of the states between the head and the split slot, from the hot DB - // to the cold DB. + // 0. Check that the migration is sensible. + // The new frozen head must increase the current split slot, and lie on an epoch + // boundary (in order for the hot state summary scheme to work). let current_split_slot = store.get_split_slot(); if frozen_head.slot < current_split_slot { - Err(HotColdDbError::FreezeSlotError { + Err(HotColdDBError::FreezeSlotError { current_split_slot, proposed_split_slot: frozen_head.slot, })?; } + if frozen_head.slot % E::slots_per_epoch() != 0 { + Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot))?; + } + + // 1. Copy all of the states between the head and the split slot, from the hot DB + // to the cold DB. let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head); let mut to_delete = vec![]; @@ -163,16 +166,20 @@ impl Store for HotColdDB { let state: BeaconState = store .hot_db .get_state(&state_root, None)? - .ok_or_else(|| HotColdDbError::MissingStateToFreeze(state_root))?; + .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?; - store.store_archive_state(&state_root, &state)?; + store.store_cold_state(&state_root, &state)?; } // Store a pointer from this state root to its slot, so we can later reconstruct states // from their state root alone. - store.store_state_slot(&state_root, slot)?; + store.store_cold_state_slot(&state_root, slot)?; - to_delete.push(state_root); + // Delete the old summary, and the full state if we lie on an epoch boundary. + to_delete.push((DBColumn::BeaconStateSummary, state_root)); + if slot % E::slots_per_epoch() == 0 { + to_delete.push((DBColumn::BeaconState, state_root)); + } } // 2. Update the split slot @@ -183,10 +190,10 @@ impl Store for HotColdDB { store.store_split()?; // 3. Delete from the hot DB - for state_root in to_delete { + for (column, state_root) in to_delete { store .hot_db - .key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?; + .key_delete(column.into(), state_root.as_bytes())?; } debug!( @@ -207,6 +214,38 @@ impl Store for HotColdDB { ) -> Self::ForwardsBlockRootsIterator { HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec) } + + /// Load an epoch boundary state by using the hot state summary look-up. + /// + /// Will fall back to the cold DB if a hot state summary is not found. + fn load_epoch_boundary_state( + &self, + state_root: &Hash256, + ) -> Result>, Error> { + if let Some(HotStateSummary { + epoch_boundary_state_root, + .. + }) = self.load_hot_state_summary(state_root)? + { + let state = self + .hot_db + .get_state(&epoch_boundary_state_root, None)? + .ok_or_else(|| { + HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root) + })?; + Ok(Some(state)) + } else { + // Try the cold DB + match self.load_cold_state_slot(state_root)? { + Some(state_slot) => { + let epoch_boundary_slot = + state_slot / E::slots_per_epoch() * E::slots_per_epoch(); + self.load_cold_state_by_slot(epoch_boundary_slot).map(Some) + } + None => Ok(None), + } + } + } } impl HotColdDB { @@ -240,10 +279,69 @@ impl HotColdDB { Ok(db) } + /// Store a post-finalization state efficiently in the hot database. + /// + /// On an epoch boundary, store a full state. On an intermediate slot, store + /// just a backpointer to the nearest epoch boundary. + pub fn store_hot_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + // On the epoch boundary, store the full state. + if state.slot % E::slots_per_epoch() == 0 { + trace!( + self.log, + "Storing full state on epoch boundary"; + "slot" => state.slot.as_u64(), + "state_root" => format!("{:?}", state_root) + ); + self.hot_db.put_state(state_root, state)?; + } + + // Store a summary of the state. + // We store one even for the epoch boundary states, as we may need their slots + // when doing a look up by state root. + self.store_hot_state_summary(state_root, state)?; + + Ok(()) + } + + /// Load a post-finalization state from the hot database. + /// + /// Will replay blocks from the nearest epoch boundary. + pub fn load_hot_state(&self, state_root: &Hash256) -> Result>, Error> { + if let Some(HotStateSummary { + slot, + latest_block_root, + epoch_boundary_state_root, + }) = self.load_hot_state_summary(state_root)? + { + let state: BeaconState = self + .hot_db + .get_state(&epoch_boundary_state_root, None)? + .ok_or_else(|| { + HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root) + })?; + + // Optimization to avoid even *thinking* about replaying blocks if we're already + // on an epoch boundary. + if slot % E::slots_per_epoch() == 0 { + Ok(Some(state)) + } else { + let blocks = self.load_blocks_to_replay(state.slot, slot, latest_block_root)?; + self.replay_blocks(state, blocks, slot).map(Some) + } + } else { + Ok(None) + } + } + /// Store a pre-finalization state in the freezer database. /// - /// Will return an error if the state does not lie on a restore point boundary. - pub fn store_archive_state( + /// Will log a warning and not store anything if the state does not lie on a restore point + /// boundary. + pub fn store_cold_state( &self, state_root: &Hash256, state: &BeaconState, @@ -283,25 +381,32 @@ impl HotColdDB { Ok(()) } + /// Try to load a pre-finalization state from the freezer database. + /// + /// Return `None` if no state with `state_root` lies in the freezer. + pub fn load_cold_state(&self, state_root: &Hash256) -> Result>, Error> { + match self.load_cold_state_slot(state_root)? { + Some(slot) => self.load_cold_state_by_slot(slot).map(Some), + None => Ok(None), + } + } + /// Load a pre-finalization state from the freezer database. /// /// Will reconstruct the state if it lies between restore points. - pub fn load_archive_state( - &self, - state_root: &Hash256, - slot: Slot, - ) -> Result, Error> { + pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result, Error> { if slot % self.slots_per_restore_point == 0 { - self.load_restore_point(state_root) + let restore_point_idx = slot.as_u64() / self.slots_per_restore_point; + self.load_restore_point_by_index(restore_point_idx) } else { - self.load_intermediate_state(state_root, slot) + self.load_cold_intermediate_state(slot) } } /// Load a restore point state by its `state_root`. fn load_restore_point(&self, state_root: &Hash256) -> Result, Error> { let mut partial_state = PartialBeaconState::db_get(&self.cold_db, state_root)? - .ok_or_else(|| HotColdDbError::MissingRestorePoint(*state_root))?; + .ok_or_else(|| HotColdDBError::MissingRestorePoint(*state_root))?; // Fill in the fields of the partial state. partial_state.load_block_roots(&self.cold_db, &self.spec)?; @@ -321,12 +426,8 @@ impl HotColdDB { self.load_restore_point(&state_root) } - /// Load a state that lies between restore points. - fn load_intermediate_state( - &self, - state_root: &Hash256, - slot: Slot, - ) -> Result, Error> { + /// Load a frozen state that lies between restore points. + fn load_cold_intermediate_state(&self, slot: Slot) -> Result, Error> { // 1. Load the restore points either side of the intermediate state. let low_restore_point_idx = slot.as_u64() / self.slots_per_restore_point; let high_restore_point_idx = low_restore_point_idx + 1; @@ -341,7 +442,7 @@ impl HotColdDB { >= split.slot.as_u64() { self.get_state(&split.state_root, Some(split.slot))? - .ok_or_else(|| HotColdDbError::MissingSplitState(split.state_root, split.slot))? + .ok_or_else(|| HotColdDBError::MissingSplitState(split.state_root, split.slot))? } else { self.load_restore_point_by_index(high_restore_point_idx)? }; @@ -354,22 +455,7 @@ impl HotColdDB { )?; // 3. Replay the blocks on top of the low restore point. - let mut state = self.replay_blocks(low_restore_point, blocks, slot)?; - - // 4. Check that the state root is correct (should be quick with the cache already built). - // TODO: we could optimise out *all* the tree hashing when replaying blocks, - // in which case we could also drop this check. - let observed_state_root = state.update_tree_hash_cache()?; - - if observed_state_root == *state_root { - Ok(state) - } else { - Err(HotColdDbError::RestorePointReplayFailure { - expected_state_root: *state_root, - observed_state_root, - } - .into()) - } + self.replay_blocks(low_restore_point, blocks, slot) } /// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`. @@ -379,12 +465,12 @@ impl HotColdDB { &self, high_restore_point: &BeaconState, slot: Slot, - ) -> Result { + ) -> Result { high_restore_point .get_block_root(slot) .or_else(|_| high_restore_point.get_oldest_block_root()) .map(|x| *x) - .map_err(HotColdDbError::RestorePointBlockHashError) + .map_err(HotColdDBError::RestorePointBlockHashError) } /// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`. @@ -398,6 +484,7 @@ impl HotColdDB { end_block_hash: Hash256, ) -> Result>, Error> { let mut blocks = ParentRootBlockIterator::new(self, end_block_hash) + .map(|(_, block)| block) // Include the block at the end slot (if any), it needs to be // replayed in order to construct the canonical state at `end_slot`. .filter(|block| block.slot <= end_slot) @@ -420,12 +507,26 @@ impl HotColdDB { ) -> Result, Error> { state .build_all_caches(&self.spec) - .map_err(HotColdDbError::BlockReplayBeaconError)?; + .map_err(HotColdDBError::BlockReplayBeaconError)?; - for block in blocks { + let state_root_from_prev_block = |i: usize, state: &BeaconState| { + if i > 0 { + let prev_block = &blocks[i - 1]; + if prev_block.slot == state.slot { + Some(prev_block.state_root) + } else { + None + } + } else { + None + } + }; + + for (i, block) in blocks.iter().enumerate() { while state.slot < block.slot { - per_slot_processing(&mut state, None, &self.spec) - .map_err(HotColdDbError::BlockReplaySlotError)?; + let state_root = state_root_from_prev_block(i, &state); + per_slot_processing(&mut state, state_root, &self.spec) + .map_err(HotColdDBError::BlockReplaySlotError)?; } per_block_processing( &mut state, @@ -434,12 +535,13 @@ impl HotColdDB { BlockSignatureStrategy::NoVerification, &self.spec, ) - .map_err(HotColdDbError::BlockReplayBlockError)?; + .map_err(HotColdDBError::BlockReplayBlockError)?; } while state.slot < target_slot { - per_slot_processing(&mut state, None, &self.spec) - .map_err(HotColdDbError::BlockReplaySlotError)?; + let state_root = state_root_from_prev_block(blocks.len(), &state); + per_slot_processing(&mut state, state_root, &self.spec) + .map_err(HotColdDBError::BlockReplaySlotError)?; } Ok(state) @@ -474,7 +576,7 @@ impl HotColdDB { let key = Self::restore_point_key(restore_point_index); RestorePointHash::db_get(&self.cold_db, &key)? .map(|r| r.state_root) - .ok_or(HotColdDbError::MissingRestorePointHash(restore_point_index).into()) + .ok_or(HotColdDBError::MissingRestorePointHash(restore_point_index).into()) } /// Store the state root of a restore point. @@ -495,30 +597,63 @@ impl HotColdDB { } /// Load a frozen state's slot, given its root. - fn load_state_slot(&self, state_root: &Hash256) -> Result { - StateSlot::db_get(&self.cold_db, state_root)? - .map(|s| s.slot) - .ok_or_else(|| HotColdDbError::MissingStateSlot(*state_root).into()) + fn load_cold_state_slot(&self, state_root: &Hash256) -> Result, Error> { + Ok(ColdStateSummary::db_get(&self.cold_db, state_root)?.map(|s| s.slot)) } /// Store the slot of a frozen state. - fn store_state_slot(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { - StateSlot { slot } + fn store_cold_state_slot(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> { + ColdStateSummary { slot } .db_put(&self.cold_db, state_root) .map_err(Into::into) } + /// Load a hot state's summary, given its root. + pub fn load_hot_state_summary( + &self, + state_root: &Hash256, + ) -> Result, Error> { + HotStateSummary::db_get(&self.hot_db, state_root) + } + + /// Store a summary of a hot database state. + fn store_hot_state_summary( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + // Fill in the state root on the latest block header if necessary (this happens on all + // slots where there isn't a skip). + let latest_block_root = state.get_latest_block_root(*state_root); + let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch(); + let epoch_boundary_state_root = if epoch_boundary_slot == state.slot { + *state_root + } else { + *state + .get_state_root(epoch_boundary_slot) + .map_err(HotColdDBError::HotStateSummaryError)? + }; + + HotStateSummary { + slot: state.slot, + latest_block_root, + epoch_boundary_state_root, + } + .db_put(&self.hot_db, state_root) + .map_err(Into::into) + } + /// Check that the restore point frequency is a divisor of the slots per historical root. /// /// This ensures that we have at least one restore point within range of our state /// root history when iterating backwards (and allows for more frequent restore points if /// desired). - fn verify_slots_per_restore_point(slots_per_restore_point: u64) -> Result<(), HotColdDbError> { + fn verify_slots_per_restore_point(slots_per_restore_point: u64) -> Result<(), HotColdDBError> { let slots_per_historical_root = E::SlotsPerHistoricalRoot::to_u64(); if slots_per_restore_point > 0 && slots_per_historical_root % slots_per_restore_point == 0 { Ok(()) } else { - Err(HotColdDbError::InvalidSlotsPerRestorePoint { + Err(HotColdDBError::InvalidSlotsPerRestorePoint { slots_per_restore_point, slots_per_historical_root, }) @@ -527,7 +662,7 @@ impl HotColdDB { } /// Struct for storing the split slot and state root in the database. -#[derive(Clone, Copy, Default, Encode, Decode)] +#[derive(Debug, Clone, Copy, Default, Encode, Decode)] struct Split { slot: Slot, state_root: Hash256, @@ -547,15 +682,39 @@ impl SimpleStoreItem for Split { } } -/// Struct for storing the slot of a state root in the database. -#[derive(Clone, Copy, Default, Encode, Decode)] -struct StateSlot { +/// Struct for summarising a state in the hot database. +/// +/// Allows full reconstruction by replaying blocks. +#[derive(Debug, Clone, Copy, Default, Encode, Decode)] +pub struct HotStateSummary { + slot: Slot, + latest_block_root: Hash256, + epoch_boundary_state_root: Hash256, +} + +impl SimpleStoreItem for HotStateSummary { + fn db_column() -> DBColumn { + DBColumn::BeaconStateSummary + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) + } +} + +/// Struct for summarising a state in the freezer database. +#[derive(Debug, Clone, Copy, Default, Encode, Decode)] +struct ColdStateSummary { slot: Slot, } -impl SimpleStoreItem for StateSlot { +impl SimpleStoreItem for ColdStateSummary { fn db_column() -> DBColumn { - DBColumn::BeaconStateSlot + DBColumn::BeaconStateSummary } fn as_store_bytes(&self) -> Vec { @@ -568,7 +727,7 @@ impl SimpleStoreItem for StateSlot { } /// Struct for storing the state root of a restore point in the database. -#[derive(Clone, Copy, Default, Encode, Decode)] +#[derive(Debug, Clone, Copy, Default, Encode, Decode)] struct RestorePointHash { state_root: Hash256, } diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index e1e6386570..07078aceea 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -120,7 +120,7 @@ impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { } impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> { - type Item = BeaconBlock; + type Item = (Hash256, BeaconBlock); fn next(&mut self) -> Option { // Stop once we reach the zero parent, otherwise we'll keep returning the genesis @@ -128,9 +128,10 @@ impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> if self.next_block_root.is_zero() { None } else { - let block: BeaconBlock = self.store.get(&self.next_block_root).ok()??; + let block_root = self.next_block_root; + let block: BeaconBlock = self.store.get(&block_root).ok()??; self.next_block_root = block.parent_root; - Some(block) + Some((block_root, block)) } } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 29a38fe8dd..b15477e0ef 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -11,6 +11,7 @@ extern crate lazy_static; mod block_at_slot; +pub mod block_root_tree; pub mod chunked_iter; pub mod chunked_vector; pub mod config; @@ -28,6 +29,7 @@ pub mod migrate; use std::sync::Arc; +pub use self::block_root_tree::{BlockRootTree, SszBlockRootTree}; pub use self::config::StoreConfig; pub use self::hot_cold_store::HotColdDB as DiskStore; pub use self::leveldb_store::LevelDB as SimpleDiskStore; @@ -128,6 +130,29 @@ pub trait Store: Sync + Send + Sized + 'static { end_block_root: Hash256, spec: &ChainSpec, ) -> Self::ForwardsBlockRootsIterator; + + /// Load the most recent ancestor state of `state_root` which lies on an epoch boundary. + /// + /// If `state_root` corresponds to an epoch boundary state, then that state itself should be + /// returned. + fn load_epoch_boundary_state( + &self, + state_root: &Hash256, + ) -> Result>, Error> { + // The default implementation is not very efficient, but isn't used in prod. + // See `HotColdDB` for the optimized implementation. + if let Some(state) = self.get_state(state_root, None)? { + let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch(); + if state.slot == epoch_boundary_slot { + Ok(Some(state)) + } else { + let epoch_boundary_state_root = state.get_state_root(epoch_boundary_slot)?; + self.get_state(epoch_boundary_state_root, Some(epoch_boundary_slot)) + } + } else { + Ok(None) + } + } } /// A unique column identifier. @@ -140,8 +165,8 @@ pub enum DBColumn { BeaconChain, /// For the table mapping restore point numbers to state roots. BeaconRestorePoint, - /// For the mapping from state roots to their slots. - BeaconStateSlot, + /// For the mapping from state roots to their slots or summaries. + BeaconStateSummary, BeaconBlockRoots, BeaconStateRoots, BeaconHistoricalRoots, @@ -157,7 +182,7 @@ impl Into<&'static str> for DBColumn { DBColumn::BeaconState => "ste", DBColumn::BeaconChain => "bch", DBColumn::BeaconRestorePoint => "brp", - DBColumn::BeaconStateSlot => "bss", + DBColumn::BeaconStateSummary => "bss", DBColumn::BeaconBlockRoots => "bbr", DBColumn::BeaconStateRoots => "bsr", DBColumn::BeaconHistoricalRoots => "bhr", diff --git a/beacon_node/store/src/migrate.rs b/beacon_node/store/src/migrate.rs index 7aa54f6205..2d6cd604b2 100644 --- a/beacon_node/store/src/migrate.rs +++ b/beacon_node/store/src/migrate.rs @@ -1,6 +1,8 @@ -use crate::{DiskStore, MemoryStore, SimpleDiskStore, Store}; +use crate::{ + hot_cold_store::HotColdDBError, DiskStore, Error, MemoryStore, SimpleDiskStore, Store, +}; use parking_lot::Mutex; -use slog::warn; +use slog::{info, warn}; use std::mem; use std::sync::mpsc; use std::sync::Arc; @@ -127,12 +129,22 @@ impl BackgroundMigrator { let (tx, rx) = mpsc::channel(); let thread = thread::spawn(move || { while let Ok((state_root, state)) = rx.recv() { - if let Err(e) = DiskStore::freeze_to_state(db.clone(), state_root, &state) { - warn!( - db.log, - "Database migration failed"; - "error" => format!("{:?}", e) - ); + match DiskStore::freeze_to_state(db.clone(), state_root, &state) { + Ok(()) => {} + Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { + info!( + db.log, + "Database migration postponed, unaligned finalized block"; + "slot" => slot.as_u64() + ); + } + Err(e) => { + warn!( + db.log, + "Database migration failed"; + "error" => format!("{:?}", e) + ); + } } } }); diff --git a/eth2/lmd_ghost/src/lib.rs b/eth2/lmd_ghost/src/lib.rs index 6cb96b787a..08f7484848 100644 --- a/eth2/lmd_ghost/src/lib.rs +++ b/eth2/lmd_ghost/src/lib.rs @@ -1,7 +1,7 @@ mod reduced_tree; use std::sync::Arc; -use store::Store; +use store::{BlockRootTree, Store}; use types::{BeaconBlock, EthSpec, Hash256, Slot}; pub use reduced_tree::ThreadSafeReducedTree; @@ -12,7 +12,12 @@ pub type Result = std::result::Result; // can remove it. pub trait LmdGhost, E: EthSpec>: PartialEq + Send + Sync + Sized { /// Create a new instance, with the given `store` and `finalized_root`. - fn new(store: Arc, finalized_block: &BeaconBlock, finalized_root: Hash256) -> Self; + fn new( + store: Arc, + block_root_tree: Arc, + finalized_block: &BeaconBlock, + finalized_root: Hash256, + ) -> Self; /// Process an attestation message from some validator that attests to some `block_hash` /// representing a block at some `block_slot`. @@ -59,5 +64,6 @@ pub trait LmdGhost, E: EthSpec>: PartialEq + Send + Sync + Sized { fn as_bytes(&self) -> Vec; /// Create a new `LmdGhost` instance given a `store` and encoded bytes. - fn from_bytes(bytes: &[u8], store: Arc) -> Result; + fn from_bytes(bytes: &[u8], store: Arc, block_root_tree: Arc) + -> Result; } diff --git a/eth2/lmd_ghost/src/reduced_tree.rs b/eth2/lmd_ghost/src/reduced_tree.rs index 1d369fcce6..547bdf247f 100644 --- a/eth2/lmd_ghost/src/reduced_tree.rs +++ b/eth2/lmd_ghost/src/reduced_tree.rs @@ -12,8 +12,8 @@ use std::collections::HashMap; use std::fmt; use std::marker::PhantomData; use std::sync::Arc; -use store::{iter::BlockRootsIterator, Error as StoreError, Store}; -use types::{BeaconBlock, BeaconState, EthSpec, Hash256, Slot}; +use store::{BlockRootTree, Error as StoreError, Store}; +use types::{BeaconBlock, EthSpec, Hash256, Slot}; type Result = std::result::Result; @@ -67,9 +67,19 @@ where T: Store, E: EthSpec, { - fn new(store: Arc, genesis_block: &BeaconBlock, genesis_root: Hash256) -> Self { + fn new( + store: Arc, + block_root_tree: Arc, + genesis_block: &BeaconBlock, + genesis_root: Hash256, + ) -> Self { ThreadSafeReducedTree { - core: RwLock::new(ReducedTree::new(store, genesis_block, genesis_root)), + core: RwLock::new(ReducedTree::new( + store, + block_root_tree, + genesis_block, + genesis_root, + )), } } @@ -136,10 +146,14 @@ where /// encoded ssz bytes representation. /// /// Returns an error if ssz bytes are not a valid `ReducedTreeSsz` object. - fn from_bytes(bytes: &[u8], store: Arc) -> SuperResult { + fn from_bytes( + bytes: &[u8], + store: Arc, + block_root_tree: Arc, + ) -> SuperResult { Ok(ThreadSafeReducedTree { core: RwLock::new( - ReducedTree::from_bytes(bytes, store) + ReducedTree::from_bytes(bytes, store, block_root_tree) .map_err(|e| format!("Cannot decode ssz bytes {:?}", e))?, ), }) @@ -168,7 +182,11 @@ impl ReducedTreeSsz { } } - pub fn to_reduced_tree(self, store: Arc) -> Result> { + pub fn to_reduced_tree( + self, + store: Arc, + block_root_tree: Arc, + ) -> Result> { if self.node_hashes.len() != self.nodes.len() { Error::InvalidReducedTreeSsz("node_hashes and nodes should have equal length".into()); } @@ -181,6 +199,7 @@ impl ReducedTreeSsz { let root = (self.root_hash, self.root_slot); Ok(ReducedTree { store, + block_root_tree, nodes, latest_votes, root, @@ -192,6 +211,7 @@ impl ReducedTreeSsz { #[derive(Clone)] struct ReducedTree { store: Arc, + block_root_tree: Arc, /// Stores all nodes of the tree, keyed by the block hash contained in the node. nodes: HashMap, /// Maps validator indices to their latest votes. @@ -221,20 +241,20 @@ where T: Store, E: EthSpec, { - pub fn new(store: Arc, genesis_block: &BeaconBlock, genesis_root: Hash256) -> Self { + pub fn new( + store: Arc, + block_root_tree: Arc, + genesis_block: &BeaconBlock, + genesis_root: Hash256, + ) -> Self { let mut nodes = HashMap::new(); // Insert the genesis node. - nodes.insert( - genesis_root, - Node { - block_hash: genesis_root, - ..Node::default() - }, - ); + nodes.insert(genesis_root, Node::new(genesis_root)); Self { store, + block_root_tree, nodes, latest_votes: ElasticList::default(), root: (genesis_root, genesis_block.slot), @@ -504,9 +524,8 @@ where node.add_voter(validator_index); } else { let node = Node { - block_hash: hash, voters: vec![validator_index], - ..Node::default() + ..Node::new(hash) }; self.add_node(node)?; @@ -517,10 +536,7 @@ where fn maybe_add_weightless_node(&mut self, slot: Slot, hash: Hash256) -> Result<()> { if slot > self.root_slot() && !self.nodes.contains_key(&hash) { - let node = Node { - block_hash: hash, - ..Node::default() - }; + let node = Node::new(hash); self.add_node(node)?; @@ -540,12 +556,10 @@ where ancestor: Hash256, descendant: Hash256, ) -> Result> { - Ok(std::iter::once(descendant) - .chain( - self.iter_ancestors(descendant)? - .take_while(|(_, slot)| *slot >= self.root_slot()) - .map(|(block_hash, _)| block_hash), - ) + Ok(self + .iter_ancestors(descendant, true) + .take_while(|(_, slot)| *slot >= self.root_slot()) + .map(|(block_hash, _)| block_hash) .tuple_windows() .find_map(|(successor, block_hash)| { if block_hash == ancestor { @@ -574,7 +588,7 @@ where // // If this node has no ancestor in the tree, exit early. let mut prev_in_tree = self - .find_prev_in_tree(node.block_hash) + .find_prev_in_tree(&node) .ok_or_else(|| Error::NotInTree(node.block_hash)) .and_then(|hash| self.get_node(hash))? .clone(); @@ -599,6 +613,7 @@ where if let Some(successor) = self.find_ancestor_successor_opt(node.block_hash, child_hash)? { + let successor_slot = self.get_block(successor)?.slot; let child = self.get_mut_node(child_hash)?; // Graft `child` to `node`. @@ -606,7 +621,7 @@ where // Graft `node` to `child`. node.children.push(ChildLink { hash: child_hash, - successor_slot: self.get_block(successor)?.slot, + successor_slot, }); // Detach `child` from `prev_in_tree`, replacing it with `node`. prev_in_tree.replace_child_hash(child_hash, node.block_hash)?; @@ -640,7 +655,6 @@ where // block, has `prev_in_tree` as the parent and has both `node` and `child` // as children. let common_ancestor = Node { - block_hash: ancestor_hash, parent_hash: Some(prev_in_tree.block_hash), children: vec![ ChildLink { @@ -656,7 +670,7 @@ where .find_ancestor_successor_slot(ancestor_hash, child_hash)?, }, ], - ..Node::default() + ..Node::new(ancestor_hash) }; let child = self.get_mut_node(child_hash)?; @@ -698,24 +712,23 @@ where Ok(()) } - /// For the given block `hash`, find it's highest (by slot) ancestor that exists in the reduced + /// For the given block `hash`, find its highest (by slot) ancestor that exists in the reduced /// tree. - fn find_prev_in_tree(&mut self, hash: Hash256) -> Option { - self.iter_ancestors(hash) - .ok()? + fn find_prev_in_tree(&mut self, node: &Node) -> Option { + self.iter_ancestors(node.block_hash, false) .take_while(|(_, slot)| *slot >= self.root_slot()) - .find(|(root, _slot)| self.nodes.contains_key(root)) - .and_then(|(root, _slot)| Some(root)) + .find(|(root, _)| self.nodes.contains_key(root)) + .map(|(root, _)| root) } /// For the two given block roots (`a_root` and `b_root`), find the first block they share in /// the tree. Viz, find the block that these two distinct blocks forked from. fn find_highest_common_ancestor(&self, a_root: Hash256, b_root: Hash256) -> Result { let mut a_iter = self - .iter_ancestors(a_root)? + .iter_ancestors(a_root, false) .take_while(|(_, slot)| *slot >= self.root_slot()); let mut b_iter = self - .iter_ancestors(b_root)? + .iter_ancestors(b_root, false) .take_while(|(_, slot)| *slot >= self.root_slot()); // Combines the `next()` fns on the `a_iter` and `b_iter` and returns the roots of two @@ -753,11 +766,17 @@ where } } - fn iter_ancestors(&self, child: Hash256) -> Result> { - let block = self.get_block(child)?; - let state = self.get_state(block.state_root, block.slot)?; - - Ok(BlockRootsIterator::owned(self.store.clone(), state)) + /// Return an iterator from the given `block_root` back to finalization. + /// + /// If `include_latest` is true, then the hash and slot for `block_root` will be included. + pub fn iter_ancestors<'a>( + &'a self, + block_root: Hash256, + include_latest: bool, + ) -> impl Iterator + 'a { + self.block_root_tree + .every_slot_iter_from(block_root) + .skip(if include_latest { 0 } else { 1 }) } /// Verify the integrity of `self`. Returns `Ok(())` if the tree has integrity, otherwise returns `Err(description)`. @@ -842,28 +861,26 @@ where .ok_or_else(|| Error::MissingBlock(block_root)) } - fn get_state(&self, state_root: Hash256, slot: Slot) -> Result> { - self.store - .get_state(&state_root, Some(slot))? - .ok_or_else(|| Error::MissingState(state_root)) - } - fn root_slot(&self) -> Slot { self.root.1 } fn as_bytes(&self) -> Vec { - let reduced_tree_ssz: ReducedTreeSsz = ReducedTreeSsz::from_reduced_tree(&self); + let reduced_tree_ssz = ReducedTreeSsz::from_reduced_tree(&self); reduced_tree_ssz.as_ssz_bytes() } - fn from_bytes(bytes: &[u8], store: Arc) -> Result { + fn from_bytes( + bytes: &[u8], + store: Arc, + block_root_tree: Arc, + ) -> Result { let reduced_tree_ssz = ReducedTreeSsz::from_ssz_bytes(bytes)?; - Ok(reduced_tree_ssz.to_reduced_tree(store)?) + Ok(reduced_tree_ssz.to_reduced_tree(store, block_root_tree)?) } } -#[derive(Default, Clone, Debug, PartialEq, Encode, Decode)] +#[derive(Debug, Clone, PartialEq, Encode, Decode)] pub struct Node { /// Hash of the parent node in the reduced tree (not necessarily parent block). pub parent_hash: Option, @@ -884,6 +901,16 @@ pub struct ChildLink { } impl Node { + pub fn new(block_hash: Hash256) -> Self { + Self { + parent_hash: None, + children: vec![], + weight: 0, + block_hash, + voters: vec![], + } + } + /// Replace a child with a new child, whilst preserving the successor slot. /// /// The new child should have the same ancestor successor block as the old one. @@ -977,14 +1004,17 @@ mod tests { #[test] fn test_reduced_tree_ssz() { let store = Arc::new(MemoryStore::::open()); + let block_root_tree = Arc::new(BlockRootTree::new(Hash256::zero(), Slot::new(0))); let tree = ReducedTree::new( store.clone(), + block_root_tree.clone(), &BeaconBlock::empty(&MinimalEthSpec::default_spec()), Hash256::zero(), ); let ssz_tree = ReducedTreeSsz::from_reduced_tree(&tree); let bytes = tree.as_bytes(); - let recovered_tree = ReducedTree::from_bytes(&bytes, store.clone()).unwrap(); + let recovered_tree = + ReducedTree::from_bytes(&bytes, store.clone(), block_root_tree).unwrap(); let recovered_ssz = ReducedTreeSsz::from_reduced_tree(&recovered_tree); assert_eq!(ssz_tree, recovered_ssz); diff --git a/eth2/lmd_ghost/tests/test.rs b/eth2/lmd_ghost/tests/test.rs index 8a1734b22f..f0a952d70a 100644 --- a/eth2/lmd_ghost/tests/test.rs +++ b/eth2/lmd_ghost/tests/test.rs @@ -130,6 +130,7 @@ impl ForkedHarness { ThreadSafeReducedTree::new( Arc::new(store), + self.harness.chain.block_root_tree.clone(), &self.genesis_block, self.genesis_block_root, ) diff --git a/eth2/types/src/beacon_state.rs b/eth2/types/src/beacon_state.rs index 1cc148f4fa..43886d4f5b 100644 --- a/eth2/types/src/beacon_state.rs +++ b/eth2/types/src/beacon_state.rs @@ -464,6 +464,21 @@ impl BeaconState { Ok(hash(&preimage)) } + /// Get the canonical root of the `latest_block_header`, filling in its state root if necessary. + /// + /// It needs filling in on all slots where there isn't a skip. + /// + /// Spec v0.9.1 + pub fn get_latest_block_root(&self, current_state_root: Hash256) -> Hash256 { + if self.latest_block_header.state_root.is_zero() { + let mut latest_block_header = self.latest_block_header.clone(); + latest_block_header.state_root = current_state_root; + latest_block_header.canonical_root() + } else { + self.latest_block_header.canonical_root() + } + } + /// Safely obtains the index for latest block roots, given some `slot`. /// /// Spec v0.9.1