Reduce finalization migration frequency

Michael Sproul
2022-09-30 11:59:56 +10:00
parent a6318732cf
commit aa253ddd8f
6 changed files with 113 additions and 3 deletions
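In brief: the background migrator gains an epochs_per_run setting (default 4, set via the new --db-migration-period flag) and skips finalization migrations that arrive sooner than that many epochs after the previous run. A minimal sketch of the new knob, assuming only the types shown in the diff below; the helper function and its name are illustrative, not part of the commit:

    use beacon_chain::migrate::{MigratorConfig, DEFAULT_EPOCHS_PER_RUN};

    /// Hypothetical helper: build a migrator config that runs at most once every `n` epochs.
    fn migrator_config_every(n: u64) -> MigratorConfig {
        MigratorConfig {
            epochs_per_run: n,
            // Any other fields (e.g. `blocking`) keep their defaults.
            ..MigratorConfig::default()
        }
    }

    fn main() {
        // The compiled-in default matches the CLI default of `--db-migration-period`.
        assert_eq!(MigratorConfig::default().epochs_per_run, DEFAULT_EPOCHS_PER_RUN);
        assert_eq!(migrator_config_every(8).epochs_per_run, 8);
    }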

View File

@@ -3,6 +3,7 @@ use crate::errors::BeaconChainError;
use crate::head_tracker::{HeadTracker, SszHeadTracker};
use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_BLOCK_ROOT};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use slog::{debug, error, info, trace, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::mem;
@@ -25,20 +26,43 @@ const MIN_COMPACTION_PERIOD_SECONDS: u64 = 7200;
/// Compact after a large finality gap, if we respect `MIN_COMPACTION_PERIOD_SECONDS`.
const COMPACTION_FINALITY_DISTANCE: u64 = 1024;
/// Default number of epochs to wait between finalization migrations.
pub const DEFAULT_EPOCHS_PER_RUN: u64 = 4;
/// The background migrator runs a thread to perform pruning and migrate state from the hot
/// to the cold database.
pub struct BackgroundMigrator<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
db: Arc<HotColdDB<E, Hot, Cold>>,
#[allow(clippy::type_complexity)]
tx_thread: Option<Mutex<(mpsc::Sender<Notification>, thread::JoinHandle<()>)>>,
/// Record of when the last migration ran, for enforcing `epochs_per_run`.
prev_migration: Arc<Mutex<PrevMigration>>,
/// Genesis block root, for persisting the `PersistedBeaconChain`.
genesis_block_root: Hash256,
log: Logger,
}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MigratorConfig {
pub blocking: bool,
/// Run migrations at most once per `epochs_per_run`.
///
/// If set to 0, then run every finalization.
pub epochs_per_run: u64,
}
impl Default for MigratorConfig {
fn default() -> Self {
Self {
blocking: false,
epochs_per_run: DEFAULT_EPOCHS_PER_RUN,
}
}
}
pub struct PrevMigration {
epoch: Option<Epoch>,
epochs_per_run: u64,
}
impl MigratorConfig {
@@ -95,6 +119,18 @@ pub struct FinalizationNotification {
genesis_block_root: Hash256,
}
impl Notification {
pub fn epoch(&self) -> Option<Epoch> {
match self {
Notification::Finalization(FinalizationNotification {
finalized_checkpoint,
..
}) => Some(finalized_checkpoint.epoch),
Notification::Reconstruction => None,
}
}
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold> {
/// Create a new `BackgroundMigrator` and spawn its thread if necessary.
pub fn new(
@@ -103,14 +139,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
genesis_block_root: Hash256,
log: Logger,
) -> Self {
let prev_migration = Arc::new(Mutex::new(PrevMigration {
epoch: None,
epochs_per_run: config.epochs_per_run,
}));
let tx_thread = if config.blocking {
None
} else {
Some(Mutex::new(Self::spawn_thread(db.clone(), log.clone())))
Some(Mutex::new(Self::spawn_thread(
db.clone(),
prev_migration.clone(),
log.clone(),
)))
};
Self {
db,
tx_thread,
prev_migration,
genesis_block_root,
log,
}
@@ -173,7 +218,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Restart the background thread if it has crashed.
if let Err(tx_err) = tx.send(notif) {
let (new_tx, new_thread) = Self::spawn_thread(self.db.clone(), self.log.clone());
let (new_tx, new_thread) = Self::spawn_thread(
self.db.clone(),
self.prev_migration.clone(),
self.log.clone(),
);
*tx = new_tx;
let old_thread = mem::replace(thread, new_thread);
@@ -303,6 +352,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
/// Return a channel handle for sending requests to the thread.
fn spawn_thread(
db: Arc<HotColdDB<E, Hot, Cold>>,
prev_migration: Arc<Mutex<PrevMigration>>,
log: Logger,
) -> (mpsc::Sender<Notification>, thread::JoinHandle<()>) {
let (tx, rx) = mpsc::channel();
@@ -328,6 +378,29 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
});
// Do not run too frequently.
if let Some(epoch) = notif.epoch() {
let mut prev_migration = prev_migration.lock();
if let Some(prev_epoch) = prev_migration.epoch {
if epoch < prev_epoch + prev_migration.epochs_per_run {
debug!(
log,
"Database consolidation deferred";
"last_finalized_epoch" => prev_epoch,
"new_finalized_epoch" => epoch,
"epochs_per_run" => prev_migration.epochs_per_run,
);
continue;
}
}
// We intend to run at this epoch, update the in-memory record of the last epoch
// at which we ran. This value isn't tracked on disk so we will always migrate
// on the first finalization after startup.
prev_migration.epoch = Some(epoch);
}
match notif {
Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
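The throttling added above reduces to a single comparison against the previously recorded epoch: the first finalization after startup always runs (prev_migration.epoch starts as None), and a period of 0 runs on every finalization. A standalone restatement of that decision, using plain u64 epoch numbers for brevity; the function is illustrative and not part of the commit:

    /// Illustrative restatement of the gating logic in the migrator thread above.
    fn should_run_migration(prev_epoch: Option<u64>, new_epoch: u64, epochs_per_run: u64) -> bool {
        match prev_epoch {
            // No record of a previous run (e.g. just after startup): always migrate.
            None => true,
            // Otherwise require finality to have advanced by at least `epochs_per_run`.
            // With `epochs_per_run == 0` this holds for every new finalization.
            Some(prev) => new_epoch >= prev + epochs_per_run,
        }
    }

    fn main() {
        assert!(should_run_migration(None, 100, 4));       // first run after startup
        assert!(!should_run_migration(Some(100), 102, 4)); // deferred: only 2 epochs later
        assert!(should_run_migration(Some(100), 104, 4));  // 4 epochs later: run
        assert!(should_run_migration(Some(100), 101, 0));  // period of 0: every finalization
    }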

View File

@@ -170,6 +170,7 @@ where
.task_executor(context.executor.clone())
.custom_spec(spec.clone())
.chain_config(chain_config)
.store_migrator_config(config.store_migrator.clone())
.graffiti(graffiti)
.event_handler(event_handler)
.execution_layer(execution_layer)

View File

@@ -1,3 +1,4 @@
use beacon_chain::migrate::MigratorConfig;
use directory::DEFAULT_ROOT_DIR;
use network::NetworkConfig;
use sensitive_url::SensitiveUrl;
@@ -64,6 +65,7 @@ pub struct Config {
/// via the CLI at runtime, instead of from a configuration file saved to disk.
pub genesis: ClientGenesis,
pub store: store::StoreConfig,
pub store_migrator: MigratorConfig,
pub network: network::NetworkConfig,
pub chain: beacon_chain::ChainConfig,
pub eth1: eth1::Config,
@@ -83,6 +85,7 @@ impl Default for Config {
log_file: PathBuf::from(""),
genesis: <_>::default(),
store: <_>::default(),
store_migrator: <_>::default(),
network: NetworkConfig::default(),
chain: <_>::default(),
dummy_eth1_backend: false,

View File

@@ -539,6 +539,16 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.default_value("true")
)
.arg(
Arg::with_name("db-migration-period")
.long("db-migration-period")
.value_name("EPOCHS")
.help("Specifies the number of epochs to wait between applying each finalization \
migration to the database. Applying migrations less frequently can lead to \
fewer total disk writes.")
.default_value("4")
.takes_value(true)
)
/*
* Misc.
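Note: the flag's default of "4" mirrors DEFAULT_EPOCHS_PER_RUN, so omitting --db-migration-period and passing --db-migration-period 4 behave identically. A larger value, e.g. lighthouse beacon_node --db-migration-period 8 (an illustrative invocation), trades less prompt pruning for fewer freezer-database writes; the value is wired into the client config in the next file.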

View File

@@ -366,6 +366,11 @@ pub fn get_config<E: EthSpec>(
client_config.store.prune_payloads = prune_payloads;
}
if let Some(epochs_per_migration) = clap_utils::parse_optional(cli_args, "db-migration-period")?
{
client_config.store_migrator.epochs_per_run = epochs_per_migration;
}
/*
* Zero-ports
*

View File

@@ -1252,6 +1252,24 @@ fn no_reconstruct_historic_states_flag() {
.run_with_zero_port()
.with_config(|config| assert!(!config.chain.reconstruct_historic_states));
}
#[test]
fn db_migration_period_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| {
assert_eq!(
config.store_migrator.epochs_per_run,
beacon_node::beacon_chain::migrate::DEFAULT_EPOCHS_PER_RUN
)
});
}
#[test]
fn db_migration_period_override() {
CommandLineTest::new()
.flag("db-migration-period", Some("128"))
.run_with_zero_port()
.with_config(|config| assert_eq!(config.store_migrator.epochs_per_run, 128));
}
// Tests for Slasher flags.
#[test]