From aa253ddd8f5ba230a6c5d68d92c330b7a269265a Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Fri, 30 Sep 2022 11:59:56 +1000 Subject: [PATCH] Reduce finalization migration frequency --- beacon_node/beacon_chain/src/migrate.rs | 79 ++++++++++++++++++++++++- beacon_node/client/src/builder.rs | 1 + beacon_node/client/src/config.rs | 3 + beacon_node/src/cli.rs | 10 ++++ beacon_node/src/config.rs | 5 ++ lighthouse/tests/beacon_node.rs | 18 ++++++ 6 files changed, 113 insertions(+), 3 deletions(-) diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 64d3bd3c8b..84ef0acdee 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -3,6 +3,7 @@ use crate::errors::BeaconChainError; use crate::head_tracker::{HeadTracker, SszHeadTracker}; use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT}; use parking_lot::Mutex; +use serde::{Deserialize, Serialize}; use slog::{debug, error, info, trace, warn, Logger}; use std::collections::{HashMap, HashSet}; use std::mem; @@ -25,20 +26,43 @@ const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200; /// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`. const COMPACTION_FINALITY_DISTANCE: u64 = 1024; +/// Default number of epochs to wait between finalization migrations. +pub const DEFAULT_EPOCHS_PER_RUN: u64 = 4; + /// The background migrator runs a thread to perform pruning and migrate state from the hot /// to the cold database. pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> { db: Arc<HotColdDB<E, Hot, Cold>>, #[allow(clippy::type_complexity)] tx_thread: Option<Mutex<(mpsc::Sender<Notification>, thread::JoinHandle<()>)>>, + /// Record of when the last migration ran, for enforcing `epochs_per_run`. + prev_migration: Arc<Mutex<PrevMigration>>, /// Genesis block root, for persisting the `PersistedBeaconChain`. 
genesis_block_root: Hash256, log: Logger, } -#[derive(Debug, Default, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct MigratorConfig { pub blocking: bool, + /// Run migrations at most once per `epochs_per_run`. + /// + /// If set to 0, then run every finalization. + pub epochs_per_run: u64, +} + +impl Default for MigratorConfig { + fn default() -> Self { + Self { + blocking: false, + epochs_per_run: DEFAULT_EPOCHS_PER_RUN, + } + } +} + +pub struct PrevMigration { + epoch: Option<Epoch>, + epochs_per_run: u64, } impl MigratorConfig { @@ -95,6 +119,18 @@ pub struct FinalizationNotification { genesis_block_root: Hash256, } +impl Notification { + pub fn epoch(&self) -> Option<Epoch> { + match self { + Notification::Finalization(FinalizationNotification { + finalized_checkpoint, + .. + }) => Some(finalized_checkpoint.epoch), + Notification::Reconstruction => None, + } + } +} + impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> { /// Create a new `BackgroundMigrator` and spawn its thread if necessary. 
pub fn new( @@ -103,14 +139,23 @@ impl, Cold: ItemStore> BackgroundMigrator Self { + let prev_migration = Arc::new(Mutex::new(PrevMigration { + epoch: None, + epochs_per_run: config.epochs_per_run, + })); let tx_thread = if config.blocking { None } else { - Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone()))) + Some(Mutex::new(Self::spawn_thread( + db.clone(), + prev_migration.clone(), + log.clone(), + ))) }; Self { db, tx_thread, + prev_migration, genesis_block_root, log, } @@ -173,7 +218,11 @@ impl, Cold: ItemStore> BackgroundMigrator, Cold: ItemStore> BackgroundMigrator>, + prev_migration: Arc>, log: Logger, ) -> (mpsc::Sender, thread::JoinHandle<()>) { let (tx, rx) = mpsc::channel(); @@ -328,6 +378,29 @@ impl, Cold: ItemStore> BackgroundMigrator prev_epoch, + "new_finalized_epoch" => epoch, + "epochs_per_run" => prev_migration.epochs_per_run, + ); + continue; + } + } + + // We intend to run at this epoch, update the in-memory record of the last epoch + // at which we ran. This value isn't tracked on disk so we will always migrate + // on the first finalization after startup. 
+ prev_migration.epoch = Some(epoch); + } + match notif { Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log), Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log), diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 752ba3b7bc..89372aa91b 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -170,6 +170,7 @@ where .task_executor(context.executor.clone()) .custom_spec(spec.clone()) .chain_config(chain_config) + .store_migrator_config(config.store_migrator.clone()) .graffiti(graffiti) .event_handler(event_handler) .execution_layer(execution_layer) diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index a5d5b37c7a..892d55dbea 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -1,3 +1,4 @@ +use beacon_chain::migrate::MigratorConfig; use directory::DEFAULT_ROOT_DIR; use network::NetworkConfig; use sensitive_url::SensitiveUrl; @@ -64,6 +65,7 @@ pub struct Config { /// via the CLI at runtime, instead of from a configuration file saved to disk. 
pub genesis: ClientGenesis, pub store: store::StoreConfig, + pub store_migrator: MigratorConfig, pub network: network::NetworkConfig, pub chain: beacon_chain::ChainConfig, pub eth1: eth1::Config, @@ -83,6 +85,7 @@ impl Default for Config { log_file: PathBuf::from(""), genesis: <_>::default(), store: <_>::default(), + store_migrator: <_>::default(), network: NetworkConfig::default(), chain: <_>::default(), dummy_eth1_backend: false, diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 5399da258a..187ff74729 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -539,6 +539,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("true") ) + .arg( + Arg::with_name("db-migration-period") + .long("db-migration-period") + .value_name("EPOCHS") + .help("Specifies the number of epochs to wait between applying each finalization \ + migration to the database. Applying migrations less frequently can lead to \ + less total disk writes.") + .default_value("4") + .takes_value(true) + ) /* * Misc. diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4cd34aec7a..6e63aa22f6 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -366,6 +366,11 @@ pub fn get_config( client_config.store.prune_payloads = prune_payloads; } + if let Some(epochs_per_migration) = clap_utils::parse_optional(cli_args, "db-migration-period")? 
+ { + client_config.store_migrator.epochs_per_run = epochs_per_migration; + } + /* * Zero-ports * diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 661bbcdb0c..3b143a704a 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1252,6 +1252,24 @@ fn no_reconstruct_historic_states_flag() { .run_with_zero_port() .with_config(|config| assert!(!config.chain.reconstruct_historic_states)); } +#[test] +fn db_migration_period_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.store_migrator.epochs_per_run, + beacon_node::beacon_chain::migrate::DEFAULT_EPOCHS_PER_RUN + ) + }); +} +#[test] +fn db_migration_period_override() { + CommandLineTest::new() + .flag("db-migration-period", Some("128")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.store_migrator.epochs_per_run, 128)); +} // Tests for Slasher flags. #[test]