Fix head tracker concurrency bugs (#1771)

## Issue Addressed

Closes #1557

## Proposed Changes

Modify the pruning algorithm so that it mutates the head-tracker _before_ committing the database transaction to disk, and _only if_ all the heads to be removed are still present in the head-tracker (i.e. no concurrent mutations).

In the process of writing and testing this I also had to make a few other changes:

* Use internal mutability for all `BeaconChainHarness` functions (namely the RNG and the graffiti), in order to enable parallel calls (see testing section below).
* Disable logging in harness tests unless the `test_logger` feature is turned on

And chose to make some clean-ups:

* Delete the `NullMigrator`
* Remove type-based configuration for the migrator in favour of runtime config (simpler, less duplicated code)
* Use the non-blocking migrator unless the blocking migrator is required. In the store tests we need the blocking migrator because some tests make asserts about the state of the DB after the migration has run.
* Rename `validators_keypairs` -> `validator_keypairs` in the `BeaconChainHarness`

## Testing

To confirm that the fix worked, I wrote a test using [Hiatus](https://crates.io/crates/hiatus), which can be found here:

https://github.com/michaelsproul/lighthouse/tree/hiatus-issue-1557

That test can't be merged because it inserts random breakpoints everywhere, but if you check out that branch you can run the test with:

```
$ cd beacon_node/beacon_chain
$ cargo test --release --test parallel_tests --features test_logger
```

It should pass, and the log output should show:

```
WARN Pruning deferred because of a concurrent mutation, message: this is expected only very rarely!
```

## Additional Info

This is a backwards-compatible change with no impact on consensus.
This commit is contained in:
Michael Sproul
2020-10-19 05:58:39 +00:00
parent 6ba997b88e
commit 703c33bdc7
25 changed files with 538 additions and 729 deletions

View File

@@ -1,5 +1,7 @@
use crate::beacon_chain::BEACON_CHAIN_DB_KEY;
use crate::errors::BeaconChainError;
use crate::head_tracker::HeadTracker;
use crate::head_tracker::{HeadTracker, SszHeadTracker};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use parking_lot::Mutex;
use slog::{debug, warn, Logger};
use std::collections::{HashMap, HashSet};
@@ -9,13 +11,49 @@ use std::sync::Arc;
use std::thread;
use store::hot_cold_store::{migrate_database, HotColdDBError};
use store::iter::RootsIterator;
use store::{Error, ItemStore, StoreOp};
use store::{Error, ItemStore, StoreItem, StoreOp};
pub use store::{HotColdDB, MemoryStore};
use types::{
BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, EthSpec, Hash256,
BeaconState, BeaconStateError, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256,
SignedBeaconBlockHash, Slot,
};
/// The background migrator runs a thread to perform pruning and migrate state from the hot
/// to the cold database.
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
#[allow(clippy::type_complexity)]
tx_thread: Option<
Mutex<(
mpsc::Sender<MigrationNotification<E>>,
thread::JoinHandle<()>,
)>,
>,
latest_checkpoint: Arc<Mutex<Checkpoint>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
genesis_block_root: Hash256,
log: Logger,
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct MigratorConfig {
pub blocking: bool,
}
impl MigratorConfig {
pub fn blocking(mut self) -> Self {
self.blocking = true;
self
}
}
/// Pruning can be successful, or in rare cases deferred to a later point.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PruningOutcome {
Successful,
DeferredConcurrentMutation,
}
/// Logic errors that can occur during pruning, none of these should ever happen.
#[derive(Debug)]
pub enum PruningError {
@@ -30,29 +68,177 @@ pub enum PruningError {
UnexpectedUnequalStateRoots,
}
/// Trait for migration processes that update the database upon finalization.
pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
Send + Sync + 'static
{
fn new(db: Arc<HotColdDB<E, Hot, Cold>>, log: Logger) -> Self;
/// Message sent to the migration thread containing the information it needs to run.
pub struct MigrationNotification<E: EthSpec> {
finalized_state_root: BeaconStateHash,
finalized_state: BeaconState<E>,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
latest_checkpoint: Arc<Mutex<Checkpoint>>,
genesis_block_root: Hash256,
}
fn process_finalization(
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
/// Create a new `BackgroundMigrator` and spawn its thread if necessary.
pub fn new(
db: Arc<HotColdDB<E, Hot, Cold>>,
config: MigratorConfig,
genesis_block_root: Hash256,
log: Logger,
) -> Self {
let tx_thread = if config.blocking {
None
} else {
Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone())))
};
let latest_checkpoint = Arc::new(Mutex::new(Checkpoint {
root: Hash256::zero(),
epoch: Epoch::new(0),
}));
Self {
db,
tx_thread,
latest_checkpoint,
genesis_block_root,
log,
}
}
/// Process a finalized checkpoint from the `BeaconChain`.
///
/// If successful, all forks descending from before the `finalized_checkpoint` will be
/// pruned, and the split point of the database will be advanced to the slot of the finalized
/// checkpoint.
pub fn process_finalization(
&self,
_finalized_state_root: BeaconStateHash,
_new_finalized_state: BeaconState<E>,
_head_tracker: Arc<HeadTracker>,
_old_finalized_checkpoint: Checkpoint,
_new_finalized_checkpoint: Checkpoint,
finalized_state_root: BeaconStateHash,
finalized_state: BeaconState<E>,
finalized_checkpoint: Checkpoint,
head_tracker: Arc<HeadTracker>,
) -> Result<(), BeaconChainError> {
let notif = MigrationNotification {
finalized_state_root,
finalized_state,
finalized_checkpoint,
head_tracker,
latest_checkpoint: self.latest_checkpoint.clone(),
genesis_block_root: self.genesis_block_root,
};
// Async path, on the background thread.
if let Some(tx_thread) = &self.tx_thread {
let (ref mut tx, ref mut thread) = *tx_thread.lock();
// Restart the background thread if it has crashed.
if let Err(tx_err) = tx.send(notif) {
let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());
*tx = new_tx;
let old_thread = mem::replace(thread, new_thread);
// Join the old thread, which will probably have panicked, or may have
// halted normally just now as a result of us dropping the old `mpsc::Sender`.
if let Err(thread_err) = old_thread.join() {
warn!(
self.log,
"Migration thread died, so it was restarted";
"reason" => format!("{:?}", thread_err)
);
}
// Retry at most once, we could recurse but that would risk overflowing the stack.
let _ = tx.send(tx_err.0);
}
}
// Synchronous path, on the current thread.
else {
Self::run_migration(self.db.clone(), notif, &self.log)
}
Ok(())
}
/// Perform the actual work of `process_finalization`.
fn run_migration(
db: Arc<HotColdDB<E, Hot, Cold>>,
notif: MigrationNotification<E>,
log: &Logger,
) {
let mut latest_checkpoint = notif.latest_checkpoint.lock();
let finalized_state_root = notif.finalized_state_root;
let finalized_state = notif.finalized_state;
match Self::prune_abandoned_forks(
db.clone(),
notif.head_tracker,
finalized_state_root,
&finalized_state,
*latest_checkpoint,
notif.finalized_checkpoint,
notif.genesis_block_root,
log,
) {
Ok(PruningOutcome::DeferredConcurrentMutation) => {
warn!(
log,
"Pruning deferred because of a concurrent mutation";
"message" => "this is expected only very rarely!"
);
return;
}
Ok(PruningOutcome::Successful) => {
// Update the migrator's idea of the latest checkpoint only if the
// pruning process was successful.
*latest_checkpoint = notif.finalized_checkpoint;
}
Err(e) => {
warn!(log, "Block pruning failed"; "error" => format!("{:?}", e));
return;
}
};
match migrate_database(db, finalized_state_root.into(), &finalized_state) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
log,
"Database migration postponed, unaligned finalized block";
"slot" => slot.as_u64()
);
}
Err(e) => {
warn!(
log,
"Database migration failed";
"error" => format!("{:?}", e)
);
}
};
}
/// Spawn a new child thread to run the migration process.
///
/// Return a channel handle for sending new finalized states to the thread.
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
log: Logger,
) -> (
mpsc::Sender<MigrationNotification<E>>,
thread::JoinHandle<()>,
) {
let (tx, rx) = mpsc::channel();
let thread = thread::spawn(move || {
while let Ok(notif) = rx.recv() {
Self::run_migration(db.clone(), notif, &log);
}
});
(tx, thread)
}
/// Traverses live heads and prunes blocks and states of chains that we know can't be built
/// upon because finalization would prohibit it. This is an optimisation intended to save disk
/// space.
///
/// Assumptions:
/// * It is called after every finalization.
#[allow(clippy::too_many_arguments)]
fn prune_abandoned_forks(
store: Arc<HotColdDB<E, Hot, Cold>>,
head_tracker: Arc<HeadTracker>,
@@ -60,13 +246,9 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
new_finalized_state: &BeaconState<E>,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
genesis_block_root: Hash256,
log: &Logger,
) -> Result<(), BeaconChainError> {
// There will never be any blocks to prune if there is only a single head in the chain.
if head_tracker.heads().len() == 1 {
return Ok(());
}
) -> Result<PruningOutcome, BeaconChainError> {
let old_finalized_slot = old_finalized_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
@@ -120,7 +302,10 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
let mut abandoned_states: HashSet<(Slot, BeaconStateHash)> = HashSet::new();
let mut abandoned_heads: HashSet<Hash256> = HashSet::new();
for (head_hash, head_slot) in head_tracker.heads() {
let heads = head_tracker.heads();
debug!(log, "Pruning {} heads", heads.len());
for (head_hash, head_slot) in heads {
let mut potentially_abandoned_head = Some(head_hash);
let mut potentially_abandoned_blocks = vec![];
@@ -161,6 +346,7 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
"head_block_root" => format!("{:?}", head_hash),
"head_slot" => head_slot,
);
potentially_abandoned_head.take();
break;
}
}
@@ -229,6 +415,25 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
}
}
// Update the head tracker before the database, so that we maintain the invariant
// that a block present in the head tracker is present in the database.
// See https://github.com/sigp/lighthouse/issues/1557
let mut head_tracker_lock = head_tracker.0.write();
// Check that all the heads to be deleted are still present. The absence of any
// head indicates a race, that will likely resolve itself, so we defer pruning until
// later.
for head_hash in &abandoned_heads {
if !head_tracker_lock.contains_key(head_hash) {
return Ok(PruningOutcome::DeferredConcurrentMutation);
}
}
// Then remove them for real.
for head_hash in abandoned_heads {
head_tracker_lock.remove(&head_hash);
}
let batch: Vec<StoreOp<E>> = abandoned_blocks
.into_iter()
.map(StoreOp::DeleteBlock)
@@ -239,203 +444,21 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
)
.collect();
store.do_atomically(batch)?;
for head_hash in abandoned_heads.into_iter() {
head_tracker.remove_head(head_hash);
}
let mut kv_batch = store.convert_to_kv_batch(&batch)?;
// Persist the head in case the process is killed or crashes here. This prevents
// the head tracker reverting after our mutation above.
let persisted_head = PersistedBeaconChain {
_canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
genesis_block_root,
ssz_head_tracker: SszHeadTracker::from_map(&*head_tracker_lock),
};
drop(head_tracker_lock);
kv_batch.push(persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY));
store.hot_db.do_atomically(kv_batch)?;
debug!(log, "Database pruning complete");
Ok(())
}
}
/// Migrator that does nothing, for stores that don't need migration.
pub struct NullMigrator;
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold> for NullMigrator {
fn process_finalization(
&self,
_finalized_state_root: BeaconStateHash,
_new_finalized_state: BeaconState<E>,
_head_tracker: Arc<HeadTracker>,
_old_finalized_checkpoint: Checkpoint,
_new_finalized_checkpoint: Checkpoint,
) -> Result<(), BeaconChainError> {
Ok(())
}
fn new(_: Arc<HotColdDB<E, Hot, Cold>>, _: Logger) -> Self {
NullMigrator
}
}
/// Migrator that immediately calls the store's migration function, blocking the current execution.
///
/// Mostly useful for tests.
pub struct BlockingMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
log: Logger,
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
for BlockingMigrator<E, Hot, Cold>
{
fn new(db: Arc<HotColdDB<E, Hot, Cold>>, log: Logger) -> Self {
BlockingMigrator { db, log }
}
fn process_finalization(
&self,
finalized_state_root: BeaconStateHash,
new_finalized_state: BeaconState<E>,
head_tracker: Arc<HeadTracker>,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
) -> Result<(), BeaconChainError> {
Self::prune_abandoned_forks(
self.db.clone(),
head_tracker,
finalized_state_root,
&new_finalized_state,
old_finalized_checkpoint,
new_finalized_checkpoint,
&self.log,
)?;
match migrate_database(
self.db.clone(),
finalized_state_root.into(),
&new_finalized_state,
) {
Ok(()) => Ok(()),
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
self.log,
"Database migration postponed, unaligned finalized block";
"slot" => slot.as_u64()
);
Ok(())
}
Err(e) => Err(e.into()),
}
}
}
type MpscSender<E> = mpsc::Sender<(
BeaconStateHash,
BeaconState<E>,
Arc<HeadTracker>,
Checkpoint,
Checkpoint,
)>;
/// Migrator that runs a background thread to migrate state from the hot to the cold database.
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
tx_thread: Mutex<(MpscSender<E>, thread::JoinHandle<()>)>,
log: Logger,
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Migrate<E, Hot, Cold>
for BackgroundMigrator<E, Hot, Cold>
{
fn new(db: Arc<HotColdDB<E, Hot, Cold>>, log: Logger) -> Self {
let tx_thread = Mutex::new(Self::spawn_thread(db.clone(), log.clone()));
Self { db, tx_thread, log }
}
fn process_finalization(
&self,
finalized_state_root: BeaconStateHash,
new_finalized_state: BeaconState<E>,
head_tracker: Arc<HeadTracker>,
old_finalized_checkpoint: Checkpoint,
new_finalized_checkpoint: Checkpoint,
) -> Result<(), BeaconChainError> {
let (ref mut tx, ref mut thread) = *self.tx_thread.lock();
if let Err(tx_err) = tx.send((
finalized_state_root,
new_finalized_state,
head_tracker,
old_finalized_checkpoint,
new_finalized_checkpoint,
)) {
let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());
*tx = new_tx;
let old_thread = mem::replace(thread, new_thread);
// Join the old thread, which will probably have panicked, or may have
// halted normally just now as a result of us dropping the old `mpsc::Sender`.
if let Err(thread_err) = old_thread.join() {
warn!(
self.log,
"Migration thread died, so it was restarted";
"reason" => format!("{:?}", thread_err)
);
}
// Retry at most once, we could recurse but that would risk overflowing the stack.
let _ = tx.send(tx_err.0);
}
Ok(())
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
/// Spawn a new child thread to run the migration process.
///
/// Return a channel handle for sending new finalized states to the thread.
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
log: Logger,
) -> (MpscSender<E>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
let thread = thread::spawn(move || {
while let Ok((
state_root,
state,
head_tracker,
old_finalized_checkpoint,
new_finalized_checkpoint,
)) = rx.recv()
{
match Self::prune_abandoned_forks(
db.clone(),
head_tracker,
state_root,
&state,
old_finalized_checkpoint,
new_finalized_checkpoint,
&log,
) {
Ok(()) => {}
Err(e) => warn!(log, "Block pruning failed: {:?}", e),
}
match migrate_database(db.clone(), state_root.into(), &state) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
log,
"Database migration postponed, unaligned finalized block";
"slot" => slot.as_u64()
);
}
Err(e) => {
warn!(
log,
"Database migration failed";
"error" => format!("{:?}", e)
);
}
};
}
});
(tx, thread)
Ok(PruningOutcome::Successful)
}
}