From 6350a270319267615034475613c7ff3366b941d3 Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 11 Mar 2026 01:20:02 -0500 Subject: [PATCH] Optionally check DB invariants at runtime (#8952) Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com> --- beacon_node/beacon_chain/src/invariants.rs | 56 ++ beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_chain/tests/store_tests.rs | 43 + beacon_node/http_api/src/database.rs | 9 + beacon_node/http_api/src/lib.rs | 14 + beacon_node/store/src/invariants.rs | 781 ++++++++++++++++++ beacon_node/store/src/lib.rs | 1 + beacon_node/store/src/state_cache.rs | 13 + 8 files changed, 918 insertions(+) create mode 100644 beacon_node/beacon_chain/src/invariants.rs create mode 100644 beacon_node/store/src/invariants.rs diff --git a/beacon_node/beacon_chain/src/invariants.rs b/beacon_node/beacon_chain/src/invariants.rs new file mode 100644 index 0000000000..7bcec7b0b4 --- /dev/null +++ b/beacon_node/beacon_chain/src/invariants.rs @@ -0,0 +1,56 @@ +//! Beacon chain database invariant checks. +//! +//! Builds the `InvariantContext` from beacon chain state and delegates all checks +//! to `HotColdDB::check_invariants`. + +use crate::BeaconChain; +use crate::beacon_chain::BeaconChainTypes; +use store::invariants::{InvariantCheckResult, InvariantContext}; + +impl BeaconChain { + /// Run all database invariant checks. + /// + /// Collects context from fork choice, state cache, custody columns, and pubkey cache, + /// then delegates to the store-level `check_invariants` method. + pub fn check_database_invariants(&self) -> Result { + let fork_choice_blocks = { + let fc = self.canonical_head.fork_choice_read_lock(); + let proto_array = fc.proto_array().core_proto_array(); + proto_array + .nodes + .iter() + .filter(|node| { + // Only check blocks that are descendants of the finalized checkpoint. + // Pruned non-canonical fork blocks may linger in the proto-array but + // are legitimately absent from the database. + fc.is_finalized_checkpoint_or_descendant(node.root) + }) + .map(|node| (node.root, node.slot)) + .collect() + }; + + let custody_context = self.data_availability_checker.custody_context(); + + let ctx = InvariantContext { + fork_choice_blocks, + state_cache_roots: self.store.state_cache.lock().state_roots(), + custody_columns: custody_context + .custody_columns_for_epoch(None, &self.spec) + .to_vec(), + pubkey_cache_pubkeys: { + let cache = self.validator_pubkey_cache.read(); + (0..cache.len()) + .filter_map(|i| { + cache.get(i).map(|pk| { + use store::StoreItem; + crate::validator_pubkey_cache::DatabasePubkey::from_pubkey(pk) + .as_store_bytes() + }) + }) + .collect() + }, + }; + + self.store.check_invariants(&ctx) + } +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 4efd90bd22..29081fd767 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -29,6 +29,7 @@ pub mod fork_choice_signal; pub mod graffiti_calculator; pub mod historical_blocks; pub mod historical_data_columns; +pub mod invariants; pub mod kzg_utils; pub mod light_client_finality_update_verification; pub mod light_client_optimistic_update_verification; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index b6d729cc61..86f4af3efc 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -148,6 +148,22 @@ fn get_harness_generic( harness } +/// Check that all database invariants hold. +/// +/// Panics with a descriptive message if any invariant is violated. +fn check_db_invariants(harness: &TestHarness) { + let result = harness + .chain + .check_database_invariants() + .expect("invariant check should not error"); + + assert!( + result.is_ok(), + "database invariant violations found:\n{:#?}", + result.violations, + ); +} + fn get_states_descendant_of_block( store: &HotColdDB, BeaconNodeBackend>, block_root: Hash256, @@ -308,6 +324,7 @@ async fn full_participation_no_skips() { check_split_slot(&harness, store); check_chain_dump(&harness, num_blocks_produced + 1); check_iterators(&harness); + check_db_invariants(&harness); } #[tokio::test] @@ -352,6 +369,7 @@ async fn randomised_skips() { check_split_slot(&harness, store.clone()); check_chain_dump(&harness, num_blocks_produced + 1); check_iterators(&harness); + check_db_invariants(&harness); } #[tokio::test] @@ -400,6 +418,7 @@ async fn long_skip() { check_split_slot(&harness, store); check_chain_dump(&harness, initial_blocks + final_blocks + 1); check_iterators(&harness); + check_db_invariants(&harness); } /// Go forward to the point where the genesis randao value is no longer part of the vector. @@ -1769,6 +1788,8 @@ async fn prunes_abandoned_fork_between_two_finalized_checkpoints() { } assert!(!rig.knows_head(&stray_head)); + + check_db_invariants(&rig); } #[tokio::test] @@ -1897,6 +1918,8 @@ async fn pruning_does_not_touch_abandoned_block_shared_with_canonical_chain() { assert!(!rig.knows_head(&stray_head)); let chain_dump = rig.chain.chain_dump().unwrap(); assert!(get_blocks(&chain_dump).contains(&shared_head)); + + check_db_invariants(&rig); } #[tokio::test] @@ -1988,6 +2011,8 @@ async fn pruning_does_not_touch_blocks_prior_to_finalization() { } rig.assert_knows_head(stray_head.into()); + + check_db_invariants(&rig); } #[tokio::test] @@ -2127,6 +2152,8 @@ async fn prunes_fork_growing_past_youngest_finalized_checkpoint() { } assert!(!rig.knows_head(&stray_head)); + + check_db_invariants(&rig); } // This is to check if state outside of normal block processing are pruned correctly. @@ -2377,6 +2404,8 @@ async fn finalizes_non_epoch_start_slot() { state_hash ); } + + check_db_invariants(&rig); } fn check_all_blocks_exist<'a>( @@ -2643,6 +2672,8 @@ async fn pruning_test( check_all_states_exist(&harness, all_canonical_states.iter()); check_no_states_exist(&harness, stray_states.difference(&all_canonical_states)); check_no_blocks_exist(&harness, stray_blocks.values()); + + check_db_invariants(&harness); } #[tokio::test] @@ -2707,6 +2738,8 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() { vec![(genesis_state_root, Slot::new(0))], "get_states_descendant_of_block({bad_block_parent_root:?})" ); + + check_db_invariants(&harness); } #[tokio::test] @@ -3361,6 +3394,16 @@ async fn weak_subjectivity_sync_test( store.clone().reconstruct_historic_states(None).unwrap(); assert_eq!(store.get_anchor_info().anchor_slot, wss_aligned_slot); assert_eq!(store.get_anchor_info().state_upper_limit, Slot::new(0)); + + // Check database invariants after full checkpoint sync + backfill + reconstruction. + let result = beacon_chain + .check_database_invariants() + .expect("invariant check should not error"); + assert!( + result.is_ok(), + "database invariant violations:\n{:#?}", + result.violations, + ); } // This test prunes data columns from epoch 0 and then tries to re-import them via diff --git a/beacon_node/http_api/src/database.rs b/beacon_node/http_api/src/database.rs index 8a50ec45b0..4737d92079 100644 --- a/beacon_node/http_api/src/database.rs +++ b/beacon_node/http_api/src/database.rs @@ -2,6 +2,7 @@ use beacon_chain::store::metadata::CURRENT_SCHEMA_VERSION; use beacon_chain::{BeaconChain, BeaconChainTypes}; use serde::Serialize; use std::sync::Arc; +use store::invariants::InvariantCheckResult; use store::{AnchorInfo, BlobInfo, Split, StoreConfig}; #[derive(Debug, Serialize)] @@ -30,3 +31,11 @@ pub fn info( blob_info, }) } + +pub fn check_invariants( + chain: Arc>, +) -> Result { + chain.check_database_invariants().map_err(|e| { + warp_utils::reject::custom_bad_request(format!("error checking database invariants: {e:?}")) + }) +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 0ef8654d8d..26bad809df 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3007,6 +3007,19 @@ pub fn serve( }, ); + // GET lighthouse/database/invariants + let get_lighthouse_database_invariants = database_path + .and(warp::path("invariants")) + .and(warp::path::end()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .then( + |task_spawner: TaskSpawner, chain: Arc>| { + task_spawner + .blocking_json_task(Priority::P1, move || database::check_invariants(chain)) + }, + ); + // POST lighthouse/database/reconstruct let post_lighthouse_database_reconstruct = database_path .and(warp::path("reconstruct")) @@ -3336,6 +3349,7 @@ pub fn serve( .uor(get_lighthouse_validator_inclusion) .uor(get_lighthouse_staking) .uor(get_lighthouse_database_info) + .uor(get_lighthouse_database_invariants) .uor(get_lighthouse_custody_info) .uor(get_lighthouse_attestation_performance) .uor(get_beacon_light_client_optimistic_update) diff --git a/beacon_node/store/src/invariants.rs b/beacon_node/store/src/invariants.rs new file mode 100644 index 0000000000..eb5232d344 --- /dev/null +++ b/beacon_node/store/src/invariants.rs @@ -0,0 +1,781 @@ +//! Database invariant checks for the hot and cold databases. +//! +//! These checks verify the consistency of data stored in the database. They are designed to be +//! called from the HTTP API and from tests to detect data corruption or bugs in the store logic. +//! +//! See the `check_invariants` and `check_database_invariants` methods for the full list. + +use crate::hdiff::StorageStrategy; +use crate::hot_cold_store::{ColdStateSummary, HotStateSummary}; +use crate::{DBColumn, Error, ItemStore}; +use crate::{HotColdDB, Split}; +use serde::Serialize; +use ssz::Decode; +use std::cmp; +use std::collections::HashSet; +use types::*; + +/// Result of running invariant checks on the database. +#[derive(Debug, Clone, Serialize)] +pub struct InvariantCheckResult { + /// List of invariant violations found. + pub violations: Vec, +} + +impl InvariantCheckResult { + pub fn new() -> Self { + Self { + violations: Vec::new(), + } + } + + pub fn is_ok(&self) -> bool { + self.violations.is_empty() + } + + pub fn add_violation(&mut self, violation: InvariantViolation) { + self.violations.push(violation); + } + + pub fn merge(&mut self, other: InvariantCheckResult) { + self.violations.extend(other.violations); + } +} + +impl Default for InvariantCheckResult { + fn default() -> Self { + Self::new() + } +} + +/// Context data from the beacon chain needed for invariant checks. +/// +/// This allows all invariant checks to live in the store crate while still checking +/// invariants that depend on fork choice, state cache, and custody context. +pub struct InvariantContext { + /// Block roots tracked by fork choice (invariant 1). + pub fork_choice_blocks: Vec<(Hash256, Slot)>, + /// State roots held in the in-memory state cache (invariant 8). + pub state_cache_roots: Vec, + /// Custody columns for the current epoch (invariant 7). + pub custody_columns: Vec, + /// Compressed pubkey bytes from the in-memory validator pubkey cache, indexed by validator index + /// (invariant 9). + pub pubkey_cache_pubkeys: Vec>, +} + +/// A single invariant violation. +#[derive(Debug, Clone, Serialize)] +pub enum InvariantViolation { + /// Invariant 1: fork choice block consistency. + /// + /// ```text + /// block in fork_choice && descends_from_finalized -> block in hot_db + /// ``` + ForkChoiceBlockMissing { block_root: Hash256, slot: Slot }, + /// Invariant 2: block and state consistency. + /// + /// ```text + /// block in hot_db && block.slot >= split.slot + /// -> state_summary for block.state_root() in hot_db + /// ``` + HotBlockMissingStateSummary { + block_root: Hash256, + slot: Slot, + state_root: Hash256, + }, + /// Invariant 3: state summary diff consistency. + /// + /// ```text + /// state_summary in hot_db + /// -> state diff/snapshot/nothing in hot_db according to hierarchy rules + /// ``` + HotStateMissingSnapshot { state_root: Hash256, slot: Slot }, + /// Invariant 3: state summary diff consistency (missing diff). + /// + /// ```text + /// state_summary in hot_db + /// -> state diff/snapshot/nothing in hot_db according to hierarchy rules + /// ``` + HotStateMissingDiff { state_root: Hash256, slot: Slot }, + /// Invariant 3: DiffFrom/ReplayFrom base slot must reference an existing summary. + /// + /// ```text + /// state_summary in hot_db + /// -> state diff/snapshot/nothing in hot_db according to hierarchy rules + /// ``` + HotStateBaseSummaryMissing { + slot: Slot, + base_state_root: Hash256, + }, + /// Invariant 4: state summary chain consistency. + /// + /// ```text + /// state_summary in hot_db && state_summary.slot > split.slot + /// -> state_summary for previous_state_root in hot_db + /// ``` + HotStateMissingPreviousSummary { + slot: Slot, + previous_state_root: Hash256, + }, + /// Invariant 5: block and execution payload consistency. + /// + /// ```text + /// block in hot_db && !prune_payloads -> payload for block.root in hot_db + /// ``` + ExecutionPayloadMissing { block_root: Hash256, slot: Slot }, + /// Invariant 6: block and blobs consistency. + /// + /// ```text + /// block in hot_db && num_blob_commitments > 0 + /// -> blob_list for block.root in hot_db + /// ``` + BlobSidecarMissing { block_root: Hash256, slot: Slot }, + /// Invariant 7: block and data columns consistency. + /// + /// ```text + /// block in hot_db && num_blob_commitments > 0 + /// && block.slot >= earliest_available_slot + /// && data_column_idx in custody_columns + /// -> (block_root, data_column_idx) in hot_db + /// ``` + DataColumnMissing { + block_root: Hash256, + slot: Slot, + column_index: ColumnIndex, + }, + /// Invariant 8: state cache and disk consistency. + /// + /// ```text + /// state in state_cache -> state_summary in hot_db + /// ``` + StateCacheMissingSummary { state_root: Hash256 }, + /// Invariant 9: pubkey cache consistency. + /// + /// ```text + /// state_summary in hot_db + /// -> all validator pubkeys from state.validators are in the hot_db + /// ``` + PubkeyCacheMissing { validator_index: usize }, + /// Invariant 9b: pubkey cache value mismatch. + /// + /// ```text + /// pubkey_cache[i] == hot_db(PubkeyCache)[i] + /// ``` + PubkeyCacheMismatch { validator_index: usize }, + /// Invariant 10: block root indices mapping. + /// + /// ```text + /// oldest_block_slot <= i < split.slot + /// -> block_root for slot i in cold_db + /// && block for block_root in hot_db + /// ``` + ColdBlockRootMissing { + slot: Slot, + oldest_block_slot: Slot, + split_slot: Slot, + }, + /// Invariant 10: block root index references a block that must exist. + /// + /// ```text + /// oldest_block_slot <= i < split.slot + /// -> block_root for slot i in cold_db + /// && block for block_root in hot_db + /// ``` + ColdBlockRootOrphan { slot: Slot, block_root: Hash256 }, + /// Invariant 11: state root indices mapping. + /// + /// ```text + /// (i <= state_lower_limit || i >= min(split.slot, state_upper_limit)) && i < split.slot + /// -> i |-> state_root in cold_db(BeaconStateRoots) + /// && state_root |-> cold_state_summary in cold_db(BeaconColdStateSummary) + /// && cold_state_summary.slot == i + /// ``` + ColdStateRootMissing { + slot: Slot, + state_lower_limit: Slot, + state_upper_limit: Slot, + split_slot: Slot, + }, + /// Invariant 11: state root index must have a cold state summary. + /// + /// ```text + /// (i <= state_lower_limit || i >= min(split.slot, state_upper_limit)) && i < split.slot + /// -> i |-> state_root in cold_db(BeaconStateRoots) + /// && state_root |-> cold_state_summary in cold_db(BeaconColdStateSummary) + /// && cold_state_summary.slot == i + /// ``` + ColdStateRootMissingSummary { slot: Slot, state_root: Hash256 }, + /// Invariant 11: cold state summary slot must match index slot. + /// + /// ```text + /// (i <= state_lower_limit || i >= min(split.slot, state_upper_limit)) && i < split.slot + /// -> i |-> state_root in cold_db(BeaconStateRoots) + /// && state_root |-> cold_state_summary in cold_db(BeaconColdStateSummary) + /// && cold_state_summary.slot == i + /// ``` + ColdStateRootSlotMismatch { + slot: Slot, + state_root: Hash256, + summary_slot: Slot, + }, + /// Invariant 12: cold state diff consistency. + /// + /// ```text + /// cold_state_summary in cold_db + /// -> slot |-> state diff/snapshot/nothing in cold_db according to diff hierarchy + /// ``` + ColdStateMissingSnapshot { state_root: Hash256, slot: Slot }, + /// Invariant 12: cold state diff consistency (missing diff). + /// + /// ```text + /// cold_state_summary in cold_db + /// -> slot |-> state diff/snapshot/nothing in cold_db according to diff hierarchy + /// ``` + ColdStateMissingDiff { state_root: Hash256, slot: Slot }, + /// Invariant 12: DiffFrom/ReplayFrom base slot must reference an existing summary. + /// + /// ```text + /// cold_state_summary in cold_db + /// -> slot |-> state diff/snapshot/nothing in cold_db according to diff hierarchy + /// ``` + ColdStateBaseSummaryMissing { slot: Slot, base_slot: Slot }, +} + +impl, Cold: ItemStore> HotColdDB { + /// Run all database invariant checks. + /// + /// The `ctx` parameter provides data from the beacon chain layer (fork choice, state cache, + /// custody columns, pubkey cache) so that all invariant checks can live in this single file. + pub fn check_invariants(&self, ctx: &InvariantContext) -> Result { + let mut result = InvariantCheckResult::new(); + let split = self.get_split_info(); + + result.merge(self.check_fork_choice_block_consistency(ctx)?); + result.merge(self.check_hot_block_invariants(&split, ctx)?); + result.merge(self.check_hot_state_summary_diff_consistency()?); + result.merge(self.check_hot_state_summary_chain_consistency(&split)?); + result.merge(self.check_state_cache_consistency(ctx)?); + result.merge(self.check_cold_block_root_indices(&split)?); + result.merge(self.check_cold_state_root_indices(&split)?); + result.merge(self.check_cold_state_diff_consistency()?); + result.merge(self.check_pubkey_cache_consistency(ctx)?); + + Ok(result) + } + + /// Invariant 1 (Hot DB): Fork choice block consistency. + /// + /// ```text + /// block in fork_choice && descends_from_finalized -> block in hot_db + /// ``` + /// + /// Every canonical fork choice block (descending from finalized) must exist in the hot + /// database. Pruned non-canonical fork blocks may linger in the proto-array and are + /// excluded from this check. + fn check_fork_choice_block_consistency( + &self, + ctx: &InvariantContext, + ) -> Result { + let mut result = InvariantCheckResult::new(); + + for &(block_root, slot) in &ctx.fork_choice_blocks { + let exists = self + .hot_db + .key_exists(DBColumn::BeaconBlock, block_root.as_slice())?; + if !exists { + result + .add_violation(InvariantViolation::ForkChoiceBlockMissing { block_root, slot }); + } + } + + Ok(result) + } + + /// Invariants 2, 5, 6, 7 (Hot DB): Block-related consistency checks. + /// + /// Iterates hot DB blocks once and checks: + /// - Invariant 2: block-state summary consistency + /// - Invariant 5: execution payload consistency (when prune_payloads=false) + /// - Invariant 6: blob sidecar consistency (Deneb to Fulu) + /// - Invariant 7: data column consistency (post-Fulu, when custody_columns provided) + fn check_hot_block_invariants( + &self, + split: &Split, + ctx: &InvariantContext, + ) -> Result { + let mut result = InvariantCheckResult::new(); + + let check_payloads = !self.get_config().prune_payloads; + let bellatrix_fork_slot = self + .spec + .bellatrix_fork_epoch + .map(|epoch| epoch.start_slot(E::slots_per_epoch())); + let deneb_fork_slot = self + .spec + .deneb_fork_epoch + .map(|epoch| epoch.start_slot(E::slots_per_epoch())); + let fulu_fork_slot = self + .spec + .fulu_fork_epoch + .map(|epoch| epoch.start_slot(E::slots_per_epoch())); + let oldest_blob_slot = self.get_blob_info().oldest_blob_slot; + let oldest_data_column_slot = self.get_data_column_info().oldest_data_column_slot; + + for res in self.hot_db.iter_column::(DBColumn::BeaconBlock) { + let (block_root, block_bytes) = res?; + let block = SignedBlindedBeaconBlock::::from_ssz_bytes(&block_bytes, &self.spec)?; + let slot = block.slot(); + + // Invariant 2: block-state consistency. + if slot >= split.slot { + let state_root = block.state_root(); + let has_summary = self + .hot_db + .key_exists(DBColumn::BeaconStateHotSummary, state_root.as_slice())?; + if !has_summary { + result.add_violation(InvariantViolation::HotBlockMissingStateSummary { + block_root, + slot, + state_root, + }); + } + } + + // Invariant 5: execution payload consistency. + // TODO(gloas): reconsider this invariant + if check_payloads + && let Some(bellatrix_slot) = bellatrix_fork_slot + && slot >= bellatrix_slot + && !self.execution_payload_exists(&block_root)? + && !self.payload_envelope_exists(&block_root)? + { + result.add_violation(InvariantViolation::ExecutionPayloadMissing { + block_root, + slot, + }); + } + + // Invariant 6: blob sidecar consistency. + // Only check blocks that actually have blob KZG commitments — blocks with 0 + // commitments legitimately have no blob sidecars stored. + if let Some(deneb_slot) = deneb_fork_slot + && let Some(oldest_blob) = oldest_blob_slot + && slot >= deneb_slot + && slot >= oldest_blob + && fulu_fork_slot.is_none_or(|fulu_slot| slot < fulu_slot) + && block.num_expected_blobs() > 0 + { + let has_blob = self + .blobs_db + .key_exists(DBColumn::BeaconBlob, block_root.as_slice())?; + if !has_blob { + result + .add_violation(InvariantViolation::BlobSidecarMissing { block_root, slot }); + } + } + + // Invariant 7: data column consistency. + // Only check blocks that actually have blob KZG commitments. + // TODO(gloas): reconsider this invariant — non-canonical payloads won't have + // their data column sidecars stored. + if !ctx.custody_columns.is_empty() + && let Some(fulu_slot) = fulu_fork_slot + && let Some(oldest_dc) = oldest_data_column_slot + && slot >= fulu_slot + && slot >= oldest_dc + && block.num_expected_blobs() > 0 + { + let stored_columns = self.get_data_column_keys(block_root)?; + for col_idx in &ctx.custody_columns { + if !stored_columns.contains(col_idx) { + result.add_violation(InvariantViolation::DataColumnMissing { + block_root, + slot, + column_index: *col_idx, + }); + } + } + } + } + + Ok(result) + } + + /// Invariant 3 (Hot DB): State summary diff/snapshot consistency. + /// + /// ```text + /// state_summary in hot_db + /// -> state diff/snapshot/nothing in hot_db per HDiff hierarchy rules + /// ``` + /// + /// Each hot state summary should have the correct storage artifact (snapshot, diff, or + /// nothing) according to the HDiff hierarchy configuration. The hierarchy uses the + /// anchor_slot as its start point for the hot DB. + fn check_hot_state_summary_diff_consistency(&self) -> Result { + let mut result = InvariantCheckResult::new(); + + let anchor_slot = self.get_anchor_info().anchor_slot; + + // Collect all summary slots and their strategies in a first pass. + let mut known_state_roots = HashSet::new(); + let mut base_state_refs: Vec<(Slot, Hash256)> = Vec::new(); + + for res in self + .hot_db + .iter_column::(DBColumn::BeaconStateHotSummary) + { + let (state_root, value) = res?; + let summary = HotStateSummary::from_ssz_bytes(&value)?; + + known_state_roots.insert(state_root); + + match self.hierarchy.storage_strategy(summary.slot, anchor_slot)? { + StorageStrategy::Snapshot => { + let has_snapshot = self + .hot_db + .key_exists(DBColumn::BeaconStateHotSnapshot, state_root.as_slice())?; + if !has_snapshot { + result.add_violation(InvariantViolation::HotStateMissingSnapshot { + state_root, + slot: summary.slot, + }); + } + } + StorageStrategy::DiffFrom(base_slot) => { + let has_diff = self + .hot_db + .key_exists(DBColumn::BeaconStateHotDiff, state_root.as_slice())?; + if !has_diff { + result.add_violation(InvariantViolation::HotStateMissingDiff { + state_root, + slot: summary.slot, + }); + } + if let Ok(base_root) = summary.diff_base_state.get_root(base_slot) { + base_state_refs.push((summary.slot, base_root)); + } + } + StorageStrategy::ReplayFrom(base_slot) => { + if let Ok(base_root) = summary.diff_base_state.get_root(base_slot) { + base_state_refs.push((summary.slot, base_root)); + } + } + } + } + + // Verify that all diff base state roots reference existing summaries. + for (slot, base_state_root) in base_state_refs { + if !known_state_roots.contains(&base_state_root) { + result.add_violation(InvariantViolation::HotStateBaseSummaryMissing { + slot, + base_state_root, + }); + } + } + + Ok(result) + } + + /// Invariant 4 (Hot DB): State summary chain consistency. + /// + /// ```text + /// state_summary in hot_db && state_summary.slot > split.slot + /// -> state_summary for previous_state_root in hot_db + /// ``` + /// + /// The chain of `previous_state_root` links must be continuous back to the split state. + /// The split state itself is the boundary and does not need a predecessor in the hot DB. + fn check_hot_state_summary_chain_consistency( + &self, + split: &Split, + ) -> Result { + let mut result = InvariantCheckResult::new(); + + for res in self + .hot_db + .iter_column::(DBColumn::BeaconStateHotSummary) + { + let (_state_root, value) = res?; + let summary = HotStateSummary::from_ssz_bytes(&value)?; + + if summary.slot > split.slot { + let prev_root = summary.previous_state_root; + let has_prev = self + .hot_db + .key_exists(DBColumn::BeaconStateHotSummary, prev_root.as_slice())?; + if !has_prev { + result.add_violation(InvariantViolation::HotStateMissingPreviousSummary { + slot: summary.slot, + previous_state_root: prev_root, + }); + } + } + } + + Ok(result) + } + + /// Invariant 8 (Hot DB): State cache and disk consistency. + /// + /// ```text + /// state in state_cache -> state_summary in hot_db + /// ``` + /// + /// Every state held in the in-memory state cache (including the finalized state) should + /// have a corresponding hot state summary on disk. + fn check_state_cache_consistency( + &self, + ctx: &InvariantContext, + ) -> Result { + let mut result = InvariantCheckResult::new(); + + for &state_root in &ctx.state_cache_roots { + let has_summary = self + .hot_db + .key_exists(DBColumn::BeaconStateHotSummary, state_root.as_slice())?; + if !has_summary { + result.add_violation(InvariantViolation::StateCacheMissingSummary { state_root }); + } + } + + Ok(result) + } + + /// Invariant 10 (Cold DB): Block root indices. + /// + /// ```text + /// oldest_block_slot <= i < split.slot + /// -> block_root for slot i in cold_db + /// && block for block_root in hot_db + /// ``` + /// + /// Every slot in the cold range (from `oldest_block_slot` to `split.slot`) should have a + /// block root index entry, and the referenced block should exist in the hot DB. Note that + /// skip slots store the most recent non-skipped block's root, so `block.slot()` may differ + /// from the index slot. + fn check_cold_block_root_indices(&self, split: &Split) -> Result { + let mut result = InvariantCheckResult::new(); + + let anchor_info = self.get_anchor_info(); + + if anchor_info.oldest_block_slot >= split.slot { + return Ok(result); + } + + for slot_val in anchor_info.oldest_block_slot.as_u64()..split.slot.as_u64() { + let slot = Slot::new(slot_val); + + let slot_bytes = slot_val.to_be_bytes(); + let block_root_bytes = self + .cold_db + .get_bytes(DBColumn::BeaconBlockRoots, &slot_bytes)?; + + let Some(root_bytes) = block_root_bytes else { + result.add_violation(InvariantViolation::ColdBlockRootMissing { + slot, + oldest_block_slot: anchor_info.oldest_block_slot, + split_slot: split.slot, + }); + continue; + }; + + if root_bytes.len() != 32 { + return Err(Error::InvalidKey(format!( + "cold block root at slot {slot} has invalid length {}", + root_bytes.len() + ))); + } + + let block_root = Hash256::from_slice(&root_bytes); + let block_exists = self + .hot_db + .key_exists(DBColumn::BeaconBlock, block_root.as_slice())?; + if !block_exists { + result.add_violation(InvariantViolation::ColdBlockRootOrphan { slot, block_root }); + } + } + + Ok(result) + } + + /// Invariant 11 (Cold DB): State root indices. + /// + /// ```text + /// (i <= state_lower_limit || i >= min(split.slot, state_upper_limit)) && i < split.slot + /// -> i |-> state_root in cold_db(BeaconStateRoots) + /// && state_root |-> cold_state_summary in cold_db(BeaconColdStateSummary) + /// && cold_state_summary.slot == i + /// ``` + fn check_cold_state_root_indices(&self, split: &Split) -> Result { + let mut result = InvariantCheckResult::new(); + + let anchor_info = self.get_anchor_info(); + + // Expected slots are: (i <= state_lower_limit || i >= effective_upper) && i < split.slot + // where effective_upper = min(split.slot, state_upper_limit). + for slot_val in 0..split.slot.as_u64() { + let slot = Slot::new(slot_val); + + if slot <= anchor_info.state_lower_limit + || slot >= cmp::min(split.slot, anchor_info.state_upper_limit) + { + let slot_bytes = slot_val.to_be_bytes(); + let Some(root_bytes) = self + .cold_db + .get_bytes(DBColumn::BeaconStateRoots, &slot_bytes)? + else { + result.add_violation(InvariantViolation::ColdStateRootMissing { + slot, + state_lower_limit: anchor_info.state_lower_limit, + state_upper_limit: anchor_info.state_upper_limit, + split_slot: split.slot, + }); + continue; + }; + + if root_bytes.len() != 32 { + return Err(Error::InvalidKey(format!( + "cold state root at slot {slot} has invalid length {}", + root_bytes.len() + ))); + } + + let state_root = Hash256::from_slice(&root_bytes); + + match self + .cold_db + .get_bytes(DBColumn::BeaconColdStateSummary, state_root.as_slice())? + { + None => { + result.add_violation(InvariantViolation::ColdStateRootMissingSummary { + slot, + state_root, + }); + } + Some(summary_bytes) => { + let summary = ColdStateSummary::from_ssz_bytes(&summary_bytes)?; + if summary.slot != slot { + result.add_violation(InvariantViolation::ColdStateRootSlotMismatch { + slot, + state_root, + summary_slot: summary.slot, + }); + } + } + } + } + } + + Ok(result) + } + + /// Invariant 12 (Cold DB): Cold state diff/snapshot consistency. + /// + /// ```text + /// cold_state_summary in cold_db + /// -> state diff/snapshot/nothing in cold_db per HDiff hierarchy rules + /// ``` + /// + /// Each cold state summary should have the correct storage artifact according to the + /// HDiff hierarchy. Cold states always use genesis (slot 0) as the hierarchy start since + /// they are finalized and have no anchor_slot dependency. + fn check_cold_state_diff_consistency(&self) -> Result { + let mut result = InvariantCheckResult::new(); + + let mut summary_slots = HashSet::new(); + let mut base_slot_refs = Vec::new(); + + for res in self + .cold_db + .iter_column::(DBColumn::BeaconColdStateSummary) + { + let (state_root, value) = res?; + let summary = ColdStateSummary::from_ssz_bytes(&value)?; + + summary_slots.insert(summary.slot); + + let slot_bytes = summary.slot.as_u64().to_be_bytes(); + + match self + .hierarchy + .storage_strategy(summary.slot, Slot::new(0))? + { + StorageStrategy::Snapshot => { + let has_snapshot = self + .cold_db + .key_exists(DBColumn::BeaconStateSnapshot, &slot_bytes)?; + if !has_snapshot { + result.add_violation(InvariantViolation::ColdStateMissingSnapshot { + state_root, + slot: summary.slot, + }); + } + } + StorageStrategy::DiffFrom(base_slot) => { + let has_diff = self + .cold_db + .key_exists(DBColumn::BeaconStateDiff, &slot_bytes)?; + if !has_diff { + result.add_violation(InvariantViolation::ColdStateMissingDiff { + state_root, + slot: summary.slot, + }); + } + base_slot_refs.push((summary.slot, base_slot)); + } + StorageStrategy::ReplayFrom(base_slot) => { + base_slot_refs.push((summary.slot, base_slot)); + } + } + } + + // Verify that all DiffFrom/ReplayFrom base slots reference existing summaries. + for (slot, base_slot) in base_slot_refs { + if !summary_slots.contains(&base_slot) { + result.add_violation(InvariantViolation::ColdStateBaseSummaryMissing { + slot, + base_slot, + }); + } + } + + Ok(result) + } + + /// Invariant 9 (Hot DB): Pubkey cache consistency. + /// + /// ```text + /// all validator pubkeys from states are in hot_db(PubkeyCache) + /// ``` + /// + /// Checks that the in-memory pubkey cache and the on-disk PubkeyCache column have the same + /// number of entries AND that each pubkey matches at every validator index. + fn check_pubkey_cache_consistency( + &self, + ctx: &InvariantContext, + ) -> Result { + let mut result = InvariantCheckResult::new(); + + // Read on-disk pubkeys by sequential validator index (matching how they are stored + // with Hash256::from_low_u64_be(index) as key). + // Iterate in-memory pubkeys and verify each matches on disk. + for (validator_index, in_memory_bytes) in ctx.pubkey_cache_pubkeys.iter().enumerate() { + let mut key = [0u8; 32]; + key[24..].copy_from_slice(&(validator_index as u64).to_be_bytes()); + match self.hot_db.get_bytes(DBColumn::PubkeyCache, &key)? { + Some(on_disk_bytes) if in_memory_bytes != &on_disk_bytes => { + result + .add_violation(InvariantViolation::PubkeyCacheMismatch { validator_index }); + } + None => { + result + .add_violation(InvariantViolation::PubkeyCacheMissing { validator_index }); + } + _ => {} + } + } + + Ok(result) + } +} diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3363eb800c..bfa1200602 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -15,6 +15,7 @@ pub mod hdiff; pub mod historic_state_cache; pub mod hot_cold_store; mod impls; +pub mod invariants; mod memory_store; pub mod metadata; pub mod metrics; diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 4b0d1ee016..6d159c9361 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -111,6 +111,19 @@ impl StateCache { self.hdiff_buffers.mem_usage() } + /// Return all state roots currently held in the cache, including the finalized state. + pub fn state_roots(&self) -> Vec { + let mut roots: Vec = self + .states + .iter() + .map(|(&state_root, _)| state_root) + .collect(); + if let Some(ref finalized) = self.finalized_state { + roots.push(finalized.state_root); + } + roots + } + pub fn update_finalized_state( &mut self, state_root: Hash256,