Fix and test checkpoint sync from genesis (#7689)

Fix a bug involving checkpoint sync from genesis, reported by Sunnyside Labs.

Ensure that the store's `anchor` is initialised prior to storing the genesis state. In the case of checkpoint sync from genesis, the genesis state will be in the _hot DB_, so the hot DB metadata needs to be initialised before we can store it.
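
To make the ordering constraint concrete, here is a minimal sketch in Rust. All of the names below (`Store`, `AnchorInfo`, `init_anchor_info`, `put_hot_state`) are simplified stand-ins rather than Lighthouse's actual types; the only point it illustrates is that writing a hot state consults the anchor, so the anchor must be initialised first:

    // Minimal sketch of the ordering bug (hypothetical types, not Lighthouse's API).
    #[derive(Debug, Clone, Copy)]
    struct AnchorInfo {
        anchor_slot: u64,
    }

    #[derive(Default)]
    struct Store {
        anchor: Option<AnchorInfo>,
    }

    impl Store {
        fn init_anchor_info(&mut self, anchor_slot: u64) {
            self.anchor = Some(AnchorInfo { anchor_slot });
        }

        /// Storing a hot state consults the anchor to work out the hdiff storage
        /// strategy, so it fails if the anchor has not been initialised yet.
        fn put_hot_state(&self, slot: u64) -> Result<(), String> {
            let anchor = self
                .anchor
                .ok_or_else(|| "anchor uninitialised: cannot pick hdiff strategy".to_string())?;
            let _store_as_diff = slot > anchor.anchor_slot; // simplified strategy choice
            Ok(())
        }
    }

    fn main() {
        let mut store = Store::default();
        // Buggy order: storing genesis before the anchor exists fails.
        assert!(store.put_hot_state(0).is_err());
        // Fixed order: initialise the anchor first, then store genesis.
        store.init_anchor_info(0);
        assert!(store.put_hot_state(0).is_ok());
    }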

I've extended the existing checkpoint sync tests to cover this case as well. There are some subtleties around what the `state_upper_limit` should be set to here: I've opted to just enable state reconstruction from the start in the test, so that it gets set to 0, which results in an end state more consistent with the other test cases (full state reconstruction). This is required because we can't meaningfully do any state reconstruction when the split slot is 0 (there is no range of frozen slots to reconstruct).
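
Roughly, assuming a helper that mirrors this behaviour (the real derivation in Lighthouse also has to deal with snapshot alignment, which is elided here), the `state_upper_limit` for a genesis anchor falls out of the reconstruction flag like this:

    // Simplified sketch of how `state_upper_limit` falls out for a genesis anchor.
    // (Hypothetical helper; the real logic also rounds up to a snapshot-aligned slot.)
    const STATE_UPPER_LIMIT_NO_RETAIN: u64 = u64::MAX;

    fn state_upper_limit(retain_historic_states: bool) -> u64 {
        if retain_historic_states {
            // Reconstruction enabled from the start: all frozen states from
            // slot 0 upwards will be stored, matching the test's assertion.
            0
        } else {
            // Sentinel: historic states will never be stored.
            STATE_UPPER_LIMIT_NO_RETAIN
        }
    }

    fn main() {
        assert_eq!(state_upper_limit(true), 0); // the genesis test case
        assert_eq!(state_upper_limit(false), u64::MAX); // the other checkpoint tests
    }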
Michael Sproul
2025-07-02 14:50:33 +10:00
committed by GitHub
parent fcc602a787
commit a459a9af98
3 changed files with 158 additions and 103 deletions

@@ -514,9 +514,26 @@ where
             "Storing split from weak subjectivity state"
         );
-        // Set the store's split point *before* storing genesis so that genesis is stored
-        // immediately in the freezer DB.
+        // Set the store's split point *before* storing genesis so that if the genesis state
+        // is prior to the split slot, it will immediately be stored in the freezer DB.
         store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
 
+        // It is also possible for the checkpoint state to be equal to the genesis state, in which
+        // case it will be stored in the hot DB. In this case, we need to ensure the store's anchor
+        // is initialised prior to storing the state, as the anchor is required for working out
+        // hdiff storage strategies.
+        let retain_historic_states = self.chain_config.reconstruct_historic_states;
+        self.pending_io_batch.push(
+            store
+                .init_anchor_info(
+                    weak_subj_block.parent_root(),
+                    weak_subj_block.slot(),
+                    weak_subj_slot,
+                    retain_historic_states,
+                )
+                .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
+        );
+
         let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
         self = updated_builder;
@@ -541,20 +558,6 @@ where
             "Stored frozen block roots at skipped slots"
         );
 
-        // Write the anchor to memory before calling `put_state` otherwise hot hdiff can't store
-        // states that do not align with the `start_slot` grid.
-        let retain_historic_states = self.chain_config.reconstruct_historic_states;
-        self.pending_io_batch.push(
-            store
-                .init_anchor_info(
-                    weak_subj_block.parent_root(),
-                    weak_subj_block.slot(),
-                    weak_subj_slot,
-                    retain_historic_states,
-                )
-                .map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
-        );
-
         // Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
         // about on a crash restart.
         store
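
Taken together, these two hunks move the anchor initialisation from after `set_genesis_state` to before it. Here is a rough sketch of the routing decision the new comment describes, using a hypothetical `Db` enum purely for illustration: genesis is written straight to the freezer DB only when it is prior to the split slot, while with a split at slot 0 (checkpoint sync from genesis) it goes to the hot DB, which is the path that needs the anchor:

    // Sketch of where the genesis state is written (illustrative only, not the real API).
    #[derive(Debug, PartialEq)]
    enum Db {
        Freezer,
        Hot,
    }

    fn genesis_destination(split_slot: u64) -> Db {
        const GENESIS_SLOT: u64 = 0;
        if GENESIS_SLOT < split_slot {
            // Normal checkpoint sync: genesis is behind the split and is
            // written straight into the freezer DB.
            Db::Freezer
        } else {
            // Checkpoint sync *from* genesis: the split is at slot 0, so the
            // genesis state is hot, and storing it requires the anchor.
            Db::Hot
        }
    }

    fn main() {
        assert_eq!(genesis_destination(64), Db::Freezer);
        assert_eq!(genesis_destination(0), Db::Hot);
    }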

@@ -2281,6 +2281,19 @@ async fn weak_subjectivity_sync_skips_at_genesis() {
     weak_subjectivity_sync_test(slots, checkpoint_slot).await
 }
 
+// Checkpoint sync from the genesis state.
+//
+// This is a regression test for a bug we had involving the storage of the genesis state in the hot
+// DB.
+#[tokio::test]
+async fn weak_subjectivity_sync_from_genesis() {
+    let start_slot = 1;
+    let end_slot = E::slots_per_epoch() * 2;
+    let slots = (start_slot..end_slot).map(Slot::new).collect();
+    let checkpoint_slot = Slot::new(0);
+    weak_subjectivity_sync_test(slots, checkpoint_slot).await
+}
+
 async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
     // Build an initial chain on one harness, representing a synced node with full history.
     let num_final_blocks = E::slots_per_epoch() * 2;
@@ -2367,7 +2380,15 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
     );
     slot_clock.set_slot(harness.get_current_slot().as_u64());
 
+    let chain_config = ChainConfig {
+        // Set reconstruct_historic_states to true from the start in the genesis case. This makes
+        // some of the later checks more uniform across the genesis/non-genesis cases.
+        reconstruct_historic_states: checkpoint_slot == 0,
+        ..ChainConfig::default()
+    };
+
     let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec, kzg)
+        .chain_config(chain_config)
         .store(store.clone())
         .custom_spec(test_spec::<E>().into())
         .task_executor(harness.chain.task_executor.clone())
@@ -2381,7 +2402,6 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
         .store_migrator_config(MigratorConfig::default().blocking())
         .slot_clock(slot_clock)
         .shutdown_sender(shutdown_tx)
-        .chain_config(ChainConfig::default())
         .event_handler(Some(ServerSentEventHandler::new_with_capacity(1)))
         .execution_layer(Some(mock.el))
         .rng(Box::new(StdRng::seed_from_u64(42)))
@@ -2449,12 +2469,27 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
         assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
     }
 
-    // Forwards iterator from 0 should fail as we lack blocks.
-    assert!(matches!(
-        beacon_chain.forwards_iter_block_roots(Slot::new(0)),
-        Err(BeaconChainError::HistoricalBlockOutOfRange { .. })
-    ));
+    if checkpoint_slot != 0 {
+        // Forwards iterator from 0 should fail as we lack blocks (unless checkpoint slot is 0).
+        assert!(matches!(
+            beacon_chain.forwards_iter_block_roots(Slot::new(0)),
+            Err(BeaconChainError::HistoricalBlockOutOfRange { .. })
+        ));
+    } else {
+        assert_eq!(
+            beacon_chain
+                .forwards_iter_block_roots(Slot::new(0))
+                .unwrap()
+                .next()
+                .unwrap()
+                .unwrap(),
+            (wss_block_root, Slot::new(0))
+        );
+    }
+
+    // The checks in this block only make sense if some data is missing as a result of the
+    // checkpoint sync, i.e. if we are not just checkpoint syncing from genesis.
+    if checkpoint_slot != 0 {
     // Simulate processing of a `StatusMessage` with an older finalized epoch by calling
     // `block_root_at_slot` with an old slot for which we don't know the block root. It should
     // return `None` rather than erroring.
@@ -2490,7 +2525,8 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
             .chain
             .data_availability_checker
             .verify_kzg_for_rpc_block(
-                harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
+                harness
+                    .build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
             )
             .expect("should verify kzg")
         {
@@ -2506,7 +2542,12 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
         let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
         let mut corrupt_block = (*block).clone();
         *corrupt_block.signature_mut() = Signature::empty();
-        AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec))
+        AvailableBlock::__new_for_testing(
+            block_root,
+            Arc::new(corrupt_block),
+            data,
+            Arc::new(spec),
+        )
     };
 
     // Importing the invalid batch should error.
@@ -2539,6 +2580,7 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
         beacon_chain
             .import_historical_block_batch(available_blocks)
             .unwrap();
+    }
 
     // Sanity check for non-aligned WSS starts, to make sure the WSS block is persisted properly
     if wss_block_slot != wss_state_slot {
@@ -2615,7 +2657,11 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
     assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
     assert_eq!(
         store.get_anchor_info().state_upper_limit,
-        Slot::new(u64::MAX)
+        if checkpoint_slot == 0 {
+            Slot::new(0)
+        } else {
+            Slot::new(u64::MAX)
+        }
     );
     info!(anchor = ?store.get_anchor_info(), "anchor pre");

@@ -47,6 +47,12 @@ where
         let lower_limit_slot = anchor.state_lower_limit;
         let upper_limit_slot = std::cmp::min(split.slot, anchor.state_upper_limit);
 
+        // If the split is at 0 we can't reconstruct historic states.
+        if split.slot == 0 {
+            debug!("No state reconstruction possible");
+            return Ok(());
+        }
+
         // If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch
         // boundary when `num_blocks` is a multiple of an epoch boundary. We want to be *inclusive*
         // of the state at slot `lower_limit_slot + num_blocks`.
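
For intuition, this early return can be read as the reconstructable range degenerating to empty. A standalone sketch mirroring the `lower_limit_slot`/`upper_limit_slot` computation above (hypothetical helper, not the store's real API):

    // Sketch: the slots eligible for reconstruction are
    // [state_lower_limit, min(split_slot, state_upper_limit)), which is empty
    // when the split is at genesis. (Illustrative helper only.)
    fn reconstructable_range(
        split_slot: u64,
        state_lower_limit: u64,
        state_upper_limit: u64,
    ) -> Option<std::ops::Range<u64>> {
        let upper_limit_slot = split_slot.min(state_upper_limit);
        if split_slot == 0 || state_lower_limit >= upper_limit_slot {
            None // nothing to reconstruct; mirrors the early return above
        } else {
            Some(state_lower_limit..upper_limit_slot)
        }
    }

    fn main() {
        // Checkpoint sync from genesis: split slot 0, no frozen range at all.
        assert_eq!(reconstructable_range(0, 0, 0), None);
        // Ordinary checkpoint sync with reconstruction enabled.
        assert_eq!(reconstructable_range(64, 0, 64), Some(0..64));
    }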