Modularise slasher backend (#3443)

## Proposed Changes

Enable multiple database backends for the slasher, either MDBX (default) or LMDB. The backend can be selected using `--slasher-backend={lmdb,mdbx}`.

## Additional Info

In order to abstract over the two library's different handling of database lifetimes I've used `Box::leak` to give the `Environment` type a `'static` lifetime. This was the only way I could think of using 100% safe code to construct a self-referential struct `SlasherDB`, where the `OpenDatabases` refers to the `Environment`. I think this is OK, as the `Environment` is expected to live for the life of the program, and both database engines leave the database in a consistent state after each write. The memory claimed for memory-mapping will be freed by the OS and appropriately flushed regardless of whether the `Environment` is actually dropped.

We are depending on two `sigp` forks of `libmdbx-rs` and `lmdb-rs`, to give us greater control over MDBX OS support and LMDB's version.
This commit is contained in:
Michael Sproul
2022-08-15 01:30:56 +00:00
parent 71fd0b42f2
commit 92d597ad23
25 changed files with 905 additions and 230 deletions

View File

@@ -1,19 +1,20 @@
use crate::config::MDBX_GROWTH_STEP;
pub mod interface;
mod lmdb_impl;
mod mdbx_impl;
use crate::{
metrics, utils::TxnMapFull, AttesterRecord, AttesterSlashingStatus, CompactAttesterRecord,
Config, Environment, Error, ProposerSlashingStatus, RwTransaction,
metrics, AttesterRecord, AttesterSlashingStatus, CompactAttesterRecord, Config, Error,
ProposerSlashingStatus,
};
use byteorder::{BigEndian, ByteOrder};
use interface::{Environment, OpenDatabases, RwTransaction};
use lru::LruCache;
use mdbx::{Database, DatabaseFlags, Geometry, WriteFlags};
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use slog::{info, Logger};
use ssz::{Decode, Encode};
use std::borrow::{Borrow, Cow};
use std::marker::PhantomData;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use tree_hash::TreeHash;
use types::{
@@ -50,10 +51,6 @@ const PROPOSERS_DB: &str = "proposers";
/// The number of DBs for MDBX to use (equal to the number of DBs defined above).
const MAX_NUM_DBS: usize = 9;
/// Filename for the legacy (LMDB) database file, so that it may be deleted.
const LEGACY_DB_FILENAME: &str = "data.mdb";
const LEGACY_DB_LOCK_FILENAME: &str = "lock.mdb";
/// Constant key under which the schema version is stored in the `metadata_db`.
const METADATA_VERSION_KEY: &[u8] = &[0];
/// Constant key under which the slasher configuration is stored in the `metadata_db`.
@@ -64,11 +61,11 @@ const PROPOSER_KEY_SIZE: usize = 16;
const CURRENT_EPOCH_KEY_SIZE: usize = 8;
const INDEXED_ATTESTATION_ID_SIZE: usize = 6;
const INDEXED_ATTESTATION_ID_KEY_SIZE: usize = 40;
const MEGABYTE: usize = 1 << 20;
#[derive(Debug)]
pub struct SlasherDB<E: EthSpec> {
pub(crate) env: Environment,
pub(crate) env: &'static Environment,
pub(crate) databases: OpenDatabases<'static>,
/// LRU cache mapping indexed attestation IDs to their attestation data roots.
attestation_root_cache: Mutex<LruCache<IndexedAttestationId, Hash256>>,
pub(crate) config: Arc<Config>,
@@ -249,42 +246,26 @@ fn ssz_decode<T: Decode>(bytes: Cow<[u8]>) -> Result<T, Error> {
impl<E: EthSpec> SlasherDB<E> {
pub fn open(config: Arc<Config>, log: Logger) -> Result<Self, Error> {
// Delete any legacy LMDB database.
Self::delete_legacy_file(&config.database_path, LEGACY_DB_FILENAME, &log)?;
Self::delete_legacy_file(&config.database_path, LEGACY_DB_LOCK_FILENAME, &log)?;
info!(log, "Opening slasher database"; "backend" => %config.backend);
std::fs::create_dir_all(&config.database_path)?;
let env = Environment::new()
.set_max_dbs(MAX_NUM_DBS)
.set_geometry(Self::geometry(&config))
.open_with_permissions(&config.database_path, 0o600)?;
let txn = env.begin_rw_txn()?;
txn.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?;
txn.create_db(Some(INDEXED_ATTESTATION_ID_DB), Self::db_flags())?;
txn.create_db(Some(ATTESTERS_DB), Self::db_flags())?;
txn.create_db(Some(ATTESTERS_MAX_TARGETS_DB), Self::db_flags())?;
txn.create_db(Some(MIN_TARGETS_DB), Self::db_flags())?;
txn.create_db(Some(MAX_TARGETS_DB), Self::db_flags())?;
txn.create_db(Some(CURRENT_EPOCHS_DB), Self::db_flags())?;
txn.create_db(Some(PROPOSERS_DB), Self::db_flags())?;
txn.create_db(Some(METADATA_DB), Self::db_flags())?;
txn.commit()?;
let env = Box::leak(Box::new(Environment::new(&config)?));
let databases = env.create_databases()?;
#[cfg(windows)]
{
use filesystem::restrict_file_permissions;
let data = config.database_path.join("mdbx.dat");
let lock = config.database_path.join("mdbx.lck");
restrict_file_permissions(data).map_err(Error::DatabasePermissionsError)?;
restrict_file_permissions(lock).map_err(Error::DatabasePermissionsError)?;
for database_file in env.filenames(&config) {
filesystem::restrict_file_permissions(database_file)
.map_err(Error::DatabasePermissionsError)?;
}
}
let attestation_root_cache = Mutex::new(LruCache::new(config.attestation_root_cache_size));
let mut db = Self {
env,
databases,
attestation_root_cache,
config,
_phantom: PhantomData,
@@ -307,102 +288,21 @@ impl<E: EthSpec> SlasherDB<E> {
Ok(db)
}
fn delete_legacy_file(slasher_dir: &Path, filename: &str, log: &Logger) -> Result<(), Error> {
let path = slasher_dir.join(filename);
if path.is_file() {
info!(
log,
"Deleting legacy slasher DB";
"file" => ?path.display(),
);
std::fs::remove_file(&path)?;
}
Ok(())
}
fn open_db<'a>(&self, txn: &'a RwTransaction<'a>, name: &str) -> Result<Database<'a>, Error> {
Ok(txn.open_db(Some(name))?)
}
pub fn indexed_attestation_db<'a>(
&self,
txn: &'a RwTransaction<'a>,
) -> Result<Database<'a>, Error> {
self.open_db(txn, INDEXED_ATTESTATION_DB)
}
pub fn indexed_attestation_id_db<'a>(
&self,
txn: &'a RwTransaction<'a>,
) -> Result<Database<'a>, Error> {
self.open_db(txn, INDEXED_ATTESTATION_ID_DB)
}
pub fn attesters_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result<Database<'a>, Error> {
self.open_db(txn, ATTESTERS_DB)
}
pub fn attesters_max_targets_db<'a>(
&self,
txn: &'a RwTransaction<'a>,
) -> Result<Database<'a>, Error> {
self.open_db(txn, ATTESTERS_MAX_TARGETS_DB)
}
pub fn min_targets_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result<Database<'a>, Error> {
self.open_db(txn, MIN_TARGETS_DB)
}
pub fn max_targets_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result<Database<'a>, Error> {
self.open_db(txn, MAX_TARGETS_DB)
}
pub fn current_epochs_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result<Database<'a>, Error> {
self.open_db(txn, CURRENT_EPOCHS_DB)
}
pub fn proposers_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result<Database<'a>, Error> {
self.open_db(txn, PROPOSERS_DB)
}
pub fn metadata_db<'a>(&self, txn: &'a RwTransaction<'a>) -> Result<Database<'a>, Error> {
self.open_db(txn, METADATA_DB)
}
pub fn db_flags() -> DatabaseFlags {
DatabaseFlags::default()
}
pub fn write_flags() -> WriteFlags {
WriteFlags::default()
}
pub fn begin_rw_txn(&self) -> Result<RwTransaction<'_>, Error> {
Ok(self.env.begin_rw_txn()?)
}
pub fn geometry(config: &Config) -> Geometry<Range<usize>> {
Geometry {
size: Some(0..config.max_db_size_mbs * MEGABYTE),
growth_step: Some(MDBX_GROWTH_STEP),
shrink_threshold: None,
page_size: None,
}
pub fn begin_rw_txn(&self) -> Result<RwTransaction, Error> {
self.env.begin_rw_txn()
}
pub fn load_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result<Option<u64>, Error> {
txn.get(&self.metadata_db(txn)?, METADATA_VERSION_KEY)?
txn.get(&self.databases.metadata_db, METADATA_VERSION_KEY)?
.map(bincode_deserialize)
.transpose()
}
pub fn store_schema_version(&self, txn: &mut RwTransaction<'_>) -> Result<(), Error> {
txn.put(
&self.metadata_db(txn)?,
&self.databases.metadata_db,
&METADATA_VERSION_KEY,
&bincode::serialize(&CURRENT_SCHEMA_VERSION)?,
Self::write_flags(),
)?;
Ok(())
}
@@ -415,17 +315,16 @@ impl<E: EthSpec> SlasherDB<E> {
&self,
txn: &mut RwTransaction<'_>,
) -> Result<Option<T>, Error> {
txn.get(&self.metadata_db(txn)?, METADATA_CONFIG_KEY)?
txn.get(&self.databases.metadata_db, METADATA_CONFIG_KEY)?
.map(bincode_deserialize)
.transpose()
}
pub fn store_config(&self, config: &Config, txn: &mut RwTransaction<'_>) -> Result<(), Error> {
txn.put(
&self.metadata_db(txn)?,
&self.databases.metadata_db,
&METADATA_CONFIG_KEY,
&bincode::serialize(config)?,
Self::write_flags(),
)?;
Ok(())
}
@@ -436,7 +335,7 @@ impl<E: EthSpec> SlasherDB<E> {
txn: &mut RwTransaction<'_>,
) -> Result<Option<Epoch>, Error> {
txn.get(
&self.attesters_max_targets_db(txn)?,
&self.databases.attesters_max_targets_db,
CurrentEpochKey::new(validator_index).as_ref(),
)?
.map(ssz_decode)
@@ -466,19 +365,17 @@ impl<E: EthSpec> SlasherDB<E> {
);
for target_epoch in (start_epoch..max_target.as_u64()).map(Epoch::new) {
txn.put(
&self.attesters_db(txn)?,
&self.databases.attesters_db,
&AttesterKey::new(validator_index, target_epoch, &self.config),
&CompactAttesterRecord::null().as_bytes(),
Self::write_flags(),
)?;
}
}
txn.put(
&self.attesters_max_targets_db(txn)?,
&self.databases.attesters_max_targets_db,
&CurrentEpochKey::new(validator_index),
&max_target.as_ssz_bytes(),
Self::write_flags(),
)?;
Ok(())
}
@@ -489,7 +386,7 @@ impl<E: EthSpec> SlasherDB<E> {
txn: &mut RwTransaction<'_>,
) -> Result<Option<Epoch>, Error> {
txn.get(
&self.current_epochs_db(txn)?,
&self.databases.current_epochs_db,
CurrentEpochKey::new(validator_index).as_ref(),
)?
.map(ssz_decode)
@@ -503,10 +400,9 @@ impl<E: EthSpec> SlasherDB<E> {
txn: &mut RwTransaction<'_>,
) -> Result<(), Error> {
txn.put(
&self.current_epochs_db(txn)?,
&self.databases.current_epochs_db,
&CurrentEpochKey::new(validator_index),
&current_epoch.as_ssz_bytes(),
Self::write_flags(),
)?;
Ok(())
}
@@ -516,7 +412,7 @@ impl<E: EthSpec> SlasherDB<E> {
txn: &mut RwTransaction<'_>,
key: &IndexedAttestationIdKey,
) -> Result<Option<u64>, Error> {
txn.get(&self.indexed_attestation_id_db(txn)?, key.as_ref())?
txn.get(&self.databases.indexed_attestation_id_db, key.as_ref())?
.map(IndexedAttestationId::parse)
.transpose()
}
@@ -527,12 +423,7 @@ impl<E: EthSpec> SlasherDB<E> {
key: &IndexedAttestationIdKey,
value: IndexedAttestationId,
) -> Result<(), Error> {
txn.put(
&self.indexed_attestation_id_db(txn)?,
key,
&value,
Self::write_flags(),
)?;
txn.put(&self.databases.indexed_attestation_id_db, key, &value)?;
Ok(())
}
@@ -556,18 +447,19 @@ impl<E: EthSpec> SlasherDB<E> {
}
// Store the new indexed attestation at the end of the current table.
let mut cursor = txn.cursor(&self.indexed_attestation_db(txn)?)?;
let db = &self.databases.indexed_attestation_db;
let mut cursor = txn.cursor(db)?;
let indexed_att_id = match cursor.last::<_, ()>()? {
let indexed_att_id = match cursor.last_key()? {
// First ID is 1 so that 0 can be used to represent `null` in `CompactAttesterRecord`.
None => 1,
Some((key_bytes, _)) => IndexedAttestationId::parse(key_bytes)? + 1,
Some(key_bytes) => IndexedAttestationId::parse(key_bytes)? + 1,
};
let attestation_key = IndexedAttestationId::new(indexed_att_id);
let data = indexed_attestation.as_ssz_bytes();
cursor.put(attestation_key.as_ref(), &data, Self::write_flags())?;
cursor.put(attestation_key.as_ref(), &data)?;
drop(cursor);
// Update the (epoch, hash) to ID mapping.
@@ -583,7 +475,7 @@ impl<E: EthSpec> SlasherDB<E> {
) -> Result<IndexedAttestation<E>, Error> {
let bytes = txn
.get(
&self.indexed_attestation_db(txn)?,
&self.databases.indexed_attestation_db,
indexed_attestation_id.as_ref(),
)?
.ok_or(Error::MissingIndexedAttestation {
@@ -685,10 +577,9 @@ impl<E: EthSpec> SlasherDB<E> {
self.update_attester_max_target(validator_index, prev_max_target, target_epoch, txn)?;
txn.put(
&self.attesters_db(txn)?,
&self.databases.attesters_db,
&AttesterKey::new(validator_index, target_epoch, &self.config),
&indexed_attestation_id,
Self::write_flags(),
)?;
Ok(AttesterSlashingStatus::NotSlashable)
@@ -725,7 +616,7 @@ impl<E: EthSpec> SlasherDB<E> {
let attester_key = AttesterKey::new(validator_index, target, &self.config);
Ok(txn
.get(&self.attesters_db(txn)?, attester_key.as_ref())?
.get(&self.databases.attesters_db, attester_key.as_ref())?
.map(CompactAttesterRecord::parse)
.transpose()?
.filter(|record| !record.is_null()))
@@ -738,7 +629,7 @@ impl<E: EthSpec> SlasherDB<E> {
slot: Slot,
) -> Result<Option<SignedBeaconBlockHeader>, Error> {
let proposer_key = ProposerKey::new(proposer_index, slot);
txn.get(&self.proposers_db(txn)?, proposer_key.as_ref())?
txn.get(&self.databases.proposers_db, proposer_key.as_ref())?
.map(ssz_decode)
.transpose()
}
@@ -764,10 +655,9 @@ impl<E: EthSpec> SlasherDB<E> {
}
} else {
txn.put(
&self.proposers_db(txn)?,
&self.databases.proposers_db,
&ProposerKey::new(proposer_index, slot),
&block_header.as_ssz_bytes(),
Self::write_flags(),
)?;
Ok(ProposerSlashingStatus::NotSlashable)
}
@@ -776,14 +666,12 @@ impl<E: EthSpec> SlasherDB<E> {
/// Attempt to prune the database, deleting old blocks and attestations.
pub fn prune(&self, current_epoch: Epoch) -> Result<(), Error> {
let mut txn = self.begin_rw_txn()?;
self.try_prune(current_epoch, &mut txn).allow_map_full()?;
self.try_prune(current_epoch, &mut txn)?;
txn.commit()?;
Ok(())
}
/// Try to prune the database.
///
/// This is a separate method from `prune` so that `allow_map_full` may be used.
pub fn try_prune(
&self,
current_epoch: Epoch,
@@ -804,22 +692,22 @@ impl<E: EthSpec> SlasherDB<E> {
.saturating_sub(self.config.history_length)
.start_slot(E::slots_per_epoch());
let mut cursor = txn.cursor(&self.proposers_db(txn)?)?;
let mut cursor = txn.cursor(&self.databases.proposers_db)?;
// Position cursor at first key, bailing out if the database is empty.
if cursor.first::<(), ()>()?.is_none() {
if cursor.first_key()?.is_none() {
return Ok(());
}
loop {
let (key_bytes, ()) = cursor.get_current()?.ok_or(Error::MissingProposerKey)?;
let (key_bytes, _) = cursor.get_current()?.ok_or(Error::MissingProposerKey)?;
let (slot, _) = ProposerKey::parse(key_bytes)?;
if slot < min_slot {
cursor.del(Self::write_flags())?;
cursor.delete_current()?;
// End the loop if there is no next entry.
if cursor.next::<(), ()>()?.is_none() {
if cursor.next_key()?.is_none() {
break;
}
} else {
@@ -842,10 +730,10 @@ impl<E: EthSpec> SlasherDB<E> {
// Collect indexed attestation IDs to delete.
let mut indexed_attestation_ids = vec![];
let mut cursor = txn.cursor(&self.indexed_attestation_id_db(txn)?)?;
let mut cursor = txn.cursor(&self.databases.indexed_attestation_id_db)?;
// Position cursor at first key, bailing out if the database is empty.
if cursor.first::<(), ()>()?.is_none() {
if cursor.first_key()?.is_none() {
return Ok(());
}
@@ -861,9 +749,9 @@ impl<E: EthSpec> SlasherDB<E> {
IndexedAttestationId::parse(value)?,
));
cursor.del(Self::write_flags())?;
cursor.delete_current()?;
if cursor.next::<(), ()>()?.is_none() {
if cursor.next_key()?.is_none() {
break;
}
} else {
@@ -874,9 +762,9 @@ impl<E: EthSpec> SlasherDB<E> {
// Delete the indexed attestations.
// Optimisation potential: use a cursor here.
let indexed_attestation_db = self.indexed_attestation_db(txn)?;
let indexed_attestation_db = &self.databases.indexed_attestation_db;
for indexed_attestation_id in &indexed_attestation_ids {
txn.del(&indexed_attestation_db, indexed_attestation_id, None)?;
txn.del(indexed_attestation_db, indexed_attestation_id)?;
}
self.delete_attestation_data_roots(indexed_attestation_ids);