Add freezer DB debugging tools

This commit is contained in:
Michael Sproul
2022-08-26 16:50:43 +10:00
parent c64e17bb81
commit 209a109877
8 changed files with 168 additions and 40 deletions

1
Cargo.lock generated
View File

@@ -1235,6 +1235,7 @@ dependencies = [
"clap",
"clap_utils",
"environment",
"hex",
"logging",
"slog",
"sloggers",

View File

@@ -119,10 +119,13 @@ pub fn start_otb_verification_service<T: BeaconChainTypes>(
pub fn load_optimistic_transition_blocks<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<Vec<OptimisticTransitionBlock>, StoreError> {
process_results(chain.store.hot_db.iter_column(OTBColumn), |iter| {
iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes))
.collect()
})?
process_results(
chain.store.hot_db.iter_column::<Hash256>(OTBColumn),
|iter| {
iter.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes))
.collect()
},
)?
}
#[derive(Debug)]

View File

@@ -45,6 +45,7 @@ pub enum Error {
ResyncRequiredForExecutionPayloadSeparation,
SlotClockUnavailableForMigration,
V9MigrationFailure(Hash256),
InvalidKey,
}
pub trait HandleUnavailable<T> {

View File

@@ -1,7 +1,6 @@
use super::*;
use crate::hot_cold_store::HotColdDBError;
use crate::metrics;
use db_key::Key;
use leveldb::compaction::Compaction;
use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
@@ -177,9 +176,9 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
}
/// Iterate through all keys and values in a particular column.
fn iter_column(&self, column: DBColumn) -> ColumnIter {
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
let start_key =
BytesKey::from_vec(get_key_for_col(column.into(), Hash256::zero().as_bytes()));
BytesKey::from_vec(get_key_for_col(column.into(), &vec![0; column.key_size()]));
let iter = self.db.iter(self.read_options());
iter.seek(&start_key);
@@ -187,13 +186,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
Box::new(
iter.take_while(move |(key, _)| key.matches_column(column))
.map(move |(bytes_key, value)| {
let key =
bytes_key
.remove_column(column)
.ok_or(HotColdDBError::IterationError {
unexpected_key: bytes_key,
})?;
Ok((key, value))
let key = bytes_key.remove_column_variable(column).ok_or_else(|| {
HotColdDBError::IterationError {
unexpected_key: bytes_key.clone(),
}
})?;
Ok((K::from_bytes(key)?, value))
}),
)
}
@@ -224,12 +222,12 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
impl<E: EthSpec> ItemStore<E> for LevelDB<E> {}
/// Used for keying leveldb.
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct BytesKey {
key: Vec<u8>,
}
impl Key for BytesKey {
impl db_key::Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
@@ -245,12 +243,20 @@ impl BytesKey {
self.key.starts_with(column.as_bytes())
}
/// Remove the column from a key, returning its `Hash256` portion.
/// Remove the column from a 32 byte key, yielding the `Hash256` key.
pub fn remove_column(&self, column: DBColumn) -> Option<Hash256> {
let key = self.remove_column_variable(column)?;
(column.key_size() == 32).then(|| Hash256::from_slice(key))
}
/// Remove the column from a key.
///
/// Will return `None` if the value doesn't match the column or has the wrong length.
pub fn remove_column_variable(&self, column: DBColumn) -> Option<&[u8]> {
if self.matches_column(column) {
let subkey = &self.key[column.as_bytes().len()..];
if subkey.len() == 32 {
return Some(Hash256::from_slice(subkey));
if subkey.len() == column.key_size() {
return Some(subkey);
}
}
None

View File

@@ -43,7 +43,7 @@ use std::sync::Arc;
use strum::{EnumString, IntoStaticStr};
pub use types::*;
pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a> = Box<dyn Iterator<Item = Result<Hash256, Error>> + 'a>;
pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
@@ -80,7 +80,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
fn compact(&self) -> Result<(), Error>;
/// Iterate through all keys and values in a particular column.
fn iter_column(&self, _column: DBColumn) -> ColumnIter {
fn iter_column<K: Key>(&self, _column: DBColumn) -> ColumnIter<K> {
// Default impl for non LevelDB databases
Box::new(std::iter::empty())
}
@@ -92,6 +92,26 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
}
}
pub trait Key: Sized + 'static {
fn from_bytes(key: &[u8]) -> Result<Self, Error>;
}
impl Key for Hash256 {
fn from_bytes(key: &[u8]) -> Result<Self, Error> {
if key.len() == 32 {
Ok(Hash256::from_slice(key))
} else {
Err(Error::InvalidKey)
}
}
}
impl Key for Vec<u8> {
fn from_bytes(key: &[u8]) -> Result<Self, Error> {
Ok(key.to_vec())
}
}
pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
let mut result = column.as_bytes().to_vec();
result.extend_from_slice(key);
@@ -227,6 +247,32 @@ impl DBColumn {
pub fn as_bytes(self) -> &'static [u8] {
self.as_str().as_bytes()
}
/// Most database keys are 32 bytes, but some freezer DB keys are 8 bytes.
///
/// This function returns the number of bytes used by keys in a given column.
pub fn key_size(self) -> usize {
match self {
Self::BeaconMeta
| Self::BeaconBlock
| Self::BeaconState
| Self::BeaconStateSummary
| Self::BeaconStateTemporary
| Self::ExecPayload
| Self::BeaconChain
| Self::OpPool
| Self::Eth1Cache
| Self::ForkChoice
| Self::PubkeyCache
| Self::BeaconRestorePoint
| Self::DhtEnrs
| Self::OptimisticTransitionBlock => 32,
Self::BeaconBlockRoots
| Self::BeaconStateRoots
| Self::BeaconHistoricalRoots
| Self::BeaconRandaoMixes => 8,
}
}
}
/// An item that may stored in a `Store` by serializing and deserializing from bytes.

