diff --git a/Cargo.lock b/Cargo.lock index ca606bf5bd..b4d7a413f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -432,7 +432,6 @@ dependencies = [ "serde_json", "serde_yaml", "slog", - "slog-term", "sloggers", "slot_clock", "smallvec 1.4.2", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index b249c17282..c4252e793d 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" default = ["participation_metrics"] write_ssz_files = [] # Writes debugging .ssz files to /tmp during block processing. participation_metrics = [] # Exposes validator participation metrics to Prometheus. +test_logger = [] # Print log output to stderr when running tests instead of dropping it [dev-dependencies] int_to_bytes = { path = "../../consensus/int_to_bytes" } @@ -30,7 +31,6 @@ serde_derive = "1.0.116" serde_yaml = "0.8.13" serde_json = "1.0.58" slog = { version = "2.5.2", features = ["max_level_trace"] } -slog-term = "2.6.0" sloggers = "1.0.1" slot_clock = { path = "../../common/slot_clock" } eth2_hashing = "0.1.0" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 1af92df7fa..b1843308b4 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -12,13 +12,13 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; use crate::head_tracker::HeadTracker; -use crate::migrate::Migrate; +use crate::migrate::BackgroundMigrator; use crate::naive_aggregation_pool::{Error as NaiveAggregationError, NaiveAggregationPool}; use crate::observed_attestations::{Error as AttestationObservationError, ObservedAttestations}; use crate::observed_attesters::{ObservedAggregators, ObservedAttesters}; use crate::observed_block_producers::ObservedBlockProducers; use crate::observed_operations::{ObservationOutcome, ObservedOperations}; -use crate::persisted_beacon_chain::PersistedBeaconChain; +use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use crate::persisted_fork_choice::PersistedForkChoice; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; use crate::snapshot_cache::SnapshotCache; @@ -47,7 +47,7 @@ use std::io::prelude::*; use std::sync::Arc; use std::time::{Duration, Instant}; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; -use store::{Error as DBError, HotColdDB, StoreOp}; +use store::{Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp}; use types::*; pub type ForkChoiceError = fork_choice::Error; @@ -153,7 +153,6 @@ pub struct HeadInfo { pub trait BeaconChainTypes: Send + Sync + 'static { type HotStore: store::ItemStore; type ColdStore: store::ItemStore; - type StoreMigrator: Migrate; type SlotClock: slot_clock::SlotClock; type Eth1Chain: Eth1ChainBackend; type EthSpec: types::EthSpec; @@ -169,7 +168,7 @@ pub struct BeaconChain { /// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB. pub store: Arc>, /// Database migrator for running background maintenance on the store. - pub store_migrator: T::StoreMigrator, + pub store_migrator: BackgroundMigrator, /// Reports the current slot, typically based upon the system clock. pub slot_clock: T::SlotClock, /// Stores all operations (e.g., `Attestation`, `Deposit`, etc) that are candidates for @@ -237,53 +236,49 @@ pub struct BeaconChain { type BeaconBlockAndState = (BeaconBlock, BeaconState); impl BeaconChain { - /// Persists the core `BeaconChain` components (including the head block) and the fork choice. + /// Persists the head tracker and fork choice. /// - /// ## Notes: - /// - /// In this function we first obtain the head, persist fork choice, then persist the head. We - /// do it in this order to ensure that the persisted head is always from a time prior to fork - /// choice. - /// - /// We want to ensure that the head never out dates the fork choice to avoid having references - /// to blocks that do not exist in fork choice. + /// We do it atomically even though no guarantees need to be made about blocks from + /// the head tracker also being present in fork choice. pub fn persist_head_and_fork_choice(&self) -> Result<(), Error> { - let canonical_head_block_root = self - .canonical_head - .try_read_for(HEAD_LOCK_TIMEOUT) - .ok_or_else(|| Error::CanonicalHeadLockTimeout)? - .beacon_block_root; + let mut batch = vec![]; - let persisted_head = PersistedBeaconChain { - canonical_head_block_root, - genesis_block_root: self.genesis_block_root, - ssz_head_tracker: self.head_tracker.to_ssz_container(), - }; + let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD); + batch.push(self.persist_head_in_batch()); - let fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); + let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE); + batch.push(self.persist_fork_choice_in_batch()); - let fork_choice = self.fork_choice.read(); - - self.store.put_item( - &FORK_CHOICE_DB_KEY, - &PersistedForkChoice { - fork_choice: fork_choice.to_persisted(), - fork_choice_store: fork_choice.fc_store().to_persisted(), - }, - )?; - - drop(fork_choice); - - metrics::stop_timer(fork_choice_timer); - let head_timer = metrics::start_timer(&metrics::PERSIST_HEAD); - - self.store.put_item(&BEACON_CHAIN_DB_KEY, &persisted_head)?; - - metrics::stop_timer(head_timer); + self.store.hot_db.do_atomically(batch)?; Ok(()) } + /// Return a `PersistedBeaconChain` representing the current head. + pub fn make_persisted_head(&self) -> PersistedBeaconChain { + PersistedBeaconChain { + _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT, + genesis_block_root: self.genesis_block_root, + ssz_head_tracker: self.head_tracker.to_ssz_container(), + } + } + + /// Return a database operation for writing the beacon chain head to disk. + pub fn persist_head_in_batch(&self) -> KeyValueStoreOp { + self.make_persisted_head() + .as_kv_store_op(BEACON_CHAIN_DB_KEY) + } + + /// Return a database operation for writing fork choice to disk. + pub fn persist_fork_choice_in_batch(&self) -> KeyValueStoreOp { + let fork_choice = self.fork_choice.read(); + let persisted_fork_choice = PersistedForkChoice { + fork_choice: fork_choice.to_persisted(), + fork_choice_store: fork_choice.fc_store().to_persisted(), + }; + persisted_fork_choice.as_kv_store_op(FORK_CHOICE_DB_KEY) + } + /// Persists `self.op_pool` to disk. /// /// ## Notes @@ -1991,11 +1986,7 @@ impl BeaconChain { }); if new_finalized_checkpoint.epoch != old_finalized_checkpoint.epoch { - self.after_finalization( - old_finalized_checkpoint, - new_finalized_checkpoint, - new_finalized_state_root, - )?; + self.after_finalization(new_finalized_checkpoint, new_finalized_state_root)?; } let _ = self.event_handler.register(EventKind::BeaconHeadChanged { @@ -2076,7 +2067,6 @@ impl BeaconChain { /// Performs pruning and finality-based optimizations. fn after_finalization( &self, - old_finalized_checkpoint: Checkpoint, new_finalized_checkpoint: Checkpoint, new_finalized_state_root: Hash256, ) -> Result<(), Error> { @@ -2112,9 +2102,8 @@ impl BeaconChain { self.store_migrator.process_finalization( new_finalized_state_root.into(), finalized_state, - self.head_tracker.clone(), - old_finalized_checkpoint, new_finalized_checkpoint, + self.head_tracker.clone(), )?; let _ = self.event_handler.register(EventKind::BeaconFinalization { @@ -2430,11 +2419,6 @@ impl BeaconChain { let mut file = std::fs::File::create(file_name).unwrap(); self.dump_as_dot(&mut file); } - - // Should be used in tests only - pub fn set_graffiti(&mut self, graffiti: Graffiti) { - self.graffiti = graffiti; - } } impl Drop for BeaconChain { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index a251ced2d2..0c99c35f8b 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -4,7 +4,7 @@ use crate::beacon_chain::{ use crate::eth1_chain::{CachingEth1Backend, SszEth1}; use crate::events::NullEventHandler; use crate::head_tracker::HeadTracker; -use crate::migrate::Migrate; +use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::persisted_fork_choice::PersistedForkChoice; use crate::shuffling_cache::ShufflingCache; @@ -21,7 +21,7 @@ use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use operation_pool::{OperationPool, PersistedOperationPool}; use parking_lot::RwLock; -use slog::{info, Logger}; +use slog::{crit, info, Logger}; use slot_clock::{SlotClock, TestingSlotClock}; use std::marker::PhantomData; use std::path::PathBuf; @@ -37,17 +37,8 @@ pub const PUBKEY_CACHE_FILENAME: &str = "pubkey_cache.ssz"; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing /// functionality and only exists to satisfy the type system. -pub struct Witness< - TStoreMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, ->( +pub struct Witness( PhantomData<( - TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, @@ -57,21 +48,11 @@ pub struct Witness< )>, ); -impl - BeaconChainTypes - for Witness< - TStoreMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - > +impl BeaconChainTypes + for Witness where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, - TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -79,7 +60,6 @@ where { type HotStore = THotStore; type ColdStore = TColdStore; - type StoreMigrator = TStoreMigrator; type SlotClock = TSlotClock; type Eth1Chain = TEth1Backend; type EthSpec = TEthSpec; @@ -97,7 +77,7 @@ where pub struct BeaconChainBuilder { #[allow(clippy::type_complexity)] store: Option>>, - store_migrator: Option, + store_migrator_config: Option, pub genesis_time: Option, genesis_block_root: Option, #[allow(clippy::type_complexity)] @@ -120,22 +100,13 @@ pub struct BeaconChainBuilder { graffiti: Graffiti, } -impl +impl BeaconChainBuilder< - Witness< - TStoreMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, + Witness, > where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, - TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -148,7 +119,7 @@ where pub fn new(_eth_spec_instance: TEthSpec) -> Self { Self { store: None, - store_migrator: None, + store_migrator_config: None, genesis_time: None, genesis_block_root: None, fork_choice: None, @@ -195,9 +166,9 @@ where self } - /// Sets the store migrator. - pub fn store_migrator(mut self, store_migrator: TStoreMigrator) -> Self { - self.store_migrator = Some(store_migrator); + /// Sets the store migrator config (optional). + pub fn store_migrator_config(mut self, config: MigratorConfig) -> Self { + self.store_migrator_config = Some(config); self } @@ -448,15 +419,7 @@ where self, ) -> Result< BeaconChain< - Witness< - TStoreMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, + Witness, >, String, > { @@ -548,13 +511,19 @@ where .map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e)) })?; + let migrator_config = self.store_migrator_config.unwrap_or_default(); + let store_migrator = BackgroundMigrator::new( + store.clone(), + migrator_config, + genesis_block_root, + log.clone(), + ); + let beacon_chain = BeaconChain { spec: self.spec, config: self.chain_config, store, - store_migrator: self - .store_migrator - .ok_or_else(|| "Cannot build without store migrator".to_string())?, + store_migrator, slot_clock, op_pool: self .op_pool @@ -634,10 +603,9 @@ where } } -impl +impl BeaconChainBuilder< Witness< - TStoreMigrator, TSlotClock, CachingEth1Backend, TEthSpec, @@ -649,7 +617,6 @@ impl where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, - TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -675,22 +642,13 @@ where } } -impl +impl BeaconChainBuilder< - Witness< - TStoreMigrator, - TestingSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, + Witness, > where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, - TStoreMigrator: Migrate + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -713,10 +671,9 @@ where } } -impl +impl BeaconChainBuilder< Witness< - TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, @@ -728,7 +685,6 @@ impl where THotStore: ItemStore + 'static, TColdStore: ItemStore + 'static, - TStoreMigrator: Migrate + 'static, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -760,7 +716,6 @@ fn genesis_block( #[cfg(test)] mod test { use super::*; - use crate::migrate::NullMigrator; use eth2_hashing::hash; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; use sloggers::{null::NullLoggerBuilder, Build}; @@ -805,7 +760,6 @@ mod test { let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .store(Arc::new(store)) - .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state(genesis_state) .expect("should build state using recent genesis") diff --git a/beacon_node/beacon_chain/src/head_tracker.rs b/beacon_node/beacon_chain/src/head_tracker.rs index 4f338c0c22..6d6ab6e120 100644 --- a/beacon_node/beacon_chain/src/head_tracker.rs +++ b/beacon_node/beacon_chain/src/head_tracker.rs @@ -15,7 +15,7 @@ pub enum Error { /// In order for this struct to be effective, every single block that is imported must be /// registered here. #[derive(Default, Debug)] -pub struct HeadTracker(RwLock>); +pub struct HeadTracker(pub RwLock>); impl HeadTracker { /// Register a block with `Self`, so it may or may not be included in a `Self::heads` call. @@ -29,13 +29,6 @@ impl HeadTracker { map.insert(block_root, slot); } - /// Removes abandoned head. - pub fn remove_head(&self, block_root: Hash256) { - let mut map = self.0.write(); - debug_assert!(map.contains_key(&block_root)); - map.remove(&block_root); - } - /// Returns true iff `block_root` is a recognized head. pub fn contains_head(&self, block_root: Hash256) -> bool { self.0.read().contains_key(&block_root) @@ -53,14 +46,7 @@ impl HeadTracker { /// Returns a `SszHeadTracker`, which contains all necessary information to restore the state /// of `Self` at some later point. pub fn to_ssz_container(&self) -> SszHeadTracker { - let (roots, slots) = self - .0 - .read() - .iter() - .map(|(hash, slot)| (*hash, *slot)) - .unzip(); - - SszHeadTracker { roots, slots } + SszHeadTracker::from_map(&*self.0.read()) } /// Creates a new `Self` from the given `SszHeadTracker`, restoring `Self` to the same state of @@ -103,6 +89,13 @@ pub struct SszHeadTracker { slots: Vec, } +impl SszHeadTracker { + pub fn from_map(map: &HashMap) -> Self { + let (roots, slots) = map.iter().map(|(hash, slot)| (*hash, *slot)).unzip(); + SszHeadTracker { roots, slots } + } +} + #[cfg(test)] mod test { use super::*; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 793179da8d..37704a94eb 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -1,11 +1,4 @@ #![recursion_limit = "128"] // For lazy-static -#[macro_use] -extern crate lazy_static; - -#[macro_use] -extern crate slog; -extern crate slog_term; - pub mod attestation_verification; mod beacon_chain; mod beacon_fork_choice_store; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index be2a5626f5..42fac5aa37 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1,4 +1,5 @@ use crate::{BeaconChain, BeaconChainTypes}; +use lazy_static::lazy_static; pub use lighthouse_metrics::*; use slot_clock::SlotClock; use types::{BeaconState, Epoch, EthSpec, Hash256, Slot}; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 9730d49f00..ce80232e33 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -1,5 +1,7 @@ +use crate::beacon_chain::BEACON_CHAIN_DB_KEY; use crate::errors::BeaconChainError; -use crate::head_tracker::HeadTracker; +use crate::head_tracker::{HeadTracker, SszHeadTracker}; +use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use parking_lot::Mutex; use slog::{debug, warn, Logger}; use std::collections::{HashMap, HashSet}; @@ -9,13 +11,49 @@ use std::sync::Arc; use std::thread; use store::hot_cold_store::{migrate_database, HotColdDBError}; use store::iter::RootsIterator; -use store::{Error, ItemStore, StoreOp}; +use store::{Error, ItemStore, StoreItem, StoreOp}; pub use store::{HotColdDB, MemoryStore}; use types::{ - BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, EthSpec, Hash256, + BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, SignedBeaconBlockHash, Slot, }; +/// The background migrator runs a thread to perform pruning and migrate state from the hot +/// to the cold database. +pub struct BackgroundMigrator, Cold: ItemStore> { + db: Arc>, + #[allow(clippy::type_complexity)] + tx_thread: Option< + Mutex<( + mpsc::Sender>, + thread::JoinHandle<()>, + )>, + >, + latest_checkpoint: Arc>, + /// Genesis block root, for persisting the `PersistedBeaconChain`. + genesis_block_root: Hash256, + log: Logger, +} + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct MigratorConfig { + pub blocking: bool, +} + +impl MigratorConfig { + pub fn blocking(mut self) -> Self { + self.blocking = true; + self + } +} + +/// Pruning can be successful, or in rare cases deferred to a later point. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PruningOutcome { + Successful, + DeferredConcurrentMutation, +} + /// Logic errors that can occur during pruning, none of these should ever happen. #[derive(Debug)] pub enum PruningError { @@ -30,29 +68,177 @@ pub enum PruningError { UnexpectedUnequalStateRoots, } -/// Trait for migration processes that update the database upon finalization. -pub trait Migrate, Cold: ItemStore>: - Send + Sync + 'static -{ - fn new(db: Arc>, log: Logger) -> Self; +/// Message sent to the migration thread containing the information it needs to run. +pub struct MigrationNotification { + finalized_state_root: BeaconStateHash, + finalized_state: BeaconState, + finalized_checkpoint: Checkpoint, + head_tracker: Arc, + latest_checkpoint: Arc>, + genesis_block_root: Hash256, +} - fn process_finalization( +impl, Cold: ItemStore> BackgroundMigrator { + /// Create a new `BackgroundMigrator` and spawn its thread if necessary. + pub fn new( + db: Arc>, + config: MigratorConfig, + genesis_block_root: Hash256, + log: Logger, + ) -> Self { + let tx_thread = if config.blocking { + None + } else { + Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone()))) + }; + let latest_checkpoint = Arc::new(Mutex::new(Checkpoint { + root: Hash256::zero(), + epoch: Epoch::new(0), + })); + Self { + db, + tx_thread, + latest_checkpoint, + genesis_block_root, + log, + } + } + + /// Process a finalized checkpoint from the `BeaconChain`. + /// + /// If successful, all forks descending from before the `finalized_checkpoint` will be + /// pruned, and the split point of the database will be advanced to the slot of the finalized + /// checkpoint. + pub fn process_finalization( &self, - _finalized_state_root: BeaconStateHash, - _new_finalized_state: BeaconState, - _head_tracker: Arc, - _old_finalized_checkpoint: Checkpoint, - _new_finalized_checkpoint: Checkpoint, + finalized_state_root: BeaconStateHash, + finalized_state: BeaconState, + finalized_checkpoint: Checkpoint, + head_tracker: Arc, ) -> Result<(), BeaconChainError> { + let notif = MigrationNotification { + finalized_state_root, + finalized_state, + finalized_checkpoint, + head_tracker, + latest_checkpoint: self.latest_checkpoint.clone(), + genesis_block_root: self.genesis_block_root, + }; + + // Async path, on the background thread. + if let Some(tx_thread) = &self.tx_thread { + let (ref mut tx, ref mut thread) = *tx_thread.lock(); + + // Restart the background thread if it has crashed. + if let Err(tx_err) = tx.send(notif) { + let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone()); + + *tx = new_tx; + let old_thread = mem::replace(thread, new_thread); + + // Join the old thread, which will probably have panicked, or may have + // halted normally just now as a result of us dropping the old `mpsc::Sender`. + if let Err(thread_err) = old_thread.join() { + warn!( + self.log, + "Migration thread died, so it was restarted"; + "reason" => format!("{:?}", thread_err) + ); + } + + // Retry at most once, we could recurse but that would risk overflowing the stack. + let _ = tx.send(tx_err.0); + } + } + // Synchronous path, on the current thread. + else { + Self::run_migration(self.db.clone(), notif, &self.log) + } + Ok(()) } + /// Perform the actual work of `process_finalization`. + fn run_migration( + db: Arc>, + notif: MigrationNotification, + log: &Logger, + ) { + let mut latest_checkpoint = notif.latest_checkpoint.lock(); + let finalized_state_root = notif.finalized_state_root; + let finalized_state = notif.finalized_state; + + match Self::prune_abandoned_forks( + db.clone(), + notif.head_tracker, + finalized_state_root, + &finalized_state, + *latest_checkpoint, + notif.finalized_checkpoint, + notif.genesis_block_root, + log, + ) { + Ok(PruningOutcome::DeferredConcurrentMutation) => { + warn!( + log, + "Pruning deferred because of a concurrent mutation"; + "message" => "this is expected only very rarely!" + ); + return; + } + Ok(PruningOutcome::Successful) => { + // Update the migrator's idea of the latest checkpoint only if the + // pruning process was successful. + *latest_checkpoint = notif.finalized_checkpoint; + } + Err(e) => { + warn!(log, "Block pruning failed"; "error" => format!("{:?}", e)); + return; + } + }; + + match migrate_database(db, finalized_state_root.into(), &finalized_state) { + Ok(()) => {} + Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { + debug!( + log, + "Database migration postponed, unaligned finalized block"; + "slot" => slot.as_u64() + ); + } + Err(e) => { + warn!( + log, + "Database migration failed"; + "error" => format!("{:?}", e) + ); + } + }; + } + + /// Spawn a new child thread to run the migration process. + /// + /// Return a channel handle for sending new finalized states to the thread. + fn spawn_thread( + db: Arc>, + log: Logger, + ) -> ( + mpsc::Sender>, + thread::JoinHandle<()>, + ) { + let (tx, rx) = mpsc::channel(); + let thread = thread::spawn(move || { + while let Ok(notif) = rx.recv() { + Self::run_migration(db.clone(), notif, &log); + } + }); + (tx, thread) + } + /// Traverses live heads and prunes blocks and states of chains that we know can't be built /// upon because finalization would prohibit it. This is an optimisation intended to save disk /// space. - /// - /// Assumptions: - /// * It is called after every finalization. + #[allow(clippy::too_many_arguments)] fn prune_abandoned_forks( store: Arc>, head_tracker: Arc, @@ -60,13 +246,9 @@ pub trait Migrate, Cold: ItemStore>: new_finalized_state: &BeaconState, old_finalized_checkpoint: Checkpoint, new_finalized_checkpoint: Checkpoint, + genesis_block_root: Hash256, log: &Logger, - ) -> Result<(), BeaconChainError> { - // There will never be any blocks to prune if there is only a single head in the chain. - if head_tracker.heads().len() == 1 { - return Ok(()); - } - + ) -> Result { let old_finalized_slot = old_finalized_checkpoint .epoch .start_slot(E::slots_per_epoch()); @@ -120,7 +302,10 @@ pub trait Migrate, Cold: ItemStore>: let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new(); let mut abandoned_heads: HashSet = HashSet::new(); - for (head_hash, head_slot) in head_tracker.heads() { + let heads = head_tracker.heads(); + debug!(log, "Pruning {} heads", heads.len()); + + for (head_hash, head_slot) in heads { let mut potentially_abandoned_head = Some(head_hash); let mut potentially_abandoned_blocks = vec![]; @@ -161,6 +346,7 @@ pub trait Migrate, Cold: ItemStore>: "head_block_root" => format!("{:?}", head_hash), "head_slot" => head_slot, ); + potentially_abandoned_head.take(); break; } } @@ -229,6 +415,25 @@ pub trait Migrate, Cold: ItemStore>: } } + // Update the head tracker before the database, so that we maintain the invariant + // that a block present in the head tracker is present in the database. + // See https://github.com/sigp/lighthouse/issues/1557 + let mut head_tracker_lock = head_tracker.0.write(); + + // Check that all the heads to be deleted are still present. The absence of any + // head indicates a race, that will likely resolve itself, so we defer pruning until + // later. + for head_hash in &abandoned_heads { + if !head_tracker_lock.contains_key(head_hash) { + return Ok(PruningOutcome::DeferredConcurrentMutation); + } + } + + // Then remove them for real. + for head_hash in abandoned_heads { + head_tracker_lock.remove(&head_hash); + } + let batch: Vec> = abandoned_blocks .into_iter() .map(StoreOp::DeleteBlock) @@ -239,203 +444,21 @@ pub trait Migrate, Cold: ItemStore>: ) .collect(); - store.do_atomically(batch)?; - for head_hash in abandoned_heads.into_iter() { - head_tracker.remove_head(head_hash); - } + let mut kv_batch = store.convert_to_kv_batch(&batch)?; + // Persist the head in case the process is killed or crashes here. This prevents + // the head tracker reverting after our mutation above. + let persisted_head = PersistedBeaconChain { + _canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT, + genesis_block_root, + ssz_head_tracker: SszHeadTracker::from_map(&*head_tracker_lock), + }; + drop(head_tracker_lock); + kv_batch.push(persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY)); + + store.hot_db.do_atomically(kv_batch)?; debug!(log, "Database pruning complete"); - Ok(()) - } -} - -/// Migrator that does nothing, for stores that don't need migration. -pub struct NullMigrator; - -impl, Cold: ItemStore> Migrate for NullMigrator { - fn process_finalization( - &self, - _finalized_state_root: BeaconStateHash, - _new_finalized_state: BeaconState, - _head_tracker: Arc, - _old_finalized_checkpoint: Checkpoint, - _new_finalized_checkpoint: Checkpoint, - ) -> Result<(), BeaconChainError> { - Ok(()) - } - - fn new(_: Arc>, _: Logger) -> Self { - NullMigrator - } -} - -/// Migrator that immediately calls the store's migration function, blocking the current execution. -/// -/// Mostly useful for tests. -pub struct BlockingMigrator, Cold: ItemStore> { - db: Arc>, - log: Logger, -} - -impl, Cold: ItemStore> Migrate - for BlockingMigrator -{ - fn new(db: Arc>, log: Logger) -> Self { - BlockingMigrator { db, log } - } - - fn process_finalization( - &self, - finalized_state_root: BeaconStateHash, - new_finalized_state: BeaconState, - head_tracker: Arc, - old_finalized_checkpoint: Checkpoint, - new_finalized_checkpoint: Checkpoint, - ) -> Result<(), BeaconChainError> { - Self::prune_abandoned_forks( - self.db.clone(), - head_tracker, - finalized_state_root, - &new_finalized_state, - old_finalized_checkpoint, - new_finalized_checkpoint, - &self.log, - )?; - - match migrate_database( - self.db.clone(), - finalized_state_root.into(), - &new_finalized_state, - ) { - Ok(()) => Ok(()), - Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { - debug!( - self.log, - "Database migration postponed, unaligned finalized block"; - "slot" => slot.as_u64() - ); - Ok(()) - } - Err(e) => Err(e.into()), - } - } -} - -type MpscSender = mpsc::Sender<( - BeaconStateHash, - BeaconState, - Arc, - Checkpoint, - Checkpoint, -)>; - -/// Migrator that runs a background thread to migrate state from the hot to the cold database. -pub struct BackgroundMigrator, Cold: ItemStore> { - db: Arc>, - tx_thread: Mutex<(MpscSender, thread::JoinHandle<()>)>, - log: Logger, -} - -impl, Cold: ItemStore> Migrate - for BackgroundMigrator -{ - fn new(db: Arc>, log: Logger) -> Self { - let tx_thread = Mutex::new(Self::spawn_thread(db.clone(), log.clone())); - Self { db, tx_thread, log } - } - - fn process_finalization( - &self, - finalized_state_root: BeaconStateHash, - new_finalized_state: BeaconState, - head_tracker: Arc, - old_finalized_checkpoint: Checkpoint, - new_finalized_checkpoint: Checkpoint, - ) -> Result<(), BeaconChainError> { - let (ref mut tx, ref mut thread) = *self.tx_thread.lock(); - - if let Err(tx_err) = tx.send(( - finalized_state_root, - new_finalized_state, - head_tracker, - old_finalized_checkpoint, - new_finalized_checkpoint, - )) { - let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone()); - - *tx = new_tx; - let old_thread = mem::replace(thread, new_thread); - - // Join the old thread, which will probably have panicked, or may have - // halted normally just now as a result of us dropping the old `mpsc::Sender`. - if let Err(thread_err) = old_thread.join() { - warn!( - self.log, - "Migration thread died, so it was restarted"; - "reason" => format!("{:?}", thread_err) - ); - } - - // Retry at most once, we could recurse but that would risk overflowing the stack. - let _ = tx.send(tx_err.0); - } - - Ok(()) - } -} - -impl, Cold: ItemStore> BackgroundMigrator { - /// Spawn a new child thread to run the migration process. - /// - /// Return a channel handle for sending new finalized states to the thread. - fn spawn_thread( - db: Arc>, - log: Logger, - ) -> (MpscSender, thread::JoinHandle<()>) { - let (tx, rx) = mpsc::channel(); - let thread = thread::spawn(move || { - while let Ok(( - state_root, - state, - head_tracker, - old_finalized_checkpoint, - new_finalized_checkpoint, - )) = rx.recv() - { - match Self::prune_abandoned_forks( - db.clone(), - head_tracker, - state_root, - &state, - old_finalized_checkpoint, - new_finalized_checkpoint, - &log, - ) { - Ok(()) => {} - Err(e) => warn!(log, "Block pruning failed: {:?}", e), - } - - match migrate_database(db.clone(), state_root.into(), &state) { - Ok(()) => {} - Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { - debug!( - log, - "Database migration postponed, unaligned finalized block"; - "slot" => slot.as_u64() - ); - } - Err(e) => { - warn!( - log, - "Database migration failed"; - "error" => format!("{:?}", e) - ); - } - }; - } - }); - - (tx, thread) + Ok(PruningOutcome::Successful) } } diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index dcdf710d67..adb68def0d 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -4,16 +4,19 @@ use ssz_derive::{Decode, Encode}; use store::{DBColumn, Error as StoreError, StoreItem}; use types::Hash256; +/// Dummy value to use for the canonical head block root, see below. +pub const DUMMY_CANONICAL_HEAD_BLOCK_ROOT: Hash256 = Hash256::repeat_byte(0xff); + #[derive(Clone, Encode, Decode)] pub struct PersistedBeaconChain { /// This value is ignored to resolve the issue described here: /// /// https://github.com/sigp/lighthouse/pull/1639 /// - /// The following PR will clean-up and remove this field: + /// Its removal is tracked here: /// - /// https://github.com/sigp/lighthouse/pull/1638 - pub canonical_head_block_root: Hash256, + /// https://github.com/sigp/lighthouse/issues/1784 + pub _canonical_head_block_root: Hash256, pub genesis_block_root: Hash256, pub ssz_head_tracker: SszHeadTracker, } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 17dff57d1a..faa58ed07b 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,9 +1,9 @@ -pub use crate::beacon_chain::{ - BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY, -}; -use crate::migrate::{BlockingMigrator, Migrate, NullMigrator}; pub use crate::persisted_beacon_chain::PersistedBeaconChain; -use crate::slog::Drain; +pub use crate::{ + beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, + migrate::MigratorConfig, + BeaconChainError, +}; use crate::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, @@ -12,11 +12,12 @@ use crate::{ }; use futures::channel::mpsc::Receiver; use genesis::interop_genesis_state; +use parking_lot::Mutex; use rand::rngs::StdRng; use rand::Rng; use rand_core::SeedableRng; use rayon::prelude::*; -use sloggers::{null::NullLoggerBuilder, Build}; +use slog::Logger; use slot_clock::TestingSlotClock; use state_processing::per_slot_processing; use std::borrow::Cow; @@ -28,10 +29,10 @@ use tempfile::{tempdir, TempDir}; use tree_hash::TreeHash; use types::{ AggregateSignature, Attestation, AttestationData, AttesterSlashing, BeaconState, - BeaconStateHash, ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Hash256, IndexedAttestation, - Keypair, ProposerSlashing, SelectionProof, SignedAggregateAndProof, SignedBeaconBlock, - SignedBeaconBlockHash, SignedRoot, SignedVoluntaryExit, Slot, SubnetId, VariableList, - VoluntaryExit, + BeaconStateHash, ChainSpec, Checkpoint, Domain, Epoch, EthSpec, Graffiti, Hash256, + IndexedAttestation, Keypair, ProposerSlashing, SelectionProof, SignedAggregateAndProof, + SignedBeaconBlock, SignedBeaconBlockHash, SignedRoot, SignedVoluntaryExit, Slot, SubnetId, + VariableList, VoluntaryExit, }; pub use types::test_utils::generate_deterministic_keypairs; @@ -41,8 +42,7 @@ pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // This parameter is required by a builder but not used because we use the `TestingSlotClock`. pub const HARNESS_SLOT_TIME: Duration = Duration::from_secs(1); -pub type BaseHarnessType = Witness< - TStoreMigrator, +pub type BaseHarnessType = Witness< TestingSlotClock, CachingEth1Backend, TEthSpec, @@ -51,16 +51,8 @@ pub type BaseHarnessType = Witn TColdStore, >; -pub type NullMigratorEphemeralHarnessType = - BaseHarnessType, MemoryStore>; -pub type BlockingMigratorDiskHarnessType = - BaseHarnessType, LevelDB>, E, LevelDB, LevelDB>; -pub type BlockingMigratorEphemeralHarnessType = BaseHarnessType< - BlockingMigrator, MemoryStore>, - E, - MemoryStore, - MemoryStore, ->; +pub type DiskHarnessType = BaseHarnessType, LevelDB>; +pub type EphemeralHarnessType = BaseHarnessType, MemoryStore>; pub type AddBlocksResult = ( HashMap, @@ -94,10 +86,29 @@ pub enum AttestationStrategy { SomeValidators(Vec), } -fn make_rng() -> StdRng { +fn make_rng() -> Mutex { // Nondeterminism in tests is a highly undesirable thing. Seed the RNG to some arbitrary // but fixed value for reproducibility. - StdRng::seed_from_u64(0x0DDB1A5E5BAD5EEDu64) + Mutex::new(StdRng::seed_from_u64(0x0DDB1A5E5BAD5EEDu64)) +} + +/// Return a logger suitable for test usage. +/// +/// By default no logs will be printed, but they can be enabled via the `test_logger` feature. +/// +/// We've tried the `slog_term::TestStdoutWriter` in the past, but found it too buggy because +/// of the threading limitation. +pub fn test_logger() -> Logger { + use sloggers::Build; + + if cfg!(feature = "test_logger") { + sloggers::terminal::TerminalLoggerBuilder::new() + .level(sloggers::types::Severity::Debug) + .build() + .unwrap() + } else { + sloggers::null::NullLoggerBuilder.build().unwrap() + } } /// A testing harness which can instantiate a `BeaconChain` and populate it with blocks and @@ -105,14 +116,14 @@ fn make_rng() -> StdRng { /// /// Used for testing. pub struct BeaconChainHarness { - pub validators_keypairs: Vec, + pub validator_keypairs: Vec, pub chain: BeaconChain, pub spec: ChainSpec, pub data_dir: TempDir, pub shutdown_receiver: Receiver<&'static str>, - pub rng: StdRng, + pub rng: Mutex, } type HarnessAttestations = Vec<( @@ -120,82 +131,37 @@ type HarnessAttestations = Vec<( Option>, )>; -impl BeaconChainHarness> { - pub fn new(eth_spec_instance: E, validators_keypairs: Vec) -> Self { - let data_dir = tempdir().unwrap(); - let mut spec = E::default_spec(); - - // Setting the target aggregators to really high means that _all_ validators in the - // committee are required to produce an aggregate. This is overkill, however with small - // validator counts it's the only way to be certain there is _at least one_ aggregator per - // committee. - spec.target_aggregators_per_committee = 1 << 32; - - let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter); - let drain = slog_term::FullFormat::new(decorator).build(); - let debug_level = slog::LevelFilter::new(drain, slog::Level::Critical); - let log = slog::Logger::root(std::sync::Mutex::new(debug_level).fuse(), o!()); - - let config = StoreConfig::default(); - let store = Arc::new(HotColdDB::open_ephemeral(config, spec.clone(), log.clone()).unwrap()); - let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); - - let chain = BeaconChainBuilder::new(eth_spec_instance) - .logger(log.clone()) - .custom_spec(spec.clone()) - .store(store.clone()) - .store_migrator(BlockingMigrator::new(store, log.clone())) - .data_dir(data_dir.path().to_path_buf()) - .genesis_state( - interop_genesis_state::(&validators_keypairs, HARNESS_GENESIS_TIME, &spec) - .unwrap(), - ) - .unwrap() - .dummy_eth1_backend() - .unwrap() - .null_event_handler() - .testing_slot_clock(HARNESS_SLOT_TIME) - .unwrap() - .shutdown_sender(shutdown_tx) - .build() - .unwrap(); - - Self { - spec: chain.spec.clone(), - chain, - validators_keypairs, - data_dir, - shutdown_receiver, - rng: make_rng(), - } +impl BeaconChainHarness> { + pub fn new(eth_spec_instance: E, validator_keypairs: Vec) -> Self { + Self::new_with_store_config( + eth_spec_instance, + validator_keypairs, + StoreConfig::default(), + ) } -} -impl BeaconChainHarness> { - /// Instantiate a new harness with `validator_count` initial validators. pub fn new_with_store_config( eth_spec_instance: E, - validators_keypairs: Vec, + validator_keypairs: Vec, config: StoreConfig, ) -> Self { // Setting the target aggregators to really high means that _all_ validators in the // committee are required to produce an aggregate. This is overkill, however with small // validator counts it's the only way to be certain there is _at least one_ aggregator per // committee. - Self::new_with_target_aggregators(eth_spec_instance, validators_keypairs, 1 << 32, config) + Self::new_with_target_aggregators(eth_spec_instance, validator_keypairs, 1 << 32, config) } - /// Instantiate a new harness with `validator_count` initial validators and a custom - /// `target_aggregators_per_committee` spec value + /// Instantiate a new harness with a custom `target_aggregators_per_committee` spec value pub fn new_with_target_aggregators( eth_spec_instance: E, - validators_keypairs: Vec, + validator_keypairs: Vec, target_aggregators_per_committee: u64, store_config: StoreConfig, ) -> Self { Self::new_with_chain_config( eth_spec_instance, - validators_keypairs, + validator_keypairs, target_aggregators_per_committee, store_config, ChainConfig::default(), @@ -206,7 +172,7 @@ impl BeaconChainHarness> { /// `target_aggregators_per_committee` spec value, and a `ChainConfig` pub fn new_with_chain_config( eth_spec_instance: E, - validators_keypairs: Vec, + validator_keypairs: Vec, target_aggregators_per_committee: u64, store_config: StoreConfig, chain_config: ChainConfig, @@ -216,21 +182,19 @@ impl BeaconChainHarness> { spec.target_aggregators_per_committee = target_aggregators_per_committee; - let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter); - let drain = slog_term::FullFormat::new(decorator).build(); - let debug_level = slog::LevelFilter::new(drain, slog::Level::Critical); - let log = slog::Logger::root(std::sync::Mutex::new(debug_level).fuse(), o!()); let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); + let log = test_logger(); + let store = HotColdDB::open_ephemeral(store_config, spec.clone(), log.clone()).unwrap(); let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log) .custom_spec(spec.clone()) .store(Arc::new(store)) - .store_migrator(NullMigrator) + .store_migrator_config(MigratorConfig::default().blocking()) .data_dir(data_dir.path().to_path_buf()) .genesis_state( - interop_genesis_state::(&validators_keypairs, HARNESS_GENESIS_TIME, &spec) + interop_genesis_state::(&validator_keypairs, HARNESS_GENESIS_TIME, &spec) .expect("should generate interop state"), ) .expect("should build state using recent genesis") @@ -247,7 +211,7 @@ impl BeaconChainHarness> { Self { spec: chain.spec.clone(), chain, - validators_keypairs, + validator_keypairs, data_dir, shutdown_receiver, rng: make_rng(), @@ -255,31 +219,28 @@ impl BeaconChainHarness> { } } -impl BeaconChainHarness> { +impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. pub fn new_with_disk_store( eth_spec_instance: E, store: Arc, LevelDB>>, - validators_keypairs: Vec, + validator_keypairs: Vec, ) -> Self { let data_dir = tempdir().expect("should create temporary data_dir"); let spec = E::default_spec(); - let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter); - let drain = slog_term::FullFormat::new(decorator).build(); - let debug_level = slog::LevelFilter::new(drain, slog::Level::Critical); - let log = slog::Logger::root(std::sync::Mutex::new(debug_level).fuse(), o!()); + let log = test_logger(); let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log.clone()) .custom_spec(spec.clone()) .import_max_skip_slots(None) - .store(store.clone()) - .store_migrator(BlockingMigrator::new(store, log.clone())) + .store(store) + .store_migrator_config(MigratorConfig::default().blocking()) .data_dir(data_dir.path().to_path_buf()) .genesis_state( - interop_genesis_state::(&validators_keypairs, HARNESS_GENESIS_TIME, &spec) + interop_genesis_state::(&validator_keypairs, HARNESS_GENESIS_TIME, &spec) .expect("should generate interop state"), ) .expect("should build state using recent genesis") @@ -295,7 +256,7 @@ impl BeaconChainHarness> { Self { spec: chain.spec.clone(), chain, - validators_keypairs, + validator_keypairs, data_dir, shutdown_receiver, rng: make_rng(), @@ -303,28 +264,25 @@ impl BeaconChainHarness> { } } -impl BeaconChainHarness> { +impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. pub fn resume_from_disk_store( eth_spec_instance: E, store: Arc, LevelDB>>, - validators_keypairs: Vec, + validator_keypairs: Vec, data_dir: TempDir, ) -> Self { let spec = E::default_spec(); - let log = NullLoggerBuilder.build().expect("logger should build"); + let log = test_logger(); let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); let chain = BeaconChainBuilder::new(eth_spec_instance) .logger(log.clone()) .custom_spec(spec) .import_max_skip_slots(None) - .store(store.clone()) - .store_migrator( as Migrate>::new( - store, - log.clone(), - )) + .store(store) + .store_migrator_config(MigratorConfig::default().blocking()) .data_dir(data_dir.path().to_path_buf()) .resume_from_db() .expect("should resume beacon chain from db") @@ -340,7 +298,7 @@ impl BeaconChainHarness> { Self { spec: chain.spec.clone(), chain, - validators_keypairs, + validator_keypairs, data_dir, shutdown_receiver, rng: make_rng(), @@ -348,15 +306,18 @@ impl BeaconChainHarness> { } } -impl BeaconChainHarness> +impl BeaconChainHarness> where - M: Migrate, E: EthSpec, Hot: ItemStore, Cold: ItemStore, { + pub fn logger(&self) -> &slog::Logger { + &self.chain.log + } + pub fn get_all_validators(&self) -> Vec { - (0..self.validators_keypairs.len()).collect() + (0..self.validator_keypairs.len()).collect() } pub fn slots_per_epoch(&self) -> u64 { @@ -411,7 +372,7 @@ where } pub fn make_block( - &mut self, + &self, mut state: BeaconState, slot: Slot, ) -> (SignedBeaconBlock, BeaconState) { @@ -432,7 +393,7 @@ where // If we produce two blocks for the same slot, they hash up to the same value and // BeaconChain errors out with `BlockIsAlreadyKnown`. Vary the graffiti so that we produce // different blocks each time. - self.chain.set_graffiti(self.rng.gen::<[u8; 32]>().into()); + let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>()); let randao_reveal = { let epoch = slot.epoch(E::slots_per_epoch()); @@ -443,17 +404,17 @@ where state.genesis_validators_root, ); let message = epoch.signing_root(domain); - let sk = &self.validators_keypairs[proposer_index].sk; + let sk = &self.validator_keypairs[proposer_index].sk; sk.sign(message) }; let (block, state) = self .chain - .produce_block_on_state(state, slot, randao_reveal, None) + .produce_block_on_state(state, slot, randao_reveal, Some(graffiti)) .unwrap(); let signed_block = block.sign( - &self.validators_keypairs[proposer_index].sk, + &self.validator_keypairs[proposer_index].sk, &state.fork, state.genesis_validators_root, &self.spec, @@ -513,7 +474,7 @@ where let mut agg_sig = AggregateSignature::infinity(); agg_sig.add_assign( - &self.validators_keypairs[*validator_index].sk.sign(message), + &self.validator_keypairs[*validator_index].sk.sign(message), ); agg_sig @@ -586,7 +547,7 @@ where let selection_proof = SelectionProof::new::( state.slot, - &self.validators_keypairs[*validator_index].sk, + &self.validator_keypairs[*validator_index].sk, &state.fork, state.genesis_validators_root, &self.spec, @@ -616,7 +577,7 @@ where aggregator_index as u64, aggregate, None, - &self.validators_keypairs[aggregator_index].sk, + &self.validator_keypairs[aggregator_index].sk, &state.fork, state.genesis_validators_root, &self.spec, @@ -659,7 +620,7 @@ where for attestation in &mut [&mut attestation_1, &mut attestation_2] { for &i in &attestation.attesting_indices { - let sk = &self.validators_keypairs[i as usize].sk; + let sk = &self.validator_keypairs[i as usize].sk; let fork = self.chain.head_info().unwrap().fork; let genesis_validators_root = self.chain.genesis_validators_root; @@ -694,7 +655,7 @@ where let mut block_header_2 = block_header_1.clone(); block_header_2.state_root = Hash256::zero(); - let sk = &self.validators_keypairs[validator_index as usize].sk; + let sk = &self.validator_keypairs[validator_index as usize].sk; let fork = self.chain.head_info().unwrap().fork; let genesis_validators_root = self.chain.genesis_validators_root; @@ -712,7 +673,7 @@ where } pub fn make_voluntary_exit(&self, validator_index: u64, epoch: Epoch) -> SignedVoluntaryExit { - let sk = &self.validators_keypairs[validator_index as usize].sk; + let sk = &self.validator_keypairs[validator_index as usize].sk; let fork = self.chain.head_info().unwrap().fork; let genesis_validators_root = self.chain.genesis_validators_root; @@ -723,19 +684,21 @@ where .sign(sk, &fork, genesis_validators_root, &self.chain.spec) } - pub fn process_block(&self, slot: Slot, block: SignedBeaconBlock) -> SignedBeaconBlockHash { - assert_eq!(self.chain.slot().unwrap(), slot); - let block_hash: SignedBeaconBlockHash = self.chain.process_block(block).unwrap().into(); - self.chain.fork_choice().unwrap(); - block_hash - } - - pub fn process_block_result( + pub fn process_block( &self, slot: Slot, block: SignedBeaconBlock, ) -> Result> { - assert_eq!(self.chain.slot().unwrap(), slot); + self.set_current_slot(slot); + let block_hash: SignedBeaconBlockHash = self.chain.process_block(block)?.into(); + self.chain.fork_choice()?; + Ok(block_hash) + } + + pub fn process_block_result( + &self, + block: SignedBeaconBlock, + ) -> Result> { let block_hash: SignedBeaconBlockHash = self.chain.process_block(block)?.into(); self.chain.fork_choice().unwrap(); Ok(block_hash) @@ -780,14 +743,14 @@ where } pub fn add_block_at_slot( - &mut self, + &self, slot: Slot, state: BeaconState, - ) -> (SignedBeaconBlockHash, SignedBeaconBlock, BeaconState) { + ) -> Result<(SignedBeaconBlockHash, SignedBeaconBlock, BeaconState), BlockError> { self.set_current_slot(slot); let (block, new_state) = self.make_block(state, slot); - let block_hash = self.process_block(slot, block.clone()); - (block_hash, block, new_state) + let block_hash = self.process_block(slot, block.clone())?; + Ok((block_hash, block, new_state)) } pub fn attest_block( @@ -803,18 +766,18 @@ where } pub fn add_attested_block_at_slot( - &mut self, + &self, slot: Slot, state: BeaconState, validators: &[usize], - ) -> (SignedBeaconBlockHash, BeaconState) { - let (block_hash, block, state) = self.add_block_at_slot(slot, state); + ) -> Result<(SignedBeaconBlockHash, BeaconState), BlockError> { + let (block_hash, block, state) = self.add_block_at_slot(slot, state)?; self.attest_block(&state, block_hash, &block, validators); - (block_hash, state) + Ok((block_hash, state)) } pub fn add_attested_blocks_at_slots( - &mut self, + &self, state: BeaconState, slots: &[Slot], validators: &[usize], @@ -824,7 +787,7 @@ where } fn add_attested_blocks_at_slots_given_lbh( - &mut self, + &self, mut state: BeaconState, slots: &[Slot], validators: &[usize], @@ -837,7 +800,9 @@ where let mut block_hash_from_slot: HashMap = HashMap::new(); let mut state_hash_from_slot: HashMap = HashMap::new(); for slot in slots { - let (block_hash, new_state) = self.add_attested_block_at_slot(*slot, state, validators); + let (block_hash, new_state) = self + .add_attested_block_at_slot(*slot, state, validators) + .unwrap(); state = new_state; block_hash_from_slot.insert(*slot, block_hash); state_hash_from_slot.insert(*slot, state.tree_hash_root().into()); @@ -859,7 +824,7 @@ where /// /// Chains is a vec of `(state, slots, validators)` tuples. pub fn add_blocks_on_multiple_chains( - &mut self, + &self, chains: Vec<(BeaconState, Vec, Vec)>, ) -> Vec> { let slots_per_epoch = E::slots_per_epoch(); @@ -959,7 +924,7 @@ where /// /// Returns a newly created block, signed by the proposer for the given slot. pub fn build_block( - &mut self, + &self, state: BeaconState, slot: Slot, _block_strategy: BlockStrategy, @@ -979,7 +944,7 @@ where /// The `attestation_strategy` dictates which validators will attest to the newly created /// blocks. pub fn extend_chain( - &mut self, + &self, num_blocks: usize, block_strategy: BlockStrategy, attestation_strategy: AttestationStrategy, @@ -1028,7 +993,7 @@ where /// /// Returns `(honest_head, faulty_head)`, the roots of the blocks at the top of each chain. pub fn generate_two_forks_by_skipping_a_block( - &mut self, + &self, honest_validators: &[usize], faulty_validators: &[usize], honest_fork_blocks: usize, diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index a01286d64a..cb7893180e 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -26,7 +26,7 @@ lazy_static! { fn produces_attestations() { let num_blocks_produced = MainnetEthSpec::slots_per_epoch() * 4; - let mut harness = BeaconChainHarness::new_with_store_config( + let harness = BeaconChainHarness::new_with_store_config( MainnetEthSpec, KEYPAIRS[..].to_vec(), StoreConfig::default(), @@ -55,7 +55,7 @@ fn produces_attestations() { // Test all valid committee indices for all slots in the chain. for slot in 0..=current_slot.as_u64() + MainnetEthSpec::slots_per_epoch() * 3 { let slot = Slot::from(slot); - let state = chain + let mut state = chain .state_at_slot(slot, StateSkipConfig::WithStateRoots) .expect("should get state"); @@ -81,6 +81,9 @@ fn produces_attestations() { .expect("should get target block root") }; + state + .build_committee_cache(RelativeEpoch::Current, &harness.chain.spec) + .unwrap(); let committee_cache = state .committee_cache(RelativeEpoch::Current) .expect("should get committee_cache"); diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 4a8a071ccb..4d512d8415 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -5,9 +5,7 @@ extern crate lazy_static; use beacon_chain::{ attestation_verification::Error as AttnError, - test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, NullMigratorEphemeralHarnessType, - }, + test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, BeaconChainTypes, }; use int_to_bytes::int_to_bytes32; @@ -34,7 +32,7 @@ lazy_static! { } /// Returns a beacon chain harness. -fn get_harness(validator_count: usize) -> BeaconChainHarness> { +fn get_harness(validator_count: usize) -> BeaconChainHarness> { let harness = BeaconChainHarness::new_with_target_aggregators( MainnetEthSpec, KEYPAIRS[0..validator_count].to_vec(), @@ -188,7 +186,7 @@ fn get_non_aggregator( /// Tests verification of `SignedAggregateAndProof` from the gossip network. #[test] fn aggregated_gossip_verification() { - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); // Extend the chain out a few epochs so we have some chain depth to play with. harness.extend_chain( @@ -550,7 +548,7 @@ fn aggregated_gossip_verification() { /// Tests the verification conditions for an unaggregated attestation on the gossip network. #[test] fn unaggregated_gossip_verification() { - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); // Extend the chain out a few epochs so we have some chain depth to play with. harness.extend_chain( @@ -882,7 +880,7 @@ fn unaggregated_gossip_verification() { /// This also checks that we can do a state lookup if we don't get a hit from the shuffling cache. #[test] fn attestation_that_skips_epochs() { - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); // Extend the chain out a few epochs so we have some chain depth to play with. harness.extend_chain( diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 409511761a..47d7f53cf9 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -4,9 +4,7 @@ extern crate lazy_static; use beacon_chain::{ - test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, NullMigratorEphemeralHarnessType, - }, + test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconSnapshot, BlockError, }; use store::config::StoreConfig; @@ -33,7 +31,7 @@ lazy_static! { } fn get_chain_segment() -> Vec> { - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); harness.extend_chain( CHAIN_SEGMENT_LENGTH, @@ -50,7 +48,7 @@ fn get_chain_segment() -> Vec> { .collect() } -fn get_harness(validator_count: usize) -> BeaconChainHarness> { +fn get_harness(validator_count: usize) -> BeaconChainHarness> { let harness = BeaconChainHarness::new_with_store_config( MainnetEthSpec, KEYPAIRS[0..validator_count].to_vec(), @@ -83,7 +81,7 @@ fn junk_aggregate_signature() -> AggregateSignature { fn update_proposal_signatures( snapshots: &mut [BeaconSnapshot], - harness: &BeaconChainHarness>, + harness: &BeaconChainHarness>, ) { for snapshot in snapshots { let spec = &harness.chain.spec; @@ -93,7 +91,7 @@ fn update_proposal_signatures( .get_beacon_proposer_index(slot, spec) .expect("should find proposer index"); let keypair = harness - .validators_keypairs + .validator_keypairs .get(proposer_index) .expect("proposer keypair should be available"); @@ -276,7 +274,7 @@ fn chain_segment_non_linear_slots() { } fn assert_invalid_signature( - harness: &BeaconChainHarness>, + harness: &BeaconChainHarness>, block_index: usize, snapshots: &[BeaconSnapshot], item: &str, @@ -327,7 +325,7 @@ fn assert_invalid_signature( // slot) tuple. } -fn get_invalid_sigs_harness() -> BeaconChainHarness> { +fn get_invalid_sigs_harness() -> BeaconChainHarness> { let harness = get_harness(VALIDATOR_COUNT); harness .chain diff --git a/beacon_node/beacon_chain/tests/op_verification.rs b/beacon_node/beacon_chain/tests/op_verification.rs index 6ca90565b2..0f4d5b1a90 100644 --- a/beacon_node/beacon_chain/tests/op_verification.rs +++ b/beacon_node/beacon_chain/tests/op_verification.rs @@ -7,7 +7,7 @@ extern crate lazy_static; use beacon_chain::observed_operations::ObservationOutcome; use beacon_chain::test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, BlockingMigratorDiskHarnessType, + AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, }; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; @@ -28,7 +28,7 @@ lazy_static! { } type E = MinimalEthSpec; -type TestHarness = BeaconChainHarness>; +type TestHarness = BeaconChainHarness>; type HotColdDB = store::HotColdDB, LevelDB>; fn get_store(db_path: &TempDir) -> Arc { @@ -57,7 +57,7 @@ fn get_harness(store: Arc, validator_count: usize) -> TestHarness { fn voluntary_exit() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), VALIDATOR_COUNT); let spec = &harness.chain.spec.clone(); harness.extend_chain( diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index 1666f34bba..18ecaaf0b9 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -44,7 +44,7 @@ fn finalizes_after_resuming_from_db() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = BeaconChainHarness::new_with_disk_store( + let harness = BeaconChainHarness::new_with_disk_store( MinimalEthSpec, store.clone(), KEYPAIRS[0..validator_count].to_vec(), @@ -88,7 +88,7 @@ fn finalizes_after_resuming_from_db() { let data_dir = harness.data_dir; let original_chain = harness.chain; - let mut resumed_harness = BeaconChainHarness::resume_from_disk_store( + let resumed_harness = BeaconChainHarness::resume_from_disk_store( MinimalEthSpec, store, KEYPAIRS[0..validator_count].to_vec(), diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index e9006a6268..50eccc48fb 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,18 +1,11 @@ #![cfg(not(debug_assertions))] -#[macro_use] -extern crate lazy_static; - -#[macro_use] -extern crate slog; -extern crate slog_term; - -use crate::slog::Drain; use beacon_chain::attestation_verification::Error as AttnError; use beacon_chain::test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, BlockingMigratorDiskHarnessType, + test_logger, AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, }; use beacon_chain::BeaconSnapshot; +use lazy_static::lazy_static; use maplit::hashset; use rand::Rng; use std::collections::HashMap; @@ -38,17 +31,14 @@ lazy_static! { } type E = MinimalEthSpec; -type TestHarness = BeaconChainHarness>; +type TestHarness = BeaconChainHarness>; fn get_store(db_path: &TempDir) -> Arc, LevelDB>> { let spec = MinimalEthSpec::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); let config = StoreConfig::default(); - - let decorator = slog_term::PlainDecorator::new(slog_term::TestStdoutWriter); - let drain = slog_term::FullFormat::new(decorator).build(); - let log = slog::Logger::root(std::sync::Mutex::new(drain).fuse(), o!()); + let log = test_logger(); Arc::new( HotColdDB::open(&hot_path, &cold_path, config, spec, log) @@ -74,7 +64,7 @@ fn full_participation_no_skips() { let num_blocks_produced = E::slots_per_epoch() * 5; let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); harness.extend_chain( num_blocks_produced as usize, @@ -94,7 +84,7 @@ fn randomised_skips() { let mut num_blocks_produced = 0; let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let rng = &mut XorShiftRng::from_seed([42; 16]); let mut head_slot = 0; @@ -130,7 +120,7 @@ fn randomised_skips() { fn long_skip() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); // Number of blocks to create in the first run, intentionally not falling on an epoch // boundary in order to check that the DB hot -> cold migration is capable of reaching @@ -181,7 +171,7 @@ fn randao_genesis_storage() { let validator_count = 8; let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), validator_count); + let harness = get_harness(store.clone(), validator_count); let num_slots = E::slots_per_epoch() * (E::epochs_per_historical_vector() - 1) as u64; @@ -242,7 +232,7 @@ fn split_slot_restore() { let split_slot = { let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let num_blocks = 4 * E::slots_per_epoch(); @@ -270,7 +260,7 @@ fn epoch_boundary_state_attestation_processing() { let num_blocks_produced = E::slots_per_epoch() * 5; let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let late_validators = vec![0, 1]; let timely_validators = (2..LOW_VALIDATOR_COUNT).collect::>(); @@ -358,7 +348,7 @@ fn delete_blocks_and_states() { let store = get_store(&db_path); let validators_keypairs = types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT); - let mut harness = + let harness = BeaconChainHarness::new_with_disk_store(MinimalEthSpec, store.clone(), validators_keypairs); let unforked_blocks: u64 = 4 * E::slots_per_epoch(); @@ -481,7 +471,7 @@ fn multi_epoch_fork_valid_blocks_test( let store = get_store(&db_path); let validators_keypairs = types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT); - let mut harness = + let harness = BeaconChainHarness::new_with_disk_store(MinimalEthSpec, store, validators_keypairs); let num_fork1_blocks: u64 = num_fork1_blocks_.try_into().unwrap(); @@ -550,7 +540,7 @@ fn block_production_different_shuffling_long() { fn multiple_attestations_per_block() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store, HIGH_VALIDATOR_COUNT); + let harness = get_harness(store, HIGH_VALIDATOR_COUNT); harness.extend_chain( MainnetEthSpec::slots_per_epoch() as usize * 3, @@ -581,7 +571,7 @@ fn multiple_attestations_per_block() { fn shuffling_compatible_linear_chain() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); // Skip the block at the end of the first epoch. let head_block_root = harness.extend_chain( @@ -605,7 +595,7 @@ fn shuffling_compatible_linear_chain() { fn shuffling_compatible_missing_pivot_block() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); // Skip the block at the end of the first epoch. harness.extend_chain( @@ -769,7 +759,7 @@ fn prunes_abandoned_fork_between_two_finalized_checkpoints() { let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); let adversarial_validators: Vec = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect(); - let mut rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); + let rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); let slots_per_epoch = rig.slots_per_epoch(); let mut state = rig.get_current_state(); @@ -862,7 +852,7 @@ fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() { let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); let adversarial_validators: Vec = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect(); - let mut rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); + let rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); let slots_per_epoch = rig.slots_per_epoch(); let state = rig.get_current_state(); @@ -977,7 +967,7 @@ fn pruning_does_not_touch_blocks_prior_to_finalization() { let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); let adversarial_validators: Vec = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect(); - let mut rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); + let rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); let slots_per_epoch = rig.slots_per_epoch(); let mut state = rig.get_current_state(); @@ -1059,7 +1049,7 @@ fn prunes_fork_growing_past_youngest_finalized_checkpoint() { let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); let adversarial_validators: Vec = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect(); - let mut rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); + let rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); let state = rig.get_current_state(); // Fill up 0th epoch with canonical chain blocks @@ -1180,7 +1170,7 @@ fn prunes_skipped_slots_states() { let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); let adversarial_validators: Vec = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect(); - let mut rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); + let rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); let state = rig.get_current_state(); let canonical_slots_zeroth_epoch: Vec = @@ -1290,7 +1280,7 @@ fn finalizes_non_epoch_start_slot() { let validators_keypairs = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); let adversarial_validators: Vec = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect(); - let mut rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); + let rig = BeaconChainHarness::new(MinimalEthSpec, validators_keypairs); let state = rig.get_current_state(); let canonical_slots_zeroth_epoch: Vec = @@ -1530,7 +1520,7 @@ fn pruning_test( let db_path = tempdir().unwrap(); let store = get_store(&db_path); - let mut harness = get_harness(store.clone(), VALIDATOR_COUNT); + let harness = get_harness(store.clone(), VALIDATOR_COUNT); let honest_validators: Vec = (0..HONEST_VALIDATOR_COUNT).collect(); let faulty_validators: Vec = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect(); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index cd8b564787..d74f953a0f 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -6,7 +6,7 @@ extern crate lazy_static; use beacon_chain::{ attestation_verification::Error as AttnError, test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, NullMigratorEphemeralHarnessType, + AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, OP_POOL_DB_KEY, }, }; @@ -25,9 +25,7 @@ lazy_static! { static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); } -fn get_harness( - validator_count: usize, -) -> BeaconChainHarness> { +fn get_harness(validator_count: usize) -> BeaconChainHarness> { let harness = BeaconChainHarness::new_with_store_config( MinimalEthSpec, KEYPAIRS[0..validator_count].to_vec(), @@ -67,7 +65,7 @@ fn massive_skips() { fn iterators() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 2 - 1; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); harness.extend_chain( num_blocks_produced as usize, @@ -142,7 +140,7 @@ fn iterators() { #[test] fn chooses_fork() { - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); let two_thirds = (VALIDATOR_COUNT / 3) * 2; let delay = MinimalEthSpec::default_spec().min_attestation_inclusion_delay as usize; @@ -193,7 +191,7 @@ fn chooses_fork() { fn finalizes_with_full_participation() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); harness.extend_chain( num_blocks_produced as usize, @@ -228,7 +226,7 @@ fn finalizes_with_full_participation() { fn finalizes_with_two_thirds_participation() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); let two_thirds = (VALIDATOR_COUNT / 3) * 2; let attesters = (0..two_thirds).collect(); @@ -271,7 +269,7 @@ fn finalizes_with_two_thirds_participation() { fn does_not_finalize_with_less_than_two_thirds_participation() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); let two_thirds = (VALIDATOR_COUNT / 3) * 2; let less_than_two_thirds = two_thirds - 1; @@ -308,7 +306,7 @@ fn does_not_finalize_with_less_than_two_thirds_participation() { fn does_not_finalize_without_attestation() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); harness.extend_chain( num_blocks_produced as usize, @@ -341,7 +339,7 @@ fn does_not_finalize_without_attestation() { fn roundtrip_operation_pool() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); // Add some attestations harness.extend_chain( @@ -372,7 +370,7 @@ fn roundtrip_operation_pool() { fn unaggregated_attestations_added_to_fork_choice_some_none() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() / 2; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); harness.extend_chain( num_blocks_produced as usize, @@ -426,7 +424,7 @@ fn unaggregated_attestations_added_to_fork_choice_some_none() { fn attestations_with_increasing_slots() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 5; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); let mut attestations = vec![]; @@ -438,8 +436,8 @@ fn attestations_with_increasing_slots() { AttestationStrategy::SomeValidators(vec![]), ); - attestations.append( - &mut harness.get_unaggregated_attestations( + attestations.extend( + harness.get_unaggregated_attestations( &AttestationStrategy::AllValidators, &harness.chain.head().expect("should get head").beacon_state, harness @@ -488,7 +486,7 @@ fn attestations_with_increasing_slots() { fn unaggregated_attestations_added_to_fork_choice_all_updated() { let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 2 - 1; - let mut harness = get_harness(VALIDATOR_COUNT); + let harness = get_harness(VALIDATOR_COUNT); harness.extend_chain( num_blocks_produced as usize, @@ -543,7 +541,7 @@ fn unaggregated_attestations_added_to_fork_choice_all_updated() { fn run_skip_slot_test(skip_slots: u64) { let num_validators = 8; - let mut harness_a = get_harness(num_validators); + let harness_a = get_harness(num_validators); let harness_b = get_harness(num_validators); for _ in 0..skip_slots { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 97d68f407b..ec039aeaed 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,7 +5,6 @@ use beacon_chain::events::TeeEventHandler; use beacon_chain::{ builder::{BeaconChainBuilder, Witness}, eth1_chain::{CachingEth1Backend, Eth1Chain}, - migrate::{BackgroundMigrator, Migrate}, slot_clock::{SlotClock, SystemTimeSlotClock}, store::{HotColdDB, ItemStore, LevelDB, StoreConfig}, BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler, @@ -51,7 +50,6 @@ pub struct ClientBuilder { slot_clock: Option, #[allow(clippy::type_complexity)] store: Option>>, - store_migrator: Option, runtime_context: Option>, chain_spec: Option, beacon_chain_builder: Option>, @@ -68,20 +66,9 @@ pub struct ClientBuilder { eth_spec_instance: T::EthSpec, } -impl - ClientBuilder< - Witness< - TStoreMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, - > +impl + ClientBuilder> where - TStoreMigrator: Migrate, TSlotClock: SlotClock + Clone + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -96,7 +83,6 @@ where Self { slot_clock: None, store: None, - store_migrator: None, runtime_context: None, chain_spec: None, beacon_chain_builder: None, @@ -134,7 +120,6 @@ where config: ClientConfig, ) -> Result { let store = self.store.clone(); - let store_migrator = self.store_migrator.take(); let chain_spec = self.chain_spec.clone(); let runtime_context = self.runtime_context.clone(); let eth_spec_instance = self.eth_spec_instance.clone(); @@ -145,8 +130,6 @@ where let store = store.ok_or_else(|| "beacon_chain_start_method requires a store".to_string())?; - let store_migrator = store_migrator - .ok_or_else(|| "beacon_chain_start_method requires a store migrator".to_string())?; let context = runtime_context .ok_or_else(|| "beacon_chain_start_method requires a runtime context".to_string())? .service_context("beacon".into()); @@ -156,7 +139,6 @@ where let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log().clone()) .store(store) - .store_migrator(store_migrator) .data_dir(data_dir) .custom_spec(spec.clone()) .chain_config(chain_config) @@ -337,17 +319,7 @@ where pub fn build( self, ) -> Result< - Client< - Witness< - TStoreMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, - >, + Client>, String, > { let runtime_context = self @@ -415,20 +387,9 @@ where } } -impl - ClientBuilder< - Witness< - TStoreMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, - > +impl + ClientBuilder> where - TStoreMigrator: Migrate, TSlotClock: SlotClock + Clone + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -469,10 +430,9 @@ where } } -impl +impl ClientBuilder< Witness< - TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, @@ -482,7 +442,6 @@ impl >, > where - TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -517,10 +476,9 @@ where } } -impl +impl ClientBuilder< Witness< - TStoreMigrator, TSlotClock, TEth1Backend, TEthSpec, @@ -531,7 +489,6 @@ impl > where TSlotClock: SlotClock + 'static, - TStoreMigrator: Migrate, LevelDB> + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -563,44 +520,9 @@ where } } -impl +impl ClientBuilder< Witness< - BackgroundMigrator, - TSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, - > -where - TSlotClock: SlotClock + 'static, - TEth1Backend: Eth1ChainBackend + 'static, - TEthSpec: EthSpec + 'static, - TEventHandler: EventHandler + 'static, - THotStore: ItemStore + 'static, - TColdStore: ItemStore + 'static, -{ - pub fn background_migrator(mut self) -> Result { - let context = self - .runtime_context - .as_ref() - .ok_or_else(|| "disk_store requires a log".to_string())? - .service_context("freezer_db".into()); - let store = self.store.clone().ok_or_else(|| { - "background_migrator requires the store to be initialized".to_string() - })?; - self.store_migrator = Some(BackgroundMigrator::new(store, context.log().clone())); - Ok(self) - } -} - -impl - ClientBuilder< - Witness< - TStoreMigrator, TSlotClock, CachingEth1Backend, TEthSpec, @@ -610,7 +532,6 @@ impl >, > where - TStoreMigrator: Migrate, TSlotClock: SlotClock + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -710,20 +631,11 @@ where } } -impl +impl ClientBuilder< - Witness< - TStoreMigrator, - SystemTimeSlotClock, - TEth1Backend, - TEthSpec, - TEventHandler, - THotStore, - TColdStore, - >, + Witness, > where - TStoreMigrator: Migrate, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 2a7e8f6d40..a161ce5267 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1,8 +1,5 @@ use beacon_chain::{ - test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, - BlockingMigratorEphemeralHarnessType, - }, + test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, StateSkipConfig, }; use discv5::enr::{CombinedKey, EnrBuilder}; @@ -46,7 +43,7 @@ const SKIPPED_SLOTS: &[u64] = &[ ]; struct ApiTester { - chain: Arc>>, + chain: Arc>>, client: BeaconNodeHttpClient, next_block: SignedBeaconBlock, attestations: Vec>, @@ -191,7 +188,7 @@ impl ApiTester { proposer_slashing, voluntary_exit, _server_shutdown: shutdown_tx, - validator_keypairs: harness.validators_keypairs, + validator_keypairs: harness.validator_keypairs, network_rx, } } diff --git a/beacon_node/http_metrics/tests/tests.rs b/beacon_node/http_metrics/tests/tests.rs index 18a40d4f84..c537e7e4f4 100644 --- a/beacon_node/http_metrics/tests/tests.rs +++ b/beacon_node/http_metrics/tests/tests.rs @@ -1,4 +1,4 @@ -use beacon_chain::test_utils::BlockingMigratorEphemeralHarnessType; +use beacon_chain::test_utils::EphemeralHarnessType; use environment::null_logger; use http_metrics::Config; use reqwest::StatusCode; @@ -7,7 +7,7 @@ use std::sync::Arc; use tokio::sync::oneshot; use types::MainnetEthSpec; -type Context = http_metrics::Context>; +type Context = http_metrics::Context>; #[tokio::test(core_threads = 2)] async fn returns_200_ok() { diff --git a/beacon_node/network/src/attestation_service/tests/mod.rs b/beacon_node/network/src/attestation_service/tests/mod.rs index 46a4ed9eaa..98eb6dc8d9 100644 --- a/beacon_node/network/src/attestation_service/tests/mod.rs +++ b/beacon_node/network/src/attestation_service/tests/mod.rs @@ -5,7 +5,6 @@ mod tests { builder::{BeaconChainBuilder, Witness}, eth1_chain::CachingEth1Backend, events::NullEventHandler, - migrate::NullMigrator, }; use futures::Stream; use genesis::{generate_deterministic_keypairs, interop_genesis_state}; @@ -23,7 +22,6 @@ mod tests { const SLOT_DURATION_MILLIS: u64 = 400; type TestBeaconChainType = Witness< - NullMigrator, SystemTimeSlotClock, CachingEth1Backend, MinimalEthSpec, @@ -55,7 +53,6 @@ mod tests { .logger(log.clone()) .custom_spec(spec.clone()) .store(Arc::new(store)) - .store_migrator(NullMigrator) .data_dir(data_dir.path().to_path_buf()) .genesis_state( interop_genesis_state::(&keypairs, 0, &spec) diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 86592cfc7a..9a3504a3d7 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -11,7 +11,6 @@ pub use config::{get_config, get_data_dir, get_eth2_testnet_config, set_network_ pub use eth2_config::Eth2Config; use beacon_chain::events::TeeEventHandler; -use beacon_chain::migrate::BackgroundMigrator; use beacon_chain::store::LevelDB; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, slot_clock::SystemTimeSlotClock, @@ -25,7 +24,6 @@ use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. pub type ProductionClient = Client< Witness< - BackgroundMigrator, LevelDB>, SystemTimeSlotClock, CachingEth1Backend, E, @@ -85,8 +83,7 @@ impl ProductionBeaconNode { let builder = ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) .chain_spec(spec) - .disk_store(&db_path, &freezer_db_path_res?, store_config)? - .background_migrator()?; + .disk_store(&db_path, &freezer_db_path_res?, store_config)?; let builder = builder .beacon_chain_builder(client_genesis, client_config_1) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 55c403aa8a..57fb713b64 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -55,11 +55,11 @@ pub struct HotColdDB, Cold: ItemStore> { split: RwLock, config: StoreConfig, /// Cold database containing compact historical data. - pub(crate) cold_db: Cold, + pub cold_db: Cold, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. - pub(crate) hot_db: Hot, + pub hot_db: Hot, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, /// Chain spec. @@ -386,11 +386,10 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.exists::(key) } - pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { - let mut guard = self.block_cache.lock(); - - let mut key_value_batch: Vec = Vec::with_capacity(batch.len()); - for op in &batch { + /// Convert a batch of `StoreOp` to a batch of `KeyValueStoreOp`. + pub fn convert_to_kv_batch(&self, batch: &[StoreOp]) -> Result, Error> { + let mut key_value_batch = Vec::with_capacity(batch.len()); + for op in batch { match op { StoreOp::PutBlock(block_hash, block) => { let untyped_hash: Hash256 = (*block_hash).into(); @@ -430,7 +429,14 @@ impl, Cold: ItemStore> HotColdDB } } } - self.hot_db.do_atomically(key_value_batch)?; + Ok(key_value_batch) + } + + pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { + let mut guard = self.block_cache.lock(); + + self.hot_db + .do_atomically(self.convert_to_kv_batch(&batch)?)?; for op in &batch { match op { diff --git a/common/eth2_testnet_config/Cargo.toml b/common/eth2_testnet_config/Cargo.toml index 7ee8eb801c..03db320d7c 100644 --- a/common/eth2_testnet_config/Cargo.toml +++ b/common/eth2_testnet_config/Cargo.toml @@ -19,4 +19,4 @@ serde_yaml = "0.8.13" types = { path = "../../consensus/types"} eth2_ssz = "0.1.2" eth2_config = { path = "../eth2_config"} -enr = "0.3.0" +enr = { version = "0.3.0", features = ["ed25519"] } diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 21cdfbc4a1..70513177ba 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -1,9 +1,7 @@ #![cfg(not(debug_assertions))] use beacon_chain::{ - test_utils::{ - AttestationStrategy, BeaconChainHarness, BlockStrategy, NullMigratorEphemeralHarnessType, - }, + test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, BeaconChainError, BeaconForkChoiceStore, ChainConfig, ForkChoiceError, StateSkipConfig, }; @@ -34,7 +32,7 @@ pub enum MutationDelay { /// A helper struct to make testing fork choice more ergonomic and less repetitive. struct ForkChoiceTest { - harness: BeaconChainHarness>, + harness: BeaconChainHarness>, } /// Allows us to use `unwrap` in some cases. @@ -170,7 +168,7 @@ impl ForkChoiceTest { } /// Build the chain whilst `predicate` returns `true` and `process_block_result` does not error. - pub fn apply_blocks_while(mut self, mut predicate: F) -> Result + pub fn apply_blocks_while(self, mut predicate: F) -> Result where F: FnMut(&BeaconBlock, &BeaconState) -> bool, { @@ -184,7 +182,7 @@ impl ForkChoiceTest { if !predicate(&block.message, &state) { break; } - if let Ok(block_hash) = self.harness.process_block_result(slot, block.clone()) { + if let Ok(block_hash) = self.harness.process_block_result(block.clone()) { self.harness .attest_block(&state, block_hash, &block, &validators); self.harness.advance_slot(); @@ -197,7 +195,7 @@ impl ForkChoiceTest { } /// Apply `count` blocks to the chain (with attestations). - pub fn apply_blocks(mut self, count: usize) -> Self { + pub fn apply_blocks(self, count: usize) -> Self { self.harness.advance_slot(); self.harness.extend_chain( count, @@ -209,7 +207,7 @@ impl ForkChoiceTest { } /// Apply `count` blocks to the chain (without attestations). - pub fn apply_blocks_without_new_attestations(mut self, count: usize) -> Self { + pub fn apply_blocks_without_new_attestations(self, count: usize) -> Self { self.harness.advance_slot(); self.harness.extend_chain( count, @@ -248,7 +246,7 @@ impl ForkChoiceTest { /// Applies a block directly to fork choice, bypassing the beacon chain. /// /// Asserts the block was applied successfully. - pub fn apply_block_directly_to_fork_choice(mut self, mut func: F) -> Self + pub fn apply_block_directly_to_fork_choice(self, mut func: F) -> Self where F: FnMut(&mut BeaconBlock, &mut BeaconState), { @@ -277,7 +275,7 @@ impl ForkChoiceTest { /// /// Asserts that an error occurred and allows inspecting it via `comparison_func`. pub fn apply_invalid_block_directly_to_fork_choice( - mut self, + self, mut mutation_func: F, mut comparison_func: G, ) -> Self @@ -352,13 +350,13 @@ impl ForkChoiceTest { /// /// Also returns some info about who created it. fn apply_attestation_to_chain( - mut self, + self, delay: MutationDelay, mut mutation_func: F, mut comparison_func: G, ) -> Self where - F: FnMut(&mut IndexedAttestation, &BeaconChain>), + F: FnMut(&mut IndexedAttestation, &BeaconChain>), G: FnMut(Result<(), BeaconChainError>), { let head = self.harness.chain.head().expect("should get head");