From defdc595fcca14ed9b57304480acc820c8d35384 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Mon, 3 Mar 2025 17:15:12 -0700 Subject: [PATCH] Prevent writing to state cache when migrating the database (#7067) * add an update_cache flag to get_state to have more granular control over when we write to the cache * State cache tweaks - add state-cache-headroom flag to control pruning - prune old epoch boundary states ahead of mid-epoch states - never prune head block's state - avoid caching ancestor states unless they are on an epoch boundary --------- Co-authored-by: Michael Sproul --- .../beacon_chain/src/attestation_rewards.rs | 2 +- .../beacon_chain/src/attester_cache.rs | 3 +- beacon_node/beacon_chain/src/beacon_chain.rs | 12 ++- beacon_node/beacon_chain/src/builder.rs | 6 +- .../beacon_chain/src/canonical_head.rs | 7 ++ beacon_node/beacon_chain/src/fork_revert.rs | 2 +- .../src/light_client_server_cache.rs | 2 +- beacon_node/beacon_chain/src/migrate.rs | 2 +- beacon_node/beacon_chain/src/test_utils.rs | 2 +- .../tests/attestation_verification.rs | 6 +- beacon_node/beacon_chain/tests/rewards.rs | 2 +- beacon_node/beacon_chain/tests/store_tests.rs | 40 ++++--- .../tests/sync_committee_verification.rs | 5 +- .../http_api/src/attestation_performance.rs | 2 +- .../http_api/src/block_packing_efficiency.rs | 2 +- beacon_node/http_api/src/block_rewards.rs | 4 +- beacon_node/http_api/src/state_id.rs | 2 +- .../http_api/src/sync_committee_rewards.rs | 2 +- beacon_node/src/cli.rs | 9 ++ beacon_node/src/config.rs | 6 ++ beacon_node/store/src/config.rs | 3 + beacon_node/store/src/hot_cold_store.rs | 99 ++++++++++++------ beacon_node/store/src/iter.rs | 8 +- beacon_node/store/src/state_cache.rs | 21 +++- consensus/fork_choice/tests/tests.rs | 2 +- .../src/per_block_processing/tests.rs | 2 +- testing/.DS_Store | Bin 0 -> 6148 bytes testing/ef_tests/src/cases/fork_choice.rs | 2 +- 28 files changed, 178 insertions(+), 77 deletions(-) create mode 100644 testing/.DS_Store diff --git a/beacon_node/beacon_chain/src/attestation_rewards.rs b/beacon_node/beacon_chain/src/attestation_rewards.rs index 4f7c480c8c..125f838c63 100644 --- a/beacon_node/beacon_chain/src/attestation_rewards.rs +++ b/beacon_node/beacon_chain/src/attestation_rewards.rs @@ -48,7 +48,7 @@ impl BeaconChain { .ok_or(BeaconChainError::NoStateForSlot(state_slot))?; let state = self - .get_state(&state_root, Some(state_slot))? + .get_state(&state_root, Some(state_slot), true)? .ok_or(BeaconChainError::MissingBeaconState(state_root))?; if state.fork_name_unchecked().altair_enabled() { diff --git a/beacon_node/beacon_chain/src/attester_cache.rs b/beacon_node/beacon_chain/src/attester_cache.rs index 7f356bd621..abc174e640 100644 --- a/beacon_node/beacon_chain/src/attester_cache.rs +++ b/beacon_node/beacon_chain/src/attester_cache.rs @@ -287,6 +287,7 @@ impl AttesterCache { Ok(()) } + // TODO(holesky) check this out /// Read the state identified by `state_root` from the database, advance it to the required /// slot, use it to prime the cache and return the values for the provided `slot` and /// `committee_index`. @@ -326,7 +327,7 @@ impl AttesterCache { } let mut state: BeaconState = chain - .get_state(&state_root, None)? + .get_state(&state_root, None, true)? .ok_or(Error::MissingBeaconState(state_root))?; if state.slot() > slot { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index fc4b4fc7a0..c2f52d89e7 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -821,7 +821,7 @@ impl BeaconChain { .get_blinded_block(&block_root)? .ok_or(Error::MissingBeaconBlock(block_root))?; let state = self - .get_state(&block.state_root(), Some(block.slot()))? + .get_state(&block.state_root(), Some(block.slot()), true)? .ok_or_else(|| Error::MissingBeaconState(block.state_root()))?; let iter = BlockRootsIterator::owned(&self.store, state); Ok(std::iter::once(Ok((block_root, block.slot()))) @@ -1348,8 +1348,9 @@ impl BeaconChain { &self, state_root: &Hash256, slot: Option, + update_cache: bool, ) -> Result>, Error> { - Ok(self.store.get_state(state_root, slot)?) + Ok(self.store.get_state(state_root, slot, update_cache)?) } /// Return the sync committee at `slot + 1` from the canonical chain. @@ -1524,8 +1525,9 @@ impl BeaconChain { })? .ok_or(Error::NoStateForSlot(slot))?; + // Since this slot is less than the current head, don't cache the state Ok(self - .get_state(&state_root, Some(slot))? + .get_state(&state_root, Some(slot), false)? .ok_or(Error::NoStateForSlot(slot))?) } } @@ -6943,7 +6945,7 @@ impl BeaconChain { let mut beacon_state = self .store - .get_state(&beacon_state_root, Some(beacon_block.slot()))? + .get_state(&beacon_state_root, Some(beacon_block.slot()), true)? .ok_or_else(|| { Error::DBInconsistent(format!("Missing state {:?}", beacon_state_root)) })?; @@ -7096,7 +7098,7 @@ impl BeaconChain { if signed_beacon_block.slot() % T::EthSpec::slots_per_epoch() == 0 { let block = self.get_blinded_block(&block_hash).unwrap().unwrap(); let state = self - .get_state(&block.state_root(), Some(block.slot())) + .get_state(&block.state_root(), Some(block.slot()), true) .unwrap() .unwrap(); finalized_blocks.insert(state.finalized_checkpoint().root); diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 8d62478bea..d9d7b8877c 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -299,7 +299,11 @@ where .map_err(|e| descriptive_db_error("genesis block", &e))? .ok_or("Genesis block not found in store")?; let genesis_state = store - .get_state(&genesis_block.state_root(), Some(genesis_block.slot())) + .get_state( + &genesis_block.state_root(), + Some(genesis_block.slot()), + true, + ) .map_err(|e| descriptive_db_error("genesis state", &e))? .ok_or("Genesis state not found in store")?; diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 4e21372efb..0b28c95bc7 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -655,6 +655,7 @@ impl BeaconChain { .get_full_block(&new_view.head_block_root)? .ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?; + // TODO(holesky) when we calculate the new snapshot we might be missing a state let (_, beacon_state) = self .store .get_advanced_hot_state( @@ -784,6 +785,12 @@ impl BeaconChain { .execution_status .is_optimistic_or_invalid(); + // Update the state cache so it doesn't mistakenly prune the new head. + self.store + .state_cache + .lock() + .update_head_block_root(new_cached_head.head_block_root()); + // Detect and potentially report any re-orgs. let reorg_distance = detect_reorg( &old_snapshot.beacon_state, diff --git a/beacon_node/beacon_chain/src/fork_revert.rs b/beacon_node/beacon_chain/src/fork_revert.rs index 8d1c29f46f..e37a46cceb 100644 --- a/beacon_node/beacon_chain/src/fork_revert.rs +++ b/beacon_node/beacon_chain/src/fork_revert.rs @@ -117,7 +117,7 @@ pub fn reset_fork_choice_to_finalization, Cold: It // Advance finalized state to finalized epoch (to handle skipped slots). let finalized_state_root = finalized_block.state_root(); let mut finalized_state = store - .get_state(&finalized_state_root, Some(finalized_block.slot())) + .get_state(&finalized_state_root, Some(finalized_block.slot()), true) .map_err(|e| format!("Error loading finalized state: {:?}", e))? .ok_or_else(|| { format!( diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs index 78442d8df0..fb6032edfc 100644 --- a/beacon_node/beacon_chain/src/light_client_server_cache.rs +++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs @@ -320,7 +320,7 @@ impl LightClientServerCache { // Compute the value, handling potential errors. let mut state = store - .get_state(block_state_root, Some(block_slot))? + .get_state(block_state_root, Some(block_slot), false)? .ok_or_else(|| { BeaconChainError::DBInconsistent(format!("Missing state {:?}", block_state_root)) })?; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 441237bf16..ca20321be0 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -356,7 +356,7 @@ impl, Cold: ItemStore> BackgroundMigrator state, other => { error!( diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 24c85b3e07..fa26c9dbd9 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -890,7 +890,7 @@ where pub fn get_hot_state(&self, state_hash: BeaconStateHash) -> Option> { self.chain .store - .load_hot_state(&state_hash.into()) + .load_hot_state(&state_hash.into(), true) .unwrap() .map(|(state, _)| state) } diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index dcc63ddf62..c45e9fa82d 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -1225,7 +1225,7 @@ async fn attestation_that_skips_epochs() { let mut state = harness .chain - .get_state(&earlier_block.state_root(), Some(earlier_slot)) + .get_state(&earlier_block.state_root(), Some(earlier_slot), true) .expect("should not error getting state") .expect("should find state"); @@ -1331,7 +1331,7 @@ async fn attestation_validator_receive_proposer_reward_and_withdrawals() { let current_slot = harness.get_current_slot(); let mut state = harness .chain - .get_state(&earlier_block.state_root(), Some(earlier_slot)) + .get_state(&earlier_block.state_root(), Some(earlier_slot), true) .expect("should not error getting state") .expect("should find state"); @@ -1399,7 +1399,7 @@ async fn attestation_to_finalized_block() { let mut state = harness .chain - .get_state(&earlier_block.state_root(), Some(earlier_slot)) + .get_state(&earlier_block.state_root(), Some(earlier_slot), true) .expect("should not error getting state") .expect("should find state"); diff --git a/beacon_node/beacon_chain/tests/rewards.rs b/beacon_node/beacon_chain/tests/rewards.rs index 41e6467b0f..9035a2eabd 100644 --- a/beacon_node/beacon_chain/tests/rewards.rs +++ b/beacon_node/beacon_chain/tests/rewards.rs @@ -117,7 +117,7 @@ async fn test_sync_committee_rewards() { .unwrap() .unwrap(); let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) .unwrap() .unwrap(); diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 7a2df76970..d352955998 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -758,6 +758,7 @@ async fn delete_blocks_and_states() { .get_state( &faulty_head_block.state_root(), Some(faulty_head_block.slot()), + true, ) .expect("no db error") .expect("faulty head state exists"); @@ -771,7 +772,10 @@ async fn delete_blocks_and_states() { break; } store.delete_state(&state_root, slot).unwrap(); - assert_eq!(store.get_state(&state_root, Some(slot)).unwrap(), None); + assert_eq!( + store.get_state(&state_root, Some(slot), true).unwrap(), + None + ); } // Double-deleting should also be OK (deleting non-existent things is fine) @@ -1055,7 +1059,7 @@ fn get_state_for_block(harness: &TestHarness, block_root: Hash256) -> BeaconStat .unwrap(); harness .chain - .get_state(&head_block.state_root(), Some(head_block.slot())) + .get_state(&head_block.state_root(), Some(head_block.slot()), true) .unwrap() .unwrap() } @@ -1892,7 +1896,10 @@ fn check_all_states_exist<'a>( states: impl Iterator, ) { for &state_hash in states { - let state = harness.chain.get_state(&state_hash.into(), None).unwrap(); + let state = harness + .chain + .get_state(&state_hash.into(), None, true) + .unwrap(); assert!( state.is_some(), "expected state {:?} to be in DB", @@ -1910,7 +1917,7 @@ fn check_no_states_exist<'a>( assert!( harness .chain - .get_state(&state_root.into(), None) + .get_state(&state_root.into(), None, true) .unwrap() .is_none(), "state {:?} should not be in the DB", @@ -2344,7 +2351,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .get_or_reconstruct_blobs(&wss_block_root) .unwrap(); let wss_state = full_store - .get_state(&wss_state_root, Some(checkpoint_slot)) + .get_state(&wss_state_root, Some(checkpoint_slot), true) .unwrap() .unwrap(); @@ -2460,7 +2467,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { // Check that the new block's state can be loaded correctly. let mut state = beacon_chain .store - .get_state(&state_root, Some(slot)) + .get_state(&state_root, Some(slot), true) .unwrap() .unwrap(); assert_eq!(state.update_tree_hash_cache().unwrap(), state_root); @@ -2594,7 +2601,10 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .unwrap() .map(Result::unwrap) { - let mut state = store.get_state(&state_root, Some(slot)).unwrap().unwrap(); + let mut state = store + .get_state(&state_root, Some(slot), true) + .unwrap() + .unwrap(); assert_eq!(state.slot(), slot); assert_eq!(state.canonical_root().unwrap(), state_root); } @@ -3426,7 +3436,7 @@ async fn prune_historic_states() { let genesis_state_root = harness.chain.genesis_state_root; let genesis_state = harness .chain - .get_state(&genesis_state_root, None) + .get_state(&genesis_state_root, None, true) .unwrap() .unwrap(); @@ -3447,7 +3457,10 @@ async fn prune_historic_states() { .map(Result::unwrap) .collect::>(); for &(state_root, slot) in &first_epoch_state_roots { - assert!(store.get_state(&state_root, Some(slot)).unwrap().is_some()); + assert!(store + .get_state(&state_root, Some(slot), true) + .unwrap() + .is_some()); } store @@ -3462,7 +3475,10 @@ async fn prune_historic_states() { // Ensure all epoch 0 states other than the genesis have been pruned. for &(state_root, slot) in &first_epoch_state_roots { assert_eq!( - store.get_state(&state_root, Some(slot)).unwrap().is_some(), + store + .get_state(&state_root, Some(slot), true) + .unwrap() + .is_some(), slot == 0 ); } @@ -3588,7 +3604,7 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { harness .chain .store - .get_state(&checkpoint.beacon_state_root(), None) + .get_state(&checkpoint.beacon_state_root(), None, true) .expect("no error") .expect("state exists") .slot(), @@ -3650,7 +3666,7 @@ fn check_iterators(harness: &TestHarness) { harness .chain .store - .get_state(&state_root, Some(slot)) + .get_state(&state_root, Some(slot), true) .unwrap() .is_some(), "state {:?} from canonical chain should be in DB", diff --git a/beacon_node/beacon_chain/tests/sync_committee_verification.rs b/beacon_node/beacon_chain/tests/sync_committee_verification.rs index 6d30b8a4e3..e20ac3eca7 100644 --- a/beacon_node/beacon_chain/tests/sync_committee_verification.rs +++ b/beacon_node/beacon_chain/tests/sync_committee_verification.rs @@ -755,7 +755,10 @@ async fn unaggregated_gossip_verification() { // Load the block and state for the given root. let block = chain.get_block(&root).await.unwrap().unwrap(); - let mut state = chain.get_state(&block.state_root(), None).unwrap().unwrap(); + let mut state = chain + .get_state(&block.state_root(), None, true) + .unwrap() + .unwrap(); // Advance the state to simulate a pre-state for block production. let slot = valid_sync_committee_message.slot + 1; diff --git a/beacon_node/http_api/src/attestation_performance.rs b/beacon_node/http_api/src/attestation_performance.rs index 2f3f340445..107c388342 100644 --- a/beacon_node/http_api/src/attestation_performance.rs +++ b/beacon_node/http_api/src/attestation_performance.rs @@ -127,7 +127,7 @@ pub fn get_attestation_performance( // Load state for block replay. let state_root = prior_block.state_root(); let state = chain - .get_state(&state_root, Some(prior_slot)) + .get_state(&state_root, Some(prior_slot), true) .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) .map_err(unhandled_error)?; diff --git a/beacon_node/http_api/src/block_packing_efficiency.rs b/beacon_node/http_api/src/block_packing_efficiency.rs index 431547f10b..317ada10a2 100644 --- a/beacon_node/http_api/src/block_packing_efficiency.rs +++ b/beacon_node/http_api/src/block_packing_efficiency.rs @@ -286,7 +286,7 @@ pub fn get_block_packing_efficiency( let starting_state_root = first_block.state_root(); let starting_state = chain - .get_state(&starting_state_root, Some(prior_slot)) + .get_state(&starting_state_root, Some(prior_slot), true) .and_then(|maybe_state| { maybe_state.ok_or(BeaconChainError::MissingBeaconState(starting_state_root)) }) diff --git a/beacon_node/http_api/src/block_rewards.rs b/beacon_node/http_api/src/block_rewards.rs index 0cc878bb48..b4566affe0 100644 --- a/beacon_node/http_api/src/block_rewards.rs +++ b/beacon_node/http_api/src/block_rewards.rs @@ -44,7 +44,7 @@ pub fn get_block_rewards( .ok_or_else(|| custom_bad_request(format!("prior state at slot {} unknown", prior_slot)))?; let mut state = chain - .get_state(&state_root, Some(prior_slot)) + .get_state(&state_root, Some(prior_slot), true) .and_then(|maybe_state| maybe_state.ok_or(BeaconChainError::MissingBeaconState(state_root))) .map_err(unhandled_error)?; @@ -134,7 +134,7 @@ pub fn compute_block_rewards( })?; let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) .map_err(unhandled_error)? .ok_or_else(|| { custom_bad_request(format!( diff --git a/beacon_node/http_api/src/state_id.rs b/beacon_node/http_api/src/state_id.rs index 353390cdad..f4249f6251 100644 --- a/beacon_node/http_api/src/state_id.rs +++ b/beacon_node/http_api/src/state_id.rs @@ -190,7 +190,7 @@ impl StateId { }; let state = chain - .get_state(&state_root, slot_opt) + .get_state(&state_root, slot_opt, true) .map_err(warp_utils::reject::unhandled_error) .and_then(|opt| { opt.ok_or_else(|| { diff --git a/beacon_node/http_api/src/sync_committee_rewards.rs b/beacon_node/http_api/src/sync_committee_rewards.rs index ec63372406..2a29c63031 100644 --- a/beacon_node/http_api/src/sync_committee_rewards.rs +++ b/beacon_node/http_api/src/sync_committee_rewards.rs @@ -59,7 +59,7 @@ pub fn get_state_before_applying_block( .map_err(|e| custom_not_found(format!("Parent block is not available! {:?}", e)))?; let parent_state = chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) .and_then(|maybe_state| { maybe_state .ok_or_else(|| BeaconChainError::MissingBeaconState(parent_block.state_root())) diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 6411ba366f..fdf7e01976 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -792,6 +792,15 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) + .arg( + Arg::new("state-cache-headroom") + .long("state-cache-headroom") + .value_name("N") + .help("Minimum number of states to cull from the state cache when it gets full") + .default_value("1") + .action(ArgAction::Set) + .display_order(0) + ) .arg( Arg::new("block-cache-size") .long("block-cache-size") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 8690ac390e..d69e78c3df 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -466,6 +466,12 @@ pub fn get_config( client_config.chain.epochs_per_migration = epochs_per_migration; } + if let Some(state_cache_headroom) = + clap_utils::parse_optional(cli_args, "state-cache-headroom")? + { + client_config.store.state_cache_headroom = state_cache_headroom; + } + if let Some(prune_blobs) = clap_utils::parse_optional(cli_args, "prune-blobs")? { client_config.store.prune_blobs = prune_blobs; } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 64765fd66a..9b2941ad0f 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -35,6 +35,8 @@ pub struct StoreConfig { pub block_cache_size: NonZeroUsize, /// Maximum number of states to store in the in-memory state cache. pub state_cache_size: NonZeroUsize, + /// Minimum number of states to cull from the state cache upon fullness. + pub state_cache_headroom: usize, /// Compression level for blocks, state diffs and other compressed values. pub compression_level: i32, /// Maximum number of historic states to store in the in-memory historic state cache. @@ -107,6 +109,7 @@ impl Default for StoreConfig { Self { block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, state_cache_size: DEFAULT_STATE_CACHE_SIZE, + state_cache_headroom: 1, historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE, hdiff_buffer_cache_size: DEFAULT_HDIFF_BUFFER_CACHE_SIZE, compression_level: DEFAULT_COMPRESSION_LEVEL, diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 6dee0dc180..6b2b0b8f0d 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -73,7 +73,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// Cache of beacon states. /// /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required. - state_cache: Mutex>, + pub state_cache: Mutex>, /// Cache of historic states and hierarchical diff buffers. /// /// This cache is never pruned. It is only populated in response to historical queries from the @@ -218,7 +218,10 @@ impl HotColdDB, MemoryStore> { blobs_db: MemoryStore::open(), hot_db: MemoryStore::open(), block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), - state_cache: Mutex::new(StateCache::new(config.state_cache_size)), + state_cache: Mutex::new(StateCache::new( + config.state_cache_size, + config.state_cache_headroom, + )), historic_state_cache: Mutex::new(HistoricStateCache::new( config.hdiff_buffer_cache_size, config.historic_state_cache_size, @@ -264,7 +267,10 @@ impl HotColdDB, BeaconNodeBackend> { cold_db: BeaconNodeBackend::open(&config, cold_path)?, hot_db, block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), - state_cache: Mutex::new(StateCache::new(config.state_cache_size)), + state_cache: Mutex::new(StateCache::new( + config.state_cache_size, + config.state_cache_headroom, + )), historic_state_cache: Mutex::new(HistoricStateCache::new( config.hdiff_buffer_cache_size, config.historic_state_cache_size, @@ -945,6 +951,7 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, slot: Option, + update_cache: bool, ) -> Result>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_GET_COUNT); @@ -956,10 +963,10 @@ impl, Cold: ItemStore> HotColdDB // chain. This way we avoid returning a state that doesn't match `state_root`. self.load_cold_state(state_root) } else { - self.get_hot_state(state_root) + self.get_hot_state(state_root, update_cache) } } else { - match self.get_hot_state(state_root)? { + match self.get_hot_state(state_root, update_cache)? { Some(state) => Ok(Some(state)), None => self.load_cold_state(state_root), } @@ -1015,7 +1022,7 @@ impl, Cold: ItemStore> HotColdDB state_root }; let mut opt_state = self - .load_hot_state(&state_root)? + .load_hot_state(&state_root, true)? .map(|(state, _block_root)| (state_root, state)); if let Some((state_root, state)) = opt_state.as_mut() { @@ -1126,6 +1133,8 @@ impl, Cold: ItemStore> HotColdDB /// Load an epoch boundary state by using the hot state summary look-up. /// /// Will fall back to the cold DB if a hot state summary is not found. + /// + /// NOTE: only used in tests at the moment pub fn load_epoch_boundary_state( &self, state_root: &Hash256, @@ -1136,9 +1145,11 @@ impl, Cold: ItemStore> HotColdDB }) = self.load_hot_state_summary(state_root)? { // NOTE: minor inefficiency here because we load an unnecessary hot state summary - let (state, _) = self.load_hot_state(&epoch_boundary_state_root)?.ok_or( - HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root), - )?; + let (state, _) = self + .load_hot_state(&epoch_boundary_state_root, true)? + .ok_or(HotColdDBError::MissingEpochBoundaryState( + epoch_boundary_state_root, + ))?; Ok(Some(state)) } else { // Try the cold DB @@ -1505,7 +1516,11 @@ impl, Cold: ItemStore> HotColdDB } /// Get a post-finalization state from the database or store. - pub fn get_hot_state(&self, state_root: &Hash256) -> Result>, Error> { + pub fn get_hot_state( + &self, + state_root: &Hash256, + update_cache: bool, + ) -> Result>, Error> { if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) { return Ok(Some(state)); } @@ -1519,20 +1534,30 @@ impl, Cold: ItemStore> HotColdDB ); } - let state_from_disk = self.load_hot_state(state_root)?; + let state_from_disk = self.load_hot_state(state_root, update_cache)?; if let Some((mut state, block_root)) = state_from_disk { - state.update_tree_hash_cache()?; - state.build_all_caches(&self.spec)?; - self.state_cache - .lock() - .put_state(*state_root, block_root, &state)?; - debug!( - self.log, - "Cached state"; - "state_root" => ?state_root, - "slot" => state.slot(), - ); + if update_cache { + state.update_tree_hash_cache()?; + state.build_all_caches(&self.spec)?; + self.state_cache + .lock() + .put_state(*state_root, block_root, &state)?; + debug!( + self.log, + "Cached state"; + "state_root" => ?state_root, + "slot" => state.slot(), + ); + } else { + debug!( + self.log, + "Did not cache state"; + "state_root" => ?state_root, + "slot" => state.slot(), + ); + } + Ok(Some(state)) } else { Ok(None) @@ -1548,6 +1573,7 @@ impl, Cold: ItemStore> HotColdDB pub fn load_hot_state( &self, state_root: &Hash256, + update_cache: bool, ) -> Result, Hash256)>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); @@ -1579,11 +1605,13 @@ impl, Cold: ItemStore> HotColdDB let mut state = if slot % E::slots_per_epoch() == 0 { boundary_state } else { - // Cache ALL intermediate states that are reached during block replay. We may want - // to restrict this in future to only cache epoch boundary states. At worst we will - // cache up to 32 states for each state loaded, which should not flush out the cache - // entirely. + // If replaying blocks, and `update_cache` is true, also cache the epoch boundary + // state that this state is based on. It may be useful as the basis of more states + // in the same epoch. let state_cache_hook = |state_root, state: &mut BeaconState| { + if !update_cache || state.slot() % E::slots_per_epoch() != 0 { + return Ok(()); + } // Ensure all caches are built before attempting to cache. state.update_tree_hash_cache()?; state.build_all_caches(&self.spec)?; @@ -1598,7 +1626,8 @@ impl, Cold: ItemStore> HotColdDB self.log, "Cached ancestor state"; "state_root" => ?state_root, - "slot" => slot, + "state_slot" => state.slot(), + "descendant_slot" => slot, ); } Ok(()) @@ -2668,10 +2697,15 @@ impl, Cold: ItemStore> HotColdDB return Ok(()); }; - // Load the split state so we can backtrack to find execution payloads. - let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or( - HotColdDBError::MissingSplitState(split.state_root, split.slot), - )?; + // Load the split state so we can backtrack to find execution payloads. The split state + // should be in the state cache as the enshrined finalized state, so this should never + // cache miss. + let split_state = self + .get_state(&split.state_root, Some(split.slot), true)? + .ok_or(HotColdDBError::MissingSplitState( + split.state_root, + split.slot, + ))?; // The finalized block may or may not have its execution payload stored, depending on // whether it was at a skipped slot. However for a fully pruned database its parent @@ -3169,8 +3203,9 @@ pub fn migrate_database, Cold: ItemStore>( // Store slot -> state_root and state_root -> slot mappings. store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?; } else { + // We purposely do not update the state cache to store this state let state: BeaconState = store - .get_hot_state(&state_root)? + .get_hot_state(&state_root, false)? .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?; store.store_cold_state(&state_root, &state, &mut cold_db_ops)?; diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 97a88c01c8..d5ee1d7a89 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -27,8 +27,10 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> &self, store: &'a HotColdDB, ) -> Option> { + // ancestor roots and their states are probably in the cold db + // but we set `update_cache` to false just in case let state = store - .get_state(&self.message().state_root(), Some(self.slot())) + .get_state(&self.message().state_root(), Some(self.slot()), false) .ok()??; Some(BlockRootsIterator::owned(store, state)) @@ -190,7 +192,7 @@ impl<'a, E: EthSpec, Hot: ItemStore, Cold: ItemStore> RootsIterator<'a, E, .get_blinded_block(&block_hash)? .ok_or_else(|| BeaconStateError::MissingBeaconBlock(block_hash.into()))?; let state = store - .get_state(&block.state_root(), Some(block.slot()))? + .get_state(&block.state_root(), Some(block.slot()), false)? .ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?; Ok(Self::owned(store, state)) } @@ -363,7 +365,7 @@ fn next_historical_root_backtrack_state, Cold: Ite if new_state_slot >= historic_state_upper_limit { let new_state_root = current_state.get_state_root(new_state_slot)?; Ok(store - .get_state(new_state_root, Some(new_state_slot))? + .get_state(new_state_root, Some(new_state_slot), false)? .ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?) } else { Err(Error::HistoryUnavailable) diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 96e4de4639..f64780fd16 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -36,6 +36,8 @@ pub struct StateCache { states: LruCache>, block_map: BlockMap, max_epoch: Epoch, + head_block_root: Hash256, + headroom: usize, } #[derive(Debug)] @@ -47,12 +49,14 @@ pub enum PutStateOutcome { #[allow(clippy::len_without_is_empty)] impl StateCache { - pub fn new(capacity: NonZeroUsize) -> Self { + pub fn new(capacity: NonZeroUsize, headroom: usize) -> Self { StateCache { finalized_state: None, states: LruCache::new(capacity), block_map: BlockMap::default(), max_epoch: Epoch::new(0), + head_block_root: Hash256::ZERO, + headroom, } } @@ -98,6 +102,13 @@ impl StateCache { Ok(()) } + /// Update the state cache's view of the enshrined head block. + /// + /// We never prune the unadvanced state for the head block. + pub fn update_head_block_root(&mut self, head_block_root: Hash256) { + self.head_block_root = head_block_root; + } + /// Rebase the given state on the finalized state in order to reduce its memory consumption. /// /// This function should only be called on states that are likely not to already share tree @@ -148,7 +159,8 @@ impl StateCache { // If the cache is full, use the custom cull routine to make room. if let Some(over_capacity) = self.len().checked_sub(self.capacity()) { - self.cull(over_capacity + 1); + // The `over_capacity` should always be 0, but we add it here just in case. + self.cull((over_capacity + self.headroom).max(1)); } // Insert the full state into the cache. @@ -222,6 +234,7 @@ impl StateCache { let mut mid_epoch_state_roots = vec![]; let mut old_boundary_state_roots = vec![]; let mut good_boundary_state_roots = vec![]; + for (&state_root, state) in self.states.iter().skip(cull_exempt) { let is_advanced = state.slot() > state.latest_block_header().slot; let is_boundary = state.slot() % E::slots_per_epoch() == 0; @@ -236,7 +249,7 @@ impl StateCache { } } else if is_advanced { advanced_state_roots.push(state_root); - } else { + } else if state.get_latest_block_root(state_root) != self.head_block_root { mid_epoch_state_roots.push(state_root); } @@ -250,8 +263,8 @@ impl StateCache { // This could probably be more efficient in how it interacts with the block map. for state_root in advanced_state_roots .iter() - .chain(mid_epoch_state_roots.iter()) .chain(old_boundary_state_roots.iter()) + .chain(mid_epoch_state_roots.iter()) .chain(good_boundary_state_roots.iter()) .take(count) { diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index b224cde048..6431ea62cb 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -373,7 +373,7 @@ impl ForkChoiceTest { let state = harness .chain .store - .get_state(&state_root, None) + .get_state(&state_root, None, true) .unwrap() .unwrap(); let balances = state diff --git a/consensus/state_processing/src/per_block_processing/tests.rs b/consensus/state_processing/src/per_block_processing/tests.rs index c59449634a..78b1047296 100644 --- a/consensus/state_processing/src/per_block_processing/tests.rs +++ b/consensus/state_processing/src/per_block_processing/tests.rs @@ -1116,7 +1116,7 @@ async fn block_replayer_peeking_state_roots() { .unwrap(); let parent_state = harness .chain - .get_state(&parent_block.state_root(), Some(parent_block.slot())) + .get_state(&parent_block.state_root(), Some(parent_block.slot()), true) .unwrap() .unwrap(); diff --git a/testing/.DS_Store b/testing/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..13c02876de88eb2951f971b9e3855b78933d6ca6 GIT binary patch literal 6148 zcmeHKy-or_5dK!t5Py;@Ew8dCK7c1VUqBuJgLfbactj+$_ZB7=+S*tfOB-X!TUh!E zezUv45fHI4YG#tzZ*O;IcjhCz!vZkXK~ez%0A;FRX^YJQk$KTMS@R=1L}S-T(Z?0~ zomQnbXRQOufHLsg7?87DMv5ltJh_GQdzN*3672BqI$dPb&EmMx$r>_uGJ1Xso?ea? z+a)X6_0B7`@t(IEBQAwqTp&ZjsHU;iafvosa>IDGG=CzGWuyQ_MV(%VKxs}@hRr=dNax8%e{&jeaCMmFJpff z$xY?&Gr*cHQr>o`tumktC<7A)!mtMN|^tOdb`MD?$Eiz*qynK-`V&Vim|(MJhbL8kwa~j0cBv8fpzy8Nc}(l z{QW=cq+iN_GVre$Fr~N_SDBKpt#ipqtqrNCR1vYu9a Tester { let mut state = self .harness .chain - .get_state(&parent_state_root, Some(parent_block.slot())) + .get_state(&parent_state_root, Some(parent_block.slot()), true) .unwrap() .unwrap();