mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 16:55:46 +00:00
Improve state cache eviction and reduce mem usage (#4762)
* Improve state cache eviction and reduce mem usage * Fix epochs_per_state_diff tests
This commit is contained in:
@@ -4,9 +4,11 @@ use serde_derive::{Deserialize, Serialize};
|
|||||||
use ssz::{Decode, Encode};
|
use ssz::{Decode, Encode};
|
||||||
use ssz_derive::{Decode, Encode};
|
use ssz_derive::{Decode, Encode};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
use types::{EthSpec, Unsigned};
|
||||||
use zstd::Encoder;
|
use zstd::Encoder;
|
||||||
|
|
||||||
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 16;
|
// Only used in tests. Mainnet sets a higher default on the CLI.
|
||||||
|
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8;
|
||||||
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
|
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
|
||||||
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
|
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
|
||||||
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
|
||||||
@@ -64,6 +66,10 @@ pub enum StoreConfigError {
|
|||||||
config: OnDiskStoreConfig,
|
config: OnDiskStoreConfig,
|
||||||
on_disk: OnDiskStoreConfig,
|
on_disk: OnDiskStoreConfig,
|
||||||
},
|
},
|
||||||
|
InvalidEpochsPerStateDiff {
|
||||||
|
epochs_per_state_diff: u64,
|
||||||
|
max_supported: u64,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for StoreConfig {
|
impl Default for StoreConfig {
|
||||||
@@ -107,8 +113,14 @@ impl StoreConfig {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check that the configuration is valid.
|
||||||
|
pub fn verify<E: EthSpec>(&self) -> Result<(), StoreConfigError> {
|
||||||
|
self.verify_compression_level()?;
|
||||||
|
self.verify_epochs_per_state_diff::<E>()
|
||||||
|
}
|
||||||
|
|
||||||
/// Check that the compression level is valid.
|
/// Check that the compression level is valid.
|
||||||
pub fn verify_compression_level(&self) -> Result<(), StoreConfigError> {
|
fn verify_compression_level(&self) -> Result<(), StoreConfigError> {
|
||||||
if zstd::compression_level_range().contains(&self.compression_level) {
|
if zstd::compression_level_range().contains(&self.compression_level) {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
@@ -118,6 +130,21 @@ impl StoreConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check that the configuration is valid.
|
||||||
|
pub fn verify_epochs_per_state_diff<E: EthSpec>(&self) -> Result<(), StoreConfigError> {
|
||||||
|
// To build state diffs we need to be able to determine the previous state root from the
|
||||||
|
// state itself, which requires reading back in the state_roots array.
|
||||||
|
let max_supported = E::SlotsPerHistoricalRoot::to_u64() / E::slots_per_epoch();
|
||||||
|
if self.epochs_per_state_diff <= max_supported {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(StoreConfigError::InvalidEpochsPerStateDiff {
|
||||||
|
epochs_per_state_diff: self.epochs_per_state_diff,
|
||||||
|
max_supported,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Estimate the size of `len` bytes after compression at the current compression level.
|
/// Estimate the size of `len` bytes after compression at the current compression level.
|
||||||
pub fn estimate_compressed_size(&self, len: usize) -> usize {
|
pub fn estimate_compressed_size(&self, len: usize) -> usize {
|
||||||
if self.compression_level == 0 {
|
if self.compression_level == 0 {
|
||||||
|
|||||||
@@ -142,7 +142,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
|
|||||||
spec: ChainSpec,
|
spec: ChainSpec,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
|
) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
|
||||||
config.verify_compression_level()?;
|
config.verify::<E>()?;
|
||||||
|
|
||||||
let hierarchy = config.hierarchy_config.to_moduli()?;
|
let hierarchy = config.hierarchy_config.to_moduli()?;
|
||||||
|
|
||||||
@@ -189,7 +189,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
|
|||||||
spec: ChainSpec,
|
spec: ChainSpec,
|
||||||
log: Logger,
|
log: Logger,
|
||||||
) -> Result<Arc<Self>, Error> {
|
) -> Result<Arc<Self>, Error> {
|
||||||
config.verify_compression_level()?;
|
config.verify::<E>()?;
|
||||||
|
|
||||||
let hierarchy = config.hierarchy_config.to_moduli()?;
|
let hierarchy = config.hierarchy_config.to_moduli()?;
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,15 @@ use crate::Error;
|
|||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
use types::{BeaconState, EthSpec, Hash256, Slot};
|
use types::{BeaconState, Epoch, EthSpec, Hash256, Slot};
|
||||||
|
|
||||||
|
/// Fraction of the LRU cache to leave intact during culling.
|
||||||
|
const CULL_EXEMPT_NUMERATOR: usize = 1;
|
||||||
|
const CULL_EXEMPT_DENOMINATOR: usize = 10;
|
||||||
|
|
||||||
|
/// States that are less than or equal to this many epochs old *could* become finalized and will not
|
||||||
|
/// be culled from the cache.
|
||||||
|
const EPOCH_FINALIZATION_LIMIT: u64 = 4;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct FinalizedState<E: EthSpec> {
|
pub struct FinalizedState<E: EthSpec> {
|
||||||
@@ -27,6 +35,8 @@ pub struct StateCache<E: EthSpec> {
|
|||||||
finalized_state: Option<FinalizedState<E>>,
|
finalized_state: Option<FinalizedState<E>>,
|
||||||
states: LruCache<Hash256, BeaconState<E>>,
|
states: LruCache<Hash256, BeaconState<E>>,
|
||||||
block_map: BlockMap,
|
block_map: BlockMap,
|
||||||
|
capacity: NonZeroUsize,
|
||||||
|
max_epoch: Epoch,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -42,6 +52,8 @@ impl<E: EthSpec> StateCache<E> {
|
|||||||
finalized_state: None,
|
finalized_state: None,
|
||||||
states: LruCache::new(capacity),
|
states: LruCache::new(capacity),
|
||||||
block_map: BlockMap::default(),
|
block_map: BlockMap::default(),
|
||||||
|
capacity,
|
||||||
|
max_epoch: Epoch::new(0),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,6 +127,14 @@ impl<E: EthSpec> StateCache<E> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the cache's idea of the max epoch.
|
||||||
|
self.max_epoch = std::cmp::max(state.current_epoch(), self.max_epoch);
|
||||||
|
|
||||||
|
// If the cache is full, use the custom cull routine to make room.
|
||||||
|
if let Some(over_capacity) = self.len().checked_sub(self.capacity.get()) {
|
||||||
|
self.cull(over_capacity + 1);
|
||||||
|
}
|
||||||
|
|
||||||
// Insert the full state into the cache.
|
// Insert the full state into the cache.
|
||||||
self.states.put(state_root, state.clone());
|
self.states.put(state_root, state.clone());
|
||||||
|
|
||||||
@@ -166,6 +186,60 @@ impl<E: EthSpec> StateCache<E> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Cull approximately `count` states from the cache.
|
||||||
|
///
|
||||||
|
/// States are culled LRU, with the following extra order imposed:
|
||||||
|
///
|
||||||
|
/// - Advanced states.
|
||||||
|
/// - Mid-epoch unadvanced states.
|
||||||
|
/// - Epoch-boundary states that are too old to be finalized.
|
||||||
|
/// - Epoch-boundary states that could be finalized.
|
||||||
|
pub fn cull(&mut self, count: usize) {
|
||||||
|
let cull_exempt = std::cmp::max(
|
||||||
|
1,
|
||||||
|
self.len() * CULL_EXEMPT_NUMERATOR / CULL_EXEMPT_DENOMINATOR,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Stage 1: gather states to cull.
|
||||||
|
let mut advanced_state_roots = vec![];
|
||||||
|
let mut mid_epoch_state_roots = vec![];
|
||||||
|
let mut old_boundary_state_roots = vec![];
|
||||||
|
let mut good_boundary_state_roots = vec![];
|
||||||
|
for (&state_root, state) in self.states.iter().skip(cull_exempt) {
|
||||||
|
let is_advanced = state.slot() > state.latest_block_header().slot;
|
||||||
|
let is_boundary = state.slot() % E::slots_per_epoch() == 0;
|
||||||
|
let could_finalize =
|
||||||
|
(self.max_epoch - state.current_epoch()) <= EPOCH_FINALIZATION_LIMIT;
|
||||||
|
|
||||||
|
if is_advanced {
|
||||||
|
advanced_state_roots.push(state_root);
|
||||||
|
} else if !is_boundary {
|
||||||
|
mid_epoch_state_roots.push(state_root);
|
||||||
|
} else if !could_finalize {
|
||||||
|
old_boundary_state_roots.push(state_root);
|
||||||
|
} else {
|
||||||
|
good_boundary_state_roots.push(state_root);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate early in the common case where we've already found enough junk to cull.
|
||||||
|
if advanced_state_roots.len() == count {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stage 2: delete.
|
||||||
|
// This could probably be more efficient in how it interacts with the block map.
|
||||||
|
for state_root in advanced_state_roots
|
||||||
|
.iter()
|
||||||
|
.chain(mid_epoch_state_roots.iter())
|
||||||
|
.chain(old_boundary_state_roots.iter())
|
||||||
|
.chain(good_boundary_state_roots.iter())
|
||||||
|
.take(count)
|
||||||
|
{
|
||||||
|
self.delete_state(state_root);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockMap {
|
impl BlockMap {
|
||||||
|
|||||||
@@ -1795,12 +1795,7 @@ fn epochs_per_migration_override() {
|
|||||||
fn epochs_per_state_diff_default() {
|
fn epochs_per_state_diff_default() {
|
||||||
CommandLineTest::new()
|
CommandLineTest::new()
|
||||||
.run_with_zero_port()
|
.run_with_zero_port()
|
||||||
.with_config(|config| {
|
.with_config(|config| assert_eq!(config.store.epochs_per_state_diff, 16));
|
||||||
assert_eq!(
|
|
||||||
config.store.epochs_per_state_diff,
|
|
||||||
beacon_node::beacon_chain::store::config::DEFAULT_EPOCHS_PER_STATE_DIFF
|
|
||||||
)
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
fn epochs_per_state_diff_override() {
|
fn epochs_per_state_diff_override() {
|
||||||
|
|||||||
Reference in New Issue
Block a user