Rename db crate to store

This commit is contained in:
Paul Hauner
2019-05-21 18:20:23 +10:00
parent 29427cf0e6
commit 3bcf5ba706
30 changed files with 76 additions and 90 deletions

View File

@@ -0,0 +1,183 @@
use super::*;
use ssz::{Decode, DecodeError};
fn get_block_bytes<T: Store>(store: &T, root: Hash256) -> Result<Option<Vec<u8>>, Error> {
store.get_bytes(BeaconBlock::db_column().into(), &root[..])
}
fn read_slot_from_block_bytes(bytes: &[u8]) -> Result<Slot, DecodeError> {
let end = std::cmp::min(Slot::ssz_fixed_len(), bytes.len());
Slot::from_ssz_bytes(&bytes[0..end])
}
fn read_previous_block_root_from_block_bytes(bytes: &[u8]) -> Result<Hash256, DecodeError> {
let previous_bytes = Slot::ssz_fixed_len();
let slice = bytes
.get(previous_bytes..previous_bytes + Hash256::ssz_fixed_len())
.ok_or_else(|| DecodeError::BytesInvalid("Not enough bytes.".to_string()))?;
Hash256::from_ssz_bytes(slice)
}
pub fn get_block_at_preceeding_slot<T: Store>(
store: &T,
slot: Slot,
start_root: Hash256,
) -> Result<Option<(Hash256, BeaconBlock)>, Error> {
let mut root = start_root;
loop {
if let Some(bytes) = get_block_bytes(store, root)? {
let this_slot = read_slot_from_block_bytes(&bytes)?;
if this_slot == slot {
let block = BeaconBlock::from_ssz_bytes(&bytes)?;
break Ok(Some((root, block)));
} else if this_slot < slot {
break Ok(None);
} else {
root = read_previous_block_root_from_block_bytes(&bytes)?;
}
} else {
break Ok(None);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use ssz::Encode;
use tree_hash::TreeHash;
#[test]
fn read_slot() {
let spec = FewValidatorsEthSpec::spec();
let test_slot = |slot: Slot| {
let mut block = BeaconBlock::empty(&spec);
block.slot = slot;
let bytes = block.as_ssz_bytes();
assert_eq!(read_slot_from_block_bytes(&bytes).unwrap(), slot);
};
test_slot(Slot::new(0));
test_slot(Slot::new(1));
test_slot(Slot::new(42));
test_slot(Slot::new(u64::max_value()));
}
#[test]
fn bad_slot() {
for i in 0..8 {
assert!(read_slot_from_block_bytes(&vec![0; i]).is_err());
}
}
#[test]
fn read_previous_block_root() {
let spec = FewValidatorsEthSpec::spec();
let test_root = |root: Hash256| {
let mut block = BeaconBlock::empty(&spec);
block.previous_block_root = root;
let bytes = block.as_ssz_bytes();
assert_eq!(
read_previous_block_root_from_block_bytes(&bytes).unwrap(),
root
);
};
test_root(Hash256::random());
test_root(Hash256::random());
test_root(Hash256::random());
}
fn build_chain(
store: &impl Store,
slots: &[usize],
spec: &ChainSpec,
) -> Vec<(Hash256, BeaconBlock)> {
let mut blocks_and_roots: Vec<(Hash256, BeaconBlock)> = vec![];
for (i, slot) in slots.iter().enumerate() {
let mut block = BeaconBlock::empty(spec);
block.slot = Slot::from(*slot);
if i > 0 {
block.previous_block_root = blocks_and_roots[i - 1].0;
}
let root = Hash256::from_slice(&block.tree_hash_root());
store.put(&root, &block).unwrap();
blocks_and_roots.push((root, block));
}
blocks_and_roots
}
#[test]
fn chain_without_skips() {
let n: usize = 10;
let store = MemoryStore::open();
let spec = FewValidatorsEthSpec::spec();
let slots: Vec<usize> = (0..n).collect();
let blocks_and_roots = build_chain(&store, &slots, &spec);
for source in 1..n {
for target in 0..=source {
let (source_root, _source_block) = &blocks_and_roots[source];
let (target_root, target_block) = &blocks_and_roots[target];
let (found_root, found_block) = store
.get_block_at_preceeding_slot(*source_root, target_block.slot)
.unwrap()
.unwrap();
assert_eq!(found_root, *target_root);
assert_eq!(found_block, *target_block);
}
}
}
#[test]
fn chain_with_skips() {
let store = MemoryStore::open();
let spec = FewValidatorsEthSpec::spec();
let slots = vec![0, 1, 2, 5];
let blocks_and_roots = build_chain(&store, &slots, &spec);
// Valid slots
for target in 0..3 {
let (source_root, _source_block) = &blocks_and_roots[3];
let (target_root, target_block) = &blocks_and_roots[target];
let (found_root, found_block) = store
.get_block_at_preceeding_slot(*source_root, target_block.slot)
.unwrap()
.unwrap();
assert_eq!(found_root, *target_root);
assert_eq!(found_block, *target_block);
}
// Slot that doesn't exist
let (source_root, _source_block) = &blocks_and_roots[3];
assert!(store
.get_block_at_preceeding_slot(*source_root, Slot::new(3))
.unwrap()
.is_none());
// Slot too high
let (source_root, _source_block) = &blocks_and_roots[3];
assert!(store
.get_block_at_preceeding_slot(*source_root, Slot::new(3))
.unwrap()
.is_none());
}
}

View File

@@ -0,0 +1,199 @@
extern crate rocksdb;
// use super::stores::COLUMNS;
use super::{ClientDB, DBError, DBValue};
use rocksdb::Error as RocksError;
use rocksdb::{Options, DB};
use std::fs;
use std::path::Path;
/// A on-disk database which implements the ClientDB trait.
///
/// This implementation uses RocksDB with default options.
pub struct DiskStore {
db: DB,
}
impl DiskStore {
/// Open the RocksDB database, optionally supplying columns if required.
///
/// The RocksDB database will be contained in a directory titled
/// "database" in the supplied path.
///
/// # Panics
///
/// Panics if the database is unable to be created.
pub fn open(path: &Path, columns: Option<&[&str]>) -> Self {
// Rocks options.
let mut options = Options::default();
options.create_if_missing(true);
// Ensure the path exists.
fs::create_dir_all(&path).unwrap_or_else(|_| panic!("Unable to create {:?}", &path));
let db_path = path.join("database");
let columns = columns.unwrap_or(&COLUMNS);
if db_path.exists() {
Self {
db: DB::open_cf(&options, db_path, &COLUMNS)
.expect("Unable to open local database"),
}
} else {
let mut db = Self {
db: DB::open(&options, db_path).expect("Unable to open local database"),
};
for cf in columns {
db.create_col(cf).unwrap();
}
db
}
}
/// Create a RocksDB column family. Corresponds to the
/// `create_cf()` function on the RocksDB API.
#[allow(dead_code)]
fn create_col(&mut self, col: &str) -> Result<(), DBError> {
match self.db.create_cf(col, &Options::default()) {
Err(e) => Err(e.into()),
Ok(_) => Ok(()),
}
}
}
impl From<RocksError> for DBError {
fn from(e: RocksError) -> Self {
Self {
message: e.to_string(),
}
}
}
impl ClientDB for DiskStore {
/// Get the value for some key on some column.
///
/// Corresponds to the `get_cf()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails.
fn get(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, DBError> {
match self.db.cf_handle(col) {
None => Err(DBError {
message: "Unknown column".to_string(),
}),
Some(handle) => match self.db.get_cf(handle, key)? {
None => Ok(None),
Some(db_vec) => Ok(Some(DBValue::from(&*db_vec))),
},
}
}
/// Set some value for some key on some column.
///
/// Corresponds to the `cf_handle()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails.
fn put(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), DBError> {
match self.db.cf_handle(col) {
None => Err(DBError {
message: "Unknown column".to_string(),
}),
Some(handle) => self.db.put_cf(handle, key, val).map_err(|e| e.into()),
}
}
/// Return true if some key exists in some column.
fn exists(&self, col: &str, key: &[u8]) -> Result<bool, DBError> {
/*
* I'm not sure if this is the correct way to read if some
* block exists. Naively I would expect this to unncessarily
* copy some data, but I could be wrong.
*/
match self.db.cf_handle(col) {
None => Err(DBError {
message: "Unknown column".to_string(),
}),
Some(handle) => Ok(self.db.get_cf(handle, key)?.is_some()),
}
}
/// Delete the value for some key on some column.
///
/// Corresponds to the `delete_cf()` method on the RocksDB API.
/// Will attempt to get the `ColumnFamily` and return an Err
/// if it fails.
fn delete(&self, col: &str, key: &[u8]) -> Result<(), DBError> {
match self.db.cf_handle(col) {
None => Err(DBError {
message: "Unknown column".to_string(),
}),
Some(handle) => {
self.db.delete_cf(handle, key)?;
Ok(())
}
}
}
}
#[cfg(test)]
mod tests {
use super::super::ClientDB;
use super::*;
use std::sync::Arc;
use std::{env, fs, thread};
#[test]
#[ignore]
fn test_rocksdb_can_use_db() {
let pwd = env::current_dir().unwrap();
let path = pwd.join("testdb_please_remove");
let _ = fs::remove_dir_all(&path);
fs::create_dir_all(&path).unwrap();
let col_name: &str = "TestColumn";
let column_families = vec![col_name];
let mut db = DiskStore::open(&path, None);
for cf in column_families {
db.create_col(&cf).unwrap();
}
let db = Arc::new(db);
let thread_count = 10;
let write_count = 10;
// We're execting the product of these numbers to fit in one byte.
assert!(thread_count * write_count <= 255);
let mut handles = vec![];
for t in 0..thread_count {
let wc = write_count;
let db = db.clone();
let col = col_name.clone();
let handle = thread::spawn(move || {
for w in 0..wc {
let key = (t * w) as u8;
let val = 42;
db.put(&col, &vec![key], &vec![val]).unwrap();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for t in 0..thread_count {
for w in 0..write_count {
let key = (t * w) as u8;
let val = db.get(&col_name, &vec![key]).unwrap().unwrap();
assert_eq!(vec![42], val);
}
}
fs::remove_dir_all(&path).unwrap();
}
}

View File

@@ -0,0 +1,30 @@
use ssz::DecodeError;
#[derive(Debug, PartialEq)]
pub enum Error {
SszDecodeError(DecodeError),
DBError { message: String },
}
impl From<DecodeError> for Error {
fn from(e: DecodeError) -> Error {
Error::SszDecodeError(e)
}
}
impl From<DBError> for Error {
fn from(e: DBError) -> Error {
Error::DBError { message: e.message }
}
}
#[derive(Debug)]
pub struct DBError {
pub message: String,
}
impl DBError {
pub fn new(message: String) -> Self {
Self { message }
}
}

View File

@@ -0,0 +1,30 @@
use crate::*;
use ssz::{Decode, Encode};
impl StoreItem for BeaconBlock {
fn db_column() -> DBColumn {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
Self::from_ssz_bytes(bytes).map_err(Into::into)
}
}
impl<T: EthSpec> StoreItem for BeaconState<T> {
fn db_column() -> DBColumn {
DBColumn::BeaconState
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
Self::from_ssz_bytes(bytes).map_err(Into::into)
}
}

View File

@@ -0,0 +1,93 @@
use super::*;
use db_key::Key;
use leveldb::database::kv::KV;
use leveldb::database::Database;
use leveldb::error::Error as LevelDBError;
use leveldb::options::{Options, ReadOptions, WriteOptions};
use std::path::Path;
pub struct LevelDB {
db: Database<BytesKey>,
}
impl LevelDB {
pub fn open(path: &Path) -> Result<Self, Error> {
let mut options = Options::new();
options.create_if_missing = true;
let db = Database::open(path, options)?;
Ok(Self { db })
}
fn read_options(&self) -> ReadOptions<BytesKey> {
ReadOptions::new()
}
fn write_options(&self) -> WriteOptions {
WriteOptions::new()
}
fn get_key_for_col(col: &str, key: &[u8]) -> BytesKey {
let mut col = col.as_bytes().to_vec();
col.append(&mut key.to_vec());
BytesKey { key: col }
}
}
pub struct BytesKey {
key: Vec<u8>,
}
impl Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
fn as_slice<T, F: Fn(&[u8]) -> T>(&self, f: F) -> T {
f(self.key.as_slice())
}
}
impl Store for LevelDB {
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.get(self.read_options(), column_key)
.map_err(Into::into)
}
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.put(self.write_options(), column_key, val)
.map_err(Into::into)
}
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.get(self.read_options(), column_key)
.map_err(Into::into)
.and_then(|val| Ok(val.is_some()))
}
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key);
self.db
.delete(self.write_options(), column_key)
.map_err(Into::into)
}
}
impl From<LevelDBError> for Error {
fn from(e: LevelDBError) -> Error {
Error::DBError {
message: format!("{:?}", e),
}
}
}

View File

@@ -0,0 +1,185 @@
// mod disk_db;
mod block_at_slot;
mod errors;
mod impls;
mod leveldb_store;
mod memory_db;
pub use self::leveldb_store::LevelDB as DiskStore;
pub use self::memory_db::MemoryStore;
pub use errors::Error;
pub use types::*;
pub type DBValue = Vec<u8>;
pub trait Store: Sync + Send + Sized {
fn put(&self, key: &Hash256, item: &impl StoreItem) -> Result<(), Error> {
item.db_put(self, key)
}
fn get<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
I::db_get(self, key)
}
fn exists<I: StoreItem>(&self, key: &Hash256) -> Result<bool, Error> {
I::db_exists(self, key)
}
fn delete<I: StoreItem>(&self, key: &Hash256) -> Result<(), Error> {
I::db_delete(self, key)
}
fn get_block_at_preceeding_slot(
&self,
start_block_root: Hash256,
slot: Slot,
) -> Result<Option<(Hash256, BeaconBlock)>, Error> {
block_at_slot::get_block_at_preceeding_slot(self, slot, start_block_root)
}
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, Error>;
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error>;
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error>;
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error>;
}
pub enum DBColumn {
BeaconBlock,
BeaconState,
BeaconChain,
}
impl<'a> Into<&'a str> for DBColumn {
/// Returns a `&str` that can be used for keying a key-value data base.
fn into(self) -> &'a str {
match self {
DBColumn::BeaconBlock => &"blk",
DBColumn::BeaconState => &"ste",
DBColumn::BeaconChain => &"bch",
}
}
}
pub trait StoreItem: Sized {
fn db_column() -> DBColumn;
fn as_store_bytes(&self) -> Vec<u8>;
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error>;
fn db_put(&self, store: &impl Store, key: &Hash256) -> Result<(), Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
store
.put_bytes(column, key, &self.as_store_bytes())
.map_err(|e| e.into())
}
fn db_get(store: &impl Store, key: &Hash256) -> Result<Option<Self>, Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
match store.get_bytes(column, key)? {
Some(mut bytes) => Ok(Some(Self::from_store_bytes(&mut bytes[..])?)),
None => Ok(None),
}
}
fn db_exists(store: &impl Store, key: &Hash256) -> Result<bool, Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
store.key_exists(column, key)
}
fn db_delete(store: &impl Store, key: &Hash256) -> Result<(), Error> {
let column = Self::db_column().into();
let key = key.as_bytes();
store.key_delete(column, key)
}
}
#[cfg(test)]
mod tests {
use super::*;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use tempfile::tempdir;
#[derive(PartialEq, Debug, Encode, Decode)]
struct StorableThing {
a: u64,
b: u64,
}
impl StoreItem for StorableThing {
fn db_column() -> DBColumn {
DBColumn::BeaconBlock
}
fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}
fn from_store_bytes(bytes: &mut [u8]) -> Result<Self, Error> {
Self::from_ssz_bytes(bytes).map_err(Into::into)
}
}
fn test_impl(store: impl Store) {
let key = Hash256::random();
let item = StorableThing { a: 1, b: 42 };
assert_eq!(store.exists::<StorableThing>(&key), Ok(false));
store.put(&key, &item).unwrap();
assert_eq!(store.exists::<StorableThing>(&key), Ok(true));
let retrieved = store.get(&key).unwrap().unwrap();
assert_eq!(item, retrieved);
store.delete::<StorableThing>(&key).unwrap();
assert_eq!(store.exists::<StorableThing>(&key), Ok(false));
assert_eq!(store.get::<StorableThing>(&key), Ok(None));
}
#[test]
fn diskdb() {
let dir = tempdir().unwrap();
let path = dir.path();
let store = DiskStore::open(&path).unwrap();
test_impl(store);
}
#[test]
fn memorydb() {
let store = MemoryStore::open();
test_impl(store);
}
#[test]
fn exists() {
let store = MemoryStore::open();
let key = Hash256::random();
let item = StorableThing { a: 1, b: 42 };
assert_eq!(store.exists::<StorableThing>(&key).unwrap(), false);
store.put(&key, &item).unwrap();
assert_eq!(store.exists::<StorableThing>(&key).unwrap(), true);
store.delete::<StorableThing>(&key).unwrap();
assert_eq!(store.exists::<StorableThing>(&key).unwrap(), false);
}
}

View File

@@ -0,0 +1,61 @@
use super::{DBValue, Error, Store};
use parking_lot::RwLock;
use std::collections::HashMap;
type DBHashMap = HashMap<Vec<u8>, Vec<u8>>;
pub struct MemoryStore {
db: RwLock<DBHashMap>,
}
impl MemoryStore {
pub fn open() -> Self {
Self {
db: RwLock::new(HashMap::new()),
}
}
fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
let mut col = col.as_bytes().to_vec();
col.append(&mut key.to_vec());
col
}
}
impl Store for MemoryStore {
/// Get the value of some key from the database. Returns `None` if the key does not exist.
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<DBValue>, Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
Ok(self
.db
.read()
.get(&column_key)
.and_then(|val| Some(val.clone())))
}
/// Puts a key in the database.
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
self.db.write().insert(column_key, val.to_vec());
Ok(())
}
/// Return true if some key exists in some column.
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
Ok(self.db.read().contains_key(&column_key))
}
/// Delete some key from the database.
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = MemoryStore::get_key_for_col(col, key);
self.db.write().remove(&column_key);
Ok(())
}
}