diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 4d28142330..00a5385d04 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -3,12 +3,12 @@ use crate::errors::BeaconChainError;
 use crate::head_tracker::{HeadTracker, SszHeadTracker};
 use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
 use parking_lot::Mutex;
-use slog::{debug, error, info, warn, Logger};
+use slog::{debug, info, warn, Logger};
 use std::collections::{HashMap, HashSet};
 use std::mem;
-use std::sync::mpsc;
-use std::sync::Arc;
+use std::sync::{mpsc, Arc};
 use std::thread;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use store::hot_cold_store::{migrate_database, HotColdDBError};
 use store::iter::RootsIterator;
 use store::{Error, ItemStore, StoreItem, StoreOp};
@@ -18,6 +18,13 @@ use types::{
     SignedBeaconBlockHash, Slot,
 };
 
+/// Compact at least this frequently, finalization permitting (7 days).
+const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800;
+/// Compact at *most* this frequently, to prevent over-compaction during sync (2 hours).
+const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
+/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
+const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
+
 /// The background migrator runs a thread to perform pruning and migrate state from the hot
 /// to the cold database.
 pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
@@ -49,7 +56,10 @@ impl MigratorConfig {
 /// Pruning can be successful, or in rare cases deferred to a later point.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PruningOutcome {
-    Successful,
+    /// The pruning succeeded and updated the pruning checkpoint from `old_finalized_checkpoint`.
+    Successful {
+        old_finalized_checkpoint: Checkpoint,
+    },
     DeferredConcurrentMutation,
 }
 
@@ -159,7 +169,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
-            Ok(PruningOutcome::Successful) => {}
+            Ok(PruningOutcome::Successful {
+                old_finalized_checkpoint,
+            }) => old_finalized_checkpoint,
             Ok(PruningOutcome::DeferredConcurrentMutation) => {
                 warn!(
                     log,
@@ -203,15 +215,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
-                "error" => format!("{:?}", e)
-            );
+        if let Err(e) = Self::run_compaction(
+            db,
+            old_finalized_checkpoint.epoch,
+            notif.finalized_checkpoint.epoch,
+            log,
+        ) {
+            warn!(log, "Database compaction failed"; "error" => format!("{:?}", e));
         }
-        debug!(log, "Database compaction complete");
     }
 
     /// Spawn a new child thread to run the migration process.
@@ -272,7 +283,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
             old_finalized_checkpoint.epoch,
@@ -469,8 +480,55 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
+    pub fn run_compaction(
+        db: Arc<HotColdDB<E, Hot, Cold>>,
+        old_finalized_epoch: Epoch,
+        new_finalized_epoch: Epoch,
+        log: &Logger,
+    ) -> Result<(), Error> {
+        if !db.compact_on_prune() {
+            return Ok(());
+        }
+
+        let last_compaction_timestamp = db
+            .load_compaction_timestamp()?
+            .unwrap_or_else(|| Duration::from_secs(0));
+        let start_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap_or(last_compaction_timestamp);
+        let seconds_since_last_compaction = start_time
+            .checked_sub(last_compaction_timestamp)
+            .as_ref()
+            .map_or(0, Duration::as_secs);
+
+        if seconds_since_last_compaction > MAX_COMPACTION_PERIOD_SECONDS
+            || (new_finalized_epoch - old_finalized_epoch > COMPACTION_FINALITY_DISTANCE
+                && seconds_since_last_compaction > MIN_COMPACTION_PERIOD_SECONDS)
+        {
+            info!(
+                log,
+                "Starting database compaction";
+                "old_finalized_epoch" => old_finalized_epoch,
+                "new_finalized_epoch" => new_finalized_epoch,
+            );
+            db.compact()?;
+
+            let finish_time = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap_or(start_time);
+            db.store_compaction_timestamp(finish_time)?;
+
+            info!(log, "Database compaction complete");
+        }
+        Ok(())
     }
 }
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index 45210950eb..3d867320c5 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -297,13 +297,26 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
         )
 
         /*
-         * Purge.
+         * Database purging and compaction.
         */
        .arg(
            Arg::with_name("purge-db")
                .long("purge-db")
                .help("If present, the chain database will be deleted. Use with caution.")
        )
+        .arg(
+            Arg::with_name("compact-db")
+                .long("compact-db")
+                .help("If present, apply compaction to the database on start-up. Use with caution. \
+                       It is generally not recommended unless auto-compaction is disabled.")
+        )
+        .arg(
+            Arg::with_name("auto-compact-db")
+                .long("auto-compact-db")
+                .help("Enable or disable automatic compaction of the database on finalization.")
+                .takes_value(true)
+                .default_value("true")
+        )
 
         /*
          * Misc.
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 4ae124a8e5..b0e00c4efb 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -218,6 +218,13 @@ pub fn get_config(
            .map_err(|_| "block-cache-size is not a valid integer".to_string())?;
    }
 
+    client_config.store.compact_on_init = cli_args.is_present("compact-db");
+    if let Some(compact_on_prune) = cli_args.value_of("auto-compact-db") {
+        client_config.store.compact_on_prune = compact_on_prune
+            .parse()
+            .map_err(|_| "auto-compact-db takes a boolean".to_string())?;
+    }
+
     /*
      * Zero-ports
      *
diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs
index 91cf5ec1cb..2514e4cf35 100644
--- a/beacon_node/store/src/config.rs
+++ b/beacon_node/store/src/config.rs
@@ -8,12 +8,24 @@ pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
 pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
 
 /// Database configuration parameters.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct StoreConfig {
     /// Number of slots to wait between storing restore points in the freezer database.
     pub slots_per_restore_point: u64,
     /// Maximum number of blocks to store in the in-memory block cache.
     pub block_cache_size: usize,
+    /// Whether to compact the database on initialization.
+    pub compact_on_init: bool,
+    /// Whether to compact the database during database pruning.
+    pub compact_on_prune: bool,
+}
+
+/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
+#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
+pub struct OnDiskStoreConfig {
+    pub slots_per_restore_point: u64,
+    // NOTE: redundant, see https://github.com/sigp/lighthouse/issues/1784
+    pub _block_cache_size: usize,
 }
 
 #[derive(Debug, Clone)]
@@ -27,12 +39,24 @@ impl Default for StoreConfig {
             // Safe default for tests, shouldn't ever be read by a CLI node.
             slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
             block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
+            compact_on_init: false,
+            compact_on_prune: true,
         }
     }
 }
 
 impl StoreConfig {
-    pub fn check_compatibility(&self, on_disk_config: &Self) -> Result<(), StoreConfigError> {
+    pub fn as_disk_config(&self) -> OnDiskStoreConfig {
+        OnDiskStoreConfig {
+            slots_per_restore_point: self.slots_per_restore_point,
+            _block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
+        }
+    }
+
+    pub fn check_compatibility(
+        &self,
+        on_disk_config: &OnDiskStoreConfig,
+    ) -> Result<(), StoreConfigError> {
         if self.slots_per_restore_point != on_disk_config.slots_per_restore_point {
             return Err(StoreConfigError::MismatchedSlotsPerRestorePoint {
                 config: self.slots_per_restore_point,
@@ -43,7 +67,7 @@ impl StoreConfig {
     }
 }
 
-impl StoreItem for StoreConfig {
+impl StoreItem for OnDiskStoreConfig {
     fn db_column() -> DBColumn {
         DBColumn::BeaconMeta
     }
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 2323c3aa26..5f2f70d3eb 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -1,7 +1,7 @@
 use crate::chunked_vector::{
     store_updated_vector, BlockRoots, HistoricalRoots, RandaoMixes, StateRoots,
 };
-use crate::config::StoreConfig;
+use crate::config::{OnDiskStoreConfig, StoreConfig};
 use crate::forwards_iter::HybridForwardsBlockRootsIterator;
 use crate::impls::beacon_state::{get_full_state, store_full_state};
 use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
@@ -9,8 +9,8 @@ use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    PruningCheckpoint, SchemaVersion, CONFIG_KEY, CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY,
-    SCHEMA_VERSION_KEY, SPLIT_KEY,
+    CompactionTimestamp, PruningCheckpoint, SchemaVersion, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY,
+    CURRENT_SCHEMA_VERSION, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
 };
 use crate::metrics;
 use crate::{
@@ -31,6 +31,7 @@ use std::convert::TryInto;
 use std::marker::PhantomData;
 use std::path::Path;
 use std::sync::Arc;
+use std::time::Duration;
 use types::*;
 
 /// Defines how blocks should be replayed on states.
@@ -187,9 +188,16 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             *db.split.write() = split;
         }
 
-        // Finally, run a garbage collection pass.
+        // Run a garbage collection pass.
         db.remove_garbage()?;
 
+        // If configured, run a foreground compaction pass.
+        if db.config.compact_on_init {
+            info!(db.log, "Running foreground compaction");
+            db.compact()?;
+            info!(db.log, "Foreground compaction complete");
+        }
+
         Ok(db)
     }
 
@@ -829,13 +837,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     }
 
     /// Load previously-stored config from disk.
-    fn load_config(&self) -> Result<Option<StoreConfig>, Error> {
+    fn load_config(&self) -> Result<Option<OnDiskStoreConfig>, Error> {
         self.hot_db.get(&CONFIG_KEY)
     }
 
     /// Write the config to disk.
     fn store_config(&self) -> Result<(), Error> {
-        self.hot_db.put(&CONFIG_KEY, &self.config)
+        self.hot_db.put(&CONFIG_KEY, &self.config.as_disk_config())
     }
 
     /// Load the split point from disk.
@@ -932,6 +940,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         Ok(())
     }
 
+    /// Return `true` if compaction on finalization/pruning is enabled.
+    pub fn compact_on_prune(&self) -> bool {
+        self.config.compact_on_prune
+    }
+
     /// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
     pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
         Ok(self
@@ -944,6 +957,22 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     pub fn pruning_checkpoint_store_op(&self, checkpoint: Checkpoint) -> KeyValueStoreOp {
         PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
     }
+
+    /// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch.
+    pub fn load_compaction_timestamp(&self) -> Result<Option<Duration>, Error> {
+        Ok(self
+            .hot_db
+            .get(&COMPACTION_TIMESTAMP_KEY)?
+            .map(|c: CompactionTimestamp| Duration::from_secs(c.0)))
+    }
+
+    /// Store the timestamp of the last compaction as a `Duration` since the UNIX epoch.
+    pub fn store_compaction_timestamp(&self, compaction_timestamp: Duration) -> Result<(), Error> {
+        self.hot_db.put(
+            &COMPACTION_TIMESTAMP_KEY,
+            &CompactionTimestamp(compaction_timestamp.as_secs()),
+        )
+    }
 }
 
 /// Advance the split point of the store, moving new finalized states to the freezer.
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index 797a2b633b..3664443a08 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -11,6 +11,7 @@ pub const SCHEMA_VERSION_KEY: Hash256 = Hash256::repeat_byte(0);
 pub const CONFIG_KEY: Hash256 = Hash256::repeat_byte(1);
 pub const SPLIT_KEY: Hash256 = Hash256::repeat_byte(2);
 pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
+pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
 pub struct SchemaVersion(pub u64);
@@ -58,3 +59,20 @@ impl StoreItem for PruningCheckpoint {
         })
     }
 }
+
+/// The last time the database was compacted.
+pub struct CompactionTimestamp(pub u64);
+
+impl StoreItem for CompactionTimestamp {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconMeta
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.0.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(CompactionTimestamp(u64::from_ssz_bytes(bytes)?))
+    }
+}
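
The compaction trigger added in `run_compaction` above reduces to a pure predicate over two inputs: the seconds elapsed since the last recorded compaction and the size of the finality jump. The standalone sketch below is not part of the diff; it restates that rule with plain `u64` epochs for illustration. The constants mirror those introduced in `migrate.rs`, while `should_compact` is a hypothetical helper that does not exist in the codebase.

// Sketch only: restates the scheduling rule used by `run_compaction`.
const MAX_COMPACTION_PERIOD_SECONDS: u64 = 604800; // 7 days
const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200; // 2 hours
const COMPACTION_FINALITY_DISTANCE: u64 = 1024; // epochs

// Hypothetical helper: compact if the weekly budget has lapsed, or if a large
// finality jump occurred and at least the 2-hour minimum has elapsed.
fn should_compact(
    seconds_since_last_compaction: u64,
    old_finalized_epoch: u64,
    new_finalized_epoch: u64,
) -> bool {
    let finality_gap = new_finalized_epoch.saturating_sub(old_finalized_epoch);
    seconds_since_last_compaction > MAX_COMPACTION_PERIOD_SECONDS
        || (finality_gap > COMPACTION_FINALITY_DISTANCE
            && seconds_since_last_compaction > MIN_COMPACTION_PERIOD_SECONDS)
}

fn main() {
    // Finality advanced by one epoch ten minutes after the last compaction: skip.
    assert!(!should_compact(600, 1000, 1001));
    // More than 7 days since the last compaction: compact regardless of the gap.
    assert!(should_compact(8 * 24 * 3600, 1000, 1001));
    // Large finality jump (e.g. after sync) and the 2-hour minimum has passed: compact.
    assert!(should_compact(3 * 3600, 0, 2048));
    println!("compaction scheduling sketch OK");
}

Put differently: steady finalization compacts at most once every two hours, and a node whose finality keeps advancing compacts roughly once a week even without a large finality jump.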