Write new blocks and states to the database atomically (#1285)

* Mostly atomic put_state()
* Reduce number of vec allocations
* Make crucial db operations atomic
* Save restore points
* Remove StateBatch
* Merge two HotColdDB impls
* Further reduce allocations
* Review feedback
* Silence clippy warning
This commit is contained in:
Adam Szkoda
2020-07-01 04:45:57 +02:00
committed by GitHub
parent ac89bb190a
commit 536728b975
11 changed files with 189 additions and 184 deletions

View File

@@ -195,6 +195,7 @@ pub trait Field<E: EthSpec>: Copy {
fn check_and_store_genesis_value<S: KeyValueStore<E>>(
store: &S,
value: Self::Value,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let key = &genesis_value_key()[..];
@@ -217,7 +218,9 @@ pub trait Field<E: EthSpec>: Copy {
Ok(())
}
} else {
Chunk::new(vec![value]).store(store, Self::column(), &genesis_value_key()[..])
let chunk = Chunk::new(vec![value]);
chunk.store(Self::column(), &genesis_value_key()[..], ops)?;
Ok(())
}
}
@@ -332,6 +335,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
store: &S,
state: &BeaconState<E>,
spec: &ChainSpec,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let chunk_size = F::chunk_size();
let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot, spec);
@@ -341,7 +345,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
// Store the genesis value if we have access to it, and it hasn't been stored already.
if F::slot_needs_genesis_value(state.slot, spec) {
let genesis_value = F::extract_genesis_value(state, spec)?;
F::check_and_store_genesis_value(store, genesis_value)?;
F::check_and_store_genesis_value(store, genesis_value, ops)?;
}
// Start by iterating backwards from the last chunk, storing new chunks in the database.
@@ -355,6 +359,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
store,
state,
spec,
ops,
)?;
// If the previous `store_range` did not check the entire range, it may be the case that the
@@ -369,6 +374,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
store,
state,
spec,
ops,
)?;
}
@@ -383,6 +389,7 @@ fn store_range<F, E, S, I>(
store: &S,
state: &BeaconState<E>,
spec: &ChainSpec,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<bool, Error>
where
F: Field<E>,
@@ -409,7 +416,7 @@ where
return Ok(false);
}
new_chunk.store(store, F::column(), chunk_key)?;
new_chunk.store(F::column(), chunk_key, ops)?;
}
Ok(true)
@@ -585,13 +592,14 @@ where
.transpose()
}
pub fn store<S: KeyValueStore<E>, E: EthSpec>(
pub fn store(
&self,
store: &S,
column: DBColumn,
key: &[u8],
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
store.put_bytes(column.into(), key, &self.encode()?)?;
let db_key = get_key_for_col(column.into(), key);
ops.push(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?));
Ok(())
}

View File

