diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4f3f33fe06..ccd65877e0 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -371,8 +371,10 @@ pub fn get_config( slasher_config.history_length = history_length; } - if let Some(max_db_size) = clap_utils::parse_optional(cli_args, "slasher-max-db-size")? { - slasher_config.max_db_size_gbs = max_db_size; + if let Some(max_db_size_gbs) = + clap_utils::parse_optional::(cli_args, "slasher-max-db-size")? + { + slasher_config.max_db_size_mbs = max_db_size_gbs * 1024; } if let Some(chunk_size) = clap_utils::parse_optional(cli_args, "slasher-chunk-size")? { diff --git a/book/src/slasher.md b/book/src/slasher.md index 0be9a65f64..a5f986fa70 100644 --- a/book/src/slasher.md +++ b/book/src/slasher.md @@ -74,7 +74,7 @@ either you can halve the space required. If you want a better estimate you can use this formula: ``` -352 * V * N + (16 * V * N)/(C * K) + 15000 * N +360 * V * N + (16 * V * N)/(C * K) + 15000 * N ``` where diff --git a/slasher/src/config.rs b/slasher/src/config.rs index dba2e604eb..4438a25307 100644 --- a/slasher/src/config.rs +++ b/slasher/src/config.rs @@ -7,7 +7,7 @@ pub const DEFAULT_CHUNK_SIZE: usize = 16; pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256; pub const DEFAULT_HISTORY_LENGTH: usize = 4096; pub const DEFAULT_UPDATE_PERIOD: u64 = 12; -pub const DEFAULT_MAX_DB_SIZE: usize = 256; +pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -18,8 +18,8 @@ pub struct Config { pub history_length: usize, /// Update frequency in seconds. pub update_period: u64, - /// Maximum size of the LMDB database in gigabytes. - pub max_db_size_gbs: usize, + /// Maximum size of the LMDB database in megabytes. + pub max_db_size_mbs: usize, } impl Config { @@ -30,7 +30,7 @@ impl Config { validator_chunk_size: DEFAULT_VALIDATOR_CHUNK_SIZE, history_length: DEFAULT_HISTORY_LENGTH, update_period: DEFAULT_UPDATE_PERIOD, - max_db_size_gbs: DEFAULT_MAX_DB_SIZE, + max_db_size_mbs: DEFAULT_MAX_DB_SIZE, } } @@ -38,7 +38,7 @@ impl Config { if self.chunk_size == 0 || self.validator_chunk_size == 0 || self.history_length == 0 - || self.max_db_size_gbs == 0 + || self.max_db_size_mbs == 0 { Err(Error::ConfigInvalidZeroParameter { config: self.clone(), diff --git a/slasher/src/database.rs b/slasher/src/database.rs index 8899d05479..26eac835f7 100644 --- a/slasher/src/database.rs +++ b/slasher/src/database.rs @@ -1,11 +1,10 @@ use crate::{ - utils::TxnOptional, AttesterRecord, AttesterSlashingStatus, Config, Error, - ProposerSlashingStatus, + utils::{TxnMapFull, TxnOptional}, + AttesterRecord, AttesterSlashingStatus, Config, Error, ProposerSlashingStatus, }; use byteorder::{BigEndian, ByteOrder}; use lmdb::{Cursor, Database, DatabaseFlags, Environment, RwTransaction, Transaction, WriteFlags}; use ssz::{Decode, Encode}; -use std::collections::HashSet; use std::marker::PhantomData; use std::sync::Arc; use types::{ @@ -13,13 +12,13 @@ use types::{ }; /// Current database schema version, to check compatibility of on-disk DB with software. -const CURRENT_SCHEMA_VERSION: u64 = 0; +const CURRENT_SCHEMA_VERSION: u64 = 1; /// Metadata about the slashing database itself. const METADATA_DB: &str = "metadata"; /// Map from `(target_epoch, validator_index)` to `AttesterRecord`. const ATTESTERS_DB: &str = "attesters"; -/// Map from `indexed_attestation_hash` to `IndexedAttestation`. +/// Map from `(target_epoch, indexed_attestation_hash)` to `IndexedAttestation`. const INDEXED_ATTESTATION_DB: &str = "indexed_attestations"; /// Table of minimum targets for every source epoch within range. const MIN_TARGETS_DB: &str = "min_targets"; @@ -42,7 +41,9 @@ const METADATA_CONFIG_KEY: &[u8] = &[1]; const ATTESTER_KEY_SIZE: usize = 16; const PROPOSER_KEY_SIZE: usize = 16; -const GIGABYTE: usize = 1 << 30; +const CURRENT_EPOCH_KEY_SIZE: usize = 8; +const INDEXED_ATTESTATION_KEY_SIZE: usize = 40; +const MEGABYTE: usize = 1 << 20; #[derive(Debug)] pub struct SlasherDB { @@ -128,7 +129,7 @@ impl AsRef<[u8]> for ProposerKey { /// Key containing a validator index pub struct CurrentEpochKey { - validator_index: [u8; 8], + validator_index: [u8; CURRENT_EPOCH_KEY_SIZE], } impl CurrentEpochKey { @@ -145,12 +146,44 @@ impl AsRef<[u8]> for CurrentEpochKey { } } +/// Key containing an epoch and an indexed attestation hash. +pub struct IndexedAttestationKey { + target_and_root: [u8; INDEXED_ATTESTATION_KEY_SIZE], +} + +impl IndexedAttestationKey { + pub fn new(target_epoch: Epoch, indexed_attestation_root: Hash256) -> Self { + let mut data = [0; INDEXED_ATTESTATION_KEY_SIZE]; + data[0..8].copy_from_slice(&target_epoch.as_u64().to_be_bytes()); + data[8..INDEXED_ATTESTATION_KEY_SIZE].copy_from_slice(indexed_attestation_root.as_bytes()); + Self { + target_and_root: data, + } + } + + pub fn parse(data: &[u8]) -> Result<(Epoch, Hash256), Error> { + if data.len() == INDEXED_ATTESTATION_KEY_SIZE { + let target_epoch = Epoch::new(BigEndian::read_u64(&data[..8])); + let indexed_attestation_root = Hash256::from_slice(&data[8..]); + Ok((target_epoch, indexed_attestation_root)) + } else { + Err(Error::IndexedAttestationKeyCorrupt { length: data.len() }) + } + } +} + +impl AsRef<[u8]> for IndexedAttestationKey { + fn as_ref(&self) -> &[u8] { + &self.target_and_root + } +} + impl SlasherDB { pub fn open(config: Arc) -> Result { std::fs::create_dir_all(&config.database_path)?; let env = Environment::new() .set_max_dbs(LMDB_MAX_DBS) - .set_map_size(config.max_db_size_gbs * GIGABYTE) + .set_map_size(config.max_db_size_mbs * MEGABYTE) .open_with_permissions(&config.database_path, 0o600)?; let indexed_attestation_db = env.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; @@ -284,11 +317,15 @@ impl SlasherDB { indexed_attestation_hash: Hash256, indexed_attestation: &IndexedAttestation, ) -> Result<(), Error> { + let key = IndexedAttestationKey::new( + indexed_attestation.data.target.epoch, + indexed_attestation_hash, + ); let data = indexed_attestation.as_ssz_bytes(); txn.put( self.indexed_attestation_db, - &indexed_attestation_hash.as_bytes(), + &key, &data, Self::write_flags(), )?; @@ -298,10 +335,12 @@ impl SlasherDB { pub fn get_indexed_attestation( &self, txn: &mut RwTransaction<'_>, + target_epoch: Epoch, indexed_attestation_hash: Hash256, ) -> Result, Error> { + let key = IndexedAttestationKey::new(target_epoch, indexed_attestation_hash); let bytes = txn - .get(self.indexed_attestation_db, &indexed_attestation_hash) + .get(self.indexed_attestation_db, &key) .optional()? .ok_or_else(|| Error::MissingIndexedAttestation { root: indexed_attestation_hash, @@ -317,8 +356,9 @@ impl SlasherDB { record: AttesterRecord, ) -> Result, Error> { // See if there's an existing attestation for this attester. + let target_epoch = attestation.data.target.epoch; if let Some(existing_record) = - self.get_attester_record(txn, validator_index, attestation.data.target.epoch)? + self.get_attester_record(txn, validator_index, target_epoch)? { // If the existing attestation data is identical, then this attestation is not // slashable and no update is required. @@ -327,8 +367,11 @@ impl SlasherDB { } // Otherwise, load the indexed attestation so we can confirm that it's slashable. - let existing_attestation = - self.get_indexed_attestation(txn, existing_record.indexed_attestation_hash)?; + let existing_attestation = self.get_indexed_attestation( + txn, + target_epoch, + existing_record.indexed_attestation_hash, + )?; if attestation.is_double_vote(&existing_attestation) { Ok(AttesterSlashingStatus::DoubleVote(Box::new( existing_attestation, @@ -341,7 +384,7 @@ impl SlasherDB { else { txn.put( self.attesters_db, - &AttesterKey::new(validator_index, attestation.data.target.epoch), + &AttesterKey::new(validator_index, target_epoch), &record.as_ssz_bytes(), Self::write_flags(), )?; @@ -361,7 +404,7 @@ impl SlasherDB { validator_index, target_epoch, })?; - self.get_indexed_attestation(txn, record.indexed_attestation_hash) + self.get_indexed_attestation(txn, target_epoch, record.indexed_attestation_hash) } pub fn get_attester_record( @@ -422,14 +465,28 @@ impl SlasherDB { } } + /// Attempt to prune the database, deleting old blocks and attestations. pub fn prune(&self, current_epoch: Epoch) -> Result<(), Error> { let mut txn = self.begin_rw_txn()?; - self.prune_proposers(current_epoch, &mut txn)?; - self.prune_attesters(current_epoch, &mut txn)?; + self.try_prune(current_epoch, &mut txn).allow_map_full()?; txn.commit()?; Ok(()) } + /// Try to prune the database. + /// + /// This is a separate method from `prune` so that `allow_map_full` may be used. + pub fn try_prune( + &self, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + self.prune_proposers(current_epoch, txn)?; + self.prune_attesters(current_epoch, txn)?; + self.prune_indexed_attestations(current_epoch, txn)?; + Ok(()) + } + fn prune_proposers( &self, current_epoch: Epoch, @@ -497,19 +554,15 @@ impl SlasherDB { return Ok(()); } - let mut indexed_attestations_to_delete = HashSet::new(); - loop { - let (optional_key, value) = cursor.get(None, None, lmdb_sys::MDB_GET_CURRENT)?; - let key_bytes = optional_key.ok_or_else(|| Error::MissingAttesterKey)?; + let key_bytes = cursor + .get(None, None, lmdb_sys::MDB_GET_CURRENT)? + .0 + .ok_or_else(|| Error::MissingAttesterKey)?; - let (target_epoch, _validator_index) = AttesterKey::parse(key_bytes)?; + let (target_epoch, _) = AttesterKey::parse(key_bytes)?; if target_epoch < min_epoch { - // Stage the indexed attestation for deletion and delete the record itself. - let attester_record = AttesterRecord::from_ssz_bytes(value)?; - indexed_attestations_to_delete.insert(attester_record.indexed_attestation_hash); - cursor.del(Self::write_flags())?; // End the loop if there is no next entry. @@ -524,10 +577,51 @@ impl SlasherDB { break; } } - drop(cursor); - for indexed_attestation_hash in indexed_attestations_to_delete { - txn.del(self.indexed_attestation_db, &indexed_attestation_hash, None)?; + Ok(()) + } + + fn prune_indexed_attestations( + &self, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + let min_epoch = current_epoch + .saturating_add(1u64) + .saturating_sub(self.config.history_length as u64); + + let mut cursor = txn.open_rw_cursor(self.indexed_attestation_db)?; + + // Position cursor at first key, bailing out if the database is empty. + if cursor + .get(None, None, lmdb_sys::MDB_FIRST) + .optional()? + .is_none() + { + return Ok(()); + } + + loop { + let key_bytes = cursor + .get(None, None, lmdb_sys::MDB_GET_CURRENT)? + .0 + .ok_or_else(|| Error::MissingAttesterKey)?; + + let (target_epoch, _) = IndexedAttestationKey::parse(key_bytes)?; + + if target_epoch < min_epoch { + cursor.del(Self::write_flags())?; + + if cursor + .get(None, None, lmdb_sys::MDB_NEXT) + .optional()? + .is_none() + { + break; + } + } else { + break; + } } Ok(()) diff --git a/slasher/src/error.rs b/slasher/src/error.rs index f1c8f727e9..0d3262bce1 100644 --- a/slasher/src/error.rs +++ b/slasher/src/error.rs @@ -41,11 +41,15 @@ pub enum Error { ProposerKeyCorrupt { length: usize, }, + IndexedAttestationKeyCorrupt { + length: usize, + }, MissingIndexedAttestation { root: Hash256, }, MissingAttesterKey, MissingProposerKey, + MissingIndexedAttestationKey, AttesterRecordInconsistentRoot, } diff --git a/slasher/src/utils.rs b/slasher/src/utils.rs index b9df9b5b44..9c9eceaa14 100644 --- a/slasher/src/utils.rs +++ b/slasher/src/utils.rs @@ -14,3 +14,18 @@ impl TxnOptional for Result { } } } + +/// Transform a transaction that would fail with a `MapFull` error into an optional result. +pub trait TxnMapFull { + fn allow_map_full(self) -> Result, E>; +} + +impl TxnMapFull for Result { + fn allow_map_full(self) -> Result, Error> { + match self { + Ok(x) => Ok(Some(x)), + Err(Error::DatabaseError(lmdb::Error::MapFull)) => Ok(None), + Err(e) => Err(e), + } + } +} diff --git a/slasher/tests/wrap_around.rs b/slasher/tests/wrap_around.rs index 0ed3860dcf..7802a73909 100644 --- a/slasher/tests/wrap_around.rs +++ b/slasher/tests/wrap_around.rs @@ -1,6 +1,6 @@ use slasher::{ test_utils::{indexed_att, logger}, - Config, Slasher, + Config, Error, Slasher, }; use tempdir::TempDir; use types::Epoch; @@ -37,3 +37,53 @@ fn attestation_pruning_empty_wrap_around() { )); slasher.process_queued(current_epoch).unwrap(); } + +// Test that pruning can recover from a `MapFull` error +#[test] +fn pruning_with_map_full() { + let tempdir = TempDir::new("slasher").unwrap(); + let mut config = Config::new(tempdir.path().into()); + config.validator_chunk_size = 1; + config.chunk_size = 16; + config.history_length = 1024; + config.max_db_size_mbs = 1; + + let slasher = Slasher::open(config.clone(), logger()).unwrap(); + + let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; + + let mut current_epoch = Epoch::new(0); + + loop { + slasher.accept_attestation(indexed_att( + v.clone(), + (current_epoch - 1).as_u64(), + current_epoch.as_u64(), + 0, + )); + if let Err(Error::DatabaseError(lmdb::Error::MapFull)) = + slasher.process_queued(current_epoch) + { + break; + } + current_epoch += 1; + } + + loop { + slasher.prune_database(current_epoch).unwrap(); + + slasher.accept_attestation(indexed_att( + v.clone(), + (current_epoch - 1).as_u64(), + current_epoch.as_u64(), + 0, + )); + match slasher.process_queued(current_epoch) { + Ok(()) => break, + Err(Error::DatabaseError(lmdb::Error::MapFull)) => { + current_epoch += 1; + } + Err(e) => panic!("{:?}", e), + } + } +}