resolve merge conflicts between untstable and release-v7.0.0

This commit is contained in:
Eitan Seri-Levi
2025-03-23 11:09:02 -06:00
63 changed files with 1422 additions and 242 deletions

View File

@@ -21,6 +21,7 @@ pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8;
pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(64);
pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128);
pub const DEFAULT_STATE_CACHE_HEADROOM: NonZeroUsize = new_non_zero_usize(1);
pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1);
pub const DEFAULT_HDIFF_BUFFER_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(16);
@@ -35,6 +36,8 @@ pub struct StoreConfig {
pub block_cache_size: NonZeroUsize,
/// Maximum number of states to store in the in-memory state cache.
pub state_cache_size: NonZeroUsize,
/// Minimum number of states to cull from the state cache upon fullness.
pub state_cache_headroom: NonZeroUsize,
/// Compression level for blocks, state diffs and other compressed values.
pub compression_level: i32,
/// Maximum number of historic states to store in the in-memory historic state cache.
@@ -107,6 +110,7 @@ impl Default for StoreConfig {
Self {
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
state_cache_headroom: DEFAULT_STATE_CACHE_HEADROOM,
historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE,
hdiff_buffer_cache_size: DEFAULT_HDIFF_BUFFER_CACHE_SIZE,
compression_level: DEFAULT_COMPRESSION_LEVEL,

View File

@@ -73,7 +73,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// Cache of beacon states.
///
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
state_cache: Mutex<StateCache<E>>,
pub state_cache: Mutex<StateCache<E>>,
/// Cache of historic states and hierarchical diff buffers.
///
/// This cache is never pruned. It is only populated in response to historical queries from the
@@ -215,7 +215,10 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
blobs_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
state_cache: Mutex::new(StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
)),
historic_state_cache: Mutex::new(HistoricStateCache::new(
config.hdiff_buffer_cache_size,
config.historic_state_cache_size,
@@ -259,7 +262,10 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
cold_db: BeaconNodeBackend::open(&config, cold_path)?,
hot_db,
block_cache: Mutex::new(BlockCache::new(config.block_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
state_cache: Mutex::new(StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
)),
historic_state_cache: Mutex::new(HistoricStateCache::new(
config.hdiff_buffer_cache_size,
config.historic_state_cache_size,
@@ -934,6 +940,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&self,
state_root: &Hash256,
slot: Option<Slot>,
update_cache: bool,
) -> Result<Option<BeaconState<E>>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT);
@@ -945,10 +952,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// chain. This way we avoid returning a state that doesn't match `state_root`.
self.load_cold_state(state_root)
} else {
self.get_hot_state(state_root)
self.get_hot_state(state_root, update_cache)
}
} else {
match self.get_hot_state(state_root)? {
match self.get_hot_state(state_root, update_cache)? {
Some(state) => Ok(Some(state)),
None => self.load_cold_state(state_root),
}
@@ -998,21 +1005,27 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
} else {
state_root
};
// It's a bit redundant but we elect to cache the state here and down below.
let mut opt_state = self
.load_hot_state(&state_root)?
.load_hot_state(&state_root, true)?
.map(|(state, _block_root)| (state_root, state));
if let Some((state_root, state)) = opt_state.as_mut() {
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
self.state_cache
.lock()
.put_state(*state_root, block_root, state)?;
debug!(
?state_root,
slot = %state.slot(),
"Cached state"
);
if let PutStateOutcome::New(deleted_states) =
self.state_cache
.lock()
.put_state(*state_root, block_root, state)?
{
debug!(
?state_root,
state_slot = %state.slot(),
?deleted_states,
location = "get_advanced_hot_state",
"Cached state",
);
}
}
drop(split);
Ok(opt_state)
@@ -1109,6 +1122,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Load an epoch boundary state by using the hot state summary look-up.
///
/// Will fall back to the cold DB if a hot state summary is not found.
///
/// NOTE: only used in tests at the moment
pub fn load_epoch_boundary_state(
&self,
state_root: &Hash256,
@@ -1119,9 +1134,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}) = self.load_hot_state_summary(state_root)?
{
// NOTE: minor inefficiency here because we load an unnecessary hot state summary
let (state, _) = self.load_hot_state(&epoch_boundary_state_root)?.ok_or(
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
)?;
let (state, _) = self
.load_hot_state(&epoch_boundary_state_root, true)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(
epoch_boundary_state_root,
))?;
Ok(Some(state))
} else {
// Try the cold DB
@@ -1445,23 +1462,32 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// Put the state in the cache.
let block_root = state.get_latest_block_root(*state_root);
// Avoid storing states in the database if they already exist in the state cache.
// The exception to this is the finalized state, which must exist in the cache before it
// is stored on disk.
if let PutStateOutcome::Duplicate =
self.state_cache
.lock()
.put_state(*state_root, block_root, state)?
{
debug!(
slot = %state.slot(),
?state_root,
"Skipping storage of cached state"
);
return Ok(());
match self.state_cache.lock().put_state(
*state_root,
state.get_latest_block_root(*state_root),
state,
)? {
PutStateOutcome::New(deleted_states) => {
debug!(
?state_root,
state_slot = %state.slot(),
?deleted_states,
location = "store_hot_state",
"Cached state",
);
}
PutStateOutcome::Duplicate => {
debug!(
?state_root,
state_slot = %state.slot(),
"State already exists in state cache",
);
return Ok(());
}
PutStateOutcome::Finalized => {} // Continue to store.
}
// On the epoch boundary, store the full state.
@@ -1485,7 +1511,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
/// Get a post-finalization state from the database or store.
pub fn get_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
pub fn get_hot_state(
&self,
state_root: &Hash256,
update_cache: bool,
) -> Result<Option<BeaconState<E>>, Error> {
if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) {
return Ok(Some(state));
}
@@ -1495,19 +1525,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
warn!(?state_root, "State cache missed");
}
let state_from_disk = self.load_hot_state(state_root)?;
let state_from_disk = self.load_hot_state(state_root, update_cache)?;
if let Some((mut state, block_root)) = state_from_disk {
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
self.state_cache
.lock()
.put_state(*state_root, block_root, &state)?;
debug!(
?state_root,
slot = %state.slot(),
"Cached state"
);
if update_cache {
if let PutStateOutcome::New(deleted_states) =
self.state_cache
.lock()
.put_state(*state_root, block_root, &state)?
{
debug!(
?state_root,
state_slot = %state.slot(),
?deleted_states,
location = "get_hot_state",
"Cached state",
);
}
} else {
debug!(
?state_root,
state_slot = %state.slot(),
"Did not cache state",
);
}
Ok(Some(state))
} else {
Ok(None)
@@ -1523,6 +1567,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn load_hot_state(
&self,
state_root: &Hash256,
update_cache: bool,
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
@@ -1554,25 +1599,28 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut state = if slot % E::slots_per_epoch() == 0 {
boundary_state
} else {
// Cache ALL intermediate states that are reached during block replay. We may want
// to restrict this in future to only cache epoch boundary states. At worst we will
// cache up to 32 states for each state loaded, which should not flush out the cache
// entirely.
// If replaying blocks, and `update_cache` is true, also cache the epoch boundary
// state that this state is based on. It may be useful as the basis of more states
// in the same epoch.
let state_cache_hook = |state_root, state: &mut BeaconState<E>| {
if !update_cache || state.slot() % E::slots_per_epoch() != 0 {
return Ok(());
}
// Ensure all caches are built before attempting to cache.
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
let latest_block_root = state.get_latest_block_root(state_root);
if let PutStateOutcome::New =
if let PutStateOutcome::New(_) =
self.state_cache
.lock()
.put_state(state_root, latest_block_root, state)?
{
debug!(
?state_root,
%slot,
"Cached ancestor state"
state_slot = %state.slot(),
descendant_slot = %slot,
"Cached ancestor state",
);
}
Ok(())
@@ -2619,10 +2667,15 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(());
};
// Load the split state so we can backtrack to find execution payloads.
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?;
// Load the split state so we can backtrack to find execution payloads. The split state
// should be in the state cache as the enshrined finalized state, so this should never
// cache miss.
let split_state = self
.get_state(&split.state_root, Some(split.slot), true)?
.ok_or(HotColdDBError::MissingSplitState(
split.state_root,
split.slot,
))?;
// The finalized block may or may not have its execution payload stored, depending on
// whether it was at a skipped slot. However for a fully pruned database its parent
@@ -3080,8 +3133,10 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Store slot -> state_root and state_root -> slot mappings.
store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?;
} else {
// This is some state that we want to migrate to the freezer db.
// There is no reason to cache this state.
let state: BeaconState<E> = store
.get_hot_state(&state_root)?
.get_hot_state(&state_root, false)?
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;

View File

@@ -27,8 +27,10 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>
&self,
store: &'a HotColdDB<E, Hot, Cold>,
) -> Option<BlockRootsIterator<'a, E, Hot, Cold>> {
// Ancestor roots and their states are probably in the cold db
// but we set `update_cache` to false just in case
let state = store
.get_state(&self.message().state_root(), Some(self.slot()))
.get_state(&self.message().state_root(), Some(self.slot()), false)
.ok()??;
Some(BlockRootsIterator::owned(store, state))
@@ -189,8 +191,10 @@ impl<'a, E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> RootsIterator<'a, E,
let block = store
.get_blinded_block(&block_hash)?
.ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?;
// We are querying some block from the database. It's not clear if the block's state is useful,
// we elect not to cache it.
let state = store
.get_state(&block.state_root(), Some(block.slot()))?
.get_state(&block.state_root(), Some(block.slot()), false)?
.ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?;
Ok(Self::owned(store, state))
}
@@ -362,8 +366,9 @@ fn next_historical_root_backtrack_state<E: EthSpec, Hot: ItemStore<E>, Cold: Ite
if new_state_slot >= historic_state_upper_limit {
let new_state_root = current_state.get_state_root(new_state_slot)?;
// We are backtracking through historical states, we don't want to cache these.
Ok(store
.get_state(new_state_root, Some(new_state_slot))?
.get_state(new_state_root, Some(new_state_slot), false)?
.ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?)
} else {
Err(Error::HistoryUnavailable)

View File

@@ -33,26 +33,33 @@ pub struct SlotMap {
#[derive(Debug)]
pub struct StateCache<E: EthSpec> {
finalized_state: Option<FinalizedState<E>>,
states: LruCache<Hash256, BeaconState<E>>,
// Stores the tuple (state_root, state) as LruCache only returns the value on put and we need
// the state_root
states: LruCache<Hash256, (Hash256, BeaconState<E>)>,
block_map: BlockMap,
max_epoch: Epoch,
head_block_root: Hash256,
headroom: NonZeroUsize,
}
#[derive(Debug)]
pub enum PutStateOutcome {
Finalized,
Duplicate,
New,
/// Includes deleted states as a result of this insertion
New(Vec<Hash256>),
}
#[allow(clippy::len_without_is_empty)]
impl<E: EthSpec> StateCache<E> {
pub fn new(capacity: NonZeroUsize) -> Self {
pub fn new(capacity: NonZeroUsize, headroom: NonZeroUsize) -> Self {
StateCache {
finalized_state: None,
states: LruCache::new(capacity),
block_map: BlockMap::default(),
max_epoch: Epoch::new(0),
head_block_root: Hash256::ZERO,
headroom,
}
}
@@ -98,6 +105,13 @@ impl<E: EthSpec> StateCache<E> {
Ok(())
}
/// Update the state cache's view of the enshrined head block.
///
/// We never prune the unadvanced state for the head block.
pub fn update_head_block_root(&mut self, head_block_root: Hash256) {
self.head_block_root = head_block_root;
}
/// Rebase the given state on the finalized state in order to reduce its memory consumption.
///
/// This function should only be called on states that are likely not to already share tree
@@ -147,18 +161,26 @@ impl<E: EthSpec> StateCache<E> {
self.max_epoch = std::cmp::max(state.current_epoch(), self.max_epoch);
// If the cache is full, use the custom cull routine to make room.
if let Some(over_capacity) = self.len().checked_sub(self.capacity()) {
self.cull(over_capacity + 1);
}
let mut deleted_states =
if let Some(over_capacity) = self.len().checked_sub(self.capacity()) {
// The `over_capacity` should always be 0, but we add it here just in case.
self.cull(over_capacity + self.headroom.get())
} else {
vec![]
};
// Insert the full state into the cache.
self.states.put(state_root, state.clone());
if let Some((deleted_state_root, _)) =
self.states.put(state_root, (state_root, state.clone()))
{
deleted_states.push(deleted_state_root);
}
// Record the connection from block root and slot to this state.
let slot = state.slot();
self.block_map.insert(block_root, slot, state_root);
Ok(PutStateOutcome::New)
Ok(PutStateOutcome::New(deleted_states))
}
pub fn get_by_state_root(&mut self, state_root: Hash256) -> Option<BeaconState<E>> {
@@ -167,7 +189,7 @@ impl<E: EthSpec> StateCache<E> {
return Some(finalized_state.state.clone());
}
}
self.states.get(&state_root).cloned()
self.states.get(&state_root).map(|(_, state)| state.clone())
}
pub fn get_by_block_root(
@@ -211,7 +233,7 @@ impl<E: EthSpec> StateCache<E> {
/// - Mid-epoch unadvanced states.
/// - Epoch-boundary states that are too old to be finalized.
/// - Epoch-boundary states that could be finalized.
pub fn cull(&mut self, count: usize) {
pub fn cull(&mut self, count: usize) -> Vec<Hash256> {
let cull_exempt = std::cmp::max(
1,
self.len() * CULL_EXEMPT_NUMERATOR / CULL_EXEMPT_DENOMINATOR,
@@ -222,7 +244,8 @@ impl<E: EthSpec> StateCache<E> {
let mut mid_epoch_state_roots = vec![];
let mut old_boundary_state_roots = vec![];
let mut good_boundary_state_roots = vec![];
for (&state_root, state) in self.states.iter().skip(cull_exempt) {
for (&state_root, (_, state)) in self.states.iter().skip(cull_exempt) {
let is_advanced = state.slot() > state.latest_block_header().slot;
let is_boundary = state.slot() % E::slots_per_epoch() == 0;
let could_finalize =
@@ -236,7 +259,8 @@ impl<E: EthSpec> StateCache<E> {
}
} else if is_advanced {
advanced_state_roots.push(state_root);
} else {
} else if state.get_latest_block_root(state_root) != self.head_block_root {
// Never prune the head state
mid_epoch_state_roots.push(state_root);
}
@@ -248,15 +272,19 @@ impl<E: EthSpec> StateCache<E> {
// Stage 2: delete.
// This could probably be more efficient in how it interacts with the block map.
for state_root in advanced_state_roots
.iter()
.chain(mid_epoch_state_roots.iter())
.chain(old_boundary_state_roots.iter())
.chain(good_boundary_state_roots.iter())
let state_roots_to_delete = advanced_state_roots
.into_iter()
.chain(old_boundary_state_roots)
.chain(mid_epoch_state_roots)
.chain(good_boundary_state_roots)
.take(count)
{
.collect::<Vec<_>>();
for state_root in &state_roots_to_delete {
self.delete_state(state_root);
}
state_roots_to_delete
}
}