Prevent writing to state cache when migrating the database (#7067)

* add an update_cache flag to get_state to have more granular control over when we write to the cache

* State cache tweaks

- add state-cache-headroom flag to control pruning
- prune old epoch boundary states ahead of mid-epoch states
- never prune head block's state
- avoid caching ancestor states unless they are on an epoch boundary

---------

Co-authored-by: Michael Sproul <michael@sigmaprime.io>
Authored by: Eitan Seri-Levi
Date: 2025-03-03 17:15:12 -07:00
Committed by: GitHub
parent 8f62b1934a
commit defdc595fc
28 changed files with 178 additions and 77 deletions

View File

@@ -35,6 +35,8 @@ pub struct StoreConfig {
pub block_cache_size: NonZeroUsize,
/// Maximum number of states to store in the in-memory state cache.
pub state_cache_size: NonZeroUsize,
/// Minimum number of states to cull from the state cache upon fullness.
pub state_cache_headroom: usize,
/// Compression level for blocks, state diffs and other compressed values.
pub compression_level: i32,
/// Maximum number of historic states to store in the in-memory historic state cache.
@@ -107,6 +109,7 @@ impl Default for StoreConfig {
Self {
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
state_cache_headroom: 1,
historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE,
hdiff_buffer_cache_size: DEFAULT_HDIFF_BUFFER_CACHE_SIZE,
compression_level: DEFAULT_COMPRESSION_LEVEL,

View File

@@ -73,7 +73,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// Cache of beacon states.
///
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
state_cache: Mutex<StateCache<E>>,
pub state_cache: Mutex<StateCache<E>>,
/// Cache of historic states and hierarchical diff buffers.
///
/// This cache is never pruned. It is only populated in response to historical queries from the
@@ -218,7 +218,10 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
blobs_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
state_cache: Mutex::new(StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
)),
historic_state_cache: Mutex::new(HistoricStateCache::new(
config.hdiff_buffer_cache_size,
config.historic_state_cache_size,
@@ -264,7 +267,10 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
cold_db: BeaconNodeBackend::open(&config, cold_path)?,
hot_db,
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
state_cache: Mutex::new(StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
)),
historic_state_cache: Mutex::new(HistoricStateCache::new(
config.hdiff_buffer_cache_size,
config.historic_state_cache_size,
@@ -945,6 +951,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
state_root: &Hash256,
slot: Option<Slot>,
update_cache: bool,
) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
@@ -956,10 +963,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// chain. This way we avoid returning a state that doesn't match `state_root`.
self.load_cold_state(state_root)
} else {
self.get_hot_state(state_root)
self.get_hot_state(state_root, update_cache)
}
} else {
match self.get_hot_state(state_root)? {
match self.get_hot_state(state_root, update_cache)? {
Some(state) => Ok(Some(state)),
None => self.load_cold_state(state_root),
}
@@ -1015,7 +1022,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state_root
};
let mut opt_state = self
.load_hot_state(&state_root)?
.load_hot_state(&state_root, true)?
.map(|(state, _block_root)| (state_root, state));
if let Some((state_root, state)) = opt_state.as_mut() {
@@ -1126,6 +1133,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Load an epoch boundary state by using the hot state summary look-up.
///
/// Will fall back to the cold DB if a hot state summary is not found.
///
/// NOTE: only used in tests at the moment
pub fn load_epoch_boundary_state(
&self,
state_root: &Hash256,
@@ -1136,9 +1145,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}) = self.load_hot_state_summary(state_root)?
{
// NOTE: minor inefficiency here because we load an unnecessary hot state summary
let (state, _) = self.load_hot_state(&epoch_boundary_state_root)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;
let (state, _) = self
.load_hot_state(&epoch_boundary_state_root, true)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(
epoch_boundary_state_root,
))?;
Ok(Some(state))
} else {
// Try the cold DB
@@ -1505,7 +1516,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Get a post-finalization state from the database or store.
pub fn get_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
pub fn get_hot_state(
&self,
state_root: &Hash256,
update_cache: bool,
) -> Result<Option<BeaconState<E>>, Error> {
if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) {
return Ok(Some(state));
}
@@ -1519,20 +1534,30 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
);
}
let state_from_disk = self.load_hot_state(state_root)?;
let state_from_disk = self.load_hot_state(state_root, update_cache)?;
if let Some((mut state, block_root)) = state_from_disk {
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
self.state_cache
.lock()
.put_state(*state_root, block_root, &state)?;
debug!(
self.log,
"Cached state";
"state_root" => ?state_root,
"slot" => state.slot(),
);
if update_cache {
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
self.state_cache
.lock()
.put_state(*state_root, block_root, &state)?;
debug!(
self.log,
"Cached state";
"state_root" => ?state_root,
"slot" => state.slot(),
);
} else {
debug!(
self.log,
"Did not cache state";
"state_root" => ?state_root,
"slot" => state.slot(),
);
}
Ok(Some(state))
} else {
Ok(None)
@@ -1548,6 +1573,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn load_hot_state(
&self,
state_root: &Hash256,
update_cache: bool,
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
@@ -1579,11 +1605,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut state = if slot % E::slots_per_epoch() == 0 {
boundary_state
} else {
// Cache ALL intermediate states that are reached during block replay. We may want
// to restrict this in future to only cache epoch boundary states. At worst we will
// cache up to 32 states for each state loaded, which should not flush out the cache
// entirely.
// If replaying blocks, and `update_cache` is true, also cache the epoch boundary
// state that this state is based on. It may be useful as the basis of more states
// in the same epoch.
let state_cache_hook = |state_root, state: &mut BeaconState<E>| {
if !update_cache || state.slot() % E::slots_per_epoch() != 0 {
return Ok(());
}
// Ensure all caches are built before attempting to cache.
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
@@ -1598,7 +1626,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.log,
"Cached ancestor state";
"state_root" => ?state_root,
"slot" => slot,
"state_slot" => state.slot(),
"descendant_slot" => slot,
);
}
Ok(())
@@ -2668,10 +2697,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(());
};
// Load the split state so we can backtrack to find execution payloads.
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?;
// Load the split state so we can backtrack to find execution payloads. The split state
// should be in the state cache as the enshrined finalized state, so this should never
// cache miss.
let split_state = self
.get_state(&split.state_root, Some(split.slot), true)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))?;
// The finalized block may or may not have its execution payload stored, depending on
// whether it was at a skipped slot. However for a fully pruned database its parent
@@ -3169,8 +3203,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Store slot -> state_root and state_root -> slot mappings.
store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?;
} else {
// We purposely do not update the state cache to store this state
let state: BeaconState<E> = store
.get_hot_state(&state_root)?
.get_hot_state(&state_root, false)?
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;

View File

@@ -27,8 +27,10 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
&self,
store: &'a HotColdDB<E, Hot, Cold>,
) -> Option<BlockRootsIterator<'a, E, Hot, Cold>> {
// ancestor roots and their states are probably in the cold db
// but we set `update_cache` to false just in case
let state = store
.get_state(&self.message().state_root(), Some(self.slot()))
.get_state(&self.message().state_root(), Some(self.slot()), false)
.ok()??;
Some(BlockRootsIterator::owned(store, state))
@@ -190,7 +192,7 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> RootsIterator<'a, E,
.get_blinded_block(&block_hash)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
let state = store
.get_state(&block.state_root(), Some(block.slot()))?
.get_state(&block.state_root(), Some(block.slot()), false)?
.ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?;
Ok(Self::owned(store, state))
}
@@ -363,7 +365,7 @@ fn next_historical_root_backtrack_state<E: EthSpec, Hot: ItemStore<E>, Cold: Ite
if new_state_slot >= historic_state_upper_limit {
let new_state_root = current_state.get_state_root(new_state_slot)?;
Ok(store
.get_state(new_state_root, Some(new_state_slot))?
.get_state(new_state_root, Some(new_state_slot), false)?
.ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?)
} else {
Err(Error::HistoryUnavailable)

View File

@@ -36,6 +36,8 @@ pub struct StateCache<E: EthSpec> {
states: LruCache<Hash256, BeaconState<E>>,
block_map: BlockMap,
max_epoch: Epoch,
head_block_root: Hash256,
headroom: usize,
}
#[derive(Debug)]
@@ -47,12 +49,14 @@ pub enum PutStateOutcome {
#[allow(clippy::len_without_is_empty)]
impl<E: EthSpec> StateCache<E> {
pub fn new(capacity: NonZeroUsize) -> Self {
pub fn new(capacity: NonZeroUsize, headroom: usize) -> Self {
StateCache {
finalized_state: None,
states: LruCache::new(capacity),
block_map: BlockMap::default(),
max_epoch: Epoch::new(0),
head_block_root: Hash256::ZERO,
headroom,
}
}
@@ -98,6 +102,13 @@ impl<E: EthSpec> StateCache<E> {
Ok(())
}
/// Update the state cache's view of the enshrined head block.
///
/// We never prune the unadvanced state for the head block.
pub fn update_head_block_root(&mut self, head_block_root: Hash256) {
self.head_block_root = head_block_root;
}
/// Rebase the given state on the finalized state in order to reduce its memory consumption.
///
/// This function should only be called on states that are likely not to already share tree
@@ -148,7 +159,8 @@ impl<E: EthSpec> StateCache<E> {
// If the cache is full, use the custom cull routine to make room.
if let Some(over_capacity) = self.len().checked_sub(self.capacity()) {
self.cull(over_capacity + 1);
// The `over_capacity` should always be 0, but we add it here just in case.
self.cull((over_capacity + self.headroom).max(1));
}
// Insert the full state into the cache.
@@ -222,6 +234,7 @@ impl<E: EthSpec> StateCache<E> {
let mut mid_epoch_state_roots = vec![];
let mut old_boundary_state_roots = vec![];
let mut good_boundary_state_roots = vec![];
for (&state_root, state) in self.states.iter().skip(cull_exempt) {
let is_advanced = state.slot() > state.latest_block_header().slot;
let is_boundary = state.slot() % E::slots_per_epoch() == 0;
@@ -236,7 +249,7 @@ impl<E: EthSpec> StateCache<E> {
}
} else if is_advanced {
advanced_state_roots.push(state_root);
} else {
} else if state.get_latest_block_root(state_root) != self.head_block_root {
mid_epoch_state_roots.push(state_root);
}
@@ -250,8 +263,8 @@ impl<E: EthSpec> StateCache<E> {
// This could probably be more efficient in how it interacts with the block map.
for state_root in advanced_state_roots
.iter()
.chain(mid_epoch_state_roots.iter())
.chain(old_boundary_state_roots.iter())
.chain(mid_epoch_state_roots.iter())
.chain(good_boundary_state_roots.iter())
.take(count)
{