mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-22 14:24:44 +00:00
Merge remote-tracking branch 'origin/master' into spec-v0.12
This commit is contained in:
@@ -10,7 +10,6 @@ harness = false
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.1.0"
|
||||
sloggers = "1.0.0"
|
||||
criterion = "0.3.2"
|
||||
rayon = "1.3.0"
|
||||
|
||||
@@ -31,3 +30,4 @@ lazy_static = "1.4.0"
|
||||
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
|
||||
lru = "0.5.1"
|
||||
fork_choice = { path = "../../consensus/fork_choice" }
|
||||
sloggers = "1.0.0"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::chunked_vector::{chunk_key, Chunk, Field};
|
||||
use crate::HotColdDB;
|
||||
use crate::{HotColdDB, ItemStore};
|
||||
use slog::error;
|
||||
use std::sync::Arc;
|
||||
use types::{ChainSpec, EthSpec, Slot};
|
||||
@@ -7,22 +7,26 @@ use types::{ChainSpec, EthSpec, Slot};
|
||||
/// Iterator over the values of a `BeaconState` vector field (like `block_roots`).
|
||||
///
|
||||
/// Uses the freezer DB's separate table to load the values.
|
||||
pub struct ChunkedVectorIter<F, E>
|
||||
pub struct ChunkedVectorIter<F, E, Hot, Cold>
|
||||
where
|
||||
F: Field<E>,
|
||||
E: EthSpec,
|
||||
Hot: ItemStore<E>,
|
||||
Cold: ItemStore<E>,
|
||||
{
|
||||
pub(crate) store: Arc<HotColdDB<E>>,
|
||||
pub(crate) store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
current_vindex: usize,
|
||||
pub(crate) end_vindex: usize,
|
||||
next_cindex: usize,
|
||||
current_chunk: Chunk<F::Value>,
|
||||
}
|
||||
|
||||
impl<F, E> ChunkedVectorIter<F, E>
|
||||
impl<F, E, Hot, Cold> ChunkedVectorIter<F, E, Hot, Cold>
|
||||
where
|
||||
F: Field<E>,
|
||||
E: EthSpec,
|
||||
Hot: ItemStore<E>,
|
||||
Cold: ItemStore<E>,
|
||||
{
|
||||
/// Create a new iterator which can yield elements from `start_vindex` up to the last
|
||||
/// index stored by the restore point at `last_restore_point_slot`.
|
||||
@@ -31,7 +35,7 @@ where
|
||||
/// `HotColdDB::get_latest_restore_point_slot`. We pass it as a parameter so that the caller can
|
||||
/// maintain a stable view of the database (see `HybridForwardsBlockRootsIterator`).
|
||||
pub fn new(
|
||||
store: Arc<HotColdDB<E>>,
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
start_vindex: usize,
|
||||
last_restore_point_slot: Slot,
|
||||
spec: &ChainSpec,
|
||||
@@ -53,10 +57,12 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, E> Iterator for ChunkedVectorIter<F, E>
|
||||
impl<F, E, Hot, Cold> Iterator for ChunkedVectorIter<F, E, Hot, Cold>
|
||||
where
|
||||
F: Field<E>,
|
||||
E: EthSpec,
|
||||
Hot: ItemStore<E>,
|
||||
Cold: ItemStore<E>,
|
||||
{
|
||||
type Item = (usize, F::Value);
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ use crate::hot_cold_store::HotColdDBError;
|
||||
use ssz::DecodeError;
|
||||
use types::{BeaconStateError, Hash256};
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
SszDecodeError(DecodeError),
|
||||
@@ -13,6 +15,7 @@ pub enum Error {
|
||||
DBError { message: String },
|
||||
RlpError(String),
|
||||
BlockNotFound(Hash256),
|
||||
NoContinuationData,
|
||||
}
|
||||
|
||||
impl From<DecodeError> for Error {
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
use crate::chunked_iter::ChunkedVectorIter;
|
||||
use crate::chunked_vector::BlockRoots;
|
||||
use crate::errors::{Error, Result};
|
||||
use crate::iter::BlockRootsIterator;
|
||||
use crate::{HotColdDB, Store};
|
||||
use slog::error;
|
||||
use crate::{HotColdDB, ItemStore};
|
||||
use itertools::process_results;
|
||||
use std::sync::Arc;
|
||||
use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot};
|
||||
|
||||
/// Forwards block roots iterator that makes use of the `block_roots` table in the freezer DB.
|
||||
pub struct FrozenForwardsBlockRootsIterator<E: EthSpec> {
|
||||
inner: ChunkedVectorIter<BlockRoots, E>,
|
||||
pub struct FrozenForwardsBlockRootsIterator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
inner: ChunkedVectorIter<BlockRoots, E, Hot, Cold>,
|
||||
}
|
||||
|
||||
/// Forwards block roots iterator that reverses a backwards iterator (only good for short ranges).
|
||||
@@ -18,9 +19,9 @@ pub struct SimpleForwardsBlockRootsIterator {
|
||||
}
|
||||
|
||||
/// Fusion of the above two approaches to forwards iteration. Fast and efficient.
|
||||
pub enum HybridForwardsBlockRootsIterator<E: EthSpec> {
|
||||
pub enum HybridForwardsBlockRootsIterator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
PreFinalization {
|
||||
iter: Box<FrozenForwardsBlockRootsIterator<E>>,
|
||||
iter: Box<FrozenForwardsBlockRootsIterator<E, Hot, Cold>>,
|
||||
/// Data required by the `PostFinalization` iterator when we get to it.
|
||||
continuation_data: Box<Option<(BeaconState<E>, Hash256)>>,
|
||||
},
|
||||
@@ -29,9 +30,11 @@ pub enum HybridForwardsBlockRootsIterator<E: EthSpec> {
|
||||
},
|
||||
}
|
||||
|
||||
impl<E: EthSpec> FrozenForwardsBlockRootsIterator<E> {
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
FrozenForwardsBlockRootsIterator<E, Hot, Cold>
|
||||
{
|
||||
pub fn new(
|
||||
store: Arc<HotColdDB<E>>,
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
start_slot: Slot,
|
||||
last_restore_point_slot: Slot,
|
||||
spec: &ChainSpec,
|
||||
@@ -47,7 +50,9 @@ impl<E: EthSpec> FrozenForwardsBlockRootsIterator<E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Iterator for FrozenForwardsBlockRootsIterator<E> {
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for FrozenForwardsBlockRootsIterator<E, Hot, Cold>
|
||||
{
|
||||
type Item = (Hash256, Slot);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
@@ -58,43 +63,49 @@ impl<E: EthSpec> Iterator for FrozenForwardsBlockRootsIterator<E> {
|
||||
}
|
||||
|
||||
impl SimpleForwardsBlockRootsIterator {
|
||||
pub fn new<S: Store<E>, E: EthSpec>(
|
||||
store: Arc<S>,
|
||||
pub fn new<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_block_root: Hash256,
|
||||
) -> Self {
|
||||
) -> Result<Self> {
|
||||
// Iterate backwards from the end state, stopping at the start slot.
|
||||
let iter = std::iter::once((end_block_root, end_state.slot))
|
||||
.chain(BlockRootsIterator::owned(store, end_state));
|
||||
Self {
|
||||
values: iter.take_while(|(_, slot)| *slot >= start_slot).collect(),
|
||||
}
|
||||
let values = process_results(
|
||||
std::iter::once(Ok((end_block_root, end_state.slot)))
|
||||
.chain(BlockRootsIterator::owned(store, end_state)),
|
||||
|iter| {
|
||||
iter.take_while(|(_, slot)| *slot >= start_slot)
|
||||
.collect::<Vec<_>>()
|
||||
},
|
||||
)?;
|
||||
Ok(Self { values: values })
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for SimpleForwardsBlockRootsIterator {
|
||||
type Item = (Hash256, Slot);
|
||||
type Item = Result<(Hash256, Slot)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
// Pop from the end of the vector to get the block roots in slot-ascending order.
|
||||
self.values.pop()
|
||||
Ok(self.values.pop()).transpose()
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> HybridForwardsBlockRootsIterator<E> {
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
HybridForwardsBlockRootsIterator<E, Hot, Cold>
|
||||
{
|
||||
pub fn new(
|
||||
store: Arc<HotColdDB<E>>,
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_block_root: Hash256,
|
||||
spec: &ChainSpec,
|
||||
) -> Self {
|
||||
) -> Result<Self> {
|
||||
use HybridForwardsBlockRootsIterator::*;
|
||||
|
||||
let latest_restore_point_slot = store.get_latest_restore_point_slot();
|
||||
|
||||
if start_slot < latest_restore_point_slot {
|
||||
let result = if start_slot < latest_restore_point_slot {
|
||||
PreFinalization {
|
||||
iter: Box::new(FrozenForwardsBlockRootsIterator::new(
|
||||
store,
|
||||
@@ -111,16 +122,14 @@ impl<E: EthSpec> HybridForwardsBlockRootsIterator<E> {
|
||||
start_slot,
|
||||
end_state,
|
||||
end_block_root,
|
||||
),
|
||||
)?,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Iterator for HybridForwardsBlockRootsIterator<E> {
|
||||
type Item = (Hash256, Slot);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
fn do_next(&mut self) -> Result<Option<(Hash256, Slot)>> {
|
||||
use HybridForwardsBlockRootsIterator::*;
|
||||
|
||||
match self {
|
||||
@@ -129,19 +138,13 @@ impl<E: EthSpec> Iterator for HybridForwardsBlockRootsIterator<E> {
|
||||
continuation_data,
|
||||
} => {
|
||||
match iter.next() {
|
||||
Some(x) => Some(x),
|
||||
Some(x) => Ok(Some(x)),
|
||||
// Once the pre-finalization iterator is consumed, transition
|
||||
// to a post-finalization iterator beginning from the last slot
|
||||
// of the pre iterator.
|
||||
None => {
|
||||
let (end_state, end_block_root) =
|
||||
continuation_data.take().or_else(|| {
|
||||
error!(
|
||||
iter.inner.store.log,
|
||||
"HybridForwardsBlockRootsIterator: logic error"
|
||||
);
|
||||
None
|
||||
})?;
|
||||
continuation_data.take().ok_or(Error::NoContinuationData)?;
|
||||
|
||||
*self = PostFinalization {
|
||||
iter: SimpleForwardsBlockRootsIterator::new(
|
||||
@@ -149,13 +152,23 @@ impl<E: EthSpec> Iterator for HybridForwardsBlockRootsIterator<E> {
|
||||
Slot::from(iter.inner.end_vindex),
|
||||
end_state,
|
||||
end_block_root,
|
||||
),
|
||||
)?,
|
||||
};
|
||||
self.next()
|
||||
self.do_next()
|
||||
}
|
||||
}
|
||||
}
|
||||
PostFinalization { iter } => iter.next(),
|
||||
PostFinalization { iter } => iter.next().transpose(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for HybridForwardsBlockRootsIterator<E, Hot, Cold>
|
||||
{
|
||||
type Item = Result<(Hash256, Slot)>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.do_next().transpose()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,10 +5,12 @@ use crate::config::StoreConfig;
|
||||
use crate::forwards_iter::HybridForwardsBlockRootsIterator;
|
||||
use crate::impls::beacon_state::{get_full_state, store_full_state};
|
||||
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
|
||||
use crate::leveldb_store::LevelDB;
|
||||
use crate::memory_store::MemoryStore;
|
||||
use crate::metrics;
|
||||
use crate::{
|
||||
leveldb_store::LevelDB, DBColumn, Error, ItemStore, KeyValueStore, PartialBeaconState, Store,
|
||||
StoreItem, StoreOp,
|
||||
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
|
||||
StoreOp,
|
||||
};
|
||||
use lru::LruCache;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
@@ -32,7 +34,7 @@ pub const SPLIT_DB_KEY: &str = "FREEZERDBSPLITFREEZERDBSPLITFREE";
|
||||
///
|
||||
/// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
|
||||
/// intermittent "restore point" states pre-finalization.
|
||||
pub struct HotColdDB<E: EthSpec> {
|
||||
pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
/// The slot and state root at the point where the database is split between hot and cold.
|
||||
///
|
||||
/// States with slots less than `split.slot` are in the cold DB, while states with slots
|
||||
@@ -40,11 +42,11 @@ pub struct HotColdDB<E: EthSpec> {
|
||||
split: RwLock<Split>,
|
||||
config: StoreConfig,
|
||||
/// Cold database containing compact historical data.
|
||||
pub(crate) cold_db: LevelDB<E>,
|
||||
pub(crate) cold_db: Cold,
|
||||
/// Hot database containing duplicated but quick-to-access recent data.
|
||||
///
|
||||
/// The hot database also contains all blocks.
|
||||
pub(crate) hot_db: LevelDB<E>,
|
||||
pub(crate) hot_db: Hot,
|
||||
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
|
||||
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
|
||||
/// Chain spec.
|
||||
@@ -84,11 +86,13 @@ pub enum HotColdDBError {
|
||||
RestorePointBlockHashError(BeaconStateError),
|
||||
}
|
||||
|
||||
impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
type ForwardsBlockRootsIterator = HybridForwardsBlockRootsIterator<E>;
|
||||
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
|
||||
/// Store a block and update the LRU cache.
|
||||
fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock<E>) -> Result<(), Error> {
|
||||
pub fn put_block(
|
||||
&self,
|
||||
block_root: &Hash256,
|
||||
block: SignedBeaconBlock<E>,
|
||||
) -> Result<(), Error> {
|
||||
// Store on disk.
|
||||
self.hot_db.put(block_root, &block)?;
|
||||
|
||||
@@ -99,7 +103,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
}
|
||||
|
||||
/// Fetch a block from the store.
|
||||
fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
|
||||
pub fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
|
||||
metrics::inc_counter(&metrics::BEACON_BLOCK_GET_COUNT);
|
||||
|
||||
// Check the cache.
|
||||
@@ -120,12 +124,12 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
}
|
||||
|
||||
/// Delete a block from the store and the block cache.
|
||||
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
|
||||
pub fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
|
||||
self.block_cache.lock().pop(block_root);
|
||||
self.hot_db.delete::<SignedBeaconBlock<E>>(block_root)
|
||||
}
|
||||
|
||||
fn put_state_summary(
|
||||
pub fn put_state_summary(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
summary: HotStateSummary,
|
||||
@@ -134,7 +138,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
}
|
||||
|
||||
/// Store a state in the store.
|
||||
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
|
||||
pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
|
||||
if state.slot < self.get_split_slot() {
|
||||
self.store_cold_state(state_root, &state)
|
||||
} else {
|
||||
@@ -143,7 +147,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
}
|
||||
|
||||
/// Fetch a state from the store.
|
||||
fn get_state(
|
||||
pub fn get_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
slot: Option<Slot>,
|
||||
@@ -170,7 +174,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
/// than the split point. You shouldn't delete states from the finalized portion of the chain
|
||||
/// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint
|
||||
/// (which will be deleted by this function but shouldn't be).
|
||||
fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
|
||||
pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
|
||||
// Delete the state summary.
|
||||
self.hot_db
|
||||
.key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?;
|
||||
@@ -184,20 +188,20 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn forwards_block_roots_iterator(
|
||||
pub fn forwards_block_roots_iterator(
|
||||
store: Arc<Self>,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_block_root: Hash256,
|
||||
spec: &ChainSpec,
|
||||
) -> Self::ForwardsBlockRootsIterator {
|
||||
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
|
||||
HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec)
|
||||
}
|
||||
|
||||
/// Load an epoch boundary state by using the hot state summary look-up.
|
||||
///
|
||||
/// Will fall back to the cold DB if a hot state summary is not found.
|
||||
fn load_epoch_boundary_state(
|
||||
pub fn load_epoch_boundary_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
) -> Result<Option<BeaconState<E>>, Error> {
|
||||
@@ -226,21 +230,49 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
}
|
||||
}
|
||||
|
||||
fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
|
||||
pub fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
|
||||
self.hot_db.put(key, item)
|
||||
}
|
||||
|
||||
fn get_item<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
|
||||
pub fn get_item<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
|
||||
self.hot_db.get(key)
|
||||
}
|
||||
|
||||
fn item_exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
|
||||
pub fn item_exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
|
||||
self.hot_db.exists::<I>(key)
|
||||
}
|
||||
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
|
||||
pub fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
|
||||
let mut guard = self.block_cache.lock();
|
||||
self.hot_db.do_atomically(batch)?;
|
||||
|
||||
let mut key_value_batch: Vec<KeyValueStoreOp> = Vec::with_capacity(batch.len());
|
||||
for op in batch {
|
||||
match op {
|
||||
StoreOp::DeleteBlock(block_hash) => {
|
||||
let untyped_hash: Hash256 = (*block_hash).into();
|
||||
let key =
|
||||
get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes());
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(state_hash, slot) => {
|
||||
let untyped_hash: Hash256 = (*state_hash).into();
|
||||
let state_summary_key = get_key_for_col(
|
||||
DBColumn::BeaconStateSummary.into(),
|
||||
untyped_hash.as_bytes(),
|
||||
);
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key));
|
||||
|
||||
if *slot % E::slots_per_epoch() == 0 {
|
||||
let state_key =
|
||||
get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes());
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.hot_db.do_atomically(&key_value_batch)?;
|
||||
|
||||
for op in batch {
|
||||
match op {
|
||||
StoreOp::DeleteBlock(block_hash) => {
|
||||
@@ -254,7 +286,30 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> HotColdDB<E> {
|
||||
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
||||
pub fn open_ephemeral(
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
log: Logger,
|
||||
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
|
||||
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
|
||||
|
||||
let db = HotColdDB {
|
||||
split: RwLock::new(Split::default()),
|
||||
cold_db: MemoryStore::open(),
|
||||
hot_db: MemoryStore::open(),
|
||||
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
|
||||
config,
|
||||
spec,
|
||||
log,
|
||||
_phantom: PhantomData,
|
||||
};
|
||||
|
||||
Ok(db)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
||||
/// Open a new or existing database, with the given paths to the hot and cold DBs.
|
||||
///
|
||||
/// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
|
||||
@@ -264,7 +319,7 @@ impl<E: EthSpec> HotColdDB<E> {
|
||||
config: StoreConfig,
|
||||
spec: ChainSpec,
|
||||
log: Logger,
|
||||
) -> Result<Self, Error> {
|
||||
) -> Result<HotColdDB<E, LevelDB<E>, LevelDB<E>>, Error> {
|
||||
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
|
||||
|
||||
let db = HotColdDB {
|
||||
@@ -285,7 +340,9 @@ impl<E: EthSpec> HotColdDB<E> {
|
||||
}
|
||||
Ok(db)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
|
||||
/// Store a post-finalization state efficiently in the hot database.
|
||||
///
|
||||
/// On an epoch boundary, store a full state. On an intermediate slot, store
|
||||
@@ -675,8 +732,8 @@ impl<E: EthSpec> HotColdDB<E> {
|
||||
}
|
||||
|
||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||
pub fn process_finalization<E: EthSpec>(
|
||||
store: Arc<HotColdDB<E>>,
|
||||
pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
frozen_head_root: Hash256,
|
||||
frozen_head: &BeaconState<E>,
|
||||
) -> Result<(), Error> {
|
||||
@@ -708,7 +765,11 @@ pub fn process_finalization<E: EthSpec>(
|
||||
let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);
|
||||
|
||||
let mut to_delete = vec![];
|
||||
for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) {
|
||||
for maybe_pair in state_root_iter.take_while(|result| match result {
|
||||
Ok((_, slot)) => slot >= ¤t_split_slot,
|
||||
Err(_) => true,
|
||||
}) {
|
||||
let (state_root, slot) = maybe_pair?;
|
||||
if slot % store.config.slots_per_restore_point == 0 {
|
||||
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
|
||||
.ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{Error, Store};
|
||||
use crate::{Error, HotColdDB, ItemStore};
|
||||
use std::borrow::Cow;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
@@ -12,17 +12,20 @@ use types::{
|
||||
///
|
||||
/// It is assumed that all ancestors for this object are stored in the database. If this is not the
|
||||
/// case, the iterator will start returning `None` prior to genesis.
|
||||
pub trait AncestorIter<U: Store<E>, E: EthSpec, I: Iterator> {
|
||||
pub trait AncestorIter<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>, I: Iterator> {
|
||||
/// Returns an iterator over the roots of the ancestors of `self`.
|
||||
fn try_iter_ancestor_roots(&self, store: Arc<U>) -> Option<I>;
|
||||
fn try_iter_ancestor_roots(&self, store: Arc<HotColdDB<E, Hot, Cold>>) -> Option<I>;
|
||||
}
|
||||
|
||||
impl<'a, U: Store<E>, E: EthSpec> AncestorIter<U, E, BlockRootsIterator<'a, E, U>>
|
||||
for SignedBeaconBlock<E>
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
AncestorIter<E, Hot, Cold, BlockRootsIterator<'a, E, Hot, Cold>> for SignedBeaconBlock<E>
|
||||
{
|
||||
/// Iterates across all available prior block roots of `self`, starting at the most recent and ending
|
||||
/// at genesis.
|
||||
fn try_iter_ancestor_roots(&self, store: Arc<U>) -> Option<BlockRootsIterator<'a, E, U>> {
|
||||
fn try_iter_ancestor_roots(
|
||||
&self,
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
) -> Option<BlockRootsIterator<'a, E, Hot, Cold>> {
|
||||
let state = store
|
||||
.get_state(&self.message.state_root, Some(self.message.slot))
|
||||
.ok()??;
|
||||
@@ -31,22 +34,27 @@ impl<'a, U: Store<E>, E: EthSpec> AncestorIter<U, E, BlockRootsIterator<'a, E, U
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, U: Store<E>, E: EthSpec> AncestorIter<U, E, StateRootsIterator<'a, E, U>>
|
||||
for BeaconState<E>
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
AncestorIter<E, Hot, Cold, StateRootsIterator<'a, E, Hot, Cold>> for BeaconState<E>
|
||||
{
|
||||
/// Iterates across all available prior state roots of `self`, starting at the most recent and ending
|
||||
/// at genesis.
|
||||
fn try_iter_ancestor_roots(&self, store: Arc<U>) -> Option<StateRootsIterator<'a, E, U>> {
|
||||
fn try_iter_ancestor_roots(
|
||||
&self,
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
) -> Option<StateRootsIterator<'a, E, Hot, Cold>> {
|
||||
// The `self.clone()` here is wasteful.
|
||||
Some(StateRootsIterator::owned(store, self.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StateRootsIterator<'a, T: EthSpec, U: Store<T>> {
|
||||
inner: RootsIterator<'a, T, U>,
|
||||
pub struct StateRootsIterator<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> {
|
||||
inner: RootsIterator<'a, T, Hot, Cold>,
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> Clone for StateRootsIterator<'a, T, U> {
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Clone
|
||||
for StateRootsIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone(),
|
||||
@@ -54,27 +62,29 @@ impl<'a, T: EthSpec, U: Store<T>> Clone for StateRootsIterator<'a, T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> StateRootsIterator<'a, T, U> {
|
||||
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> StateRootsIterator<'a, T, Hot, Cold> {
|
||||
pub fn new(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
Self {
|
||||
inner: RootsIterator::new(store, beacon_state),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>) -> Self {
|
||||
pub fn owned(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: BeaconState<T>) -> Self {
|
||||
Self {
|
||||
inner: RootsIterator::owned(store, beacon_state),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> Iterator for StateRootsIterator<'a, T, U> {
|
||||
type Item = (Hash256, Slot);
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Iterator
|
||||
for StateRootsIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
type Item = Result<(Hash256, Slot), Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.inner
|
||||
.next()
|
||||
.map(|(_, state_root, slot)| (state_root, slot))
|
||||
.map(|result| result.map(|(_, state_root, slot)| (state_root, slot)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,11 +96,13 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for StateRootsIterator<'a, T, U> {
|
||||
/// exhausted.
|
||||
///
|
||||
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
|
||||
pub struct BlockRootsIterator<'a, T: EthSpec, U: Store<T>> {
|
||||
inner: RootsIterator<'a, T, U>,
|
||||
pub struct BlockRootsIterator<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> {
|
||||
inner: RootsIterator<'a, T, Hot, Cold>,
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> Clone for BlockRootsIterator<'a, T, U> {
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Clone
|
||||
for BlockRootsIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
inner: self.inner.clone(),
|
||||
@@ -98,40 +110,44 @@ impl<'a, T: EthSpec, U: Store<T>> Clone for BlockRootsIterator<'a, T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> BlockRootsIterator<'a, T, U> {
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockRootsIterator<'a, T, Hot, Cold> {
|
||||
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
|
||||
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
pub fn new(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
Self {
|
||||
inner: RootsIterator::new(store, beacon_state),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
|
||||
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>) -> Self {
|
||||
pub fn owned(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: BeaconState<T>) -> Self {
|
||||
Self {
|
||||
inner: RootsIterator::owned(store, beacon_state),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockRootsIterator<'a, T, U> {
|
||||
type Item = (Hash256, Slot);
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Iterator
|
||||
for BlockRootsIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
type Item = Result<(Hash256, Slot), Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.inner
|
||||
.next()
|
||||
.map(|(block_root, _, slot)| (block_root, slot))
|
||||
.map(|result| result.map(|(block_root, _, slot)| (block_root, slot)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterator over state and block roots that backtracks using the vectors from a `BeaconState`.
|
||||
pub struct RootsIterator<'a, T: EthSpec, U: Store<T>> {
|
||||
store: Arc<U>,
|
||||
pub struct RootsIterator<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> {
|
||||
store: Arc<HotColdDB<T, Hot, Cold>>,
|
||||
beacon_state: Cow<'a, BeaconState<T>>,
|
||||
slot: Slot,
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> Clone for RootsIterator<'a, T, U> {
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Clone
|
||||
for RootsIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
store: self.store.clone(),
|
||||
@@ -141,8 +157,8 @@ impl<'a, T: EthSpec, U: Store<T>> Clone for RootsIterator<'a, T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> RootsIterator<'a, T, U> {
|
||||
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> RootsIterator<'a, T, Hot, Cold> {
|
||||
pub fn new(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
Self {
|
||||
store,
|
||||
slot: beacon_state.slot,
|
||||
@@ -150,7 +166,7 @@ impl<'a, T: EthSpec, U: Store<T>> RootsIterator<'a, T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>) -> Self {
|
||||
pub fn owned(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: BeaconState<T>) -> Self {
|
||||
Self {
|
||||
store,
|
||||
slot: beacon_state.slot,
|
||||
@@ -158,7 +174,10 @@ impl<'a, T: EthSpec, U: Store<T>> RootsIterator<'a, T, U> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_block(store: Arc<U>, block_hash: Hash256) -> Result<Self, Error> {
|
||||
pub fn from_block(
|
||||
store: Arc<HotColdDB<T, Hot, Cold>>,
|
||||
block_hash: Hash256,
|
||||
) -> Result<Self, Error> {
|
||||
let block = store
|
||||
.get_block(&block_hash)?
|
||||
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
|
||||
@@ -167,15 +186,10 @@ impl<'a, T: EthSpec, U: Store<T>> RootsIterator<'a, T, U> {
|
||||
.ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?;
|
||||
Ok(Self::owned(store, state))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> {
|
||||
/// (block_root, state_root, slot)
|
||||
type Item = (Hash256, Hash256, Slot);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
fn do_next(&mut self) -> Result<Option<(Hash256, Hash256, Slot)>, Error> {
|
||||
if self.slot == 0 || self.slot > self.beacon_state.slot {
|
||||
return None;
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
self.slot -= 1;
|
||||
@@ -184,7 +198,7 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> {
|
||||
self.beacon_state.get_block_root(self.slot),
|
||||
self.beacon_state.get_state_root(self.slot),
|
||||
) {
|
||||
(Ok(block_root), Ok(state_root)) => Some((*block_root, *state_root, self.slot)),
|
||||
(Ok(block_root), Ok(state_root)) => Ok(Some((*block_root, *state_root, self.slot))),
|
||||
(Err(BeaconStateError::SlotOutOfBounds), Err(BeaconStateError::SlotOutOfBounds)) => {
|
||||
// Read a `BeaconState` from the store that has access to prior historical roots.
|
||||
let beacon_state =
|
||||
@@ -192,25 +206,39 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> {
|
||||
|
||||
self.beacon_state = Cow::Owned(beacon_state);
|
||||
|
||||
let block_root = *self.beacon_state.get_block_root(self.slot).ok()?;
|
||||
let state_root = *self.beacon_state.get_state_root(self.slot).ok()?;
|
||||
let block_root = *self.beacon_state.get_block_root(self.slot)?;
|
||||
let state_root = *self.beacon_state.get_state_root(self.slot)?;
|
||||
|
||||
Some((block_root, state_root, self.slot))
|
||||
Ok(Some((block_root, state_root, self.slot)))
|
||||
}
|
||||
_ => None,
|
||||
(Err(e), _) => Err(e.into()),
|
||||
(Ok(_), Err(e)) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Iterator
|
||||
for RootsIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
/// (block_root, state_root, slot)
|
||||
type Item = Result<(Hash256, Hash256, Slot), Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.do_next().transpose()
|
||||
}
|
||||
}
|
||||
|
||||
/// Block iterator that uses the `parent_root` of each block to backtrack.
|
||||
pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store<E>> {
|
||||
store: &'a S,
|
||||
pub struct ParentRootBlockIterator<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
store: &'a HotColdDB<E, Hot, Cold>,
|
||||
next_block_root: Hash256,
|
||||
_phantom: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, S: Store<E>> ParentRootBlockIterator<'a, E, S> {
|
||||
pub fn new(store: &'a S, start_block_root: Hash256) -> Self {
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
|
||||
ParentRootBlockIterator<'a, E, Hot, Cold>
|
||||
{
|
||||
pub fn new(store: &'a HotColdDB<E, Hot, Cold>, start_block_root: Hash256) -> Self {
|
||||
Self {
|
||||
store,
|
||||
next_block_root: start_block_root,
|
||||
@@ -235,7 +263,9 @@ impl<'a, E: EthSpec, S: Store<E>> ParentRootBlockIterator<'a, E, S> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, S: Store<E>> Iterator for ParentRootBlockIterator<'a, E, S> {
|
||||
impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
|
||||
for ParentRootBlockIterator<'a, E, Hot, Cold>
|
||||
{
|
||||
type Item = Result<(Hash256, SignedBeaconBlock<E>), Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
@@ -245,47 +275,59 @@ impl<'a, E: EthSpec, S: Store<E>> Iterator for ParentRootBlockIterator<'a, E, S>
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Extends `BlockRootsIterator`, returning `SignedBeaconBlock` instances, instead of their roots.
|
||||
pub struct BlockIterator<'a, T: EthSpec, U: Store<T>> {
|
||||
roots: BlockRootsIterator<'a, T, U>,
|
||||
pub struct BlockIterator<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> {
|
||||
roots: BlockRootsIterator<'a, T, Hot, Cold>,
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> BlockIterator<'a, T, U> {
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> BlockIterator<'a, T, Hot, Cold> {
|
||||
/// Create a new iterator over all blocks in the given `beacon_state` and prior states.
|
||||
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
pub fn new(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: &'a BeaconState<T>) -> Self {
|
||||
Self {
|
||||
roots: BlockRootsIterator::new(store, beacon_state),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new iterator over all blocks in the given `beacon_state` and prior states.
|
||||
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>) -> Self {
|
||||
pub fn owned(store: Arc<HotColdDB<T, Hot, Cold>>, beacon_state: BeaconState<T>) -> Self {
|
||||
Self {
|
||||
roots: BlockRootsIterator::owned(store, beacon_state),
|
||||
}
|
||||
}
|
||||
|
||||
fn do_next(&mut self) -> Result<Option<SignedBeaconBlock<T>>, Error> {
|
||||
if let Some(result) = self.roots.next() {
|
||||
let (root, _slot) = result?;
|
||||
self.roots.inner.store.get_block(&root)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockIterator<'a, T, U> {
|
||||
type Item = SignedBeaconBlock<T>;
|
||||
impl<'a, T: EthSpec, Hot: ItemStore<T>, Cold: ItemStore<T>> Iterator
|
||||
for BlockIterator<'a, T, Hot, Cold>
|
||||
{
|
||||
type Item = Result<SignedBeaconBlock<T>, Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let (root, _slot) = self.roots.next()?;
|
||||
self.roots.inner.store.get_block(&root).ok()?
|
||||
self.do_next().transpose()
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the next state to use whilst backtracking in `*RootsIterator`.
|
||||
fn next_historical_root_backtrack_state<E: EthSpec, S: Store<E>>(
|
||||
store: &S,
|
||||
fn next_historical_root_backtrack_state<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
current_state: &BeaconState<E>,
|
||||
) -> Option<BeaconState<E>> {
|
||||
) -> Result<BeaconState<E>, Error> {
|
||||
// For compatibility with the freezer database's restore points, we load a state at
|
||||
// a restore point slot (thus avoiding replaying blocks). In the case where we're
|
||||
// not frozen, this just means we might not jump back by the maximum amount on
|
||||
// our first jump (i.e. at most 1 extra state load).
|
||||
let new_state_slot = slot_of_prev_restore_point::<E>(current_state.slot);
|
||||
let new_state_root = current_state.get_state_root(new_state_slot).ok()?;
|
||||
store.get_state(new_state_root, Some(new_state_slot)).ok()?
|
||||
let new_state_root = current_state.get_state_root(new_state_slot)?;
|
||||
Ok(store
|
||||
.get_state(new_state_root, Some(new_state_slot))?
|
||||
.ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?)
|
||||
}
|
||||
|
||||
/// Compute the slot of the last guaranteed restore point in the freezer database.
|
||||
@@ -297,8 +339,10 @@ fn slot_of_prev_restore_point<E: EthSpec>(current_slot: Slot) -> Slot {
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::MemoryStore;
|
||||
use types::{test_utils::TestingBeaconStateBuilder, Keypair, MainnetEthSpec};
|
||||
use crate::config::StoreConfig;
|
||||
use crate::HotColdDB;
|
||||
use sloggers::{null::NullLoggerBuilder, Build};
|
||||
use types::{test_utils::TestingBeaconStateBuilder, ChainSpec, Keypair, MainnetEthSpec};
|
||||
|
||||
fn get_state<T: EthSpec>() -> BeaconState<T> {
|
||||
let builder = TestingBeaconStateBuilder::from_single_keypair(
|
||||
@@ -312,7 +356,10 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn block_root_iter() {
|
||||
let store = Arc::new(MemoryStore::open());
|
||||
let log = NullLoggerBuilder.build().unwrap();
|
||||
let store = Arc::new(
|
||||
HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(),
|
||||
);
|
||||
let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root();
|
||||
|
||||
let mut state_a: BeaconState<MainnetEthSpec> = get_state();
|
||||
@@ -337,11 +384,12 @@ mod test {
|
||||
let iter = BlockRootsIterator::new(store, &state_b);
|
||||
|
||||
assert!(
|
||||
iter.clone().any(|(_root, slot)| slot == 0),
|
||||
iter.clone()
|
||||
.any(|result| result.map(|(_root, slot)| slot == 0).unwrap()),
|
||||
"iter should contain zero slot"
|
||||
);
|
||||
|
||||
let mut collected: Vec<(Hash256, Slot)> = iter.collect();
|
||||
let mut collected: Vec<(Hash256, Slot)> = iter.collect::<Result<Vec<_>, _>>().unwrap();
|
||||
collected.reverse();
|
||||
|
||||
let expected_len = 2 * MainnetEthSpec::slots_per_historical_root();
|
||||
@@ -355,7 +403,10 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn state_root_iter() {
|
||||
let store = Arc::new(MemoryStore::open());
|
||||
let log = NullLoggerBuilder.build().unwrap();
|
||||
let store = Arc::new(
|
||||
HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log).unwrap(),
|
||||
);
|
||||
let slots_per_historical_root = MainnetEthSpec::slots_per_historical_root();
|
||||
|
||||
let mut state_a: BeaconState<MainnetEthSpec> = get_state();
|
||||
@@ -386,11 +437,12 @@ mod test {
|
||||
let iter = StateRootsIterator::new(store, &state_b);
|
||||
|
||||
assert!(
|
||||
iter.clone().any(|(_root, slot)| slot == 0),
|
||||
iter.clone()
|
||||
.any(|result| result.map(|(_root, slot)| slot == 0).unwrap()),
|
||||
"iter should contain zero slot"
|
||||
);
|
||||
|
||||
let mut collected: Vec<(Hash256, Slot)> = iter.collect();
|
||||
let mut collected: Vec<(Hash256, Slot)> = iter.collect::<Result<Vec<_>, _>>().unwrap();
|
||||
collected.reverse();
|
||||
|
||||
let expected_len = MainnetEthSpec::slots_per_historical_root() * 2;
|
||||
|
||||
@@ -48,7 +48,7 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
let timer = metrics::start_timer(&metrics::DISK_DB_READ_TIMES);
|
||||
|
||||
self.db
|
||||
.get(self.read_options(), column_key)
|
||||
.get(self.read_options(), BytesKey::from_vec(column_key))
|
||||
.map_err(Into::into)
|
||||
.map(|opt| {
|
||||
opt.map(|bytes| {
|
||||
@@ -68,7 +68,7 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
|
||||
|
||||
self.db
|
||||
.put(self.write_options(), column_key, val)
|
||||
.put(self.write_options(), BytesKey::from_vec(column_key), val)
|
||||
.map_err(Into::into)
|
||||
.map(|()| {
|
||||
metrics::stop_timer(timer);
|
||||
@@ -82,7 +82,7 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
metrics::inc_counter(&metrics::DISK_DB_EXISTS_COUNT);
|
||||
|
||||
self.db
|
||||
.get(self.read_options(), column_key)
|
||||
.get(self.read_options(), BytesKey::from_vec(column_key))
|
||||
.map_err(Into::into)
|
||||
.and_then(|val| Ok(val.is_some()))
|
||||
}
|
||||
@@ -94,34 +94,16 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
|
||||
metrics::inc_counter(&metrics::DISK_DB_DELETE_COUNT);
|
||||
|
||||
self.db
|
||||
.delete(self.write_options(), column_key)
|
||||
.delete(self.write_options(), BytesKey::from_vec(column_key))
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> {
|
||||
fn do_atomically(&self, ops_batch: &[KeyValueStoreOp]) -> Result<(), Error> {
|
||||
let mut leveldb_batch = Writebatch::new();
|
||||
for op in ops_batch {
|
||||
for op in ops_batch.into_iter() {
|
||||
match op {
|
||||
StoreOp::DeleteBlock(block_hash) => {
|
||||
let untyped_hash: Hash256 = (*block_hash).into();
|
||||
let key =
|
||||
get_key_for_col(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes());
|
||||
leveldb_batch.delete(key);
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(state_hash, slot) => {
|
||||
let untyped_hash: Hash256 = (*state_hash).into();
|
||||
let state_summary_key = get_key_for_col(
|
||||
DBColumn::BeaconStateSummary.into(),
|
||||
untyped_hash.as_bytes(),
|
||||
);
|
||||
leveldb_batch.delete(state_summary_key);
|
||||
|
||||
if *slot % E::slots_per_epoch() == 0 {
|
||||
let state_key =
|
||||
get_key_for_col(DBColumn::BeaconState.into(), untyped_hash.as_bytes());
|
||||
leveldb_batch.delete(state_key);
|
||||
}
|
||||
KeyValueStoreOp::DeleteKey(key) => {
|
||||
leveldb_batch.delete(BytesKey::from_vec(key.to_vec()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -147,10 +129,10 @@ impl Key for BytesKey {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_key_for_col(col: &str, key: &[u8]) -> BytesKey {
|
||||
let mut col = col.as_bytes().to_vec();
|
||||
col.append(&mut key.to_vec());
|
||||
BytesKey { key: col }
|
||||
impl BytesKey {
|
||||
fn from_vec(key: Vec<u8>) -> Self {
|
||||
Self { key }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LevelDBError> for Error {
|
||||
|
||||
@@ -13,7 +13,7 @@ extern crate lazy_static;
|
||||
pub mod chunked_iter;
|
||||
pub mod chunked_vector;
|
||||
pub mod config;
|
||||
mod errors;
|
||||
pub mod errors;
|
||||
mod forwards_iter;
|
||||
pub mod hot_cold_store;
|
||||
mod impls;
|
||||
@@ -25,8 +25,6 @@ mod state_batch;
|
||||
|
||||
pub mod iter;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use self::config::StoreConfig;
|
||||
pub use self::hot_cold_store::{HotColdDB, HotStateSummary};
|
||||
pub use self::leveldb_store::LevelDB;
|
||||
@@ -52,7 +50,17 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>;
|
||||
|
||||
/// Execute either all of the operations in `batch` or none at all, returning an error.
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>;
|
||||
fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
|
||||
let mut result = column.as_bytes().to_vec();
|
||||
result.extend_from_slice(key);
|
||||
result
|
||||
}
|
||||
|
||||
pub enum KeyValueStoreOp {
|
||||
DeleteKey(Vec<u8>),
|
||||
}
|
||||
|
||||
pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'static {
|
||||
@@ -93,75 +101,6 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
|
||||
}
|
||||
}
|
||||
|
||||
/// An object capable of storing and retrieving objects implementing `StoreItem`.
|
||||
///
|
||||
/// A `Store` is fundamentally backed by a key-value database, however it provides support for
|
||||
/// columns. A simple column implementation might involve prefixing a key with some bytes unique to
|
||||
/// each column.
|
||||
pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
type ForwardsBlockRootsIterator: Iterator<Item = (Hash256, Slot)>;
|
||||
|
||||
/// Store a block in the store.
|
||||
fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock<E>) -> Result<(), Error>;
|
||||
|
||||
/// Fetch a block from the store.
|
||||
fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error>;
|
||||
|
||||
/// Delete a block from the store.
|
||||
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error>;
|
||||
|
||||
/// Store a state in the store.
|
||||
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error>;
|
||||
|
||||
/// Store a state summary in the store.
|
||||
fn put_state_summary(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
summary: HotStateSummary,
|
||||
) -> Result<(), Error>;
|
||||
|
||||
/// Fetch a state from the store.
|
||||
fn get_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
slot: Option<Slot>,
|
||||
) -> Result<Option<BeaconState<E>>, Error>;
|
||||
|
||||
/// Delete a state from the store.
|
||||
fn delete_state(&self, state_root: &Hash256, _slot: Slot) -> Result<(), Error>;
|
||||
|
||||
/// Get a forwards (slot-ascending) iterator over the beacon block roots since `start_slot`.
|
||||
///
|
||||
/// Will be efficient for frozen portions of the database if using `HotColdDB`.
|
||||
///
|
||||
/// The `end_state` and `end_block_root` are required for backtracking in the post-finalization
|
||||
/// part of the chain, and should be usually be set to the current head. Importantly, the
|
||||
/// `end_state` must be a state that has had a block applied to it, and the hash of that
|
||||
/// block must be `end_block_root`.
|
||||
// NOTE: could maybe optimise by getting the `BeaconState` and end block root from a closure, as
|
||||
// it's not always required.
|
||||
fn forwards_block_roots_iterator(
|
||||
store: Arc<Self>,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_block_root: Hash256,
|
||||
spec: &ChainSpec,
|
||||
) -> Self::ForwardsBlockRootsIterator;
|
||||
|
||||
fn load_epoch_boundary_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
) -> Result<Option<BeaconState<E>>, Error>;
|
||||
|
||||
fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error>;
|
||||
|
||||
fn get_item<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error>;
|
||||
|
||||
fn item_exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error>;
|
||||
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>;
|
||||
}
|
||||
|
||||
/// Reified key-value storage operation. Helps in modifying the storage atomically.
|
||||
/// See also https://github.com/sigp/lighthouse/issues/692
|
||||
pub enum StoreOp {
|
||||
|
||||
@@ -1,12 +1,7 @@
|
||||
use super::{DBColumn, Error, ItemStore, KeyValueStore, Store, StoreOp};
|
||||
use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
|
||||
use crate::hot_cold_store::HotStateSummary;
|
||||
use crate::impls::beacon_state::{get_full_state, store_full_state};
|
||||
use crate::StoreItem;
|
||||
use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp};
|
||||
use parking_lot::RwLock;
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::Arc;
|
||||
use types::*;
|
||||
|
||||
type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
|
||||
@@ -46,53 +41,34 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
|
||||
/// Get the value of some key from the database. Returns `None` if the key does not exist.
|
||||
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
|
||||
let column_key = Self::get_key_for_col(col, key);
|
||||
|
||||
Ok(self.db.read().get(&column_key).cloned())
|
||||
}
|
||||
|
||||
/// Puts a key in the database.
|
||||
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
|
||||
let column_key = Self::get_key_for_col(col, key);
|
||||
|
||||
self.db.write().insert(column_key, val.to_vec());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return true if some key exists in some column.
|
||||
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
|
||||
let column_key = Self::get_key_for_col(col, key);
|
||||
|
||||
Ok(self.db.read().contains_key(&column_key))
|
||||
}
|
||||
|
||||
/// Delete some key from the database.
|
||||
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
|
||||
let column_key = Self::get_key_for_col(col, key);
|
||||
|
||||
self.db.write().remove(&column_key);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
|
||||
fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error> {
|
||||
for op in batch {
|
||||
match op {
|
||||
StoreOp::DeleteBlock(block_hash) => {
|
||||
let untyped_hash: Hash256 = (*block_hash).into();
|
||||
self.key_delete(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes())?;
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(state_hash, slot) => {
|
||||
let untyped_hash: Hash256 = (*state_hash).into();
|
||||
if *slot % E::slots_per_epoch() == 0 {
|
||||
self.key_delete(DBColumn::BeaconState.into(), untyped_hash.as_bytes())?;
|
||||
} else {
|
||||
self.key_delete(
|
||||
DBColumn::BeaconStateSummary.into(),
|
||||
untyped_hash.as_bytes(),
|
||||
)?;
|
||||
}
|
||||
KeyValueStoreOp::DeleteKey(hash) => {
|
||||
self.db.write().remove(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -101,94 +77,3 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
|
||||
}
|
||||
|
||||
impl<E: EthSpec> ItemStore<E> for MemoryStore<E> {}
|
||||
|
||||
impl<E: EthSpec> Store<E> for MemoryStore<E> {
|
||||
type ForwardsBlockRootsIterator = SimpleForwardsBlockRootsIterator;
|
||||
|
||||
fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock<E>) -> Result<(), Error> {
|
||||
self.put(block_root, &block)
|
||||
}
|
||||
|
||||
fn get_block(&self, block_root: &Hash256) -> Result<Option<SignedBeaconBlock<E>>, Error> {
|
||||
self.get(block_root)
|
||||
}
|
||||
|
||||
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
|
||||
self.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
fn put_state_summary(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
summary: HotStateSummary,
|
||||
) -> Result<(), Error> {
|
||||
self.put(state_root, &summary).map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Store a state in the store.
|
||||
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
|
||||
store_full_state(self, state_root, &state)
|
||||
}
|
||||
|
||||
/// Fetch a state from the store.
|
||||
fn get_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
_: Option<Slot>,
|
||||
) -> Result<Option<BeaconState<E>>, Error> {
|
||||
get_full_state(self, state_root)
|
||||
}
|
||||
|
||||
fn delete_state(&self, state_root: &Hash256, _slot: Slot) -> Result<(), Error> {
|
||||
self.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())
|
||||
}
|
||||
|
||||
fn forwards_block_roots_iterator(
|
||||
store: Arc<Self>,
|
||||
start_slot: Slot,
|
||||
end_state: BeaconState<E>,
|
||||
end_block_root: Hash256,
|
||||
_: &ChainSpec,
|
||||
) -> Self::ForwardsBlockRootsIterator {
|
||||
SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root)
|
||||
}
|
||||
|
||||
/// Load the most recent ancestor state of `state_root` which lies on an epoch boundary.
|
||||
///
|
||||
/// If `state_root` corresponds to an epoch boundary state, then that state itself should be
|
||||
/// returned.
|
||||
fn load_epoch_boundary_state(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
) -> Result<Option<BeaconState<E>>, Error> {
|
||||
// The default implementation is not very efficient, but isn't used in prod.
|
||||
// See `HotColdDB` for the optimized implementation.
|
||||
if let Some(state) = self.get_state(state_root, None)? {
|
||||
let epoch_boundary_slot = state.slot / E::slots_per_epoch() * E::slots_per_epoch();
|
||||
if state.slot == epoch_boundary_slot {
|
||||
Ok(Some(state))
|
||||
} else {
|
||||
let epoch_boundary_state_root = state.get_state_root(epoch_boundary_slot)?;
|
||||
self.get_state(epoch_boundary_state_root, Some(epoch_boundary_slot))
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
|
||||
self.put(key, item)
|
||||
}
|
||||
|
||||
fn get_item<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
|
||||
self.get(key)
|
||||
}
|
||||
|
||||
fn item_exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
|
||||
self.exists::<I>(key)
|
||||
}
|
||||
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
|
||||
KeyValueStore::do_atomically(self, batch)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{Error, HotStateSummary, Store};
|
||||
use crate::{Error, HotColdDB, HotStateSummary, ItemStore};
|
||||
use types::{BeaconState, EthSpec, Hash256};
|
||||
|
||||
/// A collection of states to be stored in the database.
|
||||
@@ -36,7 +36,10 @@ impl<E: EthSpec> StateBatch<E> {
|
||||
/// Write the batch to the database.
|
||||
///
|
||||
/// May fail to write the full batch if any of the items error (i.e. not atomic!)
|
||||
pub fn commit<S: Store<E>>(self, store: &S) -> Result<(), Error> {
|
||||
pub fn commit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
self,
|
||||
store: &HotColdDB<E, Hot, Cold>,
|
||||
) -> Result<(), Error> {
|
||||
self.items.into_iter().try_for_each(|item| match item {
|
||||
BatchItem::Full(state_root, state) => store.put_state(&state_root, &state),
|
||||
BatchItem::Summary(state_root, summary) => {
|
||||
|
||||
Reference in New Issue
Block a user