Mirror of https://github.com/sigp/lighthouse.git, synced 2026-03-09 11:41:51 +00:00
Squash merge of #3565
Squashed commit of the following:

commit a4960ebfd7
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Sep 12 12:10:23 2022 +1000

    Clippy

commit b28e8d0848
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Sep 12 11:41:45 2022 +1000

    Add flag to disable prune on startup

commit de775d6aa5
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Sep 12 11:19:21 2022 +1000

    Fix and update beacon chain tests

commit 2289b20bca
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Fri Sep 9 17:40:21 2022 +1000

    Implement DB manager command

commit d5adc2ebc5
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Fri Sep 9 12:56:27 2022 +1000

    Implement on-demand pruning operation

commit 69d54741c1
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Thu Sep 8 16:25:04 2022 +1000

    Delete finalized exec payloads while running

@@ -266,6 +266,13 @@ where
         self.genesis_time = Some(genesis_state.genesis_time());
 
+        // Prune finalized execution payloads.
+        if store.get_config().prune_payloads_on_init {
+            store
+                .try_prune_execution_payloads(false)
+                .map_err(|e| format!("Error pruning execution payloads: {e:?}"))?;
+        }
+
         self.op_pool = Some(
             store
                 .get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)

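Note: the `false` argument here is the `force` parameter of `try_prune_execution_payloads` (added to `HotColdDB` further down in this diff), so the startup prune takes the cheap early-exit path when payloads are already pruned; any store error aborts the beacon chain build with the formatted message.
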
@@ -41,28 +41,27 @@ async fn get_chain_segment() -> Vec<BeaconSnapshot<E>> {
         )
         .await;
 
-    harness
+    let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH);
+    for snapshot in harness
         .chain
         .chain_dump()
         .expect("should dump chain")
         .into_iter()
-        .map(|snapshot| {
-            let full_block = harness
-                .chain
-                .store
-                .make_full_block(
-                    &snapshot.beacon_block_root,
-                    snapshot.beacon_block.as_ref().clone(),
-                )
-                .unwrap();
-            BeaconSnapshot {
-                beacon_block_root: snapshot.beacon_block_root,
-                beacon_block: Arc::new(full_block),
-                beacon_state: snapshot.beacon_state,
-            }
-        })
         .skip(1)
-        .collect()
+    {
+        let full_block = harness
+            .chain
+            .get_block(&snapshot.beacon_block_root)
+            .await
+            .unwrap()
+            .unwrap();
+        segment.push(BeaconSnapshot {
+            beacon_block_root: snapshot.beacon_block_root,
+            beacon_block: Arc::new(full_block),
+            beacon_state: snapshot.beacon_state,
+        });
+    }
+    segment
 }
 
 fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<E>> {

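The shape change from `map`/`collect` to an explicit loop follows from the switch to `harness.chain.get_block`, which is async: `.await` cannot appear inside the `map` closure, so snapshots are pushed into a pre-allocated `segment` vector instead. Going through `get_block` rather than `store.make_full_block` also exercises the store path that handles blocks whose payloads may have been pruned (see the `try_get_full_block` comment in the `migrate_database` hunk below).
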
@@ -2114,14 +2114,16 @@ async fn weak_subjectivity_sync() {
     assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1);
 
     for snapshot in new_blocks {
-        let block = &snapshot.beacon_block;
         let full_block = harness
             .chain
-            .store
-            .make_full_block(&snapshot.beacon_block_root, block.as_ref().clone())
+            .get_block(&snapshot.beacon_block_root)
+            .await
+            .unwrap()
             .unwrap();
+        let slot = full_block.slot();
+        let state_root = full_block.state_root();
 
-        beacon_chain.slot_clock.set_slot(block.slot().as_u64());
+        beacon_chain.slot_clock.set_slot(slot.as_u64());
         beacon_chain
             .process_block(Arc::new(full_block), CountUnrealized::True)
             .await

@@ -2129,10 +2131,9 @@ async fn weak_subjectivity_sync() {
         beacon_chain.recompute_head_at_current_slot().await;
 
         // Check that the new block's state can be loaded correctly.
-        let state_root = block.state_root();
         let mut state = beacon_chain
             .store
-            .get_state(&state_root, Some(block.slot()))
+            .get_state(&state_root, Some(slot))
             .unwrap()
             .unwrap();
         assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);

@@ -2583,6 +2584,7 @@ fn check_split_slot(harness: &TestHarness, store: Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>) {
 /// Check that all the states in a chain dump have the correct tree hash.
 fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
     let chain_dump = harness.chain.chain_dump().unwrap();
+    let split_slot = harness.chain.store.get_split_slot();
 
     assert_eq!(chain_dump.len() as u64, expected_len);

@@ -2606,6 +2608,21 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
                 .slot(),
             checkpoint.beacon_state.slot()
         );
+
+        // Check presence of execution payload on disk.
+        if harness.chain.spec.bellatrix_fork_epoch.is_some() {
+            assert_eq!(
+                harness
+                    .chain
+                    .store
+                    .execution_payload_exists(&checkpoint.beacon_block_root)
+                    .unwrap(),
+                checkpoint.beacon_block.slot() >= split_slot,
+                "incorrect payload storage for block at slot {}: {:?}",
+                checkpoint.beacon_block.slot(),
+                checkpoint.beacon_block_root,
+            );
+        }
     }
 
     // Check the forwards block roots iterator against the chain dump

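This assertion pins down the pruning invariant being tested: once Bellatrix is scheduled, a block's execution payload should be present on disk exactly when the block is at or above the split slot, i.e. finalized blocks below the split have had their payloads deleted.
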
@@ -515,6 +515,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .takes_value(true)
                 .default_value("true")
         )
+        .arg(
+            Arg::with_name("prune-payloads-on-startup")
+                .long("prune-payloads-on-startup")
+                .help("Check for execution payloads to prune on start-up.")
+                .takes_value(true)
+                .default_value("true")
+        )
 
         /*
         * Misc.

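Because the flag is declared with `takes_value(true)` and a default of "true", disabling the startup prune requires an explicit boolean value, e.g. `--prune-payloads-on-startup false`; the CLI tests at the end of this diff exercise exactly that form.
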
@@ -358,6 +358,12 @@ pub fn get_config<E: EthSpec>(
             .map_err(|_| "auto-compact-db takes a boolean".to_string())?;
     }
 
+    if let Some(prune_payloads_on_init) =
+        clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")?
+    {
+        client_config.store.prune_payloads_on_init = prune_payloads_on_init;
+    }
+
     /*
     * Zero-ports
     *

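Since the argument carries a `default_value`, `clap_utils::parse_optional` should always yield a value here, so `prune_payloads_on_init` is set on every startup rather than only when the user passes the flag explicitly.
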
@@ -23,6 +23,8 @@ pub struct StoreConfig {
     pub compact_on_prune: bool,
     /// Whether to store finalized blocks in the freezer database.
     pub separate_blocks: bool,
+    /// Whether to try pruning execution payloads on initialization.
+    pub prune_payloads_on_init: bool,
 }
 
 /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.

@@ -48,6 +50,7 @@ impl Default for StoreConfig {
             compact_on_init: false,
             compact_on_prune: true,
             separate_blocks: true,
+            prune_payloads_on_init: true,
         }
     }
 }

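For code that constructs a `StoreConfig` directly, opting out of the startup prune is a one-field override on the defaults above. A minimal sketch (the import path and function name are assumptions; the struct and its `Default` impl are the ones in this hunk):

    use store::config::StoreConfig; // assumed path to the module shown above

    fn no_startup_prune_config() -> StoreConfig {
        StoreConfig {
            // Opt out of the startup payload prune; every other field keeps
            // the default shown in the hunk above.
            prune_payloads_on_init: false,
            ..StoreConfig::default()
        }
    }
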
@@ -10,7 +10,7 @@ use crate::impls::{
     beacon_state::{get_full_state, store_full_state},
     frozen_block_slot::FrozenBlockSlot,
 };
-use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
+use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
 use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;

@@ -532,6 +532,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             .ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
     }
 
+    /// Check if the execution payload for a block exists on disk.
+    pub fn execution_payload_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
+        self.get_item::<ExecutionPayload<E>>(block_root)
+            .map(|payload| payload.is_some())
+    }
+
     /// Determine whether a block exists in the database.
     pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
         self.hot_db

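As written, the existence check reuses `get_item`, which reads and decodes the payload before discarding it; that is sufficient for the pruning backtrack, which only needs a yes/no answer per block root.
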
@@ -1512,6 +1518,93 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             &CompactionTimestamp(compaction_timestamp.as_secs()),
         )
     }
+
+    /// Try to prune all execution payloads, returning early if there is no need to prune.
+    pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> {
+        let split = self.get_split_info();
+
+        if split.slot == 0 {
+            return Ok(());
+        }
+
+        let bellatrix_fork_slot = if let Some(epoch) = self.spec.bellatrix_fork_epoch {
+            epoch.start_slot(E::slots_per_epoch())
+        } else {
+            return Ok(());
+        };
+
+        // Load the split state so we can backtrack to find execution payloads.
+        let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
+            HotColdDBError::MissingSplitState(split.state_root, split.slot),
+        )?;
+
+        // The finalized block may or may not have its execution payload stored, depending on
+        // whether it was at a skipped slot. However for a fully pruned database its parent
+        // should *always* have been pruned.
+        let split_parent_block_root = split_state.get_block_root(split.slot - 1)?;
+        if !self.execution_payload_exists(split_parent_block_root)? && !force {
+            info!(self.log, "Execution payloads are pruned");
+            return Ok(());
+        }
+
+        // Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes
+        // first.
+        let split_block_root = split_state.get_latest_block_root(split.state_root);
+        let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot);
+
+        let mut ops = vec![];
+
+        for res in std::iter::once(Ok((split_block_root, split.slot)))
+            .chain(BlockRootsIterator::new(self, &split_state))
+        {
+            let (block_root, slot) = match res {
+                Ok(tuple) => tuple,
+                Err(e) => {
+                    warn!(
+                        self.log,
+                        "Stopping backtrack early";
+                        "error" => ?e,
+                    );
+                    break;
+                }
+            };
+
+            if slot < bellatrix_fork_slot {
+                info!(
+                    self.log,
+                    "Finished backtrack to Bellatrix fork";
+                );
+                break;
+            }
+
+            if self.execution_payload_exists(&block_root)? {
+                debug!(
+                    self.log,
+                    "Pruning execution payload";
+                    "slot" => slot,
+                    "block_root" => ?block_root,
+                );
+                ops.push(StoreOp::DeleteExecutionPayload(block_root));
+            }
+
+            if Some(slot) == anchor_slot {
+                info!(
+                    self.log,
+                    "Finished backtrack to anchor state";
+                    "slot" => slot
+                );
+                break;
+            }
+        }
+        let payloads_pruned = ops.len();
+        self.do_atomically(ops)?;
+        info!(
+            self.log,
+            "Execution payload pruning complete";
+            "payloads_pruned" => payloads_pruned,
+        );
+        Ok(())
+    }
 }
 
 /// Advance the split point of the store, moving new finalized states to the freezer.

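For orientation, the function's two callers differ only in the `force` flag, which bypasses the early-exit check on the split block's parent; the backtrack itself (terminating at the Bellatrix fork slot, the anchor slot, or an iterator error) is identical. A sketch of the calling convention, with `db` standing in for an open `HotColdDB` handle:

    // Startup path (BeaconChainBuilder hunk above): a cheap no-op when the
    // split parent's payload is already gone.
    db.try_prune_execution_payloads(false)?;

    // Manual path (`lighthouse db prune_payloads`, below): always walks the
    // chain back, catching payloads an interrupted prune may have left behind.
    db.try_prune_execution_payloads(true)?;
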
@@ -1552,16 +1645,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     let mut cold_db_block_ops: Vec<KeyValueStoreOps> = vec![];
 
     // 1. Copy all of the states between the head and the split slot, from the hot DB
-    // to the cold DB.
-    let state_root_iter = StateRootsIterator::new(&store, frozen_head);
-    for maybe_pair in state_root_iter.take_while(|result| match result {
-        Ok((_, slot)) => {
+    // to the cold DB. Delete the execution payloads of these now-finalized blocks.
+    let state_root_iter = RootsIterator::new(&store, frozen_head);
+    for maybe_tuple in state_root_iter.take_while(|result| match result {
+        Ok((_, _, slot)) => {
             slot >= &current_split_slot
                 && anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot)
         }
         Err(_) => true,
     }) {
-        let (state_root, slot) = maybe_pair?;
+        let (block_root, state_root, slot) = maybe_tuple?;
 
         let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
 

@@ -1584,6 +1677,11 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
 
         // Delete the old summary, and the full state if we lie on an epoch boundary.
         hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
+
+        // Delete the execution payload. Even if this execution payload is the payload of the
+        // new finalized block it is OK to delete it, as `try_get_full_block` looks at the split
+        // slot when determining whether to reconstruct payloads.
+        hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
     }
 
     // Warning: Critical section. We have to take care not to put any of the two databases in an

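Looking back over these two hunks: the move from `StateRootsIterator` to `RootsIterator` widens each item from a `(state_root, slot)` pair to a `(block_root, state_root, slot)` triple, and the extra `block_root` is precisely what allows queueing `StoreOp::DeleteExecutionPayload(block_root)` alongside the existing state deletion for every newly finalized block.
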
@@ -59,6 +59,12 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
         )
 }
 
+pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
+    App::new("prune_payloads")
+        .setting(clap::AppSettings::ColoredHelp)
+        .about("Prune finalized execution payloads")
+}
+
 pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
     App::new(CMD)
         .visible_aliases(&["db"])

@@ -85,6 +91,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
         .subcommand(migrate_cli_app())
         .subcommand(version_cli_app())
         .subcommand(inspect_cli_app())
+        .subcommand(prune_payloads_app())
 }
 
 fn parse_client_config<E: EthSpec>(

@@ -257,6 +264,30 @@ pub fn migrate_db<E: EthSpec>(
     )
 }
 
+pub fn prune_payloads<E: EthSpec>(
+    client_config: ClientConfig,
+    runtime_context: &RuntimeContext<E>,
+    log: Logger,
+) -> Result<(), Error> {
+    let spec = &runtime_context.eth2_config.spec;
+    let hot_path = client_config.get_db_path();
+    let cold_path = client_config.get_freezer_db_path();
+
+    let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
+        &hot_path,
+        &cold_path,
+        |_, _, _| Ok(()),
+        client_config.store,
+        spec.clone(),
+        log,
+    )?;
+
+    // If we're triggering a prune manually then ignore the check on the split's parent that bails
+    // out early.
+    let force = true;
+    db.try_prune_execution_payloads(force)
+}
+
 /// Run the database manager, returning an error string if the operation did not succeed.
 pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Result<(), String> {
     let client_config = parse_client_config(cli_args, &env)?;

@@ -273,6 +304,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Result<(), String> {
             let inspect_config = parse_inspect_config(cli_args)?;
             inspect_db(inspect_config, client_config, &context, log)
         }
+        ("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log),
         _ => {
             return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into())
         }

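With this match arm in place, the prune can be run on demand via `lighthouse database_manager prune_payloads` (or the `db` alias registered above). It passes `force = true`, so it skips the already-pruned fast path and always performs the full backtrack.
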
@@ -1227,6 +1227,19 @@ fn compact_db_flag() {
         .with_config(|config| assert!(config.store.compact_on_init));
 }
+#[test]
+fn prune_payloads_on_startup_default() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| assert!(config.store.prune_payloads_on_init));
+}
+#[test]
+fn prune_payloads_on_startup_false() {
+    CommandLineTest::new()
+        .flag("prune-payloads-on-startup", Some("false"))
+        .run_with_zero_port()
+        .with_config(|config| assert!(!config.store.prune_payloads_on_init));
+}
 #[test]
 fn reconstruct_historic_states_flag() {
     CommandLineTest::new()
         .flag("reconstruct-historic-states", None)