Prune abandoned forks (#916)

* Address compiler warning

* Prune abandoned fork choice forks

* New approach to pruning

* Wrap some block hashes in a newtype pattern

For increased type safety.

* Add Graphviz chain dump emitter for debugging

* Fix broken test case

* Make prunes_abandoned_forks use real DiskStore

* Mark finalized blocks in the GraphViz output

* Refine debug stringification of Slot and Epoch

Before this commit: print!("{:?}", Slot(123)) == "Slot(\n123\n)".
After this commit: print!("{:?", Slot(123)) == "Slot(123)".

* Simplify build_block()

* Rewrite test case using more composable test primitives

* Working rewritten test case

* Tighten fork prunning test checks

* Add another pruning test case

* Bugfix: Finalized blocks weren't always properly detected

* Pruning: Add pruning_does_not_touch_blocks_prior_to_finalization test case

* Tighten pruning tests: check if heads are tracked properly

* Add a failing test case for a buggy scenario

* Change name of function to a more accurate one

* Fix failing test case

* Test case: Were skipped slots' states pruned?

* Style fix: Simplify dereferencing

* Tighten pruning tests: check if abandoned states are deleted

* Towards atomicity of db ops

* Correct typo

* Prune also skipped slots' states

* New logic for handling skipped states

* Make skipped slots test pass

* Post conflict resolution fixes

* Formatting fixes

* Tests passing

* Block hashes in Graphviz node labels

* Removed unused changes

* Fix bug with states having < SlotsPerHistoricalRoot roots

* Consolidate State/BlockRootsIterator for pruning

* Address review feedback

* Fix a bug in pruning tests

* Detach prune_abandoned_forks() from its object

* Move migrate.rs from store to beacon_chain

* Move forks pruning onto a background thread

* Bugfix: Heads weren't pruned when prune set contained only the head

* Rename: freeze_to_state() -> process_finalization()

* Eliminate redundant function parameter

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
This commit is contained in:
Adam Szkoda
2020-04-20 11:59:56 +02:00
committed by GitHub
parent b374ead24b
commit 9c3f76a33b
19 changed files with 1398 additions and 282 deletions

View File

@@ -204,7 +204,7 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
}
/// Advance the split point of the store, moving new finalized states to the freezer.
fn freeze_to_state(
fn process_finalization(
store: Arc<Self>,
frozen_head_root: Hash256,
frozen_head: &BeaconState<E>,

View File

@@ -1,4 +1,4 @@
use crate::Store;
use crate::{Error, Store};
use std::borrow::Cow;
use std::marker::PhantomData;
use std::sync::Arc;
@@ -43,12 +43,95 @@ impl<'a, U: Store<E>, E: EthSpec> AncestorIter<U, E, StateRootsIterator<'a, E, U
}
pub struct StateRootsIterator<'a, T: EthSpec, U> {
inner: RootsIterator<'a, T, U>,
}
impl<'a, T: EthSpec, U> Clone for StateRootsIterator<'a, T, U> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<'a, T: EthSpec, U: Store<T>> StateRootsIterator<'a, T, U> {
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
Self {
inner: RootsIterator::new(store, beacon_state),
}
}
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>) -> Self {
Self {
inner: RootsIterator::owned(store, beacon_state),
}
}
}
impl<'a, T: EthSpec, U: Store<T>> Iterator for StateRootsIterator<'a, T, U> {
type Item = (Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
self.inner
.next()
.map(|(_, state_root, slot)| (state_root, slot))
}
}
/// Iterates backwards through block roots. If any specified slot is unable to be retrieved, the
/// iterator returns `None` indefinitely.
///
/// Uses the `block_roots` field of `BeaconState` as the source of block roots and will
/// perform a lookup on the `Store` for a prior `BeaconState` if `block_roots` has been
/// exhausted.
///
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
pub struct BlockRootsIterator<'a, T: EthSpec, U> {
inner: RootsIterator<'a, T, U>,
}
impl<'a, T: EthSpec, U> Clone for BlockRootsIterator<'a, T, U> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<'a, T: EthSpec, U: Store<T>> BlockRootsIterator<'a, T, U> {
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
Self {
inner: RootsIterator::new(store, beacon_state),
}
}
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>) -> Self {
Self {
inner: RootsIterator::owned(store, beacon_state),
}
}
}
impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockRootsIterator<'a, T, U> {
type Item = (Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
self.inner
.next()
.map(|(block_root, _, slot)| (block_root, slot))
}
}
/// Iterator over state and block roots that backtracks using the vectors from a `BeaconState`.
pub struct RootsIterator<'a, T: EthSpec, U> {
store: Arc<U>,
beacon_state: Cow<'a, BeaconState<T>>,
slot: Slot,
}
impl<'a, T: EthSpec, U> Clone for StateRootsIterator<'a, T, U> {
impl<'a, T: EthSpec, U> Clone for RootsIterator<'a, T, U> {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
@@ -58,7 +141,7 @@ impl<'a, T: EthSpec, U> Clone for StateRootsIterator<'a, T, U> {
}
}
impl<'a, T: EthSpec, U: Store<T>> StateRootsIterator<'a, T, U> {
impl<'a, T: EthSpec, U: Store<T>> RootsIterator<'a, T, U> {
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
Self {
store,
@@ -74,10 +157,21 @@ impl<'a, T: EthSpec, U: Store<T>> StateRootsIterator<'a, T, U> {
beacon_state: Cow::Owned(beacon_state),
}
}
pub fn from_block(store: Arc<U>, block_hash: Hash256) -> Result<Self, Error> {
let block = store
.get_block(&block_hash)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
let state = store
.get_state(&block.state_root(), Some(block.slot()))?
.ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?;
Ok(Self::owned(store, state))
}
}
impl<'a, T: EthSpec, U: Store<T>> Iterator for StateRootsIterator<'a, T, U> {
type Item = (Hash256, Slot);
impl<'a, T: EthSpec, U: Store<T>> Iterator for RootsIterator<'a, T, U> {
/// (block_root, state_root, slot)
type Item = (Hash256, Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
if self.slot == 0 || self.slot > self.beacon_state.slot {
@@ -86,18 +180,22 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for StateRootsIterator<'a, T, U> {
self.slot -= 1;
match self.beacon_state.get_state_root(self.slot) {
Ok(root) => Some((*root, self.slot)),
Err(BeaconStateError::SlotOutOfBounds) => {
match (
self.beacon_state.get_block_root(self.slot),
self.beacon_state.get_state_root(self.slot),
) {
(Ok(block_root), Ok(state_root)) => Some((*block_root, *state_root, self.slot)),
(Err(BeaconStateError::SlotOutOfBounds), Err(BeaconStateError::SlotOutOfBounds)) => {
// Read a `BeaconState` from the store that has access to prior historical roots.
let beacon_state =
next_historical_root_backtrack_state(&*self.store, &self.beacon_state)?;
self.beacon_state = Cow::Owned(beacon_state);
let root = self.beacon_state.get_state_root(self.slot).ok()?;
let block_root = *self.beacon_state.get_block_root(self.slot).ok()?;
let state_root = *self.beacon_state.get_state_root(self.slot).ok()?;
Some((*root, self.slot))
Some((block_root, state_root, self.slot))
}
_ => None,
}
@@ -165,79 +263,7 @@ impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockIterator<'a, T, U> {
fn next(&mut self) -> Option<Self::Item> {
let (root, _slot) = self.roots.next()?;
self.roots.store.get_block(&root).ok()?
}
}
/// Iterates backwards through block roots. If any specified slot is unable to be retrieved, the
/// iterator returns `None` indefinitely.
///
/// Uses the `block_roots` field of `BeaconState` to as the source of block roots and will
/// perform a lookup on the `Store` for a prior `BeaconState` if `block_roots` has been
/// exhausted.
///
/// Returns `None` for roots prior to genesis or when there is an error reading from `Store`.
pub struct BlockRootsIterator<'a, T: EthSpec, U> {
store: Arc<U>,
beacon_state: Cow<'a, BeaconState<T>>,
slot: Slot,
}
impl<'a, T: EthSpec, U> Clone for BlockRootsIterator<'a, T, U> {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
beacon_state: self.beacon_state.clone(),
slot: self.slot,
}
}
}
impl<'a, T: EthSpec, U: Store<T>> BlockRootsIterator<'a, T, U> {
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
pub fn new(store: Arc<U>, beacon_state: &'a BeaconState<T>) -> Self {
Self {
store,
slot: beacon_state.slot,
beacon_state: Cow::Borrowed(beacon_state),
}
}
/// Create a new iterator over all block roots in the given `beacon_state` and prior states.
pub fn owned(store: Arc<U>, beacon_state: BeaconState<T>) -> Self {
Self {
store,
slot: beacon_state.slot,
beacon_state: Cow::Owned(beacon_state),
}
}
}
impl<'a, T: EthSpec, U: Store<T>> Iterator for BlockRootsIterator<'a, T, U> {
type Item = (Hash256, Slot);
fn next(&mut self) -> Option<Self::Item> {
if self.slot == 0 || self.slot > self.beacon_state.slot {
return None;
}
self.slot -= 1;
match self.beacon_state.get_block_root(self.slot) {
Ok(root) => Some((*root, self.slot)),
Err(BeaconStateError::SlotOutOfBounds) => {
// Read a `BeaconState` from the store that has access to prior historical roots.
let beacon_state =
next_historical_root_backtrack_state(&*self.store, &self.beacon_state)?;
self.beacon_state = Cow::Owned(beacon_state);
let root = self.beacon_state.get_block_root(self.slot).ok()?;
Some((*root, self.slot))
}
_ => None,
}
self.roots.inner.store.get_block(&root).ok()?
}
}

View File

@@ -15,7 +15,7 @@ pub mod chunked_vector;
pub mod config;
mod errors;
mod forwards_iter;
mod hot_cold_store;
pub mod hot_cold_store;
mod impls;
mod leveldb_store;
mod memory_store;
@@ -24,7 +24,6 @@ mod partial_beacon_state;
mod state_batch;
pub mod iter;
pub mod migrate;
use std::sync::Arc;
@@ -32,7 +31,6 @@ pub use self::config::StoreConfig;
pub use self::hot_cold_store::{HotColdDB as DiskStore, HotStateSummary};
pub use self::leveldb_store::LevelDB as SimpleDiskStore;
pub use self::memory_store::MemoryStore;
pub use self::migrate::Migrate;
pub use self::partial_beacon_state::PartialBeaconState;
pub use errors::Error;
pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
@@ -132,7 +130,7 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
}
/// (Optionally) Move all data before the frozen slot to the freezer database.
fn freeze_to_state(
fn process_finalization(
_store: Arc<Self>,
_frozen_head_root: Hash256,
_frozen_head: &BeaconState<E>,

View File

@@ -1,153 +0,0 @@
use crate::{
hot_cold_store::HotColdDBError, DiskStore, Error, MemoryStore, SimpleDiskStore, Store,
};
use parking_lot::Mutex;
use slog::{debug, warn};
use std::mem;
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use types::{BeaconState, EthSpec, Hash256, Slot};
/// Trait for migration processes that update the database upon finalization.
pub trait Migrate<S, E: EthSpec>: Send + Sync + 'static {
fn new(db: Arc<S>) -> Self;
fn freeze_to_state(
&self,
_state_root: Hash256,
_state: BeaconState<E>,
_max_finality_distance: u64,
) {
}
}
/// Migrator that does nothing, for stores that don't need migration.
pub struct NullMigrator;
impl<E: EthSpec> Migrate<SimpleDiskStore<E>, E> for NullMigrator {
fn new(_: Arc<SimpleDiskStore<E>>) -> Self {
NullMigrator
}
}
impl<E: EthSpec> Migrate<MemoryStore<E>, E> for NullMigrator {
fn new(_: Arc<MemoryStore<E>>) -> Self {
NullMigrator
}
}
/// Migrator that immediately calls the store's migration function, blocking the current execution.
///
/// Mostly useful for tests.
pub struct BlockingMigrator<S>(Arc<S>);
impl<E: EthSpec, S: Store<E>> Migrate<S, E> for BlockingMigrator<S> {
fn new(db: Arc<S>) -> Self {
BlockingMigrator(db)
}
fn freeze_to_state(
&self,
state_root: Hash256,
state: BeaconState<E>,
_max_finality_distance: u64,
) {
if let Err(e) = S::freeze_to_state(self.0.clone(), state_root, &state) {
// This migrator is only used for testing, so we just log to stderr without a logger.
eprintln!("Migration error: {:?}", e);
}
}
}
type MpscSender<E> = mpsc::Sender<(Hash256, BeaconState<E>)>;
/// Migrator that runs a background thread to migrate state from the hot to the cold database.
pub struct BackgroundMigrator<E: EthSpec> {
db: Arc<DiskStore<E>>,
tx_thread: Mutex<(MpscSender<E>, thread::JoinHandle<()>)>,
}
impl<E: EthSpec> Migrate<DiskStore<E>, E> for BackgroundMigrator<E> {
fn new(db: Arc<DiskStore<E>>) -> Self {
let tx_thread = Mutex::new(Self::spawn_thread(db.clone()));
Self { db, tx_thread }
}
/// Perform the freezing operation on the database,
fn freeze_to_state(
&self,
finalized_state_root: Hash256,
finalized_state: BeaconState<E>,
max_finality_distance: u64,
) {
if !self.needs_migration(finalized_state.slot, max_finality_distance) {
return;
}
let (ref mut tx, ref mut thread) = *self.tx_thread.lock();
if let Err(tx_err) = tx.send((finalized_state_root, finalized_state)) {
let (new_tx, new_thread) = Self::spawn_thread(self.db.clone());
drop(mem::replace(tx, new_tx));
let old_thread = mem::replace(thread, new_thread);
// Join the old thread, which will probably have panicked, or may have
// halted normally just now as a result of us dropping the old `mpsc::Sender`.
if let Err(thread_err) = old_thread.join() {
warn!(
self.db.log,
"Migration thread died, so it was restarted";
"reason" => format!("{:?}", thread_err)
);
}
// Retry at most once, we could recurse but that would risk overflowing the stack.
let _ = tx.send(tx_err.0);
}
}
}
impl<E: EthSpec> BackgroundMigrator<E> {
/// Return true if a migration needs to be performed, given a new `finalized_slot`.
fn needs_migration(&self, finalized_slot: Slot, max_finality_distance: u64) -> bool {
let finality_distance = finalized_slot - self.db.get_split_slot();
finality_distance > max_finality_distance
}
/// Spawn a new child thread to run the migration process.
///
/// Return a channel handle for sending new finalized states to the thread.
fn spawn_thread(
db: Arc<DiskStore<E>>,
) -> (
mpsc::Sender<(Hash256, BeaconState<E>)>,
thread::JoinHandle<()>,
) {
let (tx, rx) = mpsc::channel();
let thread = thread::spawn(move || {
while let Ok((state_root, state)) = rx.recv() {
match DiskStore::freeze_to_state(db.clone(), state_root, &state) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
db.log,
"Database migration postponed, unaligned finalized block";
"slot" => slot.as_u64()
);
}
Err(e) => {
warn!(
db.log,
"Database migration failed";
"error" => format!("{:?}", e)
);
}
}
}
});
(tx, thread)
}
}