Files
lighthouse/beacon_node/beacon_chain/tests/store_tests.rs
Pawan Dhananjay 11bcccb353 Remove all prod eth1 related code (#7133)
N/A


  After the electra fork which includes EIP 6110, the beacon node no longer needs the eth1 bridging mechanism to include new deposits as they are provided by the EL as a `deposit_request`. So after electra + a transition period where the finalized bridge deposits pre-fork are included through the old mechanism, we no longer need the elaborate machinery we had to get deposit contract data from the execution layer.

Since holesky has already forked to electra and completed the transition period, this PR basically checks to see if removing all the eth1 related logic leads to any surprises.
2025-06-23 03:00:07 +00:00

3954 lines
133 KiB
Rust

#![cfg(not(debug_assertions))]
use beacon_chain::attestation_verification::Error as AttnError;
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::builder::BeaconChainBuilder;
use beacon_chain::data_availability_checker::AvailableBlock;
use beacon_chain::schema_change::migrate_schema;
use beacon_chain::test_utils::SyncCommitteeStrategy;
use beacon_chain::test_utils::{
get_kzg, mock_execution_layer_from_parts, test_spec, AttestationStrategy, BeaconChainHarness,
BlockStrategy, DiskHarnessType,
};
use beacon_chain::{
data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError,
migrate::MigratorConfig, BeaconChain, BeaconChainError, BeaconChainTypes, BeaconSnapshot,
BlockError, ChainConfig, NotifyExecutionLayer, ServerSentEventHandler, WhenSlotSkipped,
};
use logging::create_test_tracing_subscriber;
use maplit::hashset;
use rand::rngs::StdRng;
use rand::Rng;
use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::{state_advance::complete_state_advance, BlockReplayer};
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryInto;
use std::str::FromStr;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use store::database::interface::BeaconNodeBackend;
use store::metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION, STATE_UPPER_LIMIT_NO_RETAIN};
use store::{
hdiff::HierarchyConfig,
iter::{BlockRootsIterator, StateRootsIterator},
BlobInfo, DBColumn, HotColdDB, StoreConfig,
};
use tempfile::{tempdir, TempDir};
use tracing::info;
use types::test_utils::{SeedableRng, XorShiftRng};
use types::*;
// Should ideally be divisible by 3.
pub const LOW_VALIDATOR_COUNT: usize = 24;
pub const HIGH_VALIDATOR_COUNT: usize = 64;
// When set to true, cache any states fetched from the db.
pub const CACHE_STATE_IN_TESTS: bool = true;
/// A cached set of keys.
static KEYPAIRS: LazyLock<Vec<Keypair>> =
LazyLock::new(|| types::test_utils::generate_deterministic_keypairs(HIGH_VALIDATOR_COUNT));
type E = MinimalEthSpec;
type TestHarness = BeaconChainHarness<DiskHarnessType<E>>;
fn get_store(db_path: &TempDir) -> Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>> {
let store_config = StoreConfig {
prune_payloads: false,
..StoreConfig::default()
};
get_store_generic(db_path, store_config, test_spec::<E>())
}
fn get_store_generic(
db_path: &TempDir,
config: StoreConfig,
spec: ChainSpec,
) -> Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>> {
create_test_tracing_subscriber();
let hot_path = db_path.path().join("chain_db");
let cold_path = db_path.path().join("freezer_db");
let blobs_path = db_path.path().join("blobs_db");
HotColdDB::open(
&hot_path,
&cold_path,
&blobs_path,
|_, _, _| Ok(()),
config,
spec.into(),
)
.expect("disk store should initialize")
}
fn get_harness(
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
validator_count: usize,
) -> TestHarness {
// Most tests expect to retain historic states, so we use this as the default.
let chain_config = ChainConfig {
reconstruct_historic_states: true,
..ChainConfig::default()
};
get_harness_generic(store, validator_count, chain_config, false)
}
fn get_harness_import_all_data_columns(
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
validator_count: usize,
) -> TestHarness {
// Most tests expect to retain historic states, so we use this as the default.
let chain_config = ChainConfig {
reconstruct_historic_states: true,
..ChainConfig::default()
};
get_harness_generic(store, validator_count, chain_config, true)
}
fn get_harness_generic(
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
validator_count: usize,
chain_config: ChainConfig,
import_all_data_columns: bool,
) -> TestHarness {
let harness = TestHarness::builder(MinimalEthSpec)
.spec(store.get_chain_spec().clone())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_disk_store(store)
.mock_execution_layer()
.chain_config(chain_config)
.import_all_data_columns(import_all_data_columns)
.build();
harness.advance_slot();
harness
}
fn get_states_descendant_of_block(
store: &HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>,
block_root: Hash256,
) -> Vec<(Hash256, Slot)> {
let summaries = store.load_hot_state_summaries().unwrap();
summaries
.iter()
.filter(|(_, s)| s.latest_block_root == block_root)
.map(|(state_root, summary)| (*state_root, summary.slot))
.collect()
}
#[tokio::test]
async fn light_client_bootstrap_test() {
let spec = test_spec::<E>();
let Some(_) = spec.altair_fork_epoch else {
// No-op prior to Altair.
return;
};
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, StoreConfig::default(), spec.clone());
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let num_initial_slots = E::slots_per_epoch() * 7;
let slots: Vec<Slot> = (1..num_initial_slots).map(Slot::new).collect();
let (genesis_state, genesis_state_root) = harness.get_current_state_and_root();
harness
.add_attested_blocks_at_slots_with_lc_data(
genesis_state.clone(),
genesis_state_root,
&slots,
&all_validators,
None,
SyncCommitteeStrategy::NoValidators,
)
.await;
let finalized_checkpoint = harness
.chain
.canonical_head
.cached_head()
.finalized_checkpoint();
let block_root = finalized_checkpoint.root;
let (lc_bootstrap, _) = harness
.chain
.get_light_client_bootstrap(&block_root)
.unwrap()
.unwrap();
let bootstrap_slot = match lc_bootstrap {
LightClientBootstrap::Altair(lc_bootstrap) => lc_bootstrap.header.beacon.slot,
LightClientBootstrap::Capella(lc_bootstrap) => lc_bootstrap.header.beacon.slot,
LightClientBootstrap::Deneb(lc_bootstrap) => lc_bootstrap.header.beacon.slot,
LightClientBootstrap::Electra(lc_bootstrap) => lc_bootstrap.header.beacon.slot,
LightClientBootstrap::Fulu(lc_bootstrap) => lc_bootstrap.header.beacon.slot,
};
assert_eq!(
bootstrap_slot.epoch(E::slots_per_epoch()),
finalized_checkpoint.epoch
);
}
#[tokio::test]
async fn light_client_updates_test() {
let spec = test_spec::<E>();
let Some(_) = spec.altair_fork_epoch else {
// No-op prior to Altair.
return;
};
let num_final_blocks = E::slots_per_epoch() * 2;
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, StoreConfig::default(), test_spec::<E>());
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let num_initial_slots = E::slots_per_epoch() * 10;
let slots: Vec<Slot> = (1..num_initial_slots).map(Slot::new).collect();
let (genesis_state, genesis_state_root) = harness.get_current_state_and_root();
harness
.add_attested_blocks_at_slots(
genesis_state.clone(),
genesis_state_root,
&slots,
&all_validators,
)
.await;
harness.advance_slot();
harness
.extend_chain_with_light_client_data(
num_final_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let current_state = harness.get_current_state();
// calculate the sync period from the previous slot
let sync_period = (current_state.slot() - Slot::new(1))
.epoch(E::slots_per_epoch())
.sync_committee_period(&spec)
.unwrap();
// fetch a range of light client updates. right now there should only be one light client update
// in the db.
let lc_updates = harness
.chain
.get_light_client_updates(sync_period, 100)
.unwrap();
assert_eq!(lc_updates.len(), 1);
// Advance to the next sync committee period
for _i in 0..(E::slots_per_epoch() * u64::from(spec.epochs_per_sync_committee_period)) {
harness.advance_slot();
}
harness
.extend_chain_with_light_client_data(
num_final_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// we should now have two light client updates in the db
let lc_updates = harness
.chain
.get_light_client_updates(sync_period, 100)
.unwrap();
assert_eq!(lc_updates.len(), 2);
}
#[tokio::test]
async fn full_participation_no_skips() {
let num_blocks_produced = E::slots_per_epoch() * 5;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store);
check_chain_dump(&harness, num_blocks_produced + 1);
check_iterators(&harness);
}
#[tokio::test]
async fn randomised_skips() {
let num_slots = E::slots_per_epoch() * 5;
let mut num_blocks_produced = 0;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let rng = &mut XorShiftRng::from_seed([42; 16]);
let mut head_slot = 0;
for slot in 1..=num_slots {
if rng.gen_bool(0.8) {
harness
.extend_chain(
1,
BlockStrategy::ForkCanonicalChainAt {
previous_slot: Slot::new(head_slot),
first_slot: Slot::new(slot),
},
AttestationStrategy::AllValidators,
)
.await;
harness.advance_slot();
num_blocks_produced += 1;
head_slot = slot;
} else {
harness.advance_slot();
}
}
let state = &harness.chain.head_snapshot().beacon_state;
assert_eq!(
state.slot(),
num_slots,
"head should be at the current slot"
);
check_split_slot(&harness, store.clone());
check_chain_dump(&harness, num_blocks_produced + 1);
check_iterators(&harness);
}
#[tokio::test]
async fn long_skip() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
// Number of blocks to create in the first run, intentionally not falling on an epoch
// boundary in order to check that the DB hot -> cold migration is capable of reaching
// back across the skip distance, and correctly migrating those extra non-finalized states.
let initial_blocks = E::slots_per_epoch() * 5 + E::slots_per_epoch() / 2;
let skip_slots = E::slots_per_historical_root() as u64 * 8;
// Create the minimum ~2.5 epochs of extra blocks required to re-finalize the chain.
// Having this set lower ensures that we start justifying and finalizing quickly after a skip.
let final_blocks = 2 * E::slots_per_epoch() + E::slots_per_epoch() / 2;
harness
.extend_chain(
initial_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
check_finalization(&harness, initial_blocks);
// 2. Skip slots
for _ in 0..skip_slots {
harness.advance_slot();
}
// 3. Produce more blocks, establish a new finalized epoch
harness
.extend_chain(
final_blocks as usize,
BlockStrategy::ForkCanonicalChainAt {
previous_slot: Slot::new(initial_blocks),
first_slot: Slot::new(initial_blocks + skip_slots + 1),
},
AttestationStrategy::AllValidators,
)
.await;
check_finalization(&harness, initial_blocks + skip_slots + final_blocks);
check_split_slot(&harness, store);
check_chain_dump(&harness, initial_blocks + final_blocks + 1);
check_iterators(&harness);
}
/// Go forward to the point where the genesis randao value is no longer part of the vector.
///
/// This implicitly checks that:
/// 1. The chunked vector scheme doesn't attempt to store an incorrect genesis value
/// 2. We correctly load the genesis value for all required slots
/// NOTE: this test takes about a minute to run
#[tokio::test]
async fn randao_genesis_storage() {
let validator_count = 8;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), validator_count);
let num_slots = E::slots_per_epoch() * (E::epochs_per_historical_vector() - 1) as u64;
// Check we have a non-trivial genesis value
let genesis_value = *harness
.chain
.head_snapshot()
.beacon_state
.get_randao_mix(Epoch::new(0))
.expect("randao mix ok");
assert!(!genesis_value.is_zero());
harness
.extend_chain(
num_slots as usize - 1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Check that genesis value is still present
assert!(harness
.chain
.head_snapshot()
.beacon_state
.randao_mixes()
.iter()
.any(|x| *x == genesis_value));
// Then upon adding one more block, it isn't
harness.advance_slot();
harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
assert!(!harness
.chain
.head_snapshot()
.beacon_state
.randao_mixes()
.iter()
.any(|x| *x == genesis_value));
check_finalization(&harness, num_slots);
check_split_slot(&harness, store);
check_chain_dump(&harness, num_slots + 1);
check_iterators(&harness);
}
// Check that closing and reopening a freezer DB restores the split slot to its correct value.
#[tokio::test]
async fn split_slot_restore() {
let db_path = tempdir().unwrap();
let split_slot = {
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let num_blocks = 4 * E::slots_per_epoch();
harness
.extend_chain(
num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
store.get_split_slot()
};
assert_ne!(split_slot, Slot::new(0));
// Re-open the store
let store = get_store(&db_path);
assert_eq!(store.get_split_slot(), split_slot);
}
// Check attestation processing and `load_epoch_boundary_state` in the presence of a split DB.
// This is a bit of a monster test in that it tests lots of different things, but until they're
// tested elsewhere, this is as good a place as any.
#[tokio::test]
async fn epoch_boundary_state_attestation_processing() {
let num_blocks_produced = E::slots_per_epoch() * 5;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let late_validators = vec![0, 1];
let timely_validators = (2..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let mut late_attestations = vec![];
for _ in 0..num_blocks_produced {
harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(timely_validators.clone()),
)
.await;
let head = harness.chain.head_snapshot();
late_attestations.extend(harness.get_single_attestations(
&AttestationStrategy::SomeValidators(late_validators.clone()),
&head.beacon_state,
head.beacon_state_root(),
head.beacon_block_root,
head.beacon_block.slot(),
));
harness.advance_slot();
}
check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store.clone());
check_chain_dump(&harness, num_blocks_produced + 1);
check_iterators(&harness);
let mut checked_pre_fin = false;
for (attestation, subnet_id) in late_attestations.into_iter().flatten() {
// load_epoch_boundary_state is idempotent!
let block_root = attestation.data.beacon_block_root;
let block = store
.get_blinded_block(&block_root)
.unwrap()
.expect("block exists");
// Use get_state as the state may be finalized by this point
let mut epoch_boundary_state = store
.get_state(&block.state_root(), None, CACHE_STATE_IN_TESTS)
.expect("no error")
.unwrap_or_else(|| {
panic!("epoch boundary state should exist {:?}", block.state_root())
});
let ebs_state_root = epoch_boundary_state.update_tree_hash_cache().unwrap();
let mut ebs_of_ebs = store
.get_state(&ebs_state_root, None, CACHE_STATE_IN_TESTS)
.expect("no error")
.unwrap_or_else(|| panic!("ebs of ebs should exist {ebs_state_root:?}"));
ebs_of_ebs.apply_pending_mutations().unwrap();
assert_eq!(epoch_boundary_state, ebs_of_ebs);
// If the attestation is pre-finalization it should be rejected.
let finalized_epoch = harness.finalized_checkpoint().epoch;
let res = harness
.chain
.verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id));
let current_slot = harness.chain.slot().expect("should get slot");
let expected_attestation_slot = attestation.data.slot;
// Extra -1 to handle gossip clock disparity.
let expected_earliest_permissible_slot = current_slot - E::slots_per_epoch() - 1;
if expected_attestation_slot <= finalized_epoch.start_slot(E::slots_per_epoch())
|| expected_attestation_slot < expected_earliest_permissible_slot
{
checked_pre_fin = true;
assert!(matches!(
res.err().unwrap(),
AttnError::PastSlot {
attestation_slot,
earliest_permissible_slot,
}
if attestation_slot == expected_attestation_slot && earliest_permissible_slot == expected_earliest_permissible_slot
));
} else {
res.expect("should have verified attetation");
}
}
assert!(checked_pre_fin);
}
// Test that the `end_slot` for forwards block and state root iterators works correctly.
#[tokio::test]
async fn forwards_iter_block_and_state_roots_until() {
let num_blocks_produced = E::slots_per_epoch() * 17;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = &harness.get_all_validators();
let (mut head_state, mut head_state_root) = harness.get_current_state_and_root();
let head_block_root = harness.head_block_root();
let mut block_roots = vec![head_block_root];
let mut state_roots = vec![head_state_root];
for slot in (1..=num_blocks_produced).map(Slot::from) {
let (block_root, mut state) = harness
.add_attested_block_at_slot(slot, head_state, head_state_root, all_validators)
.await
.unwrap();
head_state_root = state.update_tree_hash_cache().unwrap();
head_state = state;
block_roots.push(block_root.into());
state_roots.push(head_state_root);
}
check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store.clone());
// The freezer upper bound slot is the point at which the hybrid forwards iterator behaviour
// changes.
let block_upper_bound = store
.freezer_upper_bound_for_column(DBColumn::BeaconBlockRoots, Slot::new(0))
.unwrap()
.unwrap();
assert!(block_upper_bound > 0);
let state_upper_bound = store
.freezer_upper_bound_for_column(DBColumn::BeaconStateRoots, Slot::new(0))
.unwrap()
.unwrap();
assert!(state_upper_bound > 0);
assert_eq!(state_upper_bound, block_upper_bound);
let chain = &harness.chain;
let head_state = harness.get_current_state();
let head_slot = head_state.slot();
assert_eq!(head_slot, num_blocks_produced);
let test_range = |start_slot: Slot, end_slot: Slot| {
let mut block_root_iter = chain
.forwards_iter_block_roots_until(start_slot, end_slot)
.unwrap();
let mut state_root_iter = chain
.forwards_iter_state_roots_until(start_slot, end_slot)
.unwrap();
for slot in (start_slot.as_u64()..=end_slot.as_u64()).map(Slot::new) {
let block_root = block_roots[slot.as_usize()];
assert_eq!(block_root_iter.next().unwrap().unwrap(), (block_root, slot));
let state_root = state_roots[slot.as_usize()];
assert_eq!(state_root_iter.next().unwrap().unwrap(), (state_root, slot));
}
};
let split_slot = store.get_split_slot();
assert_eq!(split_slot, block_upper_bound);
test_range(Slot::new(0), split_slot);
test_range(split_slot, split_slot);
test_range(split_slot - 1, split_slot);
test_range(Slot::new(0), split_slot - 1);
test_range(Slot::new(0), head_state.slot());
}
#[tokio::test]
async fn block_replayer_hooks() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let chain = &harness.chain;
let block_slots = vec![1, 3, 5, 10, 11, 12, 13, 14, 31, 32, 33]
.into_iter()
.map(Slot::new)
.collect::<Vec<_>>();
let max_slot = *block_slots.last().unwrap();
let all_slots = (0..=max_slot.as_u64()).map(Slot::new).collect::<Vec<_>>();
let (state, state_root) = harness.get_current_state_and_root();
let all_validators = harness.get_all_validators();
let (_, _, end_block_root, mut end_state) = harness
.add_attested_blocks_at_slots(state.clone(), state_root, &block_slots, &all_validators)
.await;
let blocks = store
.load_blocks_to_replay(Slot::new(0), max_slot, end_block_root.into())
.unwrap();
let mut pre_slots = vec![];
let mut post_slots = vec![];
let mut pre_block_slots = vec![];
let mut post_block_slots = vec![];
let mut replay_state = BlockReplayer::<MinimalEthSpec>::new(state, &chain.spec)
.pre_slot_hook(Box::new(|_, state| {
pre_slots.push(state.slot());
Ok(())
}))
.post_slot_hook(Box::new(|state, epoch_summary, is_skip_slot| {
if is_skip_slot {
assert!(!block_slots.contains(&state.slot()));
} else {
assert!(block_slots.contains(&state.slot()));
}
if state.slot() % E::slots_per_epoch() == 0 {
assert!(epoch_summary.is_some());
}
post_slots.push(state.slot());
Ok(())
}))
.pre_block_hook(Box::new(|state, block| {
assert_eq!(state.slot(), block.slot());
pre_block_slots.push(block.slot());
Ok(())
}))
.post_block_hook(Box::new(|state, block| {
assert_eq!(state.slot(), block.slot());
post_block_slots.push(block.slot());
Ok(())
}))
.apply_blocks(blocks, None)
.unwrap()
.into_state();
// All but last slot seen by pre-slot hook
assert_eq!(&pre_slots, all_slots.split_last().unwrap().1);
// All but 0th slot seen by post-slot hook
assert_eq!(&post_slots, all_slots.split_first().unwrap().1);
// All blocks seen by both hooks
assert_eq!(pre_block_slots, block_slots);
assert_eq!(post_block_slots, block_slots);
// States match.
end_state.apply_pending_mutations().unwrap();
replay_state.apply_pending_mutations().unwrap();
end_state.drop_all_caches().unwrap();
replay_state.drop_all_caches().unwrap();
assert_eq!(end_state, replay_state);
}
#[tokio::test]
async fn delete_blocks_and_states() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let unforked_blocks: u64 = 4 * E::slots_per_epoch();
// Finalize an initial portion of the chain.
let initial_slots: Vec<Slot> = (1..=unforked_blocks).map(Into::into).collect();
let (state, state_root) = harness.get_current_state_and_root();
let all_validators = harness.get_all_validators();
harness
.add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators)
.await;
// Create a fork post-finalization.
let two_thirds = (LOW_VALIDATOR_COUNT / 3) * 2;
let honest_validators: Vec<usize> = (0..two_thirds).collect();
let faulty_validators: Vec<usize> = (two_thirds..LOW_VALIDATOR_COUNT).collect();
let fork_blocks = 2 * E::slots_per_epoch();
let slot_u64: u64 = harness.get_current_slot().as_u64() + 1;
let fork1_slots: Vec<Slot> = (slot_u64..(slot_u64 + fork_blocks))
.map(Into::into)
.collect();
let fork2_slots: Vec<Slot> = (slot_u64 + 1..(slot_u64 + 1 + fork_blocks))
.map(Into::into)
.collect();
let fork1_state = harness.get_current_state();
let fork2_state = fork1_state.clone();
let results = harness
.add_blocks_on_multiple_chains(vec![
(fork1_state, fork1_slots, honest_validators),
(fork2_state, fork2_slots, faulty_validators),
])
.await;
let honest_head = results[0].2;
let faulty_head = results[1].2;
assert_ne!(honest_head, faulty_head, "forks should be distinct");
assert_eq!(harness.head_slot(), unforked_blocks + fork_blocks);
assert_eq!(
harness.head_block_root(),
Hash256::from(honest_head),
"the honest chain should be the canonical chain",
);
let faulty_head_block = store
.get_blinded_block(&faulty_head.into())
.expect("no errors")
.expect("faulty head block exists");
let faulty_head_state = store
.get_state(
&faulty_head_block.state_root(),
Some(faulty_head_block.slot()),
CACHE_STATE_IN_TESTS,
)
.expect("no db error")
.expect("faulty head state exists");
// Delete faulty fork
// Attempting to load those states should find them unavailable
for (state_root, slot) in
StateRootsIterator::new(&store, &faulty_head_state).map(Result::unwrap)
{
if slot <= unforked_blocks {
break;
}
store.delete_state(&state_root, slot).unwrap();
assert_eq!(
store
.get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS)
.unwrap(),
None
);
}
// Double-deleting should also be OK (deleting non-existent things is fine)
for (state_root, slot) in
StateRootsIterator::new(&store, &faulty_head_state).map(Result::unwrap)
{
if slot <= unforked_blocks {
break;
}
store.delete_state(&state_root, slot).unwrap();
}
// Deleting the blocks from the fork should remove them completely
for (block_root, slot) in
BlockRootsIterator::new(&store, &faulty_head_state).map(Result::unwrap)
{
if slot <= unforked_blocks + 1 {
break;
}
store.delete_block(&block_root).unwrap();
assert_eq!(store.get_blinded_block(&block_root).unwrap(), None);
}
// Deleting frozen states should do nothing
let split_slot = store.get_split_slot();
let finalized_states = harness
.chain
.forwards_iter_state_roots(Slot::new(0))
.expect("should get iter")
.map(Result::unwrap);
for (state_root, slot) in finalized_states {
if slot < split_slot {
store.delete_state(&state_root, slot).unwrap();
}
}
// After all that, the chain dump should still be OK
check_chain_dump(&harness, unforked_blocks + fork_blocks + 1);
}
// Check that we never produce invalid blocks when there is deep forking that changes the shuffling.
// See https://github.com/sigp/lighthouse/issues/845
async fn multi_epoch_fork_valid_blocks_test(
initial_blocks: usize,
num_fork1_blocks_: usize,
num_fork2_blocks_: usize,
num_fork1_validators: usize,
) -> (TempDir, TestHarness, Hash256, Hash256) {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let validators_keypairs =
types::test_utils::generate_deterministic_keypairs(LOW_VALIDATOR_COUNT);
let harness = TestHarness::builder(MinimalEthSpec)
.default_spec()
.keypairs(validators_keypairs)
.fresh_disk_store(store)
.mock_execution_layer()
.build();
let num_fork1_blocks: u64 = num_fork1_blocks_.try_into().unwrap();
let num_fork2_blocks: u64 = num_fork2_blocks_.try_into().unwrap();
// Create the initial portion of the chain
if initial_blocks > 0 {
let initial_slots: Vec<Slot> = (1..=initial_blocks).map(Into::into).collect();
let (state, state_root) = harness.get_current_state_and_root();
let all_validators = harness.get_all_validators();
harness
.add_attested_blocks_at_slots(state, state_root, &initial_slots, &all_validators)
.await;
}
assert!(num_fork1_validators <= LOW_VALIDATOR_COUNT);
let fork1_validators: Vec<usize> = (0..num_fork1_validators).collect();
let fork2_validators: Vec<usize> = (num_fork1_validators..LOW_VALIDATOR_COUNT).collect();
let fork1_state = harness.get_current_state();
let fork2_state = fork1_state.clone();
let slot_u64: u64 = harness.get_current_slot().as_u64() + 1;
let fork1_slots: Vec<Slot> = (slot_u64..(slot_u64 + num_fork1_blocks))
.map(Into::into)
.collect();
let fork2_slots: Vec<Slot> = (slot_u64 + 1..(slot_u64 + 1 + num_fork2_blocks))
.map(Into::into)
.collect();
let results = harness
.add_blocks_on_multiple_chains(vec![
(fork1_state, fork1_slots, fork1_validators),
(fork2_state, fork2_slots, fork2_validators),
])
.await;
let head1 = results[0].2;
let head2 = results[1].2;
(db_path, harness, head1.into(), head2.into())
}
// This is the minimal test of block production with different shufflings.
#[tokio::test]
async fn block_production_different_shuffling_early() {
let slots_per_epoch = E::slots_per_epoch() as usize;
multi_epoch_fork_valid_blocks_test(
slots_per_epoch - 2,
slots_per_epoch + 3,
slots_per_epoch + 3,
LOW_VALIDATOR_COUNT / 2,
)
.await;
}
#[tokio::test]
async fn block_production_different_shuffling_long() {
let slots_per_epoch = E::slots_per_epoch() as usize;
multi_epoch_fork_valid_blocks_test(
2 * slots_per_epoch - 2,
3 * slots_per_epoch,
3 * slots_per_epoch,
LOW_VALIDATOR_COUNT / 2,
)
.await;
}
// Check that the op pool safely includes multiple attestations per block when necessary.
// This checks the correctness of the shuffling compatibility memoization.
#[tokio::test]
async fn multiple_attestations_per_block() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store, HIGH_VALIDATOR_COUNT);
harness
.extend_chain(
E::slots_per_epoch() as usize * 3,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let head = harness.chain.head_snapshot();
let committees_per_slot = head
.beacon_state
.get_committee_count_at_slot(head.beacon_state.slot())
.unwrap();
assert!(committees_per_slot > 1);
for snapshot in harness.chain.chain_dump().unwrap() {
let slot = snapshot.beacon_block.slot();
let fork_name = harness.chain.spec.fork_name_at_slot::<E>(slot);
if fork_name.electra_enabled() {
assert_eq!(
snapshot
.beacon_block
.as_ref()
.message()
.body()
.attestations_len() as u64,
if slot <= 1 { 0 } else { 1 }
);
} else {
assert_eq!(
snapshot
.beacon_block
.as_ref()
.message()
.body()
.attestations_len() as u64,
if slot <= 1 { 0 } else { committees_per_slot }
);
}
}
}
#[tokio::test]
async fn shuffling_compatible_linear_chain() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let head_block_root = harness
.extend_chain(
4 * E::slots_per_epoch() as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
check_shuffling_compatible(
&harness,
&get_state_for_block(&harness, head_block_root),
head_block_root,
);
}
#[tokio::test]
async fn shuffling_compatible_missing_pivot_block() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
// Skip the block at the end of the first epoch.
harness
.extend_chain(
E::slots_per_epoch() as usize - 2,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
harness.advance_slot();
harness.advance_slot();
let head_block_root = harness
.extend_chain(
2 * E::slots_per_epoch() as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
check_shuffling_compatible(
&harness,
&get_state_for_block(&harness, head_block_root),
head_block_root,
);
}
#[tokio::test]
async fn shuffling_compatible_simple_fork() {
let slots_per_epoch = E::slots_per_epoch() as usize;
let (db_path, harness, head1, head2) = multi_epoch_fork_valid_blocks_test(
2 * slots_per_epoch,
3 * slots_per_epoch,
3 * slots_per_epoch,
LOW_VALIDATOR_COUNT / 2,
)
.await;
let head1_state = get_state_for_block(&harness, head1);
let head2_state = get_state_for_block(&harness, head2);
check_shuffling_compatible(&harness, &head1_state, head1);
check_shuffling_compatible(&harness, &head1_state, head2);
check_shuffling_compatible(&harness, &head2_state, head1);
check_shuffling_compatible(&harness, &head2_state, head2);
drop(db_path);
}
#[tokio::test]
async fn shuffling_compatible_short_fork() {
let slots_per_epoch = E::slots_per_epoch() as usize;
let (db_path, harness, head1, head2) = multi_epoch_fork_valid_blocks_test(
2 * slots_per_epoch - 2,
slots_per_epoch + 2,
slots_per_epoch + 2,
LOW_VALIDATOR_COUNT / 2,
)
.await;
let head1_state = get_state_for_block(&harness, head1);
let head2_state = get_state_for_block(&harness, head2);
check_shuffling_compatible(&harness, &head1_state, head1);
check_shuffling_compatible(&harness, &head1_state, head2);
check_shuffling_compatible(&harness, &head2_state, head1);
check_shuffling_compatible(&harness, &head2_state, head2);
drop(db_path);
}
fn get_state_for_block(harness: &TestHarness, block_root: Hash256) -> BeaconState<E> {
let head_block = harness
.chain
.store
.get_blinded_block(&block_root)
.unwrap()
.unwrap();
harness
.chain
.get_state(
&head_block.state_root(),
Some(head_block.slot()),
CACHE_STATE_IN_TESTS,
)
.unwrap()
.unwrap()
}
/// Check the invariants that apply to `shuffling_is_compatible`.
fn check_shuffling_compatible(
harness: &TestHarness,
head_state: &BeaconState<E>,
head_block_root: Hash256,
) {
for maybe_tuple in harness
.chain
.rev_iter_block_roots_from(head_block_root)
.unwrap()
{
let (block_root, slot) = maybe_tuple.unwrap();
// Would an attestation to `block_root` at the current epoch be compatible with the head
// state's shuffling?
let current_epoch_shuffling_is_compatible = harness.chain.shuffling_is_compatible(
&block_root,
head_state.current_epoch(),
head_state,
);
// Check for consistency with the more expensive shuffling lookup.
harness
.chain
.with_committee_cache(
block_root,
head_state.current_epoch(),
|committee_cache, _| {
let state_cache = head_state.committee_cache(RelativeEpoch::Current).unwrap();
// We used to check for false negatives here, but had to remove that check
// because `shuffling_is_compatible` does not guarantee their absence.
//
// See: https://github.com/sigp/lighthouse/issues/6269
if current_epoch_shuffling_is_compatible {
assert_eq!(
committee_cache,
state_cache.as_ref(),
"block at slot {slot}"
);
}
Ok(())
},
)
.unwrap_or_else(|e| {
// If the lookup fails then the shuffling must be invalid in some way, e.g. the
// block with `block_root` is from a later epoch than `previous_epoch`.
assert!(
!current_epoch_shuffling_is_compatible,
"block at slot {slot} has compatible shuffling at epoch {} \
but should be incompatible due to error: {e:?}",
head_state.current_epoch()
);
});
// Similarly for the previous epoch
let previous_epoch_shuffling_is_compatible = harness.chain.shuffling_is_compatible(
&block_root,
head_state.previous_epoch(),
head_state,
);
harness
.chain
.with_committee_cache(
block_root,
head_state.previous_epoch(),
|committee_cache, _| {
let state_cache = head_state.committee_cache(RelativeEpoch::Previous).unwrap();
if previous_epoch_shuffling_is_compatible {
assert_eq!(committee_cache, state_cache.as_ref());
}
Ok(())
},
)
.unwrap_or_else(|e| {
// If the lookup fails then the shuffling must be invalid in some way, e.g. the
// block with `block_root` is from a later epoch than `previous_epoch`.
assert!(
!previous_epoch_shuffling_is_compatible,
"block at slot {slot} has compatible shuffling at epoch {} \
but should be incompatible due to error: {e:?}",
head_state.previous_epoch()
);
});
// Targeting two epochs before the current epoch should always return false
if head_state.current_epoch() >= 2 {
assert!(!harness.chain.shuffling_is_compatible(
&block_root,
head_state.current_epoch() - 2,
head_state
));
}
}
}
// Ensure blocks from abandoned forks are pruned from the Hot DB
#[tokio::test]
async fn prunes_abandoned_fork_between_two_finalized_checkpoints() {
const HONEST_VALIDATOR_COUNT: usize = 32;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let adversarial_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let rig = get_harness(store.clone(), VALIDATOR_COUNT);
let slots_per_epoch = rig.slots_per_epoch();
let (mut state, state_root) = rig.get_current_state_and_root();
let canonical_chain_slots: Vec<Slot> = (1..=rig.epoch_start_slot(1)).map(Slot::new).collect();
let (canonical_chain_blocks_pre_finalization, _, _, new_state) = rig
.add_attested_blocks_at_slots(
state,
state_root,
&canonical_chain_slots,
&honest_validators,
)
.await;
state = new_state;
let canonical_chain_slot: u64 = rig.get_current_slot().into();
let stray_slots: Vec<Slot> = (canonical_chain_slot + 1..rig.epoch_start_slot(2))
.map(Slot::new)
.collect();
let (current_state, current_state_root) = rig.get_current_state_and_root();
let (stray_blocks, stray_states, stray_head, _) = rig
.add_attested_blocks_at_slots(
current_state,
current_state_root,
&stray_slots,
&adversarial_validators,
)
.await;
// Precondition: Ensure all stray_blocks blocks are still known
for &block_hash in stray_blocks.values() {
assert!(
rig.block_exists(block_hash),
"stray block {} should be still present",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
rig.hot_state_exists(state_hash),
"stray state {} at slot {} should be still present",
state_hash,
slot
);
}
assert_eq!(rig.get_finalized_checkpoints(), hashset! {});
rig.assert_knows_head(stray_head.into());
// Trigger finalization
let finalization_slots: Vec<Slot> = ((canonical_chain_slot + 1)
..=(canonical_chain_slot + slots_per_epoch * 5))
.map(Slot::new)
.collect();
let state_root = state.update_tree_hash_cache().unwrap();
let (canonical_chain_blocks_post_finalization, _, _, _) = rig
.add_attested_blocks_at_slots(state, state_root, &finalization_slots, &honest_validators)
.await;
// Postcondition: New blocks got finalized
assert_eq!(
rig.get_finalized_checkpoints(),
hashset! {
canonical_chain_blocks_pre_finalization[&rig.epoch_start_slot(1).into()],
canonical_chain_blocks_post_finalization[&rig.epoch_start_slot(2).into()],
},
);
// Postcondition: Ensure all stray_blocks blocks have been pruned
for &block_hash in stray_blocks.values() {
assert!(
!rig.block_exists(block_hash),
"abandoned block {block_hash:?} should have been pruned",
);
assert!(
!rig.chain.store.blobs_exist(&block_hash.into()).unwrap(),
"blobs for abandoned block {block_hash:?} should have been pruned"
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
!rig.hot_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
assert!(
!rig.cold_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
}
assert!(!rig.knows_head(&stray_head));
}
#[tokio::test]
async fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() {
const HONEST_VALIDATOR_COUNT: usize = 32;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let adversarial_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let rig = get_harness(store.clone(), VALIDATOR_COUNT);
let slots_per_epoch = rig.slots_per_epoch();
let (state, state_root) = rig.get_current_state_and_root();
// Fill up 0th epoch
let canonical_chain_slots_zeroth_epoch: Vec<Slot> =
(1..rig.epoch_start_slot(1)).map(Slot::new).collect();
let (_, _, _, mut state) = rig
.add_attested_blocks_at_slots(
state,
state_root,
&canonical_chain_slots_zeroth_epoch,
&honest_validators,
)
.await;
// Fill up 1st epoch
let canonical_chain_slots_first_epoch: Vec<Slot> = (rig.epoch_start_slot(1)
..=rig.epoch_start_slot(1) + 1)
.map(Slot::new)
.collect();
let state_root = state.update_tree_hash_cache().unwrap();
let (canonical_chain_blocks_first_epoch, _, shared_head, mut state) = rig
.add_attested_blocks_at_slots(
state.clone(),
state_root,
&canonical_chain_slots_first_epoch,
&honest_validators,
)
.await;
let canonical_chain_slot: u64 = rig.get_current_slot().into();
let stray_chain_slots_first_epoch: Vec<Slot> = (rig.epoch_start_slot(1) + 2
..=rig.epoch_start_slot(1) + 2)
.map(Slot::new)
.collect();
let state_root = state.update_tree_hash_cache().unwrap();
let (stray_blocks, stray_states, stray_head, _) = rig
.add_attested_blocks_at_slots(
state.clone(),
state_root,
&stray_chain_slots_first_epoch,
&adversarial_validators,
)
.await;
// Preconditions
for &block_hash in stray_blocks.values() {
assert!(
rig.block_exists(block_hash),
"stray block {} should be still present",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
rig.hot_state_exists(state_hash),
"stray state {} at slot {} should be still present",
state_hash,
slot
);
}
let chain_dump = rig.chain.chain_dump().unwrap();
assert_eq!(
get_finalized_epoch_boundary_blocks(&chain_dump),
vec![Hash256::zero().into()].into_iter().collect(),
);
assert!(get_blocks(&chain_dump).contains(&shared_head));
// Trigger finalization
let finalization_slots: Vec<Slot> = ((canonical_chain_slot + 1)
..=(canonical_chain_slot + slots_per_epoch * 5))
.map(Slot::new)
.collect();
let state_root = state.update_tree_hash_cache().unwrap();
let (canonical_chain_blocks, _, _, _) = rig
.add_attested_blocks_at_slots(state, state_root, &finalization_slots, &honest_validators)
.await;
// Postconditions
assert_eq!(
rig.get_finalized_checkpoints(),
hashset! {
canonical_chain_blocks_first_epoch[&rig.epoch_start_slot(1).into()],
canonical_chain_blocks[&rig.epoch_start_slot(2).into()],
},
);
for &block_hash in stray_blocks.values() {
assert!(
!rig.block_exists(block_hash),
"stray block {} should have been pruned",
block_hash,
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
!rig.hot_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
assert!(
!rig.cold_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
}
assert!(!rig.knows_head(&stray_head));
let chain_dump = rig.chain.chain_dump().unwrap();
assert!(get_blocks(&chain_dump).contains(&shared_head));
}
#[tokio::test]
async fn pruning_does_not_touch_blocks_prior_to_finalization() {
const HONEST_VALIDATOR_COUNT: usize = 32;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let adversarial_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let rig = get_harness(store.clone(), VALIDATOR_COUNT);
let slots_per_epoch = rig.slots_per_epoch();
let (mut state, state_root) = rig.get_current_state_and_root();
// Fill up 0th epoch with canonical chain blocks
let zeroth_epoch_slots: Vec<Slot> = (1..=rig.epoch_start_slot(1)).map(Slot::new).collect();
let (canonical_chain_blocks, _, _, new_state) = rig
.add_attested_blocks_at_slots(state, state_root, &zeroth_epoch_slots, &honest_validators)
.await;
state = new_state;
let canonical_chain_slot: u64 = rig.get_current_slot().into();
// Fill up 1st epoch. Contains a fork.
let first_epoch_slots: Vec<Slot> = ((rig.epoch_start_slot(1) + 1)..(rig.epoch_start_slot(2)))
.map(Slot::new)
.collect();
let state_root = state.update_tree_hash_cache().unwrap();
let (stray_blocks, stray_states, stray_head, _) = rig
.add_attested_blocks_at_slots(
state.clone(),
state_root,
&first_epoch_slots,
&adversarial_validators,
)
.await;
// Preconditions
for &block_hash in stray_blocks.values() {
assert!(
rig.block_exists(block_hash),
"stray block {} should be still present",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
rig.hot_state_exists(state_hash),
"stray state {} at slot {} should be still present",
state_hash,
slot
);
}
assert_eq!(rig.get_finalized_checkpoints(), hashset! {});
// Trigger finalization
let slots: Vec<Slot> = ((canonical_chain_slot + 1)
..=(canonical_chain_slot + slots_per_epoch * 4))
.map(Slot::new)
.collect();
let state_root = state.update_tree_hash_cache().unwrap();
let (_, _, _, _) = rig
.add_attested_blocks_at_slots(state, state_root, &slots, &honest_validators)
.await;
// Postconditions
assert_eq!(
rig.get_finalized_checkpoints(),
hashset! {canonical_chain_blocks[&rig.epoch_start_slot(1).into()]},
);
for &block_hash in stray_blocks.values() {
assert!(
rig.block_exists(block_hash),
"stray block {} should be still present",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
rig.hot_state_exists(state_hash),
"stray state {} at slot {} should be still present",
state_hash,
slot
);
}
rig.assert_knows_head(stray_head.into());
}
#[tokio::test]
async fn prunes_fork_growing_past_youngest_finalized_checkpoint() {
const HONEST_VALIDATOR_COUNT: usize = 32;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let adversarial_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let rig = get_harness(store.clone(), VALIDATOR_COUNT);
let (state, state_root) = rig.get_current_state_and_root();
// Fill up 0th epoch with canonical chain blocks
let zeroth_epoch_slots: Vec<Slot> = (1..=rig.epoch_start_slot(1)).map(Slot::new).collect();
let (canonical_blocks_zeroth_epoch, _, _, mut state) = rig
.add_attested_blocks_at_slots(state, state_root, &zeroth_epoch_slots, &honest_validators)
.await;
// Fill up 1st epoch. Contains a fork.
let slots_first_epoch: Vec<Slot> = (rig.epoch_start_slot(1) + 1..rig.epoch_start_slot(2))
.map(Into::into)
.collect();
let state_root = state.update_tree_hash_cache().unwrap();
let (stray_blocks_first_epoch, stray_states_first_epoch, _, mut stray_state) = rig
.add_attested_blocks_at_slots(
state.clone(),
state_root,
&slots_first_epoch,
&adversarial_validators,
)
.await;
let (canonical_blocks_first_epoch, _, _, mut canonical_state) = rig
.add_attested_blocks_at_slots(state, state_root, &slots_first_epoch, &honest_validators)
.await;
// Fill up 2nd epoch. Extends both the canonical chain and the fork.
let stray_slots_second_epoch: Vec<Slot> = (rig.epoch_start_slot(2)
..=rig.epoch_start_slot(2) + 1)
.map(Into::into)
.collect();
let stray_state_root = stray_state.update_tree_hash_cache().unwrap();
let (stray_blocks_second_epoch, stray_states_second_epoch, stray_head, _) = rig
.add_attested_blocks_at_slots(
stray_state,
stray_state_root,
&stray_slots_second_epoch,
&adversarial_validators,
)
.await;
// Precondition: Ensure all stray_blocks blocks are still known
let stray_blocks: HashMap<Slot, SignedBeaconBlockHash> = stray_blocks_first_epoch
.into_iter()
.chain(stray_blocks_second_epoch.into_iter())
.collect();
let stray_states: HashMap<Slot, BeaconStateHash> = stray_states_first_epoch
.into_iter()
.chain(stray_states_second_epoch.into_iter())
.collect();
for &block_hash in stray_blocks.values() {
assert!(
rig.block_exists(block_hash),
"stray block {} should be still present",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
rig.hot_state_exists(state_hash),
"stray state {} at slot {} should be still present",
state_hash,
slot
);
}
// Precondition: Nothing is finalized yet
assert_eq!(rig.get_finalized_checkpoints(), hashset! {},);
rig.assert_knows_head(stray_head.into());
// Trigger finalization
let canonical_slots: Vec<Slot> = (rig.epoch_start_slot(2)..=rig.epoch_start_slot(6))
.map(Into::into)
.collect();
let canonical_state_root = canonical_state.update_tree_hash_cache().unwrap();
let (canonical_blocks, _, _, _) = Box::pin(rig.add_attested_blocks_at_slots(
canonical_state,
canonical_state_root,
&canonical_slots,
&honest_validators,
))
.await;
// Postconditions
let canonical_blocks: HashMap<Slot, SignedBeaconBlockHash> = canonical_blocks_zeroth_epoch
.into_iter()
.chain(canonical_blocks_first_epoch.into_iter())
.chain(canonical_blocks.into_iter())
.collect();
// Postcondition: New blocks got finalized
assert_eq!(
rig.get_finalized_checkpoints(),
hashset! {
canonical_blocks[&rig.epoch_start_slot(1).into()],
canonical_blocks[&rig.epoch_start_slot(2).into()],
},
);
// Postcondition: Ensure all stray_blocks blocks have been pruned
for &block_hash in stray_blocks.values() {
assert!(
!rig.block_exists(block_hash),
"abandoned block {} should have been pruned",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
!rig.hot_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
assert!(
!rig.cold_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
}
assert!(!rig.knows_head(&stray_head));
}
// This is to check if state outside of normal block processing are pruned correctly.
#[tokio::test]
async fn prunes_skipped_slots_states() {
const HONEST_VALIDATOR_COUNT: usize = 32;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let adversarial_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let rig = get_harness(store.clone(), VALIDATOR_COUNT);
let (state, state_root) = rig.get_current_state_and_root();
let canonical_slots_zeroth_epoch: Vec<Slot> =
(1..=rig.epoch_start_slot(1)).map(Into::into).collect();
let (canonical_blocks_zeroth_epoch, _, _, mut canonical_state) = rig
.add_attested_blocks_at_slots(
state.clone(),
state_root,
&canonical_slots_zeroth_epoch,
&honest_validators,
)
.await;
let skipped_slot: Slot = (rig.epoch_start_slot(1) + 1).into();
let stray_slots: Vec<Slot> = ((skipped_slot + 1).into()..rig.epoch_start_slot(2))
.map(Into::into)
.collect();
let canonical_state_root = canonical_state.update_tree_hash_cache().unwrap();
let (stray_blocks, stray_states, _, stray_state) = rig
.add_attested_blocks_at_slots(
canonical_state.clone(),
canonical_state_root,
&stray_slots,
&adversarial_validators,
)
.await;
// Preconditions
for &block_hash in stray_blocks.values() {
assert!(
rig.block_exists(block_hash),
"stray block {} should be still present",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
rig.hot_state_exists(state_hash),
"stray state {} at slot {} should be still present",
state_hash,
slot
);
}
assert_eq!(rig.get_finalized_checkpoints(), hashset! {},);
// Make sure slots were skipped
assert!(rig.is_skipped_slot(&stray_state, skipped_slot));
{
let state_hash = (*stray_state.get_state_root(skipped_slot).unwrap()).into();
assert!(
rig.hot_state_exists(state_hash),
"skipped slot state {} should be still present",
state_hash
);
}
// Trigger finalization
let canonical_slots: Vec<Slot> = ((skipped_slot + 1).into()..rig.epoch_start_slot(7))
.map(Into::into)
.collect();
let canonical_state_root = canonical_state.update_tree_hash_cache().unwrap();
let (canonical_blocks_post_finalization, _, _, _) = rig
.add_attested_blocks_at_slots(
canonical_state,
canonical_state_root,
&canonical_slots,
&honest_validators,
)
.await;
// Postconditions
let canonical_blocks: HashMap<Slot, SignedBeaconBlockHash> = canonical_blocks_zeroth_epoch
.into_iter()
.chain(canonical_blocks_post_finalization.into_iter())
.collect();
assert_eq!(
rig.get_finalized_checkpoints(),
hashset! {
canonical_blocks[&rig.epoch_start_slot(1).into()],
canonical_blocks[&rig.epoch_start_slot(2).into()],
},
);
for (&slot, &state_hash) in &stray_states {
assert!(
!rig.hot_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
assert!(
!rig.cold_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
}
assert!(rig.is_skipped_slot(&stray_state, skipped_slot));
{
let state_hash: BeaconStateHash =
(*stray_state.get_state_root(skipped_slot).unwrap()).into();
assert!(
!rig.hot_state_exists(state_hash),
"skipped slot {} state {} should have been pruned",
skipped_slot,
state_hash
);
}
}
// This is to check if state outside of normal block processing are pruned correctly.
#[tokio::test]
async fn finalizes_non_epoch_start_slot() {
const HONEST_VALIDATOR_COUNT: usize = 32;
const ADVERSARIAL_VALIDATOR_COUNT: usize = 16;
const VALIDATOR_COUNT: usize = HONEST_VALIDATOR_COUNT + ADVERSARIAL_VALIDATOR_COUNT;
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let adversarial_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let rig = get_harness(store.clone(), VALIDATOR_COUNT);
let (state, state_root) = rig.get_current_state_and_root();
let canonical_slots_zeroth_epoch: Vec<Slot> =
(1..rig.epoch_start_slot(1)).map(Into::into).collect();
let (canonical_blocks_zeroth_epoch, _, _, mut canonical_state) = rig
.add_attested_blocks_at_slots(
state.clone(),
state_root,
&canonical_slots_zeroth_epoch,
&honest_validators,
)
.await;
let skipped_slot: Slot = rig.epoch_start_slot(1).into();
let stray_slots: Vec<Slot> = ((skipped_slot + 1).into()..rig.epoch_start_slot(2))
.map(Into::into)
.collect();
let canonical_state_root = canonical_state.update_tree_hash_cache().unwrap();
let (stray_blocks, stray_states, _, stray_state) = rig
.add_attested_blocks_at_slots(
canonical_state.clone(),
canonical_state_root,
&stray_slots,
&adversarial_validators,
)
.await;
// Preconditions
for &block_hash in stray_blocks.values() {
assert!(
rig.block_exists(block_hash),
"stray block {} should be still present",
block_hash
);
}
for (&slot, &state_hash) in &stray_states {
assert!(
rig.hot_state_exists(state_hash),
"stray state {} at slot {} should be still present",
state_hash,
slot
);
}
assert_eq!(rig.get_finalized_checkpoints(), hashset! {});
// Make sure slots were skipped
assert!(rig.is_skipped_slot(&stray_state, skipped_slot));
{
let state_hash = (*stray_state.get_state_root(skipped_slot).unwrap()).into();
assert!(
rig.hot_state_exists(state_hash),
"skipped slot state {} should be still present",
state_hash
);
}
// Trigger finalization
let canonical_slots: Vec<Slot> = ((skipped_slot + 1).into()..rig.epoch_start_slot(7))
.map(Into::into)
.collect();
let canonical_state_root = canonical_state.update_tree_hash_cache().unwrap();
let (canonical_blocks_post_finalization, _, _, _) = rig
.add_attested_blocks_at_slots(
canonical_state,
canonical_state_root,
&canonical_slots,
&honest_validators,
)
.await;
// Postconditions
let canonical_blocks: HashMap<Slot, SignedBeaconBlockHash> = canonical_blocks_zeroth_epoch
.into_iter()
.chain(canonical_blocks_post_finalization.into_iter())
.collect();
assert_eq!(
rig.get_finalized_checkpoints(),
hashset! {
canonical_blocks[&(rig.epoch_start_slot(1)-1).into()],
canonical_blocks[&rig.epoch_start_slot(2).into()],
},
);
for (&slot, &state_hash) in &stray_states {
assert!(
!rig.hot_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
assert!(
!rig.cold_state_exists(state_hash),
"stray state {} at slot {} should have been pruned",
state_hash,
slot
);
}
assert!(rig.is_skipped_slot(&stray_state, skipped_slot));
{
let state_hash: BeaconStateHash =
(*stray_state.get_state_root(skipped_slot).unwrap()).into();
assert!(
!rig.hot_state_exists(state_hash),
"skipped slot {} state {} should have been pruned",
skipped_slot,
state_hash
);
}
}
fn check_all_blocks_exist<'a>(
harness: &TestHarness,
blocks: impl Iterator<Item = &'a SignedBeaconBlockHash>,
) {
for &block_hash in blocks {
let block = harness.chain.get_blinded_block(&block_hash.into()).unwrap();
assert!(
block.is_some(),
"expected block {:?} to be in DB",
block_hash
);
}
}
fn check_all_states_exist<'a>(
harness: &TestHarness,
states: impl Iterator<Item = &'a BeaconStateHash>,
) {
for &state_hash in states {
let state = harness
.chain
.get_state(&state_hash.into(), None, CACHE_STATE_IN_TESTS)
.unwrap();
assert!(
state.is_some(),
"expected state {:?} to be in DB",
state_hash,
);
}
}
// Check that none of the given states exist in the database.
fn check_no_states_exist<'a>(
harness: &TestHarness,
states: impl Iterator<Item = &'a BeaconStateHash>,
) {
for &state_root in states {
assert!(
harness
.chain
.get_state(&state_root.into(), None, CACHE_STATE_IN_TESTS)
.unwrap()
.is_none(),
"state {:?} should not be in the DB",
state_root
);
}
}
// Check that none of the given blocks exist in the database.
fn check_no_blocks_exist<'a>(
harness: &TestHarness,
blocks: impl Iterator<Item = &'a SignedBeaconBlockHash>,
) {
for &block_hash in blocks {
let block = harness.chain.get_blinded_block(&block_hash.into()).unwrap();
assert!(
block.is_none(),
"did not expect block {:?} to be in the DB",
block_hash
);
assert!(
!harness.chain.store.blobs_exist(&block_hash.into()).unwrap(),
"blobs for abandoned block {block_hash:?} should have been pruned"
);
}
}
#[tokio::test]
async fn prune_single_block_fork() {
let slots_per_epoch = E::slots_per_epoch();
pruning_test(3 * slots_per_epoch, 1, slots_per_epoch, 0, 1).await;
}
#[tokio::test]
async fn prune_single_block_long_skip() {
let slots_per_epoch = E::slots_per_epoch();
pruning_test(
2 * slots_per_epoch,
1,
2 * slots_per_epoch,
2 * slots_per_epoch,
1,
)
.await;
}
#[tokio::test]
async fn prune_shared_skip_states_mid_epoch() {
let slots_per_epoch = E::slots_per_epoch();
pruning_test(
slots_per_epoch + slots_per_epoch / 2,
1,
slots_per_epoch,
2,
slots_per_epoch - 1,
)
.await;
}
#[tokio::test]
async fn prune_shared_skip_states_epoch_boundaries() {
let slots_per_epoch = E::slots_per_epoch();
Box::pin(pruning_test(
slots_per_epoch - 1,
1,
slots_per_epoch,
2,
slots_per_epoch,
))
.await;
Box::pin(pruning_test(
slots_per_epoch - 1,
2,
slots_per_epoch,
1,
slots_per_epoch,
))
.await;
Box::pin(pruning_test(
2 * slots_per_epoch + slots_per_epoch / 2,
slots_per_epoch / 2,
slots_per_epoch,
slots_per_epoch / 2 + 1,
slots_per_epoch,
))
.await;
Box::pin(pruning_test(
2 * slots_per_epoch + slots_per_epoch / 2,
slots_per_epoch / 2,
slots_per_epoch,
slots_per_epoch / 2 + 1,
slots_per_epoch,
))
.await;
Box::pin(pruning_test(
2 * slots_per_epoch - 1,
slots_per_epoch,
1,
0,
2 * slots_per_epoch,
))
.await;
}
/// Generic harness for pruning tests.
async fn pruning_test(
// Number of blocks to start the chain with before forking.
num_initial_blocks: u64,
// Number of skip slots on the main chain after the initial blocks.
num_canonical_skips: u64,
// Number of blocks on the main chain after the skip, but before the finalisation-triggering
// blocks.
num_canonical_middle_blocks: u64,
// Number of skip slots on the fork chain after the initial blocks.
num_fork_skips: u64,
// Number of blocks on the fork chain after the skips.
num_fork_blocks: u64,
) {
const VALIDATOR_COUNT: usize = 24;
const VALIDATOR_SUPERMAJORITY: usize = (VALIDATOR_COUNT / 3) * 2;
const HONEST_VALIDATOR_COUNT: usize = VALIDATOR_SUPERMAJORITY;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), VALIDATOR_COUNT);
let honest_validators: Vec<usize> = (0..HONEST_VALIDATOR_COUNT).collect();
let faulty_validators: Vec<usize> = (HONEST_VALIDATOR_COUNT..VALIDATOR_COUNT).collect();
let slots = |start: Slot, num_blocks: u64| -> Vec<Slot> {
(start.as_u64()..start.as_u64() + num_blocks)
.map(Slot::new)
.collect()
};
let start_slot = Slot::new(1);
let divergence_slot = start_slot + num_initial_blocks;
let (state, state_root) = harness.get_current_state_and_root();
let (_, _, _, divergence_state) = harness
.add_attested_blocks_at_slots(
state,
state_root,
&slots(start_slot, num_initial_blocks)[..],
&honest_validators,
)
.await;
let mut chains = harness
.add_blocks_on_multiple_chains(vec![
// Canonical chain
(
divergence_state.clone(),
slots(
divergence_slot + num_canonical_skips,
num_canonical_middle_blocks,
),
honest_validators.clone(),
),
// Fork chain
(
divergence_state.clone(),
slots(divergence_slot + num_fork_skips, num_fork_blocks),
faulty_validators,
),
])
.await;
let (_, _, _, mut canonical_state) = chains.remove(0);
let (stray_blocks, stray_states, _, stray_head_state) = chains.remove(0);
let stray_head_slot = divergence_slot + num_fork_skips + num_fork_blocks - 1;
let stray_head_state_root = stray_states[&stray_head_slot];
let stray_states = harness
.chain
.rev_iter_state_roots_from(stray_head_state_root.into(), &stray_head_state)
.map(Result::unwrap)
.map(|(state_root, _)| state_root.into())
.collect::<HashSet<_>>();
check_all_blocks_exist(&harness, stray_blocks.values());
check_all_states_exist(&harness, stray_states.iter());
let chain_dump = harness.chain.chain_dump().unwrap();
assert_eq!(
get_finalized_epoch_boundary_blocks(&chain_dump),
vec![Hash256::zero().into()].into_iter().collect(),
);
// Trigger finalization
let num_finalization_blocks = 4 * E::slots_per_epoch();
let canonical_slot = divergence_slot + num_canonical_skips + num_canonical_middle_blocks;
let canonical_state_root = canonical_state.update_tree_hash_cache().unwrap();
harness
.add_attested_blocks_at_slots(
canonical_state,
canonical_state_root,
&slots(canonical_slot, num_finalization_blocks),
&honest_validators,
)
.await;
// Check that finalization has advanced past the divergence slot.
assert!(
harness
.finalized_checkpoint()
.epoch
.start_slot(E::slots_per_epoch())
> divergence_slot
);
check_chain_dump(
&harness,
num_initial_blocks + num_canonical_middle_blocks + num_finalization_blocks + 1,
);
let all_canonical_states = harness
.chain
.forwards_iter_state_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
.map(|(state_root, _)| state_root.into())
.collect::<HashSet<BeaconStateHash>>();
check_all_states_exist(&harness, all_canonical_states.iter());
check_no_states_exist(&harness, stray_states.difference(&all_canonical_states));
check_no_blocks_exist(&harness, stray_blocks.values());
}
#[tokio::test]
async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let slots_per_epoch = E::slots_per_epoch();
let mut genesis_state = harness.get_current_state();
let genesis_state_root = genesis_state.update_tree_hash_cache().unwrap();
let block_slot = Slot::new(2 * slots_per_epoch);
let ((signed_block, _), state) = harness.make_block(genesis_state, block_slot).await;
let (mut block, _) = (*signed_block).clone().deconstruct();
let bad_block_parent_root = block.parent_root();
// Mutate the block to make it invalid, and re-sign it.
*block.state_root_mut() = Hash256::repeat_byte(0xff);
let proposer_index = block.proposer_index() as usize;
let block = Arc::new(block.sign(
&harness.validator_keypairs[proposer_index].sk,
&state.fork(),
state.genesis_validators_root(),
&harness.spec,
));
// The block should be rejected, but should store a bunch of temporary states.
harness.set_current_slot(block_slot);
harness
.process_block_result((block, None))
.await
.unwrap_err();
// The bad block parent root is the genesis block root. There's `block_slot - 1` temporary
// states to remove + the genesis state = block_slot.
assert_eq!(
get_states_descendant_of_block(&store, bad_block_parent_root).len(),
block_slot.as_usize(),
);
// Finalize the chain without the block, which should result in pruning of all temporary states.
let blocks_required_to_finalize = 3 * slots_per_epoch;
harness.advance_slot();
harness
.extend_chain(
blocks_required_to_finalize as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Check that the finalization migration ran.
assert_ne!(store.get_split_slot(), 0);
// Check that temporary states have been pruned.
assert_eq!(
get_states_descendant_of_block(&store, bad_block_parent_root),
// The genesis state is kept to support the HDiff grid
vec![(genesis_state_root, Slot::new(0))],
"get_states_descendant_of_block({bad_block_parent_root:?})"
);
}
#[tokio::test]
async fn weak_subjectivity_sync_easy() {
let num_initial_slots = E::slots_per_epoch() * 11;
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 9);
let slots = (1..num_initial_slots).map(Slot::new).collect();
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}
#[tokio::test]
async fn weak_subjectivity_sync_unaligned_advanced_checkpoint() {
let num_initial_slots = E::slots_per_epoch() * 11;
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 9);
let slots = (1..num_initial_slots)
.map(Slot::new)
.filter(|&slot| {
// Skip 3 slots leading up to the checkpoint slot.
slot <= checkpoint_slot - 3 || slot > checkpoint_slot
})
.collect();
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}
#[tokio::test]
async fn weak_subjectivity_sync_unaligned_unadvanced_checkpoint() {
let num_initial_slots = E::slots_per_epoch() * 11;
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 9 - 3);
let slots = (1..num_initial_slots)
.map(Slot::new)
.filter(|&slot| {
// Skip 3 slots after the checkpoint slot.
slot <= checkpoint_slot || slot > checkpoint_slot + 3
})
.collect();
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}
// Regression test for https://github.com/sigp/lighthouse/issues/4817
// Skip 3 slots immediately after genesis, creating a gap between the genesis block and the first
// real block.
#[tokio::test]
async fn weak_subjectivity_sync_skips_at_genesis() {
let start_slot = 4;
let end_slot = E::slots_per_epoch() * 4;
let slots = (start_slot..end_slot).map(Slot::new).collect();
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 2);
weak_subjectivity_sync_test(slots, checkpoint_slot).await
}
async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
// Build an initial chain on one harness, representing a synced node with full history.
let num_final_blocks = E::slots_per_epoch() * 2;
let temp1 = tempdir().unwrap();
let full_store = get_store(&temp1);
// TODO(das): Run a supernode so the node has full blobs stored.
// This may not be required in the future if we end up implementing downloading checkpoint
// blobs from p2p peers:
// https://github.com/sigp/lighthouse/issues/6837
let harness = get_harness_import_all_data_columns(full_store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let (genesis_state, genesis_state_root) = harness.get_current_state_and_root();
harness
.add_attested_blocks_at_slots(
genesis_state.clone(),
genesis_state_root,
&slots,
&all_validators,
)
.await;
let wss_block_root = harness
.chain
.block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
let wss_state_root = harness
.chain
.state_root_at_slot(checkpoint_slot)
.unwrap()
.unwrap();
let wss_block = harness
.chain
.store
.get_full_block(&wss_block_root)
.unwrap()
.unwrap();
let wss_blobs_opt = harness
.chain
.get_or_reconstruct_blobs(&wss_block_root)
.unwrap();
let wss_state = full_store
.get_state(&wss_state_root, Some(checkpoint_slot), CACHE_STATE_IN_TESTS)
.unwrap()
.unwrap();
let wss_state_slot = wss_state.slot();
let wss_block_slot = wss_block.slot();
// Add more blocks that advance finalization further.
harness.advance_slot();
harness
.extend_chain(
num_final_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);
let temp2 = tempdir().unwrap();
let store = get_store(&temp2);
let spec = test_spec::<E>();
let seconds_per_slot = spec.seconds_per_slot;
let kzg = get_kzg(&spec);
let mock = mock_execution_layer_from_parts(
harness.spec.clone(),
harness.runtime.task_executor.clone(),
);
// Initialise a new beacon chain from the finalized checkpoint.
// The slot clock must be set to a time ahead of the checkpoint state.
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(harness.chain.genesis_time),
Duration::from_secs(seconds_per_slot),
);
slot_clock.set_slot(harness.get_current_slot().as_u64());
let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec, kzg)
.store(store.clone())
.custom_spec(test_spec::<E>().into())
.task_executor(harness.chain.task_executor.clone())
.weak_subjectivity_state(
wss_state,
wss_block.clone(),
wss_blobs_opt.clone(),
genesis_state,
)
.unwrap()
.store_migrator_config(MigratorConfig::default().blocking())
.slot_clock(slot_clock)
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(1)))
.execution_layer(Some(mock.el))
.rng(Box::new(StdRng::seed_from_u64(42)))
.build()
.expect("should build");
let beacon_chain = Arc::new(beacon_chain);
let wss_block_root = wss_block.canonical_root();
let store_wss_block = harness
.chain
.get_block(&wss_block_root)
.await
.unwrap()
.unwrap();
// This test may break in the future if we no longer store the full checkpoint data columns.
let store_wss_blobs_opt = beacon_chain
.get_or_reconstruct_blobs(&wss_block_root)
.unwrap();
assert_eq!(store_wss_block, wss_block);
// TODO(fulu): Remove this condition once #6760 (PeerDAS checkpoint sync) is merged.
if !beacon_chain.spec.is_peer_das_scheduled() {
assert_eq!(store_wss_blobs_opt, wss_blobs_opt);
}
// Apply blocks forward to reach head.
let chain_dump = harness.chain.chain_dump().unwrap();
let new_blocks = chain_dump
.iter()
.filter(|snapshot| snapshot.beacon_block.slot() > checkpoint_slot);
for snapshot in new_blocks {
let block_root = snapshot.beacon_block_root;
let full_block = harness
.chain
.get_block(&snapshot.beacon_block_root)
.await
.unwrap()
.unwrap();
let slot = full_block.slot();
let full_block_root = full_block.canonical_root();
let state_root = full_block.state_root();
info!(block_root = ?full_block_root, ?state_root, %slot, "Importing block from chain dump");
beacon_chain.slot_clock.set_slot(slot.as_u64());
beacon_chain
.process_block(
full_block_root,
harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
.unwrap();
beacon_chain.recompute_head_at_current_slot().await;
// Check that the new block's state can be loaded correctly.
let mut state = beacon_chain
.store
.get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS)
.unwrap()
.unwrap();
assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
}
// Forwards iterator from 0 should fail as we lack blocks.
assert!(matches!(
beacon_chain.forwards_iter_block_roots(Slot::new(0)),
Err(BeaconChainError::HistoricalBlockOutOfRange { .. })
));
// Simulate processing of a `StatusMessage` with an older finalized epoch by calling
// `block_root_at_slot` with an old slot for which we don't know the block root. It should
// return `None` rather than erroring.
assert_eq!(
beacon_chain
.block_root_at_slot(Slot::new(1), WhenSlotSkipped::None)
.unwrap(),
None
);
// Simulate querying the API for a historic state that is unknown. It should also return
// `None` rather than erroring.
assert_eq!(beacon_chain.state_root_at_slot(Slot::new(1)).unwrap(), None);
// Supply blocks backwards to reach genesis. Omit the genesis block to check genesis handling.
let historical_blocks = chain_dump[..wss_block.slot().as_usize()]
.iter()
.filter(|s| s.beacon_block.slot() != 0)
.map(|s| s.beacon_block.clone())
.collect::<Vec<_>>();
let mut available_blocks = vec![];
for blinded in historical_blocks {
let block_root = blinded.canonical_root();
let full_block = harness
.chain
.get_block(&block_root)
.await
.expect("should get block")
.expect("should get block");
if let MaybeAvailableBlock::Available(block) = harness
.chain
.data_availability_checker
.verify_kzg_for_rpc_block(
harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)),
)
.expect("should verify kzg")
{
available_blocks.push(block);
}
}
// Corrupt the signature on the 1st block to ensure that the backfill processor is checking
// signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120.
let mut batch_with_invalid_first_block =
available_blocks.iter().map(clone_block).collect::<Vec<_>>();
batch_with_invalid_first_block[0] = {
let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct();
let mut corrupt_block = (*block).clone();
*corrupt_block.signature_mut() = Signature::empty();
AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), data, Arc::new(spec))
};
// Importing the invalid batch should error.
assert!(matches!(
beacon_chain
.import_historical_block_batch(batch_with_invalid_first_block)
.unwrap_err(),
HistoricalBlockError::InvalidSignature
));
let available_blocks_slots = available_blocks
.iter()
.map(|block| (block.block().slot(), block.block().canonical_root()))
.collect::<Vec<_>>();
info!(
?available_blocks_slots,
"wss_block_slot" = wss_block.slot().as_usize(),
"Importing historical block batch"
);
// Importing the batch with valid signatures should succeed.
let available_blocks_dup = available_blocks.iter().map(clone_block).collect::<Vec<_>>();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), wss_block.slot());
beacon_chain
.import_historical_block_batch(available_blocks_dup)
.unwrap();
assert_eq!(beacon_chain.store.get_oldest_block_slot(), 0);
// Resupplying the blocks should not fail, they can be safely ignored.
beacon_chain
.import_historical_block_batch(available_blocks)
.unwrap();
// Sanity check for non-aligned WSS starts, to make sure the WSS block is persisted properly
if wss_block_slot != wss_state_slot {
let new_node_block_root_at_wss_block = beacon_chain
.store
.get_cold_block_root(wss_block_slot)
.unwrap()
.unwrap();
info!(?new_node_block_root_at_wss_block, %wss_block_slot);
assert_eq!(new_node_block_root_at_wss_block, wss_block.canonical_root());
}
// The forwards iterator should now match the original chain
let forwards = beacon_chain
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
.collect::<Vec<_>>();
let expected = harness
.chain
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
.collect::<Vec<_>>();
assert_eq!(forwards, expected);
// All blocks can be loaded.
let mut prev_block_root = Hash256::zero();
for (block_root, slot) in beacon_chain
.forwards_iter_block_roots(Slot::new(0))
.unwrap()
.map(Result::unwrap)
{
let block = store.get_blinded_block(&block_root).unwrap().unwrap();
if block_root != prev_block_root {
assert_eq!(block.slot(), slot);
}
// Prune_payloads is set to false in the default config, so the payload should exist
if block.message().execution_payload().is_ok() {
assert!(beacon_chain
.store
.execution_payload_exists(&block_root)
.unwrap(),);
}
prev_block_root = block_root;
}
// All states from the oldest state slot can be loaded.
let (_, oldest_state_slot) = store.get_historic_state_limits();
for (state_root, slot) in beacon_chain
.forwards_iter_state_roots(oldest_state_slot)
.unwrap()
.map(Result::unwrap)
{
let mut state = store
.get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS)
.unwrap()
.unwrap();
assert_eq!(state.slot(), slot);
assert_eq!(state.canonical_root().unwrap(), state_root);
}
// Anchor slot is still set to the slot of the checkpoint block.
// Note: since hot tree states the anchor slot is set to the aligned ws state slot
// https://github.com/sigp/lighthouse/pull/6750
let wss_aligned_slot = if checkpoint_slot % E::slots_per_epoch() == 0 {
checkpoint_slot
} else {
(checkpoint_slot.epoch(E::slots_per_epoch()) + Epoch::new(1))
.start_slot(E::slots_per_epoch())
};
assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
assert_eq!(
store.get_anchor_info().state_upper_limit,
Slot::new(u64::MAX)
);
info!(anchor = ?store.get_anchor_info(), "anchor pre");
// Reconstruct states.
store.clone().reconstruct_historic_states(None).unwrap();
assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot);
assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0));
}
/// Test that blocks and attestations that refer to states around an unaligned split state are
/// processed correctly.
#[tokio::test]
async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
let temp = tempdir().unwrap();
let store = get_store(&temp);
let chain_config = ChainConfig {
reconstruct_historic_states: false,
..ChainConfig::default()
};
let harness = get_harness_generic(store.clone(), LOW_VALIDATOR_COUNT, chain_config, false);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let split_slot = Slot::new(E::slots_per_epoch() * 4);
let pre_skips = 1;
let post_skips = 1;
// Build the chain up to the intended split slot, with 3 skips before the split.
let slots = (1..=split_slot.as_u64() - pre_skips)
.map(Slot::new)
.collect::<Vec<_>>();
let (genesis_state, genesis_state_root) = harness.get_current_state_and_root();
harness
.add_attested_blocks_at_slots(
genesis_state.clone(),
genesis_state_root,
&slots,
&all_validators,
)
.await;
// Before the split slot becomes finalized, create two forking blocks that build on the split
// block:
//
// - one that is invalid because it conflicts with finalization (slot <= finalized_slot)
// - one that is valid because its slot is not finalized (slot > finalized_slot)
let (unadvanced_split_state, unadvanced_split_state_root) =
harness.get_current_state_and_root();
let ((invalid_fork_block, _), _) = harness
.make_block(unadvanced_split_state.clone(), split_slot)
.await;
let ((valid_fork_block, _), _) = harness
.make_block(unadvanced_split_state.clone(), split_slot + 1)
.await;
// Advance the chain so that the intended split slot is finalized.
// Do not attest in the epoch boundary slot, to make attestation production later easier (no
// equivocations).
let finalizing_slot = split_slot + 2 * E::slots_per_epoch();
for _ in 0..pre_skips + post_skips {
harness.advance_slot();
}
harness.extend_to_slot(finalizing_slot - 1).await;
Box::pin(harness.add_block_at_slot(finalizing_slot, harness.get_current_state()))
.await
.unwrap();
// Check that the split slot is as intended.
let split = store.get_split_info();
assert_eq!(split.slot, split_slot);
assert_eq!(split.block_root, valid_fork_block.parent_root());
assert_ne!(split.state_root, unadvanced_split_state_root);
let invalid_fork_rpc_block = RpcBlock::new_without_blobs(None, invalid_fork_block.clone());
// Applying the invalid block should fail.
let err = harness
.chain
.process_block(
invalid_fork_rpc_block.block_root(),
invalid_fork_rpc_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
.unwrap_err();
assert!(matches!(err, BlockError::WouldRevertFinalizedSlot { .. }));
// Applying the valid block should succeed, but it should not become head.
let valid_fork_rpc_block = RpcBlock::new_without_blobs(None, valid_fork_block.clone());
harness
.chain
.process_block(
valid_fork_rpc_block.block_root(),
valid_fork_rpc_block,
NotifyExecutionLayer::Yes,
BlockImportSource::Lookup,
|| Ok(()),
)
.await
.unwrap();
harness.chain.recompute_head_at_current_slot().await;
assert_ne!(harness.head_block_root(), valid_fork_block.canonical_root());
// Attestations to the split block in the next 2 epochs should be processed successfully.
let attestation_start_slot = harness.get_current_slot();
let attestation_end_slot = attestation_start_slot + 2 * E::slots_per_epoch();
let (split_state_root, mut advanced_split_state) = harness
.chain
.store
.get_advanced_hot_state(split.block_root, split.slot, split.state_root)
.unwrap()
.unwrap();
complete_state_advance(
&mut advanced_split_state,
Some(split_state_root),
attestation_start_slot,
&harness.chain.spec,
)
.unwrap();
advanced_split_state
.build_caches(&harness.chain.spec)
.unwrap();
let advanced_split_state_root = advanced_split_state.update_tree_hash_cache().unwrap();
for slot in (attestation_start_slot.as_u64()..attestation_end_slot.as_u64()).map(Slot::new) {
let attestations = harness.make_attestations(
&all_validators,
&advanced_split_state,
advanced_split_state_root,
split.block_root.into(),
slot,
);
harness.advance_slot();
harness.process_attestations(attestations, &advanced_split_state);
}
}
#[tokio::test]
async fn finalizes_after_resuming_from_db() {
let validator_count = 16;
let num_blocks_produced = MinimalEthSpec::slots_per_epoch() * 8;
let first_half = num_blocks_produced / 2;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_disk_store(store.clone())
.mock_execution_layer()
.build();
harness.advance_slot();
harness
.extend_chain(
first_half as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
assert!(
harness
.chain
.head_snapshot()
.beacon_state
.finalized_checkpoint()
.epoch
> 0,
"the chain should have already finalized"
);
let latest_slot = harness.chain.slot().expect("should have a slot");
harness
.chain
.persist_fork_choice()
.expect("should persist fork choice");
harness
.chain
.persist_op_pool()
.expect("should persist the op pool");
let original_chain = harness.chain;
let resumed_harness = BeaconChainHarness::<DiskHarnessType<E>>::builder(MinimalEthSpec)
.default_spec()
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.resumed_disk_store(store)
.testing_slot_clock(original_chain.slot_clock.clone())
.execution_layer(original_chain.execution_layer.clone())
.build();
assert_chains_pretty_much_the_same(&original_chain, &resumed_harness.chain);
// Set the slot clock of the resumed harness to be in the slot following the previous harness.
//
// This allows us to produce the block at the next slot.
resumed_harness
.chain
.slot_clock
.set_slot(latest_slot.as_u64() + 1);
resumed_harness
.extend_chain(
(num_blocks_produced - first_half) as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let state = &resumed_harness.chain.head_snapshot().beacon_state;
assert_eq!(
state.slot(),
num_blocks_produced,
"head should be at the current slot"
);
assert_eq!(
state.current_epoch(),
num_blocks_produced / MinimalEthSpec::slots_per_epoch(),
"head should be at the expected epoch"
);
assert_eq!(
state.current_justified_checkpoint().epoch,
state.current_epoch() - 1,
"the head should be justified one behind the current epoch"
);
assert_eq!(
state.finalized_checkpoint().epoch,
state.current_epoch() - 2,
"the head should be finalized two behind the current epoch"
);
}
#[allow(clippy::large_stack_frames)]
#[tokio::test]
async fn revert_minority_fork_on_resume() {
let validator_count = 16;
let slots_per_epoch = MinimalEthSpec::slots_per_epoch();
let fork_epoch = Epoch::new(4);
let fork_slot = fork_epoch.start_slot(slots_per_epoch);
let initial_blocks = slots_per_epoch * fork_epoch.as_u64() - 1;
let post_fork_blocks = slots_per_epoch * 3;
let mut spec1 = MinimalEthSpec::default_spec();
spec1.altair_fork_epoch = None;
let mut spec2 = MinimalEthSpec::default_spec();
spec2.altair_fork_epoch = Some(fork_epoch);
let seconds_per_slot = spec1.seconds_per_slot;
let all_validators = (0..validator_count).collect::<Vec<usize>>();
// Chain with no fork epoch configured.
let db_path1 = tempdir().unwrap();
let store1 = get_store_generic(&db_path1, StoreConfig::default(), spec1.clone());
let harness1 = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec1.clone().into())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_disk_store(store1)
.mock_execution_layer()
.build();
// Chain with fork epoch configured.
let db_path2 = tempdir().unwrap();
let store2 = get_store_generic(&db_path2, StoreConfig::default(), spec2.clone());
let harness2 = BeaconChainHarness::builder(MinimalEthSpec)
.spec(spec2.clone().into())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_disk_store(store2)
.mock_execution_layer()
.build();
// Apply the same blocks to both chains initially.
let mut state = harness1.get_current_state();
let mut block_root = harness1.chain.genesis_block_root;
for slot in (1..=initial_blocks).map(Slot::new) {
let state_root = state.update_tree_hash_cache().unwrap();
let attestations = harness1.make_attestations(
&all_validators,
&state,
state_root,
block_root.into(),
slot,
);
harness1.set_current_slot(slot);
harness2.set_current_slot(slot);
harness1.process_attestations(attestations.clone(), &state);
harness2.process_attestations(attestations, &state);
let ((block, blobs), new_state) = harness1.make_block(state, slot).await;
harness1
.process_block(slot, block.canonical_root(), (block.clone(), blobs.clone()))
.await
.unwrap();
harness2
.process_block(slot, block.canonical_root(), (block.clone(), blobs.clone()))
.await
.unwrap();
state = new_state;
block_root = block.canonical_root();
}
assert_eq!(harness1.head_slot(), fork_slot - 1);
assert_eq!(harness2.head_slot(), fork_slot - 1);
// Fork the two chains.
let mut state1 = state.clone();
let mut state2 = state.clone();
let mut majority_blocks = vec![];
for i in 0..post_fork_blocks {
let slot = fork_slot + i;
// Attestations on majority chain.
let state_root = state.update_tree_hash_cache().unwrap();
let attestations = harness2.make_attestations(
&all_validators,
&state2,
state_root,
block_root.into(),
slot,
);
harness2.set_current_slot(slot);
harness2.process_attestations(attestations, &state2);
// Minority chain block (no attesters).
let ((block1, blobs1), new_state1) = harness1.make_block(state1, slot).await;
harness1
.process_block(slot, block1.canonical_root(), (block1, blobs1))
.await
.unwrap();
state1 = new_state1;
// Majority chain block (all attesters).
let ((block2, blobs2), new_state2) = harness2.make_block(state2, slot).await;
harness2
.process_block(slot, block2.canonical_root(), (block2.clone(), blobs2))
.await
.unwrap();
state2 = new_state2;
block_root = block2.canonical_root();
majority_blocks.push(block2);
}
let end_slot = fork_slot + post_fork_blocks - 1;
assert_eq!(harness1.head_slot(), end_slot);
assert_eq!(harness2.head_slot(), end_slot);
// Resume from disk with the hard-fork activated: this should revert the post-fork blocks.
// We have to do some hackery with the `slot_clock` so that the correct slot is set when
// the beacon chain builder loads the head block.
drop(harness1);
let resume_store = get_store_generic(&db_path1, StoreConfig::default(), spec2.clone());
let resumed_harness = TestHarness::builder(MinimalEthSpec)
.spec(spec2.clone().into())
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.resumed_disk_store(resume_store)
.override_store_mutator(Box::new(move |mut builder| {
builder = builder
.resume_from_db()
.unwrap()
.testing_slot_clock(Duration::from_secs(seconds_per_slot))
.unwrap();
builder
.get_slot_clock()
.unwrap()
.set_slot(end_slot.as_u64());
builder
}))
.mock_execution_layer()
.build();
// Head should now be just before the fork.
resumed_harness.chain.recompute_head_at_current_slot().await;
assert_eq!(resumed_harness.head_slot(), fork_slot - 1);
// Fork choice should only know the canonical head. When we reverted the head we also should
// have called `reset_fork_choice_to_finalization` which rebuilds fork choice from scratch
// without the reverted block.
assert_eq!(
resumed_harness.chain.heads(),
vec![(resumed_harness.head_block_root(), fork_slot - 1)]
);
// Apply blocks from the majority chain and trigger finalization.
let initial_split_slot = resumed_harness.chain.store.get_split_slot();
for block in &majority_blocks {
resumed_harness
.process_block_result((block.clone(), None))
.await
.unwrap();
// The canonical head should be the block from the majority chain.
resumed_harness.chain.recompute_head_at_current_slot().await;
assert_eq!(resumed_harness.head_slot(), block.slot());
assert_eq!(resumed_harness.head_block_root(), block.canonical_root());
}
let advanced_split_slot = resumed_harness.chain.store.get_split_slot();
// Check that the migration ran successfully.
assert!(advanced_split_slot > initial_split_slot);
// Check that there is only a single head now matching harness2 (the minority chain is gone).
let heads = resumed_harness.chain.heads();
assert_eq!(heads, harness2.chain.heads());
assert_eq!(heads.len(), 1);
}
// This test checks whether the schema downgrade from the latest version to some minimum supported
// version is correct. This is the easiest schema test to write without historic versions of
// Lighthouse on-hand, but has the disadvantage that the min version needs to be adjusted manually
// as old downgrades are deprecated.
async fn schema_downgrade_to_min_version(
store_config: StoreConfig,
reconstruct_historic_states: bool,
) {
let num_blocks_produced = E::slots_per_epoch() * 4;
let db_path = tempdir().unwrap();
let spec = test_spec::<E>();
let chain_config = ChainConfig {
reconstruct_historic_states,
..ChainConfig::default()
};
let import_all_data_columns = false;
let store = get_store_generic(&db_path, store_config.clone(), spec.clone());
let harness = get_harness_generic(
store.clone(),
LOW_VALIDATOR_COUNT,
chain_config.clone(),
import_all_data_columns,
);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let min_version = SchemaVersion(22);
// Save the slot clock so that the new harness doesn't revert in time.
let slot_clock = harness.chain.slot_clock.clone();
// Close the database to ensure everything is written to disk.
drop(store);
drop(harness);
// Re-open the store.
let store = get_store_generic(&db_path, store_config, spec);
// Downgrade.
migrate_schema::<DiskHarnessType<E>>(store.clone(), CURRENT_SCHEMA_VERSION, min_version)
.expect("schema downgrade to minimum version should work");
// Upgrade back.
migrate_schema::<DiskHarnessType<E>>(store.clone(), min_version, CURRENT_SCHEMA_VERSION)
.expect("schema upgrade from minimum version should work");
// Recreate the harness.
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.chain_config(chain_config)
.keypairs(KEYPAIRS[0..LOW_VALIDATOR_COUNT].to_vec())
.testing_slot_clock(slot_clock)
.resumed_disk_store(store.clone())
.mock_execution_layer()
.build();
// Check chain dump for appropriate range depending on whether this is an archive node.
let chain_dump_start_slot = if reconstruct_historic_states {
Slot::new(0)
} else {
store.get_split_slot()
};
check_finalization(&harness, num_blocks_produced);
check_split_slot(&harness, store.clone());
check_chain_dump_from_slot(
&harness,
chain_dump_start_slot,
num_blocks_produced + 1 - chain_dump_start_slot.as_u64(),
);
check_iterators_from_slot(&harness, chain_dump_start_slot);
// Check that downgrading beyond the minimum version fails (bound is *tight*).
let min_version_sub_1 = SchemaVersion(min_version.as_u64().checked_sub(1).unwrap());
migrate_schema::<DiskHarnessType<E>>(store.clone(), CURRENT_SCHEMA_VERSION, min_version_sub_1)
.expect_err("should not downgrade below minimum version");
}
// Schema upgrade/downgrade on an archive node where the optimised migration does apply due
// to the split state being aligned to a diff layer.
#[tokio::test]
async fn schema_downgrade_to_min_version_archive_node_grid_aligned() {
// Need to use 3 as the hierarchy exponent to get diffs on every epoch boundary with minimal
// spec.
schema_downgrade_to_min_version(
StoreConfig {
hierarchy_config: HierarchyConfig::from_str("3,4,5").unwrap(),
prune_payloads: false,
..StoreConfig::default()
},
true,
)
.await
}
// Schema upgrade/downgrade on an archive node where the optimised migration DOES NOT apply
// due to the split state NOT being aligned to a diff layer.
#[tokio::test]
async fn schema_downgrade_to_min_version_archive_node_grid_unaligned() {
schema_downgrade_to_min_version(
StoreConfig {
hierarchy_config: HierarchyConfig::from_str("7").unwrap(),
prune_payloads: false,
..StoreConfig::default()
},
true,
)
.await
}
// Schema upgrade/downgrade on a full node with a fairly normal per-epoch diff config.
#[tokio::test]
async fn schema_downgrade_to_min_version_full_node_per_epoch_diffs() {
schema_downgrade_to_min_version(
StoreConfig {
hierarchy_config: HierarchyConfig::from_str("3,4,5").unwrap(),
prune_payloads: false,
..StoreConfig::default()
},
false,
)
.await
}
// Schema upgrade/downgrade on a full node with dense per-slot diffs.
#[tokio::test]
async fn schema_downgrade_to_min_version_full_node_dense_diffs() {
schema_downgrade_to_min_version(
StoreConfig {
hierarchy_config: HierarchyConfig::from_str("0,3,4,5").unwrap(),
prune_payloads: false,
..StoreConfig::default()
},
true,
)
.await
}
/// Check that blob pruning prunes blobs older than the data availability boundary.
#[tokio::test]
async fn deneb_prune_blobs_happy_case() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
if store.get_chain_spec().is_peer_das_scheduled() {
// TODO(fulu): add prune tests for Fulu / PeerDAS data columns.
return;
}
let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else {
// No-op prior to Deneb.
return;
};
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let num_blocks_produced = E::slots_per_epoch() * 8;
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Prior to manual pruning with an artifically low data availability boundary all blobs should
// be stored.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(1), harness.head_slot(), true);
// Trigger blob pruning of blobs older than epoch 2.
let data_availability_boundary = Epoch::new(2);
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is updated accordingly and prior blobs have been deleted.
let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap();
assert_eq!(
oldest_blob_slot,
data_availability_boundary.start_slot(E::slots_per_epoch())
);
check_blob_existence(&harness, Slot::new(0), oldest_blob_slot - 1, false);
check_blob_existence(&harness, oldest_blob_slot, harness.head_slot(), true);
}
/// Check that blob pruning does not prune without finalization.
#[tokio::test]
async fn deneb_prune_blobs_no_finalization() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
if store.get_chain_spec().is_peer_das_scheduled() {
// TODO(fulu): add prune tests for Fulu / PeerDAS data columns.
return;
}
let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else {
// No-op prior to Deneb.
return;
};
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let initial_num_blocks = E::slots_per_epoch() * 5;
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
// Finalize to epoch 3.
harness
.extend_chain(
initial_num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Extend the chain for another few epochs without attestations.
let unfinalized_num_blocks = E::slots_per_epoch() * 3;
harness.advance_slot();
harness
.extend_chain(
unfinalized_num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::SomeValidators(vec![]),
)
.await;
// Finalization should be at epoch 3.
let finalized_slot = Slot::new(E::slots_per_epoch() * 3);
assert_eq!(harness.get_current_state().finalized_checkpoint().epoch, 3);
assert_eq!(store.get_split_slot(), finalized_slot);
// All blobs should still be available.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true);
// Attempt blob pruning of blobs older than epoch 4, which is newer than finalization.
let data_availability_boundary = Epoch::new(4);
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is only updated to finalization, and NOT to the DAB.
let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap();
assert_eq!(oldest_blob_slot, finalized_slot);
check_blob_existence(&harness, Slot::new(0), finalized_slot - 1, false);
check_blob_existence(&harness, finalized_slot, harness.head_slot(), true);
}
/// Check that blob pruning does not fail trying to prune across the fork boundary.
#[tokio::test]
async fn deneb_prune_blobs_fork_boundary() {
let deneb_fork_epoch = Epoch::new(4);
let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec());
spec.deneb_fork_epoch = Some(deneb_fork_epoch);
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, StoreConfig::default(), spec);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let num_blocks = E::slots_per_epoch() * 7;
// Finalize to epoch 5.
harness
.extend_chain(
num_blocks as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Finalization should be at epoch 5.
let finalized_epoch = Epoch::new(5);
let finalized_slot = finalized_epoch.start_slot(E::slots_per_epoch());
assert_eq!(
harness.get_current_state().finalized_checkpoint().epoch,
finalized_epoch
);
assert_eq!(store.get_split_slot(), finalized_slot);
// All blobs should still be available.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true);
// Attempt pruning with data availability epochs that precede the fork epoch.
// No pruning should occur.
assert!(deneb_fork_epoch < finalized_epoch);
for data_availability_boundary in [Epoch::new(0), Epoch::new(3), deneb_fork_epoch] {
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is not updated.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
}
// All blobs should still be available.
check_blob_existence(&harness, Slot::new(0), harness.head_slot(), true);
// Prune one epoch past the fork.
let pruned_slot = (deneb_fork_epoch + 1).start_slot(E::slots_per_epoch());
store.try_prune_blobs(true, deneb_fork_epoch + 1).unwrap();
assert_eq!(store.get_blob_info().oldest_blob_slot, Some(pruned_slot));
check_blob_existence(&harness, Slot::new(0), pruned_slot - 1, false);
check_blob_existence(&harness, pruned_slot, harness.head_slot(), true);
}
/// Check that blob pruning prunes blobs older than the data availability boundary with margin
/// applied.
#[tokio::test]
async fn deneb_prune_blobs_margin1() {
deneb_prune_blobs_margin_test(1).await;
}
#[tokio::test]
async fn deneb_prune_blobs_margin3() {
deneb_prune_blobs_margin_test(3).await;
}
#[tokio::test]
async fn deneb_prune_blobs_margin4() {
deneb_prune_blobs_margin_test(4).await;
}
async fn deneb_prune_blobs_margin_test(margin: u64) {
let config = StoreConfig {
blob_prune_margin_epochs: margin,
..StoreConfig::default()
};
let db_path = tempdir().unwrap();
let store = get_store_generic(&db_path, config, test_spec::<E>());
if store.get_chain_spec().is_peer_das_scheduled() {
// TODO(fulu): add prune tests for Fulu / PeerDAS data columns.
return;
}
let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else {
// No-op prior to Deneb.
return;
};
let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch());
let num_blocks_produced = E::slots_per_epoch() * 8;
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Prior to manual pruning with an artifically low data availability boundary all blobs should
// be stored.
assert_eq!(
store.get_blob_info().oldest_blob_slot,
Some(deneb_fork_slot)
);
check_blob_existence(&harness, Slot::new(1), harness.head_slot(), true);
// Trigger blob pruning of blobs older than epoch 6 - margin (6 is the minimum, due to
// finalization).
let data_availability_boundary = Epoch::new(6);
let effective_data_availability_boundary =
data_availability_boundary - store.get_config().blob_prune_margin_epochs;
assert!(
effective_data_availability_boundary > 0,
"must be > 0 because epoch 0 won't get pruned alone"
);
store
.try_prune_blobs(true, data_availability_boundary)
.unwrap();
// Check oldest blob slot is updated accordingly and prior blobs have been deleted.
let oldest_blob_slot = store.get_blob_info().oldest_blob_slot.unwrap();
assert_eq!(
oldest_blob_slot,
effective_data_availability_boundary.start_slot(E::slots_per_epoch())
);
check_blob_existence(&harness, Slot::new(0), oldest_blob_slot - 1, false);
check_blob_existence(&harness, oldest_blob_slot, harness.head_slot(), true);
}
/// Check that a database with `blobs_db=false` can be upgraded to `blobs_db=true` before Deneb.
#[tokio::test]
async fn change_to_separate_blobs_db_before_deneb() {
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
// Only run this test on forks prior to Deneb. If the blobs database already has blobs, we can't
// move it.
if store.get_chain_spec().deneb_fork_epoch.is_some() {
return;
}
let init_blob_info = store.get_blob_info();
assert!(
init_blob_info.blobs_db,
"separate blobs DB should be the default"
);
// Change to `blobs_db=false` to emulate legacy Deneb DB.
let legacy_blob_info = BlobInfo {
blobs_db: false,
..init_blob_info
};
store
.compare_and_set_blob_info_with_write(init_blob_info.clone(), legacy_blob_info.clone())
.unwrap();
assert_eq!(store.get_blob_info(), legacy_blob_info);
// Re-open the DB and check that `blobs_db` gets changed back to true.
drop(store);
let store = get_store(&db_path);
assert_eq!(store.get_blob_info(), init_blob_info);
}
/// Check that there are blob sidecars (or not) at every slot in the range.
fn check_blob_existence(
harness: &TestHarness,
start_slot: Slot,
end_slot: Slot,
should_exist: bool,
) {
let mut blobs_seen = 0;
for (block_root, slot) in harness
.chain
.forwards_iter_block_roots_until(start_slot, end_slot)
.unwrap()
.map(Result::unwrap)
{
if let Some(blobs) = harness.chain.store.get_blobs(&block_root).unwrap().blobs() {
assert!(should_exist, "blobs at slot {slot} exist but should not");
blobs_seen += blobs.len();
} else {
// We don't actually store empty blobs, so unfortunately we can't assert anything
// meaningful here (like asserting that the blob should not exist).
}
}
if should_exist {
assert_ne!(blobs_seen, 0, "expected non-zero number of blobs");
}
}
#[tokio::test]
async fn prune_historic_states() {
let num_blocks_produced = E::slots_per_epoch() * 5;
let db_path = tempdir().unwrap();
let store = get_store(&db_path);
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let genesis_state_root = harness.chain.genesis_state_root;
let genesis_state = harness
.chain
.get_state(&genesis_state_root, None, CACHE_STATE_IN_TESTS)
.unwrap()
.unwrap();
harness
.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
// Check historical states are present.
let first_epoch_state_roots = harness
.chain
.forwards_iter_state_roots(Slot::new(0))
.unwrap()
.take(E::slots_per_epoch() as usize)
.map(Result::unwrap)
.collect::<Vec<_>>();
for &(state_root, slot) in &first_epoch_state_roots {
assert!(store
.get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS)
.unwrap()
.is_some());
}
store
.prune_historic_states(genesis_state_root, &genesis_state)
.unwrap();
// Check that anchor info is updated.
let anchor_info = store.get_anchor_info();
assert_eq!(anchor_info.state_lower_limit, 0);
assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);
// Ensure all epoch 0 states other than the genesis have been pruned.
for &(state_root, slot) in &first_epoch_state_roots {
assert_eq!(
store
.get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS)
.unwrap()
.is_some(),
slot == 0
);
}
// Run for another two epochs.
let additional_blocks_produced = 2 * E::slots_per_epoch();
harness
.extend_slots(additional_blocks_produced as usize)
.await;
check_finalization(&harness, num_blocks_produced + additional_blocks_produced);
check_split_slot(&harness, store);
}
// Test the function `get_ancestor_state_root` for slots prior to the split where we only have
// sparse summaries stored.
#[tokio::test]
async fn ancestor_state_root_prior_to_split() {
let db_path = tempdir().unwrap();
let spec = test_spec::<E>();
let store_config = StoreConfig {
prune_payloads: false,
hierarchy_config: HierarchyConfig::from_str("5,7,8").unwrap(),
..StoreConfig::default()
};
let chain_config = ChainConfig {
reconstruct_historic_states: false,
..ChainConfig::default()
};
let import_all_data_columns = false;
let store = get_store_generic(&db_path, store_config, spec);
let harness = get_harness_generic(
store.clone(),
LOW_VALIDATOR_COUNT,
chain_config,
import_all_data_columns,
);
// Produce blocks until we have passed through two full snapshot periods. This period length is
// determined by the hierarchy config set above.
let num_blocks = 2 * store
.hierarchy
.next_snapshot_slot(Slot::new(1))
.unwrap()
.as_u64();
for num_blocks_so_far in 0..num_blocks {
harness
.extend_chain(
1,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
harness.advance_slot();
// Check that `get_ancestor_state_root` can look up the grid-aligned ancestors of every hot
// state, even at ancestor slots prior to the split.
let head_state = harness.get_current_state();
assert_eq!(head_state.slot().as_u64(), num_blocks_so_far + 1);
let split_slot = store.get_split_slot();
let anchor_slot = store.get_anchor_info().anchor_slot;
for state_slot in (split_slot.as_u64()..=num_blocks_so_far).map(Slot::new) {
for ancestor_slot in store
.hierarchy
.closest_layer_points(state_slot, anchor_slot)
{
// The function currently doesn't consider a state an ancestor of itself, so this
// does not work.
if ancestor_slot == state_slot {
continue;
}
let ancestor_state_root = store::hot_cold_store::get_ancestor_state_root(
&store,
&head_state,
ancestor_slot,
)
.unwrap_or_else(|e| {
panic!(
"get_ancestor_state_root failed for state_slot={state_slot}, \
ancestor_slot={ancestor_slot}, head_slot={}. error: {e:?}",
head_state.slot()
)
});
// Check state root correctness.
assert_eq!(
store
.load_hot_state_summary(&ancestor_state_root)
.unwrap()
.unwrap_or_else(|| panic!(
"no summary found for {ancestor_state_root:?} (slot {ancestor_slot})"
))
.slot,
ancestor_slot,
)
}
}
}
// This test only makes sense if the split is non-zero by the end.
assert_ne!(store.get_split_slot(), 0);
}
// Test that the chain operates correctly when the split state is stored as a ReplayFrom.
#[tokio::test]
async fn replay_from_split_state() {
let db_path = tempdir().unwrap();
let spec = test_spec::<E>();
let store_config = StoreConfig {
prune_payloads: false,
hierarchy_config: HierarchyConfig::from_str("5").unwrap(),
..StoreConfig::default()
};
let chain_config = ChainConfig {
reconstruct_historic_states: false,
..ChainConfig::default()
};
let import_all_data_columns = false;
let store = get_store_generic(&db_path, store_config.clone(), spec.clone());
let harness = get_harness_generic(
store.clone(),
LOW_VALIDATOR_COUNT,
chain_config,
import_all_data_columns,
);
// Produce blocks until we finalize epoch 3 which will not be stored as a snapshot.
let num_blocks = 5 * E::slots_per_epoch() as usize;
harness
.extend_chain(
num_blocks,
BlockStrategy::OnCanonicalHead,
AttestationStrategy::AllValidators,
)
.await;
let split = store.get_split_info();
let anchor_slot = store.get_anchor_info().anchor_slot;
assert_eq!(split.slot, 3 * E::slots_per_epoch());
assert_eq!(anchor_slot, 0);
assert!(store
.hierarchy
.storage_strategy(split.slot, anchor_slot)
.unwrap()
.is_replay_from());
// Close the database and reopen it.
drop(store);
drop(harness);
let store = get_store_generic(&db_path, store_config, spec);
// Check that the split state is still accessible.
assert_eq!(store.get_split_slot(), split.slot);
let state = store
.get_hot_state(&split.state_root, false)
.unwrap()
.expect("split state should be present");
assert_eq!(state.slot(), split.slot);
}
/// Checks that two chains are the same, for the purpose of these tests.
///
/// Several fields that are hard/impossible to check are ignored (e.g., the store).
fn assert_chains_pretty_much_the_same<T: BeaconChainTypes>(a: &BeaconChain<T>, b: &BeaconChain<T>) {
assert_eq!(a.spec, b.spec, "spec should be equal");
assert_eq!(a.op_pool, b.op_pool, "op_pool should be equal");
let a_head = a.head_snapshot();
let b_head = b.head_snapshot();
assert_eq!(
a_head.beacon_block_root, b_head.beacon_block_root,
"head block roots should be equal"
);
assert_eq!(
a_head.beacon_block, b_head.beacon_block,
"head blocks should be equal"
);
// Drop all caches to prevent them messing with the equality check.
let mut a_head_state = a_head.beacon_state.clone();
a_head_state.drop_all_caches().unwrap();
let mut b_head_state = b_head.beacon_state.clone();
b_head_state.drop_all_caches().unwrap();
assert_eq!(a_head_state, b_head_state, "head states should be equal");
assert_eq!(a.heads(), b.heads(), "heads() should be equal");
assert_eq!(
a.genesis_block_root, b.genesis_block_root,
"genesis_block_root should be equal"
);
let slot = a.slot().unwrap();
let spec = T::EthSpec::default_spec();
assert!(
a.canonical_head
.fork_choice_write_lock()
.get_head(slot, &spec)
.unwrap()
== b.canonical_head
.fork_choice_write_lock()
.get_head(slot, &spec)
.unwrap(),
"fork_choice heads should be equal"
);
}
/// Check that the head state's slot matches `expected_slot`.
fn check_slot(harness: &TestHarness, expected_slot: u64) {
let state = &harness.chain.head_snapshot().beacon_state;
assert_eq!(
state.slot(),
expected_slot,
"head should be at the current slot"
);
}
/// Check that the chain has finalized under best-case assumptions, and check the head slot.
fn check_finalization(harness: &TestHarness, expected_slot: u64) {
let state = &harness.chain.head_snapshot().beacon_state;
check_slot(harness, expected_slot);
assert_eq!(
state.current_justified_checkpoint().epoch,
state.current_epoch() - 1,
"the head should be justified one behind the current epoch"
);
assert_eq!(
state.finalized_checkpoint().epoch,
state.current_epoch() - 2,
"the head should be finalized two behind the current epoch"
);
}
/// Check that the HotColdDB's split_slot is equal to the start slot of the last finalized epoch.
fn check_split_slot(
harness: &TestHarness,
store: Arc<HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>>>,
) {
let split_slot = store.get_split_slot();
assert_eq!(
harness
.chain
.head_snapshot()
.beacon_state
.finalized_checkpoint()
.epoch
.start_slot(E::slots_per_epoch()),
split_slot
);
assert_ne!(split_slot, 0);
}
/// Check that all the states in a chain dump have the correct tree hash.
fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
check_chain_dump_from_slot(harness, Slot::new(0), expected_len)
}
fn check_chain_dump_from_slot(harness: &TestHarness, from_slot: Slot, expected_len: u64) {
let mut chain_dump = harness.chain.chain_dump_from_slot(from_slot).unwrap();
assert_eq!(chain_dump.len() as u64, expected_len);
for checkpoint in &mut chain_dump {
// Check that the tree hash of the stored state is as expected
assert_eq!(
checkpoint.beacon_state_root(),
checkpoint.beacon_state.update_tree_hash_cache().unwrap(),
"tree hash of stored state is incorrect"
);
// Check that looking up the state root with no slot hint succeeds.
// This tests the state root -> slot mapping.
assert_eq!(
harness
.chain
.store
.get_state(&checkpoint.beacon_state_root(), None, CACHE_STATE_IN_TESTS)
.expect("no error")
.expect("state exists")
.slot(),
checkpoint.beacon_state.slot()
);
// Check presence of execution payload on disk.
if harness.chain.spec.bellatrix_fork_epoch.is_some() {
assert!(
harness
.chain
.store
.execution_payload_exists(&checkpoint.beacon_block_root)
.unwrap(),
"incorrect payload storage for block at slot {}: {:?}",
checkpoint.beacon_block.slot(),
checkpoint.beacon_block_root,
);
}
}
// Check the forwards block roots iterator against the chain dump
let chain_dump_block_roots = chain_dump
.iter()
.map(|checkpoint| (checkpoint.beacon_block_root, checkpoint.beacon_block.slot()))
.collect::<Vec<_>>();
let mut forward_block_roots = harness
.chain
.forwards_iter_block_roots(from_slot)
.expect("should get iter")
.map(Result::unwrap)
.collect::<Vec<_>>();
// Drop the block roots for skipped slots.
forward_block_roots.dedup_by_key(|(block_root, _)| *block_root);
for i in 0..std::cmp::max(chain_dump_block_roots.len(), forward_block_roots.len()) {
assert_eq!(
chain_dump_block_roots[i],
forward_block_roots[i],
"split slot is {}",
harness.chain.store.get_split_slot()
);
}
}
/// Check that every state from the canonical chain is in the database, and that the
/// reverse state and block root iterators reach genesis.
fn check_iterators(harness: &TestHarness) {
check_iterators_from_slot(harness, Slot::new(0))
}
fn check_iterators_from_slot(harness: &TestHarness, slot: Slot) {
let mut max_slot = None;
for (state_root, slot) in harness
.chain
.forwards_iter_state_roots(slot)
.expect("should get iter")
.map(Result::unwrap)
{
assert!(
harness
.chain
.store
.get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS)
.unwrap()
.is_some(),
"state {:?} from canonical chain should be in DB",
state_root
);
max_slot = Some(slot);
}
// Assert that we reached the head.
assert_eq!(max_slot, Some(harness.head_slot()));
// Assert that the block root iterator reaches the head.
assert_eq!(
harness
.chain
.forwards_iter_block_roots(slot)
.expect("should get iter")
.last()
.map(Result::unwrap)
.map(|(_, slot)| slot),
Some(harness.head_slot())
);
}
fn get_finalized_epoch_boundary_blocks(
dump: &[BeaconSnapshot<MinimalEthSpec, BlindedPayload<MinimalEthSpec>>],
) -> HashSet<SignedBeaconBlockHash> {
dump.iter()
.cloned()
.map(|checkpoint| checkpoint.beacon_state.finalized_checkpoint().root.into())
.collect()
}
fn get_blocks(
dump: &[BeaconSnapshot<MinimalEthSpec, BlindedPayload<MinimalEthSpec>>],
) -> HashSet<SignedBeaconBlockHash> {
dump.iter()
.cloned()
.map(|checkpoint| checkpoint.beacon_block_root.into())
.collect()
}
fn clone_block<E: EthSpec>(block: &AvailableBlock<E>) -> AvailableBlock<E> {
block.__clone_without_recv().unwrap()
}