Files
lighthouse/beacon_node/store/src/leveldb_store.rs
Paul Hauner 10a134792b Testnet2 (#685)
* Apply clippy lints to beacon node

* Remove unnecessary logging and correct formatting

* Initial bones of load-balanced range-sync

* Port bump meshsup tests

* Further structure and network handling logic added

* Basic structure, ignoring error handling

* Correct max peers delay bug

* Clean up and re-write message processor and sync manager

* Restructure directory, correct type issues

* Fix compiler issues

* Completed first testing of new sync

* Correct merge issues

* Clean up warnings

* Push attestation processed log down to dbg

* Add state enc/dec benches

* Correct math error, downgraded logs

* Add example for flamegraph

* Use `PublicKeyBytes` for `Validator`

* Ripple PublicKeyBytes change through codebase

* Add RPC error handling and improved syncing code

* Add benches, optimizations to store BeaconState

* Store BeaconState in StorageContainer too

* Optimize StorageContainer with std::mem magic

* Add libp2p stream error handling and dropping of invalid peers

* Lower logs

* Update lcli to parse spec at boot, remove pycli

* Fix issues when starting with mainnet spec

* Set default spec to mainnet

* Fix lcli --spec param

* Add discovery tweak

* Ensure ETH1_FOLLOW_DISTANCE is in YamlConfig

* Set testnet ETH1_FOLLOW_DISTANCE to 16

* Fix rest_api tests

* Set testnet min validator count

* Update with new testnet dir

* Remove some dbg, println

* Add timeout when notifier waits for libp2p lock

* Add validator count CLI flag to lcli contract deploy

* Extend genesis delay time

* Correct libp2p service locking

* Update testnet dir

* Add basic block/state caching on beacon chain

* Add decimals display to notifier sync speed

* Try merge in change to reduce fork choice calls

* Remove fork choice from process block

* Minor log fix

* Check successes > 0

* Adds checkpoint cache

* Stop storing the tree hash cache in the db

* Handles peer disconnects for sync

* Fix failing beacon chain tests

* Change eth2_testnet_config tests to Mainnet

* Add logs downgrade discovery log

* Remove redundant beacon state write

* Fix re-org warnings

* Use caching get methods in fork choice

* Fix mistake in prev commit

* Use caching state getting in state_by_slot

* Add state.cacheless_clone

* Less fork choice (#679)

* Try merge in change to reduce fork choice calls

* Remove fork choice from process block

* Minor log fix

* Check successes > 0

* Fix failing beacon chain tests

* Fix re-org warnings

* Fix mistake in prev commit

* Attempt to improve attestation processing times

* Introduce HeadInfo struct

* Used cache tree hash for block processing

* Use cached tree hash for block production too

* Range sync refactor

- Introduces `ChainCollection`
- Correct Disconnect node handling
- Removes duplicate code

* Add more logging for DB

* Various bug fixes

* Remove unnecessary logs

* Maintain syncing state in the transition from finalized to head

* Improved disconnect handling

* Add `Speedo` struct

* Fix bugs in speedo

* Fix bug in speedo

* Fix rounding bug in speedo

* Move code around, reduce speedo observation count

* Adds forwards block iterator

* Fix inf NaN

* Add first draft of validator onboarding

* Update docs

* Add documentation link to main README

* Continue docs development

* Update book readme

* Update docs

* Allow vc to run without testnet subcommand

* Small change to onboarding docs

* Tidy CLI help messages

* Update docs

* Add check to val client see if beacon node is synced

* Attempt to fix NaN bug

* Fix compile bug

* Add notifier service to validator client

* Re-order onboarding steps

* Update deposit contract address

* Update testnet dir

* Add note about public eth1 node

* Fix installation link

* Set default eth1 endpoint to sigp

* Fix broken test

* Try fix eth1 cache locking

* Be more specific about eth1 endpoint

* Increase gas limit for deposit

* Fix default deposit amount

* Fix re-org log
2019-12-09 23:14:13 +11:00

157 lines
4.5 KiB
Rust

use super::*;
use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::metrics;
use db_key::Key;
use leveldb::database::kv::KV;
use leveldb::database::Database;
use leveldb::error::Error as LevelDBError;
use leveldb::options::{Options, ReadOptions, WriteOptions};
use std::marker::PhantomData;
use std::path::Path;
/// A wrapped leveldb database.
///
/// Generic over the `EthSpec` type `E`, which is carried only at the type level
/// (see `_phantom`).
pub struct LevelDB<E: EthSpec> {
/// The underlying leveldb handle, keyed by `BytesKey`.
db: Database<BytesKey>,
/// Marker binding this store to `E`; no value of type `E` is stored.
_phantom: PhantomData<E>,
}
impl<E: EthSpec> LevelDB<E> {
    /// Open a database at `path`, creating a new database if one does not already exist.
    ///
    /// # Errors
    ///
    /// Returns an `Error` (converted from the leveldb error) if the database cannot be opened.
    pub fn open(path: &Path) -> Result<Self, Error> {
        let mut options = Options::new();
        options.create_if_missing = true;

        let db = Database::open(path, options)?;

        Ok(Self {
            db,
            _phantom: PhantomData,
        })
    }

    /// Returns the read options used for every read against the database.
    fn read_options(&self) -> ReadOptions<BytesKey> {
        ReadOptions::new()
    }

    /// Returns the write options used for every write and delete against the database.
    fn write_options(&self) -> WriteOptions {
        WriteOptions::new()
    }

    /// Builds the database key for `key` inside column `col`.
    ///
    /// The result is the UTF-8 bytes of `col` immediately followed by `key`.
    fn get_key_for_col(col: &str, key: &[u8]) -> BytesKey {
        // Allocate the final buffer once instead of copying `key` into a temporary `Vec`.
        let mut bytes = Vec::with_capacity(col.len() + key.len());
        bytes.extend_from_slice(col.as_bytes());
        bytes.extend_from_slice(key);

        BytesKey { key: bytes }
    }
}
/// Used for keying leveldb.
///
/// A thin wrapper around an owned byte vector.
pub struct BytesKey {
/// The raw key bytes handed to leveldb.
key: Vec<u8>,
}
impl Key for BytesKey {
fn from_u8(key: &[u8]) -> Self {
Self { key: key.to_vec() }
}
fn as_slice<T, F: Fn(&[u8]) -> T>(&self, f: F) -> T {
f(self.key.as_slice())
}
}
impl<E: EthSpec> Store<E> for LevelDB<E> {
type ForwardsBlockRootsIterator = SimpleForwardsBlockRootsIterator;
/// Retrieve some bytes in `column` with `key`.
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let column_key = Self::get_key_for_col(col, key);
metrics::inc_counter(&metrics::DISK_DB_READ_COUNT);
let timer = metrics::start_timer(&metrics::DISK_DB_READ_TIMES);
self.db
.get(self.read_options(), column_key)
.map_err(Into::into)
.map(|opt| {
opt.map(|bytes| {
metrics::inc_counter_by(&metrics::DISK_DB_READ_BYTES, bytes.len() as i64);
metrics::stop_timer(timer);
bytes
})
})
}
/// Store some `value` in `column`, indexed with `key`.
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key);
metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
self.db
.put(self.write_options(), column_key, val)
.map_err(Into::into)
.map(|()| {
metrics::stop_timer(timer);
})
}
/// Return `true` if `key` exists in `column`.
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = Self::get_key_for_col(col, key);
metrics::inc_counter(&metrics::DISK_DB_EXISTS_COUNT);
self.db
.get(self.read_options(), column_key)
.map_err(Into::into)
.and_then(|val| Ok(val.is_some()))
}
/// Removes `key` from `column`.
fn key_delete(&self, col: &str, key: &[u8]) -> Result<(), Error> {
let column_key = Self::get_key_for_col(col, key);
metrics::inc_counter(&metrics::DISK_DB_DELETE_COUNT);
self.db
.delete(self.write_options(), column_key)
.map_err(Into::into)
}
/// Store a state in the store.
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
store_full_state(self, state_root, state)
}
/// Fetch a state from the store.
fn get_state(
&self,
state_root: &Hash256,
_: Option<Slot>,
) -> Result<Option<BeaconState<E>>, Error> {
get_full_state(self, state_root)
}
fn forwards_block_roots_iterator(
store: Arc<Self>,
start_slot: Slot,
end_state: BeaconState<E>,
end_block_root: Hash256,
_: &ChainSpec,
) -> Self::ForwardsBlockRootsIterator {
SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root)
}
}
impl From<LevelDBError> for Error {
fn from(e: LevelDBError) -> Error {
Error::DBError {
message: format!("{:?}", e),
}
}
}