Clean up database abstractions (#1200)

* Remove redundant method

* Pull out a method out of a struct

* More precise db access abstractions

* Move fake trait method out of it

* cargo fmt

* Fix compilation error after refactoring

* Move another fake method out the Store trait

* Get rid of superfluous method

* Fix refactoring bug

* Rename: SimpleStoreItem -> StoreItem

* Get rid of the confusing DiskStore type alias

* Get rid of SimpleDiskStore type alias

* Correction: A method took both self and a ref to Self
This commit is contained in:
Adam Szkoda
2020-06-01 00:13:49 +02:00
committed by GitHub
parent 08e6b4961d
commit 91cb14ac41
31 changed files with 413 additions and 485 deletions

View File

@@ -3,11 +3,12 @@ use crate::chunked_vector::{
};
use crate::config::StoreConfig;
use crate::forwards_iter::HybridForwardsBlockRootsIterator;
use crate::impls::beacon_state::store_full_state;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::metrics;
use crate::{
leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreOp,
leveldb_store::LevelDB, DBColumn, Error, ItemStore, KeyValueStore, PartialBeaconState, Store,
StoreItem, StoreOp,
};
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
@@ -86,27 +87,10 @@ pub enum HotColdDBError {
impl<E: EthSpec> Store<E> for HotColdDB<E> {
type ForwardsBlockRootsIterator = HybridForwardsBlockRootsIterator<E>;
// Defer to the hot database for basic operations (including blocks for now)
fn get_bytes(&self, column: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
self.hot_db.get_bytes(column, key)
}
fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error> {
self.hot_db.put_bytes(column, key, value)
}
fn key_exists(&self, column: &str, key: &[u8]) -> Result<bool, Error> {
self.hot_db.key_exists(column, key)
}
fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error> {
self.hot_db.key_delete(column, key)
}
/// Store a block and update the LRU cache.
fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock<E>) -> Result<(), Error> {
// Store on disk.
self.put(block_root, &block)?;
self.hot_db.put(block_root, &block)?;
// Update cache.
self.block_cache.lock().put(*block_root, block);
@@ -125,7 +109,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
// Fetch from database.
match self.get::<SignedBeaconBlock<E>>(block_root)? {
match self.hot_db.get::<SignedBeaconBlock<E>>(block_root)? {
Some(block) => {
// Add to cache.
self.block_cache.lock().put(*block_root, block.clone());
@@ -138,7 +122,15 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
/// Delete a block from the store and the block cache.
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
self.block_cache.lock().pop(block_root);
self.delete::<SignedBeaconBlock<E>>(block_root)
self.hot_db.delete::<SignedBeaconBlock<E>>(block_root)
}
fn put_state_summary(
&self,
state_root: &Hash256,
summary: HotStateSummary,
) -> Result<(), Error> {
self.hot_db.put(state_root, &summary).map_err(Into::into)
}
/// Store a state in the store.
@@ -155,17 +147,6 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
&self,
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<E>>, Error> {
self.get_state_with(state_root, slot)
}
/// Get a state from the store.
///
/// Fetch a state from the store, controlling which cache fields are cloned.
fn get_state_with(
&self,
state_root: &Hash256,
slot: Option<Slot>,
) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
@@ -203,96 +184,6 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
Ok(())
}
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
let mut guard = self.block_cache.lock();
self.hot_db.do_atomically(batch)?;
for op in batch {
match op {
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
guard.pop(&untyped_hash);
}
StoreOp::DeleteState(_, _) => (),
}
}
Ok(())
}
/// Advance the split point of the store, moving new finalized states to the freezer.
fn process_finalization(
store: Arc<Self>,
frozen_head_root: Hash256,
frozen_head: &BeaconState<E>,
) -> Result<(), Error> {
debug!(
store.log,
"Freezer migration started";
"slot" => frozen_head.slot
);
// 0. Check that the migration is sensible.
// The new frozen head must increase the current split slot, and lie on an epoch
// boundary (in order for the hot state summary scheme to work).
let current_split_slot = store.get_split_slot();
if frozen_head.slot < current_split_slot {
return Err(HotColdDBError::FreezeSlotError {
current_split_slot,
proposed_split_slot: frozen_head.slot,
}
.into());
}
if frozen_head.slot % E::slots_per_epoch() != 0 {
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot).into());
}
// 1. Copy all of the states between the head and the split slot, from the hot DB
// to the cold DB.
let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);
let mut to_delete = vec![];
for (state_root, slot) in
state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot)
{
if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = store
.hot_db
.get_state(&state_root, None)?
.ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state)?;
}
// Store a pointer from this state root to its slot, so we can later reconstruct states
// from their state root alone.
store.store_cold_state_slot(&state_root, slot)?;
// Delete the old summary, and the full state if we lie on an epoch boundary.
to_delete.push((state_root, slot));
}
// 2. Update the split slot
*store.split.write() = Split {
slot: frozen_head.slot,
state_root: frozen_head_root,
};
store.store_split()?;
// 3. Delete from the hot DB
for (state_root, slot) in to_delete {
store.delete_state(&state_root, slot)?;
}
debug!(
store.log,
"Freezer migration complete";
"slot" => frozen_head.slot
);
Ok(())
}
fn forwards_block_roots_iterator(
store: Arc<Self>,
start_slot: Slot,
@@ -334,6 +225,33 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
}
}
fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
self.hot_db.put(key, item)
}
fn get_item<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
self.hot_db.get(key)
}
fn item_exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
self.hot_db.exists::<I>(key)
}
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
let mut guard = self.block_cache.lock();
self.hot_db.do_atomically(batch)?;
for op in batch {
match op {
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
guard.pop(&untyped_hash);
}
StoreOp::DeleteState(_, _) => (),
}
}
Ok(())
}
}
impl<E: EthSpec> HotColdDB<E> {
@@ -408,9 +326,7 @@ impl<E: EthSpec> HotColdDB<E> {
epoch_boundary_state_root,
}) = self.load_hot_state_summary(state_root)?
{
let boundary_state = self
.hot_db
.get_state(&epoch_boundary_state_root, None)?
let boundary_state = get_full_state(&self.hot_db, &epoch_boundary_state_root)?
.ok_or_else(|| {
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root)
})?;
@@ -758,6 +674,77 @@ impl<E: EthSpec> HotColdDB<E> {
}
}
/// Advance the split point of the store, moving new finalized states to the freezer.
pub fn process_finalization<E: EthSpec>(
store: Arc<HotColdDB<E>>,
frozen_head_root: Hash256,
frozen_head: &BeaconState<E>,
) -> Result<(), Error> {
debug!(
store.log,
"Freezer migration started";
"slot" => frozen_head.slot
);
// 0. Check that the migration is sensible.
// The new frozen head must increase the current split slot, and lie on an epoch
// boundary (in order for the hot state summary scheme to work).
let current_split_slot = store.get_split_slot();
if frozen_head.slot < current_split_slot {
return Err(HotColdDBError::FreezeSlotError {
current_split_slot,
proposed_split_slot: frozen_head.slot,
}
.into());
}
if frozen_head.slot % E::slots_per_epoch() != 0 {
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot).into());
}
// 1. Copy all of the states between the head and the split slot, from the hot DB
// to the cold DB.
let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);
let mut to_delete = vec![];
for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) {
if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
.ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state)?;
}
// Store a pointer from this state root to its slot, so we can later reconstruct states
// from their state root alone.
store.store_cold_state_slot(&state_root, slot)?;
// Delete the old summary, and the full state if we lie on an epoch boundary.
to_delete.push((state_root, slot));
}
// 2. Update the split slot
*store.split.write() = Split {
slot: frozen_head.slot,
state_root: frozen_head_root,
};
store.store_split()?;
// 3. Delete from the hot DB
for (state_root, slot) in to_delete {
store.delete_state(&state_root, slot)?;
}
debug!(
store.log,
"Freezer migration complete";
"slot" => frozen_head.slot
);
Ok(())
}
/// Struct for storing the split slot and state root in the database.
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
struct Split {
@@ -765,7 +752,7 @@ struct Split {
state_root: Hash256,
}
impl SimpleStoreItem for Split {
impl StoreItem for Split {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}
@@ -789,7 +776,7 @@ pub struct HotStateSummary {
epoch_boundary_state_root: Hash256,
}
impl SimpleStoreItem for HotStateSummary {
impl StoreItem for HotStateSummary {
fn db_column() -> DBColumn {
DBColumn::BeaconStateSummary
}
@@ -832,7 +819,7 @@ struct ColdStateSummary {
slot: Slot,
}
impl SimpleStoreItem for ColdStateSummary {
impl StoreItem for ColdStateSummary {
fn db_column() -> DBColumn {
DBColumn::BeaconStateSummary
}
@@ -852,7 +839,7 @@ struct RestorePointHash {
state_root: Hash256,
}
impl SimpleStoreItem for RestorePointHash {
impl StoreItem for RestorePointHash {
fn db_column() -> DBColumn {
DBColumn::BeaconRestorePoint
}