Beacon state diffs!

This commit is contained in:
Michael Sproul
2022-02-24 08:46:03 +11:00
parent 0a4dcdd4e3
commit 143cf59504
38 changed files with 860 additions and 277 deletions

View File

@@ -73,8 +73,8 @@ impl StoreItem for OnDiskStoreConfig {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {

View File

@@ -47,6 +47,8 @@ pub enum Error {
BlockReplayError(BlockReplayError),
#[cfg(feature = "milhouse")]
MilhouseError(milhouse::Error),
Bincode(Box<bincode::ErrorKind>),
FlateCompression(std::io::Error),
}
pub trait HandleUnavailable<T> {
@@ -112,6 +114,12 @@ impl From<BlockReplayError> for Error {
}
}
impl From<Box<bincode::ErrorKind>> for Error {
fn from(e: Box<bincode::ErrorKind>) -> Self {
Self::Bincode(e)
}
}
#[derive(Debug)]
pub struct DBError {
pub message: String,

View File

@@ -21,6 +21,7 @@ use crate::{
};
use leveldb::iterator::LevelDBIterator;
use lru::LruCache;
use milhouse::Diff;
use parking_lot::{Mutex, RwLock};
use safe_arith::SafeArith;
use serde_derive::{Deserialize, Serialize};
@@ -89,6 +90,7 @@ pub enum HotColdDBError {
MissingEpochBoundaryState(Hash256),
MissingPrevState(Hash256),
MissingSplitState(Hash256, Slot),
MissingStateDiff(Hash256),
MissingAnchorInfo,
HotStateSummaryError(BeaconStateError),
RestorePointDecodeError(ssz::DecodeError),
@@ -454,17 +456,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint
/// (which will be deleted by this function but shouldn't be).
pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
// Delete the state summary.
self.hot_db
.key_delete(DBColumn::BeaconStateSummary.into(), state_root.as_bytes())?;
// Delete the full state if it lies on an epoch boundary.
if slot % E::slots_per_epoch() == 0 {
self.hot_db
.key_delete(DBColumn::BeaconState.into(), state_root.as_bytes())?;
}
Ok(())
self.do_atomically(vec![StoreOp::DeleteState(*state_root, Some(slot))])
}
pub fn forwards_block_roots_iterator(
@@ -519,36 +511,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
HybridForwardsStateRootsIterator::new(self, start_slot, Some(end_slot), get_state, spec)
}
/// Load an epoch boundary state by using the hot state summary look-up.
///
/// Will fall back to the cold DB if a hot state summary is not found.
pub fn load_epoch_boundary_state(
&self,
state_root: &Hash256,
) -> Result<Option<BeaconState<E>>, Error> {
if let Some(HotStateSummary {
epoch_boundary_state_root,
..
}) = self.load_hot_state_summary(state_root)?
{
// NOTE: minor inefficiency here because we load an unnecessary hot state summary
let state = self.get_hot_state(&epoch_boundary_state_root)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;
Ok(Some(state))
} else {
// Try the cold DB
match self.load_cold_state_slot(state_root)? {
Some(state_slot) => {
let epoch_boundary_slot =
state_slot / E::slots_per_epoch() * E::slots_per_epoch();
self.load_cold_state_by_slot(epoch_boundary_slot)
}
None => Ok(None),
}
}
}
pub fn put_item<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
self.hot_db.put(key, item)
}
@@ -575,7 +537,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
StoreOp::PutStateTemporaryFlag(state_root) => {
key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root));
key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)?);
}
StoreOp::DeleteStateTemporaryFlag(state_root) => {
@@ -595,9 +557,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key));
if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) {
// Delete full state if any.
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
// Delete diff too.
let diff_key = get_key_for_col(
DBColumn::BeaconStateDiff.into(),
state_root.as_bytes(),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(diff_key));
}
}
}
@@ -606,7 +576,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
let mut guard = self.block_cache.lock();
let mut block_cache = self.block_cache.lock();
self.hot_db
.do_atomically(self.convert_to_kv_batch(&batch)?)?;
@@ -614,7 +584,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in &batch {
match op {
StoreOp::PutBlock(block_root, block) => {
guard.put(*block_root, (**block).clone());
block_cache.put(*block_root, (**block).clone());
}
StoreOp::PutState(_, _) => (),
@@ -624,10 +594,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::DeleteStateTemporaryFlag(_) => (),
StoreOp::DeleteBlock(block_root) => {
guard.pop(block_root);
block_cache.pop(block_root);
}
StoreOp::DeleteState(_, _) => (),
StoreOp::DeleteState(state_root, _) => {
// FIXME(sproul): atomics are a bit sketchy here
self.state_cache.lock().delete(state_root)
}
}
}
Ok(())
@@ -656,27 +629,67 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(());
}
// On the epoch boundary, store the full state.
if state.slot() % E::slots_per_epoch() == 0 {
trace!(
self.log,
"Storing full state on epoch boundary";
"slot" => state.slot().as_u64(),
"state_root" => format!("{:?}", state_root)
);
store_full_state(state_root, state, ops)?;
}
// Store a summary of the state.
// We store one even for the epoch boundary states, as we may need their slots
// when doing a look up by state root.
let hot_state_summary = HotStateSummary::new(state_root, state)?;
let op = hot_state_summary.as_kv_store_op(*state_root);
let op = hot_state_summary.as_kv_store_op(*state_root)?;
ops.push(op);
// On the epoch boundary, store a diff from the previous epoch boundary state -- unless
// we're at a fork boundary in which case the full state must be stored.
if state.slot() % E::slots_per_epoch() == 0 {
if let Some(fork) = self.spec.fork_activated_at_slot::<E>(state.slot()) {
info!(
self.log,
"Storing fork transition state";
"fork" => %fork,
"slot" => state.slot(),
"state_root" => ?state_root,
);
self.store_full_state_in_batch(state_root, state, ops)?;
} else {
debug!(
self.log,
"Storing state diff on epoch boundary";
"slot" => state.slot(),
"state_root" => ?state_root,
);
let prev_epoch_state_root = hot_state_summary.epoch_boundary_state_root;
let prev_boundary_state = self.get_hot_state(&prev_epoch_state_root)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(prev_epoch_state_root),
)?;
let compute_diff_timer =
metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPUTE_TIME);
let diff = BeaconStateDiff::compute_diff(&prev_boundary_state, state)?;
drop(compute_diff_timer);
ops.push(diff.as_kv_store_op(*state_root)?);
}
}
Ok(())
}
pub fn store_full_state(
&self,
state_root: &Hash256,
state: &BeaconState<E>,
) -> Result<(), Error> {
let mut ops = Vec::with_capacity(4);
self.store_full_state_in_batch(state_root, state, &mut ops)?;
self.hot_db.do_atomically(ops)
}
/// Stage the key-value writes for a full `BeaconState` into `ops` without committing.
///
/// The caller is responsible for committing `ops` atomically (e.g. via
/// `hot_db.do_atomically`, as `store_full_state` does). Delegates to the
/// free function `store_full_state`.
pub fn store_full_state_in_batch(
&self,
state_root: &Hash256,
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
store_full_state(state_root, state, ops)
}
/// Get a post-finalization state from the database or store.
pub fn get_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) {
@@ -715,18 +728,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// If the state is the finalized state, load it from disk. This should only be necessary
// once during start-up, after which point the finalized state will be cached.
if *state_root == self.get_split_info().state_root {
let mut state = get_full_state(&self.hot_db, state_root, &self.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
// Do a tree hash here so that the cache is fully built.
state.update_tree_hash_cache()?;
let latest_block_root = state.get_latest_block_root(*state_root);
return Ok(Some((state, latest_block_root)));
return self.load_hot_state_full(state_root).map(Some);
}
// If the state is marked as temporary, do not return it. It will become visible
// only once its transaction commits and deletes its temporary flag.
// FIXME(sproul): reconsider
if self.load_state_temporary_flag(state_root)?.is_some() {
return Ok(None);
}
@@ -734,15 +741,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
if let Some(HotStateSummary {
slot,
latest_block_root,
epoch_boundary_state_root,
prev_state_root,
..
}) = self.load_hot_state_summary(state_root)?
{
// Load prior state, potentially from the cache.
//
// This can backtrack as far as the finalized state in extreme cases, but will prime
// the cache with every intermediate state while doing so, meaning that this work should
// be repeated infrequently.
// On a fork boundary slot load a full state from disk.
if self.spec.fork_activated_at_slot::<E>(slot).is_some() {
return self.load_hot_state_full(state_root).map(Some);
}
// On any other epoch boundary load and apply a diff.
if slot % E::slots_per_epoch() == 0 {
return self
.load_state_from_diff(*state_root, epoch_boundary_state_root)
.map(Some);
}
// Otherwise load the prior state, potentially from the cache, and replay a single block
// on top of it.
let prev_state = self
.get_hot_state(&prev_state_root)?
.ok_or(HotColdDBError::MissingPrevState(prev_state_root))?;
@@ -761,6 +777,43 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
/// Load a full (non-diffed) state from the hot database.
///
/// Returns the state together with its latest block root. Errors with
/// `MissingEpochBoundaryState` if no full state is stored under `state_root`.
pub fn load_hot_state_full(
&self,
state_root: &Hash256,
) -> Result<(BeaconState<E>, Hash256), Error> {
let mut state = get_full_state(&self.hot_db, state_root, &self.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
// Do a tree hash here so that the cache is fully built.
state.update_tree_hash_cache()?;
let latest_block_root = state.get_latest_block_root(*state_root);
Ok((state, latest_block_root))
}
/// Reconstruct a state by applying a stored diff on top of a base state.
///
/// Loads the diff keyed by `state_root`, loads the base state at
/// `prev_epoch_state_root` via `get_hot_state` (which may itself recurse
/// through further diffs or block replay), applies the diff in place, and
/// rebuilds the tree hash cache. Returns the state and its latest block root.
pub fn load_state_from_diff(
&self,
state_root: Hash256,
prev_epoch_state_root: Hash256,
) -> Result<(BeaconState<E>, Hash256), Error> {
let diff = self.load_state_diff(state_root)?;
let mut state = self.get_hot_state(&prev_epoch_state_root)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(prev_epoch_state_root),
)?;
diff.apply_diff(&mut state)?;
// Do a tree hash here so that the cache is fully built.
state.update_tree_hash_cache()?;
let latest_block_root = state.get_latest_block_root(state_root);
Ok((state, latest_block_root))
}
/// Fetch the `BeaconStateDiff` stored under `state_root`, erroring with
/// `MissingStateDiff` if it is absent.
pub fn load_state_diff(&self, state_root: Hash256) -> Result<BeaconStateDiff<E>, Error> {
    match self.get_item(&state_root)? {
        Some(diff) => Ok(diff),
        None => Err(HotColdDBError::MissingStateDiff(state_root).into()),
    }
}
/// Store a pre-finalization state in the freezer database.
///
/// If the state doesn't lie on a restore point boundary then just its summary will be stored.
@@ -770,7 +823,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
ops.push(ColdStateSummary { slot: state.slot() }.as_kv_store_op(*state_root));
ops.push(ColdStateSummary { slot: state.slot() }.as_kv_store_op(*state_root)?);
if state.slot() % self.config.slots_per_restore_point != 0 {
return Ok(());
@@ -797,7 +850,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// 3. Store restore point.
let restore_point_index = state.slot().as_u64() / self.config.slots_per_restore_point;
self.store_restore_point_hash(restore_point_index, *state_root, ops);
self.store_restore_point_hash(restore_point_index, *state_root, ops)?;
Ok(())
}
@@ -1031,7 +1084,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let column = SchemaVersion::db_column().into();
let key = SCHEMA_VERSION_KEY.as_bytes();
let db_key = get_key_for_col(column, key);
let op = KeyValueStoreOp::PutKeyValue(db_key, schema_version.as_store_bytes());
let op = KeyValueStoreOp::PutKeyValue(db_key, schema_version.as_store_bytes()?);
ops.push(op);
self.hot_db.do_atomically(ops)
@@ -1080,7 +1133,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
) -> Result<KeyValueStoreOp, Error> {
let mut anchor_info = self.anchor_info.write();
if *anchor_info == prev_value {
let kv_op = self.store_anchor_info_in_batch(&new_value);
let kv_op = self.store_anchor_info_in_batch(&new_value)?;
*anchor_info = new_value;
Ok(kv_op)
} else {
@@ -1107,14 +1160,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
///
/// The argument is intended to be `self.anchor_info`, but is passed manually to avoid issues
/// with recursive locking.
fn store_anchor_info_in_batch(&self, anchor_info: &Option<AnchorInfo>) -> KeyValueStoreOp {
fn store_anchor_info_in_batch(
&self,
anchor_info: &Option<AnchorInfo>,
) -> Result<KeyValueStoreOp, Error> {
if let Some(ref anchor_info) = anchor_info {
anchor_info.as_kv_store_op(ANCHOR_INFO_KEY)
} else {
KeyValueStoreOp::DeleteKey(get_key_for_col(
Ok(KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconMeta.into(),
ANCHOR_INFO_KEY.as_bytes(),
))
)))
}
}
@@ -1184,7 +1240,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Stage the split for storage to disk.
pub fn store_split_in_batch(&self) -> KeyValueStoreOp {
pub fn store_split_in_batch(&self) -> Result<KeyValueStoreOp, Error> {
self.split.read_recursive().as_kv_store_op(SPLIT_KEY)
}
@@ -1203,10 +1259,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
restore_point_index: u64,
state_root: Hash256,
ops: &mut Vec<KeyValueStoreOp>,
) {
) -> Result<(), Error> {
let value = &RestorePointHash { state_root };
let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index));
let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index))?;
ops.push(op);
Ok(())
}
/// Convert a `restore_point_index` into a database key.
@@ -1293,11 +1350,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Store the checkpoint to begin pruning from (the "old finalized checkpoint").
pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> {
self.hot_db
.do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)])
.do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)?])
}
/// Create a staged store for the pruning checkpoint.
pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
pub fn pruning_checkpoint_store_op(
&self,
checkpoint: Checkpoint,
) -> Result<KeyValueStoreOp, Error> {
PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
}
@@ -1353,10 +1413,14 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
}
// Store the new finalized state as a full state in the database. It would likely previously
// have been stored as a diff.
store.store_full_state(&finalized_state_root, finalized_state)?;
// Copy all of the states between the new finalized state and the split slot, from the hot DB to
// the cold DB.
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
// 1. Copy all of the states between the new finalized state and the split slot, from the hot DB
// to the cold DB.
let state_root_iter = StateRootsIterator::new(&store, finalized_state);
for maybe_pair in state_root_iter.take_while(|result| match result {
Ok((_, slot)) => {
@@ -1380,7 +1444,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Store a pointer from this state root to its slot, so we can later reconstruct states
// from their state root alone.
let cold_state_summary = ColdStateSummary { slot };
let op = cold_state_summary.as_kv_store_op(state_root);
let op = cold_state_summary.as_kv_store_op(state_root)?;
cold_db_ops.push(op);
// There are data dependencies between calls to `store_cold_state()` that prevent us from
@@ -1471,8 +1535,8 @@ impl StoreItem for Split {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -1498,8 +1562,8 @@ impl StoreItem for HotStateSummary {
DBColumn::BeaconStateSummary
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -1514,7 +1578,7 @@ impl HotStateSummary {
// slots where there isn't a skip).
let slot = state.slot();
let latest_block_root = state.get_latest_block_root(*state_root);
let epoch_boundary_slot = slot / E::slots_per_epoch() * E::slots_per_epoch();
let epoch_boundary_slot = (slot - 1) / E::slots_per_epoch() * E::slots_per_epoch();
let epoch_boundary_state_root = if epoch_boundary_slot == slot {
*state_root
} else {
@@ -1550,8 +1614,8 @@ impl StoreItem for ColdStateSummary {
DBColumn::BeaconStateSummary
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -1570,8 +1634,8 @@ impl StoreItem for RestorePointHash {
DBColumn::BeaconRestorePoint
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -1587,8 +1651,8 @@ impl StoreItem for TemporaryFlag {
DBColumn::BeaconStateTemporary
}
fn as_store_bytes(&self) -> Vec<u8> {
vec![]
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(vec![])
}
fn from_store_bytes(_: &[u8]) -> Result<Self, Error> {

View File

@@ -26,6 +26,7 @@ pub mod metrics;
mod partial_beacon_state;
pub mod reconstruct;
mod state_cache;
mod state_diff;
pub mod iter;
@@ -94,7 +95,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
let column = I::db_column().into();
let key = key.as_bytes();
self.put_bytes(column, key, &item.as_store_bytes())
self.put_bytes(column, key, &item.as_store_bytes()?)
.map_err(Into::into)
}
@@ -102,7 +103,7 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
let column = I::db_column().into();
let key = key.as_bytes();
self.put_bytes_sync(column, key, &item.as_store_bytes())
self.put_bytes_sync(column, key, &item.as_store_bytes()?)
.map_err(Into::into)
}
@@ -151,7 +152,15 @@ pub enum DBColumn {
/// For data related to the database itself.
BeaconMeta,
BeaconBlock,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
BeaconState,
/// For compact `BeaconStateDiff`s.
BeaconStateDiff,
/// For the mapping from state roots to their slots or summaries.
BeaconStateSummary,
/// For the list of temporary states stored during block import,
/// and then made non-temporary by the deletion of their state root from this column.
BeaconStateTemporary,
/// For persisting in-memory state to the database.
BeaconChain,
OpPool,
@@ -160,11 +169,6 @@ pub enum DBColumn {
PubkeyCache,
/// For the table mapping restore point numbers to state roots.
BeaconRestorePoint,
/// For the mapping from state roots to their slots or summaries.
BeaconStateSummary,
/// For the list of temporary states stored during block import,
/// and then made non-temporary by the deletion of their state root from this column.
BeaconStateTemporary,
BeaconBlockRoots,
BeaconStateRoots,
BeaconHistoricalRoots,
@@ -179,14 +183,15 @@ impl Into<&'static str> for DBColumn {
DBColumn::BeaconMeta => "bma",
DBColumn::BeaconBlock => "blk",
DBColumn::BeaconState => "ste",
DBColumn::BeaconStateDiff => "bsd",
DBColumn::BeaconStateSummary => "bss",
DBColumn::BeaconStateTemporary => "bst",
DBColumn::BeaconChain => "bch",
DBColumn::OpPool => "opo",
DBColumn::Eth1Cache => "etc",
DBColumn::ForkChoice => "frk",
DBColumn::PubkeyCache => "pkc",
DBColumn::BeaconRestorePoint => "brp",
DBColumn::BeaconStateSummary => "bss",
DBColumn::BeaconStateTemporary => "bst",
DBColumn::BeaconBlockRoots => "bbr",
DBColumn::BeaconStateRoots => "bsr",
DBColumn::BeaconHistoricalRoots => "bhr",
@@ -212,16 +217,16 @@ pub trait StoreItem: Sized {
fn db_column() -> DBColumn;
/// Serialize `self` as bytes.
fn as_store_bytes(&self) -> Vec<u8>;
fn as_store_bytes(&self) -> Result<Vec<u8>, Error>;
/// De-serialize `self` from bytes.
///
/// Return an instance of the type and the number of bytes that were read.
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error>;
fn as_kv_store_op(&self, key: Hash256) -> KeyValueStoreOp {
fn as_kv_store_op(&self, key: Hash256) -> Result<KeyValueStoreOp, Error> {
let db_key = get_key_for_col(Self::db_column().into(), key.as_bytes());
KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes())
Ok(KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes()?))
}
}
@@ -243,8 +248,8 @@ mod tests {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {

View File

@@ -30,8 +30,8 @@ impl StoreItem for SchemaVersion {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.0.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -52,8 +52,8 @@ impl StoreItem for PruningCheckpoint {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.checkpoint.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.checkpoint.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -71,8 +71,8 @@ impl StoreItem for CompactionTimestamp {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.0.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.0.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
@@ -109,8 +109,8 @@ impl StoreItem for AnchorInfo {
DBColumn::BeaconMeta
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
Ok(self.as_ssz_bytes())
}
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {

View File

@@ -86,6 +86,33 @@ lazy_static! {
"store_beacon_state_write_bytes_total",
"Total number of beacon state bytes written to the DB"
);
/*
* Beacon state diffs
*/
pub static ref BEACON_STATE_DIFF_WRITE_BYTES: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_diff_write_bytes_total",
"Total number of bytes written for beacon state diffs"
);
pub static ref BEACON_STATE_DIFF_WRITE_COUNT: Result<IntCounter> = try_create_int_counter(
"store_beacon_state_diff_write_count_total",
"Total number of beacon state diffs written"
);
pub static ref BEACON_STATE_DIFF_COMPRESSION_RATIO: Result<Gauge> = try_create_float_gauge(
"store_beacon_state_diff_compression_ratio",
"Compression ratio for beacon state diffs (higher is better)"
);
pub static ref BEACON_STATE_DIFF_COMPUTE_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_diff_compute_time",
"Time to calculate a beacon state diff"
);
pub static ref BEACON_STATE_DIFF_ENCODE_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_diff_encode_time",
"Time to encode a beacon state diff as SSZ"
);
pub static ref BEACON_STATE_DIFF_COMPRESSION_TIME: Result<Histogram> = try_create_histogram(
"store_beacon_state_diff_compression_time",
"Time to compress beacon state SSZ using Flate2"
);
/*
* Beacon Block
*/

View File

@@ -88,6 +88,16 @@ impl<E: EthSpec> StateCache<E> {
block_root: Hash256,
state: &BeaconState<E>,
) -> Result<bool, Error> {
if self
.finalized_state
.as_ref()
.map_or(false, |finalized_state| {
finalized_state.state_root == state_root
})
{
// FIXME(sproul): this should technically be true
return Ok(false);
}
if self.states.peek(&state_root).is_some() {
return Ok(true);
}
@@ -136,6 +146,11 @@ impl<E: EthSpec> StateCache<E> {
let state = self.get_by_state_root(state_root)?;
Some((state_root, state))
}
/// Remove the state for `state_root` from the cache, along with any
/// block-map entries that point at it.
pub fn delete(&mut self, state_root: &Hash256) {
self.states.pop(state_root);
self.block_map.delete(state_root);
}
}
impl BlockMap {
@@ -164,6 +179,16 @@ impl BlockMap {
pruned_states
}
// FIXME(sproul): slow, make generic
/// Remove every reference to `state_root_to_delete`, dropping any block
/// entries whose slot maps become empty as a result.
///
/// NOTE(review): this scans every cached slot map, so it is linear in the
/// total number of cached states — acceptable only while deletions are rare.
fn delete(&mut self, state_root_to_delete: &Hash256) {
self.blocks.retain(|_, slot_map| {
slot_map
.slots
.retain(|_, state_root| state_root != state_root_to_delete);
!slot_map.slots.is_empty()
});
}
}
#[cfg(test)]

View File

@@ -0,0 +1,49 @@
use crate::{metrics, DBColumn, Error, StoreItem};
use flate2::bufread::{ZlibDecoder, ZlibEncoder};
use ssz::{Decode, Encode};
use std::io::Read;
use types::{beacon_state::BeaconStateDiff, EthSpec};
impl<E: EthSpec> StoreItem for BeaconStateDiff<E> {
    fn db_column() -> DBColumn {
        DBColumn::BeaconStateDiff
    }

    /// Serialize the diff as SSZ, then zlib-compress it for storage.
    ///
    /// Records encode/compression timing, bytes written, write count, and the
    /// achieved compression ratio as metrics.
    ///
    /// # Errors
    ///
    /// Returns `Error::FlateCompression` if the zlib encoder fails.
    fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
        let encode_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_ENCODE_TIME);
        let value = self.as_ssz_bytes();
        drop(encode_timer);

        let compression_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPRESSION_TIME);
        let mut encoder = ZlibEncoder::new(&value[..], flate2::Compression::fast());
        // Preallocate assuming roughly 2x compression so that `read_to_end`
        // doesn't repeatedly grow-and-copy the output buffer. The Vec still
        // grows automatically if the estimate is too small.
        let mut compressed_value = Vec::with_capacity(value.len() / 2);
        encoder
            .read_to_end(&mut compressed_value)
            .map_err(Error::FlateCompression)?;
        drop(compression_timer);

        let compression_ratio = value.len() as f64 / compressed_value.len() as f64;
        metrics::set_float_gauge(
            &metrics::BEACON_STATE_DIFF_COMPRESSION_RATIO,
            compression_ratio,
        );
        metrics::inc_counter_by(
            &metrics::BEACON_STATE_DIFF_WRITE_BYTES,
            compressed_value.len() as u64,
        );
        metrics::inc_counter(&metrics::BEACON_STATE_DIFF_WRITE_COUNT);
        Ok(compressed_value)
    }

    /// Decompress zlib-compressed bytes read from the database and decode the
    /// SSZ-encoded diff.
    ///
    /// # Errors
    ///
    /// Returns `Error::FlateCompression` on decompression failure, or an SSZ
    /// decode error (via `From`) if the decompressed bytes are malformed.
    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
        let mut ssz_bytes = vec![];
        let mut decoder = ZlibDecoder::new(bytes);
        decoder
            .read_to_end(&mut ssz_bytes)
            .map_err(Error::FlateCompression)?;
        Ok(Self::from_ssz_bytes(&ssz_bytes)?)
    }
}