@@ -87,6 +87,62 @@ pub enum HotColdDBError {
RestorePointBlockHashError(BeaconStateError),
}
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
/// Build a `HotColdDB` whose hot and cold databases are both created with
/// `MemoryStore::open()`.
///
/// The split is initialised to `Split::default()` and the block cache is sized from
/// `config.block_cache_size`.
///
/// # Errors
///
/// Returns an error if `verify_slots_per_restore_point` rejects
/// `config.slots_per_restore_point`.
pub fn open_ephemeral(
config: StoreConfig,
spec: ChainSpec,
log: Logger,
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
// Validate the configuration before constructing anything.
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
split: RwLock::new(Split::default()),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
// `config.block_cache_size` is read here, before `config` is moved into the struct below.
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
config,
spec,
log,
_phantom: PhantomData,
};
Ok(db)
}
}
impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
/// Open a new or existing database, with the given paths to the hot and cold DBs.
///
/// `config.slots_per_restore_point` must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
///
/// # Errors
///
/// Returns an error if `verify_slots_per_restore_point` rejects the configuration, if
/// either `LevelDB::open` call fails, or if `load_split` fails.
pub fn open(
hot_path: &Path,
cold_path: &Path,
config: StoreConfig,
spec: ChainSpec,
log: Logger,
) -> Result<HotColdDB<E, LevelDB<E>, LevelDB<E>>, Error> {
// Validate the configuration before touching the disk.
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
// Start from the default split; it is overwritten below if one was persisted.
split: RwLock::new(Split::default()),
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
// `config.block_cache_size` is read here, before `config` is moved into the struct below.
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
config,
spec,
log,
_phantom: PhantomData,
};
// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
*db.split.write() = split;
}
Ok(db)
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
/// Store a block and update the LRU cache.
pub fn put_block(
@@ -141,9 +197,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Store a state in the store.
pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
if state.slot < self.get_split_slot() {
self.store_cold_state(state_root, &state)
let mut ops: Vec<KeyValueStoreOp> = Vec::new();
self.store_cold_state(state_root, &state, &mut ops)?;
self.cold_db.do_atomically(ops)
} else {
self.store_hot_state(state_root, state)
let mut ops: Vec<KeyValueStoreOp> = Vec::new();
self.store_hot_state(state_root, state, &mut ops)?;
self.hot_db.do_atomically(ops)
}
}
@@ -243,12 +303,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.hot_db.exists::<I>(key)
}
pub fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
pub fn do_atomically(&self, batch: Vec<StoreOp<E>>) -> Result<(), Error> {
let mut guard = self.block_cache.lock();
let mut key_value_batch: Vec<KeyValueStoreOp> = Vec::with_capacity(batch.len());
for op in batch {
for op in &batch {
match op {
StoreOp::PutBlock(block_hash, block) => {
let untyped_hash: Hash256 = (*block_hash).into();
key_value_batch.push(block.as_kv_store_op(untyped_hash));
}
StoreOp::PutState(state_hash, state) => {
let untyped_hash: Hash256 = (*state_hash).into();
self.store_hot_state(&untyped_hash, state, &mut key_value_batch)?;
}
StoreOp::PutStateSummary(state_hash, summary) => {
let untyped_hash: Hash256 = (*state_hash).into();
key_value_batch.push(summary.as_kv_store_op(untyped_hash));
}
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
let key =
@@ -272,78 +347,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}
}
self.hot_db.do_atomically(&key_value_batch)?;
self.hot_db.do_atomically(key_value_batch)?;
for op in batch {
for op in &batch {
match op {
StoreOp::PutBlock(block_hash, block) => {
let untyped_hash: Hash256 = (*block_hash).into();
guard.put(untyped_hash, block.clone());
}
StoreOp::PutState(_, _) => (),
StoreOp::PutStateSummary(_, _) => (),
StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
guard.pop(&untyped_hash);
}
StoreOp::DeleteState(_, _) => (),
}
}
Ok(())
}
}
impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
/// Build a `HotColdDB` whose hot and cold databases are both created with
/// `MemoryStore::open()`.
///
/// The split is initialised to `Split::default()` and the block cache is sized from
/// `config.block_cache_size`.
///
/// # Errors
///
/// Returns an error if `verify_slots_per_restore_point` rejects
/// `config.slots_per_restore_point`.
pub fn open_ephemeral(
config: StoreConfig,
spec: ChainSpec,
log: Logger,
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
// Validate the configuration before constructing anything.
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
split: RwLock::new(Split::default()),
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
// `config.block_cache_size` is read here, before `config` is moved into the struct below.
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
config,
spec,
log,
_phantom: PhantomData,
};
Ok(db)
}
}
impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
/// Open a new or existing database, with the given paths to the hot and cold DBs.
///
/// `config.slots_per_restore_point` must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`.
///
/// # Errors
///
/// Returns an error if `verify_slots_per_restore_point` rejects the configuration, if
/// either `LevelDB::open` call fails, or if `load_split` fails.
pub fn open(
hot_path: &Path,
cold_path: &Path,
config: StoreConfig,
spec: ChainSpec,
log: Logger,
) -> Result<HotColdDB<E, LevelDB<E>, LevelDB<E>>, Error> {
// Validate the configuration before touching the disk.
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
let db = HotColdDB {
// Start from the default split; it is overwritten below if one was persisted.
split: RwLock::new(Split::default()),
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
// `config.block_cache_size` is read here, before `config` is moved into the struct below.
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
config,
spec,
log,
_phantom: PhantomData,
};
// Load the previous split slot from the database (if any). This ensures we can
// stop and restart correctly.
if let Some(split) = db.load_split()? {
*db.split.write() = split;
}
Ok(db)
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
/// Store a post-finalization state efficiently in the hot database.
///
/// On an epoch boundary, store a full state. On an intermediate slot, store
@@ -352,6 +378,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
state_root: &Hash256,
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// On the epoch boundary, store the full state.
if state.slot % E::slots_per_epoch() == 0 {
@@ -361,13 +388,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
"slot" => state.slot.as_u64(),
"state_root" => format!("{:?}", state_root)
);
store_full_state(&self.hot_db, state_root, &state)?;
store_full_state(state_root, &state, ops)?;
}
// Store a summary of the state.
// We store one even for the epoch boundary states, as we may need their slots
// when doing a look up by state root.
self.put_state_summary(state_root, HotStateSummary::new(state_root, state)?)?;
let hot_state_summary = HotStateSummary::new(state_root, state)?;
let op = hot_state_summary.as_kv_store_op(*state_root);
ops.push(op);
Ok(())
}
@@ -413,6 +442,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
state_root: &Hash256,
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
if state.slot % self.config.slots_per_restore_point != 0 {
warn!(
@@ -433,18 +463,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// 1. Convert to PartialBeaconState and store that in the DB.
let partial_state = PartialBeaconState::from_state_forgetful(state);
self.cold_db.put(state_root, &partial_state)?;
let op = partial_state.as_kv_store_op(*state_root);
ops.push(op);
// 2. Store updated vector entries.
let db = &self.cold_db;
store_updated_vector(BlockRoots, db, state, &self.spec)?;
store_updated_vector(StateRoots, db, state, &self.spec)?;
store_updated_vector(HistoricalRoots, db, state, &self.spec)?;
store_updated_vector(RandaoMixes, db, state, &self.spec)?;
store_updated_vector(BlockRoots, db, state, &self.spec, ops)?;
store_updated_vector(StateRoots, db, state, &self.spec, ops)?;
store_updated_vector(HistoricalRoots, db, state, &self.spec, ops)?;
store_updated_vector(RandaoMixes, db, state, &self.spec, ops)?;
// 3. Store restore point.
let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point;
self.store_restore_point_hash(restore_point_index, *state_root)?;
self.store_restore_point_hash(restore_point_index, *state_root, ops);
Ok(())
}
@@ -666,11 +697,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
restore_point_index: u64,
state_root: Hash256,
) -> Result<(), Error> {
let key = Self::restore_point_key(restore_point_index);
self.cold_db
.put(&key, &RestorePointHash { state_root })
.map_err(Into::into)
ops: &mut Vec<KeyValueStoreOp>,
) {
let value = &RestorePointHash { state_root };
let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index));
ops.push(op);
}
/// Convert a `restore_point_index` into a database key.
@@ -775,7 +806,9 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
.ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state)?;
let mut ops: Vec<KeyValueStoreOp> = Vec::new();
store.store_cold_state(&state_root, &state, &mut ops)?;
store.cold_db.do_atomically(ops)?;
}
// Store a pointer from this state root to its slot, so we can later reconstruct states

