mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
Fix and test checkpoint sync from genesis (#7689)
Fix a bug involving checkpoint sync from genesis reported by Sunnyside labs. Ensure that the store's `anchor` is initialised prior to storing the genesis state. In the case of checkpoint sync from genesis, the genesis state will be in the _hot DB_, so we need the hot DB metadata to be initialised in order to store it. I've extended the existing checkpoint sync tests to cover this case as well. There are some subtleties around what the `state_upper_limit` should be set to in this case. I've opted to just enable state reconstruction from the start in the test so it gets set to 0, which results in an end state more consistent with the other test cases (full state reconstruction). This is required because we can't meaningfully do any state reconstruction when the split slot is 0 (there is no range of frozen slots to reconstruct).
This commit is contained in:
@@ -514,9 +514,26 @@ where
|
|||||||
"Storing split from weak subjectivity state"
|
"Storing split from weak subjectivity state"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Set the store's split point *before* storing genesis so that genesis is stored
|
// Set the store's split point *before* storing genesis so that if the genesis state
|
||||||
// immediately in the freezer DB.
|
// is prior to the split slot, it will immediately be stored in the freezer DB.
|
||||||
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
|
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
|
||||||
|
|
||||||
|
// It is also possible for the checkpoint state to be equal to the genesis state, in which
|
||||||
|
// case it will be stored in the hot DB. In this case, we need to ensure the store's anchor
|
||||||
|
// is initialised prior to storing the state, as the anchor is required for working out
|
||||||
|
// hdiff storage strategies.
|
||||||
|
let retain_historic_states = self.chain_config.reconstruct_historic_states;
|
||||||
|
self.pending_io_batch.push(
|
||||||
|
store
|
||||||
|
.init_anchor_info(
|
||||||
|
weak_subj_block.parent_root(),
|
||||||
|
weak_subj_block.slot(),
|
||||||
|
weak_subj_slot,
|
||||||
|
retain_historic_states,
|
||||||
|
)
|
||||||
|
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
|
||||||
|
);
|
||||||
|
|
||||||
let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
|
let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
|
||||||
self = updated_builder;
|
self = updated_builder;
|
||||||
|
|
||||||
@@ -541,20 +558,6 @@ where
|
|||||||
"Stored frozen block roots at skipped slots"
|
"Stored frozen block roots at skipped slots"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Write the anchor to memory before calling `put_state` otherwise hot hdiff can't store
|
|
||||||
// states that do not align with the `start_slot` grid.
|
|
||||||
let retain_historic_states = self.chain_config.reconstruct_historic_states;
|
|
||||||
self.pending_io_batch.push(
|
|
||||||
store
|
|
||||||
.init_anchor_info(
|
|
||||||
weak_subj_block.parent_root(),
|
|
||||||
weak_subj_block.slot(),
|
|
||||||
weak_subj_slot,
|
|
||||||
retain_historic_states,
|
|
||||||
)
|
|
||||||
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
|
// Write the state, block and blobs non-atomically, it doesn't matter if they're forgotten
|
||||||
// about on a crash restart.
|
// about on a crash restart.
|
||||||
store
|
store
|
||||||
|
|||||||
@@ -2281,6 +2281,19 @@ async fn weak_subjectivity_sync_skips_at_genesis() {
|
|||||||
weak_subjectivity_sync_test(slots, checkpoint_slot).await
|
weak_subjectivity_sync_test(slots, checkpoint_slot).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Checkpoint sync from the genesis state.
|
||||||
|
//
|
||||||
|
// This is a regression test for a bug we had involving the storage of the genesis state in the hot
|
||||||
|
// DB.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn weak_subjectivity_sync_from_genesis() {
|
||||||
|
let start_slot = 1;
|
||||||
|
let end_slot = E::slots_per_epoch() * 2;
|
||||||
|
let slots = (start_slot..end_slot).map(Slot::new).collect();
|
||||||
|
let checkpoint_slot = Slot::new(0);
|
||||||
|
weak_subjectivity_sync_test(slots, checkpoint_slot).await
|
||||||
|
}
|
||||||
|
|
||||||
async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
||||||
// Build an initial chain on one harness, representing a synced node with full history.
|
// Build an initial chain on one harness, representing a synced node with full history.
|
||||||
let num_final_blocks = E::slots_per_epoch() * 2;
|
let num_final_blocks = E::slots_per_epoch() * 2;
|
||||||
@@ -2367,7 +2380,15 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
|||||||
);
|
);
|
||||||
slot_clock.set_slot(harness.get_current_slot().as_u64());
|
slot_clock.set_slot(harness.get_current_slot().as_u64());
|
||||||
|
|
||||||
|
let chain_config = ChainConfig {
|
||||||
|
// Set reconstruct_historic_states to true from the start in the genesis case. This makes
|
||||||
|
// some of the later checks more uniform across the genesis/non-genesis cases.
|
||||||
|
reconstruct_historic_states: checkpoint_slot == 0,
|
||||||
|
..ChainConfig::default()
|
||||||
|
};
|
||||||
|
|
||||||
let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec, kzg)
|
let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec, kzg)
|
||||||
|
.chain_config(chain_config)
|
||||||
.store(store.clone())
|
.store(store.clone())
|
||||||
.custom_spec(test_spec::<E>().into())
|
.custom_spec(test_spec::<E>().into())
|
||||||
.task_executor(harness.chain.task_executor.clone())
|
.task_executor(harness.chain.task_executor.clone())
|
||||||
@@ -2381,7 +2402,6 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
|||||||
.store_migrator_config(MigratorConfig::default().blocking())
|
.store_migrator_config(MigratorConfig::default().blocking())
|
||||||
.slot_clock(slot_clock)
|
.slot_clock(slot_clock)
|
||||||
.shutdown_sender(shutdown_tx)
|
.shutdown_sender(shutdown_tx)
|
||||||
.chain_config(ChainConfig::default())
|
|
||||||
.event_handler(Some(ServerSentEventHandler::new_with_capacity(1)))
|
.event_handler(Some(ServerSentEventHandler::new_with_capacity(1)))
|
||||||
.execution_layer(Some(mock.el))
|
.execution_layer(Some(mock.el))
|
||||||
.rng(Box::new(StdRng::seed_from_u64(42)))
|
.rng(Box::new(StdRng::seed_from_u64(42)))
|
||||||
@@ -2449,96 +2469,118 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
|||||||
assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
|
assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forwards iterator from 0 should fail as we lack blocks.
|
if checkpoint_slot != 0 {
|
||||||
assert!(matches!(
|
// Forwards iterator from 0 should fail as we lack blocks (unless checkpoint slot is 0).
|
||||||
beacon_chain.forwards_iter_block_roots(Slot::new(0)),
|
assert!(matches!(
|
||||||
Err(BeaconChainError::HistoricalBlockOutOfRange { .. })
|
beacon_chain.forwards_iter_block_roots(Slot::new(0)),
|
||||||
));
|
Err(BeaconChainError::HistoricalBlockOutOfRange { .. })
|
||||||
|
));
|
||||||
// Simulate processing of a `StatusMessage` with an older finalized epoch by calling
|
} else {
|
||||||
// `block_root_at_slot` with an old slot for which we don't know the block root. It should
|
assert_eq!(
|
||||||
// return `None` rather than erroring.
|
beacon_chain
|
||||||
assert_eq!(
|
.forwards_iter_block_roots(Slot::new(0))
|
||||||
beacon_chain
|
.unwrap()
|
||||||
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
|
.next()
|
||||||
.unwrap(),
|
.unwrap()
|
||||||
None
|
.unwrap(),
|
||||||
);
|
(wss_block_root, Slot::new(0))
|
||||||
|
);
|
||||||
// Simulate querying the API for a historic state that is unknown. It should also return
|
|
||||||
// `None` rather than erroring.
|
|
||||||
assert_eq!(beacon_chain.state_root_at_slot(Slot::new(1)).unwrap(), None);
|
|
||||||
|
|
||||||
// Supply blocks backwards to reach genesis. Omit the genesis block to check genesis handling.
|
|
||||||
let historical_blocks = chain_dump[..wss_block.slot().as_usize()]
|
|
||||||
.iter()
|
|
||||||
.filter(|s| s.beacon_block.slot() != 0)
|
|
||||||
.map(|s| s.beacon_block.clone())
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
let mut available_blocks = vec![];
|
|
||||||
for blinded in historical_blocks {
|
|
||||||
let block_root = blinded.canonical_root();
|
|
||||||
let full_block = harness
|
|
||||||
.chain
|
|
||||||
.get_block(&block_root)
|
|
||||||
.await
|
|
||||||
.expect("should get block")
|
|
||||||
.expect("should get block");
|
|
||||||
|
|
||||||
if let MaybeAvailableBlock::Available(block) = harness
|
|
||||||
.chain
|
|
||||||
.data_availability_checker
|
|
||||||
.verify_kzg_for_rpc_block(
|
|
||||||
harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
|
|
||||||
)
|
|
||||||
.expect("should verify kzg")
|
|
||||||
{
|
|
||||||
available_blocks.push(block);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Corrupt the signature on the 1st block to ensure that the backfill processor is checking
|
// The checks in this block only make sense if some data is missing as a result of the
|
||||||
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
|
// checkpoint sync, i.e. if we are not just checkpoint syncing from genesis.
|
||||||
let mut batch_with_invalid_first_block =
|
if checkpoint_slot != 0 {
|
||||||
available_blocks.iter().map(clone_block).collect::<Vec<_>>();
|
// Simulate processing of a `StatusMessage` with an older finalized epoch by calling
|
||||||
batch_with_invalid_first_block[0] = {
|
// `block_root_at_slot` with an old slot for which we don't know the block root. It should
|
||||||
let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
|
// return `None` rather than erroring.
|
||||||
let mut corrupt_block = (*block).clone();
|
assert_eq!(
|
||||||
*corrupt_block.signature_mut() = Signature::empty();
|
beacon_chain
|
||||||
AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec))
|
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
|
||||||
};
|
.unwrap(),
|
||||||
|
None
|
||||||
|
);
|
||||||
|
|
||||||
// Importing the invalid batch should error.
|
// Simulate querying the API for a historic state that is unknown. It should also return
|
||||||
assert!(matches!(
|
// `None` rather than erroring.
|
||||||
|
assert_eq!(beacon_chain.state_root_at_slot(Slot::new(1)).unwrap(), None);
|
||||||
|
|
||||||
|
// Supply blocks backwards to reach genesis. Omit the genesis block to check genesis handling.
|
||||||
|
let historical_blocks = chain_dump[..wss_block.slot().as_usize()]
|
||||||
|
.iter()
|
||||||
|
.filter(|s| s.beacon_block.slot() != 0)
|
||||||
|
.map(|s| s.beacon_block.clone())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let mut available_blocks = vec![];
|
||||||
|
for blinded in historical_blocks {
|
||||||
|
let block_root = blinded.canonical_root();
|
||||||
|
let full_block = harness
|
||||||
|
.chain
|
||||||
|
.get_block(&block_root)
|
||||||
|
.await
|
||||||
|
.expect("should get block")
|
||||||
|
.expect("should get block");
|
||||||
|
|
||||||
|
if let MaybeAvailableBlock::Available(block) = harness
|
||||||
|
.chain
|
||||||
|
.data_availability_checker
|
||||||
|
.verify_kzg_for_rpc_block(
|
||||||
|
harness
|
||||||
|
.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
|
||||||
|
)
|
||||||
|
.expect("should verify kzg")
|
||||||
|
{
|
||||||
|
available_blocks.push(block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Corrupt the signature on the 1st block to ensure that the backfill processor is checking
|
||||||
|
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
|
||||||
|
let mut batch_with_invalid_first_block =
|
||||||
|
available_blocks.iter().map(clone_block).collect::<Vec<_>>();
|
||||||
|
batch_with_invalid_first_block[0] = {
|
||||||
|
let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
|
||||||
|
let mut corrupt_block = (*block).clone();
|
||||||
|
*corrupt_block.signature_mut() = Signature::empty();
|
||||||
|
AvailableBlock::__new_for_testing(
|
||||||
|
block_root,
|
||||||
|
Arc::new(corrupt_block),
|
||||||
|
data,
|
||||||
|
Arc::new(spec),
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Importing the invalid batch should error.
|
||||||
|
assert!(matches!(
|
||||||
|
beacon_chain
|
||||||
|
.import_historical_block_batch(batch_with_invalid_first_block)
|
||||||
|
.unwrap_err(),
|
||||||
|
HistoricalBlockError::InvalidSignature
|
||||||
|
));
|
||||||
|
|
||||||
|
let available_blocks_slots = available_blocks
|
||||||
|
.iter()
|
||||||
|
.map(|block| (block.block().slot(), block.block().canonical_root()))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
info!(
|
||||||
|
?available_blocks_slots,
|
||||||
|
"wss_block_slot" = wss_block.slot().as_usize(),
|
||||||
|
"Importing historical block batch"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Importing the batch with valid signatures should succeed.
|
||||||
|
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
|
||||||
|
assert_eq!(beacon_chain.store.get_oldest_block_slot(), wss_block.slot());
|
||||||
beacon_chain
|
beacon_chain
|
||||||
.import_historical_block_batch(batch_with_invalid_first_block)
|
.import_historical_block_batch(available_blocks_dup)
|
||||||
.unwrap_err(),
|
.unwrap();
|
||||||
HistoricalBlockError::InvalidSignature
|
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
|
||||||
));
|
|
||||||
|
|
||||||
let available_blocks_slots = available_blocks
|
// Resupplying the blocks should not fail, they can be safely ignored.
|
||||||
.iter()
|
beacon_chain
|
||||||
.map(|block| (block.block().slot(), block.block().canonical_root()))
|
.import_historical_block_batch(available_blocks)
|
||||||
.collect::<Vec<_>>();
|
.unwrap();
|
||||||
info!(
|
}
|
||||||
?available_blocks_slots,
|
|
||||||
"wss_block_slot" = wss_block.slot().as_usize(),
|
|
||||||
"Importing historical block batch"
|
|
||||||
);
|
|
||||||
|
|
||||||
// Importing the batch with valid signatures should succeed.
|
|
||||||
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
|
|
||||||
assert_eq!(beacon_chain.store.get_oldest_block_slot(), wss_block.slot());
|
|
||||||
beacon_chain
|
|
||||||
.import_historical_block_batch(available_blocks_dup)
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
|
|
||||||
|
|
||||||
// Resupplying the blocks should not fail, they can be safely ignored.
|
|
||||||
beacon_chain
|
|
||||||
.import_historical_block_batch(available_blocks)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// Sanity check for non-aligned WSS starts, to make sure the WSS block is persisted properly
|
// Sanity check for non-aligned WSS starts, to make sure the WSS block is persisted properly
|
||||||
if wss_block_slot != wss_state_slot {
|
if wss_block_slot != wss_state_slot {
|
||||||
@@ -2615,7 +2657,11 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
|
|||||||
assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
|
assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
store.get_anchor_info().state_upper_limit,
|
store.get_anchor_info().state_upper_limit,
|
||||||
Slot::new(u64::MAX)
|
if checkpoint_slot == 0 {
|
||||||
|
Slot::new(0)
|
||||||
|
} else {
|
||||||
|
Slot::new(u64::MAX)
|
||||||
|
}
|
||||||
);
|
);
|
||||||
info!(anchor = ?store.get_anchor_info(), "anchor pre");
|
info!(anchor = ?store.get_anchor_info(), "anchor pre");
|
||||||
|
|
||||||
|
|||||||
@@ -47,6 +47,12 @@ where
|
|||||||
let lower_limit_slot = anchor.state_lower_limit;
|
let lower_limit_slot = anchor.state_lower_limit;
|
||||||
let upper_limit_slot = std::cmp::min(split.slot, anchor.state_upper_limit);
|
let upper_limit_slot = std::cmp::min(split.slot, anchor.state_upper_limit);
|
||||||
|
|
||||||
|
// If the split is at 0 we can't reconstruct historic states.
|
||||||
|
if split.slot == 0 {
|
||||||
|
debug!("No state reconstruction possible");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
// If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch
|
// If `num_blocks` is not specified iterate all blocks. Add 1 so that we end on an epoch
|
||||||
// boundary when `num_blocks` is a multiple of an epoch boundary. We want to be *inclusive*
|
// boundary when `num_blocks` is a multiple of an epoch boundary. We want to be *inclusive*
|
||||||
// of the state at slot `lower_limit_slot + num_blocks`.
|
// of the state at slot `lower_limit_slot + num_blocks`.
|
||||||
|
|||||||
Reference in New Issue
Block a user