diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 5150ab492b..9d80070e96 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -371,22 +371,6 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("1000") .takes_value(true) ) - .arg( - Arg::with_name("slots-per-restore-point") - .long("slots-per-restore-point") - .value_name("SLOT_COUNT") - .help("Specifies how often a freezer DB restore point should be stored. \ - Cannot be changed after initialization. \ - [default: 2048 (mainnet) or 64 (minimal)]") - .takes_value(true) - ) - .arg( - Arg::with_name("block-cache-size") - .long("block-cache-size") - .value_name("SIZE") - .help("Specifies how many blocks the database should cache in memory [default: 5]") - .takes_value(true) - ) /* * Execution Layer Integration */ @@ -421,8 +405,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) /* - * Database purging and compaction. + * Database. */ + .arg( + Arg::with_name("slots-per-restore-point") + .long("slots-per-restore-point") + .value_name("SLOT_COUNT") + .help("Specifies how often a freezer DB restore point should be stored. \ + Cannot be changed after initialization. \ + [default: 2048 (mainnet) or 64 (minimal)]") + .takes_value(true) + ) .arg( Arg::with_name("purge-db") .long("purge-db") @@ -441,7 +434,28 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("true") ) - + .arg( + Arg::with_name("block-cache-size") + .long("block-cache-size") + .value_name("SIZE") + .help("Specifies how many blocks the database should cache in memory [default: 64]") + .takes_value(true) + ) + .arg( + Arg::with_name("state-cache-size") + .long("state-cache-size") + .value_name("SIZE") + .help("Specifies how many states the database should cache in memory [default: 128]") + .takes_value(true) + ) + .arg( + Arg::with_name("compression-level") + .long("compression-level") + .value_name("LEVEL") + .help("Compression level (-99 to 22) for zstd compression applied to states on disk \ + [default: 1]. You may change the compression level freely without re-syncing.") + .takes_value(true) + ) /* * Misc. */ diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 6e8743c055..12055f7b4d 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -266,10 +266,14 @@ pub fn get_config( ); } - if let Some(block_cache_size) = cli_args.value_of("block-cache-size") { - client_config.store.block_cache_size = block_cache_size - .parse() - .map_err(|_| "block-cache-size is not a valid integer".to_string())?; + if let Some(block_cache_size) = clap_utils::parse_optional(cli_args, "block-cache-size")? { + client_config.store.block_cache_size = block_cache_size; + } + if let Some(state_cache_size) = clap_utils::parse_optional(cli_args, "state-cache-size")? { + client_config.store.state_cache_size = state_cache_size; + } + if let Some(compression_level) = clap_utils::parse_optional(cli_args, "compression-level")? { + client_config.store.compression_level = compression_level; } client_config.store.compact_on_init = cli_args.is_present("compact-db"); diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 3593320710..abb164ad2f 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -7,6 +7,8 @@ use types::{EthSpec, MinimalEthSpec}; pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64; pub const DEFAULT_STATE_CACHE_SIZE: usize = 128; +pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1; +const EST_COMPRESSION_FACTOR: usize = 2; /// Database configuration parameters. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -15,8 +17,10 @@ pub struct StoreConfig { pub slots_per_restore_point: u64, /// Maximum number of blocks to store in the in-memory block cache. pub block_cache_size: usize, - /// Maximum number of states to sore in the in-memory state cache. + /// Maximum number of states to store in the in-memory state cache. pub state_cache_size: usize, + /// Compression level for `BeaconStateDiff`s. + pub compression_level: i32, /// Whether to compact the database on initialization. pub compact_on_init: bool, /// Whether to compact the database during database pruning. @@ -32,6 +36,7 @@ pub struct OnDiskStoreConfig { #[derive(Debug, Clone)] pub enum StoreConfigError { MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 }, + InvalidCompressionLevel { level: i32 }, } impl Default for StoreConfig { @@ -41,6 +46,7 @@ impl Default for StoreConfig { slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64, block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, state_cache_size: DEFAULT_STATE_CACHE_SIZE, + compression_level: DEFAULT_COMPRESSION_LEVEL, compact_on_init: false, compact_on_prune: true, } @@ -66,6 +72,36 @@ impl StoreConfig { } Ok(()) } + + /// Check that the compression level is valid. + pub fn verify_compression_level(&self) -> Result<(), StoreConfigError> { + if zstd::compression_level_range().contains(&self.compression_level) { + Ok(()) + } else { + Err(StoreConfigError::InvalidCompressionLevel { + level: self.compression_level, + }) + } + } + + /// Estimate the size of `len` bytes after compression at the current compression level. + pub fn estimate_compressed_size(&self, len: usize) -> usize { + if self.compression_level == 0 { + len + } else { + len / EST_COMPRESSION_FACTOR + } + } + + /// Estimate the size of `len` compressed bytes after decompression at the current compression + /// level. + pub fn estimate_decompressed_size(&self, len: usize) -> usize { + if self.compression_level == 0 { + len + } else { + len * EST_COMPRESSION_FACTOR + } + } } impl StoreItem for OnDiskStoreConfig { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 87de5ee7c5..6a34575187 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -36,6 +36,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use types::*; +use types::{beacon_state::BeaconStateDiff, EthSpec}; /// On-disk database that stores finalized states efficiently. /// @@ -121,6 +122,7 @@ impl HotColdDB, MemoryStore> { log: Logger, ) -> Result, MemoryStore>, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; + config.verify_compression_level()?; let db = HotColdDB { split: RwLock::new(Split::default()), @@ -155,6 +157,7 @@ impl HotColdDB, LevelDB> { log: Logger, ) -> Result, Error> { Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; + config.verify_compression_level()?; let db = Arc::new(HotColdDB { split: RwLock::new(Split::default()), @@ -671,7 +674,7 @@ impl, Cold: ItemStore> HotColdDB metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPUTE_TIME); let diff = BeaconStateDiff::compute_diff(&prev_boundary_state, state)?; drop(compute_diff_timer); - ops.push(diff.as_kv_store_op(*state_root)?); + ops.push(self.state_diff_as_kv_store_op(state_root, &diff)?); } } @@ -827,11 +830,6 @@ impl, Cold: ItemStore> HotColdDB Ok((state, latest_block_root)) } - pub fn load_state_diff(&self, state_root: Hash256) -> Result, Error> { - self.get_item(&state_root)? - .ok_or(HotColdDBError::MissingStateDiff(state_root).into()) - } - /// Store a pre-finalization state in the freezer database. /// /// If the state doesn't lie on a restore point boundary then just its summary will be stored. diff --git a/beacon_node/store/src/state_diff.rs b/beacon_node/store/src/state_diff.rs index c98a567792..7ca818ecf3 100644 --- a/beacon_node/store/src/state_diff.rs +++ b/beacon_node/store/src/state_diff.rs @@ -1,34 +1,43 @@ -use crate::{metrics, DBColumn, Error, StoreItem}; +use crate::{ + get_key_for_col, hot_cold_store::HotColdDBError, metrics, DBColumn, Error, HotColdDB, + ItemStore, KeyValueStore, KeyValueStoreOp, +}; use ssz::{Decode, Encode}; use std::io::{Read, Write}; -use types::{beacon_state::BeaconStateDiff, EthSpec}; +use types::{beacon_state::BeaconStateDiff, EthSpec, Hash256}; use zstd::{Decoder, Encoder}; -const EST_COMPRESSION_FACTOR: usize = 2; +impl HotColdDB +where + E: EthSpec, + Hot: KeyValueStore + ItemStore, + Cold: KeyValueStore + ItemStore, +{ + pub fn load_state_diff(&self, state_root: Hash256) -> Result, Error> { + let bytes = self + .hot_db + .get_bytes(DBColumn::BeaconStateDiff.into(), state_root.as_bytes())? + .ok_or(HotColdDBError::MissingStateDiff(state_root))?; -fn estimate_compressed_size(len: usize, compression_level: i32) -> usize { - if compression_level == 0 { - len - } else { - len / EST_COMPRESSION_FACTOR - } -} - -impl StoreItem for BeaconStateDiff { - fn db_column() -> DBColumn { - DBColumn::BeaconStateDiff + let mut ssz_bytes = Vec::with_capacity(self.config.estimate_decompressed_size(bytes.len())); + let mut decoder = Decoder::new(&*bytes).map_err(Error::Compression)?; + decoder + .read_to_end(&mut ssz_bytes) + .map_err(Error::Compression)?; + Ok(BeaconStateDiff::from_ssz_bytes(&ssz_bytes)?) } - fn as_store_bytes(&self) -> Result, Error> { + pub fn state_diff_as_bytes(&self, diff: &BeaconStateDiff) -> Result, Error> { let encode_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_ENCODE_TIME); - let value = self.as_ssz_bytes(); + let value = diff.as_ssz_bytes(); drop(encode_timer); let compression_timer = metrics::start_timer(&metrics::BEACON_STATE_DIFF_COMPRESSION_TIME); - let level = 1; - let mut compressed_value = Vec::with_capacity(estimate_compressed_size(value.len(), level)); - let mut encoder = Encoder::new(&mut compressed_value, level).map_err(Error::Compression)?; + let mut compressed_value = + Vec::with_capacity(self.config.estimate_compressed_size(value.len())); + let mut encoder = Encoder::new(&mut compressed_value, self.config.compression_level) + .map_err(Error::Compression)?; encoder.write_all(&value).map_err(Error::Compression)?; encoder.finish().map_err(Error::Compression)?; drop(compression_timer); @@ -48,12 +57,13 @@ impl StoreItem for BeaconStateDiff { Ok(compressed_value) } - fn from_store_bytes(bytes: &[u8]) -> Result { - let mut ssz_bytes = Vec::with_capacity(EST_COMPRESSION_FACTOR * bytes.len()); - let mut decoder = Decoder::new(bytes).map_err(Error::Compression)?; - decoder - .read_to_end(&mut ssz_bytes) - .map_err(Error::Compression)?; - Ok(Self::from_ssz_bytes(&ssz_bytes)?) + pub fn state_diff_as_kv_store_op( + &self, + state_root: &Hash256, + diff: &BeaconStateDiff, + ) -> Result { + let key = get_key_for_col(DBColumn::BeaconStateDiff.into(), state_root.as_bytes()); + let value = self.state_diff_as_bytes(diff)?; + Ok(KeyValueStoreOp::PutKeyValue(key, value)) } }