diff --git a/Cargo.lock b/Cargo.lock index 5a2c4312b1..0c623bea9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1235,6 +1235,7 @@ dependencies = [ "clap", "clap_utils", "environment", + "hex", "logging", "slog", "sloggers", diff --git a/beacon_node/beacon_chain/src/otb_verification_service.rs b/beacon_node/beacon_chain/src/otb_verification_service.rs index 805b61dd9c..b934c553e6 100644 --- a/beacon_node/beacon_chain/src/otb_verification_service.rs +++ b/beacon_node/beacon_chain/src/otb_verification_service.rs @@ -119,10 +119,13 @@ pub fn start_otb_verification_service( pub fn load_optimistic_transition_blocks( chain: &BeaconChain, ) -> Result, StoreError> { - process_results(chain.store.hot_db.iter_column(OTBColumn), |iter| { - iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) - .collect() - })? + process_results( + chain.store.hot_db.iter_column::(OTBColumn), + |iter| { + iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes)) + .collect() + }, + )? } #[derive(Debug)] diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 30ee66074f..f09840142c 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -45,6 +45,7 @@ pub enum Error { ResyncRequiredForExecutionPayloadSeparation, SlotClockUnavailableForMigration, V9MigrationFailure(Hash256), + InvalidKey, } pub trait HandleUnavailable { diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 86bd4ffacc..c37b81eb18 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,7 +1,6 @@ use super::*; use crate::hot_cold_store::HotColdDBError; use crate::metrics; -use db_key::Key; use leveldb::compaction::Compaction; use leveldb::database::batch::{Batch, Writebatch}; use leveldb::database::kv::KV; @@ -177,9 +176,9 @@ impl KeyValueStore for LevelDB { } /// Iterate through all keys and values in a particular column. - fn iter_column(&self, column: DBColumn) -> ColumnIter { + fn iter_column(&self, column: DBColumn) -> ColumnIter { let start_key = - BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes())); + BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()])); let iter = self.db.iter(self.read_options()); iter.seek(&start_key); @@ -187,13 +186,12 @@ impl KeyValueStore for LevelDB { Box::new( iter.take_while(move |(key, _)| key.matches_column(column)) .map(move |(bytes_key, value)| { - let key = - bytes_key - .remove_column(column) - .ok_or(HotColdDBError::IterationError { - unexpected_key: bytes_key, - })?; - Ok((key, value)) + let key = bytes_key.remove_column_variable(column).ok_or_else(|| { + HotColdDBError::IterationError { + unexpected_key: bytes_key.clone(), + } + })?; + Ok((K::from_bytes(key)?, value)) }), ) } @@ -224,12 +222,12 @@ impl KeyValueStore for LevelDB { impl ItemStore for LevelDB {} /// Used for keying leveldb. -#[derive(Debug, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub struct BytesKey { key: Vec, } -impl Key for BytesKey { +impl db_key::Key for BytesKey { fn from_u8(key: &[u8]) -> Self { Self { key: key.to_vec() } } @@ -245,12 +243,20 @@ impl BytesKey { self.key.starts_with(column.as_bytes()) } - /// Remove the column from a key, returning its `Hash256` portion. + /// Remove the column from a 32 byte key, yielding the `Hash256` key. pub fn remove_column(&self, column: DBColumn) -> Option { + let key = self.remove_column_variable(column)?; + (column.key_size() == 32).then(|| Hash256::from_slice(key)) + } + + /// Remove the column from a key. + /// + /// Will return `None` if the value doesn't match the column or has the wrong length. + pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> { if self.matches_column(column) { let subkey = &self.key[column.as_bytes().len()..]; - if subkey.len() == 32 { - return Some(Hash256::from_slice(subkey)); + if subkey.len() == column.key_size() { + return Some(subkey); } } None diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 75aeca058b..7efaf5a70f 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -43,7 +43,7 @@ use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; pub use types::*; -pub type ColumnIter<'a> = Box), Error>> + 'a>; +pub type ColumnIter<'a, K> = Box), Error>> + 'a>; pub type ColumnKeyIter<'a> = Box> + 'a>; pub trait KeyValueStore: Sync + Send + Sized + 'static { @@ -80,7 +80,7 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { fn compact(&self) -> Result<(), Error>; /// Iterate through all keys and values in a particular column. - fn iter_column(&self, _column: DBColumn) -> ColumnIter { + fn iter_column(&self, _column: DBColumn) -> ColumnIter { // Default impl for non LevelDB databases Box::new(std::iter::empty()) } @@ -92,6 +92,26 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { } } +pub trait Key: Sized + 'static { + fn from_bytes(key: &[u8]) -> Result; +} + +impl Key for Hash256 { + fn from_bytes(key: &[u8]) -> Result { + if key.len() == 32 { + Ok(Hash256::from_slice(key)) + } else { + Err(Error::InvalidKey) + } + } +} + +impl Key for Vec { + fn from_bytes(key: &[u8]) -> Result { + Ok(key.to_vec()) + } +} + pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { let mut result = column.as_bytes().to_vec(); result.extend_from_slice(key); @@ -227,6 +247,32 @@ impl DBColumn { pub fn as_bytes(self) -> &'static [u8] { self.as_str().as_bytes() } + + /// Most database keys are 32 bytes, but some freezer DB keys are 8 bytes. + /// + /// This function returns the number of bytes used by keys in a given column. + pub fn key_size(self) -> usize { + match self { + Self::BeaconMeta + | Self::BeaconBlock + | Self::BeaconState + | Self::BeaconStateSummary + | Self::BeaconStateTemporary + | Self::ExecPayload + | Self::BeaconChain + | Self::OpPool + | Self::Eth1Cache + | Self::ForkChoice + | Self::PubkeyCache + | Self::BeaconRestorePoint + | Self::DhtEnrs + | Self::OptimisticTransitionBlock => 32, + Self::BeaconBlockRoots + | Self::BeaconStateRoots + | Self::BeaconHistoricalRoots + | Self::BeaconRandaoMixes => 8, + } + } } /// An item that may stored in a `Store` by serializing and deserializing from bytes. diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 1473f59a4e..f7c50d6518 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,5 +1,4 @@ -use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp}; -use crate::{ColumnIter, DBColumn}; +use crate::{ColumnIter, DBColumn, Error, ItemStore, Key, KeyValueStore, KeyValueStoreOp}; use parking_lot::{Mutex, MutexGuard, RwLock}; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; @@ -94,8 +93,7 @@ impl KeyValueStore for MemoryStore { Ok(()) } - // pub type ColumnIter<'a> = Box), Error>> + 'a>; - fn iter_column(&self, column: DBColumn) -> ColumnIter { + fn iter_column(&self, column: DBColumn) -> ColumnIter { let col = column.as_str(); if let Some(keys) = self .col_keys @@ -104,10 +102,11 @@ impl KeyValueStore for MemoryStore { .map(|set| set.iter().cloned().collect::>()) { Box::new(keys.into_iter().filter_map(move |key| { - let hash = Hash256::from_slice(&key); - self.get_bytes(col, &key) - .transpose() - .map(|res| res.map(|bytes| (hash, bytes))) + self.get_bytes(col, &key).transpose().map(|res| { + let k = K::from_bytes(&key)?; + let v = res?; + Ok((k, v)) + }) })) } else { Box::new(std::iter::empty()) diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index f715528138..55a6bb9d3d 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -16,3 +16,4 @@ tempfile = "3.1.0" types = { path = "../consensus/types" } slog = "2.5.2" strum = { version = "0.24.0", features = ["derive"] } +hex = "0.4.2" diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index 50295df4b0..c3b69702a0 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -57,6 +57,24 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("sizes") .possible_values(InspectTarget::VARIANTS), ) + .arg( + Arg::with_name("skip") + .long("skip") + .value_name("N") + .help("Skip over the first N keys"), + ) + .arg( + Arg::with_name("limit") + .long("limit") + .value_name("N") + .help("Output at most N keys"), + ) + .arg( + Arg::with_name("freezer") + .long("freezer") + .help("Inspect the freezer DB rather than the hot DB") + .takes_value(false), + ) } pub fn cli_app<'a, 'b>() -> App<'a, 'b> { @@ -142,24 +160,40 @@ pub fn display_db_version( Ok(()) } -#[derive(Debug, EnumString, EnumVariantNames)] +#[derive(Debug, PartialEq, Eq, EnumString, EnumVariantNames)] pub enum InspectTarget { #[strum(serialize = "sizes")] ValueSizes, #[strum(serialize = "total")] ValueTotal, + #[strum(serialize = "values")] + ValueBytes, + #[strum(serialize = "gaps")] + Gaps, } pub struct InspectConfig { column: DBColumn, target: InspectTarget, + skip: Option, + limit: Option, + freezer: bool, } fn parse_inspect_config(cli_args: &ArgMatches) -> Result { let column = clap_utils::parse_required(cli_args, "column")?; let target = clap_utils::parse_required(cli_args, "output")?; + let skip = clap_utils::parse_optional(cli_args, "skip")?; + let limit = clap_utils::parse_optional(cli_args, "limit")?; + let freezer = cli_args.is_present("freezer"); - Ok(InspectConfig { column, target }) + Ok(InspectConfig { + column, + target, + skip, + limit, + freezer, + }) } pub fn inspect_db( @@ -182,27 +216,64 @@ pub fn inspect_db( )?; let mut total = 0; + let mut num_keys = 0; - for res in db.hot_db.iter_column(inspect_config.column) { + let sub_db = if inspect_config.freezer { + &db.cold_db + } else { + &db.hot_db + }; + + let skip = inspect_config.skip.unwrap_or(0); + let limit = inspect_config.limit.unwrap_or(usize::MAX); + + let mut prev_key = 0; + let mut found_gaps = false; + + for res in sub_db + .iter_column::>(inspect_config.column) + .skip(skip) + .take(limit) + { let (key, value) = res?; match inspect_config.target { InspectTarget::ValueSizes => { - println!("{:?}: {} bytes", key, value.len()); - total += value.len(); + println!("{}: {} bytes", hex::encode(&key), value.len()); } - InspectTarget::ValueTotal => { - total += value.len(); + InspectTarget::ValueBytes => { + println!("{}: {}", hex::encode(&key), hex::encode(&value)); } + InspectTarget::Gaps => { + // Convert last 8 bytes of key to u64. + let numeric_key = u64::from_be_bytes( + key[key.len() - 8..] + .try_into() + .expect("key is at least 8 bytes"), + ); + + if numeric_key > prev_key + 1 { + println!( + "gap between keys {} and {} (offset: {})", + prev_key, numeric_key, num_keys, + ); + found_gaps = true; + } + prev_key = numeric_key; + } + InspectTarget::ValueTotal => (), } + total += value.len(); + num_keys += 1; } - match inspect_config.target { - InspectTarget::ValueSizes | InspectTarget::ValueTotal => { - println!("Total: {} bytes", total); - } + if inspect_config.target == InspectTarget::Gaps && !found_gaps { + println!("No gaps found!"); } + println!("Num keys: {}", num_keys); + println!("Total: {} bytes", total); + Ok(()) }