diff --git a/beacon_node/beacon_chain/src/attestation_rewards.rs b/beacon_node/beacon_chain/src/attestation_rewards.rs index 4f7c480c8c..46dda286f1 100644 --- a/beacon_node/beacon_chain/src/attestation_rewards.rs +++ b/beacon_node/beacon_chain/src/attestation_rewards.rs @@ -47,8 +47,10 @@ impl BeaconChain { .state_root_at_slot(state_slot)? .ok_or(BeaconChainError::NoStateForSlot(state_slot))?; + // This branch is reached from the HTTP API. We assume the user wants + // to cache states so that future calls are faster. let state = self - .get_state(&state_root, Some(state_slot))? + .get_state(&state_root, Some(state_slot), true)? .ok_or(BeaconChainError::MissingBeaconState(state_root))?; if state.fork_name_unchecked().altair_enabled() { diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 7f356bd621..ae715afcd0 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -325,8 +325,10 @@ impl AttesterCache { return Ok(value); } + // We use `cache_state = true` here because if we are attesting to the state it's likely + // to be recent and useful for other things. let mut state: BeaconState = chain - .get_state(&state_root, None)? + .get_state(&state_root, None, true)? .ok_or(Error::MissingBeaconState(state_root))?; if state.slot() > slot { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 6ce0c00d10..b24211009e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -815,8 +815,10 @@ impl BeaconChain { let block = self .get_blinded_block(&block_root)? .ok_or(Error::MissingBeaconBlock(block_root))?; + // This method is only used in tests, so we may as well cache states to make CI go brr. + // TODO(release-v7) move this method out of beacon chain and into `store_tests`` or something equivalent. 
let state = self - .get_state(&block.state_root(), Some(block.slot()))? + .get_state(&block.state_root(), Some(block.slot()), true)? .ok_or_else(|| Error::MissingBeaconState(block.state_root()))?; let iter = BlockRootsIterator::owned(&self.store, state); Ok(std::iter::once(Ok((block_root, block.slot()))) @@ -1343,8 +1345,9 @@ impl BeaconChain { &self, state_root: &Hash256, slot: Option, + update_cache: bool, ) -> Result>, Error> { - Ok(self.store.get_state(state_root, slot)?) + Ok(self.store.get_state(state_root, slot, update_cache)?) } /// Return the sync committee at `slot + 1` from the canonical chain. @@ -1519,8 +1522,14 @@ impl BeaconChain { })? .ok_or(Error::NoStateForSlot(slot))?; + // This branch is mostly reached from the HTTP API when doing analysis, or in niche + // situations when producing a block. In the HTTP API case we assume the user wants + // to cache states so that future calls are faster, and that if the cache is + // struggling due to non-finality that they will dial down inessential calls. In the + // block proposal case we want to cache the state so that we can process the block + // quickly after it has been signed. Ok(self - .get_state(&state_root, Some(slot))? + .get_state(&state_root, Some(slot), true)? .ok_or(Error::NoStateForSlot(slot))?) } } @@ -6916,9 +6925,11 @@ impl BeaconChain { })?; let beacon_state_root = beacon_block.state_root(); + // This branch is reached from the HTTP API. We assume the user wants + // to cache states so that future calls are faster. let mut beacon_state = self .store - .get_state(&beacon_state_root, Some(beacon_block.slot()))? + .get_state(&beacon_state_root, Some(beacon_block.slot()), true)? .ok_or_else(|| { Error::DBInconsistent(format!("Missing state {:?}", beacon_state_root)) })?; @@ -7070,8 +7081,10 @@ impl BeaconChain { if signed_beacon_block.slot() % T::EthSpec::slots_per_epoch() == 0 { let block = self.get_blinded_block(&block_hash).unwrap().unwrap(); + // This branch is reached from the HTTP API. 
We assume the user wants + // to cache states so that future calls are faster. let state = self - .get_state(&block.state_root(), Some(block.slot())) + .get_state(&block.state_root(), Some(block.slot()), true) .unwrap() .unwrap(); finalized_blocks.insert(state.finalized_checkpoint().root); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 8d62478bea..02b566971a 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -298,8 +298,13 @@ where .get_blinded_block(&chain.genesis_block_root) .map_err(|e| descriptive_db_error("genesis block", &e))? .ok_or("Genesis block not found in store")?; + // We're resuming from some state in the db so it makes sense to cache it. let genesis_state = store - .get_state(&genesis_block.state_root(), Some(genesis_block.slot())) + .get_state( + &genesis_block.state_root(), + Some(genesis_block.slot()), + true, + ) .map_err(|e| descriptive_db_error("genesis state", &e))? .ok_or("Genesis state not found in store")?; diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 4e21372efb..4d2ff11b38 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -784,6 +784,12 @@ impl BeaconChain { .execution_status .is_optimistic_or_invalid(); + // Update the state cache so it doesn't mistakenly prune the new head. + self.store + .state_cache + .lock() + .update_head_block_root(new_cached_head.head_block_root()); + // Detect and potentially report any re-orgs. 
let reorg_distance = detect_reorg( &old_snapshot.beacon_state, diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 8d1c29f46f..48ff87fe3c 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -116,8 +116,9 @@ pub fn reset_fork_choice_to_finalization, Cold: It // Advance finalized state to finalized epoch (to handle skipped slots). let finalized_state_root = finalized_block.state_root(); + // The enshrined finalized state should be in the state cache. let mut finalized_state = store - .get_state(&finalized_state_root, Some(finalized_block.slot())) + .get_state(&finalized_state_root, Some(finalized_block.slot()), true) .map_err(|e| format!("Error loading finalized state: {:?}", e))? .ok_or_else(|| { format!( diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs index 78442d8df0..dd25da0847 100644 --- a/beacon_node/beacon_chain/src/light_client_server_cache.rs +++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs @@ -319,8 +319,11 @@ impl LightClientServerCache { metrics::inc_counter(&metrics::LIGHT_CLIENT_SERVER_CACHE_PREV_BLOCK_CACHE_MISS); // Compute the value, handling potential errors. + // This state should already be cached. By electing not to cache it here + // we remove any chance of the light client server from affecting the state cache. + // We'd like the light client server to be as minimally invasive as possible. let mut state = store - .get_state(block_state_root, Some(block_slot))? + .get_state(block_state_root, Some(block_slot), false)? 
.ok_or_else(|| { BeaconChainError::DBInconsistent(format!("Missing state {:?}", block_state_root)) })?; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index bc4b8e1ed8..a8543fab9b 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -320,7 +320,8 @@ impl, Cold: ItemStore> BackgroundMigrator state, other => { error!( diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 24c85b3e07..fa26c9dbd9 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -890,7 +890,7 @@ where pub fn get_hot_state(&self, state_hash: BeaconStateHash) -> Option> { self.chain .store - .load_hot_state(&state_hash.into()) + .load_hot_state(&state_hash.into(), true) .unwrap() .map(|(state, _)| state) } diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index dcc63ddf62..30eec539fc 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -36,6 +36,9 @@ pub const VALIDATOR_COUNT: usize = 256; pub const CAPELLA_FORK_EPOCH: usize = 1; +// When set to true, cache any states fetched from the db. +pub const CACHE_STATE_IN_TESTS: bool = true; + /// A cached set of keys. 
static KEYPAIRS: LazyLock> = LazyLock::new(|| types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT)); @@ -1225,7 +1228,11 @@ async fn attestation_that_skips_epochs() { let mut state = harness .chain - .get_state(&earlier_block.state_root(), Some(earlier_slot)) + .get_state( + &earlier_block.state_root(), + Some(earlier_slot), + CACHE_STATE_IN_TESTS, + ) .expect("should not error getting state") .expect("should find state"); @@ -1329,9 +1336,14 @@ async fn attestation_validator_receive_proposer_reward_and_withdrawals() { .await; let current_slot = harness.get_current_slot(); + let mut state = harness .chain - .get_state(&earlier_block.state_root(), Some(earlier_slot)) + .get_state( + &earlier_block.state_root(), + Some(earlier_slot), + CACHE_STATE_IN_TESTS, + ) .expect("should not error getting state") .expect("should find state"); @@ -1399,7 +1411,11 @@ async fn attestation_to_finalized_block() { let mut state = harness .chain - .get_state(&earlier_block.state_root(), Some(earlier_slot)) + .get_state( + &earlier_block.state_root(), + Some(earlier_slot), + CACHE_STATE_IN_TESTS, + ) .expect("should not error getting state") .expect("should find state"); diff --git a/beacon_node/beacon_chain/tests/rewards.rs b/beacon_node/beacon_chain/tests/rewards.rs index 41e6467b0f..710752d9cc 100644 --- a/beacon_node/beacon_chain/tests/rewards.rs +++ b/beacon_node/beacon_chain/tests/rewards.rs @@ -20,6 +20,9 @@ use types::{ChainSpec, ForkName, Slot}; pub const VALIDATOR_COUNT: usize = 64; +// When set to true, cache any states fetched from the db. 
+pub const CACHE_STATE_IN_TESTS: bool = true; + type E = MinimalEthSpec; static KEYPAIRS: LazyLock> = @@ -116,8 +119,13 @@ async fn test_sync_committee_rewards() { .get_blinded_block(&block.parent_root()) .unwrap() .unwrap(); + let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state( + &parent_block.state_root(), + Some(parent_block.slot()), + CACHE_STATE_IN_TESTS, + ) .unwrap() .unwrap(); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 7a2df76970..d1950ab7ce 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -39,6 +39,9 @@ use types::*; pub const LOW_VALIDATOR_COUNT: usize = 24; pub const HIGH_VALIDATOR_COUNT: usize = 64; +// When set to true, cache any states fetched from the db. +pub const CACHE_STATE_IN_TESTS: bool = true; + /// A cached set of keys. static KEYPAIRS: LazyLock> = LazyLock::new(|| types::test_utils::generate_deterministic_keypairs(HIGH_VALIDATOR_COUNT)); @@ -758,6 +761,7 @@ async fn delete_blocks_and_states() { .get_state( &faulty_head_block.state_root(), Some(faulty_head_block.slot()), + CACHE_STATE_IN_TESTS, ) .expect("no db error") .expect("faulty head state exists"); @@ -771,7 +775,12 @@ async fn delete_blocks_and_states() { break; } store.delete_state(&state_root, slot).unwrap(); - assert_eq!(store.get_state(&state_root, Some(slot)).unwrap(), None); + assert_eq!( + store + .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) + .unwrap(), + None + ); } // Double-deleting should also be OK (deleting non-existent things is fine) @@ -1055,7 +1064,11 @@ fn get_state_for_block(harness: &TestHarness, block_root: Hash256) -> BeaconStat .unwrap(); harness .chain - .get_state(&head_block.state_root(), Some(head_block.slot())) + .get_state( + &head_block.state_root(), + Some(head_block.slot()), + CACHE_STATE_IN_TESTS, + ) .unwrap() .unwrap() } @@ -1892,7 +1905,10 @@ fn 
check_all_states_exist<'a>( states: impl Iterator, ) { for &state_hash in states { - let state = harness.chain.get_state(&state_hash.into(), None).unwrap(); + let state = harness + .chain + .get_state(&state_hash.into(), None, CACHE_STATE_IN_TESTS) + .unwrap(); assert!( state.is_some(), "expected state {:?} to be in DB", @@ -1910,7 +1926,7 @@ fn check_no_states_exist<'a>( assert!( harness .chain - .get_state(&state_root.into(), None) + .get_state(&state_root.into(), None, CACHE_STATE_IN_TESTS) .unwrap() .is_none(), "state {:?} should not be in the DB", @@ -2344,7 +2360,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .get_or_reconstruct_blobs(&wss_block_root) .unwrap(); let wss_state = full_store - .get_state(&wss_state_root, Some(checkpoint_slot)) + .get_state(&wss_state_root, Some(checkpoint_slot), CACHE_STATE_IN_TESTS) .unwrap() .unwrap(); @@ -2460,7 +2476,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { // Check that the new block's state can be loaded correctly. 
let mut state = beacon_chain .store - .get_state(&state_root, Some(slot)) + .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) .unwrap() .unwrap(); assert_eq!(state.update_tree_hash_cache().unwrap(), state_root); @@ -2594,7 +2610,10 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .unwrap() .map(Result::unwrap) { - let mut state = store.get_state(&state_root, Some(slot)).unwrap().unwrap(); + let mut state = store + .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) + .unwrap() + .unwrap(); assert_eq!(state.slot(), slot); assert_eq!(state.canonical_root().unwrap(), state_root); } @@ -3424,9 +3443,10 @@ async fn prune_historic_states() { let store = get_store(&db_path); let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let genesis_state_root = harness.chain.genesis_state_root; + let genesis_state = harness .chain - .get_state(&genesis_state_root, None) + .get_state(&genesis_state_root, None, CACHE_STATE_IN_TESTS) .unwrap() .unwrap(); @@ -3447,7 +3467,10 @@ async fn prune_historic_states() { .map(Result::unwrap) .collect::>(); for &(state_root, slot) in &first_epoch_state_roots { - assert!(store.get_state(&state_root, Some(slot)).unwrap().is_some()); + assert!(store + .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) + .unwrap() + .is_some()); } store @@ -3462,7 +3485,10 @@ async fn prune_historic_states() { // Ensure all epoch 0 states other than the genesis have been pruned. 
for &(state_root, slot) in &first_epoch_state_roots { assert_eq!( - store.get_state(&state_root, Some(slot)).unwrap().is_some(), + store + .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) + .unwrap() + .is_some(), slot == 0 ); } @@ -3588,7 +3614,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { harness .chain .store - .get_state(&checkpoint.beacon_state_root(), None) + .get_state(&checkpoint.beacon_state_root(), None, CACHE_STATE_IN_TESTS) .expect("no error") .expect("state exists") .slot(), @@ -3650,7 +3676,7 @@ fn check_iterators(harness: &TestHarness) { harness .chain .store - .get_state(&state_root, Some(slot)) + .get_state(&state_root, Some(slot), CACHE_STATE_IN_TESTS) .unwrap() .is_some(), "state {:?} from canonical chain should be in DB", diff --git a/beacon_node/beacon_chain/tests/sync_committee_verification.rs b/beacon_node/beacon_chain/tests/sync_committee_verification.rs index 6d30b8a4e3..c8bbcce20d 100644 --- a/beacon_node/beacon_chain/tests/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/tests/sync_committee_verification.rs @@ -21,6 +21,9 @@ pub type E = MainnetEthSpec; pub const VALIDATOR_COUNT: usize = 256; +// When set to true, cache any states fetched from the db. +pub const CACHE_STATE_IN_TESTS: bool = true; + /// A cached set of keys. static KEYPAIRS: LazyLock> = LazyLock::new(|| types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT)); @@ -755,7 +758,10 @@ async fn unaggregated_gossip_verification() { // Load the block and state for the given root. let block = chain.get_block(&root).await.unwrap().unwrap(); - let mut state = chain.get_state(&block.state_root(), None).unwrap().unwrap(); + let mut state = chain + .get_state(&block.state_root(), None, CACHE_STATE_IN_TESTS) + .unwrap() + .unwrap(); // Advance the state to simulate a pre-state for block production. 
let slot = valid_sync_committee_message.slot + 1; diff --git a/beacon_node/http_api/src/attestation_performance.rs b/beacon_node/http_api/src/attestation_performance.rs index 2f3f340445..23ab5e3752 100644 --- a/beacon_node/http_api/src/attestation_performance.rs +++ b/beacon_node/http_api/src/attestation_performance.rs @@ -126,8 +126,11 @@ pub fn get_attestation_performance( // Load state for block replay. let state_root = prior_block.state_root(); + + // This branch is reached from the HTTP API. We assume the user wants + // to cache states so that future calls are faster. let state = chain - .get_state(&state_root, Some(prior_slot)) + .get_state(&state_root, Some(prior_slot), true) .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) .map_err(unhandled_error)?; diff --git a/beacon_node/http_api/src/block_packing_efficiency.rs b/beacon_node/http_api/src/block_packing_efficiency.rs index 431547f10b..249a6732dc 100644 --- a/beacon_node/http_api/src/block_packing_efficiency.rs +++ b/beacon_node/http_api/src/block_packing_efficiency.rs @@ -285,8 +285,10 @@ pub fn get_block_packing_efficiency( // Load state for block replay. let starting_state_root = first_block.state_root(); + // This branch is reached from the HTTP API. We assume the user wants + // to cache states so that future calls are faster. let starting_state = chain - .get_state(&starting_state_root, Some(prior_slot)) + .get_state(&starting_state_root, Some(prior_slot), true) .and_then(|maybe_state| { maybe_state.ok_or(BeaconChainError::MissingBeaconState(starting_state_root)) }) diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs index 0cc878bb48..8466da4de1 100644 --- a/beacon_node/http_api/src/block_rewards.rs +++ b/beacon_node/http_api/src/block_rewards.rs @@ -43,8 +43,10 @@ pub fn get_block_rewards( .map_err(unhandled_error)? 
.ok_or_else(|| custom_bad_request(format!("prior state at slot {} unknown", prior_slot)))?; + // This branch is reached from the HTTP API. We assume the user wants + // to cache states so that future calls are faster. let mut state = chain - .get_state(&state_root, Some(prior_slot)) + .get_state(&state_root, Some(prior_slot), true) .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) .map_err(unhandled_error)?; @@ -133,8 +135,10 @@ pub fn compute_block_rewards( )) })?; + // This branch is reached from the HTTP API. We assume the user wants + // to cache states so that future calls are faster. let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) .map_err(unhandled_error)? .ok_or_else(|| { custom_bad_request(format!( diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 353390cdad..a9f66de467 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -189,8 +189,10 @@ impl StateId { _ => (self.root(chain)?, None), }; + // This branch is reached from the HTTP API. We assume the user wants + // to cache states so that future calls are faster. let state = chain - .get_state(&state_root, slot_opt) + .get_state(&state_root, slot_opt, true) .map_err(warp_utils::reject::unhandled_error) .and_then(|opt| { opt.ok_or_else(|| { diff --git a/beacon_node/http_api/src/sync_committee_rewards.rs b/beacon_node/http_api/src/sync_committee_rewards.rs index ec63372406..987dfdff59 100644 --- a/beacon_node/http_api/src/sync_committee_rewards.rs +++ b/beacon_node/http_api/src/sync_committee_rewards.rs @@ -58,8 +58,10 @@ pub fn get_state_before_applying_block( }) .map_err(|e| custom_not_found(format!("Parent block is not available! {:?}", e)))?; + // We are about to apply a new block to the chain. 
It's parent state + // is a useful/recent state, we elect to cache it. let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) .and_then(|maybe_state| { maybe_state .ok_or_else(|| BeaconChainError::MissingBeaconState(parent_block.state_root())) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index d29c3e2127..5de096b25f 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -776,6 +776,15 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("state-cache-headroom") + .long("state-cache-headroom") + .value_name("N") + .help("Minimum number of states to cull from the state cache when it gets full") + .default_value("1") + .action(ArgAction::Set) + .display_order(0) + ) .arg( Arg::new("block-cache-size") .long("block-cache-size") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index f1db7590e8..cd92ee8fad 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -463,6 +463,12 @@ pub fn get_config( client_config.chain.epochs_per_migration = epochs_per_migration; } + if let Some(state_cache_headroom) = + clap_utils::parse_optional(cli_args, "state-cache-headroom")? + { + client_config.store.state_cache_headroom = state_cache_headroom; + } + if let Some(prune_blobs) = clap_utils::parse_optional(cli_args, "prune-blobs")? 
{ client_config.store.prune_blobs = prune_blobs; } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 64765fd66a..a84573eb40 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -21,6 +21,7 @@ pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192; pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8; pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(64); pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128); +pub const DEFAULT_STATE_CACHE_HEADROOM: NonZeroUsize = new_non_zero_usize(1); pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1; pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1); pub const DEFAULT_HDIFF_BUFFER_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(16); @@ -35,6 +36,8 @@ pub struct StoreConfig { pub block_cache_size: NonZeroUsize, /// Maximum number of states to store in the in-memory state cache. pub state_cache_size: NonZeroUsize, + /// Minimum number of states to cull from the state cache upon fullness. + pub state_cache_headroom: NonZeroUsize, /// Compression level for blocks, state diffs and other compressed values. pub compression_level: i32, /// Maximum number of historic states to store in the in-memory historic state cache. @@ -107,6 +110,7 @@ impl Default for StoreConfig { Self { block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, state_cache_size: DEFAULT_STATE_CACHE_SIZE, + state_cache_headroom: DEFAULT_STATE_CACHE_HEADROOM, historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE, hdiff_buffer_cache_size: DEFAULT_HDIFF_BUFFER_CACHE_SIZE, compression_level: DEFAULT_COMPRESSION_LEVEL, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6dee0dc180..42d1fd31c2 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -73,7 +73,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// Cache of beacon states. 
/// /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required. - state_cache: Mutex>, + pub state_cache: Mutex>, /// Cache of historic states and hierarchical diff buffers. /// /// This cache is never pruned. It is only populated in response to historical queries from the @@ -218,7 +218,10 @@ impl HotColdDB, MemoryStore> { blobs_db: MemoryStore::open(), hot_db: MemoryStore::open(), block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), - state_cache: Mutex::new(StateCache::new(config.state_cache_size)), + state_cache: Mutex::new(StateCache::new( + config.state_cache_size, + config.state_cache_headroom, + )), historic_state_cache: Mutex::new(HistoricStateCache::new( config.hdiff_buffer_cache_size, config.historic_state_cache_size, @@ -264,7 +267,10 @@ impl HotColdDB, BeaconNodeBackend> { cold_db: BeaconNodeBackend::open(&config, cold_path)?, hot_db, block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), - state_cache: Mutex::new(StateCache::new(config.state_cache_size)), + state_cache: Mutex::new(StateCache::new( + config.state_cache_size, + config.state_cache_headroom, + )), historic_state_cache: Mutex::new(HistoricStateCache::new( config.hdiff_buffer_cache_size, config.historic_state_cache_size, @@ -945,6 +951,7 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, slot: Option, + update_cache: bool, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT); @@ -956,10 +963,10 @@ impl, Cold: ItemStore> HotColdDB // chain. This way we avoid returning a state that doesn't match `state_root`. self.load_cold_state(state_root) } else { - self.get_hot_state(state_root) + self.get_hot_state(state_root, update_cache) } } else { - match self.get_hot_state(state_root)? { + match self.get_hot_state(state_root, update_cache)? 
{ Some(state) => Ok(Some(state)), None => self.load_cold_state(state_root), } @@ -1014,22 +1021,28 @@ impl, Cold: ItemStore> HotColdDB } else { state_root }; + // It's a bit redundant but we elect to cache the state here and down below. let mut opt_state = self - .load_hot_state(&state_root)? + .load_hot_state(&state_root, true)? .map(|(state, _block_root)| (state_root, state)); if let Some((state_root, state)) = opt_state.as_mut() { state.update_tree_hash_cache()?; state.build_all_caches(&self.spec)?; - self.state_cache - .lock() - .put_state(*state_root, block_root, state)?; - debug!( - self.log, - "Cached state"; - "state_root" => ?state_root, - "slot" => state.slot(), - ); + if let PutStateOutcome::New(deleted_states) = + self.state_cache + .lock() + .put_state(*state_root, block_root, state)? + { + debug!( + self.log, + "Cached state"; + "location" => "get_advanced_hot_state", + "deleted_states" => ?deleted_states, + "state_root" => ?state_root, + "slot" => state.slot(), + ); + } } drop(split); Ok(opt_state) @@ -1126,6 +1139,8 @@ impl, Cold: ItemStore> HotColdDB /// Load an epoch boundary state by using the hot state summary look-up. /// /// Will fall back to the cold DB if a hot state summary is not found. + /// + /// NOTE: only used in tests at the moment pub fn load_epoch_boundary_state( &self, state_root: &Hash256, @@ -1136,9 +1151,11 @@ impl, Cold: ItemStore> HotColdDB }) = self.load_hot_state_summary(state_root)? { // NOTE: minor inefficiency here because we load an unnecessary hot state summary - let (state, _) = self.load_hot_state(&epoch_boundary_state_root)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), - )?; + let (state, _) = self + .load_hot_state(&epoch_boundary_state_root, true)? 
+ .ok_or(HotColdDBError::MissingEpochBoundaryState( + epoch_boundary_state_root, + ))?; Ok(Some(state)) } else { // Try the cold DB @@ -1463,24 +1480,34 @@ impl, Cold: ItemStore> HotColdDB state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { - // Put the state in the cache. - let block_root = state.get_latest_block_root(*state_root); - // Avoid storing states in the database if they already exist in the state cache. // The exception to this is the finalized state, which must exist in the cache before it // is stored on disk. - if let PutStateOutcome::Duplicate = - self.state_cache - .lock() - .put_state(*state_root, block_root, state)? - { - debug!( - self.log, - "Skipping storage of cached state"; - "slot" => state.slot(), - "state_root" => ?state_root - ); - return Ok(()); + match self.state_cache.lock().put_state( + *state_root, + state.get_latest_block_root(*state_root), + state, + )? { + PutStateOutcome::New(deleted_states) => { + debug!( + self.log, + "Cached state"; + "location" => "store_hot_state", + "deleted_states" => ?deleted_states, + "state_root" => ?state_root, + "slot" => state.slot(), + ); + } + PutStateOutcome::Duplicate => { + debug!( + self.log, + "State already exists in state cache"; + "slot" => state.slot(), + "state_root" => ?state_root + ); + return Ok(()); + } + PutStateOutcome::Finalized => {} // Continue to store. } // On the epoch boundary, store the full state. @@ -1505,7 +1532,11 @@ impl, Cold: ItemStore> HotColdDB } /// Get a post-finalization state from the database or store. 
- pub fn get_hot_state(&self, state_root: &Hash256) -> Result>, Error> { + pub fn get_hot_state( + &self, + state_root: &Hash256, + update_cache: bool, + ) -> Result>, Error> { if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) { return Ok(Some(state)); } @@ -1519,20 +1550,35 @@ impl, Cold: ItemStore> HotColdDB ); } - let state_from_disk = self.load_hot_state(state_root)?; + let state_from_disk = self.load_hot_state(state_root, update_cache)?; if let Some((mut state, block_root)) = state_from_disk { state.update_tree_hash_cache()?; state.build_all_caches(&self.spec)?; - self.state_cache - .lock() - .put_state(*state_root, block_root, &state)?; - debug!( - self.log, - "Cached state"; - "state_root" => ?state_root, - "slot" => state.slot(), - ); + if update_cache { + if let PutStateOutcome::New(deleted_states) = + self.state_cache + .lock() + .put_state(*state_root, block_root, &state)? + { + debug!( + self.log, + "Cached state"; + "location" => "get_hot_state", + "deleted_states" => ?deleted_states, + "state_root" => ?state_root, + "slot" => state.slot(), + ); + } + } else { + debug!( + self.log, + "Did not cache state"; + "state_root" => ?state_root, + "slot" => state.slot(), + ); + } + Ok(Some(state)) } else { Ok(None) @@ -1548,6 +1594,7 @@ impl, Cold: ItemStore> HotColdDB pub fn load_hot_state( &self, state_root: &Hash256, + update_cache: bool, ) -> Result, Hash256)>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); @@ -1579,17 +1626,19 @@ impl, Cold: ItemStore> HotColdDB let mut state = if slot % E::slots_per_epoch() == 0 { boundary_state } else { - // Cache ALL intermediate states that are reached during block replay. We may want - // to restrict this in future to only cache epoch boundary states. At worst we will - // cache up to 32 states for each state loaded, which should not flush out the cache - // entirely. 
+ // If replaying blocks, and `update_cache` is true, also cache the epoch boundary + // state that this state is based on. It may be useful as the basis of more states + // in the same epoch. let state_cache_hook = |state_root, state: &mut BeaconState| { + if !update_cache || state.slot() % E::slots_per_epoch() != 0 { + return Ok(()); + } // Ensure all caches are built before attempting to cache. state.update_tree_hash_cache()?; state.build_all_caches(&self.spec)?; let latest_block_root = state.get_latest_block_root(state_root); - if let PutStateOutcome::New = + if let PutStateOutcome::New(_) = self.state_cache .lock() .put_state(state_root, latest_block_root, state)? @@ -1598,7 +1647,8 @@ impl, Cold: ItemStore> HotColdDB self.log, "Cached ancestor state"; "state_root" => ?state_root, - "slot" => slot, + "state_slot" => state.slot(), + "descendant_slot" => slot, ); } Ok(()) @@ -2668,10 +2718,15 @@ impl, Cold: ItemStore> HotColdDB return Ok(()); }; - // Load the split state so we can backtrack to find execution payloads. - let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( - HotColdDBError::MissingSplitState(split.state_root, split.slot), - )?; + // Load the split state so we can backtrack to find execution payloads. The split state + // should be in the state cache as the enshrined finalized state, so this should never + // cache miss. + let split_state = self + .get_state(&split.state_root, Some(split.slot), true)? + .ok_or(HotColdDBError::MissingSplitState( + split.state_root, + split.slot, + ))?; // The finalized block may or may not have its execution payload stored, depending on // whether it was at a skipped slot. However for a fully pruned database its parent @@ -3169,8 +3224,10 @@ pub fn migrate_database, Cold: ItemStore>( // Store slot -> state_root and state_root -> slot mappings. store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?; } else { + // This is some state that we want to migrate to the freezer db. 
+ // There is no reason to cache this state. let state: BeaconState = store - .get_hot_state(&state_root)? + .get_hot_state(&state_root, false)? .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?; store.store_cold_state(&state_root, &state, &mut cold_db_ops)?; diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 97a88c01c8..a344bea8d4 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -27,8 +27,10 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> &self, store: &'a HotColdDB, ) -> Option> { + // Ancestor roots and their states are probably in the cold db, + // but we set `update_cache` to false just in case. let state = store - .get_state(&self.message().state_root(), Some(self.slot())) + .get_state(&self.message().state_root(), Some(self.slot()), false) .ok()??; Some(BlockRootsIterator::owned(store, state)) @@ -189,8 +191,10 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, let block = store .get_blinded_block(&block_hash)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; + // We are querying some block from the database. It's not clear if the block's state is useful, + // so we elect not to cache it. let state = store - .get_state(&block.state_root(), Some(block.slot()))? + .get_state(&block.state_root(), Some(block.slot()), false)? .ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?; Ok(Self::owned(store, state)) } @@ -362,8 +366,9 @@ fn next_historical_root_backtrack_state, Cold: Ite if new_state_slot >= historic_state_upper_limit { let new_state_root = current_state.get_state_root(new_state_slot)?; + // We are backtracking through historical states, so we don't want to cache these. Ok(store - .get_state(new_state_root, Some(new_state_slot))? + .get_state(new_state_root, Some(new_state_slot), false)? .ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?)
} else { Err(Error::HistoryUnavailable) diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 96e4de4639..281ecab152 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -33,26 +33,33 @@ pub struct SlotMap { #[derive(Debug)] pub struct StateCache { finalized_state: Option>, - states: LruCache>, + // Stores the tuple (state_root, state) as LruCache only returns the value on put and we need + // the state_root + states: LruCache)>, block_map: BlockMap, max_epoch: Epoch, + head_block_root: Hash256, + headroom: NonZeroUsize, } #[derive(Debug)] pub enum PutStateOutcome { Finalized, Duplicate, - New, + /// Includes deleted states as a result of this insertion + New(Vec), } #[allow(clippy::len_without_is_empty)] impl StateCache { - pub fn new(capacity: NonZeroUsize) -> Self { + pub fn new(capacity: NonZeroUsize, headroom: NonZeroUsize) -> Self { StateCache { finalized_state: None, states: LruCache::new(capacity), block_map: BlockMap::default(), max_epoch: Epoch::new(0), + head_block_root: Hash256::ZERO, + headroom, } } @@ -98,6 +105,13 @@ impl StateCache { Ok(()) } + /// Update the state cache's view of the enshrined head block. + /// + /// We never prune the unadvanced state for the head block. + pub fn update_head_block_root(&mut self, head_block_root: Hash256) { + self.head_block_root = head_block_root; + } + /// Rebase the given state on the finalized state in order to reduce its memory consumption. /// /// This function should only be called on states that are likely not to already share tree @@ -147,18 +161,26 @@ impl StateCache { self.max_epoch = std::cmp::max(state.current_epoch(), self.max_epoch); // If the cache is full, use the custom cull routine to make room. 
- if let Some(over_capacity) = self.len().checked_sub(self.capacity()) { - self.cull(over_capacity + 1); - } + let mut deleted_states = + if let Some(over_capacity) = self.len().checked_sub(self.capacity()) { + // The `over_capacity` should always be 0, but we add it here just in case. + self.cull(over_capacity + self.headroom.get()) + } else { + vec![] + }; // Insert the full state into the cache. - self.states.put(state_root, state.clone()); + if let Some((deleted_state_root, _)) = + self.states.put(state_root, (state_root, state.clone())) + { + deleted_states.push(deleted_state_root); + } // Record the connection from block root and slot to this state. let slot = state.slot(); self.block_map.insert(block_root, slot, state_root); - Ok(PutStateOutcome::New) + Ok(PutStateOutcome::New(deleted_states)) } pub fn get_by_state_root(&mut self, state_root: Hash256) -> Option> { @@ -167,7 +189,7 @@ impl StateCache { return Some(finalized_state.state.clone()); } } - self.states.get(&state_root).cloned() + self.states.get(&state_root).map(|(_, state)| state.clone()) } pub fn get_by_block_root( @@ -211,7 +233,7 @@ impl StateCache { /// - Mid-epoch unadvanced states. /// - Epoch-boundary states that are too old to be finalized. /// - Epoch-boundary states that could be finalized. 
- pub fn cull(&mut self, count: usize) { + pub fn cull(&mut self, count: usize) -> Vec { let cull_exempt = std::cmp::max( 1, self.len() * CULL_EXEMPT_NUMERATOR / CULL_EXEMPT_DENOMINATOR, @@ -222,7 +244,8 @@ impl StateCache { let mut mid_epoch_state_roots = vec![]; let mut old_boundary_state_roots = vec![]; let mut good_boundary_state_roots = vec![]; - for (&state_root, state) in self.states.iter().skip(cull_exempt) { + + for (&state_root, (_, state)) in self.states.iter().skip(cull_exempt) { let is_advanced = state.slot() > state.latest_block_header().slot; let is_boundary = state.slot() % E::slots_per_epoch() == 0; let could_finalize = @@ -236,7 +259,8 @@ impl StateCache { } } else if is_advanced { advanced_state_roots.push(state_root); - } else { + } else if state.get_latest_block_root(state_root) != self.head_block_root { + // Never prune the head state mid_epoch_state_roots.push(state_root); } @@ -248,15 +272,19 @@ impl StateCache { // Stage 2: delete. // This could probably be more efficient in how it interacts with the block map. - for state_root in advanced_state_roots - .iter() - .chain(mid_epoch_state_roots.iter()) - .chain(old_boundary_state_roots.iter()) - .chain(good_boundary_state_roots.iter()) + let state_roots_to_delete = advanced_state_roots + .into_iter() + .chain(old_boundary_state_roots) + .chain(mid_epoch_state_roots) + .chain(good_boundary_state_roots) .take(count) - { + .collect::>(); + + for state_root in &state_roots_to_delete { self.delete_state(state_root); } + + state_roots_to_delete } } diff --git a/book/src/help_bn.md b/book/src/help_bn.md index 0a132bc338..dd09d83fa3 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -385,6 +385,9 @@ Options: Number of validators per chunk stored on disk. --slots-per-restore-point DEPRECATED. This flag has no effect. 
+ --state-cache-headroom + Minimum number of states to cull from the state cache when it gets + full [default: 1] --state-cache-size Specifies the size of the state cache [default: 32] --suggested-fee-recipient diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index b224cde048..95bdee574d 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -25,6 +25,9 @@ pub type E = MainnetEthSpec; pub const VALIDATOR_COUNT: usize = 64; +// When set to true, cache any states fetched from the db. +pub const CACHE_STATE_IN_TESTS: bool = true; + /// Defines some delay between when an attestation is created and when it is mutated. pub enum MutationDelay { /// No delay between creation and mutation. @@ -373,7 +376,7 @@ impl ForkChoiceTest { let state = harness .chain .store - .get_state(&state_root, None) + .get_state(&state_root, None, CACHE_STATE_IN_TESTS) .unwrap() .unwrap(); let balances = state diff --git a/consensus/state_processing/src/per_block_processing/tests.rs b/consensus/state_processing/src/per_block_processing/tests.rs index c59449634a..34e9ff120d 100644 --- a/consensus/state_processing/src/per_block_processing/tests.rs +++ b/consensus/state_processing/src/per_block_processing/tests.rs @@ -22,6 +22,9 @@ pub const VALIDATOR_COUNT: usize = 64; pub const EPOCH_OFFSET: u64 = 4; pub const NUM_ATTESTATIONS: u64 = 1; +// When set to true, cache any states fetched from the db. +pub const CACHE_STATE_IN_TESTS: bool = true; + /// A cached set of keys. static KEYPAIRS: LazyLock> = LazyLock::new(|| generate_deterministic_keypairs(MAX_VALIDATOR_COUNT)); @@ -1114,9 +1117,10 @@ async fn block_replayer_peeking_state_roots() { .get_blinded_block(&parent_block_root) .unwrap() .unwrap(); + // Cache the state to make CI go brr. 
let parent_state = harness .chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) .unwrap() .unwrap(); diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 90d9fc0b49..86104ce050 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -1883,6 +1883,21 @@ fn state_cache_size_flag() { .with_config(|config| assert_eq!(config.store.state_cache_size, new_non_zero_usize(64))); } #[test] +fn state_cache_headroom_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert_eq!(config.store.state_cache_headroom, new_non_zero_usize(1))); +} +#[test] +fn state_cache_headroom_flag() { + CommandLineTest::new() + .flag("state-cache-headroom", Some("16")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!(config.store.state_cache_headroom, new_non_zero_usize(16)) + }); +} +#[test] fn historic_state_cache_size_flag() { CommandLineTest::new() .flag("historic-state-cache-size", Some("4")) diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index a1c74389a7..05804d7e36 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -29,6 +29,9 @@ use types::{ IndexedAttestation, KzgProof, ProposerPreparationData, SignedBeaconBlock, Slot, Uint256, }; +// When set to true, cache any states fetched from the db. +pub const CACHE_STATE_IN_TESTS: bool = true; + #[derive(Default, Debug, PartialEq, Clone, Deserialize, Decode)] #[serde(deny_unknown_fields)] pub struct PowBlock { @@ -546,10 +549,15 @@ impl Tester { .unwrap() { let parent_state_root = parent_block.state_root(); + let mut state = self .harness .chain - .get_state(&parent_state_root, Some(parent_block.slot())) + .get_state( + &parent_state_root, + Some(parent_block.slot()), + CACHE_STATE_IN_TESTS, + ) .unwrap() .unwrap();