mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 12:47:05 +00:00
Merge remote-tracking branch 'sigp/unstable' into validator-custody-cgc-mutex
This commit is contained in:
@@ -22,7 +22,7 @@ directory = { workspace = true }
|
||||
ethereum_ssz = { workspace = true }
|
||||
ethereum_ssz_derive = { workspace = true }
|
||||
itertools = { workspace = true }
|
||||
leveldb = { version = "0.8.6", optional = true }
|
||||
leveldb = { version = "0.8.6", optional = true, default-features = false }
|
||||
logging = { workspace = true }
|
||||
lru = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
|
||||
@@ -195,7 +195,6 @@ impl<E: EthSpec> LevelDB<E> {
|
||||
};
|
||||
|
||||
for (start_key, end_key) in [
|
||||
endpoints(DBColumn::BeaconStateTemporary),
|
||||
endpoints(DBColumn::BeaconState),
|
||||
endpoints(DBColumn::BeaconStateSummary),
|
||||
] {
|
||||
|
||||
@@ -25,7 +25,7 @@ pub enum Error {
|
||||
NoContinuationData,
|
||||
SplitPointModified(Slot, Slot),
|
||||
ConfigError(StoreConfigError),
|
||||
SchemaMigrationError(String),
|
||||
MigrationError(String),
|
||||
/// The store's `anchor_info` was mutated concurrently, the latest modification wasn't applied.
|
||||
AnchorInfoConcurrentMutation,
|
||||
/// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
//! Garbage collection process that runs at start-up to clean up the database.
|
||||
use crate::database::interface::BeaconNodeBackend;
|
||||
use crate::hot_cold_store::HotColdDB;
|
||||
use crate::{DBColumn, Error};
|
||||
use tracing::debug;
|
||||
use types::EthSpec;
|
||||
|
||||
impl<E> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>
|
||||
where
|
||||
E: EthSpec,
|
||||
{
|
||||
/// Clean up the database by performing one-off maintenance at start-up.
|
||||
pub fn remove_garbage(&self) -> Result<(), Error> {
|
||||
self.delete_temp_states()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete the temporary states that were leftover by failed block imports.
|
||||
pub fn delete_temp_states(&self) -> Result<(), Error> {
|
||||
let mut ops = vec![];
|
||||
self.iter_temporary_state_roots().for_each(|state_root| {
|
||||
if let Ok(state_root) = state_root {
|
||||
ops.push(state_root);
|
||||
}
|
||||
});
|
||||
if !ops.is_empty() {
|
||||
debug!("Garbage collecting {} temporary states", ops.len());
|
||||
|
||||
self.delete_batch(DBColumn::BeaconState, ops.clone())?;
|
||||
self.delete_batch(DBColumn::BeaconStateSummary, ops.clone())?;
|
||||
self.delete_batch(DBColumn::BeaconStateTemporary, ops)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -21,8 +21,8 @@ static EMPTY_PUBKEY: LazyLock<PublicKeyBytes> = LazyLock::new(PublicKeyBytes::em
|
||||
pub enum Error {
|
||||
InvalidHierarchy,
|
||||
DiffDeletionsNotSupported,
|
||||
UnableToComputeDiff,
|
||||
UnableToApplyDiff,
|
||||
UnableToComputeDiff(xdelta3::Error),
|
||||
UnableToApplyDiff(xdelta3::Error),
|
||||
BalancesIncompleteChunk,
|
||||
Compression(std::io::Error),
|
||||
InvalidSszState(ssz::DecodeError),
|
||||
@@ -323,9 +323,15 @@ impl BytesDiff {
|
||||
}
|
||||
|
||||
pub fn compute_xdelta(source_bytes: &[u8], target_bytes: &[u8]) -> Result<Self, Error> {
|
||||
let bytes = xdelta3::encode(target_bytes, source_bytes)
|
||||
.ok_or(Error::UnableToComputeDiff)
|
||||
.unwrap();
|
||||
// TODO(hdiff): Use a smaller estimate for the output diff buffer size, currently the
|
||||
// xdelta3 lib will use 2x the size of the source plus the target length, which is 4x the
|
||||
// size of the hdiff buffer. In practice, diffs are almost always smaller than buffers (by a
|
||||
// signficiant factor), so this is 4-16x larger than necessary in a temporary allocation.
|
||||
//
|
||||
// We should use an estimated size that *should* be enough, and then dynamically increase it
|
||||
// if we hit an insufficient space error.
|
||||
let bytes =
|
||||
xdelta3::encode(target_bytes, source_bytes).map_err(Error::UnableToComputeDiff)?;
|
||||
Ok(Self { bytes })
|
||||
}
|
||||
|
||||
@@ -334,8 +340,31 @@ impl BytesDiff {
|
||||
}
|
||||
|
||||
pub fn apply_xdelta(&self, source: &[u8], target: &mut Vec<u8>) -> Result<(), Error> {
|
||||
*target = xdelta3::decode(&self.bytes, source).ok_or(Error::UnableToApplyDiff)?;
|
||||
Ok(())
|
||||
// TODO(hdiff): Dynamic buffer allocation. This is a stopgap until we implement a schema
|
||||
// change to store the output buffer size inside the `BytesDiff`.
|
||||
let mut output_length = ((source.len() + self.bytes.len()) * 3) / 2;
|
||||
let mut num_resizes = 0;
|
||||
loop {
|
||||
match xdelta3::decode_with_output_len(&self.bytes, source, output_length as u32) {
|
||||
Ok(result_buffer) => {
|
||||
*target = result_buffer;
|
||||
|
||||
metrics::observe(
|
||||
&metrics::BEACON_HDIFF_BUFFER_APPLY_RESIZES,
|
||||
num_resizes as f64,
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Err(xdelta3::Error::InsufficientOutputLength) => {
|
||||
// Double the output buffer length and try again.
|
||||
output_length *= 2;
|
||||
num_resizes += 1;
|
||||
}
|
||||
Err(err) => {
|
||||
return Err(Error::UnableToApplyDiff(err));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Byte size of this instance
|
||||
|
||||
@@ -15,8 +15,8 @@ use crate::metadata::{
|
||||
};
|
||||
use crate::state_cache::{PutStateOutcome, StateCache};
|
||||
use crate::{
|
||||
get_data_column_key, metrics, parse_data_column_key, BlobSidecarListFromRoot, ColumnKeyIter,
|
||||
DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
|
||||
get_data_column_key, metrics, parse_data_column_key, BlobSidecarListFromRoot, DBColumn,
|
||||
DatabaseBlock, Error, ItemStore, KeyValueStoreOp, StoreItem, StoreOp,
|
||||
};
|
||||
use itertools::{process_results, Itertools};
|
||||
use lru::LruCache;
|
||||
@@ -37,7 +37,7 @@ use std::num::NonZeroUsize;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{debug, error, info, warn};
|
||||
use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidecarList};
|
||||
use types::*;
|
||||
use zstd::{Decoder, Encoder};
|
||||
@@ -81,7 +81,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
||||
/// HTTP API.
|
||||
historic_state_cache: Mutex<HistoricStateCache<E>>,
|
||||
/// Chain spec.
|
||||
pub(crate) spec: Arc<ChainSpec>,
|
||||
pub spec: Arc<ChainSpec>,
|
||||
/// Mere vessel for E.
|
||||
_phantom: PhantomData<E>,
|
||||
}
|
||||
@@ -162,7 +162,7 @@ pub enum HotColdDBError {
|
||||
MissingRestorePoint(Hash256),
|
||||
MissingColdStateSummary(Hash256),
|
||||
MissingHotStateSummary(Hash256),
|
||||
MissingEpochBoundaryState(Hash256),
|
||||
MissingEpochBoundaryState(Hash256, Hash256),
|
||||
MissingPrevState(Hash256),
|
||||
MissingSplitState(Hash256, Slot),
|
||||
MissingStateDiff(Hash256),
|
||||
@@ -399,8 +399,11 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
|
||||
}
|
||||
db.store_config()?;
|
||||
|
||||
// Run a garbage collection pass.
|
||||
db.remove_garbage()?;
|
||||
// TODO(tree-states): Here we can choose to prune advanced states to reclaim disk space. As
|
||||
// it's a foreground task there's no risk of race condition that can corrupt the DB.
|
||||
// Advanced states for invalid blocks that were never written to the DB, or descendants of
|
||||
// heads can be safely pruned at the expense of potentially having to recompute them in the
|
||||
// future. However this would require a new dedicated pruning routine.
|
||||
|
||||
// If configured, run a foreground compaction pass.
|
||||
if db.config.compact_on_init {
|
||||
@@ -411,12 +414,6 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
|
||||
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
/// Return an iterator over the state roots of all temporary states.
|
||||
pub fn iter_temporary_state_roots(&self) -> ColumnKeyIter<Hash256> {
|
||||
self.hot_db
|
||||
.iter_column_keys::<Hash256>(DBColumn::BeaconStateTemporary)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
|
||||
@@ -912,26 +909,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
/// Store a state in the store.
|
||||
pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
|
||||
self.put_state_possibly_temporary(state_root, state, false)
|
||||
}
|
||||
|
||||
/// Store a state in the store.
|
||||
///
|
||||
/// The `temporary` flag indicates whether this state should be considered canonical.
|
||||
pub fn put_state_possibly_temporary(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
state: &BeaconState<E>,
|
||||
temporary: bool,
|
||||
) -> Result<(), Error> {
|
||||
let mut ops: Vec<KeyValueStoreOp> = Vec::new();
|
||||
if state.slot() < self.get_split_slot() {
|
||||
self.store_cold_state(state_root, state, &mut ops)?;
|
||||
self.cold_db.do_atomically(ops)
|
||||
} else {
|
||||
if temporary {
|
||||
ops.push(TemporaryFlag.as_kv_store_op(*state_root));
|
||||
}
|
||||
self.store_hot_state(state_root, state, &mut ops)?;
|
||||
self.hot_db.do_atomically(ops)
|
||||
}
|
||||
@@ -1147,6 +1129,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
.load_hot_state(&epoch_boundary_state_root, true)?
|
||||
.ok_or(HotColdDBError::MissingEpochBoundaryState(
|
||||
epoch_boundary_state_root,
|
||||
*state_root,
|
||||
))?;
|
||||
Ok(Some(state))
|
||||
} else {
|
||||
@@ -1210,17 +1193,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
key_value_batch.push(summary.as_kv_store_op(state_root));
|
||||
}
|
||||
|
||||
StoreOp::PutStateTemporaryFlag(state_root) => {
|
||||
key_value_batch.push(TemporaryFlag.as_kv_store_op(state_root));
|
||||
}
|
||||
|
||||
StoreOp::DeleteStateTemporaryFlag(state_root) => {
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(
|
||||
TemporaryFlag::db_column(),
|
||||
state_root.as_slice().to_vec(),
|
||||
));
|
||||
}
|
||||
|
||||
StoreOp::DeleteBlock(block_root) => {
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(
|
||||
DBColumn::BeaconBlock,
|
||||
@@ -1250,13 +1222,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
state_root.as_slice().to_vec(),
|
||||
));
|
||||
|
||||
// Delete the state temporary flag (if any). Temporary flags are commonly
|
||||
// created by the state advance routine.
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(
|
||||
DBColumn::BeaconStateTemporary,
|
||||
state_root.as_slice().to_vec(),
|
||||
));
|
||||
|
||||
if slot.is_none_or(|slot| slot % E::slots_per_epoch() == 0) {
|
||||
key_value_batch.push(KeyValueStoreOp::DeleteKey(
|
||||
DBColumn::BeaconState,
|
||||
@@ -1417,10 +1382,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
StoreOp::PutStateSummary(_, _) => (),
|
||||
|
||||
StoreOp::PutStateTemporaryFlag(_) => (),
|
||||
|
||||
StoreOp::DeleteStateTemporaryFlag(_) => (),
|
||||
|
||||
StoreOp::DeleteBlock(block_root) => {
|
||||
guard.delete_block(&block_root);
|
||||
self.state_cache.lock().delete_block_states(&block_root);
|
||||
@@ -1501,8 +1462,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
// On the epoch boundary, store the full state.
|
||||
if state.slot() % E::slots_per_epoch() == 0 {
|
||||
trace!(
|
||||
slot = %state.slot().as_u64(),
|
||||
debug!(
|
||||
slot = %state.slot(),
|
||||
?state_root,
|
||||
"Storing full state on epoch boundary"
|
||||
);
|
||||
@@ -1580,12 +1541,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
|
||||
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
|
||||
|
||||
// If the state is marked as temporary, do not return it. It will become visible
|
||||
// only once its transaction commits and deletes its temporary flag.
|
||||
if self.load_state_temporary_flag(state_root)?.is_some() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if let Some(HotStateSummary {
|
||||
slot,
|
||||
latest_block_root,
|
||||
@@ -1594,7 +1549,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
{
|
||||
let mut boundary_state =
|
||||
get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or(
|
||||
HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
|
||||
HotColdDBError::MissingEpochBoundaryState(
|
||||
epoch_boundary_state_root,
|
||||
*state_root,
|
||||
),
|
||||
)?;
|
||||
|
||||
// Immediately rebase the state from disk on the finalized state so that we can reuse
|
||||
@@ -2578,15 +2536,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
self.hot_db.get(state_root)
|
||||
}
|
||||
|
||||
/// Load the temporary flag for a state root, if one exists.
|
||||
///
|
||||
/// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not
|
||||
/// exist -- you should call `load_hot_state_summary` to find out which.
|
||||
pub fn load_state_temporary_flag(
|
||||
&self,
|
||||
state_root: &Hash256,
|
||||
) -> Result<Option<TemporaryFlag>, Error> {
|
||||
self.hot_db.get(state_root)
|
||||
/// Load all hot state summaries present in the hot DB
|
||||
pub fn load_hot_state_summaries(&self) -> Result<Vec<(Hash256, HotStateSummary)>, Error> {
|
||||
self.hot_db
|
||||
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
|
||||
.map(|res| {
|
||||
let (state_root, value) = res?;
|
||||
let summary = HotStateSummary::from_ssz_bytes(&value)?;
|
||||
Ok((state_root, summary))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Run a compaction pass to free up space used by deleted states.
|
||||
@@ -3018,54 +2977,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prune states from the hot database which are prior to the split.
|
||||
///
|
||||
/// This routine is important for cleaning up advanced states which are stored in the database
|
||||
/// with a temporary flag.
|
||||
pub fn prune_old_hot_states(&self) -> Result<(), Error> {
|
||||
let split = self.get_split_info();
|
||||
debug!(
|
||||
%split.slot,
|
||||
"Database state pruning started"
|
||||
);
|
||||
let mut state_delete_batch = vec![];
|
||||
for res in self
|
||||
.hot_db
|
||||
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
|
||||
{
|
||||
let (state_root, summary_bytes) = res?;
|
||||
let summary = HotStateSummary::from_ssz_bytes(&summary_bytes)?;
|
||||
|
||||
if summary.slot <= split.slot {
|
||||
let old = summary.slot < split.slot;
|
||||
let non_canonical = summary.slot == split.slot
|
||||
&& state_root != split.state_root
|
||||
&& !split.state_root.is_zero();
|
||||
if old || non_canonical {
|
||||
let reason = if old {
|
||||
"old dangling state"
|
||||
} else {
|
||||
"non-canonical"
|
||||
};
|
||||
debug!(
|
||||
?state_root,
|
||||
slot = %summary.slot,
|
||||
%reason,
|
||||
"Deleting state"
|
||||
);
|
||||
state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot)));
|
||||
}
|
||||
}
|
||||
}
|
||||
let num_deleted_states = state_delete_batch.len();
|
||||
self.do_atomically_with_block_and_blobs_cache(state_delete_batch)?;
|
||||
debug!(%num_deleted_states, "Database state pruning complete");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Advance the split point of the store, moving new finalized states to the freezer.
|
||||
/// Advance the split point of the store, copying new finalized states to the freezer.
|
||||
///
|
||||
/// This function previously did a combination of freezer migration alongside pruning. Now it is
|
||||
/// *just* responsible for copying relevant data to the freezer, while pruning is implemented
|
||||
/// in `prune_hot_db`.
|
||||
pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
store: Arc<HotColdDB<E, Hot, Cold>>,
|
||||
finalized_state_root: Hash256,
|
||||
@@ -3097,29 +3015,17 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
|
||||
}
|
||||
|
||||
let mut hot_db_ops = vec![];
|
||||
let mut cold_db_block_ops = vec![];
|
||||
let mut epoch_boundary_blocks = HashSet::new();
|
||||
let mut non_checkpoint_block_roots = HashSet::new();
|
||||
|
||||
// Iterate in descending order until the current split slot
|
||||
let state_roots = RootsIterator::new(&store, finalized_state)
|
||||
.take_while(|result| match result {
|
||||
Ok((_, _, slot)) => *slot >= current_split_slot,
|
||||
Err(_) => true,
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
let state_roots: Vec<_> =
|
||||
process_results(RootsIterator::new(&store, finalized_state), |iter| {
|
||||
iter.take_while(|(_, _, slot)| *slot >= current_split_slot)
|
||||
.collect()
|
||||
})?;
|
||||
|
||||
// Then, iterate states in slot ascending order, as they are stored wrt previous states.
|
||||
for (block_root, state_root, slot) in state_roots.into_iter().rev() {
|
||||
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
|
||||
// delete the payload for the finalized block itself, but that's OK as we only guarantee
|
||||
// that payloads are present for slots >= the split slot. The payload fetching code is also
|
||||
// forgiving of missing payloads.
|
||||
if store.config.prune_payloads {
|
||||
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
|
||||
}
|
||||
|
||||
// Store the slot to block root mapping.
|
||||
cold_db_block_ops.push(KeyValueStoreOp::PutKeyValue(
|
||||
DBColumn::BeaconBlockRoots,
|
||||
@@ -3127,44 +3033,27 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
block_root.as_slice().to_vec(),
|
||||
));
|
||||
|
||||
// At a missed slot, `state_root_iter` will return the block root
|
||||
// from the previous non-missed slot. This ensures that the block root at an
|
||||
// epoch boundary is always a checkpoint block root. We keep track of block roots
|
||||
// at epoch boundaries by storing them in the `epoch_boundary_blocks` hash set.
|
||||
// We then ensure that block roots at the epoch boundary aren't included in the
|
||||
// `non_checkpoint_block_roots` hash set.
|
||||
if slot % E::slots_per_epoch() == 0 {
|
||||
epoch_boundary_blocks.insert(block_root);
|
||||
} else {
|
||||
non_checkpoint_block_roots.insert(block_root);
|
||||
}
|
||||
|
||||
if epoch_boundary_blocks.contains(&block_root) {
|
||||
non_checkpoint_block_roots.remove(&block_root);
|
||||
}
|
||||
|
||||
// Delete the old summary, and the full state if we lie on an epoch boundary.
|
||||
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
|
||||
|
||||
// Do not try to store states if a restore point is yet to be stored, or will never be
|
||||
// stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state
|
||||
// which always needs to be copied from the hot DB to the freezer and should not be deleted.
|
||||
if slot != 0 && slot < anchor_info.state_upper_limit {
|
||||
debug!(%slot, "Pruning finalized state");
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut cold_db_ops = vec![];
|
||||
let mut cold_db_state_ops = vec![];
|
||||
|
||||
// Only store the cold state if it's on a diff boundary.
|
||||
// Calling `store_cold_state_summary` instead of `store_cold_state` for those allows us
|
||||
// to skip loading many hot states.
|
||||
if matches!(
|
||||
store.hierarchy.storage_strategy(slot)?,
|
||||
StorageStrategy::ReplayFrom(..)
|
||||
) {
|
||||
if let StorageStrategy::ReplayFrom(from) = store.hierarchy.storage_strategy(slot)? {
|
||||
// Store slot -> state_root and state_root -> slot mappings.
|
||||
store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?;
|
||||
debug!(
|
||||
strategy = "replay",
|
||||
from_slot = %from,
|
||||
%slot,
|
||||
"Storing cold state"
|
||||
);
|
||||
store.store_cold_state_summary(&state_root, slot, &mut cold_db_state_ops)?;
|
||||
} else {
|
||||
// This is some state that we want to migrate to the freezer db.
|
||||
// There is no reason to cache this state.
|
||||
@@ -3172,36 +3061,22 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
.get_hot_state(&state_root, false)?
|
||||
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;
|
||||
|
||||
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
|
||||
store.store_cold_state(&state_root, &state, &mut cold_db_state_ops)?;
|
||||
}
|
||||
|
||||
// Cold states are diffed with respect to each other, so we need to finish writing previous
|
||||
// states before storing new ones.
|
||||
store.cold_db.do_atomically(cold_db_ops)?;
|
||||
store.cold_db.do_atomically(cold_db_state_ops)?;
|
||||
}
|
||||
|
||||
// Prune sync committee branch data for all non checkpoint block roots.
|
||||
// Note that `non_checkpoint_block_roots` should only contain non checkpoint block roots
|
||||
// as long as `finalized_state.slot()` is at an epoch boundary. If this were not the case
|
||||
// we risk the chance of pruning a `sync_committee_branch` for a checkpoint block root.
|
||||
// E.g. if `current_split_slot` = (Epoch A slot 0) and `finalized_state.slot()` = (Epoch C slot 31)
|
||||
// and (Epoch D slot 0) is a skipped slot, we will have pruned a `sync_committee_branch`
|
||||
// for a checkpoint block root.
|
||||
non_checkpoint_block_roots
|
||||
.into_iter()
|
||||
.for_each(|block_root| {
|
||||
hot_db_ops.push(StoreOp::DeleteSyncCommitteeBranch(block_root));
|
||||
});
|
||||
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
// Warning: Critical section. We have to take care not to put any of the two databases in an
|
||||
// inconsistent state if the OS process dies at any point during the freezing
|
||||
// procedure.
|
||||
//
|
||||
// Since it is pretty much impossible to be atomic across more than one database, we trade
|
||||
// losing track of states to delete, for consistency. In other words: We should be safe to die
|
||||
// at any point below but it may happen that some states won't be deleted from the hot database
|
||||
// and will remain there forever. Since dying in these particular few lines should be an
|
||||
// exceedingly rare event, this should be an acceptable tradeoff.
|
||||
// potentially re-doing the migration to copy data to the freezer, for consistency. If we crash
|
||||
// after writing all new block & state data to the freezer but before updating the split, then
|
||||
// in the worst case we will restart with the old split and re-run the migration.
|
||||
store.cold_db.do_atomically(cold_db_block_ops)?;
|
||||
store.cold_db.sync()?;
|
||||
{
|
||||
@@ -3214,7 +3089,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
error!(
|
||||
previous_split_slot = %current_split_slot,
|
||||
current_split_slot = %latest_split_slot,
|
||||
"Race condition detected: Split point changed while moving states to the freezer"
|
||||
"Race condition detected: Split point changed while copying states to the freezer"
|
||||
);
|
||||
|
||||
// Assume the freezing procedure will be retried in case this happens.
|
||||
@@ -3239,9 +3114,6 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
||||
*split_guard = split;
|
||||
}
|
||||
|
||||
// Delete the blocks and states from the hot database if we got this far.
|
||||
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
|
||||
|
||||
// Update the cache's view of the finalized state.
|
||||
store.update_finalized_state(
|
||||
finalized_state_root,
|
||||
@@ -3358,23 +3230,6 @@ impl StoreItem for ColdStateSummary {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub struct TemporaryFlag;
|
||||
|
||||
impl StoreItem for TemporaryFlag {
|
||||
fn db_column() -> DBColumn {
|
||||
DBColumn::BeaconStateTemporary
|
||||
}
|
||||
|
||||
fn as_store_bytes(&self) -> Vec<u8> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn from_store_bytes(_: &[u8]) -> Result<Self, Error> {
|
||||
Ok(TemporaryFlag)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct BytesKey {
|
||||
pub key: Vec<u8>,
|
||||
|
||||
@@ -14,7 +14,6 @@ pub mod config;
|
||||
pub mod consensus_context;
|
||||
pub mod errors;
|
||||
mod forwards_iter;
|
||||
mod garbage_collection;
|
||||
pub mod hdiff;
|
||||
pub mod historic_state_cache;
|
||||
pub mod hot_cold_store;
|
||||
@@ -241,8 +240,6 @@ pub enum StoreOp<'a, E: EthSpec> {
|
||||
PutBlobs(Hash256, BlobSidecarList<E>),
|
||||
PutDataColumns(Hash256, DataColumnSidecarList<E>),
|
||||
PutStateSummary(Hash256, HotStateSummary),
|
||||
PutStateTemporaryFlag(Hash256),
|
||||
DeleteStateTemporaryFlag(Hash256),
|
||||
DeleteBlock(Hash256),
|
||||
DeleteBlobs(Hash256),
|
||||
DeleteDataColumns(Hash256, Vec<ColumnIndex>),
|
||||
@@ -287,8 +284,10 @@ pub enum DBColumn {
|
||||
/// Mapping from state root to `ColdStateSummary` in the cold DB.
|
||||
#[strum(serialize = "bcs")]
|
||||
BeaconColdStateSummary,
|
||||
/// For the list of temporary states stored during block import,
|
||||
/// and then made non-temporary by the deletion of their state root from this column.
|
||||
/// DEPRECATED.
|
||||
///
|
||||
/// Previously used for the list of temporary states stored during block import, and then made
|
||||
/// non-temporary by the deletion of their state root from this column.
|
||||
#[strum(serialize = "bst")]
|
||||
BeaconStateTemporary,
|
||||
/// Execution payloads for blocks more recent than the finalized checkpoint.
|
||||
|
||||
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
|
||||
use ssz_derive::{Decode, Encode};
|
||||
use types::{typenum::U4096, CGCUpdates, ChainSpec, Checkpoint, Hash256, Slot, VariableList};
|
||||
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(22);
|
||||
pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(23);
|
||||
|
||||
// All the keys that get stored under the `BeaconMeta` column.
|
||||
//
|
||||
|
||||
@@ -202,6 +202,13 @@ pub static BEACON_HDIFF_BUFFER_CLONE_TIMES: LazyLock<Result<Histogram>> = LazyLo
|
||||
"Time required to clone hierarchical diff buffer bytes",
|
||||
)
|
||||
});
|
||||
pub static BEACON_HDIFF_BUFFER_APPLY_RESIZES: LazyLock<Result<Histogram>> = LazyLock::new(|| {
|
||||
try_create_histogram_with_buckets(
|
||||
"store_hdiff_buffer_apply_resizes",
|
||||
"Number of times during diff application that the output buffer had to be resized before decoding succeeded",
|
||||
Ok(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0])
|
||||
)
|
||||
});
|
||||
/*
|
||||
* Beacon Block
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user