mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-04 05:14:33 +00:00
CLI flags for state cache and compression level
This commit is contained in:
@@ -371,22 +371,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
.default_value("1000")
|
.default_value("1000")
|
||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
)
|
)
|
||||||
.arg(
|
|
||||||
Arg::with_name("slots-per-restore-point")
|
|
||||||
.long("slots-per-restore-point")
|
|
||||||
.value_name("SLOT_COUNT")
|
|
||||||
.help("Specifies how often a freezer DB restore point should be stored. \
|
|
||||||
Cannot be changed after initialization. \
|
|
||||||
[default: 2048 (mainnet) or 64 (minimal)]")
|
|
||||||
.takes_value(true)
|
|
||||||
)
|
|
||||||
.arg(
|
|
||||||
Arg::with_name("block-cache-size")
|
|
||||||
.long("block-cache-size")
|
|
||||||
.value_name("SIZE")
|
|
||||||
.help("Specifies how many blocks the database should cache in memory [default: 5]")
|
|
||||||
.takes_value(true)
|
|
||||||
)
|
|
||||||
/*
|
/*
|
||||||
* Execution Layer Integration
|
* Execution Layer Integration
|
||||||
*/
|
*/
|
||||||
@@ -421,8 +405,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Database purging and compaction.
|
* Database.
|
||||||
*/
|
*/
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("slots-per-restore-point")
|
||||||
|
.long("slots-per-restore-point")
|
||||||
|
.value_name("SLOT_COUNT")
|
||||||
|
.help("Specifies how often a freezer DB restore point should be stored. \
|
||||||
|
Cannot be changed after initialization. \
|
||||||
|
[default: 2048 (mainnet) or 64 (minimal)]")
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("purge-db")
|
Arg::with_name("purge-db")
|
||||||
.long("purge-db")
|
.long("purge-db")
|
||||||
@@ -441,7 +434,28 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
|
|||||||
.takes_value(true)
|
.takes_value(true)
|
||||||
.default_value("true")
|
.default_value("true")
|
||||||
)
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("block-cache-size")
|
||||||
|
.long("block-cache-size")
|
||||||
|
.value_name("SIZE")
|
||||||
|
.help("Specifies how many blocks the database should cache in memory [default: 64]")
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("state-cache-size")
|
||||||
|
.long("state-cache-size")
|
||||||
|
.value_name("SIZE")
|
||||||
|
.help("Specifies how many states the database should cache in memory [default: 128]")
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
|
.arg(
|
||||||
|
Arg::with_name("compression-level")
|
||||||
|
.long("compression-level")
|
||||||
|
.value_name("LEVEL")
|
||||||
|
.help("Compression level (-99 to 22) for zstd compression applied to states on disk \
|
||||||
|
[default: 1]. You may change the compression level freely without re-syncing.")
|
||||||
|
.takes_value(true)
|
||||||
|
)
|
||||||
/*
|
/*
|
||||||
* Misc.
|
* Misc.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -266,10 +266,14 @@ pub fn get_config<E: EthSpec>(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(block_cache_size) = cli_args.value_of("block-cache-size") {
|
if let Some(block_cache_size) = clap_utils::parse_optional(cli_args, "block-cache-size")? {
|
||||||
client_config.store.block_cache_size = block_cache_size
|
client_config.store.block_cache_size = block_cache_size;
|
||||||
.parse()
|
}
|
||||||
.map_err(|_| "block-cache-size is not a valid integer".to_string())?;
|
if let Some(state_cache_size) = clap_utils::parse_optional(cli_args, "state-cache-size")? {
|
||||||
|
client_config.store.state_cache_size = state_cache_size;
|
||||||
|
}
|
||||||
|
if let Some(compression_level) = clap_utils::parse_optional(cli_args, "compression-level")? {
|
||||||
|
client_config.store.compression_level = compression_level;
|
||||||
}
|
}
|
||||||
|
|
||||||
client_config.store.compact_on_init = cli_args.is_present("compact-db");
|
client_config.store.compact_on_init = cli_args.is_present("compact-db");
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ use types::{EthSpec, MinimalEthSpec};
|
|||||||
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
|
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
|
||||||
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
|
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
|
||||||
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
|
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
|
||||||
|
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
||||||
|
const EST_COMPRESSION_FACTOR: usize = 2;
|
||||||
|
|
||||||
/// Database configuration parameters.
|
/// Database configuration parameters.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
@@ -15,8 +17,10 @@ pub struct StoreConfig {
|
|||||||
pub slots_per_restore_point: u64,
|
pub slots_per_restore_point: u64,
|
||||||
/// Maximum number of blocks to store in the in-memory block cache.
|
/// Maximum number of blocks to store in the in-memory block cache.
|
||||||
pub block_cache_size: usize,
|
pub block_cache_size: usize,
|
||||||
/// Maximum number of states to sore in the in-memory state cache.
|
/// Maximum number of states to store in the in-memory state cache.
|
||||||
pub state_cache_size: usize,
|
pub state_cache_size: usize,
|
||||||
|
/// Compression level for `BeaconStateDiff`s.
|
||||||
|
pub compression_level: i32,
|
||||||
/// Whether to compact the database on initialization.
|
/// Whether to compact the database on initialization.
|
||||||
pub compact_on_init: bool,
|
pub compact_on_init: bool,
|
||||||
/// Whether to compact the database during database pruning.
|
/// Whether to compact the database during database pruning.
|
||||||
@@ -32,6 +36,7 @@ pub struct OnDiskStoreConfig {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum StoreConfigError {
|
pub enum StoreConfigError {
|
||||||
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
|
MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
|
||||||
|
InvalidCompressionLevel { level: i32 },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for StoreConfig {
|
impl Default for StoreConfig {
|
||||||
@@ -41,6 +46,7 @@ impl Default for StoreConfig {
|
|||||||
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
|
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
|
||||||
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
|
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
|
||||||
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
|
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
|
||||||
|
compression_level: DEFAULT_COMPRESSION_LEVEL,
|
||||||
compact_on_init: false,
|
compact_on_init: false,
|
||||||
compact_on_prune: true,
|
compact_on_prune: true,
|
||||||
}
|
}
|
||||||
@@ -66,6 +72,36 @@ impl StoreConfig {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check that the compression level is valid.
|
||||||
|
pub fn verify_compression_level(&self) -> Result<(), StoreConfigError> {
|
||||||
|
if zstd::compression_level_range().contains(&self.compression_level) {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(StoreConfigError::InvalidCompressionLevel {
|
||||||
|
level: self.compression_level,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Estimate the size of `len` bytes after compression at the current compression level.
|
||||||
|
pub fn estimate_compressed_size(&self, len: usize) -> usize {
|
||||||
|
if self.compression_level == 0 {
|
||||||
|
len
|
||||||
|
} else {
|
||||||
|
len / EST_COMPRESSION_FACTOR
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Estimate the size of `len` compressed bytes after decompression at the current compression
|
||||||
|
/// level.
|
||||||
|
pub fn estimate_decompressed_size(&self, len: usize) -> usize {
|
||||||
|
if self.compression_level == 0 {
|
||||||
|
len
|
||||||
|
} else {
|
||||||
|
len * EST_COMPRESSION_FACTOR
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StoreItem for OnDiskStoreConfig {
|
impl StoreItem for OnDiskStoreConfig {
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ use std::path::Path;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use types::*;
|
use types::*;
|
||||||
|
use types::{beacon_state::BeaconStateDiff, EthSpec};
|
||||||
|
|
||||||
/// On-disk database that stores finalized states efficiently.
|
/// On-disk database that stores finalized states efficiently.
|
||||||
///
|
///
|
||||||
@@ -121,6 +122,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
|||||||
log: Logger,
|
log: Logger,
|
||||||
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
|
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
|
||||||
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
|
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
|
||||||
|
config.verify_compression_level()?;
|
||||||
|
|
||||||
let db = HotColdDB {
|
let db = HotColdDB {
|
||||||
split: RwLock::new(Split::default()),
|
split: RwLock::new(Split::default()),
|
||||||
@@ -155,6 +157,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
|||||||
log: Logger,
|
log: Logger,
|
||||||
) -> Result<Arc<Self>, Error> {
|
) -> Result<Arc<Self>, Error> {
|
||||||
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
|
Self::verify_slots_per_restore_point(config.slots_per_restore_point)?;
|
||||||
|
config.verify_compression_level()?;
|
||||||
|
|
||||||
let db = Arc::new(HotColdDB {
|
let db = Arc::new(HotColdDB {
|
||||||
split: RwLock::new(Split::default()),
|
split: RwLock::new(Split::default()),
|
||||||
@@ -671,7 +674,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPUTE_TIME);
|
metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPUTE_TIME);
|
||||||
let diff = BeaconStateDiff::compute_diff(&prev_boundary_state, state)?;
|
let diff = BeaconStateDiff::compute_diff(&prev_boundary_state, state)?;
|
||||||
drop(compute_diff_timer);
|
drop(compute_diff_timer);
|
||||||
ops.push(diff.as_kv_store_op(*state_root)?);
|
ops.push(self.state_diff_as_kv_store_op(state_root, &diff)?);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -827,11 +830,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
Ok((state, latest_block_root))
|
Ok((state, latest_block_root))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_state_diff(&self, state_root: Hash256) -> Result<BeaconStateDiff<E>, Error> {
|
|
||||||
self.get_item(&state_root)?
|
|
||||||
.ok_or(HotColdDBError::MissingStateDiff(state_root).into())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Store a pre-finalization state in the freezer database.
|
/// Store a pre-finalization state in the freezer database.
|
||||||
///
|
///
|
||||||
/// If the state doesn't lie on a restore point boundary then just its summary will be stored.
|
/// If the state doesn't lie on a restore point boundary then just its summary will be stored.
|
||||||
|
|||||||
@@ -1,34 +1,43 @@
|
|||||||
use crate::{metrics, DBColumn, Error, StoreItem};
|
use crate::{
|
||||||
|
get_key_for_col, hot_cold_store::HotColdDBError, metrics, DBColumn, Error, HotColdDB,
|
||||||
|
ItemStore, KeyValueStore, KeyValueStoreOp,
|
||||||
|
};
|
||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
use types::{beacon_state::BeaconStateDiff, EthSpec};
|
use types::{beacon_state::BeaconStateDiff, EthSpec, Hash256};
|
||||||
use zstd::{Decoder, Encoder};
|
use zstd::{Decoder, Encoder};
|
||||||
|
|
||||||
const EST_COMPRESSION_FACTOR: usize = 2;
|
impl<E, Hot, Cold> HotColdDB<E, Hot, Cold>
|
||||||
|
where
|
||||||
|
E: EthSpec,
|
||||||
|
Hot: KeyValueStore<E> + ItemStore<E>,
|
||||||
|
Cold: KeyValueStore<E> + ItemStore<E>,
|
||||||
|
{
|
||||||
|
pub fn load_state_diff(&self, state_root: Hash256) -> Result<BeaconStateDiff<E>, Error> {
|
||||||
|
let bytes = self
|
||||||
|
.hot_db
|
||||||
|
.get_bytes(DBColumn::BeaconStateDiff.into(), state_root.as_bytes())?
|
||||||
|
.ok_or(HotColdDBError::MissingStateDiff(state_root))?;
|
||||||
|
|
||||||
fn estimate_compressed_size(len: usize, compression_level: i32) -> usize {
|
let mut ssz_bytes = Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len()));
|
||||||
if compression_level == 0 {
|
let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?;
|
||||||
len
|
decoder
|
||||||
} else {
|
.read_to_end(&mut ssz_bytes)
|
||||||
len / EST_COMPRESSION_FACTOR
|
.map_err(Error::Compression)?;
|
||||||
}
|
Ok(BeaconStateDiff::from_ssz_bytes(&ssz_bytes)?)
|
||||||
}
|
|
||||||
|
|
||||||
impl<E: EthSpec> StoreItem for BeaconStateDiff<E> {
|
|
||||||
fn db_column() -> DBColumn {
|
|
||||||
DBColumn::BeaconStateDiff
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn as_store_bytes(&self) -> Result<Vec<u8>, Error> {
|
pub fn state_diff_as_bytes(&self, diff: &BeaconStateDiff<E>) -> Result<Vec<u8>, Error> {
|
||||||
let encode_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_ENCODE_TIME);
|
let encode_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_ENCODE_TIME);
|
||||||
let value = self.as_ssz_bytes();
|
let value = diff.as_ssz_bytes();
|
||||||
drop(encode_timer);
|
drop(encode_timer);
|
||||||
|
|
||||||
let compression_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPRESSION_TIME);
|
let compression_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPRESSION_TIME);
|
||||||
|
|
||||||
let level = 1;
|
let mut compressed_value =
|
||||||
let mut compressed_value = Vec::with_capacity(estimate_compressed_size(value.len(), level));
|
Vec::with_capacity(self.config.estimate_compressed_size(value.len()));
|
||||||
let mut encoder = Encoder::new(&mut compressed_value, level).map_err(Error::Compression)?;
|
let mut encoder = Encoder::new(&mut compressed_value, self.config.compression_level)
|
||||||
|
.map_err(Error::Compression)?;
|
||||||
encoder.write_all(&value).map_err(Error::Compression)?;
|
encoder.write_all(&value).map_err(Error::Compression)?;
|
||||||
encoder.finish().map_err(Error::Compression)?;
|
encoder.finish().map_err(Error::Compression)?;
|
||||||
drop(compression_timer);
|
drop(compression_timer);
|
||||||
@@ -48,12 +57,13 @@ impl<E: EthSpec> StoreItem for BeaconStateDiff<E> {
|
|||||||
Ok(compressed_value)
|
Ok(compressed_value)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
|
pub fn state_diff_as_kv_store_op(
|
||||||
let mut ssz_bytes = Vec::with_capacity(EST_COMPRESSION_FACTOR * bytes.len());
|
&self,
|
||||||
let mut decoder = Decoder::new(bytes).map_err(Error::Compression)?;
|
state_root: &Hash256,
|
||||||
decoder
|
diff: &BeaconStateDiff<E>,
|
||||||
.read_to_end(&mut ssz_bytes)
|
) -> Result<KeyValueStoreOp, Error> {
|
||||||
.map_err(Error::Compression)?;
|
let key = get_key_for_col(DBColumn::BeaconStateDiff.into(), state_root.as_bytes());
|
||||||
Ok(Self::from_ssz_bytes(&ssz_bytes)?)
|
let value = self.state_diff_as_bytes(diff)?;
|
||||||
|
Ok(KeyValueStoreOp::PutKeyValue(key, value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user