Hierarchical state diffs (#5978)

* Start extracting freezer changes for tree-states

* Remove unused config args

* Add comments

* Remove unwraps

* Subjectively clearer implementation

* Clean up hdiff

* Update xdelta3
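
For context on the feature these commits build: the freezer keeps periodic full snapshots plus layers of binary diffs (xdelta3-based), with layer spacing controlled by a list of hierarchy exponents (the `--hierarchy-exponents` flag appears later in this PR, with `5,9,11,13,16,21` as an example configuration). The sketch below is illustrative only, not Lighthouse's actual implementation: it maps a target slot to the chain of base slots whose snapshot/diffs would be applied, coarsest layer first.

```rust
// Illustrative sketch (not Lighthouse's real code) of hierarchical state
// diffs: each exponent `e` defines a diff layer aligned to strides of 2^e
// slots. Reconstructing a state walks from the coarsest layer (the
// snapshot) down to the finest, applying one diff per layer.
fn diff_chain(exponents: &[u8], target_slot: u64) -> Vec<u64> {
    let mut chain = Vec::new();
    // Snap the target slot down to each layer's alignment, coarsest first.
    // Each snapped slot is the base the next, finer diff applies on top of.
    for &exp in exponents.iter().rev() {
        let stride = 1u64 << exp;
        let snapped = (target_slot / stride) * stride;
        if chain.last() != Some(&snapped) {
            chain.push(snapped);
        }
    }
    chain
}
```

With exponents `[5, 9, 11, 13, 16, 21]`, a deep target slot resolves to at most six base slots, so reconstruction cost is bounded regardless of chain length.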

* Tree states archive metrics (#6040)

* Add store cache size metrics

* Add compress timer metrics

* Add diff apply compute timer metrics

* Add diff buffer cache hit metrics

* Add hdiff buffer load times

* Add blocks replayed metric

* Move metrics to store

* Future-proof some metrics

---------

Co-authored-by: Michael Sproul <michael@sigmaprime.io>

* Port and clean up forwards iterator changes

* Add and polish hierarchy-config flag

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Cleaner errors

* Fix beacon_chain test compilation

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Patch a few more freezer block roots

* Fix genesis block root bug

* Fix test failing due to pending updates

* Beacon chain tests passing

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Fix doc lint

* Implement DB schema upgrade for hierarchical state diffs (#6193)

* DB upgrade

* Add flag

* Delete RestorePointHash

* Update docs

* Update docs
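
The schema upgrade/downgrade machinery exercised later in this diff steps between versions one at a time, and the updated test asserts that downgrading below the minimum supported version (22) fails. A rough sketch of that dispatch pattern, with illustrative function names and error handling (not Lighthouse's actual `migrate_schema`):

```rust
// Version-stepped migration: each upgrade/downgrade is one small step, so
// every step can be written and tested independently.
fn migrate(mut from: u64, to: u64) -> Result<(), String> {
    while from < to {
        upgrade_one_version(from)?;
        from += 1;
    }
    while from > to {
        downgrade_one_version(from)?;
        from -= 1;
    }
    Ok(())
}

fn upgrade_one_version(_from: u64) -> Result<(), String> {
    // Placeholder: a real upgrade rewrites columns, e.g. the move of cold
    // summaries and state roots to new columns described in this PR.
    Ok(())
}

fn downgrade_one_version(from: u64) -> Result<(), String> {
    // Mirror the behaviour asserted by the schema downgrade test below:
    // downgrading below the minimum supported version (22) is an error.
    if from <= 22 {
        return Err(format!("cannot downgrade below schema v{from}"));
    }
    Ok(())
}
```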

* Implement hierarchical state diffs config migration (#6245)

* Implement hierarchical state diffs config migration

* Review PR

* Remove TODO

* Set CURRENT_SCHEMA_VERSION correctly

* Fix genesis state loading

* Re-delete some PartialBeaconState stuff

---------

Co-authored-by: Michael Sproul <michael@sigmaprime.io>

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Fix test compilation

* Update schema downgrade test

* Fix tests

* Fix null anchor migration

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Fix tree states upgrade migration (#6328)

* Towards crash safety

* Fix compilation

* Move cold summaries and state roots to new columns

* Rename StateRoots chunked field

* Update prune states

* Clean hdiff CLI flag and metrics

* Fix "staged reconstruction"

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Fix alloy issues

* Fix staged reconstruction logic

* Prevent weird slot drift

* Remove "allow" flag

* Update CLI help

* Remove FIXME about downgrade

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Remove some unnecessary error variants

* Fix new test

* Tree states archive - review comments and metrics (#6386)

* Review PR comments and metrics

* Comments

* Add anchor metrics

* Drop previous comment

* Update metadata.rs

* Apply suggestions from code review

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Update beacon_node/store/src/hot_cold_store.rs

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Clarify comment and remove anchor_slot garbage

* Simplify database anchor (#6397)

* Simplify database anchor

* Update beacon_node/store/src/reconstruct.rs

* Add migration for anchor

* Fix and simplify light_client store tests

* Fix incompatible config test

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* More metrics

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* New historic state cache (#6475)

* New historic state cache

* Add more metrics

* State cache hit rate metrics

* Fix store metrics

* More logs and metrics

* Fix logger

* Ensure cached states have built caches :O

* Replay blocks in preference to diffing
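
"Replay blocks in preference to diffing" points at a cost trade-off: when a state close to the target is already cached, replaying a handful of blocks on top of it can beat loading and applying a diff chain from disk. A hypothetical cost model sketching that choice; the constants and names are illustrative, not taken from Lighthouse:

```rust
// Rough per-operation costs (illustrative): replaying one block vs loading
// and applying one hierarchical diff layer (disk read + xdelta apply).
const REPLAY_COST_PER_SLOT: u64 = 1;
const COST_PER_DIFF_LAYER: u64 = 100;

// Prefer replay when the estimated replay cost from the nearest cached
// state is lower than the estimated cost of the diff chain.
fn prefer_replay(slots_to_replay: u64, diff_layers_needed: u64) -> bool {
    slots_to_replay * REPLAY_COST_PER_SLOT < diff_layers_needed * COST_PER_DIFF_LAYER
}
```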

* Two separate caches

* Distribute cache build time to next slot

* Re-plumb historic-state-cache flag

* Clean up metrics

* Update book

* Update beacon_node/store/src/hdiff.rs

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>

* Update beacon_node/store/src/historic_state_cache.rs

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>

---------

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>

* Update database docs

* Update diagram

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Update lockbud to work with bindgen/etc

* Correct pkg name for Debian

* Remove vestigial epochs_per_state_diff

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Markdown lint

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Address Jimmy's review comments

* Simplify ReplayFrom case

* Fix and document genesis_state_root

* Typo

Co-authored-by: Jimmy Chen <jchen.tc@gmail.com>

* Merge branch 'unstable' into tree-states-archive

* Compute diff of validators list manually (#6556)

* Split hdiff computation

* Dedicated logic for historical roots and summaries
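
"Compute diff of validators list manually" suggests a structural diff for the append-mostly validator registry instead of running a generic binary diff over its serialization. An illustrative sketch of that idea (not the actual hdiff format): record the appended tail plus any in-place mutations.

```rust
// Illustrative structural diff for an append-only list whose existing
// entries occasionally mutate in place (like the validator registry).
#[derive(Debug, PartialEq)]
struct ListDiff<T> {
    appended: Vec<T>,
    mutated: Vec<(usize, T)>,
}

fn compute_list_diff<T: Clone + PartialEq>(old: &[T], new: &[T]) -> ListDiff<T> {
    assert!(new.len() >= old.len(), "list is append-only");
    // Indices present in both versions: record only the changed entries.
    let mutated = old
        .iter()
        .zip(new.iter())
        .enumerate()
        .filter(|(_, (o, n))| o != n)
        .map(|(i, (_, n))| (i, n.clone()))
        .collect();
    ListDiff {
        appended: new[old.len()..].to_vec(),
        mutated,
    }
}

fn apply_list_diff<T: Clone>(list: &mut Vec<T>, diff: &ListDiff<T>) {
    for (i, v) in &diff.mutated {
        list[*i] = v.clone();
    }
    list.extend(diff.appended.iter().cloned());
}
```

Because most epochs only append validators and tweak a few balances-adjacent fields, a diff like this stays tiny even for a registry with millions of entries.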

* Benchmark against real states

* Mutated source?

* Version the hdiff

* Add lighthouse DB config for hierarchy exponents

* Tidy up hierarchy exponents flag

* Apply suggestions from code review

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Address PR review

* Remove hardcoded paths in benchmarks

* Delete unused function in benches

* lint

---------

Co-authored-by: Michael Sproul <michael@sigmaprime.io>

* Test hdiff binary format stability (#6585)

* Merge remote-tracking branch 'origin/unstable' into tree-states-archive

* Add deprecation warning for SPRP (slots-per-restore-point)

* Update xdelta to get rid of duplicate deps

* Document test
This commit is contained in:
Michael Sproul, 2024-11-18 12:51:44 +11:00 (committed by GitHub)
parent 654fc6acdc, commit 9fdd53df56
57 changed files with 3360 additions and 1691 deletions


@@ -25,13 +25,10 @@ use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
-use store::chunked_vector::Chunk;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN};
use store::{
-chunked_vector::{chunk_key, Field},
-get_key_for_col,
iter::{BlockRootsIterator, StateRootsIterator},
-BlobInfo, DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp, LevelDB, StoreConfig,
+BlobInfo, DBColumn, HotColdDB, LevelDB, StoreConfig,
};
use tempfile::{tempdir, TempDir};
use tokio::time::sleep;
@@ -58,8 +55,8 @@ fn get_store_generic(
config: StoreConfig,
spec: ChainSpec,
) -> Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>> {
-let hot_path = db_path.path().join("hot_db");
-let cold_path = db_path.path().join("cold_db");
+let hot_path = db_path.path().join("chain_db");
+let cold_path = db_path.path().join("freezer_db");
let blobs_path = db_path.path().join("blobs_db");
let log = test_logger();
@@ -232,253 +229,6 @@ async fn light_client_updates_test() {
assert_eq!(lc_updates.len(), 2);
}
/// Tests that `store.heal_freezer_block_roots_at_split` inserts block roots between last restore point
/// slot and the split slot.
#[tokio::test]
async fn heal_freezer_block_roots_at_split() {
// chunk_size is hard-coded to 128
let num_blocks_produced = E::slots_per_epoch() * 20;
let db_path = tempdir().unwrap();
let store = get_store_generic(
&db_path,
StoreConfig {
slots_per_restore_point: 2 * E::slots_per_epoch(),
..Default::default()
},
test_spec::<E>(),
);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let split_slot = store.get_split_slot();
assert_eq!(split_slot, 18 * E::slots_per_epoch());
// Do a heal before deleting to make sure that it doesn't break.
let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
store.heal_freezer_block_roots_at_split().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
// Delete block roots between `last_restore_point_slot` and `split_slot`.
let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
last_restore_point_slot.as_usize(),
);
let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
store
.cold_db
.do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
.unwrap();
let block_root_err = store
.forwards_block_roots_iterator_until(
last_restore_point_slot,
last_restore_point_slot + 1,
|| unreachable!(),
&harness.chain.spec,
)
.unwrap()
.next()
.unwrap()
.unwrap_err();
assert!(matches!(block_root_err, store::Error::NoContinuationData));
// Re-insert block roots
store.heal_freezer_block_roots_at_split().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
// Run for another two epochs to check that the invariant is maintained.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;
check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
check_chain_dump(
&harness,
num_blocks_produced + additional_blocks_produced + 1,
);
check_iterators(&harness);
}
/// Tests that `store.heal_freezer_block_roots` inserts block roots between last restore point
/// slot and the split slot.
#[tokio::test]
async fn heal_freezer_block_roots_with_skip_slots() {
// chunk_size is hard-coded to 128
let num_blocks_produced = E::slots_per_epoch() * 20;
let db_path = tempdir().unwrap();
let store = get_store_generic(
&db_path,
StoreConfig {
slots_per_restore_point: 2 * E::slots_per_epoch(),
..Default::default()
},
test_spec::<E>(),
);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let mut current_state = harness.get_current_state();
let state_root = current_state.canonical_root().unwrap();
let all_validators = &harness.get_all_validators();
harness
.add_attested_blocks_at_slots(
current_state,
state_root,
&(1..=num_blocks_produced)
.filter(|i| i % 12 != 0)
.map(Slot::new)
.collect::<Vec<_>>(),
all_validators,
)
.await;
// split slot should be 18 here
let split_slot = store.get_split_slot();
assert_eq!(split_slot, 18 * E::slots_per_epoch());
let last_restore_point_slot = Slot::new(16 * E::slots_per_epoch());
let chunk_index = <store::chunked_vector::BlockRoots as Field<E>>::chunk_index(
last_restore_point_slot.as_usize(),
);
let key_chunk = get_key_for_col(DBColumn::BeaconBlockRoots.as_str(), &chunk_key(chunk_index));
store
.cold_db
.do_atomically(vec![KeyValueStoreOp::DeleteKey(key_chunk)])
.unwrap();
let block_root_err = store
.forwards_block_roots_iterator_until(
last_restore_point_slot,
last_restore_point_slot + 1,
|| unreachable!(),
&harness.chain.spec,
)
.unwrap()
.next()
.unwrap()
.unwrap_err();
assert!(matches!(block_root_err, store::Error::NoContinuationData));
// heal function
store.heal_freezer_block_roots_at_split().unwrap();
check_freezer_block_roots(&harness, last_restore_point_slot, split_slot);
// Run for another two epochs to check that the invariant is maintained.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;
check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
check_iterators(&harness);
}
/// Tests that `store.heal_freezer_block_roots_at_genesis` replaces 0x0 block roots between slot
/// 0 and the first non-skip slot with genesis block root.
#[tokio::test]
async fn heal_freezer_block_roots_at_genesis() {
// Run for a few epochs to ensure we're past finalization.
let num_blocks_produced = E::slots_per_epoch() * 4;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
// Start with 2 skip slots.
harness.advance_slot();
harness.advance_slot();
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Do a heal before deleting to make sure that it doesn't break.
store.heal_freezer_block_roots_at_genesis().unwrap();
check_freezer_block_roots(
&harness,
Slot::new(0),
Epoch::new(1).end_slot(E::slots_per_epoch()),
);
// Write 0x0 block roots at slot 1 and slot 2.
let chunk_index = 0;
let chunk_db_key = chunk_key(chunk_index);
let mut chunk =
Chunk::<Hash256>::load(&store.cold_db, DBColumn::BeaconBlockRoots, &chunk_db_key)
.unwrap()
.unwrap();
chunk.values[1] = Hash256::zero();
chunk.values[2] = Hash256::zero();
let mut ops = vec![];
chunk
.store(DBColumn::BeaconBlockRoots, &chunk_db_key, &mut ops)
.unwrap();
store.cold_db.do_atomically(ops).unwrap();
// Ensure the DB is corrupted
let block_roots = store
.forwards_block_roots_iterator_until(
Slot::new(1),
Slot::new(2),
|| unreachable!(),
&harness.chain.spec,
)
.unwrap()
.map(Result::unwrap)
.take(2)
.collect::<Vec<_>>();
assert_eq!(
block_roots,
vec![
(Hash256::zero(), Slot::new(1)),
(Hash256::zero(), Slot::new(2))
]
);
// Insert genesis block roots at skip slots before first block slot
store.heal_freezer_block_roots_at_genesis().unwrap();
check_freezer_block_roots(
&harness,
Slot::new(0),
Epoch::new(1).end_slot(E::slots_per_epoch()),
);
}
fn check_freezer_block_roots(harness: &TestHarness, start_slot: Slot, end_slot: Slot) {
for slot in (start_slot.as_u64()..end_slot.as_u64()).map(Slot::new) {
let (block_root, result_slot) = harness
.chain
.store
.forwards_block_roots_iterator_until(slot, slot, || unreachable!(), &harness.chain.spec)
.unwrap()
.next()
.unwrap()
.unwrap();
assert_eq!(slot, result_slot);
let expected_block_root = harness
.chain
.block_root_at_slot(slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
assert_eq!(expected_block_root, block_root);
}
}
#[tokio::test]
async fn full_participation_no_skips() {
let num_blocks_produced = E::slots_per_epoch() * 5;
@@ -741,11 +491,12 @@ async fn epoch_boundary_state_attestation_processing() {
.load_epoch_boundary_state(&block.state_root())
.expect("no error")
.expect("epoch boundary state exists");
-let ebs_state_root = epoch_boundary_state.canonical_root().unwrap();
-let ebs_of_ebs = store
+let ebs_state_root = epoch_boundary_state.update_tree_hash_cache().unwrap();
+let mut ebs_of_ebs = store
.load_epoch_boundary_state(&ebs_state_root)
.expect("no error")
.expect("ebs of ebs exists");
+ebs_of_ebs.apply_pending_mutations().unwrap();
assert_eq!(epoch_boundary_state, ebs_of_ebs);
// If the attestation is pre-finalization it should be rejected.
@@ -807,10 +558,19 @@ async fn forwards_iter_block_and_state_roots_until() {
check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store.clone());
-// The last restore point slot is the point at which the hybrid forwards iterator behaviour
+// The freezer upper bound slot is the point at which the hybrid forwards iterator behaviour
// changes.
-let last_restore_point_slot = store.get_latest_restore_point_slot().unwrap();
-assert!(last_restore_point_slot > 0);
+let block_upper_bound = store
+.freezer_upper_bound_for_column(DBColumn::BeaconBlockRoots, Slot::new(0))
+.unwrap()
+.unwrap();
+assert!(block_upper_bound > 0);
+let state_upper_bound = store
+.freezer_upper_bound_for_column(DBColumn::BeaconStateRoots, Slot::new(0))
+.unwrap()
+.unwrap();
+assert!(state_upper_bound > 0);
+assert_eq!(state_upper_bound, block_upper_bound);
let chain = &harness.chain;
let head_state = harness.get_current_state();
@@ -835,14 +595,12 @@ async fn forwards_iter_block_and_state_roots_until() {
};
let split_slot = store.get_split_slot();
-assert!(split_slot > last_restore_point_slot);
+assert_eq!(split_slot, block_upper_bound);
-test_range(Slot::new(0), last_restore_point_slot);
-test_range(last_restore_point_slot, last_restore_point_slot);
-test_range(last_restore_point_slot - 1, last_restore_point_slot);
-test_range(Slot::new(0), last_restore_point_slot - 1);
test_range(Slot::new(0), split_slot);
-test_range(last_restore_point_slot - 1, split_slot);
+test_range(split_slot, split_slot);
+test_range(split_slot - 1, split_slot);
+test_range(Slot::new(0), split_slot - 1);
test_range(Slot::new(0), head_state.slot());
}
@@ -2567,7 +2325,7 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
.await;
let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);
-let log = test_logger();
+let log = harness.chain.logger().clone();
let temp2 = tempdir().unwrap();
let store = get_store(&temp2);
let spec = test_spec::<E>();
@@ -2792,11 +2550,11 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
}
// Anchor slot is still set to the slot of the checkpoint block.
-assert_eq!(store.get_anchor_slot(), Some(wss_block.slot()));
+assert_eq!(store.get_anchor_info().anchor_slot, wss_block.slot());
// Reconstruct states.
-store.clone().reconstruct_historic_states().unwrap();
-assert_eq!(store.get_anchor_slot(), None);
+store.clone().reconstruct_historic_states(None).unwrap();
+assert_eq!(store.get_anchor_info().anchor_slot, 0);
}
/// Test that blocks and attestations that refer to states around an unaligned split state are
@@ -3222,7 +2980,6 @@ async fn schema_downgrade_to_min_version() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
-let spec = &harness.chain.spec.clone();
harness
.extend_chain(
@@ -3232,7 +2989,8 @@ async fn schema_downgrade_to_min_version() {
)
.await;
-let min_version = SchemaVersion(19);
+let min_version = SchemaVersion(22);
+let genesis_state_root = Some(harness.chain.genesis_state_root);
// Save the slot clock so that the new harness doesn't revert in time.
let slot_clock = harness.chain.slot_clock.clone();
@@ -3245,25 +3003,22 @@ async fn schema_downgrade_to_min_version() {
let store = get_store(&db_path);
// Downgrade.
-let deposit_contract_deploy_block = 0;
migrate_schema::<DiskHarnessType<E>>(
store.clone(),
-deposit_contract_deploy_block,
+genesis_state_root,
CURRENT_SCHEMA_VERSION,
min_version,
store.logger().clone(),
-spec,
)
.expect("schema downgrade to minimum version should work");
// Upgrade back.
migrate_schema::<DiskHarnessType<E>>(
store.clone(),
-deposit_contract_deploy_block,
+genesis_state_root,
min_version,
CURRENT_SCHEMA_VERSION,
store.logger().clone(),
-spec,
)
.expect("schema upgrade from minimum version should work");
@@ -3286,11 +3041,10 @@ async fn schema_downgrade_to_min_version() {
let min_version_sub_1 = SchemaVersion(min_version.as_u64().checked_sub(1).unwrap());
migrate_schema::<DiskHarnessType<E>>(
store.clone(),
-deposit_contract_deploy_block,
+genesis_state_root,
CURRENT_SCHEMA_VERSION,
min_version_sub_1,
harness.logger().clone(),
-spec,
)
.expect_err("should not downgrade below minimum version");
}
@@ -3622,15 +3376,15 @@ async fn prune_historic_states() {
)
.await;
-// Check historical state is present.
-let state_roots_iter = harness
+// Check historical states are present.
+let first_epoch_state_roots = harness
.chain
.forwards_iter_state_roots(Slot::new(0))
-.unwrap();
-for (state_root, slot) in state_roots_iter
+.unwrap()
.take(E::slots_per_epoch() as usize)
.map(Result::unwrap)
-{
+.collect::<Vec<_>>();
+for &(state_root, slot) in &first_epoch_state_roots {
assert!(store.get_state(&state_root, Some(slot)).unwrap().is_some());
}
@@ -3639,29 +3393,18 @@ async fn prune_historic_states() {
.unwrap();
// Check that anchor info is updated.
-let anchor_info = store.get_anchor_info().unwrap();
+let anchor_info = store.get_anchor_info();
assert_eq!(anchor_info.state_lower_limit, 0);
assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);
-// Historical states should be pruned.
-let state_roots_iter = harness
-.chain
-.forwards_iter_state_roots(Slot::new(1))
-.unwrap();
-for (state_root, slot) in state_roots_iter
-.take(E::slots_per_epoch() as usize)
-.map(Result::unwrap)
-{
-assert!(store.get_state(&state_root, Some(slot)).unwrap().is_none());
+// Ensure all epoch 0 states other than the genesis have been pruned.
+for &(state_root, slot) in &first_epoch_state_roots {
+assert_eq!(
+store.get_state(&state_root, Some(slot)).unwrap().is_some(),
+slot == 0
+);
}
-// Ensure that genesis state is still accessible
-let genesis_state_root = harness.chain.genesis_state_root;
-assert!(store
-.get_state(&genesis_state_root, Some(Slot::new(0)))
-.unwrap()
-.is_some());
// Run for another two epochs.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness