mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-08 01:05:47 +00:00
Restore crash safety for database pruning (#4975)
* Add some DB sanity checks * Restore crash safety for database pruning
This commit is contained in:
@@ -34,7 +34,7 @@ use std::time::Duration;
|
|||||||
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
|
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
|
||||||
use task_executor::{ShutdownReason, TaskExecutor};
|
use task_executor::{ShutdownReason, TaskExecutor};
|
||||||
use types::{
|
use types::{
|
||||||
BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, Signature,
|
BeaconBlock, BeaconState, ChainSpec, Epoch, EthSpec, Graffiti, Hash256, Signature,
|
||||||
SignedBeaconBlock, Slot,
|
SignedBeaconBlock, Slot,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -559,16 +559,6 @@ where
|
|||||||
.map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
|
.map_err(|e| format!("Failed to initialize blob info: {:?}", e))?,
|
||||||
);
|
);
|
||||||
|
|
||||||
// Store pruning checkpoint to prevent attempting to prune before the anchor state.
|
|
||||||
self.pending_io_batch.push(
|
|
||||||
store
|
|
||||||
.pruning_checkpoint_store_op(Checkpoint {
|
|
||||||
root: weak_subj_block_root,
|
|
||||||
epoch: weak_subj_state.slot().epoch(TEthSpec::slots_per_epoch()),
|
|
||||||
})
|
|
||||||
.map_err(|e| format!("{:?}", e))?,
|
|
||||||
);
|
|
||||||
|
|
||||||
let snapshot = BeaconSnapshot {
|
let snapshot = BeaconSnapshot {
|
||||||
beacon_block_root: weak_subj_block_root,
|
beacon_block_root: weak_subj_block_root,
|
||||||
beacon_block: Arc::new(weak_subj_block),
|
beacon_block: Arc::new(weak_subj_block),
|
||||||
|
|||||||
@@ -512,13 +512,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
genesis_block_root: Hash256,
|
genesis_block_root: Hash256,
|
||||||
log: &Logger,
|
log: &Logger,
|
||||||
) -> Result<PruningOutcome, BeaconChainError> {
|
) -> Result<PruningOutcome, BeaconChainError> {
|
||||||
let old_finalized_checkpoint =
|
let old_finalized_checkpoint = store.get_pruning_checkpoint();
|
||||||
store
|
|
||||||
.load_pruning_checkpoint()?
|
|
||||||
.unwrap_or_else(|| Checkpoint {
|
|
||||||
epoch: Epoch::new(0),
|
|
||||||
root: Hash256::zero(),
|
|
||||||
});
|
|
||||||
|
|
||||||
let old_finalized_slot = old_finalized_checkpoint
|
let old_finalized_slot = old_finalized_checkpoint
|
||||||
.epoch
|
.epoch
|
||||||
@@ -572,6 +566,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
})
|
})
|
||||||
.collect::<Result<_, _>>()?;
|
.collect::<Result<_, _>>()?;
|
||||||
|
|
||||||
|
// Quick sanity check. If the canonical block & state roots are incorrect then we could
|
||||||
|
// incorrectly delete canonical states, which would corrupt the database.
|
||||||
|
let expected_canonical_block_roots = new_finalized_slot
|
||||||
|
.saturating_sub(old_finalized_slot)
|
||||||
|
.as_usize()
|
||||||
|
.saturating_add(1);
|
||||||
|
if newly_finalized_chain.len() != expected_canonical_block_roots {
|
||||||
|
return Err(BeaconChainError::DBInconsistent(format!(
|
||||||
|
"canonical chain iterator is corrupt; \
|
||||||
|
expected {} but got {} block roots",
|
||||||
|
expected_canonical_block_roots,
|
||||||
|
newly_finalized_chain.len()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
// We don't know which blocks are shared among abandoned chains, so we buffer and delete
|
// We don't know which blocks are shared among abandoned chains, so we buffer and delete
|
||||||
// everything in one fell swoop.
|
// everything in one fell swoop.
|
||||||
let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
|
let mut abandoned_blocks: HashSet<SignedBeaconBlockHash> = HashSet::new();
|
||||||
@@ -735,11 +744,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY)?,
|
persisted_head.as_kv_store_op(BEACON_CHAIN_DB_KEY)?,
|
||||||
));
|
));
|
||||||
|
|
||||||
// Persist the new finalized checkpoint as the pruning checkpoint.
|
|
||||||
batch.push(StoreOp::KeyValueOp(
|
|
||||||
store.pruning_checkpoint_store_op(new_finalized_checkpoint)?,
|
|
||||||
));
|
|
||||||
|
|
||||||
store.do_atomically_with_block_and_blobs_cache(batch)?;
|
store.do_atomically_with_block_and_blobs_cache(batch)?;
|
||||||
debug!(
|
debug!(
|
||||||
log,
|
log,
|
||||||
@@ -753,19 +757,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
|
|||||||
let (state_root, summary) = res?;
|
let (state_root, summary) = res?;
|
||||||
|
|
||||||
if summary.slot <= new_finalized_slot {
|
if summary.slot <= new_finalized_slot {
|
||||||
// If state root doesn't match state root from canonical chain, or this slot
|
// If state root doesn't match state root from canonical chain, then delete.
|
||||||
// is not part of the recently finalized chain, then delete.
|
// We may also find older states here that should have been deleted by `migrate_db`
|
||||||
|
// but weren't due to wonky I/O atomicity.
|
||||||
if newly_finalized_chain
|
if newly_finalized_chain
|
||||||
.get(&summary.slot)
|
.get(&summary.slot)
|
||||||
.map_or(true, |(_, canonical_state_root)| {
|
.map_or(true, |(_, canonical_state_root)| {
|
||||||
state_root != Hash256::from(*canonical_state_root)
|
state_root != Hash256::from(*canonical_state_root)
|
||||||
})
|
})
|
||||||
{
|
{
|
||||||
|
let reason = if summary.slot < old_finalized_slot {
|
||||||
|
"old dangling state"
|
||||||
|
} else {
|
||||||
|
"non-canonical"
|
||||||
|
};
|
||||||
debug!(
|
debug!(
|
||||||
log,
|
log,
|
||||||
"Deleting state";
|
"Deleting state";
|
||||||
"state_root" => ?state_root,
|
"state_root" => ?state_root,
|
||||||
"slot" => summary.slot,
|
"slot" => summary.slot,
|
||||||
|
"reason" => reason,
|
||||||
);
|
);
|
||||||
state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot)));
|
state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot)));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,9 +10,9 @@ use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
|
|||||||
use crate::leveldb_store::{BytesKey, LevelDB};
|
use crate::leveldb_store::{BytesKey, LevelDB};
|
||||||
use crate::memory_store::MemoryStore;
|
use crate::memory_store::MemoryStore;
|
||||||
use crate::metadata::{
|
use crate::metadata::{
|
||||||
AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
|
AnchorInfo, BlobInfo, CompactionTimestamp, SchemaVersion, ANCHOR_INFO_KEY, BLOB_INFO_KEY,
|
||||||
BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
|
COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, SCHEMA_VERSION_KEY, SPLIT_KEY,
|
||||||
PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
|
STATE_UPPER_LIMIT_NO_RETAIN,
|
||||||
};
|
};
|
||||||
use crate::metrics;
|
use crate::metrics;
|
||||||
use crate::state_cache::{PutStateOutcome, StateCache};
|
use crate::state_cache::{PutStateOutcome, StateCache};
|
||||||
@@ -77,6 +77,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
|
|||||||
/// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
|
/// LRU cache of deserialized blocks and blobs. Updated whenever a block or blob is loaded.
|
||||||
block_cache: Mutex<BlockCache<E>>,
|
block_cache: Mutex<BlockCache<E>>,
|
||||||
/// Cache of beacon states.
|
/// Cache of beacon states.
|
||||||
|
///
|
||||||
|
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
|
||||||
state_cache: Mutex<StateCache<E>>,
|
state_cache: Mutex<StateCache<E>>,
|
||||||
/// Immutable validator cache.
|
/// Immutable validator cache.
|
||||||
pub immutable_validators: Arc<RwLock<ValidatorPubkeyCache<E, Hot, Cold>>>,
|
pub immutable_validators: Arc<RwLock<ValidatorPubkeyCache<E, Hot, Cold>>>,
|
||||||
@@ -2385,26 +2387,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
|
|||||||
self.config.compact_on_prune
|
self.config.compact_on_prune
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load the checkpoint to begin pruning from (the "old finalized checkpoint").
|
/// Get the checkpoint to begin pruning from (the "old finalized checkpoint").
|
||||||
pub fn load_pruning_checkpoint(&self) -> Result<Option<Checkpoint>, Error> {
|
pub fn get_pruning_checkpoint(&self) -> Checkpoint {
|
||||||
Ok(self
|
// Since tree-states we infer the pruning checkpoint from the split, as this is simpler &
|
||||||
.hot_db
|
// safer in the presence of crashes that occur after pruning but before the split is
|
||||||
.get(&PRUNING_CHECKPOINT_KEY)?
|
// updated.
|
||||||
.map(|pc: PruningCheckpoint| pc.checkpoint))
|
// FIXME(sproul): ensure delete PRUNING_CHECKPOINT_KEY is deleted in DB migration
|
||||||
}
|
let split = self.get_split_info();
|
||||||
|
Checkpoint {
|
||||||
/// Store the checkpoint to begin pruning from (the "old finalized checkpoint").
|
epoch: split.slot.epoch(E::slots_per_epoch()),
|
||||||
pub fn store_pruning_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Error> {
|
root: split.block_root,
|
||||||
self.hot_db
|
}
|
||||||
.do_atomically(vec![self.pruning_checkpoint_store_op(checkpoint)?])
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a staged store for the pruning checkpoint.
|
|
||||||
pub fn pruning_checkpoint_store_op(
|
|
||||||
&self,
|
|
||||||
checkpoint: Checkpoint,
|
|
||||||
) -> Result<KeyValueStoreOp, Error> {
|
|
||||||
PruningCheckpoint { checkpoint }.as_kv_store_op(PRUNING_CHECKPOINT_KEY)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch.
|
/// Load the timestamp of the last compaction as a `Duration` since the UNIX epoch.
|
||||||
@@ -2917,8 +2910,8 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
|||||||
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
|
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// There are data dependencies between calls to `store_cold_state()` that prevent us from
|
// Cold states are diffed with respect to each other, so we need to finish writing previous
|
||||||
// doing one big call to `store.cold_db.do_atomically()` at end of the loop.
|
// states before storing new ones.
|
||||||
store.cold_db.do_atomically(cold_db_ops)?;
|
store.cold_db.do_atomically(cold_db_ops)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2927,15 +2920,20 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
|||||||
// procedure.
|
// procedure.
|
||||||
//
|
//
|
||||||
// Since it is pretty much impossible to be atomic across more than one database, we trade
|
// Since it is pretty much impossible to be atomic across more than one database, we trade
|
||||||
// losing track of states to delete, for consistency. In other words: We should be safe to die
|
// temporarily losing track of blocks to delete, for consistency. In other words: We should be
|
||||||
// at any point below but it may happen that some states won't be deleted from the hot database
|
// safe to die at any point below but it may happen that some blocks won't be deleted from the
|
||||||
// and will remain there forever. Since dying in these particular few lines should be an
|
// hot database and will remain there forever. We may also temporarily abandon states, but
|
||||||
// exceedingly rare event, this should be an acceptable tradeoff.
|
// they will get picked up by the state pruning that iterates over the whole column.
|
||||||
|
|
||||||
// Flush to disk all the states that have just been migrated to the cold store.
|
// Flush to disk all the states that have just been migrated to the cold store.
|
||||||
store.cold_db.do_atomically(cold_db_block_ops)?;
|
store.cold_db.do_atomically(cold_db_block_ops)?;
|
||||||
store.cold_db.sync()?;
|
store.cold_db.sync()?;
|
||||||
|
|
||||||
|
// Update the split.
|
||||||
|
//
|
||||||
|
// NOTE(sproul): We do this in its own fsync'd transaction mostly for historical reasons, but
|
||||||
|
// I'm scared to change it, because doing an fsync with *more data* while holding the split
|
||||||
|
// write lock might have terrible performance implications (jamming the split for 100-500ms+).
|
||||||
{
|
{
|
||||||
let mut split_guard = store.split.write();
|
let mut split_guard = store.split.write();
|
||||||
let latest_split_slot = split_guard.slot;
|
let latest_split_slot = split_guard.slot;
|
||||||
@@ -2966,13 +2964,13 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
|
|||||||
};
|
};
|
||||||
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
|
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
|
||||||
|
|
||||||
// Split point is now persisted in the hot database on disk. The in-memory split point
|
// Split point is now persisted in the hot database on disk. The in-memory split point
|
||||||
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
|
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
|
||||||
// the in-memory split point now.
|
// the in-memory split point now.
|
||||||
*split_guard = split;
|
*split_guard = split;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete the states from the hot database if we got this far.
|
// Delete the blocks and states from the hot database if we got this far.
|
||||||
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
|
store.do_atomically_with_block_and_blobs_cache(hot_db_ops)?;
|
||||||
|
|
||||||
// Update the cache's view of the finalized state.
|
// Update the cache's view of the finalized state.
|
||||||
|
|||||||
@@ -294,32 +294,17 @@ fn parse_inspect_config(cli_args: &ArgMatches) -> Result<InspectConfig, String>
|
|||||||
pub fn inspect_db<E: EthSpec>(
|
pub fn inspect_db<E: EthSpec>(
|
||||||
inspect_config: InspectConfig,
|
inspect_config: InspectConfig,
|
||||||
client_config: ClientConfig,
|
client_config: ClientConfig,
|
||||||
runtime_context: &RuntimeContext<E>,
|
|
||||||
log: Logger,
|
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
let spec = runtime_context.eth2_config.spec.clone();
|
|
||||||
let hot_path = client_config.get_db_path();
|
let hot_path = client_config.get_db_path();
|
||||||
let cold_path = client_config.get_freezer_db_path();
|
let cold_path = client_config.get_freezer_db_path();
|
||||||
let blobs_path = client_config.get_blobs_db_path();
|
|
||||||
|
|
||||||
let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
|
|
||||||
&hot_path,
|
|
||||||
&cold_path,
|
|
||||||
&blobs_path,
|
|
||||||
|_, _, _| Ok(()),
|
|
||||||
client_config.store,
|
|
||||||
spec,
|
|
||||||
log,
|
|
||||||
)
|
|
||||||
.map_err(|e| format!("{:?}", e))?;
|
|
||||||
|
|
||||||
let mut total = 0;
|
let mut total = 0;
|
||||||
let mut num_keys = 0;
|
let mut num_keys = 0;
|
||||||
|
|
||||||
let sub_db = if inspect_config.freezer {
|
let sub_db = if inspect_config.freezer {
|
||||||
&db.cold_db
|
LevelDB::<E>::open(&cold_path).map_err(|e| format!("Unable to open freezer DB: {e:?}"))?
|
||||||
} else {
|
} else {
|
||||||
&db.hot_db
|
LevelDB::<E>::open(&hot_path).map_err(|e| format!("Unable to open hot DB: {e:?}"))?
|
||||||
};
|
};
|
||||||
|
|
||||||
let skip = inspect_config.skip.unwrap_or(0);
|
let skip = inspect_config.skip.unwrap_or(0);
|
||||||
@@ -653,7 +638,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, env: Environment<T>) -> Result
|
|||||||
}
|
}
|
||||||
("inspect", Some(cli_args)) => {
|
("inspect", Some(cli_args)) => {
|
||||||
let inspect_config = parse_inspect_config(cli_args)?;
|
let inspect_config = parse_inspect_config(cli_args)?;
|
||||||
inspect_db(inspect_config, client_config, &context, log)
|
inspect_db::<T>(inspect_config, client_config)
|
||||||
}
|
}
|
||||||
("prune-payloads", Some(_)) => {
|
("prune-payloads", Some(_)) => {
|
||||||
prune_payloads(client_config, &context, log).map_err(format_err)
|
prune_payloads(client_config, &context, log).map_err(format_err)
|
||||||
|
|||||||
Reference in New Issue
Block a user