diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index f26eadc398..3be8097ddf 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -102,6 +102,16 @@ jobs: run: rustup update stable - name: Run operation_pool tests for all known forks run: make test-op-pool + slasher-tests: + name: slasher-tests + runs-on: ubuntu-latest + needs: cargo-fmt + steps: + - uses: actions/checkout@v1 + - name: Get latest version of stable Rust + run: rustup update stable + - name: Run slasher tests for all supported backends + run: make test-slasher debug-tests-ubuntu: name: debug-tests-ubuntu runs-on: ubuntu-22.04 diff --git a/Cargo.lock b/Cargo.lock index a6b5f56374..a406df149f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -462,6 +462,7 @@ dependencies = [ "slasher", "slog", "store", + "strum", "task_executor", "types", "unused_port", @@ -3204,8 +3205,7 @@ checksum = "33a33a362ce288760ec6a508b94caaec573ae7d3bbbd91b87aa0bad4456839db" [[package]] name = "libmdbx" version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "002d7890ec770d222903165b6ba279b0fa3dba8e82610820833184066b006ce0" +source = "git+https://github.com/sigp/libmdbx-rs?tag=v0.1.4#096da80a83d14343f8df833006483f48075cd135" dependencies = [ "bitflags", "byteorder", @@ -3629,6 +3629,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "slasher", "slashing_protection", "slog", "sloggers", @@ -3712,6 +3713,27 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" +[[package]] +name = "lmdb-rkv" +version = "0.14.0" +source = "git+https://github.com/sigp/lmdb-rs?rev=f33845c6469b94265319aac0ed5085597862c27e#f33845c6469b94265319aac0ed5085597862c27e" +dependencies = [ + "bitflags", + "byteorder", + "libc", + "lmdb-rkv-sys", +] + +[[package]] +name = "lmdb-rkv-sys" +version = "0.11.2" +source = "git+https://github.com/sigp/lmdb-rs?rev=f33845c6469b94265319aac0ed5085597862c27e#f33845c6469b94265319aac0ed5085597862c27e" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "lock_api" version = "0.4.7" @@ -3830,8 +3852,7 @@ checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" [[package]] name = "mdbx-sys" version = "0.11.6-4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dde320ea35df4678486346065386943ed6c5920f2ab445dff8dd5d9c8cd04ad" +source = "git+https://github.com/sigp/libmdbx-rs?tag=v0.1.4#096da80a83d14343f8df833006483f48075cd135" dependencies = [ "bindgen", "cc", @@ -5965,6 +5986,8 @@ dependencies = [ "lazy_static", "libmdbx", "lighthouse_metrics", + "lmdb-rkv", + "lmdb-rkv-sys", "logging", "lru", "maplit", @@ -5976,6 +5999,7 @@ dependencies = [ "serde_derive", "slog", "sloggers", + "strum", "tempfile", "tree_hash", "tree_hash_derive", diff --git a/Makefile b/Makefile index df90ba3e76..6b5c6b3e5d 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,9 @@ BUILD_PATH_AARCH64 = "target/$(AARCH64_TAG)/release" PINNED_NIGHTLY ?= nightly CLIPPY_PINNED_NIGHTLY=nightly-2022-05-19 +# List of features to use when cross-compiling. Can be overridden via the environment. +CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx + # List of all hard forks. This list is used to set env variables for several tests so that # they run for different forks. FORKS=phase0 altair merge @@ -42,13 +45,13 @@ install-lcli: # optimized CPU functions that may not be available on some systems. This # results in a more portable binary with ~20% slower BLS verification. build-x86_64: - cross build --release --bin lighthouse --target x86_64-unknown-linux-gnu --features modern,gnosis + cross build --release --bin lighthouse --target x86_64-unknown-linux-gnu --features "modern,$(CROSS_FEATURES)" build-x86_64-portable: - cross build --release --bin lighthouse --target x86_64-unknown-linux-gnu --features portable,gnosis + cross build --release --bin lighthouse --target x86_64-unknown-linux-gnu --features "portable,$(CROSS_FEATURES)" build-aarch64: - cross build --release --bin lighthouse --target aarch64-unknown-linux-gnu --features gnosis + cross build --release --bin lighthouse --target aarch64-unknown-linux-gnu --features "$(CROSS_FEATURES)" build-aarch64-portable: - cross build --release --bin lighthouse --target aarch64-unknown-linux-gnu --features portable,gnosis + cross build --release --bin lighthouse --target aarch64-unknown-linux-gnu --features "portable,$(CROSS_FEATURES)" # Create a `.tar.gz` containing a binary for a specific target. define tarball_release_binary @@ -77,7 +80,7 @@ build-release-tarballs: # Runs the full workspace tests in **release**, without downloading any additional # test vectors. test-release: - cargo test --workspace --release --exclude ef_tests --exclude beacon_chain + cargo test --workspace --release --exclude ef_tests --exclude beacon_chain --exclude slasher # Runs the full workspace tests in **debug**, without downloading any additional test # vectors. @@ -118,6 +121,11 @@ test-op-pool-%: --features 'beacon_chain/fork_from_env'\ -p operation_pool +# Run the tests in the `slasher` crate for all supported database backends. +test-slasher: + cargo test --release -p slasher --features mdbx + cargo test --release -p slasher --no-default-features --features lmdb + # Runs only the tests/state_transition_vectors tests. run-state-transition-tests: make -C $(STATE_TRANSITION_VECTORS) test diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 9c6385e8ed..417acf3d9e 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -41,3 +41,4 @@ monitoring_api = { path = "../common/monitoring_api" } sensitive_url = { path = "../common/sensitive_url" } http_api = { path = "http_api" } unused_port = { path = "../common/unused_port" } +strum = "0.24.1" diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index edf79ad34f..7a91530252 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -1,4 +1,5 @@ use clap::{App, Arg}; +use strum::VariantNames; pub fn cli_app<'a, 'b>() -> App<'a, 'b> { App::new("beacon_node") @@ -628,6 +629,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { [disabled by default].") .requires("slasher") ) + .arg( + Arg::with_name("slasher-backend") + .long("slasher-backend") + .help("Set the database backend to be used by the slasher.") + .takes_value(true) + .possible_values(slasher::DatabaseBackend::VARIANTS) + .requires("slasher") + ) .arg( Arg::with_name("wss-checkpoint") .long("wss-checkpoint") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 35d566d76e..e885275b04 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -591,6 +591,10 @@ pub fn get_config( slasher_config.broadcast = cli_args.is_present("slasher-broadcast"); + if let Some(backend) = clap_utils::parse_optional(cli_args, "slasher-backend")? { + slasher_config.backend = backend; + } + client_config.slasher = Some(slasher_config); } diff --git a/book/src/cross-compiling.md b/book/src/cross-compiling.md index 9b458078e2..8ccf23da9d 100644 --- a/book/src/cross-compiling.md +++ b/book/src/cross-compiling.md @@ -38,3 +38,9 @@ make build-aarch64 The `lighthouse` binary will be compiled inside a Docker container and placed in `lighthouse/target/aarch64-unknown-linux-gnu/release`. + +## Feature Flags + +When using the makefile the set of features used for building can be controlled with +the environment variable `CROSS_FEATURES`. See [Feature + Flags](./installation-source.md#feature-flags) for available features. diff --git a/book/src/installation-source.md b/book/src/installation-source.md index fc1ac4c092..1f8477260f 100644 --- a/book/src/installation-source.md +++ b/book/src/installation-source.md @@ -107,6 +107,23 @@ git checkout ${VERSION} make ``` +## Feature Flags + +You can customise the features that Lighthouse is built with using the `FEATURES` environment +variable. E.g. + +``` +env FEATURES="gnosis,slasher-lmdb" make +``` + +Commonly used features include: + +* `gnosis`: support for the Gnosis Beacon Chain. +* `portable`: support for legacy hardware. +* `modern`: support for exclusively modern hardware. +* `slasher-mdbx`: support for the MDBX slasher backend (enabled by default). +* `slasher-lmdb`: support for the LMDB slasher backend. + ## Troubleshooting ### Command is not found diff --git a/book/src/slasher.md b/book/src/slasher.md index 889f9c6cbc..61dc4b327f 100644 --- a/book/src/slasher.md +++ b/book/src/slasher.md @@ -43,6 +43,34 @@ By default the slasher stores data in the `slasher_db` directory inside the beac e.g. `~/.lighthouse/{network}/beacon/slasher_db`. You can use this flag to change that storage directory. +### Database Backend + +* Flag: `--slasher-backend NAME` +* Argument: one of `mdbx`, `lmdb` or `disabled` +* Default: `mdbx` + +Since Lighthouse v2.6.0 it is possible to use one of several database backends with the slasher: + +- MDBX (default) +- LMDB + +The advantage of MDBX is that it performs compaction, resulting in less disk usage over time. The +disadvantage is that upstream MDBX has removed support for Windows and macOS, so Lighthouse is stuck +on an older version. If bugs are found in our pinned version of MDBX it may be deprecated in future. + +LMDB does not have compaction but is more stable upstream than MDBX. It is not currently recommended +to use the LMDB backend on Windows. + +More backends may be added in future. + +### Switching Backends + +If you change database backends and want to reclaim the space used by the old backend you can +delete the following files from your `slasher_db` directory: + +* removing MDBX: delete `mdbx.dat` and `mdbx.lck` +* removing LMDB: delete `data.mdb` and `lock.mdb` + ### History Length * Flag: `--slasher-history-length EPOCHS` @@ -65,7 +93,7 @@ changed after initialization. * Argument: maximum size of the database in gigabytes * Default: 256 GB -The slasher uses MDBX as its backing store, which places a hard limit on the size of the database +Both database backends LMDB and MDBX place a hard limit on the size of the database file. You can use the `--slasher-max-db-size` flag to set this limit. It can be adjusted after initialization if the limit is reached. @@ -85,10 +113,6 @@ where `V` is the validator count and `N` is the history length. You should set the maximum size higher than the estimate to allow room for growth in the validator count. -> NOTE: In Lighthouse v2.1.0 the slasher database was switched from LMDB to MDBX. Unlike LMDB, MDBX -> does garbage collection of free pages and is capable of shrinking the database file and preventing -> it from growing indefinitely. - ### Update Period * Flag: `--slasher-update-period SECONDS` diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 7792ad074e..805b4eca26 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -7,6 +7,7 @@ autotests = false rust-version = "1.62" [features] +default = ["slasher-mdbx"] # Writes debugging .ssz files to /tmp during block processing. write_ssz_files = ["beacon_node/write_ssz_files"] # Compiles the BLS crypto code so that the binary is portable across machines. @@ -19,6 +20,10 @@ milagro = ["bls/milagro"] spec-minimal = [] # Support Gnosis spec and Gnosis Beacon Chain. gnosis = [] +# Support slasher MDBX backend. +slasher-mdbx = ["slasher/mdbx"] +# Support slasher LMDB backend. +slasher-lmdb = ["slasher/lmdb"] [dependencies] beacon_node = { "path" = "../beacon_node" } @@ -48,6 +53,7 @@ malloc_utils = { path = "../common/malloc_utils" } directory = { path = "../common/directory" } unused_port = { path = "../common/unused_port" } database_manager = { path = "../database_manager" } +slasher = { path = "../slasher" } [dev-dependencies] tempfile = "3.1.0" diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 9d952e5cc5..7fd4ad91cf 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1288,6 +1288,32 @@ fn slasher_broadcast_flag() { assert!(slasher_config.broadcast); }); } + +#[test] +fn slasher_backend_default() { + CommandLineTest::new() + .flag("slasher", None) + .run_with_zero_port() + .with_config(|config| { + let slasher_config = config.slasher.as_ref().unwrap(); + assert_eq!(slasher_config.backend, slasher::DatabaseBackend::Mdbx); + }); +} + +#[test] +fn slasher_backend_override_to_default() { + // Hard to test this flag because all but one backend is disabled by default and the backend + // called "disabled" results in a panic. + CommandLineTest::new() + .flag("slasher", None) + .flag("slasher-backend", Some("mdbx")) + .run_with_zero_port() + .with_config(|config| { + let slasher_config = config.slasher.as_ref().unwrap(); + assert_eq!(slasher_config.backend, slasher::DatabaseBackend::Mdbx); + }); +} + #[test] pub fn malloc_tuning_flag() { CommandLineTest::new() diff --git a/slasher/Cargo.toml b/slasher/Cargo.toml index 368350f11b..0f24fe9f04 100644 --- a/slasher/Cargo.toml +++ b/slasher/Cargo.toml @@ -4,6 +4,11 @@ version = "0.1.0" authors = ["Michael Sproul "] edition = "2021" +[features] +default = ["mdbx"] +mdbx = ["dep:mdbx"] +lmdb = ["lmdb-rkv", "lmdb-rkv-sys"] + [dependencies] bincode = "1.3.1" byteorder = "1.3.4" @@ -13,8 +18,6 @@ flate2 = { version = "1.0.14", features = ["zlib"], default-features = false } lazy_static = "1.4.0" lighthouse_metrics = { path = "../common/lighthouse_metrics" } filesystem = { path = "../common/filesystem" } -# MDBX is pinned at the last version with Windows and macOS support. This is only viable short-term. -mdbx = { package = "libmdbx", version = "=0.1.4" } lru = "0.7.1" parking_lot = "0.12.0" rand = "0.8.5" @@ -26,6 +29,12 @@ sloggers = { version = "2.1.1", features = ["json"] } tree_hash = "0.4.1" tree_hash_derive = "0.4.0" types = { path = "../consensus/types" } +strum = { version = "0.24.1", features = ["derive"] } + +# MDBX is pinned at the last version with Windows and macOS support. +mdbx = { package = "libmdbx", git = "https://github.com/sigp/libmdbx-rs", tag = "v0.1.4", optional = true } +lmdb-rkv = { git = "https://github.com/sigp/lmdb-rs", rev = "f33845c6469b94265319aac0ed5085597862c27e", optional = true } +lmdb-rkv-sys = { git = "https://github.com/sigp/lmdb-rs", rev = "f33845c6469b94265319aac0ed5085597862c27e", optional = true } [dev-dependencies] maplit = "1.0.2" diff --git a/slasher/src/array.rs b/slasher/src/array.rs index d9f1fab819..d9cb8a4ec6 100644 --- a/slasher/src/array.rs +++ b/slasher/src/array.rs @@ -1,9 +1,11 @@ use crate::metrics::{self, SLASHER_COMPRESSION_RATIO, SLASHER_NUM_CHUNKS_UPDATED}; -use crate::RwTransaction; -use crate::{AttesterSlashingStatus, Config, Error, IndexedAttesterRecord, SlasherDB}; +use crate::{ + AttesterSlashingStatus, Config, Database, Error, IndexedAttesterRecord, RwTransaction, + SlasherDB, +}; use flate2::bufread::{ZlibDecoder, ZlibEncoder}; use serde_derive::{Deserialize, Serialize}; -use std::borrow::{Borrow, Cow}; +use std::borrow::Borrow; use std::collections::{btree_map::Entry, BTreeMap, HashSet}; use std::convert::TryFrom; use std::io::Read; @@ -147,10 +149,7 @@ pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwn fn next_start_epoch(start_epoch: Epoch, config: &Config) -> Epoch; - fn select_db<'txn, E: EthSpec>( - db: &SlasherDB, - txn: &'txn RwTransaction<'txn>, - ) -> Result, Error>; + fn select_db(db: &SlasherDB) -> &Database; fn load( db: &SlasherDB, @@ -160,11 +159,10 @@ pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwn config: &Config, ) -> Result, Error> { let disk_key = config.disk_key(validator_chunk_index, chunk_index); - let chunk_bytes: Cow<[u8]> = - match txn.get(&Self::select_db(db, txn)?, &disk_key.to_be_bytes())? { - Some(chunk_bytes) => chunk_bytes, - None => return Ok(None), - }; + let chunk_bytes = match txn.get(Self::select_db(db), &disk_key.to_be_bytes())? { + Some(chunk_bytes) => chunk_bytes, + None => return Ok(None), + }; let chunk = bincode::deserialize_from(ZlibDecoder::new(chunk_bytes.borrow()))?; @@ -189,10 +187,9 @@ pub trait TargetArrayChunk: Sized + serde::Serialize + serde::de::DeserializeOwn metrics::set_float_gauge(&SLASHER_COMPRESSION_RATIO, compression_ratio); txn.put( - &Self::select_db(db, txn)?, + Self::select_db(db), &disk_key.to_be_bytes(), &compressed_value, - SlasherDB::::write_flags(), )?; Ok(()) } @@ -296,11 +293,8 @@ impl TargetArrayChunk for MinTargetChunk { start_epoch / chunk_size * chunk_size - 1 } - fn select_db<'txn, E: EthSpec>( - db: &SlasherDB, - txn: &'txn RwTransaction<'txn>, - ) -> Result, Error> { - db.min_targets_db(txn) + fn select_db(db: &SlasherDB) -> &Database { + &db.databases.min_targets_db } } @@ -398,11 +392,8 @@ impl TargetArrayChunk for MaxTargetChunk { (start_epoch / chunk_size + 1) * chunk_size } - fn select_db<'txn, E: EthSpec>( - db: &SlasherDB, - txn: &'txn RwTransaction<'txn>, - ) -> Result, Error> { - db.max_targets_db(txn) + fn select_db(db: &SlasherDB) -> &Database { + &db.databases.max_targets_db } } diff --git a/slasher/src/config.rs b/slasher/src/config.rs index 81aa4b597d..e2a58a406a 100644 --- a/slasher/src/config.rs +++ b/slasher/src/config.rs @@ -1,6 +1,7 @@ use crate::Error; use serde_derive::{Deserialize, Serialize}; use std::path::PathBuf; +use strum::{Display, EnumString, EnumVariantNames}; use types::{Epoch, EthSpec, IndexedAttestation}; pub const DEFAULT_CHUNK_SIZE: usize = 16; @@ -12,8 +13,15 @@ pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB pub const DEFAULT_ATTESTATION_ROOT_CACHE_SIZE: usize = 100_000; pub const DEFAULT_BROADCAST: bool = false; +#[cfg(feature = "mdbx")] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Mdbx; +#[cfg(all(feature = "lmdb", not(feature = "mdbx")))] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Lmdb; +#[cfg(not(any(feature = "mdbx", feature = "lmdb")))] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Disabled; + pub const MAX_HISTORY_LENGTH: usize = 1 << 16; -pub const MDBX_GROWTH_STEP: isize = 256 * (1 << 20); // 256 MiB +pub const MEGABYTE: usize = 1 << 20; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -32,6 +40,8 @@ pub struct Config { pub attestation_root_cache_size: usize, /// Whether to broadcast slashings found to the network. pub broadcast: bool, + /// Database backend to use. + pub backend: DatabaseBackend, } /// Immutable configuration parameters which are stored on disk and checked for consistency. @@ -42,6 +52,18 @@ pub struct DiskConfig { pub history_length: usize, } +#[derive( + Debug, Clone, Copy, PartialEq, Serialize, Deserialize, Display, EnumString, EnumVariantNames, +)] +#[strum(serialize_all = "lowercase")] +pub enum DatabaseBackend { + #[cfg(feature = "mdbx")] + Mdbx, + #[cfg(feature = "lmdb")] + Lmdb, + Disabled, +} + impl Config { pub fn new(database_path: PathBuf) -> Self { Self { @@ -54,6 +76,7 @@ impl Config { max_db_size_mbs: DEFAULT_MAX_DB_SIZE, attestation_root_cache_size: DEFAULT_ATTESTATION_ROOT_CACHE_SIZE, broadcast: DEFAULT_BROADCAST, + backend: DEFAULT_BACKEND, } } diff --git a/slasher/src/database.rs b/slasher/src/database.rs index 653eccfa72..c8046c80dc 100644 --- a/slasher/src/database.rs +++ b/slasher/src/database.rs @@ -1,19 +1,20 @@ -use crate::config::MDBX_GROWTH_STEP; +pub mod interface; +mod lmdb_impl; +mod mdbx_impl; + use crate::{ - metrics, utils::TxnMapFull, AttesterRecord, AttesterSlashingStatus, CompactAttesterRecord, - Config, Environment, Error, ProposerSlashingStatus, RwTransaction, + metrics, AttesterRecord, AttesterSlashingStatus, CompactAttesterRecord, Config, Error, + ProposerSlashingStatus, }; use byteorder::{BigEndian, ByteOrder}; +use interface::{Environment, OpenDatabases, RwTransaction}; use lru::LruCache; -use mdbx::{Database, DatabaseFlags, Geometry, WriteFlags}; use parking_lot::Mutex; use serde::de::DeserializeOwned; use slog::{info, Logger}; use ssz::{Decode, Encode}; use std::borrow::{Borrow, Cow}; use std::marker::PhantomData; -use std::ops::Range; -use std::path::Path; use std::sync::Arc; use tree_hash::TreeHash; use types::{ @@ -50,10 +51,6 @@ const PROPOSERS_DB: &str = "proposers"; /// The number of DBs for MDBX to use (equal to the number of DBs defined above). const MAX_NUM_DBS: usize = 9; -/// Filename for the legacy (LMDB) database file, so that it may be deleted. -const LEGACY_DB_FILENAME: &str = "data.mdb"; -const LEGACY_DB_LOCK_FILENAME: &str = "lock.mdb"; - /// Constant key under which the schema version is stored in the `metadata_db`. const METADATA_VERSION_KEY: &[u8] = &[0]; /// Constant key under which the slasher configuration is stored in the `metadata_db`. @@ -64,11 +61,11 @@ const PROPOSER_KEY_SIZE: usize = 16; const CURRENT_EPOCH_KEY_SIZE: usize = 8; const INDEXED_ATTESTATION_ID_SIZE: usize = 6; const INDEXED_ATTESTATION_ID_KEY_SIZE: usize = 40; -const MEGABYTE: usize = 1 << 20; #[derive(Debug)] pub struct SlasherDB { - pub(crate) env: Environment, + pub(crate) env: &'static Environment, + pub(crate) databases: OpenDatabases<'static>, /// LRU cache mapping indexed attestation IDs to their attestation data roots. attestation_root_cache: Mutex>, pub(crate) config: Arc, @@ -249,42 +246,26 @@ fn ssz_decode(bytes: Cow<[u8]>) -> Result { impl SlasherDB { pub fn open(config: Arc, log: Logger) -> Result { - // Delete any legacy LMDB database. - Self::delete_legacy_file(&config.database_path, LEGACY_DB_FILENAME, &log)?; - Self::delete_legacy_file(&config.database_path, LEGACY_DB_LOCK_FILENAME, &log)?; + info!(log, "Opening slasher database"; "backend" => %config.backend); std::fs::create_dir_all(&config.database_path)?; - let env = Environment::new() - .set_max_dbs(MAX_NUM_DBS) - .set_geometry(Self::geometry(&config)) - .open_with_permissions(&config.database_path, 0o600)?; - - let txn = env.begin_rw_txn()?; - txn.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; - txn.create_db(Some(INDEXED_ATTESTATION_ID_DB), Self::db_flags())?; - txn.create_db(Some(ATTESTERS_DB), Self::db_flags())?; - txn.create_db(Some(ATTESTERS_MAX_TARGETS_DB), Self::db_flags())?; - txn.create_db(Some(MIN_TARGETS_DB), Self::db_flags())?; - txn.create_db(Some(MAX_TARGETS_DB), Self::db_flags())?; - txn.create_db(Some(CURRENT_EPOCHS_DB), Self::db_flags())?; - txn.create_db(Some(PROPOSERS_DB), Self::db_flags())?; - txn.create_db(Some(METADATA_DB), Self::db_flags())?; - txn.commit()?; + let env = Box::leak(Box::new(Environment::new(&config)?)); + let databases = env.create_databases()?; #[cfg(windows)] { - use filesystem::restrict_file_permissions; - let data = config.database_path.join("mdbx.dat"); - let lock = config.database_path.join("mdbx.lck"); - restrict_file_permissions(data).map_err(Error::DatabasePermissionsError)?; - restrict_file_permissions(lock).map_err(Error::DatabasePermissionsError)?; + for database_file in env.filenames(&config) { + filesystem::restrict_file_permissions(database_file) + .map_err(Error::DatabasePermissionsError)?; + } } let attestation_root_cache = Mutex::new(LruCache::new(config.attestation_root_cache_size)); let mut db = Self { env, + databases, attestation_root_cache, config, _phantom: PhantomData, @@ -307,102 +288,21 @@ impl SlasherDB { Ok(db) } - fn delete_legacy_file(slasher_dir: &Path, filename: &str, log: &Logger) -> Result<(), Error> { - let path = slasher_dir.join(filename); - - if path.is_file() { - info!( - log, - "Deleting legacy slasher DB"; - "file" => ?path.display(), - ); - std::fs::remove_file(&path)?; - } - Ok(()) - } - - fn open_db<'a>(&self, txn: &'a RwTransaction<'a>, name: &str) -> Result, Error> { - Ok(txn.open_db(Some(name))?) - } - - pub fn indexed_attestation_db<'a>( - &self, - txn: &'a RwTransaction<'a>, - ) -> Result, Error> { - self.open_db(txn, INDEXED_ATTESTATION_DB) - } - - pub fn indexed_attestation_id_db<'a>( - &self, - txn: &'a RwTransaction<'a>, - ) -> Result, Error> { - self.open_db(txn, INDEXED_ATTESTATION_ID_DB) - } - - pub fn attesters_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { - self.open_db(txn, ATTESTERS_DB) - } - - pub fn attesters_max_targets_db<'a>( - &self, - txn: &'a RwTransaction<'a>, - ) -> Result, Error> { - self.open_db(txn, ATTESTERS_MAX_TARGETS_DB) - } - - pub fn min_targets_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { - self.open_db(txn, MIN_TARGETS_DB) - } - - pub fn max_targets_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { - self.open_db(txn, MAX_TARGETS_DB) - } - - pub fn current_epochs_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { - self.open_db(txn, CURRENT_EPOCHS_DB) - } - - pub fn proposers_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { - self.open_db(txn, PROPOSERS_DB) - } - - pub fn metadata_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result, Error> { - self.open_db(txn, METADATA_DB) - } - - pub fn db_flags() -> DatabaseFlags { - DatabaseFlags::default() - } - - pub fn write_flags() -> WriteFlags { - WriteFlags::default() - } - - pub fn begin_rw_txn(&self) -> Result, Error> { - Ok(self.env.begin_rw_txn()?) - } - - pub fn geometry(config: &Config) -> Geometry> { - Geometry { - size: Some(0..config.max_db_size_mbs * MEGABYTE), - growth_step: Some(MDBX_GROWTH_STEP), - shrink_threshold: None, - page_size: None, - } + pub fn begin_rw_txn(&self) -> Result { + self.env.begin_rw_txn() } pub fn load_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result, Error> { - txn.get(&self.metadata_db(txn)?, METADATA_VERSION_KEY)? + txn.get(&self.databases.metadata_db, METADATA_VERSION_KEY)? .map(bincode_deserialize) .transpose() } pub fn store_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> { txn.put( - &self.metadata_db(txn)?, + &self.databases.metadata_db, &METADATA_VERSION_KEY, &bincode::serialize(&CURRENT_SCHEMA_VERSION)?, - Self::write_flags(), )?; Ok(()) } @@ -415,17 +315,16 @@ impl SlasherDB { &self, txn: &mut RwTransaction<'_>, ) -> Result, Error> { - txn.get(&self.metadata_db(txn)?, METADATA_CONFIG_KEY)? + txn.get(&self.databases.metadata_db, METADATA_CONFIG_KEY)? .map(bincode_deserialize) .transpose() } pub fn store_config(&self, config: &Config, txn: &mut RwTransaction<'_>) -> Result<(), Error> { txn.put( - &self.metadata_db(txn)?, + &self.databases.metadata_db, &METADATA_CONFIG_KEY, &bincode::serialize(config)?, - Self::write_flags(), )?; Ok(()) } @@ -436,7 +335,7 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, ) -> Result, Error> { txn.get( - &self.attesters_max_targets_db(txn)?, + &self.databases.attesters_max_targets_db, CurrentEpochKey::new(validator_index).as_ref(), )? .map(ssz_decode) @@ -466,19 +365,17 @@ impl SlasherDB { ); for target_epoch in (start_epoch..max_target.as_u64()).map(Epoch::new) { txn.put( - &self.attesters_db(txn)?, + &self.databases.attesters_db, &AttesterKey::new(validator_index, target_epoch, &self.config), &CompactAttesterRecord::null().as_bytes(), - Self::write_flags(), )?; } } txn.put( - &self.attesters_max_targets_db(txn)?, + &self.databases.attesters_max_targets_db, &CurrentEpochKey::new(validator_index), &max_target.as_ssz_bytes(), - Self::write_flags(), )?; Ok(()) } @@ -489,7 +386,7 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, ) -> Result, Error> { txn.get( - &self.current_epochs_db(txn)?, + &self.databases.current_epochs_db, CurrentEpochKey::new(validator_index).as_ref(), )? .map(ssz_decode) @@ -503,10 +400,9 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, ) -> Result<(), Error> { txn.put( - &self.current_epochs_db(txn)?, + &self.databases.current_epochs_db, &CurrentEpochKey::new(validator_index), ¤t_epoch.as_ssz_bytes(), - Self::write_flags(), )?; Ok(()) } @@ -516,7 +412,7 @@ impl SlasherDB { txn: &mut RwTransaction<'_>, key: &IndexedAttestationIdKey, ) -> Result, Error> { - txn.get(&self.indexed_attestation_id_db(txn)?, key.as_ref())? + txn.get(&self.databases.indexed_attestation_id_db, key.as_ref())? .map(IndexedAttestationId::parse) .transpose() } @@ -527,12 +423,7 @@ impl SlasherDB { key: &IndexedAttestationIdKey, value: IndexedAttestationId, ) -> Result<(), Error> { - txn.put( - &self.indexed_attestation_id_db(txn)?, - key, - &value, - Self::write_flags(), - )?; + txn.put(&self.databases.indexed_attestation_id_db, key, &value)?; Ok(()) } @@ -556,18 +447,19 @@ impl SlasherDB { } // Store the new indexed attestation at the end of the current table. - let mut cursor = txn.cursor(&self.indexed_attestation_db(txn)?)?; + let db = &self.databases.indexed_attestation_db; + let mut cursor = txn.cursor(db)?; - let indexed_att_id = match cursor.last::<_, ()>()? { + let indexed_att_id = match cursor.last_key()? { // First ID is 1 so that 0 can be used to represent `null` in `CompactAttesterRecord`. None => 1, - Some((key_bytes, _)) => IndexedAttestationId::parse(key_bytes)? + 1, + Some(key_bytes) => IndexedAttestationId::parse(key_bytes)? + 1, }; let attestation_key = IndexedAttestationId::new(indexed_att_id); let data = indexed_attestation.as_ssz_bytes(); - cursor.put(attestation_key.as_ref(), &data, Self::write_flags())?; + cursor.put(attestation_key.as_ref(), &data)?; drop(cursor); // Update the (epoch, hash) to ID mapping. @@ -583,7 +475,7 @@ impl SlasherDB { ) -> Result, Error> { let bytes = txn .get( - &self.indexed_attestation_db(txn)?, + &self.databases.indexed_attestation_db, indexed_attestation_id.as_ref(), )? .ok_or(Error::MissingIndexedAttestation { @@ -685,10 +577,9 @@ impl SlasherDB { self.update_attester_max_target(validator_index, prev_max_target, target_epoch, txn)?; txn.put( - &self.attesters_db(txn)?, + &self.databases.attesters_db, &AttesterKey::new(validator_index, target_epoch, &self.config), &indexed_attestation_id, - Self::write_flags(), )?; Ok(AttesterSlashingStatus::NotSlashable) @@ -725,7 +616,7 @@ impl SlasherDB { let attester_key = AttesterKey::new(validator_index, target, &self.config); Ok(txn - .get(&self.attesters_db(txn)?, attester_key.as_ref())? + .get(&self.databases.attesters_db, attester_key.as_ref())? .map(CompactAttesterRecord::parse) .transpose()? .filter(|record| !record.is_null())) @@ -738,7 +629,7 @@ impl SlasherDB { slot: Slot, ) -> Result, Error> { let proposer_key = ProposerKey::new(proposer_index, slot); - txn.get(&self.proposers_db(txn)?, proposer_key.as_ref())? + txn.get(&self.databases.proposers_db, proposer_key.as_ref())? .map(ssz_decode) .transpose() } @@ -764,10 +655,9 @@ impl SlasherDB { } } else { txn.put( - &self.proposers_db(txn)?, + &self.databases.proposers_db, &ProposerKey::new(proposer_index, slot), &block_header.as_ssz_bytes(), - Self::write_flags(), )?; Ok(ProposerSlashingStatus::NotSlashable) } @@ -776,14 +666,12 @@ impl SlasherDB { /// Attempt to prune the database, deleting old blocks and attestations. pub fn prune(&self, current_epoch: Epoch) -> Result<(), Error> { let mut txn = self.begin_rw_txn()?; - self.try_prune(current_epoch, &mut txn).allow_map_full()?; + self.try_prune(current_epoch, &mut txn)?; txn.commit()?; Ok(()) } /// Try to prune the database. - /// - /// This is a separate method from `prune` so that `allow_map_full` may be used. pub fn try_prune( &self, current_epoch: Epoch, @@ -804,22 +692,22 @@ impl SlasherDB { .saturating_sub(self.config.history_length) .start_slot(E::slots_per_epoch()); - let mut cursor = txn.cursor(&self.proposers_db(txn)?)?; + let mut cursor = txn.cursor(&self.databases.proposers_db)?; // Position cursor at first key, bailing out if the database is empty. - if cursor.first::<(), ()>()?.is_none() { + if cursor.first_key()?.is_none() { return Ok(()); } loop { - let (key_bytes, ()) = cursor.get_current()?.ok_or(Error::MissingProposerKey)?; + let (key_bytes, _) = cursor.get_current()?.ok_or(Error::MissingProposerKey)?; let (slot, _) = ProposerKey::parse(key_bytes)?; if slot < min_slot { - cursor.del(Self::write_flags())?; + cursor.delete_current()?; // End the loop if there is no next entry. - if cursor.next::<(), ()>()?.is_none() { + if cursor.next_key()?.is_none() { break; } } else { @@ -842,10 +730,10 @@ impl SlasherDB { // Collect indexed attestation IDs to delete. let mut indexed_attestation_ids = vec![]; - let mut cursor = txn.cursor(&self.indexed_attestation_id_db(txn)?)?; + let mut cursor = txn.cursor(&self.databases.indexed_attestation_id_db)?; // Position cursor at first key, bailing out if the database is empty. - if cursor.first::<(), ()>()?.is_none() { + if cursor.first_key()?.is_none() { return Ok(()); } @@ -861,9 +749,9 @@ impl SlasherDB { IndexedAttestationId::parse(value)?, )); - cursor.del(Self::write_flags())?; + cursor.delete_current()?; - if cursor.next::<(), ()>()?.is_none() { + if cursor.next_key()?.is_none() { break; } } else { @@ -874,9 +762,9 @@ impl SlasherDB { // Delete the indexed attestations. // Optimisation potential: use a cursor here. - let indexed_attestation_db = self.indexed_attestation_db(txn)?; + let indexed_attestation_db = &self.databases.indexed_attestation_db; for indexed_attestation_id in &indexed_attestation_ids { - txn.del(&indexed_attestation_db, indexed_attestation_id, None)?; + txn.del(indexed_attestation_db, indexed_attestation_id)?; } self.delete_attestation_data_roots(indexed_attestation_ids); diff --git a/slasher/src/database/interface.rs b/slasher/src/database/interface.rs new file mode 100644 index 0000000000..5bb920383c --- /dev/null +++ b/slasher/src/database/interface.rs @@ -0,0 +1,230 @@ +use crate::{Config, DatabaseBackend, Error}; +use std::borrow::Cow; +use std::marker::PhantomData; +use std::path::PathBuf; + +#[cfg(feature = "lmdb")] +use crate::database::lmdb_impl; +#[cfg(feature = "mdbx")] +use crate::database::mdbx_impl; + +#[derive(Debug)] +pub enum Environment { + #[cfg(feature = "mdbx")] + Mdbx(mdbx_impl::Environment), + #[cfg(feature = "lmdb")] + Lmdb(lmdb_impl::Environment), + Disabled, +} + +#[derive(Debug)] +pub enum RwTransaction<'env> { + #[cfg(feature = "mdbx")] + Mdbx(mdbx_impl::RwTransaction<'env>), + #[cfg(feature = "lmdb")] + Lmdb(lmdb_impl::RwTransaction<'env>), + Disabled(PhantomData<&'env ()>), +} + +#[derive(Debug)] +pub enum Database<'env> { + #[cfg(feature = "mdbx")] + Mdbx(mdbx_impl::Database<'env>), + #[cfg(feature = "lmdb")] + Lmdb(lmdb_impl::Database<'env>), + Disabled(PhantomData<&'env ()>), +} + +#[derive(Debug)] +pub struct OpenDatabases<'env> { + pub indexed_attestation_db: Database<'env>, + pub indexed_attestation_id_db: Database<'env>, + pub attesters_db: Database<'env>, + pub attesters_max_targets_db: Database<'env>, + pub min_targets_db: Database<'env>, + pub max_targets_db: Database<'env>, + pub current_epochs_db: Database<'env>, + pub proposers_db: Database<'env>, + pub metadata_db: Database<'env>, +} + +#[derive(Debug)] +pub enum Cursor<'env> { + #[cfg(feature = "mdbx")] + Mdbx(mdbx_impl::Cursor<'env>), + #[cfg(feature = "lmdb")] + Lmdb(lmdb_impl::Cursor<'env>), + Disabled(PhantomData<&'env ()>), +} + +pub type Key<'a> = Cow<'a, [u8]>; +pub type Value<'a> = Cow<'a, [u8]>; + +impl Environment { + pub fn new(config: &Config) -> Result { + match config.backend { + #[cfg(feature = "mdbx")] + DatabaseBackend::Mdbx => mdbx_impl::Environment::new(config).map(Environment::Mdbx), + #[cfg(feature = "lmdb")] + DatabaseBackend::Lmdb => lmdb_impl::Environment::new(config).map(Environment::Lmdb), + DatabaseBackend::Disabled => Err(Error::SlasherDatabaseBackendDisabled), + } + } + + pub fn create_databases(&self) -> Result { + match self { + #[cfg(feature = "mdbx")] + Self::Mdbx(env) => env.create_databases(), + #[cfg(feature = "lmdb")] + Self::Lmdb(env) => env.create_databases(), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn begin_rw_txn(&self) -> Result { + match self { + #[cfg(feature = "mdbx")] + Self::Mdbx(env) => env.begin_rw_txn().map(RwTransaction::Mdbx), + #[cfg(feature = "lmdb")] + Self::Lmdb(env) => env.begin_rw_txn().map(RwTransaction::Lmdb), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + /// List of all files used by the database. + pub fn filenames(&self, config: &Config) -> Vec { + match self { + #[cfg(feature = "mdbx")] + Self::Mdbx(env) => env.filenames(config), + #[cfg(feature = "lmdb")] + Self::Lmdb(env) => env.filenames(config), + _ => vec![], + } + } +} + +impl<'env> RwTransaction<'env> { + pub fn get + ?Sized>( + &'env self, + db: &Database<'env>, + key: &K, + ) -> Result>, Error> { + match (self, db) { + #[cfg(feature = "mdbx")] + (Self::Mdbx(txn), Database::Mdbx(db)) => txn.get(db, key), + #[cfg(feature = "lmdb")] + (Self::Lmdb(txn), Database::Lmdb(db)) => txn.get(db, key), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn put, V: AsRef<[u8]>>( + &mut self, + db: &Database, + key: K, + value: V, + ) -> Result<(), Error> { + match (self, db) { + #[cfg(feature = "mdbx")] + (Self::Mdbx(txn), Database::Mdbx(db)) => txn.put(db, key, value), + #[cfg(feature = "lmdb")] + (Self::Lmdb(txn), Database::Lmdb(db)) => txn.put(db, key, value), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { + match (self, db) { + #[cfg(feature = "mdbx")] + (Self::Mdbx(txn), Database::Mdbx(db)) => txn.del(db, key), + #[cfg(feature = "lmdb")] + (Self::Lmdb(txn), Database::Lmdb(db)) => txn.del(db, key), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn cursor<'a>(&'a mut self, db: &Database) -> Result, Error> { + match (self, db) { + #[cfg(feature = "mdbx")] + (Self::Mdbx(txn), Database::Mdbx(db)) => txn.cursor(db).map(Cursor::Mdbx), + #[cfg(feature = "lmdb")] + (Self::Lmdb(txn), Database::Lmdb(db)) => txn.cursor(db).map(Cursor::Lmdb), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn commit(self) -> Result<(), Error> { + match self { + #[cfg(feature = "mdbx")] + Self::Mdbx(txn) => txn.commit(), + #[cfg(feature = "lmdb")] + Self::Lmdb(txn) => txn.commit(), + _ => Err(Error::MismatchedDatabaseVariant), + } + } +} + +impl<'env> Cursor<'env> { + /// Return the first key in the current database while advancing the cursor's position. + pub fn first_key(&mut self) -> Result, Error> { + match self { + #[cfg(feature = "mdbx")] + Cursor::Mdbx(cursor) => cursor.first_key(), + #[cfg(feature = "lmdb")] + Cursor::Lmdb(cursor) => cursor.first_key(), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + /// Return the last key in the current database while advancing the cursor's position. + pub fn last_key(&mut self) -> Result, Error> { + match self { + #[cfg(feature = "mdbx")] + Cursor::Mdbx(cursor) => cursor.last_key(), + #[cfg(feature = "lmdb")] + Cursor::Lmdb(cursor) => cursor.last_key(), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn next_key(&mut self) -> Result, Error> { + match self { + #[cfg(feature = "mdbx")] + Cursor::Mdbx(cursor) => cursor.next_key(), + #[cfg(feature = "lmdb")] + Cursor::Lmdb(cursor) => cursor.next_key(), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + /// Get the key value pair at the current position. + pub fn get_current(&mut self) -> Result, Error> { + match self { + #[cfg(feature = "mdbx")] + Cursor::Mdbx(cursor) => cursor.get_current(), + #[cfg(feature = "lmdb")] + Cursor::Lmdb(cursor) => cursor.get_current(), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn delete_current(&mut self) -> Result<(), Error> { + match self { + #[cfg(feature = "mdbx")] + Cursor::Mdbx(cursor) => cursor.delete_current(), + #[cfg(feature = "lmdb")] + Cursor::Lmdb(cursor) => cursor.delete_current(), + _ => Err(Error::MismatchedDatabaseVariant), + } + } + + pub fn put, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> { + match self { + #[cfg(feature = "mdbx")] + Self::Mdbx(cursor) => cursor.put(key, value), + #[cfg(feature = "lmdb")] + Self::Lmdb(cursor) => cursor.put(key, value), + _ => Err(Error::MismatchedDatabaseVariant), + } + } +} diff --git a/slasher/src/database/lmdb_impl.rs b/slasher/src/database/lmdb_impl.rs new file mode 100644 index 0000000000..98839fcc46 --- /dev/null +++ b/slasher/src/database/lmdb_impl.rs @@ -0,0 +1,203 @@ +#![cfg(feature = "lmdb")] + +use crate::{ + config::MEGABYTE, + database::{ + interface::{Key, OpenDatabases, Value}, + *, + }, + Config, Error, +}; +use lmdb::{Cursor as _, DatabaseFlags, Transaction, WriteFlags}; +use lmdb_sys::{MDB_FIRST, MDB_GET_CURRENT, MDB_LAST, MDB_NEXT}; +use std::borrow::Cow; +use std::marker::PhantomData; +use std::path::PathBuf; + +#[derive(Debug)] +pub struct Environment { + env: lmdb::Environment, +} + +#[derive(Debug)] +pub struct RwTransaction<'env> { + txn: lmdb::RwTransaction<'env>, +} + +#[derive(Debug)] +pub struct Database<'env> { + db: lmdb::Database, + _phantom: PhantomData<&'env ()>, +} + +#[derive(Debug)] +pub struct Cursor<'env> { + cursor: lmdb::RwCursor<'env>, +} + +impl Environment { + pub fn new(config: &Config) -> Result { + let env = lmdb::Environment::new() + .set_max_dbs(MAX_NUM_DBS as u32) + .set_map_size(config.max_db_size_mbs * MEGABYTE) + .open_with_permissions(&config.database_path, 0o600)?; + Ok(Environment { env }) + } + + pub fn create_databases(&self) -> Result { + let indexed_attestation_db = self + .env + .create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; + let indexed_attestation_id_db = self + .env + .create_db(Some(INDEXED_ATTESTATION_ID_DB), Self::db_flags())?; + let attesters_db = self.env.create_db(Some(ATTESTERS_DB), Self::db_flags())?; + let attesters_max_targets_db = self + .env + .create_db(Some(ATTESTERS_MAX_TARGETS_DB), Self::db_flags())?; + let min_targets_db = self.env.create_db(Some(MIN_TARGETS_DB), Self::db_flags())?; + let max_targets_db = self.env.create_db(Some(MAX_TARGETS_DB), Self::db_flags())?; + let current_epochs_db = self + .env + .create_db(Some(CURRENT_EPOCHS_DB), Self::db_flags())?; + let proposers_db = self.env.create_db(Some(PROPOSERS_DB), Self::db_flags())?; + let metadata_db = self.env.create_db(Some(METADATA_DB), Self::db_flags())?; + + let wrap = |db| { + crate::Database::Lmdb(Database { + db, + _phantom: PhantomData, + }) + }; + + Ok(OpenDatabases { + indexed_attestation_db: wrap(indexed_attestation_db), + indexed_attestation_id_db: wrap(indexed_attestation_id_db), + attesters_db: wrap(attesters_db), + attesters_max_targets_db: wrap(attesters_max_targets_db), + min_targets_db: wrap(min_targets_db), + max_targets_db: wrap(max_targets_db), + current_epochs_db: wrap(current_epochs_db), + proposers_db: wrap(proposers_db), + metadata_db: wrap(metadata_db), + }) + } + + pub fn begin_rw_txn(&self) -> Result { + let txn = self.env.begin_rw_txn()?; + Ok(RwTransaction { txn }) + } + + pub fn filenames(&self, config: &Config) -> Vec { + vec![ + config.database_path.join("data.mdb"), + config.database_path.join("lock.mdb"), + ] + } + + fn db_flags() -> DatabaseFlags { + DatabaseFlags::default() + } +} + +impl<'env> RwTransaction<'env> { + pub fn get + ?Sized>( + &'env self, + db: &Database<'env>, + key: &K, + ) -> Result>, Error> { + Ok(self.txn.get(db.db, key).optional()?.map(Cow::Borrowed)) + } + + pub fn put, V: AsRef<[u8]>>( + &mut self, + db: &Database, + key: K, + value: V, + ) -> Result<(), Error> { + self.txn.put(db.db, &key, &value, Self::write_flags())?; + Ok(()) + } + + pub fn del>(&mut self, db: &Database, key: K) -> Result<(), Error> { + self.txn.del(db.db, &key, None)?; + Ok(()) + } + + pub fn cursor<'a>(&'a mut self, db: &Database) -> Result, Error> { + let cursor = self.txn.open_rw_cursor(db.db)?; + Ok(Cursor { cursor }) + } + + pub fn commit(self) -> Result<(), Error> { + self.txn.commit()?; + Ok(()) + } + + fn write_flags() -> WriteFlags { + WriteFlags::default() + } +} + +impl<'env> Cursor<'env> { + pub fn first_key(&mut self) -> Result, Error> { + let opt_key = self + .cursor + .get(None, None, MDB_FIRST) + .optional()? + .and_then(|(key, _)| Some(Cow::Borrowed(key?))); + Ok(opt_key) + } + + pub fn last_key(&mut self) -> Result>, Error> { + let opt_key = self + .cursor + .get(None, None, MDB_LAST) + .optional()? + .and_then(|(key, _)| Some(Cow::Borrowed(key?))); + Ok(opt_key) + } + + pub fn next_key(&mut self) -> Result>, Error> { + let opt_key = self + .cursor + .get(None, None, MDB_NEXT) + .optional()? + .and_then(|(key, _)| Some(Cow::Borrowed(key?))); + Ok(opt_key) + } + + pub fn get_current(&mut self) -> Result, Value<'env>)>, Error> { + if let Some((Some(key), value)) = self.cursor.get(None, None, MDB_GET_CURRENT).optional()? { + Ok(Some((Cow::Borrowed(key), Cow::Borrowed(value)))) + } else { + Ok(None) + } + } + + pub fn delete_current(&mut self) -> Result<(), Error> { + self.cursor.del(RwTransaction::write_flags())?; + Ok(()) + } + + pub fn put, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> { + self.cursor + .put(&key, &value, RwTransaction::write_flags())?; + Ok(()) + } +} + +/// Mix-in trait for loading values from LMDB that may or may not exist. +pub trait TxnOptional { + fn optional(self) -> Result, E>; +} + +impl TxnOptional for Result { + fn optional(self) -> Result, Error> { + match self { + Ok(x) => Ok(Some(x)), + Err(lmdb::Error::NotFound) => Ok(None), + Err(e) => Err(e.into()), + } + } +} diff --git a/slasher/src/database/mdbx_impl.rs b/slasher/src/database/mdbx_impl.rs new file mode 100644 index 0000000000..d25f17e7ac --- /dev/null +++ b/slasher/src/database/mdbx_impl.rs @@ -0,0 +1,186 @@ +#![cfg(feature = "mdbx")] + +use crate::{ + config::MEGABYTE, + database::{ + interface::{Key, OpenDatabases, Value}, + *, + }, + Config, Error, +}; +use mdbx::{DatabaseFlags, Geometry, WriteFlags}; +use std::borrow::Cow; +use std::ops::Range; +use std::path::PathBuf; + +pub const MDBX_GROWTH_STEP: isize = 256 * (1 << 20); // 256 MiB + +#[derive(Debug)] +pub struct Environment { + env: mdbx::Environment, +} + +#[derive(Debug)] +pub struct RwTransaction<'env> { + txn: mdbx::Transaction<'env, mdbx::RW, mdbx::NoWriteMap>, +} + +#[derive(Debug)] +pub struct Database<'env> { + db: mdbx::Database<'env>, +} + +#[derive(Debug)] +pub struct Cursor<'env> { + cursor: mdbx::Cursor<'env, mdbx::RW>, +} + +impl Environment { + pub fn new(config: &Config) -> Result { + let env = mdbx::Environment::new() + .set_max_dbs(MAX_NUM_DBS) + .set_geometry(Self::geometry(config)) + .open_with_permissions(&config.database_path, 0o600)?; + Ok(Environment { env }) + } + + pub fn create_databases(&self) -> Result { + let txn = self.begin_rw_txn()?; + txn.create_db(INDEXED_ATTESTATION_DB)?; + txn.create_db(INDEXED_ATTESTATION_ID_DB)?; + txn.create_db(ATTESTERS_DB)?; + txn.create_db(ATTESTERS_MAX_TARGETS_DB)?; + txn.create_db(MIN_TARGETS_DB)?; + txn.create_db(MAX_TARGETS_DB)?; + txn.create_db(CURRENT_EPOCHS_DB)?; + txn.create_db(PROPOSERS_DB)?; + txn.create_db(METADATA_DB)?; + + // This is all rather nasty + let (_, mut databases) = txn.txn.commit_and_rebind_open_dbs()?; + let mut next_db = || { + crate::Database::Mdbx(Database { + db: databases.remove(0), + }) + }; + + Ok(OpenDatabases { + indexed_attestation_db: next_db(), + indexed_attestation_id_db: next_db(), + attesters_db: next_db(), + attesters_max_targets_db: next_db(), + min_targets_db: next_db(), + max_targets_db: next_db(), + current_epochs_db: next_db(), + proposers_db: next_db(), + metadata_db: next_db(), + }) + } + + pub fn begin_rw_txn(&self) -> Result { + let txn = self.env.begin_rw_txn()?; + Ok(RwTransaction { txn }) + } + + pub fn filenames(&self, config: &Config) -> Vec { + vec![ + config.database_path.join("mdbx.dat"), + config.database_path.join("mdbx.lck"), + ] + } + + fn geometry(config: &Config) -> Geometry> { + Geometry { + size: Some(0..config.max_db_size_mbs * MEGABYTE), + growth_step: Some(MDBX_GROWTH_STEP), + shrink_threshold: None, + page_size: None, + } + } +} + +impl<'env> RwTransaction<'env> { + pub fn create_db(&self, name: &'static str) -> Result<(), Error> { + let db = self.txn.create_db(Some(name), Self::db_flags())?; + self.txn.prime_for_permaopen(db); + Ok(()) + } + + pub fn open_db(&self, name: &'static str) -> Result { + let db = self.txn.open_db(Some(name))?; + Ok(Database { db }) + } + + pub fn get + ?Sized>( + &'env self, + db: &Database<'env>, + key: &K, + ) -> Result>, Error> { + Ok(self.txn.get(&db.db, key.as_ref())?) + } + + pub fn put, V: AsRef<[u8]>>( + &self, + db: &Database, + key: K, + value: V, + ) -> Result<(), Error> { + self.txn.put(&db.db, key, value, Self::write_flags())?; + Ok(()) + } + + pub fn del>(&self, db: &Database, key: K) -> Result<(), Error> { + self.txn.del(&db.db, key, None)?; + Ok(()) + } + + pub fn cursor<'a>(&'a self, db: &Database) -> Result, Error> { + let cursor = self.txn.cursor(&db.db)?; + Ok(Cursor { cursor }) + } + + pub fn commit(self) -> Result<(), Error> { + self.txn.commit()?; + Ok(()) + } + + fn db_flags() -> DatabaseFlags { + DatabaseFlags::default() + } + + fn write_flags() -> WriteFlags { + WriteFlags::default() + } +} + +impl<'env> Cursor<'env> { + pub fn first_key(&mut self) -> Result>, Error> { + let opt_key = self.cursor.first()?.map(|(key_bytes, ())| key_bytes); + Ok(opt_key) + } + + pub fn last_key(&mut self) -> Result>, Error> { + let opt_key = self.cursor.last()?.map(|(key_bytes, ())| key_bytes); + Ok(opt_key) + } + + pub fn next_key(&mut self) -> Result>, Error> { + let opt_key = self.cursor.next()?.map(|(key_bytes, ())| key_bytes); + Ok(opt_key) + } + + pub fn get_current(&mut self) -> Result, Value<'env>)>, Error> { + Ok(self.cursor.get_current()?) + } + + pub fn delete_current(&mut self) -> Result<(), Error> { + self.cursor.del(RwTransaction::write_flags())?; + Ok(()) + } + + pub fn put, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> { + self.cursor + .put(key.as_ref(), value.as_ref(), RwTransaction::write_flags())?; + Ok(()) + } +} diff --git a/slasher/src/error.rs b/slasher/src/error.rs index 7e689022e4..b939c281e9 100644 --- a/slasher/src/error.rs +++ b/slasher/src/error.rs @@ -4,7 +4,12 @@ use types::Epoch; #[derive(Debug)] pub enum Error { - DatabaseError(mdbx::Error), + #[cfg(feature = "mdbx")] + DatabaseMdbxError(mdbx::Error), + #[cfg(feature = "lmdb")] + DatabaseLmdbError(lmdb::Error), + SlasherDatabaseBackendDisabled, + MismatchedDatabaseVariant, DatabaseIOError(io::Error), DatabasePermissionsError(filesystem::Error), SszDecodeError(ssz::DecodeError), @@ -63,11 +68,22 @@ pub enum Error { InconsistentAttestationDataRoot, } +#[cfg(feature = "mdbx")] impl From for Error { fn from(e: mdbx::Error) -> Self { match e { mdbx::Error::Other(os_error) => Error::from(io::Error::from_raw_os_error(os_error)), - _ => Error::DatabaseError(e), + _ => Error::DatabaseMdbxError(e), + } + } +} + +#[cfg(feature = "lmdb")] +impl From for Error { + fn from(e: lmdb::Error) -> Self { + match e { + lmdb::Error::Other(os_error) => Error::from(io::Error::from_raw_os_error(os_error)), + _ => Error::DatabaseLmdbError(e), } } } diff --git a/slasher/src/lib.rs b/slasher/src/lib.rs index 184e3080e5..132ce8b235 100644 --- a/slasher/src/lib.rs +++ b/slasher/src/lib.rs @@ -1,4 +1,8 @@ #![deny(missing_debug_implementations)] +#![cfg_attr( + not(any(feature = "mdbx", feature = "lmdb")), + allow(unused, clippy::drop_non_drop) +)] mod array; mod attestation_queue; @@ -12,22 +16,20 @@ pub mod metrics; mod migrate; mod slasher; pub mod test_utils; -mod utils; pub use crate::slasher::Slasher; pub use attestation_queue::{AttestationBatch, AttestationQueue, SimpleBatch}; pub use attester_record::{AttesterRecord, CompactAttesterRecord, IndexedAttesterRecord}; pub use block_queue::BlockQueue; -pub use config::Config; -pub use database::{IndexedAttestationId, SlasherDB}; +pub use config::{Config, DatabaseBackend}; +pub use database::{ + interface::{Database, Environment, RwTransaction}, + IndexedAttestationId, SlasherDB, +}; pub use error::Error; use types::{AttesterSlashing, EthSpec, IndexedAttestation, ProposerSlashing}; -/// LMDB-to-MDBX compatibility shims. -pub type Environment = mdbx::Environment; -pub type RwTransaction<'env> = mdbx::Transaction<'env, mdbx::RW, mdbx::NoWriteMap>; - #[derive(Debug, PartialEq)] pub enum AttesterSlashingStatus { NotSlashable, diff --git a/slasher/src/utils.rs b/slasher/src/utils.rs deleted file mode 100644 index ccd31e74e2..0000000000 --- a/slasher/src/utils.rs +++ /dev/null @@ -1,16 +0,0 @@ -use crate::Error; - -/// Transform a transaction that would fail with a `MapFull` error into an optional result. -pub trait TxnMapFull { - fn allow_map_full(self) -> Result, E>; -} - -impl TxnMapFull for Result { - fn allow_map_full(self) -> Result, Error> { - match self { - Ok(x) => Ok(Some(x)), - Err(Error::DatabaseError(mdbx::Error::MapFull)) => Ok(None), - Err(e) => Err(e), - } - } -} diff --git a/slasher/tests/attester_slashings.rs b/slasher/tests/attester_slashings.rs index a2abbc55b1..5cf3fe6c2a 100644 --- a/slasher/tests/attester_slashings.rs +++ b/slasher/tests/attester_slashings.rs @@ -1,3 +1,5 @@ +#![cfg(any(feature = "mdbx", feature = "lmdb"))] + use logging::test_logger; use maplit::hashset; use rayon::prelude::*; diff --git a/slasher/tests/proposer_slashings.rs b/slasher/tests/proposer_slashings.rs index e8b052e664..3b7b8ed583 100644 --- a/slasher/tests/proposer_slashings.rs +++ b/slasher/tests/proposer_slashings.rs @@ -1,3 +1,5 @@ +#![cfg(any(feature = "mdbx", feature = "lmdb"))] + use logging::test_logger; use slasher::{ test_utils::{block as test_block, E}, diff --git a/slasher/tests/random.rs b/slasher/tests/random.rs index 8126602f37..968a4dbb68 100644 --- a/slasher/tests/random.rs +++ b/slasher/tests/random.rs @@ -1,3 +1,5 @@ +#![cfg(any(feature = "mdbx", feature = "lmdb"))] + use logging::test_logger; use rand::prelude::*; use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; diff --git a/slasher/tests/wrap_around.rs b/slasher/tests/wrap_around.rs index b256840ee5..d2c876d363 100644 --- a/slasher/tests/wrap_around.rs +++ b/slasher/tests/wrap_around.rs @@ -1,3 +1,5 @@ +#![cfg(any(feature = "mdbx", feature = "lmdb"))] + use logging::test_logger; use slasher::{test_utils::indexed_att, Config, Slasher}; use tempfile::tempdir;