Refine compaction (#1916)

## Proposed Changes

In an attempt to fix OOM issues and database consistency issues observed by some users after the introduction of compaction in v0.3.4, this PR makes the following changes:

* Run compaction less often: after finality advances by more than 1024 epochs, subject to a 2-hour minimum and a 7-day maximum interval, so it still fires after long periods of non-finality (see the sketch after this list). I think the finality-distance check proposed by Paul is pretty solid, and ensures we don't miss any events where we should be compacting. LevelDB lacks an easy way to check the size of the DB, which would otherwise be another good trigger.
* Make it possible to disable compaction on finalization using `--auto-compact-db=false`
* Make it possible to trigger a manual, single-threaded foreground compaction on start-up using `--compact-db`
* Downgrade the pruning logs to `DEBUG`, as they're particularly noisy during sync
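
To make the new schedule concrete, the trigger condition in the diff below distils to the following predicate. This is a minimal sketch using plain `u64`s; the helper name `should_compact` is illustrative, and the real code (`run_compaction` in the diff) operates on the `Epoch` type:

```rust
// Constants copied from the diff below.
const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604_800; // compact at least weekly
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7_200; // but no more than once per 2 hours
const COMPACTION_FINALITY_DISTANCE: u64 = 1024; // "large finality jump", in epochs

/// Illustrative helper: `true` when a compaction should run after pruning.
fn should_compact(
    seconds_since_last_compaction: u64,
    old_finalized_epoch: u64,
    new_finalized_epoch: u64,
) -> bool {
    // The weekly timer fires unconditionally, even during long non-finality.
    seconds_since_last_compaction > MAX_COMPACTION_PERIOD_SECONDS
        // A big jump in finality (sync, or recovery from non-finality) also
        // triggers compaction, rate-limited to once per 2 hours.
        || (new_finalized_epoch.saturating_sub(old_finalized_epoch) > COMPACTION_FINALITY_DISTANCE
            && seconds_since_last_compaction > MIN_COMPACTION_PERIOD_SECONDS)
}
```

On a healthy chain finality advances a couple of epochs per pruning run, so in practice the 7-day timer is what fires; the 1024-epoch distance check (roughly 4.55 days of mainnet epochs: 1024 × 32 slots × 12 s = 393,216 s) catches sync and post-non-finality recovery, where the finalized epoch jumps in large steps.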

I would like to ship these changes to affected users ASAP, and will document them further in the Advanced Database section of the book if they prove effective.
Commit a60ab4eff2 (parent ecff8807a5) by Michael Sproul, 2020-11-17 09:10:53 +00:00 · 6 changed files with 176 additions and 27 deletions


@@ -3,12 +3,12 @@ use crate::errors::BeaconChainError;
 use crate::head_tracker::{HeadTracker, SszHeadTracker};
 use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
 use parking_lot::Mutex;
-use slog::{debug, error, info, warn, Logger};
+use slog::{debug, info, warn, Logger};
 use std::collections::{HashMap, HashSet};
 use std::mem;
-use std::sync::mpsc;
-use std::sync::Arc;
+use std::sync::{mpsc, Arc};
 use std::thread;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use store::hot_cold_store::{migrate_database, HotColdDBError};
 use store::iter::RootsIterator;
 use store::{Error, ItemStore, StoreItem, StoreOp};
@@ -18,6 +18,13 @@ use types::{
     SignedBeaconBlockHash, Slot,
 };

+/// Compact at least this frequently, finalization permitting (7 days).
+const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
+/// Compact at *most* this frequently, to prevent over-compaction during sync (2 hours).
+const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
+/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
+const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
+
 /// The background migrator runs a thread to perform pruning and migrate state from the hot
 /// to the cold database.
 pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
@@ -49,7 +56,10 @@ impl MigratorConfig {
 /// Pruning can be successful, or in rare cases deferred to a later point.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PruningOutcome {
-    Successful,
+    /// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`.
+    Successful {
+        old_finalized_checkpoint: Checkpoint,
+    },
     DeferredConcurrentMutation,
 }
@@ -159,7 +169,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         let finalized_state_root = notif.finalized_state_root;
         let finalized_state = notif.finalized_state;

-        match Self::prune_abandoned_forks(
+        let old_finalized_checkpoint = match Self::prune_abandoned_forks(
             db.clone(),
             notif.head_tracker,
             finalized_state_root,
@@ -168,7 +178,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
             notif.genesis_block_root,
             log,
         ) {
-            Ok(PruningOutcome::Successful) => {}
+            Ok(PruningOutcome::Successful {
+                old_finalized_checkpoint,
+            }) => old_finalized_checkpoint,
             Ok(PruningOutcome::DeferredConcurrentMutation) => {
                 warn!(
                     log,
@@ -203,15 +215,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         };

         // Finally, compact the database so that new free space is properly reclaimed.
-        debug!(log, "Starting database compaction");
-        if let Err(e) = db.compact() {
-            error!(
-                log,
-                "Database compaction failed";
-                "error" => format!("{:?}", e)
-            );
+        if let Err(e) = Self::run_compaction(
+            db,
+            old_finalized_checkpoint.epoch,
+            notif.finalized_checkpoint.epoch,
+            log,
+        ) {
+            warn!(log, "Database compaction failed"; "error" => format!("{:?}", e));
         }
-        debug!(log, "Database compaction complete");
     }

     /// Spawn a new child thread to run the migration process.
@@ -272,7 +283,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
             .into());
         }

-        info!(
+        debug!(
             log,
             "Starting database pruning";
             "old_finalized_epoch" => old_finalized_checkpoint.epoch,
@@ -469,8 +480,55 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
         kv_batch.push(store.pruning_checkpoint_store_op(new_finalized_checkpoint));
         store.hot_db.do_atomically(kv_batch)?;
-        info!(log, "Database pruning complete");
+        debug!(log, "Database pruning complete");

-        Ok(PruningOutcome::Successful)
+        Ok(PruningOutcome::Successful {
+            old_finalized_checkpoint,
+        })
     }
+
+    /// Compact the database if it has been more than `MAX_COMPACTION_PERIOD_SECONDS` since it
+    /// was last compacted.
+    pub fn run_compaction(
+        db: Arc<HotColdDB<E, Hot, Cold>>,
+        old_finalized_epoch: Epoch,
+        new_finalized_epoch: Epoch,
+        log: &Logger,
+    ) -> Result<(), Error> {
+        if !db.compact_on_prune() {
+            return Ok(());
+        }
+
+        let last_compaction_timestamp = db
+            .load_compaction_timestamp()?
+            .unwrap_or_else(|| Duration::from_secs(0));
+        let start_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or(last_compaction_timestamp);
+        let seconds_since_last_compaction = start_time
+            .checked_sub(last_compaction_timestamp)
+            .as_ref()
+            .map_or(0, Duration::as_secs);
+
+        if seconds_since_last_compaction > MAX_COMPACTION_PERIOD_SECONDS
+            || (new_finalized_epoch - old_finalized_epoch > COMPACTION_FINALITY_DISTANCE
+                && seconds_since_last_compaction > MIN_COMPACTION_PERIOD_SECONDS)
+        {
+            info!(
+                log,
+                "Starting database compaction";
+                "old_finalized_epoch" => old_finalized_epoch,
+                "new_finalized_epoch" => new_finalized_epoch,
+            );
+            db.compact()?;
+
+            let finish_time = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap_or(start_time);
+            db.store_compaction_timestamp(finish_time)?;
+
+            info!(log, "Database compaction complete");
+        }
+        Ok(())
+    }
 }
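
A subtlety in `run_compaction` worth calling out: the elapsed-time calculation uses `Duration::checked_sub`, so if the system clock has gone backwards since the last compaction, the subtraction yields `None` and `map_or(0, Duration::as_secs)` treats it as zero elapsed seconds, quietly skipping compaction instead of panicking on underflow. A self-contained sketch of just that arithmetic (the helper name `seconds_since` is illustrative, not from the PR):

```rust
use std::time::Duration;

/// Elapsed whole seconds between two UNIX-epoch offsets, saturating to zero
/// if `now` is earlier than `last_compaction` (i.e. the clock went backwards).
fn seconds_since(last_compaction: Duration, now: Duration) -> u64 {
    now.checked_sub(last_compaction)
        .as_ref()
        .map_or(0, Duration::as_secs)
}

fn main() {
    // Normal case: three hours have elapsed since the last compaction.
    let last = Duration::from_secs(1_000_000);
    assert_eq!(seconds_since(last, Duration::from_secs(1_010_800)), 10_800);

    // Clock skew: "now" precedes the stored timestamp, so the elapsed time
    // reads as zero and neither compaction branch can fire this round.
    assert_eq!(seconds_since(last, Duration::from_secs(999_000)), 0);
}
```

The same defensive style appears in the `unwrap_or` fallbacks: if `SystemTime::now()` somehow precedes `UNIX_EPOCH`, the code falls back to the last stored timestamp rather than erroring out of the migration thread.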