From a2d071e681aa82e71d1a97ffc5fbe3647583c8c8 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Sat, 30 Nov 2019 13:33:07 +1100 Subject: [PATCH] Add tests for BeaconChain persistence + fix bugs --- beacon_node/beacon_chain/src/beacon_chain.rs | 44 ++++++- beacon_node/beacon_chain/src/builder.rs | 60 ++++----- beacon_node/beacon_chain/src/fork_choice.rs | 41 +++++- beacon_node/beacon_chain/src/head_tracker.rs | 20 ++- .../src/persisted_beacon_chain.rs | 6 +- beacon_node/beacon_chain/src/test_utils.rs | 43 ++++++- .../beacon_chain/tests/persistence_tests.rs | 120 ++++++++++++++++++ beacon_node/beacon_chain/tests/store_tests.rs | 2 +- eth2/operation_pool/src/persistence.rs | 2 +- 9 files changed, 289 insertions(+), 49 deletions(-) create mode 100644 beacon_node/beacon_chain/tests/persistence_tests.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9b2d670233..a47030440e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -140,12 +140,35 @@ impl BeaconChain { pub fn persist(&self) -> Result<(), Error> { let timer = metrics::start_timer(&metrics::PERSIST_CHAIN); + let canonical_head = self.head(); + + let finalized_checkpoint = { + let beacon_block_root = canonical_head.beacon_state.finalized_checkpoint.root; + let beacon_block = self + .store + .get::>(&beacon_block_root)? + .ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?; + let beacon_state_root = beacon_block.state_root; + let beacon_state = self + .store + .get_state(&beacon_state_root, Some(beacon_block.slot))? + .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; + + CheckPoint { + beacon_block_root, + beacon_block, + beacon_state_root, + beacon_state, + } + }; + let p: PersistedBeaconChain = PersistedBeaconChain { - canonical_head: self.canonical_head.read().clone(), + canonical_head, + finalized_checkpoint, op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool), genesis_block_root: self.genesis_block_root, ssz_head_tracker: self.head_tracker.to_ssz_container(), - fork_choice_ssz_bytes: self.fork_choice.as_bytes(), + fork_choice: self.fork_choice.as_ssz_container(), }; let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); @@ -1610,6 +1633,23 @@ impl BeaconChain { } } +impl Drop for BeaconChain { + fn drop(&mut self) { + if let Err(e) = self.persist() { + error!( + self.log, + "Failed to persist BeaconChain on drop"; + "error" => format!("{:?}", e) + ) + } else { + info!( + self.log, + "Saved beacon chain state"; + ) + } + } +} + fn write_state(prefix: &str, state: &BeaconState, log: &Logger) { if WRITE_BLOCK_PROCESSING_SSZ { let root = Hash256::from_slice(&state.tree_hash_root()); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 620b996f7c..3066aa096b 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -213,15 +213,16 @@ where self.op_pool = Some( p.op_pool + .clone() .into_operation_pool(&p.canonical_head.beacon_state, &self.spec), ); - self.finalized_checkpoint = Some(p.canonical_head); + self.finalized_checkpoint = Some(p.finalized_checkpoint.clone()); self.genesis_block_root = Some(p.genesis_block_root); - self.head_tracker = HeadTracker::from_ssz_container(&p.ssz_head_tracker) .map_err(|e| error!(log, "Failed to decode head tracker for database: {:?}", e)) .ok(); + self.persisted_beacon_chain = Some(p); Ok(self) } @@ -273,28 +274,6 @@ where Ok(self.empty_op_pool()) } - /// Sets the `BeaconChain` fork choice backend. - /// - /// Requires the store and state to have been specified earlier in the build chain. - pub fn fork_choice_backend(mut self, backend: TLmdGhost) -> Result { - let store = self - .store - .clone() - .ok_or_else(|| "reduced_tree_fork_choice requires a store")?; - let genesis_block_root = self - .genesis_block_root - .ok_or_else(|| "fork_choice_backend requires a genesis_block_root")?; - - self.fork_choice = Some(ForkChoice::new( - store, - backend, - genesis_block_root, - self.spec.genesis_slot, - )); - - Ok(self) - } - /// Sets the `BeaconChain` eth1 backend. pub fn eth1_backend(mut self, backend: Option) -> Self { self.eth1_chain = backend.map(Eth1Chain::new); @@ -346,9 +325,12 @@ where >, String, > { - let mut canonical_head = self - .finalized_checkpoint - .ok_or_else(|| "Cannot build without a state".to_string())?; + let mut canonical_head = if let Some(persisted_beacon_chain) = self.persisted_beacon_chain { + persisted_beacon_chain.canonical_head + } else { + self.finalized_checkpoint + .ok_or_else(|| "Cannot build without a state".to_string())? + }; canonical_head .beacon_state @@ -428,29 +410,39 @@ where /// `ThreadSafeReducedTree` backend. /// /// Requires the store and state to be initialized. - pub fn reduced_tree_fork_choice(self) -> Result { + pub fn reduced_tree_fork_choice(mut self) -> Result { let store = self .store .clone() .ok_or_else(|| "reduced_tree_fork_choice requires a store")?; - let backend = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain { - ThreadSafeReducedTree::from_bytes(&persisted_beacon_chain.fork_choice_ssz_bytes, store) - .map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))? + let fork_choice = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain { + ForkChoice::from_ssz_container( + persisted_beacon_chain.fork_choice.clone(), + store.clone(), + ) + .map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))? } else { let finalized_checkpoint = &self .finalized_checkpoint .as_ref() .expect("should have finalized checkpoint"); + let genesis_block_root = self + .genesis_block_root + .ok_or_else(|| "fork_choice_backend requires a genesis_block_root")?; - ThreadSafeReducedTree::new( + let backend = ThreadSafeReducedTree::new( store.clone(), &finalized_checkpoint.beacon_block, finalized_checkpoint.beacon_block_root, - ) + ); + + ForkChoice::new(store, backend, genesis_block_root, self.spec.genesis_slot) }; - self.fork_choice_backend(backend) + self.fork_choice = Some(fork_choice); + + Ok(self) } } diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index 9ac7a80005..e0434b3f04 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -1,6 +1,7 @@ use crate::{errors::BeaconChainError, metrics, BeaconChain, BeaconChainTypes}; use lmd_ghost::LmdGhost; use parking_lot::RwLock; +use ssz_derive::{Decode, Encode}; use state_processing::{common::get_attesting_indices, per_slot_processing}; use std::sync::Arc; use store::{Error as StoreError, Store}; @@ -292,13 +293,41 @@ impl ForkChoice { .map_err(Into::into) } - /// Returns a byte-level representation of the present state of the fork choice cache. - /// - /// This simply calls `as_bytes()`, on the backend. To decode these bytes, decode the backend - /// directly then use `Self::new(..)`. - pub fn as_bytes(&self) -> Vec { - self.backend.as_bytes() + /// Returns a `SszForkChoice` which contains the current state of `Self`. + pub fn as_ssz_container(&self) -> SszForkChoice { + SszForkChoice { + genesis_block_root: self.genesis_block_root.clone(), + justified_checkpoint: self.justified_checkpoint.read().clone(), + best_justified_checkpoint: self.best_justified_checkpoint.read().clone(), + backend_bytes: self.backend.as_bytes(), + } } + + /// Instantiates `Self` from a prior `SszForkChoice`. + /// + /// The created `Self` will have the same state as the `Self` that created the `SszForkChoice`. + pub fn from_ssz_container(ssz_container: SszForkChoice, store: Arc) -> Result { + let backend = LmdGhost::from_bytes(&ssz_container.backend_bytes, store.clone())?; + + Ok(Self { + store, + backend, + genesis_block_root: ssz_container.genesis_block_root, + justified_checkpoint: RwLock::new(ssz_container.justified_checkpoint), + best_justified_checkpoint: RwLock::new(ssz_container.best_justified_checkpoint), + }) + } +} + +/// Helper struct that is used to encode/decode the state of the `ForkChoice` as SSZ bytes. +/// +/// This is used when persisting the state of the `BeaconChain` to disk. +#[derive(Encode, Decode, Clone)] +pub struct SszForkChoice { + genesis_block_root: Hash256, + justified_checkpoint: Checkpoint, + best_justified_checkpoint: Checkpoint, + backend_bytes: Vec, } impl From for Error { diff --git a/beacon_node/beacon_chain/src/head_tracker.rs b/beacon_node/beacon_chain/src/head_tracker.rs index c3588b7e92..f1a7b4d19a 100644 --- a/beacon_node/beacon_chain/src/head_tracker.rs +++ b/beacon_node/beacon_chain/src/head_tracker.rs @@ -9,16 +9,29 @@ pub enum Error { MismatchingLengths { roots_len: usize, slots_len: usize }, } -#[derive(Encode, Decode)] +/// Helper struct that is used to encode/decode the state of the `HeadTracker` as SSZ bytes. +/// +/// This is used when persisting the state of the `BeaconChain` to disk. +#[derive(Encode, Decode, Clone)] pub struct SszHeadTracker { roots: Vec, slots: Vec, } +/// Maintains a list of `BeaconChain` head block roots and slots. +/// +/// Each time a new block is imported, it should be applied to the `Self::register_block` function. +/// In order for this struct to be effective, every single block that is imported must be +/// registered here. #[derive(Default, Debug)] pub struct HeadTracker(RwLock>); impl HeadTracker { + /// Register a block with `Self`, so it may or may not be included in a `Self::heads` call. + /// + /// This function assumes that no block is imported without its parent having already been + /// imported. It cannot detect an error if this is not the case, it is the responsibility of + /// the upstream user. pub fn register_block(&self, block_root: Hash256, block: &BeaconBlock) { let mut map = self.0.write(); @@ -26,6 +39,7 @@ impl HeadTracker { map.insert(block_root, block.slot); } + /// Returns the list of heads in the chain. pub fn heads(&self) -> Vec<(Hash256, Slot)> { self.0 .read() @@ -34,6 +48,8 @@ impl HeadTracker { .collect() } + /// Returns a `SszHeadTracker`, which contains all necessary information to restore the state + /// of `Self` at some later point. pub fn to_ssz_container(&self) -> SszHeadTracker { let (roots, slots) = self .0 @@ -45,6 +61,8 @@ impl HeadTracker { SszHeadTracker { roots, slots } } + /// Creates a new `Self` from the given `SszHeadTracker`, restoring `Self` to the same state of + /// the `Self` that created the `SszHeadTracker`. pub fn from_ssz_container(ssz_container: &SszHeadTracker) -> Result { let roots_len = ssz_container.roots.len(); let slots_len = ssz_container.slots.len(); diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index c98dbeec14..08b75420d2 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -1,3 +1,4 @@ +use crate::fork_choice::SszForkChoice; use crate::head_tracker::SszHeadTracker; use crate::{BeaconChainTypes, CheckPoint}; use operation_pool::PersistedOperationPool; @@ -9,13 +10,14 @@ use types::Hash256; /// 32-byte key for accessing the `PersistedBeaconChain`. pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA"; -#[derive(Encode, Decode)] +#[derive(Clone, Encode, Decode)] pub struct PersistedBeaconChain { pub canonical_head: CheckPoint, + pub finalized_checkpoint: CheckPoint, pub op_pool: PersistedOperationPool, pub genesis_block_root: Hash256, pub ssz_head_tracker: SszHeadTracker, - pub fork_choice_ssz_bytes: Vec, + pub fork_choice: SszForkChoice, } impl SimpleStoreItem for PersistedBeaconChain { diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index c52630e4f6..d467836858 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -26,6 +26,8 @@ pub use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KE pub use types::test_utils::generate_deterministic_keypairs; pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // 4th September 2019 + // This parameter is required by a builder, but not used because we use the `TestingSlotClock`. +pub const HARNESS_SLOT_TIME: Duration = Duration::from_secs(1); pub type BaseHarnessType = Witness< TStore, @@ -98,7 +100,7 @@ impl BeaconChainHarness> { .dummy_eth1_backend() .expect("should build dummy backend") .null_event_handler() - .testing_slot_clock(Duration::from_secs(1)) + .testing_slot_clock(HARNESS_SLOT_TIME) .expect("should configure testing slot clock") .reduced_tree_fork_choice() .expect("should add fork choice to builder") @@ -115,7 +117,7 @@ impl BeaconChainHarness> { impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. - pub fn with_disk_store( + pub fn new_with_disk_store( eth_spec_instance: E, store: Arc, keypairs: Vec, @@ -140,6 +142,43 @@ impl BeaconChainHarness> { .dummy_eth1_backend() .expect("should build dummy backend") .null_event_handler() + .testing_slot_clock(HARNESS_SLOT_TIME) + .expect("should configure testing slot clock") + .reduced_tree_fork_choice() + .expect("should add fork choice to builder") + .build() + .expect("should build"); + + Self { + spec: chain.spec.clone(), + chain, + keypairs, + } + } + + /// Instantiate a new harness with `validator_count` initial validators. + pub fn resume_from_disk_store( + eth_spec_instance: E, + store: Arc, + keypairs: Vec, + ) -> Self { + let spec = E::default_spec(); + + let log = TerminalLoggerBuilder::new() + .level(Severity::Warning) + .build() + .expect("logger should build"); + + let chain = BeaconChainBuilder::new(eth_spec_instance) + .logger(log.clone()) + .custom_spec(spec.clone()) + .store(store.clone()) + .store_migrator( as Migrate<_, E>>::new(store)) + .resume_from_db() + .expect("should resume beacon chain from db") + .dummy_eth1_backend() + .expect("should build dummy backend") + .null_event_handler() .testing_slot_clock(Duration::from_secs(1)) .expect("should configure testing slot clock") .reduced_tree_fork_choice() diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs new file mode 100644 index 0000000000..bcd31ac228 --- /dev/null +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -0,0 +1,120 @@ +// #![cfg(not(debug_assertions))] + +#[macro_use] +extern crate lazy_static; + +use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; +use sloggers::{null::NullLoggerBuilder, Build}; +use std::sync::Arc; +use store::DiskStore; +use tempfile::{tempdir, TempDir}; +use types::{EthSpec, Keypair, MinimalEthSpec}; + +type E = MinimalEthSpec; + +// Should ideally be divisible by 3. +pub const VALIDATOR_COUNT: usize = 24; + +lazy_static! { + /// A cached set of keys. + static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); +} + +fn get_store(db_path: &TempDir) -> Arc { + let spec = E::default_spec(); + let hot_path = db_path.path().join("hot_db"); + let cold_path = db_path.path().join("cold_db"); + let log = NullLoggerBuilder.build().expect("logger should build"); + Arc::new( + DiskStore::open(&hot_path, &cold_path, spec, log).expect("disk store should initialize"), + ) +} + +#[test] +fn finalizes_after_resuming_from_db() { + let validator_count = 16; + let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; + let first_half = num_blocks_produced / 2; + + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + + let harness = BeaconChainHarness::new_with_disk_store( + MinimalEthSpec, + store.clone(), + KEYPAIRS[0..validator_count].to_vec(), + ); + + harness.advance_slot(); + + harness.extend_chain( + first_half as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + let latest_slot = harness.chain.slot().expect("should have a slot"); + + let original_head = harness.chain.head(); + let original_heads = harness.chain.heads(); + + assert_eq!( + original_head.beacon_state.slot, first_half, + "head should be half way through test" + ); + + drop(harness); + + let resumed_harness = BeaconChainHarness::resume_from_disk_store( + MinimalEthSpec, + store, + KEYPAIRS[0..validator_count].to_vec(), + ); + + // Set the slot clock of the resumed harness to be in the slot following the previous harness. + // + // This allows us to produce the block at the next slot. + resumed_harness + .chain + .slot_clock + .set_slot(latest_slot.as_u64() + 1); + + assert_eq!( + original_head, + resumed_harness.chain.head(), + "resumed head should be same as previous head" + ); + + assert_eq!( + original_heads, + resumed_harness.chain.heads(), + "resumed heads should be same as previous heads" + ); + + resumed_harness.extend_chain( + (num_blocks_produced - first_half) as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + let state = &resumed_harness.chain.head().beacon_state; + assert_eq!( + state.slot, num_blocks_produced, + "head should be at the current slot" + ); + assert_eq!( + state.current_epoch(), + num_blocks_produced / MinimalEthSpec::slots_per_epoch(), + "head should be at the expected epoch" + ); + assert_eq!( + state.current_justified_checkpoint.epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint.epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); +} diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 3b548fce46..1dd05da399 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -37,7 +37,7 @@ fn get_store(db_path: &TempDir) -> Arc { } fn get_harness(store: Arc, validator_count: usize) -> TestHarness { - let harness = BeaconChainHarness::with_disk_store( + let harness = BeaconChainHarness::new_with_disk_store( MinimalEthSpec, store, KEYPAIRS[0..validator_count].to_vec(), diff --git a/eth2/operation_pool/src/persistence.rs b/eth2/operation_pool/src/persistence.rs index bb423891a9..230e54ae75 100644 --- a/eth2/operation_pool/src/persistence.rs +++ b/eth2/operation_pool/src/persistence.rs @@ -8,7 +8,7 @@ use types::*; /// /// Operations are stored in arbitrary order, so it's not a good idea to compare instances /// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first. -#[derive(Encode, Decode)] +#[derive(Clone, Encode, Decode)] pub struct PersistedOperationPool { /// Mapping from attestation ID to attestation mappings. // We could save space by not storing the attestation ID, but it might