diff --git a/Cargo.toml b/Cargo.toml index 3fb5ae8ad2..46893a7e78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ members = [ "beacon_node/store", "beacon_node/timer", - "boot_node", + "boot_node", "common/account_utils", "common/clap_utils", @@ -45,6 +45,8 @@ members = [ "common/fallback", "common/monitoring_api", + "database_manager", + "consensus/cached_tree_hash", "consensus/int_to_bytes", "consensus/fork_choice", diff --git a/beacon_node/client/src/config.rs b/beacon_node/client/src/config.rs index 9768962260..fc29198405 100644 --- a/beacon_node/client/src/config.rs +++ b/beacon_node/client/src/config.rs @@ -171,7 +171,7 @@ impl Config { /// For more information, see: /// /// https://github.com/sigp/lighthouse/pull/2843 - fn get_data_dir(&self) -> PathBuf { + pub fn get_data_dir(&self) -> PathBuf { let existing_legacy_dir = self.get_existing_legacy_data_dir(); if let Some(legacy_dir) = existing_legacy_dir { diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 12055f7b4d..52a1919114 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -255,16 +255,7 @@ pub fn get_config( client_config.freezer_db_path = Some(PathBuf::from(freezer_dir)); } - if let Some(slots_per_restore_point) = cli_args.value_of("slots-per-restore-point") { - client_config.store.slots_per_restore_point = slots_per_restore_point - .parse() - .map_err(|_| "slots-per-restore-point is not a valid integer".to_string())?; - } else { - client_config.store.slots_per_restore_point = std::cmp::min( - E::slots_per_historical_root() as u64, - store::config::DEFAULT_SLOTS_PER_RESTORE_POINT, - ); - } + client_config.store.slots_per_restore_point = get_slots_per_restore_point::(cli_args)?; if let Some(block_cache_size) = clap_utils::parse_optional(cli_args, "block-cache-size")? { client_config.store.block_cache_size = block_cache_size; @@ -791,3 +782,17 @@ pub fn get_data_dir(cli_args: &ArgMatches) -> PathBuf { }) .unwrap_or_else(|| PathBuf::from(".")) } + +/// Get the `slots_per_restore_point` value to use for the database. +pub fn get_slots_per_restore_point(cli_args: &ArgMatches) -> Result { + if let Some(slots_per_restore_point) = + clap_utils::parse_optional(cli_args, "slots-per-restore-point")? + { + Ok(slots_per_restore_point) + } else { + Ok(std::cmp::min( + E::slots_per_historical_root() as u64, + store::config::DEFAULT_SLOTS_PER_RESTORE_POINT, + )) + } +} diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 773a0d2eb1..690271022a 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -13,7 +13,7 @@ use beacon_chain::{ use clap::ArgMatches; pub use cli::cli_app; pub use client::{Client, ClientBuilder, ClientConfig, ClientGenesis}; -pub use config::{get_config, get_data_dir, set_network_config}; +pub use config::{get_config, get_data_dir, get_slots_per_restore_point, set_network_config}; use environment::RuntimeContext; pub use eth2_config::Eth2Config; use slasher::Slasher; diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index adb42fc2a4..a9c4365747 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -29,6 +29,7 @@ directory = { path = "../../common/directory" } tree_hash = "0.4.0" take-until = "0.1.0" zstd = "0.10.0" +strum = { version = "0.24", features = ["derive"] } [features] milhouse = ["state_processing/milhouse"] diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 960a4624d4..69dfd9b105 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -41,6 +41,7 @@ pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metadata::AnchorInfo; pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; +use strum::{EnumString, IntoStaticStr}; pub use types::*; pub trait KeyValueStore: Sync + Send + Sized + 'static { @@ -157,60 +158,52 @@ pub enum StoreOp<'a, E: EthSpec> { } /// A unique column identifier. -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, IntoStaticStr, EnumString)] pub enum DBColumn { /// For data related to the database itself. + #[strum(serialize = "bma")] BeaconMeta, + #[strum(serialize = "blk")] BeaconBlock, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). + #[strum(serialize = "ste")] BeaconState, /// For compact `BeaconStateDiff`s. + #[strum(serialize = "bsd")] BeaconStateDiff, /// For the mapping from state roots to their slots or summaries. + #[strum(serialize = "bss")] BeaconStateSummary, /// For the list of temporary states stored during block import, /// and then made non-temporary by the deletion of their state root from this column. + #[strum(serialize = "bst")] BeaconStateTemporary, /// For persisting in-memory state to the database. + #[strum(serialize = "bch")] BeaconChain, + #[strum(serialize = "opo")] OpPool, + #[strum(serialize = "etc")] Eth1Cache, + #[strum(serialize = "frk")] ForkChoice, + #[strum(serialize = "pkc")] PubkeyCache, /// For the table mapping restore point numbers to state roots. + #[strum(serialize = "brp")] BeaconRestorePoint, + #[strum(serialize = "bbr")] BeaconBlockRoots, + #[strum(serialize = "bsr")] BeaconStateRoots, + #[strum(serialize = "bhr")] BeaconHistoricalRoots, + #[strum(serialize = "brm")] BeaconRandaoMixes, + #[strum(serialize = "dht")] DhtEnrs, } -impl Into<&'static str> for DBColumn { - /// Returns a `&str` prefix to be added to keys before they hit the key-value database. - fn into(self) -> &'static str { - match self { - DBColumn::BeaconMeta => "bma", - DBColumn::BeaconBlock => "blk", - DBColumn::BeaconState => "ste", - DBColumn::BeaconStateDiff => "bsd", - DBColumn::BeaconStateSummary => "bss", - DBColumn::BeaconStateTemporary => "bst", - DBColumn::BeaconChain => "bch", - DBColumn::OpPool => "opo", - DBColumn::Eth1Cache => "etc", - DBColumn::ForkChoice => "frk", - DBColumn::PubkeyCache => "pkc", - DBColumn::BeaconRestorePoint => "brp", - DBColumn::BeaconBlockRoots => "bbr", - DBColumn::BeaconStateRoots => "bsr", - DBColumn::BeaconHistoricalRoots => "bhr", - DBColumn::BeaconRandaoMixes => "brm", - DBColumn::DhtEnrs => "dht", - } - } -} - impl DBColumn { pub fn as_str(self) -> &'static str { self.into() diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml new file mode 100644 index 0000000000..436f9b1e19 --- /dev/null +++ b/database_manager/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "database_manager" +version = "0.1.0" +edition = "2021" + +[dependencies] +beacon_chain = { path = "../beacon_node/beacon_chain" } +beacon_node = { path = "../beacon_node" } +clap = "2.33.3" +clap_utils = { path = "../common/clap_utils" } +environment = { path = "../lighthouse/environment" } +logging = { path = "../common/logging" } +sloggers = "2.0.2" +store = { path = "../beacon_node/store" } +tempfile = "3.1.0" +types = { path = "../consensus/types" } +slog = "2.5.2" +strum = { version = "0.24", features = ["derive"] } diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs new file mode 100644 index 0000000000..4e87a8bf55 --- /dev/null +++ b/database_manager/src/lib.rs @@ -0,0 +1,269 @@ +use beacon_chain::{ + builder::Witness, eth1_chain::CachingEth1Backend, schema_change::migrate_schema, + slot_clock::SystemTimeSlotClock, +}; +use beacon_node::{get_data_dir, get_slots_per_restore_point, ClientConfig}; +use clap::{App, Arg, ArgMatches}; +use environment::{Environment, RuntimeContext}; +use slog::{info, Logger}; +use store::{ + errors::Error, + metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, + DBColumn, HotColdDB, KeyValueStore, LevelDB, +}; +use strum::{EnumString, EnumVariantNames, VariantNames}; +use types::EthSpec; + +pub const CMD: &str = "database_manager"; + +pub fn version_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("version") + .visible_aliases(&["v"]) + .setting(clap::AppSettings::ColoredHelp) + .about("Display database schema version") +} + +pub fn migrate_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("migrate") + .setting(clap::AppSettings::ColoredHelp) + .about("Migrate the database to a specific schema version") + .arg( + Arg::with_name("to") + .long("to") + .value_name("VERSION") + .help("Schema version to migrate to") + .takes_value(true) + .required(true), + ) +} + +pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { + App::new("inspect") + .setting(clap::AppSettings::ColoredHelp) + .about("Inspect raw database values") + .arg( + Arg::with_name("column") + .long("column") + .value_name("TAG") + .help("3-byte column ID (see `DBColumn`)") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("output") + .long("output") + .value_name("TARGET") + .help("Select the type of output to show") + .default_value("sizes") + .possible_values(InspectTarget::VARIANTS), + ) +} + +pub fn cli_app<'a, 'b>() -> App<'a, 'b> { + App::new(CMD) + .visible_aliases(&["db"]) + .setting(clap::AppSettings::ColoredHelp) + .about("") + .arg( + Arg::with_name("slots-per-restore-point") + .long("slots-per-restore-point") + .value_name("SLOT_COUNT") + .help( + "Specifies how often a freezer DB restore point should be stored. \ + Cannot be changed after initialization. \ + [default: 2048 (mainnet) or 64 (minimal)]", + ) + .takes_value(true), + ) + .arg( + Arg::with_name("freezer-dir") + .long("freezer-dir") + .value_name("DIR") + .help("Data directory for the freezer database.") + .takes_value(true), + ) + .subcommand(migrate_cli_app()) + .subcommand(version_cli_app()) + .subcommand(inspect_cli_app()) +} + +fn parse_client_config( + cli_args: &ArgMatches, + _env: &Environment, +) -> Result { + let mut client_config = ClientConfig::default(); + + client_config.data_dir = get_data_dir(cli_args); + + if let Some(freezer_dir) = clap_utils::parse_optional(cli_args, "freezer-dir")? { + client_config.freezer_db_path = Some(freezer_dir); + } + + client_config.store.slots_per_restore_point = get_slots_per_restore_point::(cli_args)?; + + Ok(client_config) +} + +pub fn display_db_version( + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = runtime_context.eth2_config.spec.clone(); + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let mut version = CURRENT_SCHEMA_VERSION; + HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, from, _| { + version = from; + Ok(()) + }, + client_config.store, + spec, + log, + )?; + + println!("Database version: {}", version.as_u64()); + + Ok(()) +} + +#[derive(Debug, EnumString, EnumVariantNames)] +pub enum InspectTarget { + #[strum(serialize = "sizes")] + ValueSizes, + #[strum(serialize = "total")] + ValueTotal, +} + +pub struct InspectConfig { + column: DBColumn, + target: InspectTarget, +} + +fn parse_inspect_config(cli_args: &ArgMatches) -> Result { + let column = clap_utils::parse_required(cli_args, "column")?; + let target = clap_utils::parse_required(cli_args, "output")?; + + Ok(InspectConfig { column, target }) +} + +pub fn inspect_db( + inspect_config: InspectConfig, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = runtime_context.eth2_config.spec.clone(); + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, _, _| Ok(()), + client_config.store, + spec, + log, + )?; + + let mut total = 0; + + for res in db.hot_db.iter_column(inspect_config.column) { + let (key, value) = res?; + + match inspect_config.target { + InspectTarget::ValueSizes => { + println!("{:?}: {} bytes", key, value.len()); + total += value.len(); + } + InspectTarget::ValueTotal => { + total += value.len(); + } + } + } + + match inspect_config.target { + InspectTarget::ValueSizes | InspectTarget::ValueTotal => { + println!("Total: {} bytes", total); + } + } + + Ok(()) +} + +pub struct MigrateConfig { + to: SchemaVersion, +} + +fn parse_migrate_config(cli_args: &ArgMatches) -> Result { + let to = SchemaVersion(clap_utils::parse_required(cli_args, "to")?); + + Ok(MigrateConfig { to }) +} + +pub fn migrate_db( + migrate_config: MigrateConfig, + client_config: ClientConfig, + runtime_context: &RuntimeContext, + log: Logger, +) -> Result<(), Error> { + let spec = runtime_context.eth2_config.spec.clone(); + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + + let mut from = CURRENT_SCHEMA_VERSION; + let to = migrate_config.to; + let db = HotColdDB::, LevelDB>::open( + &hot_path, + &cold_path, + |_, db_initial_version, _| { + from = db_initial_version; + Ok(()) + }, + client_config.store.clone(), + spec, + log.clone(), + )?; + + info!( + log, + "Migrating database schema"; + "from" => from.as_u64(), + "to" => to.as_u64(), + ); + + migrate_schema::, _, _, _>>( + db, + &client_config.get_data_dir(), + from, + to, + log, + ) +} + +/// Run the database manager, returning an error string if the operation did not succeed. +pub fn run(cli_args: &ArgMatches<'_>, mut env: Environment) -> Result<(), String> { + let client_config = parse_client_config(cli_args, &env)?; + let context = env.core_context(); + let log = context.log().clone(); + + match cli_args.subcommand() { + ("version", Some(_)) => display_db_version(client_config, &context, log), + ("migrate", Some(cli_args)) => { + let migrate_config = parse_migrate_config(cli_args)?; + migrate_db(migrate_config, client_config, &context, log) + } + ("inspect", Some(cli_args)) => { + let inspect_config = parse_inspect_config(cli_args)?; + inspect_db(inspect_config, client_config, &context, log) + } + _ => { + return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into()) + } + } + .map_err(|e| format!("Fatal error: {:?}", e)) +} diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index f30c28f301..fd11823c0e 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -49,6 +49,7 @@ malloc_utils = { path = "../common/malloc_utils" } directory = { path = "../common/directory" } unused_port = { path = "../common/unused_port" } store = { path = "../beacon_node/store" } +database_manager = { path = "../database_manager" } [dev-dependencies] tempfile = "3.1.0" diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 2f04b95ca4..d1c6774b20 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -255,6 +255,7 @@ fn main() { .subcommand(boot_node::cli_app()) .subcommand(validator_client::cli_app()) .subcommand(account_manager::cli_app()) + .subcommand(database_manager::cli_app()) .get_matches(); // Configure the allocator early in the process, before it has the chance to use the default values for @@ -465,7 +466,16 @@ fn run( // Exit as soon as account manager returns control. return Ok(()); - }; + } + + if let Some(sub_matches) = matches.subcommand_matches("database_manager") { + eprintln!("Running database manager for {} network", network_name); + // Pass the entire `environment` to the database manager so it can run blocking operations. + database_manager::run(sub_matches, environment)?; + + // Exit as soon as database manager returns control. + return Ok(()); + } info!(log, "Lighthouse started"; "version" => VERSION); info!(