From bd1b61a5b1623ac99d31f9ca512cd336c1ca0eeb Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 6 Dec 2019 18:52:11 +1100 Subject: [PATCH] Forwards block root iterators (#672) * Implement forwards block root iterators * Clean up errors and docs --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/beacon_chain/src/builder.rs | 12 +- beacon_node/beacon_chain/src/eth1_chain.rs | 10 +- beacon_node/beacon_chain/src/test_utils.rs | 10 +- .../beacon_chain/tests/persistence_tests.rs | 4 +- beacon_node/beacon_chain/tests/store_tests.rs | 40 ++++- beacon_node/client/src/builder.rs | 32 ++-- beacon_node/src/lib.rs | 6 +- beacon_node/store/src/block_at_slot.rs | 14 +- beacon_node/store/src/chunked_iter.rs | 118 +++++++++++++ beacon_node/store/src/chunked_vector.rs | 32 ++-- beacon_node/store/src/forwards_iter.rs | 164 ++++++++++++++++++ beacon_node/store/src/hot_cold_store.rs | 74 ++++---- beacon_node/store/src/impls/beacon_state.rs | 4 +- beacon_node/store/src/iter.rs | 30 ++-- beacon_node/store/src/leveldb_store.rs | 34 ++-- beacon_node/store/src/lib.rs | 58 ++++--- beacon_node/store/src/memory_store.rs | 42 +++-- beacon_node/store/src/migrate.rs | 18 +- beacon_node/store/src/partial_beacon_state.rs | 16 +- eth2/lmd_ghost/src/lib.rs | 2 +- eth2/lmd_ghost/src/reduced_tree.rs | 11 +- eth2/lmd_ghost/tests/test.rs | 27 ++- 23 files changed, 573 insertions(+), 187 deletions(-) create mode 100644 beacon_node/store/src/chunked_iter.rs create mode 100644 beacon_node/store/src/forwards_iter.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 3635b44540..4f7d180ea2 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -94,7 +94,7 @@ pub enum AttestationProcessingOutcome { } pub trait BeaconChainTypes: Send + Sync + 'static { - type Store: store::Store; + type Store: store::Store; type StoreMigrator: store::Migrate; type SlotClock: 
slot_clock::SlotClock; type LmdGhost: LmdGhost; diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 98a4aa3c2e..ee04b43f4e 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -52,7 +52,7 @@ impl where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, @@ -108,7 +108,7 @@ impl, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, @@ -402,7 +402,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -462,7 +462,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, @@ -514,7 +514,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate + 'static, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -555,7 +555,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate + 'static, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 16278cf54c..96b2ea6aa3 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -184,7 +184,7 @@ pub struct CachingEth1Backend { _phantom: PhantomData, } -impl CachingEth1Backend { +impl> CachingEth1Backend { /// Instantiates `self` with empty caches. /// /// Does not connect to the eth1 node or start any tasks to keep the cache updated. 
@@ -213,7 +213,7 @@ impl CachingEth1Backend { } } -impl Eth1ChainBackend for CachingEth1Backend { +impl> Eth1ChainBackend for CachingEth1Backend { fn eth1_data(&self, state: &BeaconState, spec: &ChainSpec) -> Result { // Note: we do not return random junk if this function call fails as it would be caused by // an internal error. @@ -372,7 +372,7 @@ fn random_eth1_data() -> Eth1Data { /// Returns `state.eth1_data.block_hash` at the start of eth1 voting period defined by /// `state.slot`. -fn eth1_block_hash_at_start_of_voting_period( +fn eth1_block_hash_at_start_of_voting_period>( store: Arc, state: &BeaconState, ) -> Result { @@ -392,7 +392,7 @@ fn eth1_block_hash_at_start_of_voting_period( .map_err(Error::UnableToGetPreviousStateRoot)?; store - .get_state::(&prev_state_root, Some(slot)) + .get_state(&prev_state_root, Some(slot)) .map_err(Error::StoreError)? .map(|state| state.eth1_data.block_hash) .ok_or_else(|| Error::PreviousStateNotInDB(*prev_state_root)) @@ -581,7 +581,7 @@ mod test { use store::MemoryStore; use types::test_utils::{generate_deterministic_keypair, TestingDepositBuilder}; - fn get_eth1_chain() -> Eth1Chain, E> { + fn get_eth1_chain() -> Eth1Chain>, E> { let eth1_config = Eth1Config { ..Eth1Config::default() }; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index a43cc1d0fd..0ec92aa280 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -40,8 +40,8 @@ pub type BaseHarnessType = Witness< NullEventHandler, >; -pub type HarnessType = BaseHarnessType; -pub type DiskHarnessType = BaseHarnessType, E>; +pub type HarnessType = BaseHarnessType, NullMigrator, E>; +pub type DiskHarnessType = BaseHarnessType, BlockingMigrator>, E>; /// Indicates how the `BeaconChainHarness` should produce blocks. #[derive(Clone, Copy, Debug)] @@ -120,7 +120,7 @@ impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. 
pub fn new_with_disk_store( eth_spec_instance: E, - store: Arc, + store: Arc>, keypairs: Vec, ) -> Self { let spec = E::default_spec(); @@ -160,7 +160,7 @@ impl BeaconChainHarness> { /// Instantiate a new harness with `validator_count` initial validators. pub fn resume_from_disk_store( eth_spec_instance: E, - store: Arc, + store: Arc>, keypairs: Vec, ) -> Self { let spec = E::default_spec(); @@ -197,7 +197,7 @@ impl BeaconChainHarness> { impl BeaconChainHarness> where - S: Store, + S: Store, M: Migrate, E: EthSpec, { diff --git a/beacon_node/beacon_chain/tests/persistence_tests.rs b/beacon_node/beacon_chain/tests/persistence_tests.rs index c3caa50b9b..c88bbcb951 100644 --- a/beacon_node/beacon_chain/tests/persistence_tests.rs +++ b/beacon_node/beacon_chain/tests/persistence_tests.rs @@ -23,14 +23,14 @@ lazy_static! { static ref KEYPAIRS: Vec = types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT); } -fn get_store(db_path: &TempDir) -> Arc { +fn get_store(db_path: &TempDir) -> Arc> { let spec = E::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; let log = NullLoggerBuilder.build().expect("logger should build"); Arc::new( - DiskStore::open::(&hot_path, &cold_path, slots_per_restore_point, spec, log) + DiskStore::open(&hot_path, &cold_path, slots_per_restore_point, spec, log) .expect("disk store should initialize"), ) } diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 0d6deb4a88..2484062dd9 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -26,19 +26,19 @@ lazy_static! 
{ type E = MinimalEthSpec; type TestHarness = BeaconChainHarness>; -fn get_store(db_path: &TempDir) -> Arc { +fn get_store(db_path: &TempDir) -> Arc> { let spec = MinimalEthSpec::default_spec(); let hot_path = db_path.path().join("hot_db"); let cold_path = db_path.path().join("cold_db"); let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; let log = NullLoggerBuilder.build().expect("logger should build"); Arc::new( - DiskStore::open::(&hot_path, &cold_path, slots_per_restore_point, spec, log) + DiskStore::open(&hot_path, &cold_path, slots_per_restore_point, spec, log) .expect("disk store should initialize"), ) } -fn get_harness(store: Arc, validator_count: usize) -> TestHarness { +fn get_harness(store: Arc>, validator_count: usize) -> TestHarness { let harness = BeaconChainHarness::new_with_disk_store( MinimalEthSpec, store, @@ -265,7 +265,7 @@ fn check_finalization(harness: &TestHarness, expected_slot: u64) { } /// Check that the DiskStore's split_slot is equal to the start slot of the last finalized epoch. 
-fn check_split_slot(harness: &TestHarness, store: Arc) { +fn check_split_slot(harness: &TestHarness, store: Arc>) { let split_slot = store.get_split_slot(); assert_eq!( harness @@ -286,7 +286,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { assert_eq!(chain_dump.len() as u64, expected_len); - for checkpoint in chain_dump { + for checkpoint in &chain_dump { // Check that the tree hash of the stored state is as expected assert_eq!( checkpoint.beacon_state_root, @@ -300,13 +300,41 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { harness .chain .store - .get_state::(&checkpoint.beacon_state_root, None) + .get_state(&checkpoint.beacon_state_root, None) .expect("no error") .expect("state exists") .slot, checkpoint.beacon_state.slot ); } + + // Check the forwards block roots iterator against the chain dump + let chain_dump_block_roots = chain_dump + .iter() + .map(|checkpoint| (checkpoint.beacon_block_root, checkpoint.beacon_block.slot)) + .collect::>(); + + let head = harness.chain.head(); + let mut forward_block_roots = Store::forwards_block_roots_iterator( + harness.chain.store.clone(), + Slot::new(0), + head.beacon_state, + head.beacon_block_root, + &harness.spec, + ) + .collect::>(); + + // Drop the block roots for skipped slots. 
+ forward_block_roots.dedup_by_key(|(block_root, _)| *block_root); + + for i in 0..std::cmp::max(chain_dump_block_roots.len(), forward_block_roots.len()) { + assert_eq!( + chain_dump_block_roots[i], + forward_block_roots[i], + "split slot is {}", + harness.chain.store.get_split_slot() + ); + } } /// Check that state and block root iterators can reach genesis diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 5167f7ed7d..9f93b7bd57 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -80,7 +80,7 @@ impl, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate, TSlotClock: SlotClock + Clone + 'static, TLmdGhost: LmdGhost + 'static, @@ -395,7 +395,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate, TSlotClock: SlotClock + Clone + 'static, TEth1Backend: Eth1ChainBackend + 'static, @@ -442,7 +442,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, @@ -482,7 +482,7 @@ where impl ClientBuilder< Witness< - DiskStore, + DiskStore, TStoreMigrator, TSlotClock, TLmdGhost, @@ -493,8 +493,8 @@ impl where TSlotClock: SlotClock + 'static, - TStoreMigrator: store::Migrate + 'static, - TLmdGhost: LmdGhost + 'static, + TStoreMigrator: store::Migrate, TEthSpec> + 'static, + TLmdGhost: LmdGhost, TEthSpec> + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -516,7 +516,7 @@ where .clone() .ok_or_else(|| "disk_store requires a chain spec".to_string())?; - let store = DiskStore::open::( + let store = DiskStore::open( hot_path, cold_path, slots_per_restore_point, @@ -532,7 +532,7 @@ where impl ClientBuilder< Witness< - SimpleDiskStore, + SimpleDiskStore, TStoreMigrator, TSlotClock, TLmdGhost, @@ -543,8 +543,8 @@ impl where TSlotClock: 
SlotClock + 'static, - TStoreMigrator: store::Migrate + 'static, - TLmdGhost: LmdGhost + 'static, + TStoreMigrator: store::Migrate, TEthSpec> + 'static, + TLmdGhost: LmdGhost, TEthSpec> + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -561,7 +561,7 @@ where impl ClientBuilder< Witness< - MemoryStore, + MemoryStore, NullMigrator, TSlotClock, TLmdGhost, @@ -572,7 +572,7 @@ impl > where TSlotClock: SlotClock + 'static, - TLmdGhost: LmdGhost + 'static, + TLmdGhost: LmdGhost, TEthSpec> + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -591,7 +591,7 @@ where impl ClientBuilder< Witness< - DiskStore, + DiskStore, BackgroundMigrator, TSlotClock, TLmdGhost, @@ -602,7 +602,7 @@ impl > where TSlotClock: SlotClock + 'static, - TLmdGhost: LmdGhost + 'static, + TLmdGhost: LmdGhost, TEthSpec> + 'static, TEth1Backend: Eth1ChainBackend + 'static, TEthSpec: EthSpec + 'static, TEventHandler: EventHandler + 'static, @@ -629,7 +629,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate, TSlotClock: SlotClock + 'static, TLmdGhost: LmdGhost + 'static, @@ -731,7 +731,7 @@ impl >, > where - TStore: Store + 'static, + TStore: Store + 'static, TStoreMigrator: store::Migrate, TLmdGhost: LmdGhost + 'static, TEth1Backend: Eth1ChainBackend + 'static, diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 71b7eb0461..eda78383c2 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -25,11 +25,11 @@ use types::EthSpec; /// A type-alias to the tighten the definition of a production-intended `Client`. 
pub type ProductionClient = Client< Witness< - DiskStore, + DiskStore, BackgroundMigrator, SystemTimeSlotClock, - ThreadSafeReducedTree, - CachingEth1Backend, + ThreadSafeReducedTree, E>, + CachingEth1Backend>, E, WebSocketSender, >, diff --git a/beacon_node/store/src/block_at_slot.rs b/beacon_node/store/src/block_at_slot.rs index 84c5e48303..292d7b3838 100644 --- a/beacon_node/store/src/block_at_slot.rs +++ b/beacon_node/store/src/block_at_slot.rs @@ -1,7 +1,7 @@ use super::*; use ssz::{Decode, DecodeError}; -fn get_block_bytes( +fn get_block_bytes, E: EthSpec>( store: &T, root: Hash256, ) -> Result>, Error> { @@ -23,7 +23,7 @@ fn read_parent_root_from_block_bytes(bytes: &[u8]) -> Result( +pub fn get_block_at_preceeding_slot, E: EthSpec>( store: &T, slot: Slot, start_root: Hash256, @@ -36,7 +36,7 @@ pub fn get_block_at_preceeding_slot( ) } -fn get_at_preceeding_slot( +fn get_at_preceeding_slot, E: EthSpec>( store: &T, slot: Slot, mut root: Hash256, @@ -107,7 +107,7 @@ mod tests { } fn build_chain( - store: &impl Store, + store: &impl Store, slots: &[usize], spec: &ChainSpec, ) -> Vec<(Hash256, BeaconBlock)> { @@ -157,7 +157,7 @@ mod tests { #[test] fn chain_with_skips() { - let store = MemoryStore::open(); + let store = MemoryStore::::open(); let spec = MinimalEthSpec::default_spec(); let slots = vec![0, 1, 2, 5]; @@ -181,14 +181,14 @@ mod tests { // Slot that doesn't exist let (source_root, _source_block) = &blocks_and_roots[3]; assert!(store - .get_block_at_preceeding_slot::(*source_root, Slot::new(3)) + .get_block_at_preceeding_slot(*source_root, Slot::new(3)) .unwrap() .is_none()); // Slot too high let (source_root, _source_block) = &blocks_and_roots[3]; assert!(store - .get_block_at_preceeding_slot::(*source_root, Slot::new(3)) + .get_block_at_preceeding_slot(*source_root, Slot::new(3)) .unwrap() .is_none()); } diff --git a/beacon_node/store/src/chunked_iter.rs b/beacon_node/store/src/chunked_iter.rs new file mode 100644 index 0000000000..794bbbb893 --- 
/dev/null +++ b/beacon_node/store/src/chunked_iter.rs @@ -0,0 +1,118 @@ +use crate::chunked_vector::{chunk_key, Chunk, Field}; +use crate::DiskStore; +use slog::error; +use std::sync::Arc; +use types::{ChainSpec, EthSpec, Slot}; + +/// Iterator over the values of a `BeaconState` vector field (like `block_roots`). +/// +/// Uses the freezer DB's separate table to load the values. +pub struct ChunkedVectorIter +where + F: Field, + E: EthSpec, +{ + pub(crate) store: Arc>, + current_vindex: usize, + pub(crate) end_vindex: usize, + next_cindex: usize, + current_chunk: Chunk, +} + +impl ChunkedVectorIter +where + F: Field, + E: EthSpec, +{ + /// Create a new iterator which can yield elements from `start_vindex` up to the last + /// index stored by the restore point at `last_restore_point_slot`. + /// + /// The `last_restore_point` slot should be the slot of a recent restore point as obtained from + /// `DiskStore::get_latest_restore_point_slot`. We pass it as a parameter so that the caller can + /// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`). + pub fn new( + store: Arc>, + start_vindex: usize, + last_restore_point_slot: Slot, + spec: &ChainSpec, + ) -> Self { + let (_, end_vindex) = F::start_and_end_vindex(last_restore_point_slot, spec); + + // Set the next chunk to the one containing `start_vindex`. + let next_cindex = start_vindex / F::chunk_size(); + // Set the current chunk to the empty chunk, it will never be read. + let current_chunk = Chunk::default(); + + Self { + store, + current_vindex: start_vindex, + end_vindex, + next_cindex, + current_chunk, + } + } +} + +impl Iterator for ChunkedVectorIter +where + F: Field, + E: EthSpec, +{ + type Item = (usize, F::Value); + + fn next(&mut self) -> Option { + let chunk_size = F::chunk_size(); + + // Range exhausted, return `None` forever. + if self.current_vindex >= self.end_vindex { + None + } + // Value lies in the current chunk, return it. 
+ else if self.current_vindex < self.next_cindex * chunk_size { + let vindex = self.current_vindex; + let val = self + .current_chunk + .values + .get(vindex % chunk_size) + .cloned() + .or_else(|| { + error!( + self.store.log, + "Missing chunk value in forwards iterator"; + "vector index" => vindex + ); + None + })?; + self.current_vindex += 1; + Some((vindex, val)) + } + // Need to load the next chunk, load it and recurse back into the in-range case. + else { + self.current_chunk = Chunk::load( + &self.store.cold_db, + F::column(), + &chunk_key(self.next_cindex as u64), + ) + .map_err(|e| { + error!( + self.store.log, + "Database error in forwards iterator"; + "chunk index" => self.next_cindex, + "error" => format!("{:?}", e) + ); + e + }) + .ok()? + .or_else(|| { + error!( + self.store.log, + "Missing chunk in forwards iterator"; + "chunk index" => self.next_cindex + ); + None + })?; + self.next_cindex += 1; + self.next() + } + } +} diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs index 47b4053be4..4dd05d7168 100644 --- a/beacon_node/store/src/chunked_vector.rs +++ b/beacon_node/store/src/chunked_vector.rs @@ -34,7 +34,7 @@ pub enum UpdatePattern { /// Map a chunk index to bytes that can be used to key the NoSQL database. /// /// We shift chunks up by 1 to make room for a genesis chunk that is handled separately. -fn chunk_key(cindex: u64) -> [u8; 8] { +pub fn chunk_key(cindex: u64) -> [u8; 8] { (cindex + 1).to_be_bytes() } @@ -177,7 +177,7 @@ pub trait Field: Copy { /// Load the genesis value for a fixed length field from the store. /// /// This genesis value should be used to fill the initial state of the vector. 
- fn load_genesis_value(store: &S) -> Result { + fn load_genesis_value>(store: &S) -> Result { let key = &genesis_value_key()[..]; let chunk = Chunk::load(store, Self::column(), key)?.ok_or(ChunkError::MissingGenesisValue)?; @@ -192,7 +192,10 @@ pub trait Field: Copy { /// /// Check the existing value (if any) for consistency with the value we intend to store, and /// return an error if they are inconsistent. - fn check_and_store_genesis_value(store: &S, value: Self::Value) -> Result<(), Error> { + fn check_and_store_genesis_value>( + store: &S, + value: Self::Value, + ) -> Result<(), Error> { let key = &genesis_value_key()[..]; if let Some(existing_chunk) = Chunk::::load(store, Self::column(), key)? { @@ -324,7 +327,7 @@ field!( |state: &BeaconState<_>, index, _| safe_modulo_index(&state.randao_mixes, index) ); -pub fn store_updated_vector, E: EthSpec, S: Store>( +pub fn store_updated_vector, E: EthSpec, S: Store>( field: F, store: &S, state: &BeaconState, @@ -384,7 +387,7 @@ fn store_range( where F: Field, E: EthSpec, - S: Store, + S: Store, I: Iterator, { for chunk_index in range { @@ -414,7 +417,7 @@ where // Chunks at the end index are included. // TODO: could be more efficient with a real range query (perhaps RocksDB) -fn range_query( +fn range_query, E: EthSpec, T: Decode + Encode>( store: &S, column: DBColumn, start_index: usize, @@ -479,7 +482,7 @@ fn stitch( Ok(result) } -pub fn load_vector_from_db, E: EthSpec, S: Store>( +pub fn load_vector_from_db, E: EthSpec, S: Store>( store: &S, slot: Slot, spec: &ChainSpec, @@ -511,7 +514,7 @@ pub fn load_vector_from_db, E: EthSpec, S: Store>( } /// The historical roots are stored in vector chunks, despite not actually being a vector. 
-pub fn load_variable_list_from_db, E: EthSpec, S: Store>( +pub fn load_variable_list_from_db, E: EthSpec, S: Store>( store: &S, slot: Slot, spec: &ChainSpec, @@ -571,14 +574,23 @@ where Chunk { values } } - pub fn load(store: &S, column: DBColumn, key: &[u8]) -> Result, Error> { + pub fn load, E: EthSpec>( + store: &S, + column: DBColumn, + key: &[u8], + ) -> Result, Error> { store .get_bytes(column.into(), key)? .map(|bytes| Self::decode(&bytes)) .transpose() } - pub fn store(&self, store: &S, column: DBColumn, key: &[u8]) -> Result<(), Error> { + pub fn store, E: EthSpec>( + &self, + store: &S, + column: DBColumn, + key: &[u8], + ) -> Result<(), Error> { store.put_bytes(column.into(), key, &self.encode()?)?; Ok(()) } diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs new file mode 100644 index 0000000000..bcec40935b --- /dev/null +++ b/beacon_node/store/src/forwards_iter.rs @@ -0,0 +1,164 @@ +use crate::chunked_iter::ChunkedVectorIter; +use crate::chunked_vector::BlockRoots; +use crate::iter::{BlockRootsIterator, ReverseBlockRootIterator}; +use crate::{DiskStore, Store}; +use slog::error; +use std::sync::Arc; +use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot}; + +/// Forwards block roots iterator that makes use of the `block_roots` table in the freezer DB. +pub struct FrozenForwardsBlockRootsIterator { + inner: ChunkedVectorIter, +} + +/// Forwards block roots iterator that reverses a backwards iterator (only good for short ranges). +pub struct SimpleForwardsBlockRootsIterator { + // Values from the backwards iterator (in slot descending order) + values: Vec<(Hash256, Slot)>, +} + +/// Fusion of the above two approaches to forwards iteration. Fast and efficient. +pub enum HybridForwardsBlockRootsIterator { + PreFinalization { + iter: FrozenForwardsBlockRootsIterator, + /// Data required by the `PostFinalization` iterator when we get to it. 
+ continuation_data: Option<(BeaconState, Hash256)>, + }, + PostFinalization { + iter: SimpleForwardsBlockRootsIterator, + }, +} + +impl FrozenForwardsBlockRootsIterator { + pub fn new( + store: Arc>, + start_slot: Slot, + last_restore_point_slot: Slot, + spec: &ChainSpec, + ) -> Self { + Self { + inner: ChunkedVectorIter::new( + store, + start_slot.as_usize(), + last_restore_point_slot, + spec, + ), + } + } +} + +impl Iterator for FrozenForwardsBlockRootsIterator { + type Item = (Hash256, Slot); + + fn next(&mut self) -> Option { + self.inner + .next() + .map(|(slot, block_hash)| (block_hash, Slot::from(slot))) + } +} + +impl SimpleForwardsBlockRootsIterator { + pub fn new, E: EthSpec>( + store: Arc, + start_slot: Slot, + end_state: BeaconState, + end_block_root: Hash256, + ) -> Self { + // Iterate backwards from the end state, stopping at the start slot. + Self { + values: ReverseBlockRootIterator::new( + (end_block_root, end_state.slot), + BlockRootsIterator::owned(store, end_state), + ) + .take_while(|(_, slot)| *slot >= start_slot) + .collect(), + } + } +} + +impl Iterator for SimpleForwardsBlockRootsIterator { + type Item = (Hash256, Slot); + + fn next(&mut self) -> Option { + // Pop from the end of the vector to get the block roots in slot-ascending order. 
+ self.values.pop() + } +} + +impl HybridForwardsBlockRootsIterator { + pub fn new( + store: Arc>, + start_slot: Slot, + end_state: BeaconState, + end_block_root: Hash256, + spec: &ChainSpec, + ) -> Self { + use HybridForwardsBlockRootsIterator::*; + + let latest_restore_point_slot = store.get_latest_restore_point_slot(); + + if start_slot < latest_restore_point_slot { + PreFinalization { + iter: FrozenForwardsBlockRootsIterator::new( + store, + start_slot, + latest_restore_point_slot, + spec, + ), + continuation_data: Some((end_state, end_block_root)), + } + } else { + PostFinalization { + iter: SimpleForwardsBlockRootsIterator::new( + store, + start_slot, + end_state, + end_block_root, + ), + } + } + } +} + +impl Iterator for HybridForwardsBlockRootsIterator { + type Item = (Hash256, Slot); + + fn next(&mut self) -> Option { + use HybridForwardsBlockRootsIterator::*; + + match self { + PreFinalization { + iter, + continuation_data, + } => { + match iter.next() { + Some(x) => Some(x), + // Once the pre-finalization iterator is consumed, transition + // to a post-finalization iterator beginning from the last slot + // of the pre iterator. 
+ None => { + let (end_state, end_block_root) = + continuation_data.take().or_else(|| { + error!( + iter.inner.store.log, + "HybridForwardsBlockRootsIterator: logic error" + ); + None + })?; + + *self = PostFinalization { + iter: SimpleForwardsBlockRootsIterator::new( + iter.inner.store.clone(), + Slot::from(iter.inner.end_vindex), + end_state, + end_block_root, + ), + }; + self.next() + } + } + } + PostFinalization { iter } => iter.next(), + } + } +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 3ddf2c365a..5e72ea8799 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1,6 +1,7 @@ use crate::chunked_vector::{ store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots, }; +use crate::forwards_iter::HybridForwardsBlockRootsIterator; use crate::iter::{ParentRootBlockIterator, StateRootsIterator}; use crate::{ leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem, @@ -14,6 +15,7 @@ use state_processing::{ SlotProcessingError, }; use std::convert::TryInto; +use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use types::*; @@ -25,7 +27,7 @@ pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE"; /// /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores /// intermittent "restore point" states pre-finalization. -pub struct HotColdDB { +pub struct HotColdDB { /// The slot and state root at the point where the database is split between hot and cold. /// /// States with slots less than `split.slot` are in the cold DB, while states with slots @@ -34,15 +36,17 @@ pub struct HotColdDB { /// Number of slots per restore point state in the freezer database. slots_per_restore_point: u64, /// Cold database containing compact historical data. 
- cold_db: LevelDB, + pub(crate) cold_db: LevelDB, /// Hot database containing duplicated but quick-to-access recent data. /// /// The hot database also contains all blocks. - hot_db: LevelDB, + pub(crate) hot_db: LevelDB, /// Chain spec. spec: ChainSpec, /// Logger. pub(crate) log: Logger, + /// Mere vessel for E. + _phantom: PhantomData, } #[derive(Debug, PartialEq)] @@ -71,7 +75,9 @@ pub enum HotColdDbError { RestorePointBlockHashError(BeaconStateError), } -impl Store for HotColdDB { +impl Store for HotColdDB { + type ForwardsBlockRootsIterator = HybridForwardsBlockRootsIterator; + // Defer to the hot database for basic operations (including blocks for now) fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error> { self.hot_db.get_bytes(column, key) @@ -90,11 +96,7 @@ impl Store for HotColdDB { } /// Store a state in the store. - fn put_state( - &self, - state_root: &Hash256, - state: &BeaconState, - ) -> Result<(), Error> { + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { self.store_archive_state(state_root, state) } else { @@ -103,7 +105,7 @@ impl Store for HotColdDB { } /// Fetch a state from the store. - fn get_state( + fn get_state( &self, state_root: &Hash256, slot: Option, @@ -129,7 +131,7 @@ impl Store for HotColdDB { } /// Advance the split point of the store, moving new finalized states to the freezer. - fn freeze_to_state( + fn freeze_to_state( store: Arc, frozen_head_root: Hash256, frozen_head: &BeaconState, @@ -157,7 +159,7 @@ impl Store for HotColdDB { for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) { - if slot % store.slots_per_restore_point == 0 { + if slot % dbg!(store.slots_per_restore_point) == 0 { let state: BeaconState = store .hot_db .get_state(&state_root, None)? 
@@ -195,20 +197,30 @@ impl Store for HotColdDB { Ok(()) } + + fn forwards_block_roots_iterator( + store: Arc, + start_slot: Slot, + end_state: BeaconState, + end_block_root: Hash256, + spec: &ChainSpec, + ) -> Self::ForwardsBlockRootsIterator { + HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec) + } } -impl HotColdDB { +impl HotColdDB { /// Open a new or existing database, with the given paths to the hot and cold DBs. /// /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`. - pub fn open( + pub fn open( hot_path: &Path, cold_path: &Path, slots_per_restore_point: u64, spec: ChainSpec, log: Logger, ) -> Result { - Self::verify_slots_per_restore_point::(slots_per_restore_point)?; + Self::verify_slots_per_restore_point(slots_per_restore_point)?; let db = HotColdDB { split: RwLock::new(Split::default()), @@ -217,6 +229,7 @@ impl HotColdDB { hot_db: LevelDB::open(hot_path)?, spec, log, + _phantom: PhantomData, }; // Load the previous split slot from the database (if any). This ensures we can @@ -230,7 +243,7 @@ impl HotColdDB { /// Store a pre-finalization state in the freezer database. /// /// Will return an error if the state does not lie on a restore point boundary. - pub fn store_archive_state( + pub fn store_archive_state( &self, state_root: &Hash256, state: &BeaconState, @@ -251,6 +264,6 @@ impl HotColdDB { "slot" => state.slot, "state_root" => format!("{:?}", state_root) ); // 1. Convert to PartialBeaconState and store that in the DB. let partial_state = PartialBeaconState::from_state_forgetful(state); @@ -273,7 +287,7 @@ impl HotColdDB { /// Load a pre-finalization state from the freezer database. /// /// Will reconstruct the state if it lies between restore points.
- pub fn load_archive_state( + pub fn load_archive_state( &self, state_root: &Hash256, slot: Slot, @@ -286,10 +300,7 @@ impl HotColdDB { } /// Load a restore point state by its `state_root`. - fn load_restore_point( - &self, - state_root: &Hash256, - ) -> Result, Error> { + fn load_restore_point(&self, state_root: &Hash256) -> Result, Error> { let mut partial_state = PartialBeaconState::db_get(&self.cold_db, state_root)? .ok_or_else(|| HotColdDbError::MissingRestorePoint(*state_root))?; @@ -303,7 +314,7 @@ impl HotColdDB { } /// Load a restore point state by its `restore_point_index`. - fn load_restore_point_by_index( + fn load_restore_point_by_index( &self, restore_point_index: u64, ) -> Result, Error> { @@ -312,7 +323,7 @@ impl HotColdDB { } /// Load a state that lies between restore points. - fn load_intermediate_state( + fn load_intermediate_state( &self, state_root: &Hash256, slot: Slot, @@ -330,7 +341,7 @@ impl HotColdDB { let high_restore_point = if high_restore_point_idx * self.slots_per_restore_point >= split.slot.as_u64() { - self.get_state::(&split.state_root, Some(split.slot))? + self.get_state(&split.state_root, Some(split.slot))? .ok_or_else(|| HotColdDbError::MissingSplitState(split.state_root, split.slot))? } else { self.load_restore_point_by_index(high_restore_point_idx)? @@ -365,7 +376,7 @@ impl HotColdDB { /// Get a suitable block root for backtracking from `high_restore_point` to the state at `slot`. /// /// Defaults to the block root for `slot`, which *should* be in range. - fn get_high_restore_point_block_root( + fn get_high_restore_point_block_root( &self, high_restore_point: &BeaconState, slot: Slot, @@ -381,7 +392,7 @@ impl HotColdDB { /// /// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot /// equal to `start_slot`, to reach a state with slot equal to `end_slot`. 
- fn load_blocks_to_replay( + fn load_blocks_to_replay( &self, start_slot: Slot, end_slot: Slot, @@ -402,7 +413,7 @@ impl HotColdDB { /// Replay `blocks` on top of `state` until `target_slot` is reached. /// /// Will skip slots as necessary. - fn replay_blocks( + fn replay_blocks( &self, mut state: BeaconState, blocks: Vec>, @@ -440,6 +451,11 @@ impl HotColdDB { self.split.read().slot } + /// Fetch the slot of the most recently stored restore point. + pub fn get_latest_restore_point_slot(&self) -> Slot { + self.get_split_slot() / self.slots_per_restore_point * self.slots_per_restore_point + } + /// Load the split point from disk. fn load_split(&self) -> Result, Error> { let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes()); @@ -498,9 +514,7 @@ impl HotColdDB { /// This ensures that we have at least one restore point within range of our state /// root history when iterating backwards (and allows for more frequent restore points if /// desired). - fn verify_slots_per_restore_point( - slots_per_restore_point: u64, - ) -> Result<(), HotColdDbError> { + fn verify_slots_per_restore_point(slots_per_restore_point: u64) -> Result<(), HotColdDbError> { let slots_per_historical_root = E::SlotsPerHistoricalRoot::to_u64(); if slots_per_restore_point > 0 && slots_per_historical_root % slots_per_restore_point == 0 { Ok(()) diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 885abb593f..a2f163f5b9 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -4,7 +4,7 @@ use ssz_derive::{Decode, Encode}; use std::convert::TryInto; use types::beacon_state::{BeaconTreeHashCache, CommitteeCache, CACHED_EPOCHS}; -pub fn store_full_state( +pub fn store_full_state, E: EthSpec>( store: &S, state_root: &Hash256, state: &BeaconState, @@ -21,7 +21,7 @@ pub fn store_full_state( result } -pub fn get_full_state( +pub fn get_full_state, E: EthSpec>( store: &S, state_root: &Hash256, ) -> 
Result>, Error> { diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index b4dfee774a..e1e6386570 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -12,12 +12,14 @@ use types::{ /// /// It is assumed that all ancestors for this object are stored in the database. If this is not the /// case, the iterator will start returning `None` prior to genesis. -pub trait AncestorIter { +pub trait AncestorIter, E: EthSpec, I: Iterator> { /// Returns an iterator over the roots of the ancestors of `self`. fn try_iter_ancestor_roots(&self, store: Arc) -> Option; } -impl<'a, U: Store, E: EthSpec> AncestorIter> for BeaconBlock { +impl<'a, U: Store, E: EthSpec> AncestorIter> + for BeaconBlock +{ /// Iterates across all available prior block roots of `self`, starting at the most recent and ending /// at genesis. fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { @@ -27,7 +29,9 @@ impl<'a, U: Store, E: EthSpec> AncestorIter> for } } -impl<'a, U: Store, E: EthSpec> AncestorIter> for BeaconState { +impl<'a, U: Store, E: EthSpec> AncestorIter> + for BeaconState +{ /// Iterates across all available prior state roots of `self`, starting at the most recent and ending /// at genesis. 
fn try_iter_ancestor_roots(&self, store: Arc) -> Option> { @@ -52,7 +56,7 @@ impl<'a, T: EthSpec, U> Clone for StateRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { Self { store, @@ -70,7 +74,7 @@ impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { type Item = (Hash256, Slot); fn next(&mut self) -> Option { @@ -99,13 +103,13 @@ impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { } /// Block iterator that uses the `parent_root` of each block to backtrack. -pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store> { +pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store> { store: &'a S, next_block_root: Hash256, _phantom: PhantomData, } -impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { +impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { pub fn new(store: &'a S, start_block_root: Hash256) -> Self { Self { store, @@ -115,7 +119,7 @@ impl<'a, E: EthSpec, S: Store> ParentRootBlockIterator<'a, E, S> { } } -impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> { +impl<'a, E: EthSpec, S: Store> Iterator for ParentRootBlockIterator<'a, E, S> { type Item = BeaconBlock; fn next(&mut self) -> Option { @@ -137,7 +141,7 @@ pub struct BlockIterator<'a, T: EthSpec, U> { roots: BlockRootsIterator<'a, T, U>, } -impl<'a, T: EthSpec, U: Store> BlockIterator<'a, T, U> { +impl<'a, T: EthSpec, U: Store> BlockIterator<'a, T, U> { /// Create a new iterator over all blocks in the given `beacon_state` and prior states. 
pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { Self { @@ -153,7 +157,7 @@ impl<'a, T: EthSpec, U: Store> BlockIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { +impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { type Item = BeaconBlock; fn next(&mut self) -> Option { @@ -186,7 +190,7 @@ impl<'a, T: EthSpec, U> Clone for BlockRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { /// Create a new iterator over all block roots in the given `beacon_state` and prior states. pub fn new(store: Arc, beacon_state: &'a BeaconState) -> Self { Self { @@ -206,7 +210,7 @@ impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { } } -impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { +impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { type Item = (Hash256, Slot); fn next(&mut self) -> Option { @@ -235,7 +239,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { } /// Fetch the next state to use whilst backtracking in `*RootsIterator`. 
-fn next_historical_root_backtrack_state( +fn next_historical_root_backtrack_state>( store: &S, current_state: &BeaconState, ) -> Option> { diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 5c53b77572..b9ebd25d10 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,4 +1,5 @@ use super::*; +use crate::forwards_iter::SimpleForwardsBlockRootsIterator; use crate::impls::beacon_state::{get_full_state, store_full_state}; use crate::metrics; use db_key::Key; @@ -6,14 +7,16 @@ use leveldb::database::kv::KV; use leveldb::database::Database; use leveldb::error::Error as LevelDBError; use leveldb::options::{Options, ReadOptions, WriteOptions}; +use std::marker::PhantomData; use std::path::Path; /// A wrapped leveldb database. -pub struct LevelDB { +pub struct LevelDB { db: Database, + _phantom: PhantomData, } -impl LevelDB { +impl LevelDB { /// Open a database at `path`, creating a new database if one does not already exist. pub fn open(path: &Path) -> Result { let mut options = Options::new(); @@ -22,7 +25,10 @@ impl LevelDB { let db = Database::open(path, options)?; - Ok(Self { db }) + Ok(Self { + db, + _phantom: PhantomData, + }) } fn read_options(&self) -> ReadOptions { @@ -55,7 +61,9 @@ impl Key for BytesKey { } } -impl Store for LevelDB { +impl Store for LevelDB { + type ForwardsBlockRootsIterator = SimpleForwardsBlockRootsIterator; + /// Retrieve some bytes in `column` with `key`. fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { let column_key = Self::get_key_for_col(col, key); @@ -110,22 +118,28 @@ impl Store for LevelDB { } /// Store a state in the store. - fn put_state( - &self, - state_root: &Hash256, - state: &BeaconState, - ) -> Result<(), Error> { + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { store_full_state(self, state_root, state) } /// Fetch a state from the store. 
- fn get_state( + fn get_state( &self, state_root: &Hash256, _: Option, ) -> Result>, Error> { get_full_state(self, state_root) } + + fn forwards_block_roots_iterator( + store: Arc, + start_slot: Slot, + end_state: BeaconState, + end_block_root: Hash256, + _: &ChainSpec, + ) -> Self::ForwardsBlockRootsIterator { + SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root) + } } impl From for Error { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 786d2cac42..29a38fe8dd 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -11,9 +11,11 @@ extern crate lazy_static; mod block_at_slot; +pub mod chunked_iter; pub mod chunked_vector; pub mod config; mod errors; +mod forwards_iter; mod hot_cold_store; mod impls; mod leveldb_store; @@ -42,7 +44,9 @@ pub use types::*; /// A `Store` is fundamentally backed by a key-value database, however it provides support for /// columns. A simple column implementation might involve prefixing a key with some bytes unique to /// each column. -pub trait Store: Sync + Send + Sized + 'static { +pub trait Store: Sync + Send + Sized + 'static { + type ForwardsBlockRootsIterator: Iterator; + /// Retrieve some bytes in `column` with `key`. fn get_bytes(&self, column: &str, key: &[u8]) -> Result>, Error>; @@ -76,14 +80,10 @@ pub trait Store: Sync + Send + Sized + 'static { } /// Store a state in the store. - fn put_state( - &self, - state_root: &Hash256, - state: &BeaconState, - ) -> Result<(), Error>; + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error>; /// Fetch a state from the store. - fn get_state( + fn get_state( &self, state_root: &Hash256, slot: Option, @@ -94,7 +94,7 @@ pub trait Store: Sync + Send + Sized + 'static { /// /// Returns `None` if no parent block exists at that slot, or if `slot` is greater than the /// slot of `start_block_root`. 
- fn get_block_at_preceeding_slot( + fn get_block_at_preceeding_slot( &self, start_block_root: Hash256, slot: Slot, @@ -103,13 +103,31 @@ pub trait Store: Sync + Send + Sized + 'static { } /// (Optionally) Move all data before the frozen slot to the freezer database. - fn freeze_to_state( + fn freeze_to_state( _store: Arc, _frozen_head_root: Hash256, _frozen_head: &BeaconState, ) -> Result<(), Error> { Ok(()) } + + /// Get a forwards (slot-ascending) iterator over the beacon block roots since `start_slot`. + /// + /// Will be efficient for frozen portions of the database if using `DiskStore`. + /// + /// The `end_state` and `end_block_root` are required for backtracking in the post-finalization + /// part of the chain, and should usually be set to the current head. Importantly, the + /// `end_state` must be a state that has had a block applied to it, and the hash of that + /// block must be `end_block_root`. + // NOTE: could maybe optimise by getting the `BeaconState` and end block root from a closure, as + // it's not always required. + fn forwards_block_roots_iterator( + store: Arc, + start_slot: Slot, + end_state: BeaconState, + end_block_root: Hash256, + spec: &ChainSpec, + ) -> Self::ForwardsBlockRootsIterator; } /// A unique column identifier. @@ -165,16 +183,16 @@ pub trait SimpleStoreItem: Sized { /// An item that may be stored in a `Store`. pub trait StoreItem: Sized { /// Store `self`. - fn db_put(&self, store: &S, key: &Hash256) -> Result<(), Error>; + fn db_put, E: EthSpec>(&self, store: &S, key: &Hash256) -> Result<(), Error>; /// Retrieve an instance of `Self` from `store`. - fn db_get(store: &S, key: &Hash256) -> Result, Error>; + fn db_get, E: EthSpec>(store: &S, key: &Hash256) -> Result, Error>; /// Return `true` if an instance of `Self` exists in `store`. - fn db_exists(store: &S, key: &Hash256) -> Result; + fn db_exists, E: EthSpec>(store: &S, key: &Hash256) -> Result; /// Delete an instance of `Self` from `store`.
- fn db_delete(store: &S, key: &Hash256) -> Result<(), Error>; + fn db_delete, E: EthSpec>(store: &S, key: &Hash256) -> Result<(), Error>; } impl StoreItem for T @@ -182,7 +200,7 @@ where T: SimpleStoreItem, { /// Store `self`. - fn db_put(&self, store: &S, key: &Hash256) -> Result<(), Error> { + fn db_put, E: EthSpec>(&self, store: &S, key: &Hash256) -> Result<(), Error> { let column = Self::db_column().into(); let key = key.as_bytes(); @@ -192,7 +210,7 @@ where } /// Retrieve an instance of `Self`. - fn db_get(store: &S, key: &Hash256) -> Result, Error> { + fn db_get, E: EthSpec>(store: &S, key: &Hash256) -> Result, Error> { let column = Self::db_column().into(); let key = key.as_bytes(); @@ -203,7 +221,7 @@ where } /// Return `true` if an instance of `Self` exists in `Store`. - fn db_exists(store: &S, key: &Hash256) -> Result { + fn db_exists, E: EthSpec>(store: &S, key: &Hash256) -> Result { let column = Self::db_column().into(); let key = key.as_bytes(); @@ -211,7 +229,7 @@ where } /// Delete `self` from the `Store`. 
- fn db_delete(store: &S, key: &Hash256) -> Result<(), Error> { + fn db_delete, E: EthSpec>(store: &S, key: &Hash256) -> Result<(), Error> { let column = Self::db_column().into(); let key = key.as_bytes(); @@ -246,7 +264,7 @@ mod tests { } } - fn test_impl(store: impl Store) { + fn test_impl(store: impl Store) { let key = Hash256::random(); let item = StorableThing { a: 1, b: 42 }; @@ -275,7 +293,7 @@ mod tests { let slots_per_restore_point = MinimalEthSpec::slots_per_historical_root() as u64; let spec = MinimalEthSpec::default_spec(); let log = NullLoggerBuilder.build().unwrap(); - let store = DiskStore::open::( + let store = DiskStore::open( &hot_dir.path(), &cold_dir.path(), slots_per_restore_point, @@ -305,7 +323,7 @@ mod tests { #[test] fn exists() { - let store = MemoryStore::open(); + let store = MemoryStore::::open(); let key = Hash256::random(); let item = StorableThing { a: 1, b: 42 }; diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 4da9d076ca..1695756512 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,29 +1,35 @@ use super::{Error, Store}; +use crate::forwards_iter::SimpleForwardsBlockRootsIterator; use crate::impls::beacon_state::{get_full_state, store_full_state}; use parking_lot::RwLock; use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::Arc; use types::*; type DBHashMap = HashMap, Vec>; /// A thread-safe `HashMap` wrapper. -pub struct MemoryStore { +pub struct MemoryStore { db: RwLock, + _phantom: PhantomData, } -impl Clone for MemoryStore { +impl Clone for MemoryStore { fn clone(&self) -> Self { Self { db: RwLock::new(self.db.read().clone()), + _phantom: PhantomData, } } } -impl MemoryStore { +impl MemoryStore { /// Create a new, empty database. 
pub fn open() -> Self { Self { db: RwLock::new(HashMap::new()), + _phantom: PhantomData, } } @@ -34,10 +40,12 @@ impl MemoryStore { } } -impl Store for MemoryStore { +impl Store for MemoryStore { + type ForwardsBlockRootsIterator = SimpleForwardsBlockRootsIterator; + /// Get the value of some key from the database. Returns `None` if the key does not exist. fn get_bytes(&self, col: &str, key: &[u8]) -> Result>, Error> { - let column_key = MemoryStore::get_key_for_col(col, key); + let column_key = Self::get_key_for_col(col, key); Ok(self .db @@ -48,7 +56,7 @@ impl Store for MemoryStore { /// Puts a key in the database. fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> { - let column_key = MemoryStore::get_key_for_col(col, key); + let column_key = Self::get_key_for_col(col, key); self.db.write().insert(column_key, val.to_vec()); @@ -57,14 +65,14 @@ impl Store for MemoryStore { /// Return true if some key exists in some column. fn key_exists(&self, col: &str, key: &[u8]) -> Result { - let column_key = MemoryStore::get_key_for_col(col, key); + let column_key = Self::get_key_for_col(col, key); Ok(self.db.read().contains_key(&column_key)) } /// Delete some key from the database. fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> { - let column_key = MemoryStore::get_key_for_col(col, key); + let column_key = Self::get_key_for_col(col, key); self.db.write().remove(&column_key); @@ -72,20 +80,26 @@ impl Store for MemoryStore { } /// Store a state in the store. - fn put_state( - &self, - state_root: &Hash256, - state: &BeaconState, - ) -> Result<(), Error> { + fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { store_full_state(self, state_root, state) } /// Fetch a state from the store. 
- fn get_state( + fn get_state( &self, state_root: &Hash256, _: Option, ) -> Result>, Error> { get_full_state(self, state_root) } + + fn forwards_block_roots_iterator( + store: Arc, + start_slot: Slot, + end_state: BeaconState, + end_block_root: Hash256, + _: &ChainSpec, + ) -> Self::ForwardsBlockRootsIterator { + SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root) + } } diff --git a/beacon_node/store/src/migrate.rs b/beacon_node/store/src/migrate.rs index b32ffed14b..7aa54f6205 100644 --- a/beacon_node/store/src/migrate.rs +++ b/beacon_node/store/src/migrate.rs @@ -23,14 +23,14 @@ pub trait Migrate: Send + Sync + 'static { /// Migrator that does nothing, for stores that don't need migration. pub struct NullMigrator; -impl Migrate for NullMigrator { - fn new(_: Arc) -> Self { +impl Migrate, E> for NullMigrator { + fn new(_: Arc>) -> Self { NullMigrator } } -impl Migrate for NullMigrator { - fn new(_: Arc) -> Self { +impl Migrate, E> for NullMigrator { + fn new(_: Arc>) -> Self { NullMigrator } } @@ -40,7 +40,7 @@ impl Migrate for NullMigrator { /// Mostly useful for tests. pub struct BlockingMigrator(Arc); -impl Migrate for BlockingMigrator { +impl> Migrate for BlockingMigrator { fn new(db: Arc) -> Self { BlockingMigrator(db) } @@ -60,15 +60,15 @@ impl Migrate for BlockingMigrator { /// Migrator that runs a background thread to migrate state from the hot to the cold database. pub struct BackgroundMigrator { - db: Arc, + db: Arc>, tx_thread: Mutex<( mpsc::Sender<(Hash256, BeaconState)>, thread::JoinHandle<()>, )>, } -impl Migrate for BackgroundMigrator { - fn new(db: Arc) -> Self { +impl Migrate, E> for BackgroundMigrator { + fn new(db: Arc>) -> Self { let tx_thread = Mutex::new(Self::spawn_thread(db.clone())); Self { db, tx_thread } } @@ -119,7 +119,7 @@ impl BackgroundMigrator { /// /// Return a channel handle for sending new finalized states to the thread. 
fn spawn_thread( - db: Arc, + db: Arc>, ) -> ( mpsc::Sender<(Hash256, BeaconState)>, thread::JoinHandle<()>, diff --git a/beacon_node/store/src/partial_beacon_state.rs b/beacon_node/store/src/partial_beacon_state.rs index cea8cf310d..096aea8fe3 100644 --- a/beacon_node/store/src/partial_beacon_state.rs +++ b/beacon_node/store/src/partial_beacon_state.rs @@ -111,7 +111,11 @@ impl PartialBeaconState { } } - pub fn load_block_roots(&mut self, store: &S, spec: &ChainSpec) -> Result<(), Error> { + pub fn load_block_roots>( + &mut self, + store: &S, + spec: &ChainSpec, + ) -> Result<(), Error> { if self.block_roots.is_none() { self.block_roots = Some(load_vector_from_db::( store, self.slot, spec, @@ -120,7 +124,11 @@ impl PartialBeaconState { Ok(()) } - pub fn load_state_roots(&mut self, store: &S, spec: &ChainSpec) -> Result<(), Error> { + pub fn load_state_roots>( + &mut self, + store: &S, + spec: &ChainSpec, + ) -> Result<(), Error> { if self.state_roots.is_none() { self.state_roots = Some(load_vector_from_db::( store, self.slot, spec, @@ -129,7 +137,7 @@ impl PartialBeaconState { Ok(()) } - pub fn load_historical_roots( + pub fn load_historical_roots>( &mut self, store: &S, spec: &ChainSpec, @@ -142,7 +150,7 @@ impl PartialBeaconState { Ok(()) } - pub fn load_randao_mixes( + pub fn load_randao_mixes>( &mut self, store: &S, spec: &ChainSpec, diff --git a/eth2/lmd_ghost/src/lib.rs b/eth2/lmd_ghost/src/lib.rs index ac42120882..6cb96b787a 100644 --- a/eth2/lmd_ghost/src/lib.rs +++ b/eth2/lmd_ghost/src/lib.rs @@ -10,7 +10,7 @@ pub type Result = std::result::Result; // Note: the `PartialEq` bound is only required for testing. If it becomes a serious annoyance we // can remove it. -pub trait LmdGhost: PartialEq + Send + Sync + Sized { +pub trait LmdGhost, E: EthSpec>: PartialEq + Send + Sync + Sized { /// Create a new instance, with the given `store` and `finalized_root`. 
fn new(store: Arc, finalized_block: &BeaconBlock, finalized_root: Hash256) -> Self; diff --git a/eth2/lmd_ghost/src/reduced_tree.rs b/eth2/lmd_ghost/src/reduced_tree.rs index 65aa22e584..1d369fcce6 100644 --- a/eth2/lmd_ghost/src/reduced_tree.rs +++ b/eth2/lmd_ghost/src/reduced_tree.rs @@ -64,7 +64,7 @@ impl PartialEq for ThreadSafeReducedTree { impl LmdGhost for ThreadSafeReducedTree where - T: Store, + T: Store, E: EthSpec, { fn new(store: Arc, genesis_block: &BeaconBlock, genesis_root: Hash256) -> Self { @@ -218,7 +218,7 @@ impl PartialEq for ReducedTree { impl ReducedTree where - T: Store, + T: Store, E: EthSpec, { pub fn new(store: Arc, genesis_block: &BeaconBlock, genesis_root: Hash256) -> Self { @@ -976,16 +976,15 @@ mod tests { #[test] fn test_reduced_tree_ssz() { - let store = Arc::new(MemoryStore::open()); - let tree = ReducedTree::::new( + let store = Arc::new(MemoryStore::::open()); + let tree = ReducedTree::new( store.clone(), &BeaconBlock::empty(&MinimalEthSpec::default_spec()), Hash256::zero(), ); let ssz_tree = ReducedTreeSsz::from_reduced_tree(&tree); let bytes = tree.as_bytes(); - let recovered_tree = - ReducedTree::::from_bytes(&bytes, store.clone()).unwrap(); + let recovered_tree = ReducedTree::from_bytes(&bytes, store.clone()).unwrap(); let recovered_ssz = ReducedTreeSsz::from_reduced_tree(&recovered_tree); assert_eq!(ssz_tree, recovered_ssz); diff --git a/eth2/lmd_ghost/tests/test.rs b/eth2/lmd_ghost/tests/test.rs index 631f3e406e..8a1734b22f 100644 --- a/eth2/lmd_ghost/tests/test.rs +++ b/eth2/lmd_ghost/tests/test.rs @@ -10,17 +10,14 @@ use beacon_chain::test_utils::{ use lmd_ghost::{LmdGhost, ThreadSafeReducedTree as BaseThreadSafeReducedTree}; use rand::{prelude::*, rngs::StdRng}; use std::sync::Arc; -use store::{ - iter::{AncestorIter, BlockRootsIterator}, - MemoryStore, Store, -}; +use store::{iter::AncestorIter, MemoryStore, Store}; use types::{BeaconBlock, EthSpec, Hash256, MinimalEthSpec, Slot}; // Should ideally be divisible by 3. 
pub const VALIDATOR_COUNT: usize = 3 * 8; type TestEthSpec = MinimalEthSpec; -type ThreadSafeReducedTree = BaseThreadSafeReducedTree; +type ThreadSafeReducedTree = BaseThreadSafeReducedTree, TestEthSpec>; type BeaconChainHarness = BaseBeaconChainHarness>; type RootAndSlot = (Hash256, Slot); @@ -86,16 +83,14 @@ impl ForkedHarness { faulty_fork_blocks, ); - let mut honest_roots = - get_ancestor_roots::(harness.chain.store.clone(), honest_head); + let mut honest_roots = get_ancestor_roots(harness.chain.store.clone(), honest_head); honest_roots.insert( 0, (honest_head, get_slot_for_block_root(&harness, honest_head)), ); - let mut faulty_roots = - get_ancestor_roots::(harness.chain.store.clone(), faulty_head); + let mut faulty_roots = get_ancestor_roots(harness.chain.store.clone(), faulty_head); faulty_roots.insert( 0, @@ -121,7 +116,7 @@ impl ForkedHarness { } } - pub fn store_clone(&self) -> MemoryStore { + pub fn store_clone(&self) -> MemoryStore { (*self.harness.chain.store).clone() } @@ -131,7 +126,7 @@ impl ForkedHarness { // // Taking a clone here ensures that each fork choice gets it's own store so there is no // cross-contamination between tests. - let store: MemoryStore = self.store_clone(); + let store: MemoryStore = self.store_clone(); ThreadSafeReducedTree::new( Arc::new(store), @@ -155,7 +150,7 @@ impl ForkedHarness { } /// Helper: returns all the ancestor roots and slots for a given block_root. -fn get_ancestor_roots( +fn get_ancestor_roots>( store: Arc, block_root: Hash256, ) -> Vec<(Hash256, Slot)> { @@ -164,11 +159,9 @@ fn get_ancestor_roots( .expect("block should exist") .expect("store should not error"); - as AncestorIter<_, BlockRootsIterator>>::try_iter_ancestor_roots( - &block, store, - ) - .expect("should be able to create ancestor iter") - .collect() + as AncestorIter<_, _, _>>::try_iter_ancestor_roots(&block, store) + .expect("should be able to create ancestor iter") + .collect() } /// Helper: returns the slot for some block_root.