View File

@@ -1,5 +1,4 @@
use super::{Error, ItemStore, KeyValueStore, KeyValueStoreOp};
use crate::{ColumnIter, DBColumn};
use crate::{ColumnIter, DBColumn, Error, ItemStore, Key, KeyValueStore, KeyValueStoreOp};
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
@@ -94,8 +93,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
Ok(())
}
// pub type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(Hash256, Vec<u8>), Error>> + 'a>;
fn iter_column(&self, column: DBColumn) -> ColumnIter {
fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
let col = column.as_str();
if let Some(keys) = self
.col_keys
@@ -104,10 +102,11 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
.map(|set| set.iter().cloned().collect::<Vec<_>>())
{
Box::new(keys.into_iter().filter_map(move |key| {
let hash = Hash256::from_slice(&key);
self.get_bytes(col, &key)
.transpose()
.map(|res| res.map(|bytes| (hash, bytes)))
self.get_bytes(col, &key).transpose().map(|res| {
let k = K::from_bytes(&key)?;
let v = res?;
Ok((k, v))
})
}))
} else {
Box::new(std::iter::empty())

View File

@@ -16,3 +16,4 @@ tempfile = "3.1.0"
types = { path = "../consensus/types" }
slog = "2.5.2"
strum = { version = "0.24.0", features = ["derive"] }
hex = "0.4.2"

View File

@@ -57,6 +57,24 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
.default_value("sizes")
.possible_values(InspectTarget::VARIANTS),
)
.arg(
Arg::with_name("skip")
.long("skip")
.value_name("N")
.help("Skip over the first N keys"),
)
.arg(
Arg::with_name("limit")
.long("limit")
.value_name("N")
.help("Output at most N keys"),
)
.arg(
Arg::with_name("freezer")
.long("freezer")
.help("Inspect the freezer DB rather than the hot DB")
.takes_value(false),
)
}
pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
@@ -142,24 +160,40 @@ pub fn display_db_version<E: EthSpec>(
Ok(())
}
#[derive(Debug, EnumString, EnumVariantNames)]
#[derive(Debug, PartialEq, Eq, EnumString, EnumVariantNames)]
pub enum InspectTarget {
#[strum(serialize = "sizes")]
ValueSizes,
#[strum(serialize = "total")]
ValueTotal,
#[strum(serialize = "values")]
ValueBytes,
#[strum(serialize = "gaps")]
Gaps,
}
pub struct InspectConfig {
column: DBColumn,
target: InspectTarget,
skip: Option<usize>,
limit: Option<usize>,
freezer: bool,
}
fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String> {
let column = clap_utils::parse_required(cli_args, "column")?;
let target = clap_utils::parse_required(cli_args, "output")?;
let skip = clap_utils::parse_optional(cli_args, "skip")?;
let limit = clap_utils::parse_optional(cli_args, "limit")?;
let freezer = cli_args.is_present("freezer");
Ok(InspectConfig { column, target })
Ok(InspectConfig {
column,
target,
skip,
limit,
freezer,
})
}
pub fn inspect_db<E: EthSpec>(
@@ -182,27 +216,64 @@ pub fn inspect_db<E: EthSpec>(
)?;
let mut total = 0;
let mut num_keys = 0;
for res in db.hot_db.iter_column(inspect_config.column) {
let sub_db = if inspect_config.freezer {
&db.cold_db
} else {
&db.hot_db
};
let skip = inspect_config.skip.unwrap_or(0);
let limit = inspect_config.limit.unwrap_or(usize::MAX);
let mut prev_key = 0;
let mut found_gaps = false;
for res in sub_db
.iter_column::<Vec<u8>>(inspect_config.column)
.skip(skip)
.take(limit)
{
let (key, value) = res?;
match inspect_config.target {
InspectTarget::ValueSizes => {
println!("{:?}: {} bytes", key, value.len());
total += value.len();
println!("{}: {} bytes", hex::encode(&key), value.len());
}
InspectTarget::ValueTotal => {
total += value.len();
InspectTarget::ValueBytes => {
println!("{}: {}", hex::encode(&key), hex::encode(&value));
}
InspectTarget::Gaps => {
// Convert last 8 bytes of key to u64.
let numeric_key = u64::from_be_bytes(
key[key.len() - 8..]
.try_into()
.expect("key is at least 8 bytes"),
);
if numeric_key > prev_key + 1 {
println!(
"gap between keys {} and {} (offset: {})",
prev_key, numeric_key, num_keys,
);
found_gaps = true;
}
prev_key = numeric_key;
}
InspectTarget::ValueTotal => (),
}
total += value.len();
num_keys += 1;
}
match inspect_config.target {
InspectTarget::ValueSizes | InspectTarget::ValueTotal => {
println!("Total: {} bytes", total);
}
if inspect_config.target == InspectTarget::Gaps && !found_gaps {
println!("No gaps found!");
}
println!("Num keys: {}", num_keys);
println!("Total: {} bytes", total);
Ok(())
}