Add tests for BeaconChain persistence + fix bugs

This commit is contained in:
Paul Hauner
2019-11-30 13:33:07 +11:00
parent 10929e8255
commit a2d071e681
9 changed files with 289 additions and 49 deletions

View File

@@ -140,12 +140,35 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn persist(&self) -> Result<(), Error> {
let timer = metrics::start_timer(&metrics::PERSIST_CHAIN);
let canonical_head = self.head();
let finalized_checkpoint = {
let beacon_block_root = canonical_head.beacon_state.finalized_checkpoint.root;
let beacon_block = self
.store
.get::<BeaconBlock<_>>(&beacon_block_root)?
.ok_or_else(|| Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root;
let beacon_state = self
.store
.get_state(&beacon_state_root, Some(beacon_block.slot))?
.ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?;
CheckPoint {
beacon_block_root,
beacon_block,
beacon_state_root,
beacon_state,
}
};
let p: PersistedBeaconChain<T> = PersistedBeaconChain {
canonical_head: self.canonical_head.read().clone(),
canonical_head,
finalized_checkpoint,
op_pool: PersistedOperationPool::from_operation_pool(&self.op_pool),
genesis_block_root: self.genesis_block_root,
ssz_head_tracker: self.head_tracker.to_ssz_container(),
fork_choice_ssz_bytes: self.fork_choice.as_bytes(),
fork_choice: self.fork_choice.as_ssz_container(),
};
let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes());
@@ -1610,6 +1633,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
fn drop(&mut self) {
if let Err(e) = self.persist() {
error!(
self.log,
"Failed to persist BeaconChain on drop";
"error" => format!("{:?}", e)
)
} else {
info!(
self.log,
"Saved beacon chain state";
)
}
}
}
fn write_state<T: EthSpec>(prefix: &str, state: &BeaconState<T>, log: &Logger) {
if WRITE_BLOCK_PROCESSING_SSZ {
let root = Hash256::from_slice(&state.tree_hash_root());

View File

@@ -213,15 +213,16 @@ where
self.op_pool = Some(
p.op_pool
.clone()
.into_operation_pool(&p.canonical_head.beacon_state, &self.spec),
);
self.finalized_checkpoint = Some(p.canonical_head);
self.finalized_checkpoint = Some(p.finalized_checkpoint.clone());
self.genesis_block_root = Some(p.genesis_block_root);
self.head_tracker = HeadTracker::from_ssz_container(&p.ssz_head_tracker)
.map_err(|e| error!(log, "Failed to decode head tracker for database: {:?}", e))
.ok();
self.persisted_beacon_chain = Some(p);
Ok(self)
}
@@ -273,28 +274,6 @@ where
Ok(self.empty_op_pool())
}
/// Sets the `BeaconChain` fork choice backend.
///
/// Requires the store and state to have been specified earlier in the build chain.
pub fn fork_choice_backend(mut self, backend: TLmdGhost) -> Result<Self, String> {
let store = self
.store
.clone()
.ok_or_else(|| "reduced_tree_fork_choice requires a store")?;
let genesis_block_root = self
.genesis_block_root
.ok_or_else(|| "fork_choice_backend requires a genesis_block_root")?;
self.fork_choice = Some(ForkChoice::new(
store,
backend,
genesis_block_root,
self.spec.genesis_slot,
));
Ok(self)
}
/// Sets the `BeaconChain` eth1 backend.
pub fn eth1_backend(mut self, backend: Option<TEth1Backend>) -> Self {
self.eth1_chain = backend.map(Eth1Chain::new);
@@ -346,9 +325,12 @@ where
>,
String,
> {
let mut canonical_head = self
.finalized_checkpoint
.ok_or_else(|| "Cannot build without a state".to_string())?;
let mut canonical_head = if let Some(persisted_beacon_chain) = self.persisted_beacon_chain {
persisted_beacon_chain.canonical_head
} else {
self.finalized_checkpoint
.ok_or_else(|| "Cannot build without a state".to_string())?
};
canonical_head
.beacon_state
@@ -428,29 +410,39 @@ where
/// `ThreadSafeReducedTree` backend.
///
/// Requires the store and state to be initialized.
pub fn reduced_tree_fork_choice(self) -> Result<Self, String> {
pub fn reduced_tree_fork_choice(mut self) -> Result<Self, String> {
let store = self
.store
.clone()
.ok_or_else(|| "reduced_tree_fork_choice requires a store")?;
let backend = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain {
ThreadSafeReducedTree::from_bytes(&persisted_beacon_chain.fork_choice_ssz_bytes, store)
.map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))?
let fork_choice = if let Some(persisted_beacon_chain) = &self.persisted_beacon_chain {
ForkChoice::from_ssz_container(
persisted_beacon_chain.fork_choice.clone(),
store.clone(),
)
.map_err(|e| format!("Unable to decode fork choice from db: {:?}", e))?
} else {
let finalized_checkpoint = &self
.finalized_checkpoint
.as_ref()
.expect("should have finalized checkpoint");
let genesis_block_root = self
.genesis_block_root
.ok_or_else(|| "fork_choice_backend requires a genesis_block_root")?;
ThreadSafeReducedTree::new(
let backend = ThreadSafeReducedTree::new(
store.clone(),
&finalized_checkpoint.beacon_block,
finalized_checkpoint.beacon_block_root,
)
);
ForkChoice::new(store, backend, genesis_block_root, self.spec.genesis_slot)
};
self.fork_choice_backend(backend)
self.fork_choice = Some(fork_choice);
Ok(self)
}
}

View File

@@ -1,6 +1,7 @@
use crate::{errors::BeaconChainError, metrics, BeaconChain, BeaconChainTypes};
use lmd_ghost::LmdGhost;
use parking_lot::RwLock;
use ssz_derive::{Decode, Encode};
use state_processing::{common::get_attesting_indices, per_slot_processing};
use std::sync::Arc;
use store::{Error as StoreError, Store};
@@ -292,13 +293,41 @@ impl<T: BeaconChainTypes> ForkChoice<T> {
.map_err(Into::into)
}
/// Returns a byte-level representation of the present state of the fork choice cache.
///
/// This simply calls `as_bytes()`, on the backend. To decode these bytes, decode the backend
/// directly then use `Self::new(..)`.
pub fn as_bytes(&self) -> Vec<u8> {
self.backend.as_bytes()
/// Returns a `SszForkChoice` which contains the current state of `Self`.
pub fn as_ssz_container(&self) -> SszForkChoice {
SszForkChoice {
genesis_block_root: self.genesis_block_root.clone(),
justified_checkpoint: self.justified_checkpoint.read().clone(),
best_justified_checkpoint: self.best_justified_checkpoint.read().clone(),
backend_bytes: self.backend.as_bytes(),
}
}
/// Instantiates `Self` from a prior `SszForkChoice`.
///
/// The created `Self` will have the same state as the `Self` that created the `SszForkChoice`.
pub fn from_ssz_container(ssz_container: SszForkChoice, store: Arc<T::Store>) -> Result<Self> {
let backend = LmdGhost::from_bytes(&ssz_container.backend_bytes, store.clone())?;
Ok(Self {
store,
backend,
genesis_block_root: ssz_container.genesis_block_root,
justified_checkpoint: RwLock::new(ssz_container.justified_checkpoint),
best_justified_checkpoint: RwLock::new(ssz_container.best_justified_checkpoint),
})
}
}
/// Helper struct that is used to encode/decode the state of the `ForkChoice` as SSZ bytes.
///
/// This is used when persisting the state of the `BeaconChain` to disk.
#[derive(Encode, Decode, Clone)]
pub struct SszForkChoice {
genesis_block_root: Hash256,
justified_checkpoint: Checkpoint,
best_justified_checkpoint: Checkpoint,
backend_bytes: Vec<u8>,
}
impl From<BeaconStateError> for Error {

View File

@@ -9,16 +9,29 @@ pub enum Error {
MismatchingLengths { roots_len: usize, slots_len: usize },
}
#[derive(Encode, Decode)]
/// Helper struct that is used to encode/decode the state of the `HeadTracker` as SSZ bytes.
///
/// This is used when persisting the state of the `BeaconChain` to disk.
#[derive(Encode, Decode, Clone)]
pub struct SszHeadTracker {
roots: Vec<Hash256>,
slots: Vec<Slot>,
}
/// Maintains a list of `BeaconChain` head block roots and slots.
///
/// Each time a new block is imported, it should be applied to the `Self::register_block` function.
/// In order for this struct to be effective, every single block that is imported must be
/// registered here.
#[derive(Default, Debug)]
pub struct HeadTracker(RwLock<HashMap<Hash256, Slot>>);
impl HeadTracker {
/// Register a block with `Self`, so it may or may not be included in a `Self::heads` call.
///
/// This function assumes that no block is imported without its parent having already been
/// imported. It cannot detect an error if this is not the case, it is the responsibility of
/// the upstream user.
pub fn register_block<E: EthSpec>(&self, block_root: Hash256, block: &BeaconBlock<E>) {
let mut map = self.0.write();
@@ -26,6 +39,7 @@ impl HeadTracker {
map.insert(block_root, block.slot);
}
/// Returns the list of heads in the chain.
pub fn heads(&self) -> Vec<(Hash256, Slot)> {
self.0
.read()
@@ -34,6 +48,8 @@ impl HeadTracker {
.collect()
}
/// Returns a `SszHeadTracker`, which contains all necessary information to restore the state
/// of `Self` at some later point.
pub fn to_ssz_container(&self) -> SszHeadTracker {
let (roots, slots) = self
.0
@@ -45,6 +61,8 @@ impl HeadTracker {
SszHeadTracker { roots, slots }
}
/// Creates a new `Self` from the given `SszHeadTracker`, restoring `Self` to the same state of
/// the `Self` that created the `SszHeadTracker`.
pub fn from_ssz_container(ssz_container: &SszHeadTracker) -> Result<Self, Error> {
let roots_len = ssz_container.roots.len();
let slots_len = ssz_container.slots.len();

View File

@@ -1,3 +1,4 @@
use crate::fork_choice::SszForkChoice;
use crate::head_tracker::SszHeadTracker;
use crate::{BeaconChainTypes, CheckPoint};
use operation_pool::PersistedOperationPool;
@@ -9,13 +10,14 @@ use types::Hash256;
/// 32-byte key for accessing the `PersistedBeaconChain`.
pub const BEACON_CHAIN_DB_KEY: &str = "PERSISTEDBEACONCHAINPERSISTEDBEA";
#[derive(Encode, Decode)]
#[derive(Clone, Encode, Decode)]
pub struct PersistedBeaconChain<T: BeaconChainTypes> {
pub canonical_head: CheckPoint<T::EthSpec>,
pub finalized_checkpoint: CheckPoint<T::EthSpec>,
pub op_pool: PersistedOperationPool<T::EthSpec>,
pub genesis_block_root: Hash256,
pub ssz_head_tracker: SszHeadTracker,
pub fork_choice_ssz_bytes: Vec<u8>,
pub fork_choice: SszForkChoice,
}
impl<T: BeaconChainTypes> SimpleStoreItem for PersistedBeaconChain<T> {

View File

@@ -26,6 +26,8 @@ pub use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KE
pub use types::test_utils::generate_deterministic_keypairs;
pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // 4th September 2019
// This parameter is required by a builder, but not used because we use the `TestingSlotClock`.
pub const HARNESS_SLOT_TIME: Duration = Duration::from_secs(1);
pub type BaseHarnessType<TStore, TStoreMigrator, TEthSpec> = Witness<
TStore,
@@ -98,7 +100,7 @@ impl<E: EthSpec> BeaconChainHarness<HarnessType<E>> {
.dummy_eth1_backend()
.expect("should build dummy backend")
.null_event_handler()
.testing_slot_clock(Duration::from_secs(1))
.testing_slot_clock(HARNESS_SLOT_TIME)
.expect("should configure testing slot clock")
.reduced_tree_fork_choice()
.expect("should add fork choice to builder")
@@ -115,7 +117,7 @@ impl<E: EthSpec> BeaconChainHarness<HarnessType<E>> {
impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
/// Instantiate a new harness with `validator_count` initial validators.
pub fn with_disk_store(
pub fn new_with_disk_store(
eth_spec_instance: E,
store: Arc<DiskStore>,
keypairs: Vec<Keypair>,
@@ -140,6 +142,43 @@ impl<E: EthSpec> BeaconChainHarness<DiskHarnessType<E>> {
.dummy_eth1_backend()
.expect("should build dummy backend")
.null_event_handler()
.testing_slot_clock(HARNESS_SLOT_TIME)
.expect("should configure testing slot clock")
.reduced_tree_fork_choice()
.expect("should add fork choice to builder")
.build()
.expect("should build");
Self {
spec: chain.spec.clone(),
chain,
keypairs,
}
}
/// Instantiate a new harness with `validator_count` initial validators.
pub fn resume_from_disk_store(
eth_spec_instance: E,
store: Arc<DiskStore>,
keypairs: Vec<Keypair>,
) -> Self {
let spec = E::default_spec();
let log = TerminalLoggerBuilder::new()
.level(Severity::Warning)
.build()
.expect("logger should build");
let chain = BeaconChainBuilder::new(eth_spec_instance)
.logger(log.clone())
.custom_spec(spec.clone())
.store(store.clone())
.store_migrator(<BlockingMigrator<_> as Migrate<_, E>>::new(store))
.resume_from_db()
.expect("should resume beacon chain from db")
.dummy_eth1_backend()
.expect("should build dummy backend")
.null_event_handler()
.testing_slot_clock(Duration::from_secs(1))
.expect("should configure testing slot clock")
.reduced_tree_fork_choice()

View File

@@ -0,0 +1,120 @@
// #![cfg(not(debug_assertions))]
#[macro_use]
extern crate lazy_static;
use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy};
use sloggers::{null::NullLoggerBuilder, Build};
use std::sync::Arc;
use store::DiskStore;
use tempfile::{tempdir, TempDir};
use types::{EthSpec, Keypair, MinimalEthSpec};
type E = MinimalEthSpec;
// Should ideally be divisible by 3.
pub const VALIDATOR_COUNT: usize = 24;
lazy_static! {
/// A cached set of keys.
static ref KEYPAIRS: Vec<Keypair> = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT);
}
fn get_store(db_path: &TempDir) -> Arc<DiskStore> {
let spec = E::default_spec();
let hot_path = db_path.path().join("hot_db");
let cold_path = db_path.path().join("cold_db");
let log = NullLoggerBuilder.build().expect("logger should build");
Arc::new(
DiskStore::open(&hot_path, &cold_path, spec, log).expect("disk store should initialize"),
)
}
#[test]
fn finalizes_after_resuming_from_db() {
let validator_count = 16;
let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5;
let first_half = num_blocks_produced / 2;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = BeaconChainHarness::new_with_disk_store(
MinimalEthSpec,
store.clone(),
KEYPAIRS[0..validator_count].to_vec(),
);
harness.advance_slot();
harness.extend_chain(
first_half as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
let latest_slot = harness.chain.slot().expect("should have a slot");
let original_head = harness.chain.head();
let original_heads = harness.chain.heads();
assert_eq!(
original_head.beacon_state.slot, first_half,
"head should be half way through test"
);
drop(harness);
let resumed_harness = BeaconChainHarness::resume_from_disk_store(
MinimalEthSpec,
store,
KEYPAIRS[0..validator_count].to_vec(),
);
// Set the slot clock of the resumed harness to be in the slot following the previous harness.
//
// This allows us to produce the block at the next slot.
resumed_harness
.chain
.slot_clock
.set_slot(latest_slot.as_u64() + 1);
assert_eq!(
original_head,
resumed_harness.chain.head(),
"resumed head should be same as previous head"
);
assert_eq!(
original_heads,
resumed_harness.chain.heads(),
"resumed heads should be same as previous heads"
);
resumed_harness.extend_chain(
(num_blocks_produced - first_half) as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
);
let state = &resumed_harness.chain.head().beacon_state;
assert_eq!(
state.slot, num_blocks_produced,
"head should be at the current slot"
);
assert_eq!(
state.current_epoch(),
num_blocks_produced / MinimalEthSpec::slots_per_epoch(),
"head should be at the expected epoch"
);
assert_eq!(
state.current_justified_checkpoint.epoch,
state.current_epoch() - 1,
"the head should be justified one behind the current epoch"
);
assert_eq!(
state.finalized_checkpoint.epoch,
state.current_epoch() - 2,
"the head should be finalized two behind the current epoch"
);
}

View File

@@ -37,7 +37,7 @@ fn get_store(db_path: &TempDir) -> Arc<DiskStore> {
}
fn get_harness(store: Arc<DiskStore>, validator_count: usize) -> TestHarness {
let harness = BeaconChainHarness::with_disk_store(
let harness = BeaconChainHarness::new_with_disk_store(
MinimalEthSpec,
store,
KEYPAIRS[0..validator_count].to_vec(),

View File

@@ -8,7 +8,7 @@ use types::*;
///
/// Operations are stored in arbitrary order, so it's not a good idea to compare instances
/// of this type (or its encoded form) for equality. Convert back to an `OperationPool` first.
#[derive(Encode, Decode)]
#[derive(Clone, Encode, Decode)]
pub struct PersistedOperationPool<T: EthSpec> {
/// Mapping from attestation ID to attestation mappings.
// We could save space by not storing the attestation ID, but it might