mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-17 21:08:32 +00:00
Redb slasher backend impl (#4529)
* initial redb impl * redb impl * remove phantom data * fixed table definition * fighting the borrow checker * a rough draft that doesnt cause lifetime issues * refactoring * refactor * refactor * passing unit tests * refactor * refactor * refactor * commit * move everything to one database * remove panics, ready for a review * merge * a working redb impl * passing a ref of txn to cursor * this tries to create a second write transaction when initializing cursor. breaks everything * Use 2 lifetimes and subtyping Also fixes a bug in last_key caused by rev and next_back cancelling out * Move table into cursor * Merge remote-tracking branch 'origin/unstable' into redb-slasher-backend-impl * changes based on feedback * update lmdb * fix lifetime issues * moving everything from Cursor to Transaction * update * upgrade to redb 2.0 * Merge branch 'unstable' of https://github.com/sigp/lighthouse into redb-slasher-backend-impl * bring back cursor * Merge branch 'unstable' of https://github.com/sigp/lighthouse into redb-slasher-backend-impl * fix delete while * linting * linting * switch to lmdb * update redb to v2.1 * build fixes, remove unwrap or default * another build error * hopefully this is the last build error * fmt * cargo.toml * fix mdbx * Merge branch 'unstable' of https://github.com/sigp/lighthouse into redb-slasher-backend-impl * Remove a collect * Merge remote-tracking branch 'origin/unstable' into redb-slasher-backend-impl * Merge branch 'redb-slasher-backend-impl' of https://github.com/eserilev/lighthouse into redb-slasher-backend-impl * re-enable test * fix failing slasher test * Merge remote-tracking branch 'origin/unstable' into redb-slasher-backend-impl * Rename DB file to `slasher.redb`
This commit is contained in:
@@ -15,16 +15,19 @@ pub const DEFAULT_MAX_DB_SIZE: usize = 512 * 1024; // 512 GiB
|
||||
pub const DEFAULT_ATTESTATION_ROOT_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(100_000);
|
||||
pub const DEFAULT_BROADCAST: bool = false;
|
||||
|
||||
#[cfg(all(feature = "mdbx", not(feature = "lmdb")))]
|
||||
#[cfg(all(feature = "mdbx", not(any(feature = "lmdb", feature = "redb"))))]
|
||||
pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Mdbx;
|
||||
#[cfg(feature = "lmdb")]
|
||||
pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Lmdb;
|
||||
#[cfg(not(any(feature = "mdbx", feature = "lmdb")))]
|
||||
#[cfg(all(feature = "redb", not(any(feature = "mdbx", feature = "lmdb"))))]
|
||||
pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Redb;
|
||||
#[cfg(not(any(feature = "mdbx", feature = "lmdb", feature = "redb")))]
|
||||
pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Disabled;
|
||||
|
||||
pub const MAX_HISTORY_LENGTH: usize = 1 << 16;
|
||||
pub const MEGABYTE: usize = 1 << 20;
|
||||
pub const MDBX_DATA_FILENAME: &str = "mdbx.dat";
|
||||
pub const REDB_DATA_FILENAME: &str = "slasher.redb";
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Config {
|
||||
@@ -64,6 +67,8 @@ pub enum DatabaseBackend {
|
||||
Mdbx,
|
||||
#[cfg(feature = "lmdb")]
|
||||
Lmdb,
|
||||
#[cfg(feature = "redb")]
|
||||
Redb,
|
||||
Disabled,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod interface;
|
||||
mod lmdb_impl;
|
||||
mod mdbx_impl;
|
||||
mod redb_impl;
|
||||
|
||||
use crate::{
|
||||
metrics, AttesterRecord, AttesterSlashingStatus, CompactAttesterRecord, Config, Error,
|
||||
@@ -489,8 +490,7 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
}
|
||||
|
||||
// Store the new indexed attestation at the end of the current table.
|
||||
let db = &self.databases.indexed_attestation_db;
|
||||
let mut cursor = txn.cursor(db)?;
|
||||
let mut cursor = txn.cursor(&self.databases.indexed_attestation_db)?;
|
||||
|
||||
let indexed_att_id = match cursor.last_key()? {
|
||||
// First ID is 1 so that 0 can be used to represent `null` in `CompactAttesterRecord`.
|
||||
@@ -504,7 +504,6 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
|
||||
cursor.put(attestation_key.as_ref(), &data)?;
|
||||
drop(cursor);
|
||||
|
||||
// Update the (epoch, hash) to ID mapping.
|
||||
self.put_indexed_attestation_id(txn, &id_key, attestation_key)?;
|
||||
|
||||
@@ -743,21 +742,17 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
loop {
|
||||
let (key_bytes, _) = cursor.get_current()?.ok_or(Error::MissingProposerKey)?;
|
||||
|
||||
let (slot, _) = ProposerKey::parse(key_bytes)?;
|
||||
let should_delete = |key: &[u8]| -> Result<bool, Error> {
|
||||
let mut should_delete = false;
|
||||
let (slot, _) = ProposerKey::parse(Cow::from(key))?;
|
||||
if slot < min_slot {
|
||||
cursor.delete_current()?;
|
||||
|
||||
// End the loop if there is no next entry.
|
||||
if cursor.next_key()?.is_none() {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
should_delete = true;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(should_delete)
|
||||
};
|
||||
|
||||
cursor.delete_while(should_delete)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -771,9 +766,6 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
.saturating_add(1u64)
|
||||
.saturating_sub(self.config.history_length as u64);
|
||||
|
||||
// Collect indexed attestation IDs to delete.
|
||||
let mut indexed_attestation_ids = vec![];
|
||||
|
||||
let mut cursor = txn.cursor(&self.databases.indexed_attestation_id_db)?;
|
||||
|
||||
// Position cursor at first key, bailing out if the database is empty.
|
||||
@@ -781,27 +773,20 @@ impl<E: EthSpec> SlasherDB<E> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
loop {
|
||||
let (key_bytes, value) = cursor
|
||||
.get_current()?
|
||||
.ok_or(Error::MissingIndexedAttestationIdKey)?;
|
||||
|
||||
let (target_epoch, _) = IndexedAttestationIdKey::parse(key_bytes)?;
|
||||
|
||||
let should_delete = |key: &[u8]| -> Result<bool, Error> {
|
||||
let (target_epoch, _) = IndexedAttestationIdKey::parse(Cow::from(key))?;
|
||||
if target_epoch < min_epoch {
|
||||
indexed_attestation_ids.push(IndexedAttestationId::new(
|
||||
IndexedAttestationId::parse(value)?,
|
||||
));
|
||||
|
||||
cursor.delete_current()?;
|
||||
|
||||
if cursor.next_key()?.is_none() {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
};
|
||||
|
||||
let indexed_attestation_ids = cursor
|
||||
.delete_while(should_delete)?
|
||||
.into_iter()
|
||||
.map(|id| IndexedAttestationId::parse(id).map(IndexedAttestationId::new))
|
||||
.collect::<Result<Vec<IndexedAttestationId>, Error>>()?;
|
||||
drop(cursor);
|
||||
|
||||
// Delete the indexed attestations.
|
||||
|
||||
@@ -7,6 +7,8 @@ use std::path::PathBuf;
|
||||
use crate::database::lmdb_impl;
|
||||
#[cfg(feature = "mdbx")]
|
||||
use crate::database::mdbx_impl;
|
||||
#[cfg(feature = "redb")]
|
||||
use crate::database::redb_impl;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Environment {
|
||||
@@ -14,6 +16,8 @@ pub enum Environment {
|
||||
Mdbx(mdbx_impl::Environment),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Lmdb(lmdb_impl::Environment),
|
||||
#[cfg(feature = "redb")]
|
||||
Redb(redb_impl::Environment),
|
||||
Disabled,
|
||||
}
|
||||
|
||||
@@ -23,6 +27,8 @@ pub enum RwTransaction<'env> {
|
||||
Mdbx(mdbx_impl::RwTransaction<'env>),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Lmdb(lmdb_impl::RwTransaction<'env>),
|
||||
#[cfg(feature = "redb")]
|
||||
Redb(redb_impl::RwTransaction<'env>),
|
||||
Disabled(PhantomData<&'env ()>),
|
||||
}
|
||||
|
||||
@@ -32,6 +38,8 @@ pub enum Database<'env> {
|
||||
Mdbx(mdbx_impl::Database<'env>),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Lmdb(lmdb_impl::Database<'env>),
|
||||
#[cfg(feature = "redb")]
|
||||
Redb(redb_impl::Database<'env>),
|
||||
Disabled(PhantomData<&'env ()>),
|
||||
}
|
||||
|
||||
@@ -54,6 +62,8 @@ pub enum Cursor<'env> {
|
||||
Mdbx(mdbx_impl::Cursor<'env>),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Lmdb(lmdb_impl::Cursor<'env>),
|
||||
#[cfg(feature = "redb")]
|
||||
Redb(redb_impl::Cursor<'env>),
|
||||
Disabled(PhantomData<&'env ()>),
|
||||
}
|
||||
|
||||
@@ -67,6 +77,8 @@ impl Environment {
|
||||
DatabaseBackend::Mdbx => mdbx_impl::Environment::new(config).map(Environment::Mdbx),
|
||||
#[cfg(feature = "lmdb")]
|
||||
DatabaseBackend::Lmdb => lmdb_impl::Environment::new(config).map(Environment::Lmdb),
|
||||
#[cfg(feature = "redb")]
|
||||
DatabaseBackend::Redb => redb_impl::Environment::new(config).map(Environment::Redb),
|
||||
DatabaseBackend::Disabled => Err(Error::SlasherDatabaseBackendDisabled),
|
||||
}
|
||||
}
|
||||
@@ -77,6 +89,8 @@ impl Environment {
|
||||
Self::Mdbx(env) => env.create_databases(),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Self::Lmdb(env) => env.create_databases(),
|
||||
#[cfg(feature = "redb")]
|
||||
Self::Redb(env) => env.create_databases(),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -87,6 +101,8 @@ impl Environment {
|
||||
Self::Mdbx(env) => env.begin_rw_txn().map(RwTransaction::Mdbx),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Self::Lmdb(env) => env.begin_rw_txn().map(RwTransaction::Lmdb),
|
||||
#[cfg(feature = "redb")]
|
||||
Self::Redb(env) => env.begin_rw_txn().map(RwTransaction::Redb),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -98,6 +114,8 @@ impl Environment {
|
||||
Self::Mdbx(env) => env.filenames(config),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Self::Lmdb(env) => env.filenames(config),
|
||||
#[cfg(feature = "redb")]
|
||||
Self::Redb(env) => env.filenames(config),
|
||||
_ => vec![],
|
||||
}
|
||||
}
|
||||
@@ -106,7 +124,7 @@ impl Environment {
|
||||
impl<'env> RwTransaction<'env> {
|
||||
pub fn get<K: AsRef<[u8]> + ?Sized>(
|
||||
&'env self,
|
||||
db: &Database<'env>,
|
||||
db: &'env Database,
|
||||
key: &K,
|
||||
) -> Result<Option<Cow<'env, [u8]>>, Error> {
|
||||
match (self, db) {
|
||||
@@ -114,6 +132,8 @@ impl<'env> RwTransaction<'env> {
|
||||
(Self::Mdbx(txn), Database::Mdbx(db)) => txn.get(db, key),
|
||||
#[cfg(feature = "lmdb")]
|
||||
(Self::Lmdb(txn), Database::Lmdb(db)) => txn.get(db, key),
|
||||
#[cfg(feature = "redb")]
|
||||
(Self::Redb(txn), Database::Redb(db)) => txn.get(db, key),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -129,6 +149,8 @@ impl<'env> RwTransaction<'env> {
|
||||
(Self::Mdbx(txn), Database::Mdbx(db)) => txn.put(db, key, value),
|
||||
#[cfg(feature = "lmdb")]
|
||||
(Self::Lmdb(txn), Database::Lmdb(db)) => txn.put(db, key, value),
|
||||
#[cfg(feature = "redb")]
|
||||
(Self::Redb(txn), Database::Redb(db)) => txn.put(db, key, value),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -139,16 +161,8 @@ impl<'env> RwTransaction<'env> {
|
||||
(Self::Mdbx(txn), Database::Mdbx(db)) => txn.del(db, key),
|
||||
#[cfg(feature = "lmdb")]
|
||||
(Self::Lmdb(txn), Database::Lmdb(db)) => txn.del(db, key),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cursor<'a>(&'a mut self, db: &Database) -> Result<Cursor<'a>, Error> {
|
||||
match (self, db) {
|
||||
#[cfg(feature = "mdbx")]
|
||||
(Self::Mdbx(txn), Database::Mdbx(db)) => txn.cursor(db).map(Cursor::Mdbx),
|
||||
#[cfg(feature = "lmdb")]
|
||||
(Self::Lmdb(txn), Database::Lmdb(db)) => txn.cursor(db).map(Cursor::Lmdb),
|
||||
#[cfg(feature = "redb")]
|
||||
(Self::Redb(txn), Database::Redb(db)) => txn.del(db, key),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -159,6 +173,20 @@ impl<'env> RwTransaction<'env> {
|
||||
Self::Mdbx(txn) => txn.commit(),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Self::Lmdb(txn) => txn.commit(),
|
||||
#[cfg(feature = "redb")]
|
||||
Self::Redb(txn) => txn.commit(),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn cursor<'a>(&'a mut self, db: &'a Database) -> Result<Cursor<'a>, Error> {
|
||||
match (self, db) {
|
||||
#[cfg(feature = "mdbx")]
|
||||
(Self::Mdbx(txn), Database::Mdbx(db)) => txn.cursor(db).map(Cursor::Mdbx),
|
||||
#[cfg(feature = "lmdb")]
|
||||
(Self::Lmdb(txn), Database::Lmdb(db)) => txn.cursor(db).map(Cursor::Lmdb),
|
||||
#[cfg(feature = "redb")]
|
||||
(Self::Redb(txn), Database::Redb(db)) => txn.cursor(db).map(Cursor::Redb),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -172,6 +200,8 @@ impl<'env> Cursor<'env> {
|
||||
Cursor::Mdbx(cursor) => cursor.first_key(),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Cursor::Lmdb(cursor) => cursor.first_key(),
|
||||
#[cfg(feature = "redb")]
|
||||
Cursor::Redb(cursor) => cursor.first_key(),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -183,6 +213,8 @@ impl<'env> Cursor<'env> {
|
||||
Cursor::Mdbx(cursor) => cursor.last_key(),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Cursor::Lmdb(cursor) => cursor.last_key(),
|
||||
#[cfg(feature = "redb")]
|
||||
Cursor::Redb(cursor) => cursor.last_key(),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -193,17 +225,8 @@ impl<'env> Cursor<'env> {
|
||||
Cursor::Mdbx(cursor) => cursor.next_key(),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Cursor::Lmdb(cursor) => cursor.next_key(),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the key value pair at the current position.
|
||||
pub fn get_current(&mut self) -> Result<Option<(Key, Value)>, Error> {
|
||||
match self {
|
||||
#[cfg(feature = "mdbx")]
|
||||
Cursor::Mdbx(cursor) => cursor.get_current(),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Cursor::Lmdb(cursor) => cursor.get_current(),
|
||||
#[cfg(feature = "redb")]
|
||||
Cursor::Redb(cursor) => cursor.next_key(),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -214,6 +237,8 @@ impl<'env> Cursor<'env> {
|
||||
Cursor::Mdbx(cursor) => cursor.delete_current(),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Cursor::Lmdb(cursor) => cursor.delete_current(),
|
||||
#[cfg(feature = "redb")]
|
||||
Cursor::Redb(cursor) => cursor.delete_current(),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
@@ -224,6 +249,23 @@ impl<'env> Cursor<'env> {
|
||||
Self::Mdbx(cursor) => cursor.put(key, value),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Self::Lmdb(cursor) => cursor.put(key, value),
|
||||
#[cfg(feature = "redb")]
|
||||
Self::Redb(cursor) => cursor.put(key, value),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete_while(
|
||||
&mut self,
|
||||
f: impl Fn(&[u8]) -> Result<bool, Error>,
|
||||
) -> Result<Vec<Cow<'_, [u8]>>, Error> {
|
||||
match self {
|
||||
#[cfg(feature = "mdbx")]
|
||||
Self::Mdbx(txn) => txn.delete_while(f),
|
||||
#[cfg(feature = "lmdb")]
|
||||
Self::Lmdb(txn) => txn.delete_while(f),
|
||||
#[cfg(feature = "redb")]
|
||||
Self::Redb(txn) => txn.delete_while(f),
|
||||
_ => Err(Error::MismatchedDatabaseVariant),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ impl Environment {
|
||||
impl<'env> RwTransaction<'env> {
|
||||
pub fn get<K: AsRef<[u8]> + ?Sized>(
|
||||
&'env self,
|
||||
db: &Database<'env>,
|
||||
db: &'env Database,
|
||||
key: &K,
|
||||
) -> Result<Option<Cow<'env, [u8]>>, Error> {
|
||||
Ok(self.txn.get(db.db, key).optional()?.map(Cow::Borrowed))
|
||||
@@ -182,6 +182,29 @@ impl<'env> Cursor<'env> {
|
||||
.put(&key, &value, RwTransaction::write_flags())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete_while(
|
||||
&mut self,
|
||||
f: impl Fn(&[u8]) -> Result<bool, Error>,
|
||||
) -> Result<Vec<Cow<'_, [u8]>>, Error> {
|
||||
let mut result = vec![];
|
||||
|
||||
loop {
|
||||
let (key_bytes, value) = self.get_current()?.ok_or(Error::MissingKey)?;
|
||||
|
||||
if f(&key_bytes)? {
|
||||
result.push(value);
|
||||
self.delete_current()?;
|
||||
if self.next_key()?.is_none() {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
/// Mix-in trait for loading values from LMDB that may or may not exist.
|
||||
|
||||
@@ -113,7 +113,7 @@ impl<'env> RwTransaction<'env> {
|
||||
|
||||
pub fn get<K: AsRef<[u8]> + ?Sized>(
|
||||
&'env self,
|
||||
db: &Database<'env>,
|
||||
db: &'env Database,
|
||||
key: &K,
|
||||
) -> Result<Option<Cow<'env, [u8]>>, Error> {
|
||||
Ok(self.txn.get(&db.db, key.as_ref())?)
|
||||
@@ -183,4 +183,27 @@ impl<'env> Cursor<'env> {
|
||||
.put(key.as_ref(), value.as_ref(), RwTransaction::write_flags())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete_while(
|
||||
&mut self,
|
||||
f: impl Fn(&[u8]) -> Result<bool, Error>,
|
||||
) -> Result<Vec<Cow<'_, [u8]>>, Error> {
|
||||
let mut result = vec![];
|
||||
|
||||
loop {
|
||||
let (key_bytes, value) = self.get_current()?.ok_or(Error::MissingKey)?;
|
||||
|
||||
if f(&key_bytes)? {
|
||||
result.push(value);
|
||||
self.delete_current()?;
|
||||
if self.next_key()?.is_none() {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
276
slasher/src/database/redb_impl.rs
Normal file
276
slasher/src/database/redb_impl.rs
Normal file
@@ -0,0 +1,276 @@
|
||||
#![cfg(feature = "redb")]
|
||||
use crate::{
|
||||
config::REDB_DATA_FILENAME,
|
||||
database::{
|
||||
interface::{Key, OpenDatabases, Value},
|
||||
*,
|
||||
},
|
||||
Config, Error,
|
||||
};
|
||||
use derivative::Derivative;
|
||||
use redb::{ReadableTable, TableDefinition};
|
||||
use std::{borrow::Cow, path::PathBuf};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Environment {
|
||||
_db_count: usize,
|
||||
db: redb::Database,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Database<'env> {
|
||||
table_name: String,
|
||||
_phantom: PhantomData<&'env ()>,
|
||||
}
|
||||
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub struct RwTransaction<'env> {
|
||||
#[derivative(Debug = "ignore")]
|
||||
txn: redb::WriteTransaction,
|
||||
_phantom: PhantomData<&'env ()>,
|
||||
}
|
||||
|
||||
#[derive(Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub struct Cursor<'env> {
|
||||
#[derivative(Debug = "ignore")]
|
||||
txn: &'env redb::WriteTransaction,
|
||||
db: &'env Database<'env>,
|
||||
current_key: Option<Cow<'env, [u8]>>,
|
||||
}
|
||||
|
||||
impl Environment {
|
||||
pub fn new(config: &Config) -> Result<Environment, Error> {
|
||||
let db_path = config.database_path.join(REDB_DATA_FILENAME);
|
||||
let database = redb::Database::create(db_path)?;
|
||||
|
||||
Ok(Environment {
|
||||
_db_count: MAX_NUM_DBS,
|
||||
db: database,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_databases(&self) -> Result<OpenDatabases, Error> {
|
||||
let indexed_attestation_db = self.create_table(INDEXED_ATTESTATION_DB)?;
|
||||
let indexed_attestation_id_db = self.create_table(INDEXED_ATTESTATION_ID_DB)?;
|
||||
let attesters_db = self.create_table(ATTESTERS_DB)?;
|
||||
let attesters_max_targets_db = self.create_table(ATTESTERS_MAX_TARGETS_DB)?;
|
||||
let min_targets_db = self.create_table(MIN_TARGETS_DB)?;
|
||||
let max_targets_db = self.create_table(MAX_TARGETS_DB)?;
|
||||
let current_epochs_db = self.create_table(CURRENT_EPOCHS_DB)?;
|
||||
let proposers_db = self.create_table(PROPOSERS_DB)?;
|
||||
let metadata_db = self.create_table(METADATA_DB)?;
|
||||
|
||||
Ok(OpenDatabases {
|
||||
indexed_attestation_db,
|
||||
indexed_attestation_id_db,
|
||||
attesters_db,
|
||||
attesters_max_targets_db,
|
||||
min_targets_db,
|
||||
max_targets_db,
|
||||
current_epochs_db,
|
||||
proposers_db,
|
||||
metadata_db,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn create_table<'env>(
|
||||
&'env self,
|
||||
table_name: &'env str,
|
||||
) -> Result<crate::Database<'env>, Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> = TableDefinition::new(table_name);
|
||||
let tx = self.db.begin_write()?;
|
||||
tx.open_table(table_definition)?;
|
||||
tx.commit()?;
|
||||
|
||||
Ok(crate::Database::Redb(Database {
|
||||
table_name: table_name.to_string(),
|
||||
_phantom: PhantomData,
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn filenames(&self, config: &Config) -> Vec<PathBuf> {
|
||||
vec![config.database_path.join(BASE_DB)]
|
||||
}
|
||||
|
||||
pub fn begin_rw_txn(&self) -> Result<RwTransaction, Error> {
|
||||
let mut txn = self.db.begin_write()?;
|
||||
txn.set_durability(redb::Durability::Eventual);
|
||||
Ok(RwTransaction {
|
||||
txn,
|
||||
_phantom: PhantomData,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'env> RwTransaction<'env> {
|
||||
pub fn get<K: AsRef<[u8]> + ?Sized>(
|
||||
&'env self,
|
||||
db: &'env Database,
|
||||
key: &K,
|
||||
) -> Result<Option<Cow<'env, [u8]>>, Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&db.table_name);
|
||||
let table = self.txn.open_table(table_definition)?;
|
||||
let result = table.get(key.as_ref())?;
|
||||
if let Some(access_guard) = result {
|
||||
let value = access_guard.value().to_vec();
|
||||
Ok(Some(Cow::from(value)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(
|
||||
&mut self,
|
||||
db: &Database,
|
||||
key: K,
|
||||
value: V,
|
||||
) -> Result<(), Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&db.table_name);
|
||||
let mut table = self.txn.open_table(table_definition)?;
|
||||
table.insert(key.as_ref(), value.as_ref())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn del<K: AsRef<[u8]>>(&mut self, db: &Database, key: K) -> Result<(), Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&db.table_name);
|
||||
let mut table = self.txn.open_table(table_definition)?;
|
||||
table.remove(key.as_ref())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn commit(self) -> Result<(), Error> {
|
||||
self.txn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn cursor<'a>(&'a mut self, db: &'a Database) -> Result<Cursor<'a>, Error> {
|
||||
Ok(Cursor {
|
||||
txn: &self.txn,
|
||||
db,
|
||||
current_key: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'env> Cursor<'env> {
|
||||
pub fn first_key(&mut self) -> Result<Option<Key>, Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&self.db.table_name);
|
||||
let table = self.txn.open_table(table_definition)?;
|
||||
let first = table
|
||||
.iter()?
|
||||
.next()
|
||||
.map(|x| x.map(|(key, _)| key.value().to_vec()));
|
||||
|
||||
if let Some(owned_key) = first {
|
||||
let owned_key = owned_key?;
|
||||
self.current_key = Some(Cow::from(owned_key));
|
||||
Ok(self.current_key.clone())
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn last_key(&mut self) -> Result<Option<Key<'env>>, Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&self.db.table_name);
|
||||
let table = self.txn.open_table(table_definition)?;
|
||||
let last = table
|
||||
.iter()?
|
||||
.next_back()
|
||||
.map(|x| x.map(|(key, _)| key.value().to_vec()));
|
||||
|
||||
if let Some(owned_key) = last {
|
||||
let owned_key = owned_key?;
|
||||
self.current_key = Some(Cow::from(owned_key));
|
||||
return Ok(self.current_key.clone());
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn get_current(&self) -> Result<Option<(Key<'env>, Value<'env>)>, Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&self.db.table_name);
|
||||
let table = self.txn.open_table(table_definition)?;
|
||||
if let Some(key) = &self.current_key {
|
||||
let result = table.get(key.as_ref())?;
|
||||
|
||||
if let Some(access_guard) = result {
|
||||
let value = access_guard.value().to_vec();
|
||||
return Ok(Some((key.clone(), Cow::from(value))));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn next_key(&mut self) -> Result<Option<Key<'env>>, Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&self.db.table_name);
|
||||
let table = self.txn.open_table(table_definition)?;
|
||||
if let Some(current_key) = &self.current_key {
|
||||
let range: std::ops::RangeFrom<&[u8]> = current_key..;
|
||||
|
||||
let next = table
|
||||
.range(range)?
|
||||
.next()
|
||||
.map(|x| x.map(|(key, _)| key.value().to_vec()));
|
||||
|
||||
if let Some(owned_key) = next {
|
||||
let owned_key = owned_key?;
|
||||
self.current_key = Some(Cow::from(owned_key));
|
||||
return Ok(self.current_key.clone());
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub fn delete_current(&self) -> Result<(), Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&self.db.table_name);
|
||||
let mut table = self.txn.open_table(table_definition)?;
|
||||
if let Some(key) = &self.current_key {
|
||||
table.remove(key.as_ref())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete_while(
|
||||
&self,
|
||||
f: impl Fn(&[u8]) -> Result<bool, Error>,
|
||||
) -> Result<Vec<Cow<'_, [u8]>>, Error> {
|
||||
let mut deleted_values = vec![];
|
||||
if let Some(current_key) = &self.current_key {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&self.db.table_name);
|
||||
|
||||
let mut table = self.txn.open_table(table_definition)?;
|
||||
|
||||
let deleted =
|
||||
table.extract_from_if(current_key.as_ref().., |key, _| f(key).unwrap_or(false))?;
|
||||
|
||||
deleted.for_each(|result| {
|
||||
if let Ok(item) = result {
|
||||
let value = item.1.value().to_vec();
|
||||
deleted_values.push(Cow::from(value));
|
||||
}
|
||||
})
|
||||
};
|
||||
Ok(deleted_values)
|
||||
}
|
||||
|
||||
pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<(), Error> {
|
||||
let table_definition: TableDefinition<'_, &[u8], &[u8]> =
|
||||
TableDefinition::new(&self.db.table_name);
|
||||
let mut table = self.txn.open_table(table_definition)?;
|
||||
table.insert(key.as_ref(), value.as_ref())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,8 @@ pub enum Error {
|
||||
DatabaseMdbxError(mdbx::Error),
|
||||
#[cfg(feature = "lmdb")]
|
||||
DatabaseLmdbError(lmdb::Error),
|
||||
#[cfg(feature = "redb")]
|
||||
DatabaseRedbError(redb::Error),
|
||||
SlasherDatabaseBackendDisabled,
|
||||
MismatchedDatabaseVariant,
|
||||
DatabaseIOError(io::Error),
|
||||
@@ -67,6 +69,7 @@ pub enum Error {
|
||||
MissingIndexedAttestationId,
|
||||
MissingIndexedAttestationIdKey,
|
||||
InconsistentAttestationDataRoot,
|
||||
MissingKey,
|
||||
}
|
||||
|
||||
#[cfg(feature = "mdbx")]
|
||||
@@ -89,6 +92,41 @@ impl From<lmdb::Error> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "redb")]
|
||||
impl From<redb::TableError> for Error {
|
||||
fn from(e: redb::TableError) -> Self {
|
||||
Error::DatabaseRedbError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "redb")]
|
||||
impl From<redb::TransactionError> for Error {
|
||||
fn from(e: redb::TransactionError) -> Self {
|
||||
Error::DatabaseRedbError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "redb")]
|
||||
impl From<redb::DatabaseError> for Error {
|
||||
fn from(e: redb::DatabaseError) -> Self {
|
||||
Error::DatabaseRedbError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "redb")]
|
||||
impl From<redb::StorageError> for Error {
|
||||
fn from(e: redb::StorageError) -> Self {
|
||||
Error::DatabaseRedbError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "redb")]
|
||||
impl From<redb::CommitError> for Error {
|
||||
fn from(e: redb::CommitError) -> Self {
|
||||
Error::DatabaseRedbError(e.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<io::Error> for Error {
|
||||
fn from(e: io::Error) -> Self {
|
||||
Error::DatabaseIOError(e)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#![deny(missing_debug_implementations)]
|
||||
#![cfg_attr(
|
||||
not(any(feature = "mdbx", feature = "lmdb")),
|
||||
not(any(feature = "mdbx", feature = "lmdb", feature = "redb")),
|
||||
allow(unused, clippy::drop_non_drop)
|
||||
)]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user