Make key value storage abstractions more accurate (#1267)

* Layer do_atomically() abstractions properly

* Reduce allocs and DRY get_key_for_col()

* Parameterize HotColdDB with hot and cold item stores

* -impl Store for MemoryStore

* Replace Store uses with HotColdDB

* Ditch Store trait

* cargo fmt

* Style fix

* Readd missing dep that broke the build
This commit is contained in:
Adam Szkoda
2020-06-16 03:34:04 +02:00
committed by GitHub
parent 6b8c96662f
commit 9db0c28051
30 changed files with 589 additions and 575 deletions

View File

@@ -5,10 +5,12 @@ use crate::config::StoreConfig;
use crate::forwards_iter::HybridForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
use crate::metrics;
use crate::{
leveldb_store::LevelDB, DBColumn, Error, ItemStore, KeyValueStore, PartialBeaconState, Store,
StoreItem, StoreOp,
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
StoreOp,
};
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
@@ -32,7 +34,7 @@ pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";
///
/// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
/// intermittent "restore point" states pre-finalization.
pub struct HotColdDB<E: EthSpec> {
pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// The slot and state root at the point where the database is split between hot and cold.
///
/// States with slots less than `split.slot` are in the cold DB, while states with slots
@@ -40,11 +42,11 @@ pub struct HotColdDB<E: EthSpec> {
split: RwLock<Split>,
config: StoreConfig,
/// Cold database containing compact historical data.
pub(crate) cold_db: LevelDB<E>,
pub(crate) cold_db: Cold,
/// Hot database containing duplicated but quick-to-access recent data.
///
/// The hot database also contains all blocks.
pub(crate) hot_db: LevelDB<E>,
pub(crate) hot_db: Hot,
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
/// Chain spec.
@@ -84,11 +86,13 @@ pub enum HotColdDBError {
RestorePointBlockHashError(BeaconStateError),
}
impl<E: EthSpec> Store<E> for HotColdDB<E> {
type ForwardsBlockRootsIterator = HybridForwardsBlockRootsIterator<E>;
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
/// Store a block and update the LRU cache.
fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock<E>) -> Result<(), Error> {
pub fn put_block(
&self,
block_root: &Hash256,
block: SignedBeaconBlock<E>,
) -> Result<(), Error> {
// Store on disk.
self.hot_db.put(block_root, &block)?;
@@ -99,7 +103,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
/// Fetch a block from the store.
fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
pub fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
// Check the cache.
@@ -120,12 +124,12 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
/// Delete a block from the store and the block cache.
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.block_cache.lock().pop(block_root);
self.hot_db.delete::<SignedBeaconBlock<E>>(block_root)
}
fn put_state_summary(
pub fn put_state_summary(
&self,
state_root: &Hash256,
summary: HotStateSummary,
@@ -134,7 +138,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
if state.slot < self.get_split_slot() {
self.store_cold_state(state_root, &state)
} else {
@@ -143,7 +147,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
/// Fetch a state from the store.
fn get_state(
pub fn get_state(
&self,
state_root: &Hash256,
slot: Option<Slot>,
@@ -170,7 +174,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
/// than the split point. You shouldn't delete states from the finalized portion of the chain
/// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint
/// (which will be deleted by this function but shouldn't be).
fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
// Delete the state summary.
self.hot_db
.key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?;
@@ -184,20 +188,20 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
Ok(())
}
fn forwards_block_roots_iterator(
pub fn forwards_block_roots_iterator(
store: Arc<Self>,
start_slot: Slot,
end_state: BeaconState<E>,
end_block_root: Hash256,
spec: &ChainSpec,
) -> Result<Self::ForwardsBlockRootsIterator, Error> {
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec)
}
/// Load an epoch boundary state by using the hot state summary look-up.
///
/// Will fall back to the cold DB if a hot state summary is not found.
fn load_epoch_boundary_state(
pub fn load_epoch_boundary_state(
&self,
state_root: &Hash256,
) -> Result<Option<BeaconState<E>>, Error> {
@@ -226,21 +230,49 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
}
fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
pub fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
self.hot_db.put(key, item)
}
fn get_item<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
pub fn get_item<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
self.hot_db.get(key)
}
fn item_exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
pub fn item_exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
self.hot_db.exists::<I>(key)
}
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
pub fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
let mut guard = self.block_cache.lock();
self.hot_db.do_atomically(batch)?;
let mut key_value_batch: Vec<KeyValueStoreOp> = Vec::with_capacity(batch.len());
for op in batch {
match op {
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
let key =
get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
}
StoreOp::DeleteState(state_hash, slot) => {
let untyped_hash: Hash256 = (*state_hash).into();
let state_summary_key = get_key_for_col(
DBColumn::BeaconStateSummary.into(),
untyped_hash.as_bytes(),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key));
if *slot % E::slots_per_epoch() == 0 {
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
}
}
}
}
self.hot_db.do_atomically(&key_value_batch)?;
for op in batch {
match op {
StoreOp::DeleteBlock(block_hash) => {
@@ -254,7 +286,30 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
}
impl<E: EthSpec> HotColdDB<E> {
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
pub fn open_ephemeral(
config: StoreConfig,
spec: ChainSpec,
log: Logger,
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
split: RwLock::new(Split::default()),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
config,
spec,
log,
_phantom: PhantomData,
};
Ok(db)
}
}
impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
/// Open a new or existing database, with the given paths to the hot and cold DBs.
///
/// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
@@ -264,7 +319,7 @@ impl<E: EthSpec> HotColdDB<E> {
config: StoreConfig,
spec: ChainSpec,
log: Logger,
) -> Result<Self, Error> {
) -> Result<HotColdDB<E, LevelDB<E>, LevelDB<E>>, Error> {
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
@@ -285,7 +340,9 @@ impl<E: EthSpec> HotColdDB<E> {
}
Ok(db)
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
/// Store a post-finalization state efficiently in the hot database.
///
/// On an epoch boundary, store a full state. On an intermediate slot, store
@@ -675,8 +732,8 @@ impl<E: EthSpec> HotColdDB<E> {
}
/// Advance the split point of the store, moving new finalized states to the freezer.
pub fn process_finalization<E: EthSpec>(
store: Arc<HotColdDB<E>>,
pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
frozen_head_root: Hash256,
frozen_head: &BeaconState<E>,
) -> Result<(), Error> {