mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-20 05:14:35 +00:00
Race condition fix + Reliability improvements around forks pruning (#1132)
* Improve error handling in block iteration * Introduce atomic DB operations * Fix race condition An invariant was violated: For every block hash in head_tracker, that block is accessible from the store.
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use crate::chunked_vector::ChunkError;
|
||||
use crate::hot_cold_store::HotColdDBError;
|
||||
use ssz::DecodeError;
|
||||
use types::BeaconStateError;
|
||||
use types::{BeaconStateError, Hash256};
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum Error {
|
||||
@@ -12,6 +12,7 @@ pub enum Error {
|
||||
HotColdDBError(HotColdDBError),
|
||||
DBError { message: String },
|
||||
RlpError(String),
|
||||
BlockNotFound(Hash256),
|
||||
}
|
||||
|
||||
impl From<DecodeError> for Error {
|
||||
|
||||
@@ -8,6 +8,7 @@ use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
|
||||
use crate::metrics;
|
||||
use crate::{
|
||||
leveldb_store::LevelDB, DBColumn, Error, PartialBeaconState, SimpleStoreItem, Store, StoreItem,
|
||||
StoreOp,
|
||||
};
|
||||
use lru::LruCache;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
@@ -203,6 +204,21 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
|
||||
let mut guard = self.block_cache.lock();
|
||||
self.hot_db.do_atomically(batch)?;
|
||||
for op in batch {
|
||||
match op {
|
||||
StoreOp::DeleteBlock(block_hash) => {
|
||||
let untyped_hash: Hash256 = (*block_hash).into();
|
||||
guard.pop(&untyped_hash);
|
||||
}
|
||||
StoreOp::DeleteState(_, _) => (),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||
fn process_finalization(
|
||||
store: Arc<Self>,
|
||||
@@ -562,15 +578,24 @@ impl<E: EthSpec> HotColdDB<E> {
|
||||
end_slot: Slot,
|
||||
end_block_hash: Hash256,
|
||||
) -> Result<Vec<SignedBeaconBlock<E>>, Error> {
|
||||
let mut blocks = ParentRootBlockIterator::new(self, end_block_hash)
|
||||
.map(|(_, block)| block)
|
||||
// Include the block at the end slot (if any), it needs to be
|
||||
// replayed in order to construct the canonical state at `end_slot`.
|
||||
.filter(|block| block.message.slot <= end_slot)
|
||||
// Include the block at the start slot (if any). Whilst it doesn't need to be applied
|
||||
// to the state, it contains a potentially useful state root.
|
||||
.take_while(|block| block.message.slot >= start_slot)
|
||||
.collect::<Vec<_>>();
|
||||
let mut blocks: Vec<SignedBeaconBlock<E>> =
|
||||
ParentRootBlockIterator::new(self, end_block_hash)
|
||||
.map(|result| result.map(|(_, block)| block))
|
||||
// Include the block at the end slot (if any), it needs to be
|
||||
// replayed in order to construct the canonical state at `end_slot`.
|
||||
.filter(|result| {
|
||||
result
|
||||
.as_ref()
|
||||
.map_or(true, |block| block.message.slot <= end_slot)
|
||||
})
|
||||
// Include the block at the start slot (if any). Whilst it doesn't need to be applied
|
||||
// to the state, it contains a potentially useful state root.
|
||||
.take_while(|result| {
|
||||
result
|
||||
.as_ref()
|
||||
.map_or(true, |block| block.message.slot >= start_slot)
|
||||
})
|
||||
.collect::<Result<_, _>>()?;
|
||||
blocks.reverse();
|
||||
Ok(blocks)
|
||||
}
|
||||
|
||||
@@ -217,25 +217,32 @@ impl<'a, E: EthSpec, S: Store<E>> ParentRootBlockIterator<'a, E, S> {
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, S: Store<E>> Iterator for ParentRootBlockIterator<'a, E, S> {
|
||||
type Item = (Hash256, SignedBeaconBlock<E>);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
fn do_next(&mut self) -> Result<Option<(Hash256, SignedBeaconBlock<E>)>, Error> {
|
||||
// Stop once we reach the zero parent, otherwise we'll keep returning the genesis
|
||||
// block forever.
|
||||
if self.next_block_root.is_zero() {
|
||||
None
|
||||
Ok(None)
|
||||
} else {
|
||||
let block_root = self.next_block_root;
|
||||
let block = self.store.get_block(&block_root).ok()??;
|
||||
let block = self
|
||||
.store
|
||||
.get_block(&block_root)?
|
||||
.ok_or(Error::BlockNotFound(block_root))?;
|
||||
self.next_block_root = block.message.parent_root;
|
||||
Some((block_root, block))
|
||||
Ok(Some((block_root, block)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, E: EthSpec, S: Store<E>> Iterator for ParentRootBlockIterator<'a, E, S> {
|
||||
type Item = Result<(Hash256, SignedBeaconBlock<E>), Error>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.do_next().transpose()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Extends `BlockRootsIterator`, returning `SignedBeaconBlock` instances, instead of their roots.
|
||||
pub struct BlockIterator<'a, T: EthSpec, U> {
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
|
||||
use crate::impls::beacon_state::{get_full_state, store_full_state};
|
||||
use crate::metrics;
|
||||
use db_key::Key;
|
||||
use leveldb::database::batch::{Batch, Writebatch};
|
||||
use leveldb::database::kv::KV;
|
||||
use leveldb::database::Database;
|
||||
use leveldb::error::Error as LevelDBError;
|
||||
@@ -145,6 +146,41 @@ impl<E: EthSpec> Store<E> for LevelDB<E> {
|
||||
) -> Self::ForwardsBlockRootsIterator {
|
||||
SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root)
|
||||
}
|
||||
|
||||
fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> {
|
||||
let mut leveldb_batch = Writebatch::new();
|
||||
for op in ops_batch {
|
||||
match op {
|
||||
StoreOp::DeleteBlock(block_hash) => {
|
||||
let untyped_hash: Hash256 = (*block_hash).into();
|
||||
let key = Self::get_key_for_col(
|
||||
DBColumn::BeaconBlock.into(),
|
||||
untyped_hash.as_bytes(),
|
||||
);
|
||||
leveldb_batch.delete(key);
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(state_hash, slot) => {
|
||||
let untyped_hash: Hash256 = (*state_hash).into();
|
||||
let state_summary_key = Self::get_key_for_col(
|
||||
DBColumn::BeaconStateSummary.into(),
|
||||
untyped_hash.as_bytes(),
|
||||
);
|
||||
leveldb_batch.delete(state_summary_key);
|
||||
|
||||
if *slot % E::slots_per_epoch() == 0 {
|
||||
let state_key = Self::get_key_for_col(
|
||||
DBColumn::BeaconState.into(),
|
||||
untyped_hash.as_bytes(),
|
||||
);
|
||||
leveldb_batch.delete(state_key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.db.write(self.write_options(), &leveldb_batch)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<LevelDBError> for Error {
|
||||
|
||||
@@ -90,12 +90,15 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
|
||||
/// Delete a block from the store.
|
||||
fn delete_block(&self, block_root: &Hash256) -> Result<(), Error> {
|
||||
self.delete::<SignedBeaconBlock<E>>(block_root)
|
||||
self.key_delete(DBColumn::BeaconBlock.into(), block_root.as_bytes())
|
||||
}
|
||||
|
||||
/// Store a state in the store.
|
||||
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error>;
|
||||
|
||||
/// Execute either all of the operations in `batch` or none at all, returning an error.
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>;
|
||||
|
||||
/// Store a state summary in the store.
|
||||
// NOTE: this is a hack for the HotColdDb, we could consider splitting this
|
||||
// trait and removing the generic `S: Store` types everywhere?
|
||||
@@ -180,6 +183,13 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reified key-value storage operation. Helps in modifying the storage atomically.
|
||||
/// See also https://github.com/sigp/lighthouse/issues/692
|
||||
pub enum StoreOp {
|
||||
DeleteBlock(SignedBeaconBlockHash),
|
||||
DeleteState(BeaconStateHash, Slot),
|
||||
}
|
||||
|
||||
/// A unique column identifier.
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum DBColumn {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use super::{Error, Store};
|
||||
use super::{DBColumn, Error, Store, StoreOp};
|
||||
use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
|
||||
use crate::impls::beacon_state::{get_full_state, store_full_state};
|
||||
use parking_lot::RwLock;
|
||||
@@ -89,6 +89,30 @@ impl<E: EthSpec> Store<E> for MemoryStore<E> {
|
||||
get_full_state(self, state_root)
|
||||
}
|
||||
|
||||
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
|
||||
for op in batch {
|
||||
match op {
|
||||
StoreOp::DeleteBlock(block_hash) => {
|
||||
let untyped_hash: Hash256 = (*block_hash).into();
|
||||
self.key_delete(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes())?;
|
||||
}
|
||||
|
||||
StoreOp::DeleteState(state_hash, slot) => {
|
||||
let untyped_hash: Hash256 = (*state_hash).into();
|
||||
if *slot % E::slots_per_epoch() == 0 {
|
||||
self.key_delete(DBColumn::BeaconState.into(), untyped_hash.as_bytes())?;
|
||||
} else {
|
||||
self.key_delete(
|
||||
DBColumn::BeaconStateSummary.into(),
|
||||
untyped_hash.as_bytes(),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn forwards_block_roots_iterator(
|
||||
store: Arc<Self>,
|
||||
start_slot: Slot,
|
||||
|
||||
Reference in New Issue
Block a user