New state pruning algorithm

This commit is contained in:
Michael Sproul
2022-03-02 15:40:56 +11:00
parent ebe8e30171
commit 64f0e3e13d
6 changed files with 165 additions and 75 deletions

View File

@@ -524,20 +524,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Convert a batch of `StoreOp` to a batch of `KeyValueStoreOp`.
pub fn convert_to_kv_batch(&self, batch: &[StoreOp<E>]) -> Result<Vec<KeyValueStoreOp>, Error> {
pub fn convert_to_kv_batch(
&self,
batch: Vec<StoreOp<E>>,
) -> Result<Vec<KeyValueStoreOp>, Error> {
let mut key_value_batch = Vec::with_capacity(batch.len());
for op in batch {
match op {
StoreOp::PutBlock(block_root, block) => {
key_value_batch.push(self.block_as_kv_store_op(block_root, block));
key_value_batch.push(self.block_as_kv_store_op(&block_root, &block));
}
StoreOp::PutState(state_root, state) => {
self.store_hot_state(state_root, state, &mut key_value_batch)?;
self.store_hot_state(&state_root, state, &mut key_value_batch)?;
}
StoreOp::PutStateTemporaryFlag(state_root) => {
key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)?);
key_value_batch.push(TemporaryFlag.as_kv_store_op(state_root)?);
}
StoreOp::DeleteStateTemporaryFlag(state_root) => {
@@ -570,6 +573,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
key_value_batch.push(KeyValueStoreOp::DeleteKey(diff_key));
}
}
StoreOp::KeyValueOp(kv_op) => key_value_batch.push(kv_op),
}
}
Ok(key_value_batch)
@@ -578,9 +582,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
let mut block_cache = self.block_cache.lock();
self.hot_db
.do_atomically(self.convert_to_kv_batch(&batch)?)?;
for op in &batch {
match op {
StoreOp::PutBlock(block_root, block) => {
@@ -595,14 +596,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::DeleteBlock(block_root) => {
block_cache.pop(block_root);
self.state_cache.lock().delete_block_states(block_root);
}
StoreOp::DeleteState(state_root, _) => {
// FIXME(sproul): atomics are a bit sketchy here
self.state_cache.lock().delete(state_root)
self.state_cache.lock().delete_state(state_root)
}
StoreOp::KeyValueOp(_) => (),
}
}
self.hot_db
.do_atomically(self.convert_to_kv_batch(batch)?)?;
drop(block_cache);
Ok(())
}
@@ -731,13 +738,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return self.load_hot_state_full(state_root).map(Some);
}
// If the state is marked as temporary, do not return it. It will become visible
// only once its transaction commits and deletes its temporary flag.
// FIXME(sproul): reconsider
if self.load_state_temporary_flag(state_root)?.is_some() {
return Ok(None);
}
if let Some(HotStateSummary {
slot,
latest_block_root,
@@ -745,6 +745,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
prev_state_root,
}) = self.load_hot_state_summary(state_root)?
{
// Load the latest block, and use it to confirm the validity of this state.
let latest_block = if let Some(block) = self.get_block(&latest_block_root)? {
block
} else {
// Dangling state, will be deleted fully once finalization advances past it.
debug!(
self.log,
"Ignoring state load for dangling state";
"state_root" => ?state_root,
"slot" => slot,
"latest_block_root" => ?latest_block_root,
);
return Ok(None);
};
// On a fork boundary slot load a full state from disk.
if self.spec.fork_activated_at_slot::<E>(slot).is_some() {
return self.load_hot_state_full(state_root).map(Some);
@@ -757,13 +772,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(Some);
}
// Otherwise load the prior state, potentially from the cache, and replay a single block
// on top of it.
// Otherwise try to load the prior state and replay the `latest_block` on top of it as
// necessary (if it's not a skip slot).
let prev_state = self
.get_hot_state(&prev_state_root)?
.ok_or(HotColdDBError::MissingPrevState(prev_state_root))?;
let blocks = self.load_blocks_to_replay(slot, slot, latest_block_root)?;
let blocks = if latest_block.slot() == slot {
vec![latest_block]
} else {
vec![]
};
let state_roots = [(prev_state_root, slot - 1), (*state_root, slot)];
let state_root_iter = state_roots.into_iter().map(Ok);
@@ -1287,6 +1305,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.get(state_root)
}
/// Iterate all hot state summaries in the database.
pub fn iter_hot_state_summaries(
&self,
) -> impl Iterator<Item = Result<(Hash256, HotStateSummary), Error>> + '_ {
self.hot_db
.iter_column(DBColumn::BeaconStateSummary)
.map(|res| {
let (key, value_bytes) = res?;
Ok((key, HotStateSummary::from_store_bytes(&value_bytes)?))
})
}
/// Load the temporary flag for a state root, if one exists.
///
/// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not
@@ -1549,12 +1579,12 @@ impl StoreItem for Split {
/// Allows full reconstruction by replaying blocks.
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
pub struct HotStateSummary {
pub(crate) slot: Slot,
pub(crate) latest_block_root: Hash256,
pub(crate) epoch_boundary_state_root: Hash256,
pub slot: Slot,
pub latest_block_root: Hash256,
pub epoch_boundary_state_root: Hash256,
/// The state root of the state at the prior slot.
// FIXME(sproul): migrate
pub(crate) prev_state_root: Hash256,
pub prev_state_root: Hash256,
}
impl StoreItem for HotStateSummary {

View File

@@ -1,4 +1,5 @@
use super::*;
use crate::hot_cold_store::HotColdDBError;
use crate::metrics;
use db_key::Key;
use leveldb::compaction::Compaction;
@@ -6,7 +7,7 @@ use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
use leveldb::database::Database;
use leveldb::error::Error as LevelDBError;
use leveldb::iterator::{Iterable, KeyIterator};
use leveldb::iterator::{Iterable, KeyIterator, LevelDBIterator};
use leveldb::options::{Options, ReadOptions, WriteOptions};
use parking_lot::{Mutex, MutexGuard};
use std::marker::PhantomData;
@@ -167,13 +168,38 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
};
for (start_key, end_key) in vec![
endpoints(DBColumn::BeaconStateTemporary),
endpoints(DBColumn::BeaconState),
endpoints(DBColumn::BeaconStateDiff),
endpoints(DBColumn::BeaconStateSummary),
] {
self.db.compact(&start_key, &end_key);
}
Ok(())
}
/// Iterate through all keys and values in a particular column.
fn iter_column<'a>(
&'a self,
column: DBColumn,
) -> Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a> {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
let iter = self.db.iter(self.read_options());
iter.seek(&start_key);
Box::new(
iter.take_while(move |(key, _)| key.matches_column(column))
.map(move |(bytes_key, value)| {
let key = bytes_key.remove_column(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key,
}
})?;
Ok((key, value))
}),
)
}
}
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}

