From 7d644103c6ef7492fbf92984494b4fed8dcf75b9 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 23 Nov 2020 21:33:51 +0000 Subject: [PATCH] Tweak slasher DB schema and pruning (#1948) ## Issue Addressed Resolves #1890 ## Proposed Changes Change the slasher database schema to key indexed attestations by `(target_epoch, indexed_attestation_root)` instead of just `indexed_attestation_root`. This allows more straight-forward pruning (linear scan), that is also "re-entrant". By re-entrant, we mean that a pruning pass that gets stuck because of a `MapFull` error can attempt to commit midway, and be resumed later without issue. The previous pruning strategy for indexed attestations did not have this property. There was also a flaw in the previous pruning that could leave "zombie" indexed attestations in the database (ones not referenced by any attester record), which could build up and contribute to bloat (although in practice I think they occur quite infrequently). ## Additional Info During testing I noticed that a `MapFull` error can still occur during the commit of the transaction itself, which is irritating, but not unbearable. This PR should at least reduce the frequency with which users need to manually resize their DB, and if the `MapFull` on commit rears its ugly head too often we could use a dynamic strategy (temporarily increase the size of the map until the transaction commits). The extra bytes for the epoch make the database a bit heavier, so the size estimate docs have been updated to reflect this. This is also a breaking schema change, so anyone using a v0 database from a few hours ago will need to drop it and update :sweat_smile: --- beacon_node/src/config.rs | 6 +- book/src/slasher.md | 2 +- slasher/src/config.rs | 10 +-- slasher/src/database.rs | 152 ++++++++++++++++++++++++++++------- slasher/src/error.rs | 4 + slasher/src/utils.rs | 15 ++++ slasher/tests/wrap_around.rs | 52 +++++++++++- 7 files changed, 203 insertions(+), 38 deletions(-) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 4f3f33fe06..ccd65877e0 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -371,8 +371,10 @@ pub fn get_config( slasher_config.history_length = history_length; } - if let Some(max_db_size) = clap_utils::parse_optional(cli_args, "slasher-max-db-size")? { - slasher_config.max_db_size_gbs = max_db_size; + if let Some(max_db_size_gbs) = + clap_utils::parse_optional::(cli_args, "slasher-max-db-size")? + { + slasher_config.max_db_size_mbs = max_db_size_gbs * 1024; } if let Some(chunk_size) = clap_utils::parse_optional(cli_args, "slasher-chunk-size")? { diff --git a/book/src/slasher.md b/book/src/slasher.md index 0be9a65f64..a5f986fa70 100644 --- a/book/src/slasher.md +++ b/book/src/slasher.md @@ -74,7 +74,7 @@ either you can halve the space required. If you want a better estimate you can use this formula: ``` -352 * V * N + (16 * V * N)/(C * K) + 15000 * N +360 * V * N + (16 * V * N)/(C * K) + 15000 * N ``` where diff --git a/slasher/src/config.rs b/slasher/src/config.rs index dba2e604eb..4438a25307 100644 --- a/slasher/src/config.rs +++ b/slasher/src/config.rs @@ -7,7 +7,7 @@ pub const DEFAULT_CHUNK_SIZE: usize = 16; pub const DEFAULT_VALIDATOR_CHUNK_SIZE: usize = 256; pub const DEFAULT_HISTORY_LENGTH: usize = 4096; pub const DEFAULT_UPDATE_PERIOD: u64 = 12; -pub const DEFAULT_MAX_DB_SIZE: usize = 256; +pub const DEFAULT_MAX_DB_SIZE: usize = 256 * 1024; // 256 GiB #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { @@ -18,8 +18,8 @@ pub struct Config { pub history_length: usize, /// Update frequency in seconds. pub update_period: u64, - /// Maximum size of the LMDB database in gigabytes. - pub max_db_size_gbs: usize, + /// Maximum size of the LMDB database in megabytes. + pub max_db_size_mbs: usize, } impl Config { @@ -30,7 +30,7 @@ impl Config { validator_chunk_size: DEFAULT_VALIDATOR_CHUNK_SIZE, history_length: DEFAULT_HISTORY_LENGTH, update_period: DEFAULT_UPDATE_PERIOD, - max_db_size_gbs: DEFAULT_MAX_DB_SIZE, + max_db_size_mbs: DEFAULT_MAX_DB_SIZE, } } @@ -38,7 +38,7 @@ impl Config { if self.chunk_size == 0 || self.validator_chunk_size == 0 || self.history_length == 0 - || self.max_db_size_gbs == 0 + || self.max_db_size_mbs == 0 { Err(Error::ConfigInvalidZeroParameter { config: self.clone(), diff --git a/slasher/src/database.rs b/slasher/src/database.rs index 8899d05479..26eac835f7 100644 --- a/slasher/src/database.rs +++ b/slasher/src/database.rs @@ -1,11 +1,10 @@ use crate::{ - utils::TxnOptional, AttesterRecord, AttesterSlashingStatus, Config, Error, - ProposerSlashingStatus, + utils::{TxnMapFull, TxnOptional}, + AttesterRecord, AttesterSlashingStatus, Config, Error, ProposerSlashingStatus, }; use byteorder::{BigEndian, ByteOrder}; use lmdb::{Cursor, Database, DatabaseFlags, Environment, RwTransaction, Transaction, WriteFlags}; use ssz::{Decode, Encode}; -use std::collections::HashSet; use std::marker::PhantomData; use std::sync::Arc; use types::{ @@ -13,13 +12,13 @@ use types::{ }; /// Current database schema version, to check compatibility of on-disk DB with software. -const CURRENT_SCHEMA_VERSION: u64 = 0; +const CURRENT_SCHEMA_VERSION: u64 = 1; /// Metadata about the slashing database itself. const METADATA_DB: &str = "metadata"; /// Map from `(target_epoch, validator_index)` to `AttesterRecord`. const ATTESTERS_DB: &str = "attesters"; -/// Map from `indexed_attestation_hash` to `IndexedAttestation`. +/// Map from `(target_epoch, indexed_attestation_hash)` to `IndexedAttestation`. const INDEXED_ATTESTATION_DB: &str = "indexed_attestations"; /// Table of minimum targets for every source epoch within range. const MIN_TARGETS_DB: &str = "min_targets"; @@ -42,7 +41,9 @@ const METADATA_CONFIG_KEY: &[u8] = &[1]; const ATTESTER_KEY_SIZE: usize = 16; const PROPOSER_KEY_SIZE: usize = 16; -const GIGABYTE: usize = 1 << 30; +const CURRENT_EPOCH_KEY_SIZE: usize = 8; +const INDEXED_ATTESTATION_KEY_SIZE: usize = 40; +const MEGABYTE: usize = 1 << 20; #[derive(Debug)] pub struct SlasherDB { @@ -128,7 +129,7 @@ impl AsRef<[u8]> for ProposerKey { /// Key containing a validator index pub struct CurrentEpochKey { - validator_index: [u8; 8], + validator_index: [u8; CURRENT_EPOCH_KEY_SIZE], } impl CurrentEpochKey { @@ -145,12 +146,44 @@ impl AsRef<[u8]> for CurrentEpochKey { } } +/// Key containing an epoch and an indexed attestation hash. +pub struct IndexedAttestationKey { + target_and_root: [u8; INDEXED_ATTESTATION_KEY_SIZE], +} + +impl IndexedAttestationKey { + pub fn new(target_epoch: Epoch, indexed_attestation_root: Hash256) -> Self { + let mut data = [0; INDEXED_ATTESTATION_KEY_SIZE]; + data[0..8].copy_from_slice(&target_epoch.as_u64().to_be_bytes()); + data[8..INDEXED_ATTESTATION_KEY_SIZE].copy_from_slice(indexed_attestation_root.as_bytes()); + Self { + target_and_root: data, + } + } + + pub fn parse(data: &[u8]) -> Result<(Epoch, Hash256), Error> { + if data.len() == INDEXED_ATTESTATION_KEY_SIZE { + let target_epoch = Epoch::new(BigEndian::read_u64(&data[..8])); + let indexed_attestation_root = Hash256::from_slice(&data[8..]); + Ok((target_epoch, indexed_attestation_root)) + } else { + Err(Error::IndexedAttestationKeyCorrupt { length: data.len() }) + } + } +} + +impl AsRef<[u8]> for IndexedAttestationKey { + fn as_ref(&self) -> &[u8] { + &self.target_and_root + } +} + impl SlasherDB { pub fn open(config: Arc) -> Result { std::fs::create_dir_all(&config.database_path)?; let env = Environment::new() .set_max_dbs(LMDB_MAX_DBS) - .set_map_size(config.max_db_size_gbs * GIGABYTE) + .set_map_size(config.max_db_size_mbs * MEGABYTE) .open_with_permissions(&config.database_path, 0o600)?; let indexed_attestation_db = env.create_db(Some(INDEXED_ATTESTATION_DB), Self::db_flags())?; @@ -284,11 +317,15 @@ impl SlasherDB { indexed_attestation_hash: Hash256, indexed_attestation: &IndexedAttestation, ) -> Result<(), Error> { + let key = IndexedAttestationKey::new( + indexed_attestation.data.target.epoch, + indexed_attestation_hash, + ); let data = indexed_attestation.as_ssz_bytes(); txn.put( self.indexed_attestation_db, - &indexed_attestation_hash.as_bytes(), + &key, &data, Self::write_flags(), )?; @@ -298,10 +335,12 @@ impl SlasherDB { pub fn get_indexed_attestation( &self, txn: &mut RwTransaction<'_>, + target_epoch: Epoch, indexed_attestation_hash: Hash256, ) -> Result, Error> { + let key = IndexedAttestationKey::new(target_epoch, indexed_attestation_hash); let bytes = txn - .get(self.indexed_attestation_db, &indexed_attestation_hash) + .get(self.indexed_attestation_db, &key) .optional()? .ok_or_else(|| Error::MissingIndexedAttestation { root: indexed_attestation_hash, @@ -317,8 +356,9 @@ impl SlasherDB { record: AttesterRecord, ) -> Result, Error> { // See if there's an existing attestation for this attester. + let target_epoch = attestation.data.target.epoch; if let Some(existing_record) = - self.get_attester_record(txn, validator_index, attestation.data.target.epoch)? + self.get_attester_record(txn, validator_index, target_epoch)? { // If the existing attestation data is identical, then this attestation is not // slashable and no update is required. @@ -327,8 +367,11 @@ impl SlasherDB { } // Otherwise, load the indexed attestation so we can confirm that it's slashable. - let existing_attestation = - self.get_indexed_attestation(txn, existing_record.indexed_attestation_hash)?; + let existing_attestation = self.get_indexed_attestation( + txn, + target_epoch, + existing_record.indexed_attestation_hash, + )?; if attestation.is_double_vote(&existing_attestation) { Ok(AttesterSlashingStatus::DoubleVote(Box::new( existing_attestation, @@ -341,7 +384,7 @@ impl SlasherDB { else { txn.put( self.attesters_db, - &AttesterKey::new(validator_index, attestation.data.target.epoch), + &AttesterKey::new(validator_index, target_epoch), &record.as_ssz_bytes(), Self::write_flags(), )?; @@ -361,7 +404,7 @@ impl SlasherDB { validator_index, target_epoch, })?; - self.get_indexed_attestation(txn, record.indexed_attestation_hash) + self.get_indexed_attestation(txn, target_epoch, record.indexed_attestation_hash) } pub fn get_attester_record( @@ -422,14 +465,28 @@ impl SlasherDB { } } + /// Attempt to prune the database, deleting old blocks and attestations. pub fn prune(&self, current_epoch: Epoch) -> Result<(), Error> { let mut txn = self.begin_rw_txn()?; - self.prune_proposers(current_epoch, &mut txn)?; - self.prune_attesters(current_epoch, &mut txn)?; + self.try_prune(current_epoch, &mut txn).allow_map_full()?; txn.commit()?; Ok(()) } + /// Try to prune the database. + /// + /// This is a separate method from `prune` so that `allow_map_full` may be used. + pub fn try_prune( + &self, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + self.prune_proposers(current_epoch, txn)?; + self.prune_attesters(current_epoch, txn)?; + self.prune_indexed_attestations(current_epoch, txn)?; + Ok(()) + } + fn prune_proposers( &self, current_epoch: Epoch, @@ -497,19 +554,15 @@ impl SlasherDB { return Ok(()); } - let mut indexed_attestations_to_delete = HashSet::new(); - loop { - let (optional_key, value) = cursor.get(None, None, lmdb_sys::MDB_GET_CURRENT)?; - let key_bytes = optional_key.ok_or_else(|| Error::MissingAttesterKey)?; + let key_bytes = cursor + .get(None, None, lmdb_sys::MDB_GET_CURRENT)? + .0 + .ok_or_else(|| Error::MissingAttesterKey)?; - let (target_epoch, _validator_index) = AttesterKey::parse(key_bytes)?; + let (target_epoch, _) = AttesterKey::parse(key_bytes)?; if target_epoch < min_epoch { - // Stage the indexed attestation for deletion and delete the record itself. - let attester_record = AttesterRecord::from_ssz_bytes(value)?; - indexed_attestations_to_delete.insert(attester_record.indexed_attestation_hash); - cursor.del(Self::write_flags())?; // End the loop if there is no next entry. @@ -524,10 +577,51 @@ impl SlasherDB { break; } } - drop(cursor); - for indexed_attestation_hash in indexed_attestations_to_delete { - txn.del(self.indexed_attestation_db, &indexed_attestation_hash, None)?; + Ok(()) + } + + fn prune_indexed_attestations( + &self, + current_epoch: Epoch, + txn: &mut RwTransaction<'_>, + ) -> Result<(), Error> { + let min_epoch = current_epoch + .saturating_add(1u64) + .saturating_sub(self.config.history_length as u64); + + let mut cursor = txn.open_rw_cursor(self.indexed_attestation_db)?; + + // Position cursor at first key, bailing out if the database is empty. + if cursor + .get(None, None, lmdb_sys::MDB_FIRST) + .optional()? + .is_none() + { + return Ok(()); + } + + loop { + let key_bytes = cursor + .get(None, None, lmdb_sys::MDB_GET_CURRENT)? + .0 + .ok_or_else(|| Error::MissingAttesterKey)?; + + let (target_epoch, _) = IndexedAttestationKey::parse(key_bytes)?; + + if target_epoch < min_epoch { + cursor.del(Self::write_flags())?; + + if cursor + .get(None, None, lmdb_sys::MDB_NEXT) + .optional()? + .is_none() + { + break; + } + } else { + break; + } } Ok(()) diff --git a/slasher/src/error.rs b/slasher/src/error.rs index f1c8f727e9..0d3262bce1 100644 --- a/slasher/src/error.rs +++ b/slasher/src/error.rs @@ -41,11 +41,15 @@ pub enum Error { ProposerKeyCorrupt { length: usize, }, + IndexedAttestationKeyCorrupt { + length: usize, + }, MissingIndexedAttestation { root: Hash256, }, MissingAttesterKey, MissingProposerKey, + MissingIndexedAttestationKey, AttesterRecordInconsistentRoot, } diff --git a/slasher/src/utils.rs b/slasher/src/utils.rs index b9df9b5b44..9c9eceaa14 100644 --- a/slasher/src/utils.rs +++ b/slasher/src/utils.rs @@ -14,3 +14,18 @@ impl TxnOptional for Result { } } } + +/// Transform a transaction that would fail with a `MapFull` error into an optional result. +pub trait TxnMapFull { + fn allow_map_full(self) -> Result, E>; +} + +impl TxnMapFull for Result { + fn allow_map_full(self) -> Result, Error> { + match self { + Ok(x) => Ok(Some(x)), + Err(Error::DatabaseError(lmdb::Error::MapFull)) => Ok(None), + Err(e) => Err(e), + } + } +} diff --git a/slasher/tests/wrap_around.rs b/slasher/tests/wrap_around.rs index 0ed3860dcf..7802a73909 100644 --- a/slasher/tests/wrap_around.rs +++ b/slasher/tests/wrap_around.rs @@ -1,6 +1,6 @@ use slasher::{ test_utils::{indexed_att, logger}, - Config, Slasher, + Config, Error, Slasher, }; use tempdir::TempDir; use types::Epoch; @@ -37,3 +37,53 @@ fn attestation_pruning_empty_wrap_around() { )); slasher.process_queued(current_epoch).unwrap(); } + +// Test that pruning can recover from a `MapFull` error +#[test] +fn pruning_with_map_full() { + let tempdir = TempDir::new("slasher").unwrap(); + let mut config = Config::new(tempdir.path().into()); + config.validator_chunk_size = 1; + config.chunk_size = 16; + config.history_length = 1024; + config.max_db_size_mbs = 1; + + let slasher = Slasher::open(config.clone(), logger()).unwrap(); + + let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; + + let mut current_epoch = Epoch::new(0); + + loop { + slasher.accept_attestation(indexed_att( + v.clone(), + (current_epoch - 1).as_u64(), + current_epoch.as_u64(), + 0, + )); + if let Err(Error::DatabaseError(lmdb::Error::MapFull)) = + slasher.process_queued(current_epoch) + { + break; + } + current_epoch += 1; + } + + loop { + slasher.prune_database(current_epoch).unwrap(); + + slasher.accept_attestation(indexed_att( + v.clone(), + (current_epoch - 1).as_u64(), + current_epoch.as_u64(), + 0, + )); + match slasher.process_queued(current_epoch) { + Ok(()) => break, + Err(Error::DatabaseError(lmdb::Error::MapFull)) => { + current_epoch += 1; + } + Err(e) => panic!("{:?}", e), + } + } +}