Hierarchical state diffs in hot DB (#6750)

This PR implements https://github.com/sigp/lighthouse/pull/5978 (tree-states) but on the hot DB. It allows Lighthouse to massively reduce its disk footprint during periods of non-finality, and reduces overall I/O in all cases.

Closes https://github.com/sigp/lighthouse/issues/6580

Conga into https://github.com/sigp/lighthouse/pull/6744

### TODOs

- [x] Fix OOM in CI https://github.com/sigp/lighthouse/pull/7176
- [x] optimise store_hot_state to avoid storing a duplicate state if the summary already exists (should be safe from races now that pruning is cleaner)
- [x] misspelled: get_ancenstor_state_root
- [x] get_ancestor_state_root should use state summaries
- [x] Prevent split from changing during ancestor calc
- [x] Use same hierarchy for hot and cold

### TODO Good optimization for future PRs

- [ ] On the migration, if the latest hot snapshot is aligned with the cold snapshot migrate the diffs instead of the full states.
```
align slot  time
10485760    Nov-26-2024
12582912    Sep-14-2025
14680064    Jul-02-2026
```

### TODO Maybe things good to have

- [ ] Rename anchor_slot https://github.com/sigp/lighthouse/compare/tree-states-hot-rebase-oom...dapplion:lighthouse:tree-states-hot-anchor-slot-rename?expand=1
- [ ] Make anchor fields not public so that they must be mutated through a method, to prevent unwanted changes to the anchor_slot

### NOTTODO

- [ ] Use fork-choice and a new method [`descendants_of_checkpoint`](ca2388e196 (diff-046fbdb517ca16b80e4464c2c824cf001a74a0a94ac0065e635768ac391062a8)) to filter only the state summaries that descend from the finalized checkpoint
This commit is contained in:
Lion - dapplion
2025-06-19 04:43:25 +02:00
committed by GitHub
parent 6786b9d12a
commit dd98534158
33 changed files with 2695 additions and 812 deletions

View File

@@ -24,15 +24,18 @@ use state_processing::{state_advance::complete_state_advance, BlockReplayer};
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use store::database::interface::BeaconNodeBackend;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN};
use store::{
hdiff::HierarchyConfig,
iter::{BlockRootsIterator, StateRootsIterator},
BlobInfo, DBColumn, HotColdDB, StoreConfig,
};
use tempfile::{tempdir, TempDir};
use tracing::info;
use types::test_utils::{SeedableRng, XorShiftRng};
use types::*;
@@ -121,15 +124,16 @@ fn get_harness_generic(
harness
}
fn count_states_descendant_of_block(
fn get_states_descendant_of_block(
store: &HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>,
block_root: Hash256,
) -> usize {
) -> Vec<(Hash256, Slot)> {
let summaries = store.load_hot_state_summaries().unwrap();
summaries
.iter()
.filter(|(_, s)| s.latest_block_root == block_root)
.count()
.map(|(state_root, summary)| (*state_root, summary.slot))
.collect()
}
#[tokio::test]
@@ -516,15 +520,18 @@ async fn epoch_boundary_state_attestation_processing() {
.get_blinded_block(&block_root)
.unwrap()
.expect("block exists");
// Use get_state as the state may be finalized by this point
let mut epoch_boundary_state = store
.load_epoch_boundary_state(&block.state_root())
.get_state(&block.state_root(), None, CACHE_STATE_IN_TESTS)
.expect("no error")
.expect("epoch boundary state exists");
.unwrap_or_else(|| {
panic!("epoch boundary state should exist {:?}", block.state_root())
});
let ebs_state_root = epoch_boundary_state.update_tree_hash_cache().unwrap();
let mut ebs_of_ebs = store
.load_epoch_boundary_state(&ebs_state_root)
.get_state(&ebs_state_root, None, CACHE_STATE_IN_TESTS)
.expect("no error")
.expect("ebs of ebs exists");
.unwrap_or_else(|| panic!("ebs of ebs should exist {ebs_state_root:?}"));
ebs_of_ebs.apply_pending_mutations().unwrap();
assert_eq!(epoch_boundary_state, ebs_of_ebs);
@@ -2171,7 +2178,8 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
let slots_per_epoch = E::slots_per_epoch();
let genesis_state = harness.get_current_state();
let mut genesis_state = harness.get_current_state();
let genesis_state_root = genesis_state.update_tree_hash_cache().unwrap();
let block_slot = Slot::new(2 * slots_per_epoch);
let ((signed_block, _), state) = harness.make_block(genesis_state, block_slot).await;
@@ -2198,7 +2206,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
// The bad block parent root is the genesis block root. There's `block_slot - 1` temporary
// states to remove + the genesis state = block_slot.
assert_eq!(
count_states_descendant_of_block(&store, bad_block_parent_root),
get_states_descendant_of_block(&store, bad_block_parent_root).len(),
block_slot.as_usize(),
);
@@ -2216,11 +2224,12 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
// Check that the finalization migration ran.
assert_ne!(store.get_split_slot(), 0);
// Check that temporary states have been pruned. The genesis block is not a descendant of the
// latest finalized checkpoint, so all its states have been pruned from the hot DB, = 0.
// Check that temporary states have been pruned.
assert_eq!(
count_states_descendant_of_block(&store, bad_block_parent_root),
0
get_states_descendant_of_block(&store, bad_block_parent_root),
// The genesis state is kept to support the HDiff grid
vec![(genesis_state_root, Slot::new(0))],
"get_states_descendant_of_block({bad_block_parent_root:?})"
);
}
@@ -2322,6 +2331,8 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
.get_state(&wss_state_root, Some(checkpoint_slot), CACHE_STATE_IN_TESTS)
.unwrap()
.unwrap();
let wss_state_slot = wss_state.slot();
let wss_block_slot = wss_block.slot();
// Add more blocks that advance finalization further.
harness.advance_slot();
@@ -2414,12 +2425,14 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
.unwrap();
let slot = full_block.slot();
let full_block_root = full_block.canonical_root();
let state_root = full_block.state_root();
info!(block_root = ?full_block_root, ?state_root, %slot, "Importing block from chain dump");
beacon_chain.slot_clock.set_slot(slot.as_u64());
beacon_chain
.process_block(
full_block.canonical_root(),
full_block_root,
harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
@@ -2506,8 +2519,19 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
HistoricalBlockError::InvalidSignature
));
let available_blocks_slots = available_blocks
.iter()
.map(|block| (block.block().slot(), block.block().canonical_root()))
.collect::<Vec<_>>();
info!(
?available_blocks_slots,
"wss_block_slot" = wss_block.slot().as_usize(),
"Importing historical block batch"
);
// Importing the batch with valid signatures should succeed.
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), wss_block.slot());
beacon_chain
.import_historical_block_batch(available_blocks_dup)
.unwrap();
@@ -2518,6 +2542,17 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
.import_historical_block_batch(available_blocks)
.unwrap();
// Sanity check for non-aligned WSS starts, to make sure the WSS block is persisted properly
if wss_block_slot != wss_state_slot {
let new_node_block_root_at_wss_block = beacon_chain
.store
.get_cold_block_root(wss_block_slot)
.unwrap()
.unwrap();
info!(?new_node_block_root_at_wss_block, %wss_block_slot);
assert_eq!(new_node_block_root_at_wss_block, wss_block.canonical_root());
}
// The forwards iterator should now match the original chain
let forwards = beacon_chain
.forwards_iter_block_roots(Slot::new(0))
@@ -2571,11 +2606,25 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
}
// Anchor slot is still set to the slot of the checkpoint block.
assert_eq!(store.get_anchor_info().anchor_slot, wss_block.slot());
// Note: since hot tree states the anchor slot is set to the aligned ws state slot
// https://github.com/sigp/lighthouse/pull/6750
let wss_aligned_slot = if checkpoint_slot % E::slots_per_epoch() == 0 {
checkpoint_slot
} else {
(checkpoint_slot.epoch(E::slots_per_epoch()) + Epoch::new(1))
.start_slot(E::slots_per_epoch())
};
assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
assert_eq!(
store.get_anchor_info().state_upper_limit,
Slot::new(u64::MAX)
);
info!(anchor = ?store.get_anchor_info(), "anchor pre");
// Reconstruct states.
store.clone().reconstruct_historic_states(None).unwrap();
assert_eq!(store.get_anchor_info().anchor_slot, 0);
assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0));
}
/// Test that blocks and attestations that refer to states around an unaligned split state are
@@ -2999,12 +3048,27 @@ async fn revert_minority_fork_on_resume() {
// version is correct. This is the easiest schema test to write without historic versions of
// Lighthouse on-hand, but has the disadvantage that the min version needs to be adjusted manually
// as old downgrades are deprecated.
#[tokio::test]
async fn schema_downgrade_to_min_version() {
async fn schema_downgrade_to_min_version(
store_config: StoreConfig,
reconstruct_historic_states: bool,
) {
let num_blocks_produced = E::slots_per_epoch() * 4;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let spec = test_spec::<E>();
let chain_config = ChainConfig {
reconstruct_historic_states,
..ChainConfig::default()
};
let import_all_data_columns = false;
let store = get_store_generic(&db_path, store_config.clone(), spec.clone());
let harness = get_harness_generic(
store.clone(),
LOW_VALIDATOR_COUNT,
chain_config.clone(),
import_all_data_columns,
);
harness
.extend_chain(
@@ -3024,7 +3088,7 @@ async fn schema_downgrade_to_min_version() {
drop(harness);
// Re-open the store.
let store = get_store(&db_path);
let store = get_store_generic(&db_path, store_config, spec);
// Downgrade.
migrate_schema::<DiskHarnessType<E>>(store.clone(), CURRENT_SCHEMA_VERSION, min_version)
@@ -3037,16 +3101,28 @@ async fn schema_downgrade_to_min_version() {
// Recreate the harness.
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.chain_config(chain_config)
.keypairs(KEYPAIRS[0..LOW_VALIDATOR_COUNT].to_vec())
.testing_slot_clock(slot_clock)
.resumed_disk_store(store.clone())
.mock_execution_layer()
.build();
// Check chain dump for appropriate range depending on whether this is an archive node.
let chain_dump_start_slot = if reconstruct_historic_states {
Slot::new(0)
} else {
store.get_split_slot()
};
check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store.clone());
check_chain_dump(&harness, num_blocks_produced + 1);
check_iterators(&harness);
check_chain_dump_from_slot(
&harness,
chain_dump_start_slot,
num_blocks_produced + 1 - chain_dump_start_slot.as_u64(),
);
check_iterators_from_slot(&harness, chain_dump_start_slot);
// Check that downgrading beyond the minimum version fails (bound is *tight*).
let min_version_sub_1 = SchemaVersion(min_version.as_u64().checked_sub(1).unwrap());
@@ -3054,6 +3130,66 @@ async fn schema_downgrade_to_min_version() {
.expect_err("should not downgrade below minimum version");
}
// Schema upgrade/downgrade on an archive node where the optimised migration does apply due
// to the split state being aligned to a diff layer.
#[tokio::test]
async fn schema_downgrade_to_min_version_archive_node_grid_aligned() {
// Need to use 3 as the hierarchy exponent to get diffs on every epoch boundary with minimal
// spec.
schema_downgrade_to_min_version(
StoreConfig {
hierarchy_config: HierarchyConfig::from_str("3,4,5").unwrap(),
// Disable execution payload pruning for this test.
prune_payloads: false,
..StoreConfig::default()
},
// `reconstruct_historic_states = true`: run as an archive node.
true,
)
.await
}
// Schema upgrade/downgrade on an archive node where the optimised migration DOES NOT apply
// due to the split state NOT being aligned to a diff layer.
#[tokio::test]
async fn schema_downgrade_to_min_version_archive_node_grid_unaligned() {
schema_downgrade_to_min_version(
StoreConfig {
// A single coarse layer ("7") so the split is unlikely to land on a diff boundary,
// exercising the non-optimised migration path — confirm against HierarchyConfig docs.
hierarchy_config: HierarchyConfig::from_str("7").unwrap(),
// Disable execution payload pruning for this test.
prune_payloads: false,
..StoreConfig::default()
},
// `reconstruct_historic_states = true`: run as an archive node.
true,
)
.await
}
// Schema upgrade/downgrade on a full node with a fairly normal per-epoch diff config.
#[tokio::test]
async fn schema_downgrade_to_min_version_full_node_per_epoch_diffs() {
schema_downgrade_to_min_version(
StoreConfig {
// Same "3,4,5" hierarchy as the aligned archive-node test above, but run as a
// full node to cover the non-archive code path.
hierarchy_config: HierarchyConfig::from_str("3,4,5").unwrap(),
// Disable execution payload pruning for this test.
prune_payloads: false,
..StoreConfig::default()
},
// `reconstruct_historic_states = false`: run as a full (non-archive) node.
false,
)
.await
}
// Schema upgrade/downgrade on a full node with dense per-slot diffs.
// NOTE(review): the comment above says "full node", but `reconstruct_historic_states` is
// passed as `true` below, which the sibling tests use for archive nodes — confirm whether
// the comment or the flag is the intended one.
#[tokio::test]
async fn schema_downgrade_to_min_version_full_node_dense_diffs() {
schema_downgrade_to_min_version(
StoreConfig {
// Leading "0" layer presumably yields a diff point at every slot, matching the
// "dense per-slot diffs" in the test name — confirm against HierarchyConfig docs.
hierarchy_config: HierarchyConfig::from_str("0,3,4,5").unwrap(),
// Disable execution payload pruning for this test.
prune_payloads: false,
..StoreConfig::default()
},
true,
)
.await
}
/// Check that blob pruning prunes blobs older than the data availability boundary.
#[tokio::test]
async fn deneb_prune_blobs_happy_case() {
@@ -3439,6 +3575,163 @@ async fn prune_historic_states() {
check_split_slot(&harness, store);
}
// Test the function `get_ancestor_state_root` for slots prior to the split where we only have
// sparse summaries stored.
#[tokio::test]
async fn ancestor_state_root_prior_to_split() {
let db_path = tempdir().unwrap();
let spec = test_spec::<E>();
let store_config = StoreConfig {
// Disable execution payload pruning for this test.
prune_payloads: false,
hierarchy_config: HierarchyConfig::from_str("5,7,8").unwrap(),
..StoreConfig::default()
};
let chain_config = ChainConfig {
// Full node: historic states are not reconstructed, so pre-split slots only have
// sparse summaries — the scenario this test targets.
reconstruct_historic_states: false,
..ChainConfig::default()
};
let import_all_data_columns = false;
let store = get_store_generic(&db_path, store_config, spec);
let harness = get_harness_generic(
store.clone(),
LOW_VALIDATOR_COUNT,
chain_config,
import_all_data_columns,
);
// Produce blocks until we have passed through two full snapshot periods. This period length is
// determined by the hierarchy config set above.
let num_blocks = 2 * store
.hierarchy
.next_snapshot_slot(Slot::new(1))
.unwrap()
.as_u64();
// Extend the chain one block at a time so the ancestor lookups below can be re-checked
// after every block, as the split/anchor advance.
for num_blocks_so_far in 0..num_blocks {
harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
harness.advance_slot();
// Check that `get_ancestor_state_root` can look up the grid-aligned ancestors of every hot
// state, even at ancestor slots prior to the split.
let head_state = harness.get_current_state();
assert_eq!(head_state.slot().as_u64(), num_blocks_so_far + 1);
let split_slot = store.get_split_slot();
let anchor_slot = store.get_anchor_info().anchor_slot;
// For every state slot from the split up to the current head, query each grid-aligned
// layer point as an ancestor.
for state_slot in (split_slot.as_u64()..=num_blocks_so_far).map(Slot::new) {
for ancestor_slot in store
.hierarchy
.closest_layer_points(state_slot, anchor_slot)
{
// The function currently doesn't consider a state an ancestor of itself, so this
// does not work.
if ancestor_slot == state_slot {
continue;
}
let ancestor_state_root = store::hot_cold_store::get_ancestor_state_root(
&store,
&head_state,
ancestor_slot,
)
.unwrap_or_else(|e| {
panic!(
"get_ancestor_state_root failed for state_slot={state_slot}, \
ancestor_slot={ancestor_slot}, head_slot={}. error: {e:?}",
head_state.slot()
)
});
// Check state root correctness: the summary stored for the returned root must sit
// at exactly the requested ancestor slot.
assert_eq!(
store
.load_hot_state_summary(&ancestor_state_root)
.unwrap()
.unwrap_or_else(|| panic!(
"no summary found for {ancestor_state_root:?} (slot {ancestor_slot})"
))
.slot,
ancestor_slot,
)
}
}
}
// This test only makes sense if the split is non-zero by the end.
assert_ne!(store.get_split_slot(), 0);
}
// Test that the chain operates correctly when the split state is stored as a ReplayFrom.
#[tokio::test]
async fn replay_from_split_state() {
let db_path = tempdir().unwrap();
let spec = test_spec::<E>();
let store_config = StoreConfig {
// Disable execution payload pruning for this test.
prune_payloads: false,
// Single "5" layer, presumably snapshots every 2^5 = 32 slots, so the epoch-3 split
// below does not land on a snapshot — TODO confirm exponent semantics.
hierarchy_config: HierarchyConfig::from_str("5").unwrap(),
..StoreConfig::default()
};
let chain_config = ChainConfig {
// Full node: no historic state reconstruction.
reconstruct_historic_states: false,
..ChainConfig::default()
};
let import_all_data_columns = false;
let store = get_store_generic(&db_path, store_config.clone(), spec.clone());
let harness = get_harness_generic(
store.clone(),
LOW_VALIDATOR_COUNT,
chain_config,
import_all_data_columns,
);
// Produce blocks until we finalize epoch 3 which will not be stored as a snapshot.
let num_blocks = 5 * E::slots_per_epoch() as usize;
harness
.extend_chain(
num_blocks,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let split = store.get_split_info();
let anchor_slot = store.get_anchor_info().anchor_slot;
assert_eq!(split.slot, 3 * E::slots_per_epoch());
assert_eq!(anchor_slot, 0);
// The split state's storage strategy must be ReplayFrom for this test to be meaningful.
assert!(store
.hierarchy
.storage_strategy(split.slot, anchor_slot)
.unwrap()
.is_replay_from());
// Close the database and reopen it.
drop(store);
drop(harness);
let store = get_store_generic(&db_path, store_config, spec);
// Check that the split state is still accessible after the restart.
assert_eq!(store.get_split_slot(), split.slot);
let state = store
.get_hot_state(&split.state_root, false)
.unwrap()
.expect("split state should be present");
assert_eq!(state.slot(), split.slot);
}
/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).
@@ -3532,7 +3825,11 @@ fn check_split_slot(
/// Check that all the states in a chain dump have the correct tree hash.
fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
let mut chain_dump = harness.chain.chain_dump().unwrap();
check_chain_dump_from_slot(harness, Slot::new(0), expected_len)
}
fn check_chain_dump_from_slot(harness: &TestHarness, from_slot: Slot, expected_len: u64) {
let mut chain_dump = harness.chain.chain_dump_from_slot(from_slot).unwrap();
assert_eq!(chain_dump.len() as u64, expected_len);
@@ -3580,7 +3877,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
let mut forward_block_roots = harness
.chain
.forwards_iter_block_roots(Slot::new(0))
.forwards_iter_block_roots(from_slot)
.expect("should get iter")
.map(Result::unwrap)
.collect::<Vec<_>>();
@@ -3601,10 +3898,14 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
/// Check that every state from the canonical chain is in the database, and that the
/// reverse state and block root iterators reach genesis.
fn check_iterators(harness: &TestHarness) {
check_iterators_from_slot(harness, Slot::new(0))
}
fn check_iterators_from_slot(harness: &TestHarness, slot: Slot) {
let mut max_slot = None;
for (state_root, slot) in harness
.chain
.forwards_iter_state_roots(Slot::new(0))
.forwards_iter_state_roots(slot)
.expect("should get iter")
.map(Result::unwrap)
{
@@ -3626,7 +3927,7 @@ fn check_iterators(harness: &TestHarness) {
assert_eq!(
harness
.chain
.forwards_iter_block_roots(Slot::new(0))
.forwards_iter_block_roots(slot)
.expect("should get iter")
.last()
.map(Result::unwrap)