View File

@@ -75,6 +75,15 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// Compact the database, freeing space used by deleted items.
fn compact(&self) -> Result<(), Error>;
/// Iterate through all values in a particular column.
fn iter_column<'a>(
&'a self,
_column: DBColumn,
) -> Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a> {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
}
}
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
@@ -144,6 +153,7 @@ pub enum StoreOp<'a, E: EthSpec> {
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteState(Hash256, Option<Slot>),
KeyValueOp(KeyValueStoreOp),
}
/// A unique column identifier.

View File

@@ -146,10 +146,18 @@ impl<E: EthSpec> StateCache<E> {
Some((state_root, state))
}
pub fn delete(&mut self, state_root: &Hash256) {
pub fn delete_state(&mut self, state_root: &Hash256) {
self.states.pop(state_root);
self.block_map.delete(state_root);
}
pub fn delete_block_states(&mut self, block_root: &Hash256) {
if let Some(slot_map) = self.block_map.delete_block_states(block_root) {
for state_root in slot_map.slots.values() {
self.states.pop(state_root);
}
}
}
}
impl BlockMap {
@@ -188,6 +196,10 @@ impl BlockMap {
!slot_map.slots.is_empty()
});
}
fn delete_block_states(&mut self, block_root: &Hash256) -> Option<SlotMap> {
self.blocks.remove(block_root)
}
}
#[cfg(test)]