diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 0abe4596be..fcf9049c12 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -43,5 +43,6 @@ integer-sqrt = "0.1" rand = "0.7.2" [dev-dependencies] +tempfile = "3.1.0" lazy_static = "1.4.0" environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a35aa85dec..66abfcee3d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3,7 +3,6 @@ use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; use crate::events::{EventHandler, EventKind}; use crate::fork_choice::{Error as ForkChoiceError, ForkChoice}; -use crate::iter::{ReverseBlockRootIterator, ReverseStateRootIterator}; use crate::metrics; use crate::persisted_beacon_chain::{PersistedBeaconChain, BEACON_CHAIN_DB_KEY}; use lmd_ghost::LmdGhost; @@ -26,8 +25,10 @@ use state_processing::{ use std::fs; use std::io::prelude::*; use std::sync::Arc; -use store::iter::{BlockRootsIterator, StateRootsIterator}; -use store::{Error as DBError, Store}; +use store::iter::{ + BlockRootsIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator, +}; +use store::{Error as DBError, Migrate, Store}; use tree_hash::TreeHash; use types::*; @@ -95,6 +96,7 @@ pub enum AttestationProcessingOutcome { pub trait BeaconChainTypes: Send + Sync + 'static { type Store: store::Store; + type StoreMigrator: store::Migrate; type SlotClock: slot_clock::SlotClock; type LmdGhost: LmdGhost; type Eth1Chain: Eth1ChainBackend; @@ -108,6 +110,8 @@ pub struct BeaconChain { pub spec: ChainSpec, /// Persistent storage for blocks, states, etc. Typically an on-disk store, such as LevelDB. pub store: Arc, + /// Database migrator for running background maintenance on the store. 
+ pub store_migrator: T::StoreMigrator, /// Reports the current slot, typically based upon the system clock. pub slot_clock: T::SlotClock, /// Stores all operations (e.g., `Attestation`, `Deposit`, etc) that are candidates for @@ -237,7 +241,8 @@ impl BeaconChain { .get_block(&block_root)? .ok_or_else(|| Error::MissingBeaconBlock(block_root))?; let state = self - .get_state(&block.state_root)? + .store + .get_state(&block.state_root, Some(block.slot))? .ok_or_else(|| Error::MissingBeaconState(block.state_root))?; let iter = BlockRootsIterator::owned(self.store.clone(), state); Ok(ReverseBlockRootIterator::new( @@ -318,18 +323,6 @@ impl BeaconChain { Ok(self.store.get(block_root)?) } - /// Returns the state at the given root, if any. - /// - /// ## Errors - /// - /// May return a database error. - pub fn get_state( - &self, - state_root: &Hash256, - ) -> Result>, Error> { - Ok(self.store.get(state_root)?) - } - /// Returns a `Checkpoint` representing the head block and state. Contains the "best block"; /// the head of the canonical `BeaconChain`. /// @@ -403,7 +396,7 @@ impl BeaconChain { Ok(self .store - .get(&state_root)? + .get_state(&state_root, Some(slot))? .ok_or_else(|| Error::NoStateForSlot(slot))?) } } @@ -762,7 +755,10 @@ impl BeaconChain { // not guaranteed to be from the same slot or epoch as the attestation. let mut state: BeaconState = self .store - .get(&attestation_head_block.state_root)? + .get_state( + &attestation_head_block.state_root, + Some(attestation_head_block.slot), + )? .ok_or_else(|| Error::MissingBeaconState(attestation_head_block.state_root))?; // Ensure the state loaded from the database matches the state of the attestation @@ -1123,8 +1119,10 @@ impl BeaconChain { let parent_state_root = parent_block.state_root; let parent_state = self .store - .get(&parent_state_root)? - .ok_or_else(|| Error::DBInconsistent(format!("Missing state {}", parent_state_root)))?; + .get_state(&parent_state_root, Some(parent_block.slot))? 
+ .ok_or_else(|| { + Error::DBInconsistent(format!("Missing state {:?}", parent_state_root)) + })?; metrics::stop_timer(db_read_timer); @@ -1214,12 +1212,12 @@ impl BeaconChain { following_state.get_state_root(intermediate_state.slot)?; self.store - .put(&intermediate_state_root, intermediate_state)?; + .put_state(&intermediate_state_root, intermediate_state)?; } // Store the block and state. self.store.put(&block_root, &block)?; - self.store.put(&state_root, &state)?; + self.store.put_state(&state_root, &state)?; metrics::stop_timer(db_write_timer); @@ -1282,7 +1280,7 @@ impl BeaconChain { .state_at_slot(slot - 1) .map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?; - self.produce_block_on_state(state.clone(), slot, randao_reveal) + self.produce_block_on_state(state, slot, randao_reveal) } /// Produce a block for some `slot` upon the given `state`. @@ -1396,7 +1394,7 @@ impl BeaconChain { let beacon_state_root = beacon_block.state_root; let beacon_state: BeaconState = self .store - .get(&beacon_state_root)? + .get_state(&beacon_state_root, Some(beacon_block.slot))? .ok_or_else(|| Error::MissingBeaconState(beacon_state_root))?; let previous_slot = self.head().beacon_block.slot; @@ -1519,11 +1517,19 @@ impl BeaconChain { let finalized_state = self .store - .get::>(&finalized_block.state_root)? + .get_state(&finalized_block.state_root, Some(finalized_block.slot))? 
.ok_or_else(|| Error::MissingBeaconState(finalized_block.state_root))?; self.op_pool.prune_all(&finalized_state, &self.spec); + // TODO: configurable max finality distance + let max_finality_distance = 0; + self.store_migrator.freeze_to_state( + finalized_block.state_root, + finalized_state, + max_finality_distance, + ); + let _ = self.event_handler.register(EventKind::BeaconFinalization { epoch: new_finalized_epoch, root: finalized_block_root, @@ -1568,9 +1574,12 @@ impl BeaconChain { Error::DBInconsistent(format!("Missing block {}", beacon_block_root)) })?; let beacon_state_root = beacon_block.state_root; - let beacon_state = self.store.get(&beacon_state_root)?.ok_or_else(|| { - Error::DBInconsistent(format!("Missing state {}", beacon_state_root)) - })?; + let beacon_state = self + .store + .get_state(&beacon_state_root, Some(beacon_block.slot))? + .ok_or_else(|| { + Error::DBInconsistent(format!("Missing state {:?}", beacon_state_root)) + })?; let slot = CheckPoint { beacon_block, diff --git a/beacon_node/beacon_chain/src/beacon_chain_builder.rs b/beacon_node/beacon_chain/src/beacon_chain_builder.rs deleted file mode 100644 index 2ee6950612..0000000000 --- a/beacon_node/beacon_chain/src/beacon_chain_builder.rs +++ /dev/null @@ -1,331 +0,0 @@ -use crate::{BeaconChain, BeaconChainTypes}; -use eth2_hashing::hash; -use lighthouse_bootstrap::Bootstrapper; -use merkle_proof::MerkleTree; -use rayon::prelude::*; -use slog::Logger; -use ssz::{Decode, Encode}; -use state_processing::initialize_beacon_state_from_eth1; -use std::fs::File; -use std::io::prelude::*; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::SystemTime; -use tree_hash::{SignedRoot, TreeHash}; -use types::{ - BeaconBlock, BeaconState, ChainSpec, Deposit, DepositData, Domain, EthSpec, Fork, Hash256, - Keypair, PublicKey, Signature, -}; - -enum BuildStrategy { - FromGenesis { - genesis_state: Box>, - genesis_block: Box>, - }, - LoadFromStore, -} - -pub struct BeaconChainBuilder { - 
build_strategy: BuildStrategy, - spec: ChainSpec, - log: Logger, -} - -impl BeaconChainBuilder { - pub fn recent_genesis( - keypairs: &[Keypair], - minutes: u64, - spec: ChainSpec, - log: Logger, - ) -> Result { - Self::quick_start(recent_genesis_time(minutes), keypairs, spec, log) - } - - pub fn quick_start( - genesis_time: u64, - keypairs: &[Keypair], - spec: ChainSpec, - log: Logger, - ) -> Result { - let genesis_state = interop_genesis_state(keypairs, genesis_time, &spec)?; - - Ok(Self::from_genesis_state(genesis_state, spec, log)) - } - - pub fn yaml_state(file: &PathBuf, spec: ChainSpec, log: Logger) -> Result { - let file = File::open(file.clone()) - .map_err(|e| format!("Unable to open YAML genesis state file {:?}: {:?}", file, e))?; - - let genesis_state = serde_yaml::from_reader(file) - .map_err(|e| format!("Unable to parse YAML genesis state file: {:?}", e))?; - - Ok(Self::from_genesis_state(genesis_state, spec, log)) - } - - pub fn ssz_state(file: &PathBuf, spec: ChainSpec, log: Logger) -> Result { - let mut file = File::open(file.clone()) - .map_err(|e| format!("Unable to open SSZ genesis state file {:?}: {:?}", file, e))?; - - let mut bytes = vec![]; - file.read_to_end(&mut bytes) - .map_err(|e| format!("Failed to read SSZ file: {:?}", e))?; - - let genesis_state = BeaconState::from_ssz_bytes(&bytes) - .map_err(|e| format!("Unable to parse SSZ genesis state file: {:?}", e))?; - - Ok(Self::from_genesis_state(genesis_state, spec, log)) - } - - pub fn json_state(file: &PathBuf, spec: ChainSpec, log: Logger) -> Result { - let file = File::open(file.clone()) - .map_err(|e| format!("Unable to open JSON genesis state file {:?}: {:?}", file, e))?; - - let genesis_state = serde_json::from_reader(file) - .map_err(|e| format!("Unable to parse JSON genesis state file: {:?}", e))?; - - Ok(Self::from_genesis_state(genesis_state, spec, log)) - } - - pub fn http_bootstrap(server: &str, spec: ChainSpec, log: Logger) -> Result { - let bootstrapper = 
Bootstrapper::connect(server.to_string(), &log) - .map_err(|e| format!("Failed to initialize bootstrap client: {}", e))?; - - let (genesis_state, genesis_block) = bootstrapper - .genesis() - .map_err(|e| format!("Failed to bootstrap genesis state: {}", e))?; - - Ok(Self { - build_strategy: BuildStrategy::FromGenesis { - genesis_block: Box::new(genesis_block), - genesis_state: Box::new(genesis_state), - }, - spec, - log, - }) - } - - fn from_genesis_state( - genesis_state: BeaconState, - spec: ChainSpec, - log: Logger, - ) -> Self { - Self { - build_strategy: BuildStrategy::FromGenesis { - genesis_block: Box::new(genesis_block(&genesis_state, &spec)), - genesis_state: Box::new(genesis_state), - }, - spec, - log, - } - } - - pub fn from_store(spec: ChainSpec, log: Logger) -> Self { - Self { - build_strategy: BuildStrategy::LoadFromStore, - spec, - log, - } - } - - pub fn build( - self, - store: Arc, - eth1_backend: T::Eth1Chain, - event_handler: T::EventHandler, - ) -> Result, String> { - Ok(match self.build_strategy { - BuildStrategy::LoadFromStore => { - BeaconChain::from_store(store, eth1_backend, event_handler, self.spec, self.log) - .map_err(|e| format!("Error loading BeaconChain from database: {:?}", e))? - .ok_or_else(|| "Unable to find exising BeaconChain in database.".to_string())? - } - BuildStrategy::FromGenesis { - genesis_block, - genesis_state, - } => BeaconChain::from_genesis( - store, - eth1_backend, - event_handler, - genesis_state.as_ref().clone(), - genesis_block.as_ref().clone(), - self.spec, - self.log, - ) - .map_err(|e| format!("Failed to initialize new beacon chain: {:?}", e))?, - }) - } -} - -fn genesis_block(genesis_state: &BeaconState, spec: &ChainSpec) -> BeaconBlock { - let mut genesis_block = BeaconBlock::empty(&spec); - - genesis_block.state_root = genesis_state.canonical_root(); - - genesis_block -} - -/// Builds a genesis state as defined by the Eth2 interop procedure (see below). 
-/// -/// Reference: -/// https://github.com/ethereum/eth2.0-pm/tree/6e41fcf383ebeb5125938850d8e9b4e9888389b4/interop/mocked_start -fn interop_genesis_state( - keypairs: &[Keypair], - genesis_time: u64, - spec: &ChainSpec, -) -> Result, String> { - let eth1_block_hash = Hash256::from_slice(&[0x42; 32]); - let eth1_timestamp = 2_u64.pow(40); - let amount = spec.max_effective_balance; - - let withdrawal_credentials = |pubkey: &PublicKey| { - let mut credentials = hash(&pubkey.as_ssz_bytes()); - credentials[0] = spec.bls_withdrawal_prefix_byte; - Hash256::from_slice(&credentials) - }; - - let datas = keypairs - .into_par_iter() - .map(|keypair| { - let mut data = DepositData { - withdrawal_credentials: withdrawal_credentials(&keypair.pk), - pubkey: keypair.pk.clone().into(), - amount, - signature: Signature::empty_signature().into(), - }; - - let domain = spec.get_domain( - spec.genesis_slot.epoch(T::slots_per_epoch()), - Domain::Deposit, - &Fork::default(), - ); - data.signature = Signature::new(&data.signed_root()[..], domain, &keypair.sk).into(); - - data - }) - .collect::>(); - - let deposit_root_leaves = datas - .par_iter() - .map(|data| Hash256::from_slice(&data.tree_hash_root())) - .collect::>(); - - let mut proofs = vec![]; - let depth = spec.deposit_contract_tree_depth as usize; - let mut tree = MerkleTree::create(&[], depth); - for (i, deposit_leaf) in deposit_root_leaves.iter().enumerate() { - if let Err(_) = tree.push_leaf(*deposit_leaf, depth) { - return Err(String::from("Failed to push leaf")); - } - - let (_, mut proof) = tree.generate_proof(i, depth); - proof.push(Hash256::from_slice(&int_to_bytes32(i + 1))); - - assert_eq!( - proof.len(), - depth + 1, - "Deposit proof should be correct len" - ); - - proofs.push(proof); - } - - let deposits = datas - .into_par_iter() - .zip(proofs.into_par_iter()) - .map(|(data, proof)| (data, proof.into())) - .map(|(data, proof)| Deposit { proof, data }) - .collect::>(); - - let mut state = - 
initialize_beacon_state_from_eth1(eth1_block_hash, eth1_timestamp, deposits, spec) - .map_err(|e| format!("Unable to initialize genesis state: {:?}", e))?; - - state.genesis_time = genesis_time; - - // Invalid all the caches after all the manual state surgery. - state.drop_all_caches(); - - Ok(state) -} - -/// Returns `int` as little-endian bytes with a length of 32. -fn int_to_bytes32(int: usize) -> Vec { - let mut vec = int.to_le_bytes().to_vec(); - vec.resize(32, 0); - vec -} - -/// Returns the system time, mod 30 minutes. -/// -/// Used for easily creating testnets. -fn recent_genesis_time(minutes: u64) -> u64 { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let secs_after_last_period = now.checked_rem(minutes * 60).unwrap_or(0); - now - secs_after_last_period -} - -#[cfg(test)] -mod test { - use super::*; - use types::{test_utils::generate_deterministic_keypairs, EthSpec, MinimalEthSpec}; - - type TestEthSpec = MinimalEthSpec; - - #[test] - fn interop_state() { - let validator_count = 16; - let genesis_time = 42; - let spec = &TestEthSpec::default_spec(); - - let keypairs = generate_deterministic_keypairs(validator_count); - - let state = interop_genesis_state::(&keypairs, genesis_time, spec) - .expect("should build state"); - - assert_eq!( - state.eth1_data.block_hash, - Hash256::from_slice(&[0x42; 32]), - "eth1 block hash should be co-ordinated junk" - ); - - assert_eq!( - state.genesis_time, genesis_time, - "genesis time should be as specified" - ); - - for b in &state.balances { - assert_eq!( - *b, spec.max_effective_balance, - "validator balances should be max effective balance" - ); - } - - for v in &state.validators { - let creds = v.withdrawal_credentials.as_bytes(); - assert_eq!( - creds[0], spec.bls_withdrawal_prefix_byte, - "first byte of withdrawal creds should be bls prefix" - ); - assert_eq!( - &creds[1..], - &hash(&v.pubkey.as_ssz_bytes())[1..], - "rest of withdrawal creds should be pubkey 
hash" - ) - } - - assert_eq!( - state.balances.len(), - validator_count, - "validator balances len should be correct" - ); - - assert_eq!( - state.validators.len(), - validator_count, - "validator count should be correct" - ); - } -} diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 3032669915..b8e3b868ff 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -19,9 +19,18 @@ use types::{BeaconBlock, BeaconState, ChainSpec, EthSpec, Hash256, Slot}; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing /// functionality and only exists to satisfy the type system. -pub struct Witness( +pub struct Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, +>( PhantomData<( TStore, + TStoreMigrator, TSlotClock, TLmdGhost, TEth1Backend, @@ -30,10 +39,20 @@ pub struct Witness, ); -impl BeaconChainTypes - for Witness +impl + BeaconChainTypes + for Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + > where TStore: Store + 'static, + TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -41,6 +60,7 @@ where TEventHandler: EventHandler + 'static, { type Store = TStore; + type StoreMigrator = TStoreMigrator; type SlotClock = TSlotClock; type LmdGhost = TLmdGhost; type Eth1Chain = TEth1Backend; @@ -58,6 +78,7 @@ where /// See the tests for an example of a complete working example. pub struct BeaconChainBuilder { store: Option>, + store_migrator: Option, /// The finalized checkpoint to anchor the chain. May be genesis or a higher /// checkpoint. 
pub finalized_checkpoint: Option>, @@ -71,12 +92,21 @@ pub struct BeaconChainBuilder { log: Option, } -impl +impl BeaconChainBuilder< - Witness, + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, > where TStore: Store + 'static, + TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -90,6 +120,7 @@ where pub fn new(_eth_spec_instance: TEthSpec) -> Self { Self { store: None, + store_migrator: None, finalized_checkpoint: None, genesis_block_root: None, op_pool: None, @@ -119,6 +150,12 @@ where self } + /// Sets the store migrator. + pub fn store_migrator(mut self, store_migrator: TStoreMigrator) -> Self { + self.store_migrator = Some(store_migrator); + self + } + /// Sets the logger. /// /// Should generally be called early in the build chain. @@ -149,7 +186,15 @@ where let key = Hash256::from_slice(&BEACON_CHAIN_DB_KEY.as_bytes()); let p: PersistedBeaconChain< - Witness, + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, > = match store.get(&key) { Err(e) => { return Err(format!( @@ -195,7 +240,7 @@ where self.genesis_block_root = Some(beacon_block_root); store - .put(&beacon_state_root, &beacon_state) + .put_state(&beacon_state_root, &beacon_state) .map_err(|e| format!("Failed to store genesis state: {:?}", e))?; store .put(&beacon_block_root, &beacon_block) @@ -279,7 +324,17 @@ where pub fn build( self, ) -> Result< - BeaconChain>, + BeaconChain< + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, + >, String, > { let mut canonical_head = self @@ -304,6 +359,9 @@ where store: self .store .ok_or_else(|| "Cannot build without store".to_string())?, + store_migrator: self + .store_migrator + .ok_or_else(|| "Cannot build without store migrator".to_string())?, slot_clock: self 
.slot_clock .ok_or_else(|| "Cannot build without slot clock".to_string())?, @@ -336,10 +394,11 @@ where } } -impl +impl BeaconChainBuilder< Witness< TStore, + TStoreMigrator, TSlotClock, ThreadSafeReducedTree, TEth1Backend, @@ -349,6 +408,7 @@ impl > where TStore: Store + 'static, + TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -378,10 +438,11 @@ where } } -impl +impl BeaconChainBuilder< Witness< TStore, + TStoreMigrator, TSlotClock, TLmdGhost, CachingEth1Backend, @@ -391,6 +452,7 @@ impl > where TStore: Store + 'static, + TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, TEthSpec: EthSpec + 'static, @@ -428,12 +490,21 @@ where } } -impl +impl BeaconChainBuilder< - Witness, + Witness< + TStore, + TStoreMigrator, + TestingSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, > where TStore: Store + 'static, + TStoreMigrator: store::Migrate + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -460,12 +531,21 @@ where } } -impl +impl BeaconChainBuilder< - Witness>, + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + NullEventHandler, + >, > where TStore: Store + 'static, + TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -494,7 +574,7 @@ mod test { use sloggers::{null::NullLoggerBuilder, Build}; use ssz::Encode; use std::time::Duration; - use store::MemoryStore; + use store::{migrate::NullMigrator, MemoryStore}; use types::{EthSpec, MinimalEthSpec, Slot}; type TestEthSpec = MinimalEthSpec; @@ -523,6 +603,7 @@ mod test { let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .store(store.clone()) + .store_migrator(NullMigrator) .genesis_state(genesis_state) 
.expect("should build state using recent genesis") .dummy_eth1_backend() diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index a9819eab1f..9df12f6f08 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -293,7 +293,7 @@ fn eth1_block_hash_at_start_of_voting_period( .map_err(|e| Error::UnableToGetPreviousStateRoot(e))?; store - .get::>(&prev_state_root) + .get_state::(&prev_state_root, Some(slot)) .map_err(|e| Error::StoreError(e))? .map(|state| state.eth1_data.block_hash) .ok_or_else(|| Error::PreviousStateNotInDB) @@ -676,7 +676,7 @@ mod test { ); store - .put( + .put_state( &state .get_state_root(prev_state.slot) .expect("should find state root"), @@ -738,7 +738,7 @@ mod test { ); store - .put( + .put_state( &state .get_state_root(Slot::new(0)) .expect("should find state root"), diff --git a/beacon_node/beacon_chain/src/fork_choice.rs b/beacon_node/beacon_chain/src/fork_choice.rs index 04c521c332..9f9277693f 100644 --- a/beacon_node/beacon_chain/src/fork_choice.rs +++ b/beacon_node/beacon_chain/src/fork_choice.rs @@ -137,8 +137,9 @@ impl ForkChoice { block_root }; - let mut state = chain - .get_state(&block.state_root)? + let mut state: BeaconState = chain + .store + .get_state(&block.state_root, Some(block.slot))? .ok_or_else(|| Error::MissingState(block.state_root))?; // Fast-forward the state to the start slot of the epoch where it was justified. 
diff --git a/beacon_node/beacon_chain/src/iter.rs b/beacon_node/beacon_chain/src/iter.rs deleted file mode 100644 index f73e88afa8..0000000000 --- a/beacon_node/beacon_chain/src/iter.rs +++ /dev/null @@ -1,48 +0,0 @@ -use store::iter::{BlockRootsIterator, StateRootsIterator}; -use types::{Hash256, Slot}; - -pub type ReverseBlockRootIterator<'a, E, S> = - ReverseHashAndSlotIterator>; -pub type ReverseStateRootIterator<'a, E, S> = - ReverseHashAndSlotIterator>; - -pub type ReverseHashAndSlotIterator = ReverseChainIterator<(Hash256, Slot), I>; - -/// Provides a wrapper for an iterator that returns a given `T` before it starts returning results of -/// the `Iterator`. -pub struct ReverseChainIterator { - first_value_used: bool, - first_value: T, - iter: I, -} - -impl ReverseChainIterator -where - T: Sized, - I: Iterator + Sized, -{ - pub fn new(first_value: T, iter: I) -> Self { - Self { - first_value_used: false, - first_value, - iter, - } - } -} - -impl Iterator for ReverseChainIterator -where - T: Clone, - I: Iterator, -{ - type Item = T; - - fn next(&mut self) -> Option { - if self.first_value_used { - self.iter.next() - } else { - self.first_value_used = true; - Some(self.first_value.clone()) - } - } -} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 141f768980..3e412fe67a 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -9,7 +9,6 @@ mod errors; pub mod eth1_chain; pub mod events; mod fork_choice; -mod iter; mod metrics; mod persisted_beacon_chain; pub mod test_utils; diff --git a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs index a85f78ac82..52a2028fca 100644 --- a/beacon_node/beacon_chain/src/persisted_beacon_chain.rs +++ b/beacon_node/beacon_chain/src/persisted_beacon_chain.rs @@ -2,7 +2,7 @@ use crate::{BeaconChainTypes, CheckPoint}; use operation_pool::PersistedOperationPool; use ssz::{Decode, Encode}; 
use ssz_derive::{Decode, Encode}; -use store::{DBColumn, Error as StoreError, StoreItem}; +use store::{DBColumn, Error as StoreError, SimpleStoreItem}; use types::Hash256; /// 32-byte key for accessing the `PersistedBeaconChain`. @@ -15,7 +15,7 @@ pub struct PersistedBeaconChain { pub genesis_block_root: Hash256, } -impl StoreItem for PersistedBeaconChain { +impl SimpleStoreItem for PersistedBeaconChain { fn db_column() -> DBColumn { DBColumn::BeaconChain } @@ -24,7 +24,7 @@ impl StoreItem for PersistedBeaconChain { self.as_ssz_bytes() } - fn from_store_bytes(bytes: &mut [u8]) -> Result { + fn from_store_bytes(bytes: &[u8]) -> Result { Self::from_ssz_bytes(bytes).map_err(Into::into) } } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index bf41018d26..6916982e0a 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -12,7 +12,10 @@ use slot_clock::TestingSlotClock; use state_processing::per_slot_processing; use std::sync::Arc; use std::time::Duration; -use store::MemoryStore; +use store::{ + migrate::{BlockingMigrator, NullMigrator}, + DiskStore, MemoryStore, Migrate, Store, +}; use tree_hash::{SignedRoot, TreeHash}; use types::{ AggregateSignature, Attestation, BeaconBlock, BeaconState, BitList, ChainSpec, Domain, EthSpec, @@ -24,15 +27,19 @@ pub use types::test_utils::generate_deterministic_keypairs; pub const HARNESS_GENESIS_TIME: u64 = 1_567_552_690; // 4th September 2019 -pub type HarnessType = Witness< - MemoryStore, +pub type BaseHarnessType = Witness< + TStore, + TStoreMigrator, TestingSlotClock, - ThreadSafeReducedTree, - CachingEth1Backend, - E, - NullEventHandler, + ThreadSafeReducedTree, + CachingEth1Backend, + TEthSpec, + NullEventHandler, >; +pub type HarnessType = BaseHarnessType; +pub type DiskHarnessType = BaseHarnessType, E>; + /// Indicates how the `BeaconChainHarness` should produce blocks. 
#[derive(Clone, Copy, Debug)] pub enum BlockStrategy { @@ -82,6 +89,7 @@ impl BeaconChainHarness> { .logger(log.clone()) .custom_spec(spec.clone()) .store(Arc::new(MemoryStore::open())) + .store_migrator(NullMigrator) .genesis_state( interop_genesis_state::(&keypairs, HARNESS_GENESIS_TIME, &spec) .expect("should generate interop state"), @@ -103,7 +111,56 @@ impl BeaconChainHarness> { keypairs, } } +} +impl BeaconChainHarness> { + /// Instantiate a new harness with `validator_count` initial validators. + pub fn with_disk_store( + eth_spec_instance: E, + store: Arc, + keypairs: Vec, + ) -> Self { + let spec = E::default_spec(); + + let log = TerminalLoggerBuilder::new() + .level(Severity::Warning) + .build() + .expect("logger should build"); + + let chain = BeaconChainBuilder::new(eth_spec_instance) + .logger(log.clone()) + .custom_spec(spec.clone()) + .store(store.clone()) + .store_migrator( as Migrate<_, E>>::new(store)) + .genesis_state( + interop_genesis_state::(&keypairs, HARNESS_GENESIS_TIME, &spec) + .expect("should generate interop state"), + ) + .expect("should build state using recent genesis") + .dummy_eth1_backend() + .expect("should build dummy backend") + .null_event_handler() + .testing_slot_clock(Duration::from_secs(1)) + .expect("should configure testing slot clock") + .empty_reduced_tree_fork_choice() + .expect("should add fork choice to builder") + .build() + .expect("should build"); + + Self { + spec: chain.spec.clone(), + chain, + keypairs, + } + } +} + +impl BeaconChainHarness> +where + S: Store, + M: Migrate, + E: EthSpec, +{ /// Advance the slot of the `BeaconChain`. /// /// Does not produce blocks or attestations. 
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs new file mode 100644 index 0000000000..3b548fce46 --- /dev/null +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -0,0 +1,290 @@ +#![cfg(not(debug_assertions))] + +#[macro_use] +extern crate lazy_static; + +use beacon_chain::test_utils::{ + AttestationStrategy, BeaconChainHarness, BlockStrategy, DiskHarnessType, +}; +use rand::Rng; +use sloggers::{null::NullLoggerBuilder, Build}; +use std::sync::Arc; +use store::DiskStore; +use tempfile::{tempdir, TempDir}; +use tree_hash::TreeHash; +use types::test_utils::{SeedableRng, XorShiftRng}; +use types::*; + +// Should ideally be divisible by 3. +pub const VALIDATOR_COUNT: usize = 24; + +lazy_static! { + /// A cached set of keys. + static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); +} + +type E = MinimalEthSpec; +type TestHarness = BeaconChainHarness>; + +fn get_store(db_path: &TempDir) -> Arc { + let spec = MinimalEthSpec::default_spec(); + let hot_path = db_path.path().join("hot_db"); + let cold_path = db_path.path().join("cold_db"); + let log = NullLoggerBuilder.build().expect("logger should build"); + Arc::new( + DiskStore::open(&hot_path, &cold_path, spec, log).expect("disk store should initialize"), + ) +} + +fn get_harness(store: Arc, validator_count: usize) -> TestHarness { + let harness = BeaconChainHarness::with_disk_store( + MinimalEthSpec, + store, + KEYPAIRS[0..validator_count].to_vec(), + ); + harness.advance_slot(); + harness +} + +#[test] +fn full_participation_no_skips() { + let num_blocks_produced = E::slots_per_epoch() * 5; + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), VALIDATOR_COUNT); + + harness.extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + check_finalization(&harness, num_blocks_produced); + 
check_split_slot(&harness, store); + check_chain_dump(&harness, num_blocks_produced + 1); +} + +#[test] +fn randomised_skips() { + let num_slots = E::slots_per_epoch() * 5; + let mut num_blocks_produced = 0; + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), VALIDATOR_COUNT); + let rng = &mut XorShiftRng::from_seed([42; 16]); + + let mut head_slot = 0; + + for slot in 1..=num_slots { + if rng.gen_bool(0.8) { + harness.extend_chain( + 1, + BlockStrategy::ForkCanonicalChainAt { + previous_slot: Slot::new(head_slot), + first_slot: Slot::new(slot), + }, + AttestationStrategy::AllValidators, + ); + harness.advance_slot(); + num_blocks_produced += 1; + head_slot = slot; + } else { + harness.advance_slot(); + } + } + + let state = &harness.chain.head().beacon_state; + + assert_eq!(state.slot, num_slots, "head should be at the current slot"); + + check_split_slot(&harness, store); + check_chain_dump(&harness, num_blocks_produced + 1); +} + +#[test] +fn long_skip() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), VALIDATOR_COUNT); + + // Number of blocks to create in the first run, intentionally not falling on an epoch + // boundary in order to check that the DB hot -> cold migration is capable of reaching + // back across the skip distance, and correctly migrating those extra non-finalized states. + let initial_blocks = E::slots_per_epoch() * 5 + E::slots_per_epoch() / 2; + let skip_slots = E::slots_per_historical_root() as u64 * 8; + let final_blocks = E::slots_per_epoch() * 4; + + harness.extend_chain( + initial_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + check_finalization(&harness, initial_blocks); + + // 2. Skip slots + for _ in 0..skip_slots { + harness.advance_slot(); + } + + // 3. 
Produce more blocks, establish a new finalized epoch + harness.extend_chain( + final_blocks as usize, + BlockStrategy::ForkCanonicalChainAt { + previous_slot: Slot::new(initial_blocks), + first_slot: Slot::new(initial_blocks + skip_slots as u64 + 1), + }, + AttestationStrategy::AllValidators, + ); + + check_finalization(&harness, initial_blocks + skip_slots + final_blocks); + check_split_slot(&harness, store); + check_chain_dump(&harness, initial_blocks + final_blocks + 1); +} + +/// Go forward to the point where the genesis randao value is no longer part of the vector. +/// +/// This implicitly checks that: +/// 1. The chunked vector scheme doesn't attempt to store an incorrect genesis value +/// 2. We correctly load the genesis value for all required slots +/// NOTE: this test takes about a minute to run +#[test] +fn randao_genesis_storage() { + let validator_count = 8; + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + let harness = get_harness(store.clone(), validator_count); + + let num_slots = E::slots_per_epoch() * (E::epochs_per_historical_vector() - 1) as u64; + + // Check we have a non-trivial genesis value + let genesis_value = *harness + .chain + .head() + .beacon_state + .get_randao_mix(Epoch::new(0)) + .expect("randao mix ok"); + assert!(!genesis_value.is_zero()); + + harness.extend_chain( + num_slots as usize - 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + // Check that genesis value is still present + assert!(harness + .chain + .head() + .beacon_state + .randao_mixes + .iter() + .find(|x| **x == genesis_value) + .is_some()); + + // Then upon adding one more block, it isn't + harness.advance_slot(); + harness.extend_chain( + 1, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + assert!(harness + .chain + .head() + .beacon_state + .randao_mixes + .iter() + .find(|x| **x == genesis_value) + .is_none()); + + check_finalization(&harness, num_slots); + 
check_split_slot(&harness, store); + check_chain_dump(&harness, num_slots + 1); +} + +// Check that closing and reopening a freezer DB restores the split slot to its correct value. +#[test] +fn split_slot_restore() { + let db_path = tempdir().unwrap(); + + let split_slot = { + let store = get_store(&db_path); + let harness = get_harness(store.clone(), VALIDATOR_COUNT); + + let num_blocks = 4 * E::slots_per_epoch(); + + harness.extend_chain( + num_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ); + + store.get_split_slot() + }; + assert_ne!(split_slot, Slot::new(0)); + + // Re-open the store + let store = get_store(&db_path); + + assert_eq!(store.get_split_slot(), split_slot); +} + +/// Check that the head state's slot matches `expected_slot`. +fn check_slot(harness: &TestHarness, expected_slot: u64) { + let state = &harness.chain.head().beacon_state; + + assert_eq!( + state.slot, expected_slot, + "head should be at the current slot" + ); +} + +/// Check that the chain has finalized under best-case assumptions, and check the head slot. +fn check_finalization(harness: &TestHarness, expected_slot: u64) { + let state = &harness.chain.head().beacon_state; + + check_slot(harness, expected_slot); + + assert_eq!( + state.current_justified_checkpoint.epoch, + state.current_epoch() - 1, + "the head should be justified one behind the current epoch" + ); + assert_eq!( + state.finalized_checkpoint.epoch, + state.current_epoch() - 2, + "the head should be finalized two behind the current epoch" + ); +} + +/// Check that the DiskStore's split_slot is equal to the start slot of the last finalized epoch. 
+fn check_split_slot(harness: &TestHarness, store: Arc) { + let split_slot = store.get_split_slot(); + assert_eq!( + harness + .chain + .head() + .beacon_state + .finalized_checkpoint + .epoch + .start_slot(E::slots_per_epoch()), + split_slot + ); + assert_ne!(split_slot, 0); +} + +/// Check that all the states in a chain dump have the correct tree hash. +fn check_chain_dump(harness: &TestHarness, expected_len: u64) { + let chain_dump = harness.chain.chain_dump().unwrap(); + + assert_eq!(chain_dump.len() as u64, expected_len); + + for checkpoint in chain_dump { + assert_eq!( + checkpoint.beacon_state_root, + Hash256::from_slice(&checkpoint.beacon_state.tree_hash_root()), + "tree hash of stored state is incorrect" + ); + } +} diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index fa8b10a5b4..92d22529e5 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -14,6 +14,7 @@ store = { path = "../store" } network = { path = "../network" } eth2-libp2p = { path = "../eth2-libp2p" } rest_api = { path = "../rest_api" } +parking_lot = "0.9.0" websocket_server = { path = "../websocket_server" } prometheus = "0.7.0" types = { path = "../../eth2/types" } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 90cbc033cf..84e9feff00 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -5,7 +5,10 @@ use beacon_chain::{ eth1_chain::CachingEth1Backend, lmd_ghost::ThreadSafeReducedTree, slot_clock::{SlotClock, SystemTimeSlotClock}, - store::{DiskStore, MemoryStore, Store}, + store::{ + migrate::{BackgroundMigrator, Migrate, NullMigrator}, + DiskStore, MemoryStore, SimpleDiskStore, Store, + }, BeaconChain, BeaconChainTypes, Eth1ChainBackend, EventHandler, }; use environment::RuntimeContext; @@ -52,6 +55,7 @@ pub const ETH1_GENESIS_UPDATE_INTERVAL_MILLIS: u64 = 500; pub struct ClientBuilder { slot_clock: Option, store: Option>, + store_migrator: Option, 
runtime_context: Option>, chain_spec: Option, beacon_chain_builder: Option>, @@ -66,10 +70,21 @@ pub struct ClientBuilder { eth_spec_instance: T::EthSpec, } -impl - ClientBuilder> +impl + ClientBuilder< + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, + > where TStore: Store + 'static, + TStoreMigrator: store::Migrate, TSlotClock: SlotClock + Clone + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -83,6 +98,7 @@ where Self { slot_clock: None, store: None, + store_migrator: None, runtime_context: None, chain_spec: None, beacon_chain_builder: None, @@ -118,6 +134,7 @@ where config: Eth1Config, ) -> impl Future { let store = self.store.clone(); + let store_migrator = self.store_migrator.take(); let chain_spec = self.chain_spec.clone(); let runtime_context = self.runtime_context.clone(); let eth_spec_instance = self.eth_spec_instance.clone(); @@ -126,6 +143,9 @@ where .and_then(move |()| { let store = store .ok_or_else(|| "beacon_chain_start_method requires a store".to_string())?; + let store_migrator = store_migrator.ok_or_else(|| { + "beacon_chain_start_method requires a store migrator".to_string() + })?; let context = runtime_context .ok_or_else(|| "beacon_chain_start_method requires a log".to_string())? .service_context("beacon"); @@ -135,6 +155,7 @@ where let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log.clone()) .store(store.clone()) + .store_migrator(store_migrator) .custom_spec(spec.clone()); Ok((builder, spec, context)) @@ -300,7 +321,9 @@ where &context.executor, beacon_chain.clone(), network_info, - client_config.db_path().expect("unable to read datadir"), + client_config + .create_db_path() + .expect("unable to read datadir"), eth2_config.clone(), context.log, ) @@ -420,7 +443,17 @@ where /// If type inference errors are being raised, see the comment on the definition of `Self`. 
pub fn build( self, - ) -> Client> { + ) -> Client< + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, + > { Client { beacon_chain: self.beacon_chain, libp2p_network: self.libp2p_network, @@ -431,10 +464,11 @@ where } } -impl +impl ClientBuilder< Witness< TStore, + TStoreMigrator, TSlotClock, ThreadSafeReducedTree, TEth1Backend, @@ -444,6 +478,7 @@ impl > where TStore: Store + 'static, + TStoreMigrator: store::Migrate, TSlotClock: SlotClock + Clone + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, @@ -476,12 +511,21 @@ where } } -impl +impl ClientBuilder< - Witness>, + Witness< + TStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + WebSocketSender, + >, > where TStore: Store + 'static, + TStoreMigrator: store::Migrate, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -517,18 +561,68 @@ where } } -impl - ClientBuilder> +impl + ClientBuilder< + Witness< + DiskStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, + > where TSlotClock: SlotClock + 'static, + TStoreMigrator: store::Migrate + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, { /// Specifies that the `Client` should use a `DiskStore` database. - pub fn disk_store(mut self, path: &Path) -> Result { - let store = DiskStore::open(path) + pub fn disk_store(mut self, hot_path: &Path, cold_path: &Path) -> Result { + let context = self + .runtime_context + .as_ref() + .ok_or_else(|| "disk_store requires a log".to_string())? 
+ .service_context("freezer_db"); + let spec = self + .chain_spec + .clone() + .ok_or_else(|| "disk_store requires a chain spec".to_string())?; + + let store = DiskStore::open(hot_path, cold_path, spec, context.log) + .map_err(|e| format!("Unable to open database: {:?}", e).to_string())?; + self.store = Some(Arc::new(store)); + Ok(self) + } +} + +impl + ClientBuilder< + Witness< + SimpleDiskStore, + TStoreMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, + > +where + TSlotClock: SlotClock + 'static, + TStoreMigrator: store::Migrate + 'static, + TLmdGhost: LmdGhost + 'static, + TEth1Backend: Eth1ChainBackend + 'static, + TEthSpec: EthSpec + 'static, + TEventHandler: EventHandler + 'static, +{ + /// Specifies that the `Client` should use a `DiskStore` database. + pub fn simple_disk_store(mut self, path: &Path) -> Result { + let store = SimpleDiskStore::open(path) .map_err(|e| format!("Unable to open database: {:?}", e).to_string())?; self.store = Some(Arc::new(store)); Ok(self) @@ -537,7 +631,15 @@ where impl ClientBuilder< - Witness, + Witness< + MemoryStore, + NullMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, > where TSlotClock: SlotClock + 'static, @@ -547,17 +649,49 @@ where TEventHandler: EventHandler + 'static, { /// Specifies that the `Client` should use a `MemoryStore` database. + /// + /// Also sets the `store_migrator` to the `NullMigrator`, as that's the only viable choice. 
pub fn memory_store(mut self) -> Self { let store = MemoryStore::open(); self.store = Some(Arc::new(store)); + self.store_migrator = Some(NullMigrator); self } } -impl +impl + ClientBuilder< + Witness< + DiskStore, + BackgroundMigrator, + TSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, + > +where + TSlotClock: SlotClock + 'static, + TLmdGhost: LmdGhost + 'static, + TEth1Backend: Eth1ChainBackend + 'static, + TEthSpec: EthSpec + 'static, + TEventHandler: EventHandler + 'static, +{ + pub fn background_migrator(mut self) -> Result { + let store = self.store.clone().ok_or_else(|| { + "background_migrator requires the store to be initialized".to_string() + })?; + self.store_migrator = Some(BackgroundMigrator::new(store)); + Ok(self) + } +} + +impl ClientBuilder< Witness< TStore, + TStoreMigrator, TSlotClock, TLmdGhost, CachingEth1Backend, @@ -567,6 +701,7 @@ impl > where TStore: Store + 'static, + TStoreMigrator: store::Migrate, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, TEthSpec: EthSpec + 'static, @@ -643,12 +778,21 @@ where } } -impl +impl ClientBuilder< - Witness, + Witness< + TStore, + TStoreMigrator, + SystemTimeSlotClock, + TLmdGhost, + TEth1Backend, + TEthSpec, + TEventHandler, + >, > where TStore: Store + 'static, + TStoreMigrator: store::Migrate, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 9081175f13..f94ea5ed55 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -7,6 +7,9 @@ use std::path::PathBuf; /// The number initial validators when starting the `Minimal`. const TESTNET_SPEC_CONSTANTS: &str = "minimal"; +/// Default directory name for the freezer database under the top-level data dir. +const DEFAULT_FREEZER_DB_DIR: &str = "freezer_db"; + /// Defines how the client should initialize the `BeaconChain` and other components. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub enum ClientGenesis { @@ -39,6 +42,7 @@ pub struct Config { pub data_dir: PathBuf, pub db_type: String, db_name: String, + freezer_db_path: Option, pub log_file: PathBuf, pub spec_constants: String, /// If true, the node will use co-ordinated junk for eth1 values. @@ -63,6 +67,7 @@ impl Default for Config { log_file: PathBuf::from(""), db_type: "disk".to_string(), db_name: "chain_db".to_string(), + freezer_db_path: None, genesis: <_>::default(), network: NetworkConfig::new(), rest_api: <_>::default(), @@ -76,19 +81,59 @@ impl Default for Config { } impl Config { - /// Returns the path to which the client may initialize an on-disk database. - pub fn db_path(&self) -> Option { - self.data_dir() - .and_then(|path| Some(path.join(&self.db_name))) + /// Get the database path without initialising it. + pub fn get_db_path(&self) -> Option { + self.get_data_dir() + .map(|data_dir| data_dir.join(&self.db_name)) + } + + /// Get the database path, creating it if necessary. + pub fn create_db_path(&self) -> Result { + let db_path = self + .get_db_path() + .ok_or_else(|| "Unable to locate user home directory")?; + ensure_dir_exists(db_path) + } + + /// Fetch default path to use for the freezer database. + fn default_freezer_db_path(&self) -> Option { + self.get_data_dir() + .map(|data_dir| data_dir.join(DEFAULT_FREEZER_DB_DIR)) + } + + /// Returns the path to which the client may initialize the on-disk freezer database. + /// + /// Will attempt to use the user-supplied path from e.g. the CLI, or will default + /// to a directory in the data_dir if no path is provided. + pub fn get_freezer_db_path(&self) -> Option { + self.freezer_db_path + .clone() + .or_else(|| self.default_freezer_db_path()) + } + + /// Get the freezer DB path, creating it if necessary. 
+ pub fn create_freezer_db_path(&self) -> Result { + let freezer_db_path = self + .get_freezer_db_path() + .ok_or_else(|| "Unable to locate user home directory")?; + ensure_dir_exists(freezer_db_path) + } + + /// Returns the core path for the client. + /// + /// Will not create any directories. + pub fn get_data_dir(&self) -> Option { + dirs::home_dir().map(|home_dir| home_dir.join(&self.data_dir)) } /// Returns the core path for the client. /// /// Creates the directory if it does not exist. - pub fn data_dir(&self) -> Option { - let path = dirs::home_dir()?.join(&self.data_dir); - fs::create_dir_all(&path).ok()?; - Some(path) + pub fn create_data_dir(&self) -> Result { + let path = self + .get_data_dir() + .ok_or_else(|| "Unable to locate user home directory".to_string())?; + ensure_dir_exists(path) } /// Apply the following arguments to `self`, replacing values if they are specified in `args`. @@ -100,9 +145,9 @@ impl Config { self.data_dir = PathBuf::from(dir); }; - if let Some(dir) = args.value_of("db") { - self.db_type = dir.to_string(); - }; + if let Some(freezer_dir) = args.value_of("freezer-dir") { + self.freezer_db_path = Some(PathBuf::from(freezer_dir)); + } self.network.apply_cli_args(args)?; self.rest_api.apply_cli_args(args)?; @@ -112,6 +157,12 @@ impl Config { } } +/// Ensure that the directory at `path` exists, by creating it and all parents if necessary. 
+fn ensure_dir_exists(path: PathBuf) -> Result { + fs::create_dir_all(&path).map_err(|e| format!("Unable to create {}: {}", path.display(), e))?; + Ok(path) +} + #[cfg(test)] mod tests { use super::*; diff --git a/beacon_node/network/src/message_handler.rs b/beacon_node/network/src/message_handler.rs index 35bdbb7d08..54f7f4b9b3 100644 --- a/beacon_node/network/src/message_handler.rs +++ b/beacon_node/network/src/message_handler.rs @@ -43,7 +43,7 @@ pub enum HandlerMessage { PubsubMessage(String, PeerId, PubsubMessage), } -impl MessageHandler { +impl MessageHandler { /// Initializes and runs the MessageHandler. pub fn spawn( beacon_chain: Arc>, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 0e86a61cc7..3743a67ca9 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -24,7 +24,7 @@ pub struct Service { _phantom: PhantomData, } -impl Service { +impl Service { pub fn new( beacon_chain: Arc>, config: &NetworkConfig, diff --git a/beacon_node/rest_api/src/beacon.rs b/beacon_node/rest_api/src/beacon.rs index 9aa19cd457..9621dce459 100644 --- a/beacon_node/rest_api/src/beacon.rs +++ b/beacon_node/rest_api/src/beacon.rs @@ -196,7 +196,7 @@ pub fn get_state( let state = beacon_chain .store - .get(root)? + .get_state(root, None)? .ok_or_else(|| ApiError::NotFound(format!("No state for root: {:?}", root)))?; (*root, state) diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 1431d9c89b..55d9025a3c 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -154,7 +154,7 @@ pub fn state_at_slot( let state: BeaconState = beacon_chain .store - .get(&root)? + .get_state(&root, Some(slot))? 
.ok_or_else(|| ApiError::NotFound(format!("Unable to find state at root {}", root)))?; Ok((root, state)) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 2384092f3c..bb07ea1ff0 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -17,6 +17,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .global(true) ) + .arg( + Arg::with_name("freezer-dir") + .long("freezer-dir") + .value_name("DIR") + .help("Data directory for the freezer database.") + .takes_value(true) + .global(true) + ) /* * Network parameters. */ diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index ed3fcf97a0..05193cf16c 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -306,7 +306,7 @@ impl ConfigBuilder { /// /// - Client config /// - Eth2 config - /// - The entire database directory + /// - All database directories pub fn clean_datadir(&mut self) -> Result<()> { let backup_dir = { let mut s = String::from("backup_"); @@ -334,10 +334,8 @@ impl ConfigBuilder { move_to_backup_dir(&self.client_config.data_dir.join(CLIENT_CONFIG_FILENAME))?; move_to_backup_dir(&self.client_config.data_dir.join(ETH2_CONFIG_FILENAME))?; - - if let Some(db_path) = self.client_config.db_path() { - move_to_backup_dir(&db_path)?; - } + move_to_backup_dir(&self.client_config.create_db_path()?)?; + move_to_backup_dir(&self.client_config.create_freezer_db_path()?)?; Ok(()) } @@ -475,7 +473,7 @@ impl ConfigBuilder { pub fn write_configs_to_new_datadir(&mut self) -> Result<()> { let db_exists = self .client_config - .db_path() + .get_db_path() .map(|d| d.exists()) .unwrap_or_else(|| false); @@ -531,19 +529,22 @@ impl ConfigBuilder { // // For now we return an error. In the future we may decide to boot a default (e.g., // public testnet or mainnet). - if !self.client_config.data_dir.exists() { + if !self + .client_config + .get_data_dir() + .map_or(false, |d| d.exists()) + { return Err( "No datadir found. 
Either create a new testnet or specify a different `--datadir`." .into(), ); } - // If there is a path to a databse in the config, ensure it exists. + // If there is a path to a database in the config, ensure it exists. if !self .client_config - .db_path() - .map(|path| path.exists()) - .unwrap_or_else(|| true) + .get_db_path() + .map_or(false, |path| path.exists()) { return Err( "No database found in datadir. Use 'testnet -f' to overwrite the existing \ diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index cc9ced9c0c..6797095b62 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -19,13 +19,14 @@ use environment::RuntimeContext; use futures::{Future, IntoFuture}; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; -use store::DiskStore; +use store::{migrate::BackgroundMigrator, DiskStore}; use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. pub type ProductionClient = Client< Witness< DiskStore, + BackgroundMigrator, SystemTimeSlotClock, ThreadSafeReducedTree, CachingEth1Backend, @@ -80,15 +81,17 @@ impl ProductionBeaconNode { let client_genesis = client_config.genesis.clone(); let log = context.log.clone(); - client_config - .db_path() - .ok_or_else(|| "Unable to access database path".to_string()) + let db_path_res = client_config.create_db_path(); + let freezer_db_path_res = client_config.create_freezer_db_path(); + + db_path_res .into_future() .and_then(move |db_path| { Ok(ClientBuilder::new(context.eth_spec_instance.clone()) .runtime_context(context) - .disk_store(&db_path)? - .chain_spec(spec)) + .chain_spec(spec) + .disk_store(&db_path, &freezer_db_path_res?)? + .background_migrator()?) 
}) .and_then(move |builder| { builder.beacon_chain_builder(client_genesis, genesis_eth1_config) diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index d613c12000..5187dba0d4 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dev-dependencies] tempfile = "3.1.0" +sloggers = "0.3.2" [dependencies] db-key = "0.0.5" @@ -15,5 +16,6 @@ eth2_ssz = "0.1.2" eth2_ssz_derive = "0.1.0" tree_hash = "0.1.0" types = { path = "../../eth2/types" } +slog = "2.2.3" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../eth2/utils/lighthouse_metrics" } diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs new file mode 100644 index 0000000000..47b4053be4 --- /dev/null +++ b/beacon_node/store/src/chunked_vector.rs @@ -0,0 +1,791 @@ +//! Space-efficient storage for `BeaconState` vector fields. +//! +//! This module provides logic for splitting the `FixedVector` fields of a `BeaconState` into +//! chunks, and storing those chunks in contiguous ranges in the on-disk database. The motiviation +//! for doing this is avoiding massive duplication in every on-disk state. For example, rather than +//! storing the whole `historical_roots` vector, which is updated once every couple of thousand +//! slots, at every slot, we instead store all the historical values as a chunked vector on-disk, +//! and fetch only the slice we need when reconstructing the `historical_roots` of a state. +//! +//! ## Terminology +//! +//! * **Chunk size**: the number of vector values stored per on-disk chunk. +//! * **Vector index** (vindex): index into all the historical values, identifying a single element +//! of the vector being stored. +//! * **Chunk index** (cindex): index into the keyspace of the on-disk database, identifying a chunk +//! of elements. To find the chunk index of a vector index: `cindex = vindex / chunk_size`. 
+use self::UpdatePattern::*; +use crate::*; +use ssz::{Decode, Encode}; +use typenum::Unsigned; + +/// Description of how a `BeaconState` field is updated during state processing. +/// +/// When storing a state, this allows us to efficiently store only those entries +/// which are not present in the DB already. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum UpdatePattern { + /// The value is updated once per `n` slots. + OncePerNSlots { n: u64 }, + /// The value is updated once per epoch, for the epoch `current_epoch - lag`. + OncePerEpoch { lag: u64 }, +} + +/// Map a chunk index to bytes that can be used to key the NoSQL database. +/// +/// We shift chunks up by 1 to make room for a genesis chunk that is handled separately. +fn chunk_key(cindex: u64) -> [u8; 8] { + (cindex + 1).to_be_bytes() +} + +/// Return the database key for the genesis value. +fn genesis_value_key() -> [u8; 8] { + 0u64.to_be_bytes() +} + +/// Trait for types representing fields of the `BeaconState`. +/// +/// All of the required methods are type-level, because we do most things with fields at the +/// type-level. We require their value-level witnesses to be `Copy` so that we can avoid the +/// turbofish when calling functions like `store_updated_vector`. +pub trait Field: Copy { + /// The type of value stored in this field: the `T` from `FixedVector`. + /// + /// The `Default` impl will be used to fill extra vector entries. + type Value: Decode + Encode + Default + Clone + PartialEq + std::fmt::Debug; + + /// The length of this field: the `N` from `FixedVector`. + type Length: Unsigned; + + /// The database column where the integer-indexed chunks for this field should be stored. + /// + /// Each field's column **must** be unique. + fn column() -> DBColumn; + + /// Update pattern for this field, so that we can do differential updates. + fn update_pattern(spec: &ChainSpec) -> UpdatePattern; + + /// The number of values to store per chunk on disk. 
+ /// + /// Default is 128 so that we read/write 4K pages when the values are 32 bytes. + // TODO: benchmark and optimise this parameter + fn chunk_size() -> usize { + 128 + } + + /// Get the value of this field at the given vector index, from the state. + fn get_value( + state: &BeaconState, + vindex: u64, + spec: &ChainSpec, + ) -> Result; + + /// True if this is a `FixedLengthField`, false otherwise. + fn is_fixed_length() -> bool; + + /// Compute the start and end vector indices of the slice of history required at `current_slot`. + /// + /// ## Example + /// + /// If we have a field that is updated once per epoch, then the end vindex will be + /// `current_epoch + 1`, because we want to include the value for the current epoch, and the + /// start vindex will be `end_vindex - Self::Length`, because that's how far back we can look. + fn start_and_end_vindex(current_slot: Slot, spec: &ChainSpec) -> (usize, usize) { + // We take advantage of saturating subtraction on slots and epochs + match Self::update_pattern(spec) { + OncePerNSlots { n } => { + // Per-slot changes exclude the index for the current slot, because + // it won't be set until the slot completes (think of `state_roots`, `block_roots`). + // This also works for the `historical_roots` because at the `n`th slot, the 0th + // entry of the list is created, and before that the list is empty. + let end_vindex = current_slot / n; + let start_vindex = end_vindex - Self::Length::to_u64(); + (start_vindex.as_usize(), end_vindex.as_usize()) + } + OncePerEpoch { lag } => { + // Per-epoch changes include the index for the current epoch, because it + // will have been set at the most recent epoch boundary. 
+ let current_epoch = current_slot.epoch(E::slots_per_epoch()); + let end_epoch = current_epoch + 1 - lag; + let start_epoch = end_epoch + lag - Self::Length::to_u64(); + (start_epoch.as_usize(), end_epoch.as_usize()) + } + } + } + + /// Given an `existing_chunk` stored in the DB, construct an updated chunk to replace it. + fn get_updated_chunk( + existing_chunk: &Chunk, + chunk_index: usize, + start_vindex: usize, + end_vindex: usize, + state: &BeaconState, + spec: &ChainSpec, + ) -> Result, Error> { + let chunk_size = Self::chunk_size(); + let mut new_chunk = Chunk::new(vec![Self::Value::default(); chunk_size]); + + for i in 0..chunk_size { + let vindex = chunk_index * chunk_size + i; + if vindex >= start_vindex && vindex < end_vindex { + let vector_value = Self::get_value(state, vindex as u64, spec)?; + + if let Some(existing_value) = existing_chunk.values.get(i) { + if *existing_value != vector_value && *existing_value != Self::Value::default() + { + return Err(ChunkError::Inconsistent { + field: Self::column(), + chunk_index, + existing_value: format!("{:?}", existing_value), + new_value: format!("{:?}", vector_value), + } + .into()); + } + } + + new_chunk.values[i] = vector_value; + } else { + new_chunk.values[i] = existing_chunk + .values + .get(i) + .cloned() + .unwrap_or_else(Self::Value::default); + } + } + + Ok(new_chunk) + } + + /// Determine whether a state at `slot` possesses (or requires) the genesis value. + fn slot_needs_genesis_value(slot: Slot, spec: &ChainSpec) -> bool { + let (_, end_vindex) = Self::start_and_end_vindex(slot, spec); + match Self::update_pattern(spec) { + // If the end_vindex is less than the length of the vector, then the vector + // has not yet been completely filled with non-genesis values, and so the genesis + // value is still required. + OncePerNSlots { .. 
} => { + Self::is_fixed_length() && end_vindex < Self::Length::to_usize() + } + // If the field has lag, then it takes an extra `lag` vindices beyond the + // `end_vindex` before the vector has been filled with non-genesis values. + OncePerEpoch { lag } => { + Self::is_fixed_length() && end_vindex + (lag as usize) < Self::Length::to_usize() + } + } + } + + /// Load the genesis value for a fixed length field from the store. + /// + /// This genesis value should be used to fill the initial state of the vector. + fn load_genesis_value(store: &S) -> Result { + let key = &genesis_value_key()[..]; + let chunk = + Chunk::load(store, Self::column(), key)?.ok_or(ChunkError::MissingGenesisValue)?; + chunk + .values + .first() + .cloned() + .ok_or(ChunkError::MissingGenesisValue.into()) + } + + /// Store the given `value` as the genesis value for this field, unless stored already. + /// + /// Check the existing value (if any) for consistency with the value we intend to store, and + /// return an error if they are inconsistent. + fn check_and_store_genesis_value(store: &S, value: Self::Value) -> Result<(), Error> { + let key = &genesis_value_key()[..]; + + if let Some(existing_chunk) = Chunk::::load(store, Self::column(), key)? { + if existing_chunk.values.len() != 1 { + Err(ChunkError::InvalidGenesisChunk { + field: Self::column(), + expected_len: 1, + observed_len: existing_chunk.values.len(), + } + .into()) + } else if existing_chunk.values[0] != value { + Err(ChunkError::InconsistentGenesisValue { + field: Self::column(), + existing_value: format!("{:?}", existing_chunk.values[0]), + new_value: format!("{:?}", value), + } + .into()) + } else { + Ok(()) + } + } else { + Chunk::new(vec![value]).store(store, Self::column(), &genesis_value_key()[..]) + } + } + + /// Extract the genesis value for a fixed length field from an + /// + /// Will only return a correct value if `slot_needs_genesis_value(state.slot, spec) == true`. 
+ fn extract_genesis_value( + state: &BeaconState, + spec: &ChainSpec, + ) -> Result { + let (_, end_vindex) = Self::start_and_end_vindex(state.slot, spec); + match Self::update_pattern(spec) { + // Genesis value is guaranteed to exist at `end_vindex`, as it won't yet have been + // updated + OncePerNSlots { .. } => Ok(Self::get_value(state, end_vindex as u64, spec)?), + // If there's lag, the value of the field at the vindex *without the lag* + // should still be set to the genesis value. + OncePerEpoch { lag } => Ok(Self::get_value(state, end_vindex as u64 + lag, spec)?), + } + } +} + +/// Marker trait for fixed-length fields (`FixedVector`). +pub trait FixedLengthField: Field {} + +/// Marker trait for variable-length fields (`VariableList`). +pub trait VariableLengthField: Field {} + +/// Macro to implement the `Field` trait on a new unit struct type. +macro_rules! field { + ($struct_name:ident, $marker_trait:ident, $value_ty:ty, $length_ty:ty, $column:expr, + $update_pattern:expr, $get_value:expr) => { + #[derive(Clone, Copy)] + pub struct $struct_name; + + impl Field for $struct_name + where + T: EthSpec, + { + type Value = $value_ty; + type Length = $length_ty; + + fn column() -> DBColumn { + $column + } + + fn update_pattern(spec: &ChainSpec) -> UpdatePattern { + $update_pattern(spec) + } + + fn get_value( + state: &BeaconState, + vindex: u64, + spec: &ChainSpec, + ) -> Result { + $get_value(state, vindex, spec) + } + + fn is_fixed_length() -> bool { + stringify!($marker_trait) == "FixedLengthField" + } + } + + impl $marker_trait for $struct_name {} + }; +} + +field!( + BlockRoots, + FixedLengthField, + Hash256, + T::SlotsPerHistoricalRoot, + DBColumn::BeaconBlockRoots, + |_| OncePerNSlots { n: 1 }, + |state: &BeaconState<_>, index, _| safe_modulo_index(&state.block_roots, index) +); + +field!( + StateRoots, + FixedLengthField, + Hash256, + T::SlotsPerHistoricalRoot, + DBColumn::BeaconStateRoots, + |_| OncePerNSlots { n: 1 }, + |state: &BeaconState<_>, 
index, _| safe_modulo_index(&state.state_roots, index) +); + +field!( + HistoricalRoots, + VariableLengthField, + Hash256, + T::HistoricalRootsLimit, + DBColumn::BeaconHistoricalRoots, + |_| OncePerNSlots { + n: T::SlotsPerHistoricalRoot::to_u64() + }, + |state: &BeaconState<_>, index, _| safe_modulo_index(&state.historical_roots, index) +); + +field!( + RandaoMixes, + FixedLengthField, + Hash256, + T::EpochsPerHistoricalVector, + DBColumn::BeaconRandaoMixes, + |_| OncePerEpoch { lag: 1 }, + |state: &BeaconState<_>, index, _| safe_modulo_index(&state.randao_mixes, index) +); + +pub fn store_updated_vector, E: EthSpec, S: Store>( + field: F, + store: &S, + state: &BeaconState, + spec: &ChainSpec, +) -> Result<(), Error> { + let chunk_size = F::chunk_size(); + let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot, spec); + let start_cindex = start_vindex / chunk_size; + let end_cindex = end_vindex / chunk_size; + + // Store the genesis value if we have access to it, and it hasn't been stored already. + if F::slot_needs_genesis_value(state.slot, spec) { + let genesis_value = F::extract_genesis_value(state, spec)?; + F::check_and_store_genesis_value(store, genesis_value)?; + } + + // Start by iterating backwards from the last chunk, storing new chunks in the database. + // Stop once a chunk in the database matches what we were about to store, this indicates + // that a previously stored state has already filled-in a portion of the indices covered. + let full_range_checked = store_range( + field, + (start_cindex..=end_cindex).rev(), + start_vindex, + end_vindex, + store, + state, + spec, + )?; + + // If the previous `store_range` did not check the entire range, it may be the case that the + // state's vector includes elements at low vector indices that are not yet stored in the + // database, so run another `store_range` to ensure these values are also stored. 
+ if !full_range_checked { + store_range( + field, + start_cindex..end_cindex, + start_vindex, + end_vindex, + store, + state, + spec, + )?; + } + + Ok(()) +} + +fn store_range( + _: F, + range: I, + start_vindex: usize, + end_vindex: usize, + store: &S, + state: &BeaconState, + spec: &ChainSpec, +) -> Result +where + F: Field, + E: EthSpec, + S: Store, + I: Iterator, +{ + for chunk_index in range { + let chunk_key = &chunk_key(chunk_index as u64)[..]; + + let existing_chunk = + Chunk::::load(store, F::column(), chunk_key)?.unwrap_or_else(Chunk::default); + + let new_chunk = F::get_updated_chunk( + &existing_chunk, + chunk_index, + start_vindex, + end_vindex, + state, + spec, + )?; + + if new_chunk == existing_chunk { + return Ok(false); + } + + new_chunk.store(store, F::column(), chunk_key)?; + } + + Ok(true) +} + +// Chunks at the end index are included. +// TODO: could be more efficient with a real range query (perhaps RocksDB) +fn range_query( + store: &S, + column: DBColumn, + start_index: usize, + end_index: usize, +) -> Result>, Error> { + let mut result = vec![]; + + for chunk_index in start_index..=end_index { + let key = &chunk_key(chunk_index as u64)[..]; + let chunk = Chunk::load(store, column, key)?.ok_or(ChunkError::Missing { chunk_index })?; + result.push(chunk); + } + + Ok(result) +} + +/// Combine chunks to form a list or vector of all values with vindex in `start_vindex..end_vindex`. +/// +/// The `length` parameter is the length of the vec to construct, with entries set to `default` if +/// they lie outside the vindex range. 
+fn stitch( + chunks: Vec>, + start_vindex: usize, + end_vindex: usize, + chunk_size: usize, + length: usize, + default: T, +) -> Result, ChunkError> { + if start_vindex + length < end_vindex { + return Err(ChunkError::OversizedRange { + start_vindex, + end_vindex, + length, + }); + } + + let start_cindex = start_vindex / chunk_size; + let end_cindex = end_vindex / chunk_size; + + let mut result = vec![default; length]; + + for (chunk_index, chunk) in (start_cindex..=end_cindex).zip(chunks.into_iter()) { + // All chunks but the last chunk must be full-sized + if chunk_index != end_cindex && chunk.values.len() != chunk_size { + return Err(ChunkError::InvalidSize { + chunk_index, + expected: chunk_size, + actual: chunk.values.len(), + }); + } + + // Copy the chunk entries into the result vector + for (i, value) in chunk.values.into_iter().enumerate() { + let vindex = chunk_index * chunk_size + i; + + if vindex >= start_vindex && vindex < end_vindex { + result[vindex % length] = value; + } + } + } + + Ok(result) +} + +pub fn load_vector_from_db, E: EthSpec, S: Store>( + store: &S, + slot: Slot, + spec: &ChainSpec, +) -> Result, Error> { + // Do a range query + let chunk_size = F::chunk_size(); + let (start_vindex, end_vindex) = F::start_and_end_vindex(slot, spec); + let start_cindex = start_vindex / chunk_size; + let end_cindex = end_vindex / chunk_size; + + let chunks = range_query(store, F::column(), start_cindex, end_cindex)?; + + let default = if F::slot_needs_genesis_value(slot, spec) { + F::load_genesis_value(store)? + } else { + F::Value::default() + }; + + let result = stitch( + chunks, + start_vindex, + end_vindex, + chunk_size, + F::Length::to_usize(), + default, + )?; + + Ok(result.into()) +} + +/// The historical roots are stored in vector chunks, despite not actually being a vector. 
+pub fn load_variable_list_from_db, E: EthSpec, S: Store>( + store: &S, + slot: Slot, + spec: &ChainSpec, +) -> Result, Error> { + let chunk_size = F::chunk_size(); + let (start_vindex, end_vindex) = F::start_and_end_vindex(slot, spec); + let start_cindex = start_vindex / chunk_size; + let end_cindex = end_vindex / chunk_size; + + let chunks: Vec> = range_query(store, F::column(), start_cindex, end_cindex)?; + + let mut result = Vec::with_capacity(chunk_size * chunks.len()); + + for (chunk_index, chunk) in chunks.into_iter().enumerate() { + for (i, value) in chunk.values.into_iter().enumerate() { + let vindex = chunk_index * chunk_size + i; + + if vindex >= start_vindex && vindex < end_vindex { + result.push(value); + } + } + } + + Ok(result.into()) +} + +/// Index into a field of the state, avoiding out of bounds and division by 0. +fn safe_modulo_index(values: &[T], index: u64) -> Result { + if values.is_empty() { + Err(ChunkError::ZeroLengthVector) + } else { + Ok(values[index as usize % values.len()]) + } +} + +/// A chunk of a fixed-size vector from the `BeaconState`, stored in the database. +#[derive(Debug, Clone, PartialEq)] +pub struct Chunk { + /// A vector of up-to `chunk_size` values. + pub values: Vec, +} + +impl Default for Chunk +where + T: Decode + Encode, +{ + fn default() -> Self { + Chunk { values: vec![] } + } +} + +impl Chunk +where + T: Decode + Encode, +{ + pub fn new(values: Vec) -> Self { + Chunk { values } + } + + pub fn load(store: &S, column: DBColumn, key: &[u8]) -> Result, Error> { + store + .get_bytes(column.into(), key)? + .map(|bytes| Self::decode(&bytes)) + .transpose() + } + + pub fn store(&self, store: &S, column: DBColumn, key: &[u8]) -> Result<(), Error> { + store.put_bytes(column.into(), key, &self.encode()?)?; + Ok(()) + } + + /// Attempt to decode a single chunk. 
+ pub fn decode(bytes: &[u8]) -> Result { + if !::is_ssz_fixed_len() { + return Err(Error::from(ChunkError::InvalidType)); + } + + let value_size = ::ssz_fixed_len(); + + if value_size == 0 { + return Err(Error::from(ChunkError::InvalidType)); + } + + let values = bytes + .chunks(value_size) + .map(T::from_ssz_bytes) + .collect::>()?; + + Ok(Chunk { values }) + } + + pub fn encoded_size(&self) -> usize { + self.values.len() * ::ssz_fixed_len() + } + + /// Encode a single chunk as bytes. + pub fn encode(&self) -> Result, Error> { + if !::is_ssz_fixed_len() { + return Err(Error::from(ChunkError::InvalidType)); + } + + Ok(self.values.iter().flat_map(T::as_ssz_bytes).collect()) + } +} + +#[derive(Debug, PartialEq)] +pub enum ChunkError { + ZeroLengthVector, + InvalidSize { + chunk_index: usize, + expected: usize, + actual: usize, + }, + Missing { + chunk_index: usize, + }, + MissingGenesisValue, + Inconsistent { + field: DBColumn, + chunk_index: usize, + existing_value: String, + new_value: String, + }, + InconsistentGenesisValue { + field: DBColumn, + existing_value: String, + new_value: String, + }, + InvalidGenesisChunk { + field: DBColumn, + expected_len: usize, + observed_len: usize, + }, + InvalidType, + OversizedRange { + start_vindex: usize, + end_vindex: usize, + length: usize, + }, +} + +#[cfg(test)] +mod test { + use super::*; + use types::MainnetEthSpec as TestSpec; + use types::*; + + fn v(i: u64) -> Hash256 { + Hash256::from_low_u64_be(i) + } + + #[test] + fn stitch_default() { + let chunk_size = 4; + + let chunks = vec![ + Chunk::new(vec![0u64, 1, 2, 3]), + Chunk::new(vec![4, 5, 0, 0]), + ]; + + assert_eq!( + stitch(chunks.clone(), 2, 6, chunk_size, 12, 99).unwrap(), + vec![99, 99, 2, 3, 4, 5, 99, 99, 99, 99, 99, 99] + ); + } + + #[test] + fn stitch_basic() { + let chunk_size = 4; + let default = v(0); + + let chunks = vec![ + Chunk::new(vec![v(0), v(1), v(2), v(3)]), + Chunk::new(vec![v(4), v(5), v(6), v(7)]), + Chunk::new(vec![v(8), v(9), v(10), 
v(11)]), + ]; + + assert_eq!( + stitch(chunks.clone(), 0, 12, chunk_size, 12, default).unwrap(), + (0..12).map(v).collect::>() + ); + + assert_eq!( + stitch(chunks.clone(), 2, 10, chunk_size, 8, default).unwrap(), + vec![v(8), v(9), v(2), v(3), v(4), v(5), v(6), v(7)] + ); + } + + #[test] + fn stitch_oversized_range() { + let chunk_size = 4; + let default = 0; + + let chunks = vec![Chunk::new(vec![20u64, 21, 22, 23])]; + + // Args (start_vindex, end_vindex, length) + let args = vec![(0, 21, 20), (0, 2048, 1024), (0, 2, 1)]; + + for (start_vindex, end_vindex, length) in args { + assert_eq!( + stitch( + chunks.clone(), + start_vindex, + end_vindex, + chunk_size, + length, + default + ), + Err(ChunkError::OversizedRange { + start_vindex, + end_vindex, + length, + }) + ); + } + } + + #[test] + fn fixed_length_fields() { + fn test_fixed_length>(_: F, expected: bool) { + assert_eq!(F::is_fixed_length(), expected); + } + test_fixed_length(BlockRoots, true); + test_fixed_length(StateRoots, true); + test_fixed_length(HistoricalRoots, false); + test_fixed_length(RandaoMixes, true); + } + + fn needs_genesis_value_once_per_slot>(_: F) { + let spec = &TestSpec::default_spec(); + let max = F::Length::to_u64(); + for i in 0..max { + assert!( + F::slot_needs_genesis_value(Slot::new(i), spec), + "slot {}", + i + ); + } + assert!(!F::slot_needs_genesis_value(Slot::new(max), spec)); + } + + #[test] + fn needs_genesis_value_block_roots() { + needs_genesis_value_once_per_slot(BlockRoots); + } + + #[test] + fn needs_genesis_value_state_roots() { + needs_genesis_value_once_per_slot(StateRoots); + } + + #[test] + fn needs_genesis_value_historical_roots() { + let spec = &TestSpec::default_spec(); + assert!( + !>::slot_needs_genesis_value(Slot::new(0), spec) + ); + } + + fn needs_genesis_value_test_randao>(_: F) { + let spec = &TestSpec::default_spec(); + let max = TestSpec::slots_per_epoch() as u64 * (F::Length::to_u64() - 1); + for i in 0..max { + assert!( + 
F::slot_needs_genesis_value(Slot::new(i), spec), + "slot {}", + i + ); + } + assert!(!F::slot_needs_genesis_value(Slot::new(max), spec)); + } + + #[test] + fn needs_genesis_value_randao() { + needs_genesis_value_test_randao(RandaoMixes); + } +} diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 815b35a8ed..70cc327a78 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -1,8 +1,15 @@ +use crate::chunked_vector::ChunkError; +use crate::hot_cold_store::HotColdDbError; use ssz::DecodeError; +use types::BeaconStateError; #[derive(Debug, PartialEq)] pub enum Error { SszDecodeError(DecodeError), + VectorChunkError(ChunkError), + BeaconStateError(BeaconStateError), + PartialBeaconStateError, + HotColdDbError(HotColdDbError), DBError { message: String }, } @@ -12,6 +19,24 @@ impl From for Error { } } +impl From for Error { + fn from(e: ChunkError) -> Error { + Error::VectorChunkError(e) + } +} + +impl From for Error { + fn from(e: HotColdDbError) -> Error { + Error::HotColdDbError(e) + } +} + +impl From for Error { + fn from(e: BeaconStateError) -> Error { + Error::BeaconStateError(e) + } +} + impl From for Error { fn from(e: DBError) -> Error { Error::DBError { message: e.message } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs new file mode 100644 index 0000000000..490f6c2d78 --- /dev/null +++ b/beacon_node/store/src/hot_cold_store.rs @@ -0,0 +1,260 @@ +use crate::chunked_vector::{ + store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots, +}; +use crate::iter::StateRootsIterator; +use crate::{ + leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem, +}; +use parking_lot::RwLock; +use slog::{info, trace, Logger}; +use ssz::{Decode, Encode}; +use std::convert::TryInto; +use std::path::Path; +use std::sync::Arc; +use types::*; + +/// 32-byte key for accessing the `split_slot` of the freezer DB. 
+pub const SPLIT_SLOT_DB_KEY: &str = "FREEZERDBSPLITSLOTFREEZERDBSPLIT"; + +pub struct HotColdDB { + /// The slot before which all data is stored in the cold database. + /// + /// Data for slots less than `split_slot` is in the cold DB, while data for slots + /// greater than or equal is in the hot DB. + split_slot: RwLock, + /// Cold database containing compact historical data. + cold_db: LevelDB, + /// Hot database containing duplicated but quick-to-access recent data. + hot_db: LevelDB, + /// Chain spec. + spec: ChainSpec, + /// Logger. + pub(crate) log: Logger, +} + +#[derive(Debug, PartialEq)] +pub enum HotColdDbError { + FreezeSlotError { + current_split_slot: Slot, + proposed_split_slot: Slot, + }, +} + +impl Store for HotColdDB { + // Defer to the hot database for basic operations (including blocks for now) + fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error> { + self.hot_db.get_bytes(column, key) + } + + fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error> { + self.hot_db.put_bytes(column, key, value) + } + + fn key_exists(&self, column: &str, key: &[u8]) -> Result { + self.hot_db.key_exists(column, key) + } + + fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error> { + self.hot_db.key_delete(column, key) + } + + /// Store a state in the store. + fn put_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + if state.slot < self.get_split_slot() { + self.store_archive_state(state_root, state) + } else { + self.hot_db.put_state(state_root, state) + } + } + + /// Fetch a state from the store. + fn get_state( + &self, + state_root: &Hash256, + slot: Option, + ) -> Result>, Error> { + if let Some(slot) = slot { + if slot < self.get_split_slot() { + self.load_archive_state(state_root) + } else { + self.hot_db.get_state(state_root, None) + } + } else { + match self.hot_db.get_state(state_root, None)? 
{ + Some(state) => Ok(Some(state)), + None => self.load_archive_state(state_root), + } + } + } + + fn freeze_to_state( + store: Arc, + _frozen_head_root: Hash256, + frozen_head: &BeaconState, + ) -> Result<(), Error> { + info!( + store.log, + "Freezer migration started"; + "slot" => frozen_head.slot + ); + + // 1. Copy all of the states between the head and the split slot, from the hot DB + // to the cold DB. + let current_split_slot = store.get_split_slot(); + + if frozen_head.slot < current_split_slot { + Err(HotColdDbError::FreezeSlotError { + current_split_slot, + proposed_split_slot: frozen_head.slot, + })?; + } + + let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head); + + let mut to_delete = vec![]; + for (state_root, slot) in + state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) + { + trace!(store.log, "Freezing"; + "slot" => slot, + "state_root" => format!("{}", state_root)); + + let state: BeaconState = match store.hot_db.get_state(&state_root, None)? { + Some(s) => s, + // If there's no state it could be a skip slot, which is fine, our job is just + // to move everything that was in the hot DB to the cold. + None => continue, + }; + + to_delete.push(state_root); + + store.store_archive_state(&state_root, &state)?; + } + + // 2. Update the split slot + *store.split_slot.write() = frozen_head.slot; + store.store_split_slot()?; + + // 3. 
Delete from the hot DB + for state_root in to_delete { + store + .hot_db + .key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?; + } + + info!( + store.log, + "Freezer migration complete"; + "slot" => frozen_head.slot + ); + + Ok(()) + } +} + +impl HotColdDB { + pub fn open( + hot_path: &Path, + cold_path: &Path, + spec: ChainSpec, + log: Logger, + ) -> Result { + let db = HotColdDB { + split_slot: RwLock::new(Slot::new(0)), + cold_db: LevelDB::open(cold_path)?, + hot_db: LevelDB::open(hot_path)?, + spec, + log, + }; + // Load the previous split slot from the database (if any). This ensures we can + // stop and restart correctly. + if let Some(split_slot) = db.load_split_slot()? { + *db.split_slot.write() = split_slot; + } + Ok(db) + } + + pub fn store_archive_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + trace!( + self.log, + "Freezing state"; + "slot" => state.slot.as_u64(), + "state_root" => format!("{:?}", state_root) + ); + // 1. Convert to PartialBeaconState and store that in the DB. + let partial_state = PartialBeaconState::from_state_forgetful(state); + partial_state.db_put(&self.cold_db, state_root)?; + + // 2. Store updated vector entries. + let db = &self.cold_db; + store_updated_vector(BlockRoots, db, state, &self.spec)?; + store_updated_vector(StateRoots, db, state, &self.spec)?; + store_updated_vector(HistoricalRoots, db, state, &self.spec)?; + store_updated_vector(RandaoMixes, db, state, &self.spec)?; + + Ok(()) + } + + pub fn load_archive_state( + &self, + state_root: &Hash256, + ) -> Result>, Error> { + let mut partial_state = match PartialBeaconState::db_get(&self.cold_db, state_root)? { + Some(s) => s, + None => return Ok(None), + }; + + // Fill in the fields of the partial state. 
+ partial_state.load_block_roots(&self.cold_db, &self.spec)?; + partial_state.load_state_roots(&self.cold_db, &self.spec)?; + partial_state.load_historical_roots(&self.cold_db, &self.spec)?; + partial_state.load_randao_mixes(&self.cold_db, &self.spec)?; + + let state: BeaconState = partial_state.try_into()?; + + Ok(Some(state)) + } + + pub fn get_split_slot(&self) -> Slot { + *self.split_slot.read() + } + + fn load_split_slot(&self) -> Result, Error> { + let key = Hash256::from_slice(SPLIT_SLOT_DB_KEY.as_bytes()); + let split_slot: Option = self.hot_db.get(&key)?; + Ok(split_slot.map(|s| Slot::new(s.0))) + } + + fn store_split_slot(&self) -> Result<(), Error> { + let key = Hash256::from_slice(SPLIT_SLOT_DB_KEY.as_bytes()); + self.hot_db + .put(&key, &SplitSlot(self.get_split_slot().as_u64()))?; + Ok(()) + } +} + +/// Struct for storing the split slot in the database. +#[derive(Clone, Copy)] +struct SplitSlot(u64); + +impl SimpleStoreItem for SplitSlot { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + self.0.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(SplitSlot(u64::from_ssz_bytes(bytes)?)) + } +} diff --git a/beacon_node/store/src/impls.rs b/beacon_node/store/src/impls.rs index ed724480c3..b78a7f2ee5 100644 --- a/beacon_node/store/src/impls.rs +++ b/beacon_node/store/src/impls.rs @@ -1,9 +1,10 @@ use crate::*; use ssz::{Decode, Encode}; -mod beacon_state; +pub mod beacon_state; +pub mod partial_beacon_state; -impl StoreItem for BeaconBlock { +impl SimpleStoreItem for BeaconBlock { fn db_column() -> DBColumn { DBColumn::BeaconBlock } @@ -19,7 +20,7 @@ impl StoreItem for BeaconBlock { bytes } - fn from_store_bytes(bytes: &mut [u8]) -> Result { + fn from_store_bytes(bytes: &[u8]) -> Result { let timer = metrics::start_timer(&metrics::BEACON_BLOCK_READ_TIMES); let len = bytes.len(); diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 
2113d35bd1..08ccec91cb 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -4,6 +4,43 @@ use ssz_derive::{Decode, Encode}; use std::convert::TryInto; use types::beacon_state::{BeaconTreeHashCache, CommitteeCache, CACHED_EPOCHS}; +pub fn store_full_state( + store: &S, + state_root: &Hash256, + state: &BeaconState, +) -> Result<(), Error> { + let timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_TIMES); + + let bytes = StorageContainer::new(state).as_ssz_bytes(); + let result = store.put_bytes(DBColumn::BeaconState.into(), state_root.as_bytes(), &bytes); + + metrics::stop_timer(timer); + metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); + metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as i64); + + result +} + +pub fn get_full_state( + store: &S, + state_root: &Hash256, +) -> Result>, Error> { + let timer = metrics::start_timer(&metrics::BEACON_STATE_READ_TIMES); + + match store.get_bytes(DBColumn::BeaconState.into(), state_root.as_bytes())? { + Some(bytes) => { + let container = StorageContainer::from_ssz_bytes(&bytes)?; + + metrics::stop_timer(timer); + metrics::inc_counter(&metrics::BEACON_STATE_READ_COUNT); + metrics::inc_counter_by(&metrics::BEACON_STATE_READ_BYTES, bytes.len() as i64); + + Ok(Some(container.try_into()?)) + } + None => Ok(None), + } +} + /// A container for storing `BeaconState` components. 
// TODO: would be more space efficient with the caches stored separately and referenced by hash #[derive(Encode, Decode)] @@ -53,36 +90,3 @@ impl TryInto> for StorageContainer { Ok(state) } } - -impl StoreItem for BeaconState { - fn db_column() -> DBColumn { - DBColumn::BeaconState - } - - fn as_store_bytes(&self) -> Vec { - let timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_TIMES); - - let container = StorageContainer::new(self); - let bytes = container.as_ssz_bytes(); - - metrics::stop_timer(timer); - metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); - metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as i64); - - bytes - } - - fn from_store_bytes(bytes: &mut [u8]) -> Result { - let timer = metrics::start_timer(&metrics::BEACON_STATE_READ_TIMES); - - let len = bytes.len(); - let container = StorageContainer::from_ssz_bytes(bytes)?; - let result = container.try_into(); - - metrics::stop_timer(timer); - metrics::inc_counter(&metrics::BEACON_STATE_READ_COUNT); - metrics::inc_counter_by(&metrics::BEACON_STATE_READ_BYTES, len as i64); - - result - } -} diff --git a/beacon_node/store/src/impls/partial_beacon_state.rs b/beacon_node/store/src/impls/partial_beacon_state.rs new file mode 100644 index 0000000000..b7fcc8bdb2 --- /dev/null +++ b/beacon_node/store/src/impls/partial_beacon_state.rs @@ -0,0 +1,16 @@ +use crate::*; +use ssz::{Decode, Encode}; + +impl SimpleStoreItem for PartialBeaconState { + fn db_column() -> DBColumn { + DBColumn::BeaconState + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) 
+ } +} diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 678cd26f11..6b2dc2d540 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -18,7 +18,7 @@ impl<'a, U: Store, E: EthSpec> AncestorIter> for /// Iterates across all available prior block roots of `self`, starting at the most recent and ending /// at genesis. fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { - let state = store.get::>(&self.state_root).ok()??; + let state = store.get_state(&self.state_root, Some(self.slot)).ok()??; Some(BlockRootsIterator::owned(store, state)) } @@ -33,13 +33,22 @@ impl<'a, U: Store, E: EthSpec> AncestorIter> for } } -#[derive(Clone)] pub struct StateRootsIterator<'a, T: EthSpec, U> { store: Arc, beacon_state: Cow<'a, BeaconState>, slot: Slot, } +impl<'a, T: EthSpec, U> Clone for StateRootsIterator<'a, T, U> { + fn clone(&self) -> Self { + Self { + store: self.store.clone(), + beacon_state: self.beacon_state.clone(), + slot: self.slot, + } + } +} + impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { Self { @@ -62,7 +71,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { type Item = (Hash256, Slot); fn next(&mut self) -> Option { - if (self.slot == 0) || (self.slot > self.beacon_state.slot) { + if self.slot == 0 || self.slot > self.beacon_state.slot { return None; } @@ -75,7 +84,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { let beacon_state: BeaconState = { let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; - self.store.get(&new_state_root).ok()? + self.store.get_state(&new_state_root, None).ok()? }?; self.beacon_state = Cow::Owned(beacon_state); @@ -128,13 +137,22 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { /// exhausted. /// /// Returns `None` for roots prior to genesis or when there is an error reading from `Store`. 
-#[derive(Clone)] pub struct BlockRootsIterator<'a, T: EthSpec, U> { store: Arc, beacon_state: Cow<'a, BeaconState>, slot: Slot, } +impl<'a, T: EthSpec, U> Clone for BlockRootsIterator<'a, T, U> { + fn clone(&self) -> Self { + Self { + store: self.store.clone(), + beacon_state: self.beacon_state.clone(), + slot: self.slot, + } + } +} + impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { /// Create a new iterator over all block roots in the given `beacon_state` and prior states. pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { @@ -173,7 +191,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { // Load the earliest state from disk. let new_state_root = self.beacon_state.get_oldest_state_root().ok()?; - self.store.get(&new_state_root).ok()? + self.store.get_state(&new_state_root, None).ok()? }?; self.beacon_state = Cow::Owned(beacon_state); @@ -187,6 +205,52 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { } } +pub type ReverseBlockRootIterator<'a, E, S> = + ReverseHashAndSlotIterator>; +pub type ReverseStateRootIterator<'a, E, S> = + ReverseHashAndSlotIterator>; + +pub type ReverseHashAndSlotIterator = ReverseChainIterator<(Hash256, Slot), I>; + +/// Provides a wrapper for an iterator that returns a given `T` before it starts returning results of +/// the `Iterator`. 
+pub struct ReverseChainIterator { + first_value_used: bool, + first_value: T, + iter: I, +} + +impl ReverseChainIterator +where + T: Sized, + I: Iterator + Sized, +{ + pub fn new(first_value: T, iter: I) -> Self { + Self { + first_value_used: false, + first_value, + iter, + } + } +} + +impl Iterator for ReverseChainIterator +where + T: Clone, + I: Iterator, +{ + type Item = T; + + fn next(&mut self) -> Option { + if self.first_value_used { + self.iter.next() + } else { + self.first_value_used = true; + Some(self.first_value.clone()) + } + } +} + #[cfg(test)] mod test { use super::*; @@ -225,7 +289,7 @@ mod test { let state_a_root = hashes.next().unwrap(); state_b.state_roots[0] = state_a_root; - store.put(&state_a_root, &state_a).unwrap(); + store.put_state(&state_a_root, &state_a).unwrap(); let iter = BlockRootsIterator::new(store.clone(), &state_b); @@ -273,8 +337,8 @@ mod test { let state_a_root = Hash256::from_low_u64_be(slots_per_historical_root as u64); let state_b_root = Hash256::from_low_u64_be(slots_per_historical_root as u64 * 2); - store.put(&state_a_root, &state_a).unwrap(); - store.put(&state_b_root, &state_b).unwrap(); + store.put_state(&state_a_root, &state_a).unwrap(); + store.put_state(&state_b_root, &state_b).unwrap(); let iter = StateRootsIterator::new(store.clone(), &state_b); diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index a085d845a8..5c53b77572 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,4 +1,5 @@ use super::*; +use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::metrics; use db_key::Key; use leveldb::database::kv::KV; @@ -6,14 +7,10 @@ use leveldb::database::Database; use leveldb::error::Error as LevelDBError; use leveldb::options::{Options, ReadOptions, WriteOptions}; use std::path::Path; -use std::sync::Arc; /// A wrapped leveldb database. 
-#[derive(Clone)] pub struct LevelDB { - // Note: this `Arc` is only included because of an artificial constraint by gRPC. Hopefully we - // can remove this one day. - db: Arc>, + db: Database, } impl LevelDB { @@ -23,7 +20,7 @@ impl LevelDB { options.create_if_missing = true; - let db = Arc::new(Database::open(path, options)?); + let db = Database::open(path, options)?; Ok(Self { db }) } @@ -111,6 +108,24 @@ impl Store for LevelDB { .delete(self.write_options(), column_key) .map_err(Into::into) } + + /// Store a state in the store. + fn put_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + store_full_state(self, state_root, state) + } + + /// Fetch a state from the store. + fn get_state( + &self, + state_root: &Hash256, + _: Option, + ) -> Result>, Error> { + get_full_state(self, state_root) + } } impl From for Error { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 9c0e3cbaec..d33a2ab3d4 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -11,16 +11,25 @@ extern crate lazy_static; mod block_at_slot; +pub mod chunked_vector; mod errors; +mod hot_cold_store; mod impls; mod leveldb_store; mod memory_store; mod metrics; +mod partial_beacon_state; pub mod iter; +pub mod migrate; -pub use self::leveldb_store::LevelDB as DiskStore; +use std::sync::Arc; + +pub use self::hot_cold_store::HotColdDB as DiskStore; +pub use self::leveldb_store::LevelDB as SimpleDiskStore; pub use self::memory_store::MemoryStore; +pub use self::migrate::Migrate; +pub use self::partial_beacon_state::PartialBeaconState; pub use errors::Error; pub use metrics::scrape_for_metrics; pub use types::*; @@ -30,9 +39,21 @@ pub use types::*; /// A `Store` is fundamentally backed by a key-value database, however it provides support for /// columns. A simple column implementation might involve prefixing a key with some bytes unique to /// each column. 
-pub trait Store: Sync + Send + Sized { +pub trait Store: Sync + Send + Sized + 'static { + /// Retrieve some bytes in `column` with `key`. + fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error>; + + /// Store some `value` in `column`, indexed with `key`. + fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>; + + /// Return `true` if `key` exists in `column`. + fn key_exists(&self, column: &str, key: &[u8]) -> Result; + + /// Removes `key` from `column`. + fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>; + /// Store an item in `Self`. - fn put(&self, key: &Hash256, item: &impl StoreItem) -> Result<(), Error> { + fn put(&self, key: &Hash256, item: &I) -> Result<(), Error> { item.db_put(self, key) } @@ -51,6 +72,20 @@ pub trait Store: Sync + Send + Sized { I::db_delete(self, key) } + /// Store a state in the store. + fn put_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error>; + + /// Fetch a state from the store. + fn get_state( + &self, + state_root: &Hash256, + slot: Option, + ) -> Result>, Error>; + /// Given the root of an existing block in the store (`start_block_root`), return a parent /// block with the specified `slot`. /// @@ -64,42 +99,48 @@ pub trait Store: Sync + Send + Sized { block_at_slot::get_block_at_preceeding_slot::<_, E>(self, slot, start_block_root) } - /// Retrieve some bytes in `column` with `key`. - fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error>; - - /// Store some `value` in `column`, indexed with `key`. - fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>; - - /// Return `true` if `key` exists in `column`. - fn key_exists(&self, column: &str, key: &[u8]) -> Result; - - /// Removes `key` from `column`. - fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>; + /// (Optionally) Move all data before the frozen slot to the freezer database. 
+ fn freeze_to_state( + _store: Arc, + _frozen_head_root: Hash256, + _frozen_head: &BeaconState, + ) -> Result<(), Error> { + Ok(()) + } } /// A unique column identifier. +#[derive(Debug, Clone, Copy, PartialEq)] pub enum DBColumn { + /// For data related to the database itself. + BeaconMeta, BeaconBlock, BeaconState, BeaconChain, + BeaconBlockRoots, + BeaconStateRoots, + BeaconHistoricalRoots, + BeaconRandaoMixes, } -impl<'a> Into<&'a str> for DBColumn { +impl Into<&'static str> for DBColumn { /// Returns a `&str` that can be used for keying a key-value data base. - fn into(self) -> &'a str { + fn into(self) -> &'static str { match self { - DBColumn::BeaconBlock => &"blk", - DBColumn::BeaconState => &"ste", - DBColumn::BeaconChain => &"bch", + DBColumn::BeaconMeta => "bma", + DBColumn::BeaconBlock => "blk", + DBColumn::BeaconState => "ste", + DBColumn::BeaconChain => "bch", + DBColumn::BeaconBlockRoots => "bbr", + DBColumn::BeaconStateRoots => "bsr", + DBColumn::BeaconHistoricalRoots => "bhr", + DBColumn::BeaconRandaoMixes => "brm", } } } -/// An item that may be stored in a `Store`. -/// -/// Provides default methods that are suitable for most applications, however when overridden they -/// provide full customizability of `Store` operations. -pub trait StoreItem: Sized { +/// An item that may be stored in a `Store` by serializing and deserializing from bytes. +pub trait SimpleStoreItem: Sized { /// Identifies which column this item should be placed in. fn db_column() -> DBColumn; @@ -107,10 +148,32 @@ pub trait StoreItem: Sized { fn as_store_bytes(&self) -> Vec; /// De-serialize `self` from bytes. - fn from_store_bytes(bytes: &mut [u8]) -> Result; + /// + /// Return an instance of the type and the number of bytes that were read. + fn from_store_bytes(bytes: &[u8]) -> Result; +} +/// An item that may be stored in a `Store`. +pub trait StoreItem: Sized { /// Store `self`. 
- fn db_put(&self, store: &impl Store, key: &Hash256) -> Result<(), Error> { + fn db_put(&self, store: &S, key: &Hash256) -> Result<(), Error>; + + /// Retrieve an instance of `Self` from `store`. + fn db_get(store: &S, key: &Hash256) -> Result, Error>; + + /// Return `true` if an instance of `Self` exists in `store`. + fn db_exists(store: &S, key: &Hash256) -> Result; + + /// Delete an instance of `Self` from `store`. + fn db_delete(store: &S, key: &Hash256) -> Result<(), Error>; +} + +impl StoreItem for T +where + T: SimpleStoreItem, +{ + /// Store `self`. + fn db_put(&self, store: &S, key: &Hash256) -> Result<(), Error> { let column = Self::db_column().into(); let key = key.as_bytes(); @@ -120,18 +183,18 @@ pub trait StoreItem: Sized { } /// Retrieve an instance of `Self`. - fn db_get(store: &impl Store, key: &Hash256) -> Result, Error> { + fn db_get(store: &S, key: &Hash256) -> Result, Error> { let column = Self::db_column().into(); let key = key.as_bytes(); match store.get_bytes(column, key)? { - Some(mut bytes) => Ok(Some(Self::from_store_bytes(&mut bytes[..])?)), + Some(bytes) => Ok(Some(Self::from_store_bytes(&bytes[..])?)), None => Ok(None), } } /// Return `true` if an instance of `Self` exists in `Store`. - fn db_exists(store: &impl Store, key: &Hash256) -> Result { + fn db_exists(store: &S, key: &Hash256) -> Result { let column = Self::db_column().into(); let key = key.as_bytes(); @@ -139,7 +202,7 @@ pub trait StoreItem: Sized { } /// Delete `self` from the `Store`. 
- fn db_delete(store: &impl Store, key: &Hash256) -> Result<(), Error> { + fn db_delete(store: &S, key: &Hash256) -> Result<(), Error> { let column = Self::db_column().into(); let key = key.as_bytes(); @@ -160,7 +223,7 @@ mod tests { b: u64, } - impl StoreItem for StorableThing { + impl SimpleStoreItem for StorableThing { fn db_column() -> DBColumn { DBColumn::BeaconBlock } @@ -169,7 +232,7 @@ mod tests { self.as_ssz_bytes() } - fn from_store_bytes(bytes: &mut [u8]) -> Result { + fn from_store_bytes(bytes: &[u8]) -> Result { Self::from_ssz_bytes(bytes).map_err(Into::into) } } @@ -196,9 +259,22 @@ mod tests { #[test] fn diskdb() { + use sloggers::{null::NullLoggerBuilder, Build}; + + let hot_dir = tempdir().unwrap(); + let cold_dir = tempdir().unwrap(); + let spec = MinimalEthSpec::default_spec(); + let log = NullLoggerBuilder.build().unwrap(); + let store = DiskStore::open(&hot_dir.path(), &cold_dir.path(), spec, log).unwrap(); + + test_impl(store); + } + + #[test] + fn simplediskdb() { let dir = tempdir().unwrap(); let path = dir.path(); - let store = DiskStore::open(&path).unwrap(); + let store = SimpleDiskStore::open(&path).unwrap(); test_impl(store); } diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 048c054f52..4da9d076ca 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,23 +1,29 @@ use super::{Error, Store}; +use crate::impls::beacon_state::{get_full_state, store_full_state}; use parking_lot::RwLock; use std::collections::HashMap; -use std::sync::Arc; +use types::*; type DBHashMap = HashMap, Vec>; /// A thread-safe `HashMap` wrapper. -#[derive(Clone)] pub struct MemoryStore { - // Note: this `Arc` is only included because of an artificial constraint by gRPC. Hopefully we - // can remove this one day. 
- db: Arc>, + db: RwLock, +} + +impl Clone for MemoryStore { + fn clone(&self) -> Self { + Self { + db: RwLock::new(self.db.read().clone()), + } + } } impl MemoryStore { /// Create a new, empty database. pub fn open() -> Self { Self { - db: Arc::new(RwLock::new(HashMap::new())), + db: RwLock::new(HashMap::new()), } } @@ -64,4 +70,22 @@ impl Store for MemoryStore { Ok(()) } + + /// Store a state in the store. + fn put_state( + &self, + state_root: &Hash256, + state: &BeaconState, + ) -> Result<(), Error> { + store_full_state(self, state_root, state) + } + + /// Fetch a state from the store. + fn get_state( + &self, + state_root: &Hash256, + _: Option, + ) -> Result>, Error> { + get_full_state(self, state_root) + } } diff --git a/beacon_node/store/src/migrate.rs b/beacon_node/store/src/migrate.rs new file mode 100644 index 0000000000..b32ffed14b --- /dev/null +++ b/beacon_node/store/src/migrate.rs @@ -0,0 +1,142 @@ +use crate::{DiskStore, MemoryStore, SimpleDiskStore, Store}; +use parking_lot::Mutex; +use slog::warn; +use std::mem; +use std::sync::mpsc; +use std::sync::Arc; +use std::thread; +use types::{BeaconState, EthSpec, Hash256, Slot}; + +/// Trait for migration processes that update the database upon finalization. +pub trait Migrate: Send + Sync + 'static { + fn new(db: Arc) -> Self; + + fn freeze_to_state( + &self, + _state_root: Hash256, + _state: BeaconState, + _max_finality_distance: u64, + ) { + } +} + +/// Migrator that does nothing, for stores that don't need migration. +pub struct NullMigrator; + +impl Migrate for NullMigrator { + fn new(_: Arc) -> Self { + NullMigrator + } +} + +impl Migrate for NullMigrator { + fn new(_: Arc) -> Self { + NullMigrator + } +} + +/// Migrator that immediately calls the store's migration function, blocking the current execution. +/// +/// Mostly useful for tests. 
+pub struct BlockingMigrator(Arc); + +impl Migrate for BlockingMigrator { + fn new(db: Arc) -> Self { + BlockingMigrator(db) + } + + fn freeze_to_state( + &self, + state_root: Hash256, + state: BeaconState, + _max_finality_distance: u64, + ) { + if let Err(e) = S::freeze_to_state(self.0.clone(), state_root, &state) { + // This migrator is only used for testing, so we just log to stderr without a logger. + eprintln!("Migration error: {:?}", e); + } + } +} + +/// Migrator that runs a background thread to migrate state from the hot to the cold database. +pub struct BackgroundMigrator { + db: Arc, + tx_thread: Mutex<( + mpsc::Sender<(Hash256, BeaconState)>, + thread::JoinHandle<()>, + )>, +} + +impl Migrate for BackgroundMigrator { + fn new(db: Arc) -> Self { + let tx_thread = Mutex::new(Self::spawn_thread(db.clone())); + Self { db, tx_thread } + } + + /// Perform the freezing operation on the database, + fn freeze_to_state( + &self, + finalized_state_root: Hash256, + finalized_state: BeaconState, + max_finality_distance: u64, + ) { + if !self.needs_migration(finalized_state.slot, max_finality_distance) { + return; + } + + let (ref mut tx, ref mut thread) = *self.tx_thread.lock(); + + if let Err(tx_err) = tx.send((finalized_state_root, finalized_state)) { + let (new_tx, new_thread) = Self::spawn_thread(self.db.clone()); + + drop(mem::replace(tx, new_tx)); + let old_thread = mem::replace(thread, new_thread); + + // Join the old thread, which will probably have panicked, or may have + // halted normally just now as a result of us dropping the old `mpsc::Sender`. + if let Err(thread_err) = old_thread.join() { + warn!( + self.db.log, + "Migration thread died, so it was restarted"; + "reason" => format!("{:?}", thread_err) + ); + } + + // Retry at most once, we could recurse but that would risk overflowing the stack. + let _ = tx.send(tx_err.0); + } + } +} + +impl BackgroundMigrator { + /// Return true if a migration needs to be performed, given a new `finalized_slot`. 
+ fn needs_migration(&self, finalized_slot: Slot, max_finality_distance: u64) -> bool { + let finality_distance = finalized_slot - self.db.get_split_slot(); + finality_distance > max_finality_distance + } + + /// Spawn a new child thread to run the migration process. + /// + /// Return a channel handle for sending new finalized states to the thread. + fn spawn_thread( + db: Arc, + ) -> ( + mpsc::Sender<(Hash256, BeaconState)>, + thread::JoinHandle<()>, + ) { + let (tx, rx) = mpsc::channel(); + let thread = thread::spawn(move || { + while let Ok((state_root, state)) = rx.recv() { + if let Err(e) = DiskStore::freeze_to_state(db.clone(), state_root, &state) { + warn!( + db.log, + "Database migration failed"; + "error" => format!("{:?}", e) + ); + } + } + }); + + (tx, thread) + } +} diff --git a/beacon_node/store/src/partial_beacon_state.rs b/beacon_node/store/src/partial_beacon_state.rs new file mode 100644 index 0000000000..cea8cf310d --- /dev/null +++ b/beacon_node/store/src/partial_beacon_state.rs @@ -0,0 +1,217 @@ +use crate::chunked_vector::{ + load_variable_list_from_db, load_vector_from_db, BlockRoots, HistoricalRoots, RandaoMixes, + StateRoots, +}; +use crate::{Error, Store}; +use ssz_derive::{Decode, Encode}; +use std::convert::TryInto; +use types::*; + +/// Lightweight variant of the `BeaconState` that is stored in the database. +/// +/// Utilises lazy-loading from separate storage for its vector fields. 
+/// +/// Spec v0.9.1 +#[derive(Debug, PartialEq, Clone, Encode, Decode)] +pub struct PartialBeaconState +where + T: EthSpec, +{ + // Versioning + pub genesis_time: u64, + pub slot: Slot, + pub fork: Fork, + + // History + pub latest_block_header: BeaconBlockHeader, + + #[ssz(skip_serializing)] + #[ssz(skip_deserializing)] + pub block_roots: Option>, + #[ssz(skip_serializing)] + #[ssz(skip_deserializing)] + pub state_roots: Option>, + + #[ssz(skip_serializing)] + #[ssz(skip_deserializing)] + pub historical_roots: Option>, + + // Ethereum 1.0 chain data + pub eth1_data: Eth1Data, + pub eth1_data_votes: VariableList, + pub eth1_deposit_index: u64, + + // Registry + pub validators: VariableList, + pub balances: VariableList, + + // Shuffling + /// Randao value from the current slot, for patching into the per-epoch randao vector. + pub latest_randao_value: Hash256, + #[ssz(skip_serializing)] + #[ssz(skip_deserializing)] + pub randao_mixes: Option>, + + // Slashings + slashings: FixedVector, + + // Attestations + pub previous_epoch_attestations: VariableList, T::MaxPendingAttestations>, + pub current_epoch_attestations: VariableList, T::MaxPendingAttestations>, + + // Finality + pub justification_bits: BitVector, + pub previous_justified_checkpoint: Checkpoint, + pub current_justified_checkpoint: Checkpoint, + pub finalized_checkpoint: Checkpoint, +} + +impl PartialBeaconState { + /// Convert a `BeaconState` to a `PartialBeaconState`, while dropping the optional fields. 
+ pub fn from_state_forgetful(s: &BeaconState) -> Self { + // TODO: could use references/Cow for fields to avoid cloning + PartialBeaconState { + genesis_time: s.genesis_time, + slot: s.slot, + fork: s.fork.clone(), + + // History + latest_block_header: s.latest_block_header.clone(), + block_roots: None, + state_roots: None, + historical_roots: None, + + // Eth1 + eth1_data: s.eth1_data.clone(), + eth1_data_votes: s.eth1_data_votes.clone(), + eth1_deposit_index: s.eth1_deposit_index, + + // Validator registry + validators: s.validators.clone(), + balances: s.balances.clone(), + + // Shuffling + latest_randao_value: *s + .get_randao_mix(s.current_epoch()) + .expect("randao at current epoch is OK"), + randao_mixes: None, + + // Slashings + slashings: s.get_all_slashings().to_vec().into(), + + // Attestations + previous_epoch_attestations: s.previous_epoch_attestations.clone(), + current_epoch_attestations: s.current_epoch_attestations.clone(), + + // Finality + justification_bits: s.justification_bits.clone(), + previous_justified_checkpoint: s.previous_justified_checkpoint.clone(), + current_justified_checkpoint: s.current_justified_checkpoint.clone(), + finalized_checkpoint: s.finalized_checkpoint.clone(), + } + } + + pub fn load_block_roots(&mut self, store: &S, spec: &ChainSpec) -> Result<(), Error> { + if self.block_roots.is_none() { + self.block_roots = Some(load_vector_from_db::( + store, self.slot, spec, + )?); + } + Ok(()) + } + + pub fn load_state_roots(&mut self, store: &S, spec: &ChainSpec) -> Result<(), Error> { + if self.state_roots.is_none() { + self.state_roots = Some(load_vector_from_db::( + store, self.slot, spec, + )?); + } + Ok(()) + } + + pub fn load_historical_roots( + &mut self, + store: &S, + spec: &ChainSpec, + ) -> Result<(), Error> { + if self.historical_roots.is_none() { + self.historical_roots = Some(load_variable_list_from_db::( + store, self.slot, spec, + )?); + } + Ok(()) + } + + pub fn load_randao_mixes( + &mut self, + store: &S, + 
spec: &ChainSpec, + ) -> Result<(), Error> { + if self.randao_mixes.is_none() { + // Load the per-epoch values from the database + let mut randao_mixes = + load_vector_from_db::(store, self.slot, spec)?; + + // Patch the value for the current slot into the index for the current epoch + let current_epoch = self.slot.epoch(T::slots_per_epoch()); + let len = randao_mixes.len(); + randao_mixes[current_epoch.as_usize() % len] = self.latest_randao_value; + + self.randao_mixes = Some(randao_mixes) + } + Ok(()) + } +} + +impl TryInto> for PartialBeaconState { + type Error = Error; + + fn try_into(self) -> Result, Error> { + fn unpack(x: Option) -> Result { + x.ok_or(Error::PartialBeaconStateError) + } + + Ok(BeaconState { + genesis_time: self.genesis_time, + slot: self.slot, + fork: self.fork, + + // History + latest_block_header: self.latest_block_header, + block_roots: unpack(self.block_roots)?, + state_roots: unpack(self.state_roots)?, + historical_roots: unpack(self.historical_roots)?, + + // Eth1 + eth1_data: self.eth1_data, + eth1_data_votes: self.eth1_data_votes, + eth1_deposit_index: self.eth1_deposit_index, + + // Validator registry + validators: self.validators, + balances: self.balances, + + // Shuffling + randao_mixes: unpack(self.randao_mixes)?, + + // Slashings + slashings: self.slashings, + + // Attestations + previous_epoch_attestations: self.previous_epoch_attestations, + current_epoch_attestations: self.current_epoch_attestations, + + // Finality + justification_bits: self.justification_bits, + previous_justified_checkpoint: self.previous_justified_checkpoint, + current_justified_checkpoint: self.current_justified_checkpoint, + finalized_checkpoint: self.finalized_checkpoint, + + // Caching + committee_caches: <_>::default(), + pubkey_cache: <_>::default(), + exit_cache: <_>::default(), + tree_hash_cache: <_>::default(), + }) + } +} diff --git a/eth2/lmd_ghost/src/reduced_tree.rs b/eth2/lmd_ghost/src/reduced_tree.rs index 85540785db..e26d472a1b 100644 --- 
a/eth2/lmd_ghost/src/reduced_tree.rs +++ b/eth2/lmd_ghost/src/reduced_tree.rs @@ -667,7 +667,7 @@ where fn iter_ancestors(&self, child: Hash256) -> Result> { let block = self.get_block(child)?; - let state = self.get_state(block.state_root)?; + let state = self.get_state(block.state_root, block.slot)?; Ok(BlockRootsIterator::owned(self.store.clone(), state)) } @@ -754,9 +754,9 @@ where .ok_or_else(|| Error::MissingBlock(block_root)) } - fn get_state(&self, state_root: Hash256) -> Result> { + fn get_state(&self, state_root: Hash256, slot: Slot) -> Result> { self.store - .get::>(&state_root)? + .get_state(&state_root, Some(slot))? .ok_or_else(|| Error::MissingState(state_root)) } diff --git a/eth2/types/src/chain_spec.rs b/eth2/types/src/chain_spec.rs index 3889a392ad..c6152269a5 100644 --- a/eth2/types/src/chain_spec.rs +++ b/eth2/types/src/chain_spec.rs @@ -241,7 +241,7 @@ impl ChainSpec { /// Ethereum Foundation minimal spec, as defined here: /// - /// https://github.com/ethereum/eth2.0-specs/blob/v0.8.1/configs/constant_presets/minimal.yaml + /// https://github.com/ethereum/eth2.0-specs/blob/v0.9.1/configs/minimal.yaml /// /// Spec v0.9.1 pub fn minimal() -> Self { @@ -249,9 +249,11 @@ impl ChainSpec { let boot_nodes = vec![]; Self { + max_committees_per_slot: 4, target_committee_size: 4, shuffle_round_count: 10, min_genesis_active_validator_count: 64, + milliseconds_per_slot: 6_000, network_id: 2, // lighthouse testnet network id boot_nodes, eth1_follow_distance: 16,