View File

@@ -4,24 +4,20 @@ use ssz_derive::{Decode, Encode};
use std::convert::TryInto;
use types::beacon_state::{CloneConfig, CommitteeCache, CACHED_EPOCHS};
pub fn store_full_state<KV: KeyValueStore<E>, E: EthSpec>(
store: &KV,
pub fn store_full_state<E: EthSpec>(
state_root: &Hash256,
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let total_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_TIMES);
let overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES);
let bytes = StorageContainer::new(state).as_ssz_bytes();
metrics::stop_timer(overhead_timer);
let result = store.put_bytes(DBColumn::BeaconState.into(), state_root.as_bytes(), &bytes);
metrics::stop_timer(total_timer);
metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT);
let bytes = {
let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES);
StorageContainer::new(state).as_ssz_bytes()
};
metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as i64);
result
metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT);
let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes());
ops.push(KeyValueStoreOp::PutKeyValue(key, bytes));
Ok(())
}
pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(

View File

@@ -98,12 +98,16 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
.map_err(Into::into)
}
fn do_atomically(&self, ops_batch: &[KeyValueStoreOp]) -> Result<(), Error> {
fn do_atomically(&self, ops_batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
let mut leveldb_batch = Writebatch::new();
for op in ops_batch.into_iter() {
for op in ops_batch {
match op {
KeyValueStoreOp::PutKeyValue(key, value) => {
leveldb_batch.put(BytesKey::from_vec(key), &value);
}
KeyValueStoreOp::DeleteKey(key) => {
leveldb_batch.delete(BytesKey::from_vec(key.to_vec()));
leveldb_batch.delete(BytesKey::from_vec(key));
}
}
}

View File

@@ -21,10 +21,11 @@ mod leveldb_store;
mod memory_store;
mod metrics;
mod partial_beacon_state;
mod state_batch;
pub mod iter;
use std::borrow::Cow;
pub use self::config::StoreConfig;
pub use self::hot_cold_store::{HotColdDB, HotStateSummary};
pub use self::leveldb_store::LevelDB;
@@ -33,7 +34,6 @@ pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
pub use metrics::scrape_for_metrics;
pub use state_batch::StateBatch;
pub use types::*;
pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
@@ -50,7 +50,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>;
/// Execute either all of the operations in `batch` or none at all, returning an error.
fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error>;
fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error>;
}
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
@@ -60,6 +60,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
}
/// A single write operation on a `KeyValueStore`, collected into a `Vec` and passed to
/// `KeyValueStore::do_atomically`.
pub enum KeyValueStoreOp {
/// Write a value under a key: `PutKeyValue(key, value)`.
PutKeyValue(Vec<u8>, Vec<u8>),
/// Delete the entry stored under the given key.
DeleteKey(Vec<u8>),
}
@@ -103,7 +104,11 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
/// Reified key-value storage operation. Helps in modifying the storage atomically.
/// See also https://github.com/sigp/lighthouse/issues/692
pub enum StoreOp {
#[allow(clippy::large_enum_variant)]
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(SignedBeaconBlockHash, SignedBeaconBlock<E>),
PutState(BeaconStateHash, Cow<'a, BeaconState<E>>),
PutStateSummary(BeaconStateHash, HotStateSummary),
DeleteBlock(SignedBeaconBlockHash),
DeleteState(BeaconStateHash, Slot),
}
@@ -165,6 +170,11 @@ pub trait StoreItem: Sized {
///
/// Return an instance of the type and the number of bytes that were read.
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error>;
/// Build the `KeyValueStoreOp::PutKeyValue` that would store this item under `key`.
///
/// The database key is derived with `get_key_for_col` from `Self::db_column()` and the
/// bytes of `key`; the value is `self.as_store_bytes()`. This only constructs the
/// operation; nothing is written here.
fn as_kv_store_op(&self, key: Hash256) -> KeyValueStoreOp {
let db_key = get_key_for_col(Self::db_column().into(), key.as_bytes());
KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes())
}
}
#[cfg(test)]

View File

@@ -64,11 +64,15 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
Ok(())
}
fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error> {
fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
for op in batch {
match op {
KeyValueStoreOp::PutKeyValue(key, value) => {
self.db.write().insert(key, value);
}
KeyValueStoreOp::DeleteKey(hash) => {
self.db.write().remove(hash);
self.db.write().remove(&hash);
}
}
}

View File

@@ -78,10 +78,6 @@ lazy_static! {
"store_beacon_state_read_bytes_total",
"Total number of beacon state bytes read from the DB"
);
pub static ref BEACON_STATE_WRITE_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_state_write_seconds",
"Total time required to write a BeaconState to the database"
);
pub static ref BEACON_STATE_WRITE_OVERHEAD_TIMES: Result<Histogram> = try_create_histogram(
"store_beacon_state_write_overhead_seconds",
"Overhead on writing a beacon state to the DB (e.g., encoding)"

View File

@@ -1,50 +0,0 @@
use crate::{Error, HotColdDB, HotStateSummary, ItemStore};
use types::{BeaconState, EthSpec, Hash256};
/// A collection of states to be stored in the database.
///
/// Consumes minimal space in memory by not storing states between epoch boundaries.
#[derive(Debug, Clone, Default)]
pub struct StateBatch<E: EthSpec> {
// Staged items, appended by `add_state` and written in the same order by `commit`.
items: Vec<BatchItem<E>>,
}
/// One staged entry of a `StateBatch`, keyed by state root.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
enum BatchItem<E: EthSpec> {
/// A full copy of the state; staged when the state's slot is a multiple of
/// `E::slots_per_epoch()`.
Full(Hash256, BeaconState<E>),
/// Only a `HotStateSummary` of the state; staged for every other slot.
Summary(Hash256, HotStateSummary),
}
impl<E: EthSpec> StateBatch<E> {
/// Create a new empty batch.
pub fn new() -> Self {
Self::default()
}
/// Stage a `BeaconState` to be stored.
///
/// The state is cloned in full only when `state.slot` is a multiple of
/// `E::slots_per_epoch()`; otherwise only a `HotStateSummary` is kept.
///
/// Returns an error if `HotStateSummary::new` fails.
pub fn add_state(&mut self, state_root: Hash256, state: &BeaconState<E>) -> Result<(), Error> {
let item = if state.slot % E::slots_per_epoch() == 0 {
BatchItem::Full(state_root, state.clone())
} else {
BatchItem::Summary(state_root, HotStateSummary::new(&state_root, state)?)
};
self.items.push(item);
Ok(())
}
/// Write the batch to the database.
///
/// May fail to write the full batch if any of the items error (i.e. not atomic!)
pub fn commit<Hot: ItemStore<E>, Cold: ItemStore<E>>(
self,
store: &HotColdDB<E, Hot, Cold>,
) -> Result<(), Error> {
// `try_for_each` stops at the first item that returns an error; items written before
// that point are not undone here.
self.items.into_iter().try_for_each(|item| match item {
BatchItem::Full(state_root, state) => store.put_state(&state_root, &state),
BatchItem::Summary(state_root, summary) => {
store.put_state_summary(&state_root, summary)
}
})
}
}