diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index c0bc17e118..edcc26af7a 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -13,6 +13,7 @@ node_test_rig = { path = "../testing/node_test_rig" } [features] write_ssz_files = ["beacon_chain/write_ssz_files"] # Writes debugging .ssz files to /tmp during block processing. +tree-states = ["beacon_chain/tree-states"] [dependencies] eth2_config = { path = "../common/eth2_config" } diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 9f3db09b74..4ef241eee8 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -10,6 +10,7 @@ default = ["participation_metrics"] write_ssz_files = [] # Writes debugging .ssz files to /tmp during block processing. participation_metrics = [] # Exposes validator participation metrics to Prometheus. fork_from_env = [] # Initialise the harness chain spec from the FORK_NAME env variable +tree-states = ["store/milhouse"] [dev-dependencies] maplit = "1.0.2" diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a65a943b93..4f7e827cc6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -36,7 +36,6 @@ use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_B use crate::persisted_fork_choice::PersistedForkChoice; use crate::pre_finalization_cache::PreFinalizationBlockCache; use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache}; -use crate::snapshot_cache::SnapshotCache; use crate::sync_committee_verification::{ Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution, }; @@ -92,9 +91,6 @@ pub type ForkChoiceError = fork_choice::Error; /// head. pub const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1); -/// The time-out before failure during an operation to take a read/write RwLock on the block -/// processing cache. 
-pub const BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); /// The time-out before failure during an operation to take a read/write RwLock on the /// attestation cache. pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1); @@ -323,8 +319,6 @@ pub struct BeaconChain { pub event_handler: Option>, /// Used to track the heads of the beacon chain. pub(crate) head_tracker: Arc, - /// A cache dedicated to block processing. - pub(crate) snapshot_cache: TimeoutRwLock>, /// Caches the attester shuffling for a given epoch and shuffling key root. pub(crate) shuffling_cache: TimeoutRwLock, /// Caches the beacon block proposer shuffling for a given epoch and shuffling key root. @@ -2530,7 +2524,7 @@ impl BeaconChain { // Do not import a block that doesn't descend from the finalized root. let signed_block = check_block_is_finalized_descendant::(signed_block, &fork_choice, &self.store)?; - let (block, block_signature) = signed_block.clone().deconstruct(); + let (block, _) = signed_block.clone().deconstruct(); // compare the existing finalized checkpoint with the incoming block's finalized checkpoint let old_finalized_checkpoint = fork_choice.finalized_checkpoint(); @@ -2784,30 +2778,6 @@ impl BeaconChain { let parent_root = block.parent_root(); let slot = block.slot(); - let signed_block = SignedBeaconBlock::from_block(block, block_signature); - - self.snapshot_cache - .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .ok_or(Error::SnapshotCacheLockTimeout) - .map(|mut snapshot_cache| { - snapshot_cache.insert( - BeaconSnapshot { - beacon_state: state, - beacon_block: signed_block, - beacon_block_root: block_root, - }, - None, - &self.spec, - ) - }) - .unwrap_or_else(|e| { - error!( - self.log, - "Failed to insert snapshot"; - "error" => ?e, - "task" => "process block" - ); - }); self.head_tracker .register_block(block_root, parent_root, slot); @@ -2888,28 +2858,11 @@ impl BeaconChain { .head_info() 
.map_err(BlockProductionError::UnableToGetHeadInfo)?; let (state, state_root_opt) = if head_info.slot < slot { - // Normal case: proposing a block atop the current head. Use the snapshot cache. - if let Some(pre_state) = self - .snapshot_cache - .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .and_then(|snapshot_cache| { - snapshot_cache.get_state_for_block_production(head_info.block_root) - }) - { - (pre_state.pre_state, pre_state.state_root) - } else { - warn!( - self.log, - "Block production cache miss"; - "message" => "this block is more likely to be orphaned", - "slot" => slot, - ); - let state = self - .state_at_slot(slot - 1, StateSkipConfig::WithStateRoots) - .map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?; + let state = self + .state_at_slot(slot - 1, StateSkipConfig::WithStateRoots) + .map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?; - (state, None) - } + (state, None) } else { warn!( self.log, @@ -3206,40 +3159,23 @@ impl BeaconChain { // At this point we know that the new head block is not the same as the previous one metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD); - // Try and obtain the snapshot for `beacon_block_root` from the snapshot cache, falling - // back to a database read if that fails. - let new_head = self - .snapshot_cache - .try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .and_then(|snapshot_cache| { - snapshot_cache.get_cloned(beacon_block_root, CloneConfig::committee_caches_only()) - }) - .map::, _>(Ok) - .unwrap_or_else(|| { - let beacon_block = self - .get_block(&beacon_block_root)? - .ok_or(Error::MissingBeaconBlock(beacon_block_root))?; + let new_head = { + let beacon_block = self + .get_block(&beacon_block_root)? + .ok_or(Error::MissingBeaconBlock(beacon_block_root))?; - let beacon_state_root = beacon_block.state_root(); - let beacon_state: BeaconState = self - .get_state(&beacon_state_root, Some(beacon_block.slot()))? 
- .ok_or(Error::MissingBeaconState(beacon_state_root))?; + let beacon_state_root = beacon_block.state_root(); + let mut beacon_state: BeaconState = self + .get_state(&beacon_state_root, Some(beacon_block.slot()))? + .ok_or(Error::MissingBeaconState(beacon_state_root))?; + beacon_state.build_all_committee_caches(&self.spec)?; - Ok(BeaconSnapshot { - beacon_block, - beacon_block_root, - beacon_state, - }) - }) - .and_then(|mut snapshot| { - // Regardless of where we got the state from, attempt to build the committee - // caches. - snapshot - .beacon_state - .build_all_committee_caches(&self.spec) - .map_err(Into::into) - .map(|()| snapshot) - })?; + BeaconSnapshot { + beacon_block, + beacon_block_root, + beacon_state, + } + }; // Attempt to detect if the new head is not on the same chain as the previous block // (i.e., a re-org). @@ -3428,20 +3364,6 @@ impl BeaconChain { } } - self.snapshot_cache - .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .map(|mut snapshot_cache| { - snapshot_cache.update_head(beacon_block_root); - }) - .unwrap_or_else(|| { - error!( - self.log, - "Failed to obtain cache write lock"; - "lock" => "snapshot_cache", - "task" => "update head" - ); - }); - if is_epoch_transition || is_reorg { self.persist_head_and_fork_choice()?; self.op_pool.prune_attestations(self.epoch()?); @@ -3742,26 +3664,6 @@ impl BeaconChain { .start_slot(T::EthSpec::slots_per_epoch()), ); - self.snapshot_cache - .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .map(|mut snapshot_cache| { - snapshot_cache.prune(new_finalized_checkpoint.epoch); - debug!( - self.log, - "Snapshot cache pruned"; - "new_len" => snapshot_cache.len(), - "remaining_roots" => ?snapshot_cache.beacon_block_roots(), - ); - }) - .unwrap_or_else(|| { - error!( - self.log, - "Failed to obtain cache write lock"; - "lock" => "snapshot_cache", - "task" => "prune" - ); - }); - self.op_pool.prune_all(head_state, self.epoch()?); self.store_migrator.process_finalization( diff --git 
a/beacon_node/beacon_chain/src/beacon_snapshot.rs b/beacon_node/beacon_chain/src/beacon_snapshot.rs index b9de6e9eba..2b0d250c75 100644 --- a/beacon_node/beacon_chain/src/beacon_snapshot.rs +++ b/beacon_node/beacon_chain/src/beacon_snapshot.rs @@ -10,6 +10,19 @@ pub struct BeaconSnapshot { pub beacon_state: BeaconState, } +/// This snapshot is to be used for verifying a child of `self.beacon_block`. +#[derive(Debug)] +pub struct PreProcessingSnapshot { + /// This state is equivalent to the `self.beacon_block.state_root()` state that has been + /// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for + /// the application of another block. + pub pre_state: BeaconState, + /// This value is only set to `Some` if the `pre_state` was *not* advanced forward. + pub beacon_state_root: Option, + pub beacon_block: SignedBeaconBlock, + pub beacon_block_root: Hash256, +} + impl BeaconSnapshot { /// Create a new checkpoint. pub fn new( diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index c2dc0028e9..cdc23e08b1 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -40,17 +40,14 @@ //! END //! //! 
``` +use crate::beacon_snapshot::PreProcessingSnapshot; use crate::execution_payload::{ execute_payload, validate_execution_payload_for_gossip, validate_merge_block, }; -use crate::snapshot_cache::PreProcessingSnapshot; use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::{ - beacon_chain::{ - BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY, - VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT, - }, + beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT}, metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; use eth2::types::EventKind; @@ -72,7 +69,7 @@ use std::borrow::Cow; use std::fs; use std::io::Write; use std::time::Duration; -use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp}; +use store::{Error as DBError, HotColdDB, KeyValueStore, StoreOp}; use tree_hash::TreeHash; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, Hash256, @@ -1033,7 +1030,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { // Perform a sanity check on the pre-state. let parent_slot = parent.beacon_block.slot(); - if state.slot() < parent_slot || state.slot() > parent_slot + 1 { + if state.slot() < parent_slot { return Err(BeaconChainError::BadPreState { parent_root: parent.beacon_block_root, parent_slot, @@ -1061,29 +1058,10 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { // Store the state immediately, marking it as temporary, and staging the deletion // of its temporary status as part of the larger atomic operation. 
let txn_lock = chain.store.hot_db.begin_rw_transaction(); - let state_already_exists = - chain.store.load_hot_state_summary(&state_root)?.is_some(); - - let state_batch = if state_already_exists { - // If the state exists, it could be temporary or permanent, but in neither case - // should we rewrite it or store a new temporary flag for it. We *will* stage - // the temporary flag for deletion because it's OK to double-delete the flag, - // and we don't mind if another thread gets there first. - vec![] - } else { - vec![ - if state.slot() % T::EthSpec::slots_per_epoch() == 0 { - StoreOp::PutState(state_root, &state) - } else { - StoreOp::PutStateSummary( - state_root, - HotStateSummary::new(&state_root, &state)?, - ) - }, - StoreOp::PutStateTemporaryFlag(state_root), - ] - }; - chain.store.do_atomically(state_batch)?; + chain.store.do_atomically(vec![ + StoreOp::PutState(state_root, &state), + StoreOp::PutStateTemporaryFlag(state_root), + ])?; drop(txn_lock); confirmation_db_batch.push(StoreOp::DeleteStateTemporaryFlag(state_root)); @@ -1435,8 +1413,6 @@ fn load_parent( ), BlockError, > { - let spec = &chain.spec; - // Reject any block if its parent is not known to fork choice. 
// // A block that is not in fork choice is either: @@ -1455,44 +1431,9 @@ fn load_parent( return Err(BlockError::ParentUnknown(Box::new(block))); } - let block_delay = chain - .block_times_cache - .read() - .get_block_delays( - block.canonical_root(), - chain - .slot_clock - .start_of(block.slot()) - .unwrap_or_else(|| Duration::from_secs(0)), - ) - .observed; - let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ); - let result = if let Some((snapshot, cloned)) = chain - .snapshot_cache - .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .and_then(|mut snapshot_cache| { - snapshot_cache.get_state_for_block_processing( - block.parent_root(), - block.slot(), - block_delay, - spec, - ) - }) { - if cloned { - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES); - debug!( - chain.log, - "Cloned snapshot for late block/skipped slot"; - "slot" => %block.slot(), - "parent_slot" => %snapshot.beacon_block.slot(), - "parent_root" => ?block.parent_root(), - "block_delay" => ?block_delay, - ); - } - Ok((snapshot, block)) - } else { + let result = { // Load the blocks parent block from the database, returning invalid if that block is not // found. // @@ -1515,28 +1456,25 @@ fn load_parent( // Load the parent blocks state from the database, returning an error if it is not found. // It is an error because if we know the parent block we should also know the parent state. let parent_state_root = parent_block.state_root(); - let parent_state = chain - .get_state(&parent_state_root, Some(parent_block.slot()))? + let (advanced_state_root, state) = chain + .store + .get_advanced_state(block.parent_root(), block.slot(), parent_state_root)? 
.ok_or_else(|| { BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root)) })?; - metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES); - debug!( - chain.log, - "Missed snapshot cache"; - "slot" => block.slot(), - "parent_slot" => parent_block.slot(), - "parent_root" => ?block.parent_root(), - "block_delay" => ?block_delay, - ); + let beacon_state_root = if parent_state_root == advanced_state_root { + Some(parent_state_root) + } else { + None + }; Ok(( PreProcessingSnapshot { beacon_block: parent_block, beacon_block_root: root, - pre_state: parent_state, - beacon_state_root: Some(parent_state_root), + pre_state: state, + beacon_state_root, }, block, )) diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index f7d7722cdf..11c3838256 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -5,7 +5,6 @@ use crate::head_tracker::HeadTracker; use crate::migrate::{BackgroundMigrator, MigratorConfig}; use crate::persisted_beacon_chain::PersistedBeaconChain; use crate::shuffling_cache::ShufflingCache; -use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE}; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_monitor::ValidatorMonitor; use crate::validator_pubkey_cache::ValidatorPubkeyCache; @@ -29,8 +28,8 @@ use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; use task_executor::ShutdownReason; use types::{ - BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes, - Signature, SignedBeaconBlock, Slot, + BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256, + PublicKeyBytes, Signature, SignedBeaconBlock, Slot, }; /// An empty struct used to "witness" all the `BeaconChainTypes` traits. 
It has no user-facing @@ -303,6 +302,15 @@ where let beacon_state_root = beacon_block.message().state_root(); let beacon_block_root = beacon_block.canonical_root(); + store + .update_finalized_state( + beacon_state_root, + beacon_block_root, + Epoch::new(0), + beacon_state.clone(), + ) + .map_err(|e| format!("Failed to set genesis state as finalized state: {:?}", e))?; + store .put_state(&beacon_state_root, &beacon_state) .map_err(|e| format!("Failed to store genesis state: {:?}", e))?; @@ -425,6 +433,14 @@ where // Write the state and block non-atomically, it doesn't matter if they're forgotten // about on a crash restart. + store + .update_finalized_state( + weak_subj_state_root, + weak_subj_block_root, + weak_subj_slot.epoch(TEthSpec::slots_per_epoch()), + weak_subj_state.clone(), + ) + .map_err(|e| format!("Failed to set weak subjectivity state as finalized state: {:?}", e))?; store .put_state(&weak_subj_state_root, &weak_subj_state) .map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?; @@ -751,10 +767,6 @@ where fork_choice: RwLock::new(fork_choice), event_handler: self.event_handler, head_tracker, - snapshot_cache: TimeoutRwLock::new(SnapshotCache::new( - DEFAULT_SNAPSHOT_CACHE_SIZE, - canonical_head, - )), shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()), beacon_proposer_cache: <_>::default(), block_times_cache: <_>::default(), diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index d41c1a5cc5..2d34937fdb 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -30,7 +30,6 @@ mod persisted_fork_choice; mod pre_finalization_cache; pub mod schema_change; mod shuffling_cache; -mod snapshot_cache; pub mod state_advance_timer; pub mod sync_committee_verification; pub mod test_utils; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 41b7604532..7eb2b19b62 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++
b/beacon_node/beacon_chain/src/metrics.rs @@ -7,9 +7,6 @@ use slot_clock::SlotClock; use std::time::Duration; use types::{BeaconState, Epoch, EthSpec, Hash256, Slot}; -/// The maximum time to wait for the snapshot cache lock during a metrics scrape. -const SNAPSHOT_CACHE_TIMEOUT: Duration = Duration::from_millis(100); - lazy_static! { /* * Block Processing @@ -935,15 +932,10 @@ pub fn scrape_for_metrics(beacon_chain: &BeaconChain) { let attestation_stats = beacon_chain.op_pool.attestation_stats(); - if let Some(snapshot_cache) = beacon_chain - .snapshot_cache - .try_write_for(SNAPSHOT_CACHE_TIMEOUT) - { - set_gauge( - &BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE, - snapshot_cache.len() as i64, - ) - } + set_gauge_by_usize( + &BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE, + beacon_chain.store.state_cache_len(), + ); if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() { set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size); diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 5ae7627321..fb8edd510c 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -197,6 +197,7 @@ impl, Cold: ItemStore> BackgroundMigrator state, @@ -237,7 +238,12 @@ impl, Cold: ItemStore> BackgroundMigrator {} Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => { debug!( diff --git a/beacon_node/beacon_chain/src/snapshot_cache.rs b/beacon_node/beacon_chain/src/snapshot_cache.rs deleted file mode 100644 index f4bbae8a32..0000000000 --- a/beacon_node/beacon_chain/src/snapshot_cache.rs +++ /dev/null @@ -1,520 +0,0 @@ -use crate::BeaconSnapshot; -use itertools::process_results; -use std::cmp; -use std::time::Duration; -use types::{ - beacon_state::CloneConfig, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, - Slot, -}; - -/// The default size of the cache. 
-pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4; - -/// The minimum block delay to clone the state in the cache instead of removing it. -/// This helps keep block processing fast during re-orgs from late blocks. -const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6); - -/// This snapshot is to be used for verifying a child of `self.beacon_block`. -#[derive(Debug)] -pub struct PreProcessingSnapshot { - /// This state is equivalent to the `self.beacon_block.state_root()` state that has been - /// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for - /// the application of another block. - pub pre_state: BeaconState, - /// This value is only set to `Some` if the `pre_state` was *not* advanced forward. - pub beacon_state_root: Option, - pub beacon_block: SignedBeaconBlock, - pub beacon_block_root: Hash256, -} - -impl From> for PreProcessingSnapshot { - fn from(snapshot: BeaconSnapshot) -> Self { - let beacon_state_root = Some(snapshot.beacon_state_root()); - Self { - pre_state: snapshot.beacon_state, - beacon_state_root, - beacon_block: snapshot.beacon_block, - beacon_block_root: snapshot.beacon_block_root, - } - } -} - -impl CacheItem { - pub fn new_without_pre_state(snapshot: BeaconSnapshot) -> Self { - Self { - beacon_block: snapshot.beacon_block, - beacon_block_root: snapshot.beacon_block_root, - beacon_state: snapshot.beacon_state, - pre_state: None, - } - } - - fn clone_to_snapshot_with(&self, clone_config: CloneConfig) -> BeaconSnapshot { - BeaconSnapshot { - beacon_state: self.beacon_state.clone_with(clone_config), - beacon_block: self.beacon_block.clone(), - beacon_block_root: self.beacon_block_root, - } - } - - pub fn into_pre_state(self) -> PreProcessingSnapshot { - // Do not include the beacon state root if the state has been advanced. 
- let beacon_state_root = - Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none()); - - PreProcessingSnapshot { - beacon_block: self.beacon_block, - beacon_block_root: self.beacon_block_root, - pre_state: self.pre_state.unwrap_or(self.beacon_state), - beacon_state_root, - } - } - - pub fn clone_as_pre_state(&self) -> PreProcessingSnapshot { - // Do not include the beacon state root if the state has been advanced. - let beacon_state_root = - Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none()); - - PreProcessingSnapshot { - beacon_block: self.beacon_block.clone(), - beacon_block_root: self.beacon_block_root, - pre_state: self - .pre_state - .as_ref() - .map_or_else(|| self.beacon_state.clone(), |pre_state| pre_state.clone()), - beacon_state_root, - } - } -} - -/// The information required for block production. -pub struct BlockProductionPreState { - /// This state may or may not have been advanced forward a single slot. - /// - /// See the documentation in the `crate::state_advance_timer` module for more information. - pub pre_state: BeaconState, - /// This value will only be `Some` if `self.pre_state` was **not** advanced forward a single - /// slot. - /// - /// This value can be used to avoid tree-hashing the state during the first call to - /// `per_slot_processing`. - pub state_root: Option, -} - -pub enum StateAdvance { - /// The cache does not contain the supplied block root. - BlockNotFound, - /// The cache contains the supplied block root but the state has already been advanced. - AlreadyAdvanced, - /// The cache contains the supplied block root and the state has not yet been advanced. - State { - state: Box>, - state_root: Hash256, - block_slot: Slot, - }, -} - -/// The item stored in the `SnapshotCache`. -pub struct CacheItem { - beacon_block: SignedBeaconBlock, - beacon_block_root: Hash256, - /// This state is equivalent to `self.beacon_block.state_root()`. 
- beacon_state: BeaconState, - /// This state is equivalent to `self.beacon_state` that has had `per_slot_processing` applied - /// to it. This state assists in optimizing block processing. - pre_state: Option>, -} - -impl Into> for CacheItem { - fn into(self) -> BeaconSnapshot { - BeaconSnapshot { - beacon_state: self.beacon_state, - beacon_block: self.beacon_block, - beacon_block_root: self.beacon_block_root, - } - } -} - -/// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing. -/// -/// ## Cache Queuing -/// -/// The cache has a non-standard queue mechanism (specifically, it is not LRU). -/// -/// The cache has a max number of elements (`max_len`). Until `max_len` is achieved, all snapshots -/// are simply added to the queue. Once `max_len` is achieved, adding a new snapshot will cause an -/// existing snapshot to be ejected. The ejected snapshot will: -/// -/// - Never be the `head_block_root`. -/// - Be the snapshot with the lowest `state.slot` (ties broken arbitrarily). -pub struct SnapshotCache { - max_len: usize, - head_block_root: Hash256, - snapshots: Vec>, -} - -impl SnapshotCache { - /// Instantiate a new cache which contains the `head` snapshot. - /// - /// Setting `max_len = 0` is equivalent to setting `max_len = 1`. - pub fn new(max_len: usize, head: BeaconSnapshot) -> Self { - Self { - max_len: cmp::max(max_len, 1), - head_block_root: head.beacon_block_root, - snapshots: vec![CacheItem::new_without_pre_state(head)], - } - } - - /// The block roots of all snapshots contained in `self`. - pub fn beacon_block_roots(&self) -> Vec { - self.snapshots.iter().map(|s| s.beacon_block_root).collect() - } - - /// The number of snapshots contained in `self`. - pub fn len(&self) -> usize { - self.snapshots.len() - } - - /// Insert a snapshot, potentially removing an existing snapshot if `self` is at capacity (see - /// struct-level documentation for more info). 
- pub fn insert( - &mut self, - snapshot: BeaconSnapshot, - pre_state: Option>, - spec: &ChainSpec, - ) { - let parent_root = snapshot.beacon_block.message().parent_root(); - let item = CacheItem { - beacon_block: snapshot.beacon_block, - beacon_block_root: snapshot.beacon_block_root, - beacon_state: snapshot.beacon_state, - pre_state, - }; - - // Remove the grandparent of the block that was just inserted. - // - // Assuming it's unlikely to see re-orgs deeper than one block, this method helps keep the - // cache small by removing any states that already have more than one descendant. - // - // Remove the grandparent first to free up room in the cache. - let grandparent_result = - process_results(item.beacon_state.rev_iter_block_roots(spec), |iter| { - iter.map(|(_slot, root)| root) - .find(|root| *root != item.beacon_block_root && *root != parent_root) - }); - if let Ok(Some(grandparent_root)) = grandparent_result { - let head_block_root = self.head_block_root; - self.snapshots.retain(|snapshot| { - let root = snapshot.beacon_block_root; - root == head_block_root || root != grandparent_root - }); - } - - if self.snapshots.len() < self.max_len { - self.snapshots.push(item); - } else { - let insert_at = self - .snapshots - .iter() - .enumerate() - .filter_map(|(i, snapshot)| { - if snapshot.beacon_block_root != self.head_block_root { - Some((i, snapshot.beacon_state.slot())) - } else { - None - } - }) - .min_by_key(|(_i, slot)| *slot) - .map(|(i, _slot)| i); - - if let Some(i) = insert_at { - self.snapshots[i] = item; - } - } - } - - /// If available, returns a `CacheItem` that should be used for importing/processing a block. - /// The method will remove the block from `self`, carrying across any caches that may or may not - /// be built. - /// - /// In the event the block being processed was observed late, clone the cache instead of - /// moving it. This allows us to process the next block quickly in the case of a re-org. 
- /// Additionally, if the slot was skipped, clone the cache. This ensures blocks that are - /// later than 1 slot still have access to the cache and can be processed quickly. - pub fn get_state_for_block_processing( - &mut self, - block_root: Hash256, - block_slot: Slot, - block_delay: Option, - spec: &ChainSpec, - ) -> Option<(PreProcessingSnapshot, bool)> { - self.snapshots - .iter() - .position(|snapshot| snapshot.beacon_block_root == block_root) - .map(|i| { - if let Some(cache) = self.snapshots.get(i) { - if block_slot > cache.beacon_block.slot() + 1 { - return (cache.clone_as_pre_state(), true); - } - if let Some(delay) = block_delay { - if delay >= MINIMUM_BLOCK_DELAY_FOR_CLONE - && delay <= Duration::from_secs(spec.seconds_per_slot) * 4 - { - return (cache.clone_as_pre_state(), true); - } - } - } - (self.snapshots.remove(i).into_pre_state(), false) - }) - } - - /// If available, obtains a clone of a `BeaconState` that should be used for block production. - /// The clone will use `CloneConfig:all()`, ensuring any tree-hash cache is cloned too. - /// - /// ## Note - /// - /// This method clones the `BeaconState` (instead of removing it) since we assume that any block - /// we produce will soon be pushed to the `BeaconChain` for importing/processing. Keeping a copy - /// of that `BeaconState` in `self` will greatly help with import times. - pub fn get_state_for_block_production( - &self, - block_root: Hash256, - ) -> Option> { - self.snapshots - .iter() - .find(|snapshot| snapshot.beacon_block_root == block_root) - .map(|snapshot| { - if let Some(pre_state) = &snapshot.pre_state { - BlockProductionPreState { - pre_state: pre_state.clone_with(CloneConfig::all()), - state_root: None, - } - } else { - BlockProductionPreState { - pre_state: snapshot.beacon_state.clone_with(CloneConfig::all()), - state_root: Some(snapshot.beacon_block.state_root()), - } - } - }) - } - - /// If there is a snapshot with `block_root`, clone it and return the clone. 
- pub fn get_cloned( - &self, - block_root: Hash256, - clone_config: CloneConfig, - ) -> Option> { - self.snapshots - .iter() - .find(|snapshot| snapshot.beacon_block_root == block_root) - .map(|snapshot| snapshot.clone_to_snapshot_with(clone_config)) - } - - pub fn get_for_state_advance(&mut self, block_root: Hash256) -> StateAdvance { - if let Some(snapshot) = self - .snapshots - .iter_mut() - .find(|snapshot| snapshot.beacon_block_root == block_root) - { - if snapshot.pre_state.is_some() { - StateAdvance::AlreadyAdvanced - } else { - let cloned = snapshot - .beacon_state - .clone_with(CloneConfig::committee_caches_only()); - - StateAdvance::State { - state: Box::new(std::mem::replace(&mut snapshot.beacon_state, cloned)), - state_root: snapshot.beacon_block.state_root(), - block_slot: snapshot.beacon_block.slot(), - } - } - } else { - StateAdvance::BlockNotFound - } - } - - pub fn update_pre_state(&mut self, block_root: Hash256, state: BeaconState) -> Option<()> { - self.snapshots - .iter_mut() - .find(|snapshot| snapshot.beacon_block_root == block_root) - .map(|snapshot| { - snapshot.pre_state = Some(state); - }) - } - - /// Removes all snapshots from the queue that are less than or equal to the finalized epoch. - pub fn prune(&mut self, finalized_epoch: Epoch) { - self.snapshots.retain(|snapshot| { - snapshot.beacon_state.slot() > finalized_epoch.start_slot(T::slots_per_epoch()) - }) - } - - /// Inform the cache that the head of the beacon chain has changed. - /// - /// The snapshot that matches this `head_block_root` will never be ejected from the cache - /// during `Self::insert`. 
- pub fn update_head(&mut self, head_block_root: Hash256) { - self.head_block_root = head_block_root - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::test_utils::{BeaconChainHarness, EphemeralHarnessType}; - use types::{ - test_utils::generate_deterministic_keypair, BeaconBlock, Epoch, MainnetEthSpec, - SignedBeaconBlock, Slot, - }; - - fn get_harness() -> BeaconChainHarness> { - let harness = BeaconChainHarness::builder(MainnetEthSpec) - .default_spec() - .deterministic_keypairs(1) - .fresh_ephemeral_store() - .build(); - - harness.advance_slot(); - - harness - } - - const CACHE_SIZE: usize = 4; - - fn get_snapshot(i: u64) -> BeaconSnapshot { - let spec = MainnetEthSpec::default_spec(); - - let beacon_state = get_harness().chain.head_beacon_state().unwrap(); - - let signed_beacon_block = SignedBeaconBlock::from_block( - BeaconBlock::empty(&spec), - generate_deterministic_keypair(0) - .sk - .sign(Hash256::from_low_u64_be(42)), - ); - - BeaconSnapshot { - beacon_state, - beacon_block: signed_beacon_block, - beacon_block_root: Hash256::from_low_u64_be(i), - } - } - - #[test] - fn insert_get_prune_update() { - let spec = MainnetEthSpec::default_spec(); - let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0)); - - // Insert a bunch of entries in the cache. It should look like this: - // - // Index Root - // 0 0 <--head - // 1 1 - // 2 2 - // 3 3 - for i in 1..CACHE_SIZE as u64 { - let mut snapshot = get_snapshot(i); - - // Each snapshot should be one slot into an epoch, with each snapshot one epoch apart. - *snapshot.beacon_state.slot_mut() = - Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1); - - cache.insert(snapshot, None, &spec); - - assert_eq!( - cache.snapshots.len(), - i as usize + 1, - "cache length should be as expected" - ); - assert_eq!(cache.head_block_root, Hash256::from_low_u64_be(0)); - } - - // Insert a new value in the cache. 
Afterwards it should look like: - // - // Index Root - // 0 0 <--head - // 1 42 - // 2 2 - // 3 3 - assert_eq!(cache.snapshots.len(), CACHE_SIZE); - cache.insert(get_snapshot(42), None, &spec); - assert_eq!(cache.snapshots.len(), CACHE_SIZE); - - assert!( - cache - .get_state_for_block_processing( - Hash256::from_low_u64_be(1), - Slot::new(0), - None, - &spec - ) - .is_none(), - "the snapshot with the lowest slot should have been removed during the insert function" - ); - assert!(cache - .get_cloned(Hash256::from_low_u64_be(1), CloneConfig::none()) - .is_none()); - - assert_eq!( - cache - .get_cloned(Hash256::from_low_u64_be(0), CloneConfig::none()) - .expect("the head should still be in the cache") - .beacon_block_root, - Hash256::from_low_u64_be(0), - "get_cloned should get the correct snapshot" - ); - assert_eq!( - cache - .get_state_for_block_processing( - Hash256::from_low_u64_be(0), - Slot::new(0), - None, - &spec - ) - .expect("the head should still be in the cache") - .0 - .beacon_block_root, - Hash256::from_low_u64_be(0), - "get_state_for_block_processing should get the correct snapshot" - ); - - assert_eq!( - cache.snapshots.len(), - CACHE_SIZE - 1, - "get_state_for_block_processing should shorten the cache" - ); - - // Prune the cache. Afterwards it should look like: - // - // Index Root - // 0 2 - // 1 3 - cache.prune(Epoch::new(2)); - - assert_eq!(cache.snapshots.len(), 2); - - cache.update_head(Hash256::from_low_u64_be(2)); - - // Over-fill the cache so it needs to eject some old values on insert. - for i in 0..CACHE_SIZE as u64 { - cache.insert(get_snapshot(u64::max_value() - i), None, &spec); - } - - // Ensure that the new head value was not removed from the cache. 
- assert_eq!( - cache - .get_state_for_block_processing( - Hash256::from_low_u64_be(2), - Slot::new(0), - None, - &spec - ) - .expect("the new head should still be in the cache") - .0 - .beacon_block_root, - Hash256::from_low_u64_be(2), - "get_state_for_block_processing should get the correct snapshot" - ); - } -} diff --git a/beacon_node/beacon_chain/src/state_advance_timer.rs b/beacon_node/beacon_chain/src/state_advance_timer.rs index 6a3c3ea00e..913cad90b2 100644 --- a/beacon_node/beacon_chain/src/state_advance_timer.rs +++ b/beacon_node/beacon_chain/src/state_advance_timer.rs @@ -15,9 +15,7 @@ //! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles. use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS; use crate::{ - beacon_chain::{ATTESTATION_CACHE_LOCK_TIMEOUT, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT}, - snapshot_cache::StateAdvance, - BeaconChain, BeaconChainError, BeaconChainTypes, + beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT, BeaconChain, BeaconChainError, BeaconChainTypes, }; use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; @@ -28,7 +26,7 @@ use std::sync::{ }; use task_executor::TaskExecutor; use tokio::time::sleep; -use types::{AttestationShufflingId, EthSpec, Hash256, RelativeEpoch, Slot}; +use types::{AttestationShufflingId, BeaconStateError, EthSpec, Hash256, RelativeEpoch, Slot}; /// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform /// the state advancement. 
@@ -40,6 +38,8 @@ const MAX_ADVANCE_DISTANCE: u64 = 4; #[derive(Debug)] enum Error { BeaconChain(BeaconChainError), + BeaconState(BeaconStateError), + Store(store::Error), HeadMissingFromSnapshotCache(Hash256), MaxDistanceExceeded { current_slot: Slot, @@ -60,6 +60,18 @@ impl From for Error { } } +impl From for Error { + fn from(e: BeaconStateError) -> Self { + Self::BeaconState(e) + } +} + +impl From for Error { + fn from(e: store::Error) -> Self { + Self::Store(e) + } +} + /// Provides a simple thread-safe lock to be used for task co-ordination. Practically equivalent to /// `Mutex<()>`. #[derive(Clone)] @@ -166,11 +178,6 @@ async fn state_advance_timer( } } -/// Reads the `snapshot_cache` from the `beacon_chain` and attempts to take a clone of the -/// `BeaconState` of the head block. If it obtains this clone, the state will be advanced a single -/// slot then placed back in the `snapshot_cache` to be used for block verification. -/// -/// See the module-level documentation for rationale. fn advance_head( beacon_chain: &BeaconChain, log: &Logger, @@ -200,46 +207,37 @@ fn advance_head( // majority of attestations. beacon_chain.fork_choice()?; - let head_root = beacon_chain.head_info()?.block_root; + let head_info = beacon_chain.head_info()?; + let head_block_root = head_info.block_root; - let (head_slot, head_state_root, mut state) = match beacon_chain - .snapshot_cache - .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .ok_or(BeaconChainError::SnapshotCacheLockTimeout)? - .get_for_state_advance(head_root) - { - StateAdvance::AlreadyAdvanced => { - return Err(Error::StateAlreadyAdvanced { - block_root: head_root, - }) - } - StateAdvance::BlockNotFound => return Err(Error::HeadMissingFromSnapshotCache(head_root)), - StateAdvance::State { - state, - state_root, - block_slot, - } => (block_slot, state_root, *state), - }; + let (head_state_root, mut state) = beacon_chain + .store + .get_advanced_state(head_block_root, current_slot, head_info.state_root)? 
+ .ok_or(Error::HeadMissingFromSnapshotCache(head_block_root))?; - let initial_slot = state.slot(); - let initial_epoch = state.current_epoch(); - - let state_root = if state.slot() == head_slot { - Some(head_state_root) - } else { + if state.slot() == current_slot { + return Err(Error::StateAlreadyAdvanced { + block_root: head_block_root, + }); + } else if state.slot() + 1 != current_slot { // Protect against advancing a state more than a single slot. // // Advancing more than one slot without storing the intermediate state would corrupt the // database. Future works might store temporary, intermediate states inside this function. return Err(Error::BadStateSlot { - _block_slot: head_slot, + // FIXME(sproul): wrong + _block_slot: state.slot(), _state_slot: state.slot(), }); - }; + } + + let initial_slot = state.slot(); + let initial_epoch = state.current_epoch(); // Advance the state a single slot. - if let Some(summary) = per_slot_processing(&mut state, state_root, &beacon_chain.spec) - .map_err(BeaconChainError::from)? + if let Some(summary) = + per_slot_processing(&mut state, Some(head_state_root), &beacon_chain.spec) + .map_err(BeaconChainError::from)? { // Expose Prometheus metrics. if let Err(e) = summary.observe_metrics() { @@ -273,7 +271,7 @@ fn advance_head( debug!( log, "Advanced head state one slot"; - "head_root" => ?head_root, + "head_block_root" => ?head_block_root, "state_slot" => state.slot(), "current_slot" => current_slot, ); @@ -292,14 +290,14 @@ fn advance_head( if initial_epoch < state.current_epoch() { // Update the proposer cache. // - // We supply the `head_root` as the decision block since the prior `if` statement guarantees + // We supply the `head_block_root` as the decision block since the prior `if` statement guarantees // the head root is the latest block from the prior epoch. 
beacon_chain .beacon_proposer_cache .lock() .insert( state.current_epoch(), - head_root, + head_block_root, state .get_beacon_proposer_indices(&beacon_chain.spec) .map_err(BeaconChainError::from)?, @@ -308,8 +306,9 @@ fn advance_head( .map_err(BeaconChainError::from)?; // Update the attester cache. - let shuffling_id = AttestationShufflingId::new(head_root, &state, RelativeEpoch::Next) - .map_err(BeaconChainError::from)?; + let shuffling_id = + AttestationShufflingId::new(head_block_root, &state, RelativeEpoch::Next) + .map_err(BeaconChainError::from)?; let committee_cache = state .committee_cache(RelativeEpoch::Next) .map_err(BeaconChainError::from)?; @@ -322,7 +321,7 @@ fn advance_head( debug!( log, "Primed proposer and attester caches"; - "head_root" => ?head_root, + "head_block_root" => ?head_block_root, "next_epoch_shuffling_root" => ?shuffling_id.shuffling_decision_block, "state_epoch" => state.current_epoch(), "current_epoch" => current_slot.epoch(T::EthSpec::slots_per_epoch()), @@ -332,44 +331,19 @@ fn advance_head( // Apply the state to the attester cache, if the cache deems it interesting. beacon_chain .attester_cache - .maybe_cache_state(&state, head_root, &beacon_chain.spec) + .maybe_cache_state(&state, head_block_root, &beacon_chain.spec) .map_err(BeaconChainError::from)?; let final_slot = state.slot(); - // Insert the advanced state back into the snapshot cache. - beacon_chain - .snapshot_cache - .try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT) - .ok_or(BeaconChainError::SnapshotCacheLockTimeout)? - .update_pre_state(head_root, state) - .ok_or(Error::HeadMissingFromSnapshotCache(head_root))?; - - // If we have moved into the next slot whilst processing the state then this function is going - // to become ineffective and likely become a hindrance as we're stealing the tree hash cache - // from the snapshot cache (which may force the next block to rebuild a new one). 
- // - // If this warning occurs very frequently on well-resourced machines then we should consider - // starting it earlier in the slot. Otherwise, it's a good indication that the machine is too - // slow/overloaded and will be useful information for the user. - let starting_slot = current_slot; - let current_slot = beacon_chain.slot()?; - if starting_slot < current_slot { - warn!( - log, - "State advance too slow"; - "head_root" => %head_root, - "advanced_slot" => final_slot, - "current_slot" => current_slot, - "starting_slot" => starting_slot, - "msg" => "system resources may be overloaded", - ); - } + // Write the advanced state to the database. + let advanced_state_root = state.update_tree_hash_cache()?; + beacon_chain.store.put_state(&advanced_state_root, &state)?; debug!( log, "Completed state advance"; - "head_root" => ?head_root, + "head_block_root" => ?head_block_root, "advanced_slot" => final_slot, "initial_slot" => initial_slot, ); diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 574895296d..8a7c90e0d6 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -529,6 +529,7 @@ where .store .load_hot_state(&state_hash.into(), StateRootStrategy::Accurate) .unwrap() + .map(|(state, _)| state) } pub fn get_cold_state(&self, state_hash: BeaconStateHash) -> Option> { diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 208776c1ef..cde8d1f3ae 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -5,7 +5,8 @@ use ssz_derive::{Decode, Encode}; use types::{EthSpec, MinimalEthSpec}; pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; -pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5; +pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64; +pub const DEFAULT_STATE_CACHE_SIZE: usize = 128; /// Database configuration parameters. 
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -14,6 +15,8 @@ pub struct StoreConfig { pub slots_per_restore_point: u64, /// Maximum number of blocks to store in the in-memory block cache. pub block_cache_size: usize, + /// Maximum number of states to store in the in-memory state cache. + pub state_cache_size: usize, /// Whether to compact the database on initialization. pub compact_on_init: bool, /// Whether to compact the database during database pruning. @@ -37,6 +40,7 @@ impl Default for StoreConfig { // Safe default for tests, shouldn't ever be read by a CLI node. slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64, block_cache_size: DEFAULT_BLOCK_CACHE_SIZE, + state_cache_size: DEFAULT_STATE_CACHE_SIZE, compact_on_init: false, compact_on_prune: true, } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 62441ce0f2..7f2723416e 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -14,6 +14,7 @@ use crate::metadata::{ SCHEMA_VERSION_KEY, SPLIT_KEY, }; use crate::metrics; +use crate::state_cache::StateCache; use crate::{ get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, }; @@ -58,6 +59,8 @@ pub struct HotColdDB, Cold: ItemStore> { pub hot_db: Hot, /// LRU cache of deserialized blocks. Updated whenever a block is loaded. block_cache: Mutex>>, + /// Cache of beacon states. + state_cache: Mutex>, /// Chain spec. pub(crate) spec: ChainSpec, /// Logger. 
@@ -123,6 +126,7 @@ impl HotColdDB, MemoryStore> { cold_db: MemoryStore::open(), hot_db: MemoryStore::open(), block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + state_cache: Mutex::new(StateCache::new(config.state_cache_size)), config, spec, log, @@ -156,6 +160,7 @@ impl HotColdDB, LevelDB> { cold_db: LevelDB::open(cold_path)?, hot_db: LevelDB::open(hot_path)?, block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + state_cache: Mutex::new(StateCache::new(config.state_cache_size)), config, spec, log, @@ -233,6 +238,22 @@ impl HotColdDB, LevelDB> { } impl, Cold: ItemStore> HotColdDB { + pub fn update_finalized_state( + &self, + state_root: Hash256, + block_root: Hash256, + epoch: Epoch, + state: BeaconState, + ) -> Result<(), Error> { + self.state_cache + .lock() + .update_finalized_state(state_root, block_root, epoch, state) + } + + pub fn state_cache_len(&self) -> usize { + self.state_cache.lock().len() + } + /// Store a block and update the LRU cache. pub fn put_block( &self, @@ -362,16 +383,34 @@ impl, Cold: ItemStore> HotColdDB // chain. This way we avoid returning a state that doesn't match `state_root`. self.load_cold_state(state_root) } else { - self.load_hot_state(state_root, StateRootStrategy::Accurate) + self.get_hot_state(state_root, StateRootStrategy::Accurate) } } else { - match self.load_hot_state(state_root, StateRootStrategy::Accurate)? { + match self.get_hot_state(state_root, StateRootStrategy::Accurate)? { Some(state) => Ok(Some(state)), None => self.load_cold_state(state_root), } } } + /// Get a state with `latest_block_root == block_root` advanced through to at most `slot`. + /// + /// The `state_root` argument is used to look up the block's un-advanced state in case of a + /// cache miss. 
+ pub fn get_advanced_state( + &self, + block_root: Hash256, + slot: Slot, + state_root: Hash256, + ) -> Result)>, Error> { + if let Some(cached) = self.state_cache.lock().get_by_block_root(block_root, slot) { + return Ok(Some(cached)); + } + Ok(self + .get_hot_state(&state_root, StateRootStrategy::Accurate)? + .map(|state| (state_root, state))) + } + /// Fetch a state from the store, but don't compute all of the values when replaying blocks /// upon that state (e.g., state roots). Additionally, only states from the hot store are /// returned. @@ -403,7 +442,7 @@ impl, Cold: ItemStore> HotColdDB } .into()) } else { - self.load_hot_state(state_root, StateRootStrategy::Inconsistent) + self.get_hot_state(state_root, StateRootStrategy::Inconsistent) } } @@ -496,7 +535,7 @@ impl, Cold: ItemStore> HotColdDB // `StateRootStrategy` should be irrelevant here since we never replay blocks for an epoch // boundary state in the hot DB. let state = self - .load_hot_state(&epoch_boundary_state_root, StateRootStrategy::Accurate)? + .get_hot_state(&epoch_boundary_state_root, StateRootStrategy::Accurate)? .ok_or(HotColdDBError::MissingEpochBoundaryState( epoch_boundary_state_root, ))?; @@ -539,10 +578,6 @@ impl, Cold: ItemStore> HotColdDB self.store_hot_state(state_root, state, &mut key_value_batch)?; } - StoreOp::PutStateSummary(state_root, summary) => { - key_value_batch.push(summary.as_kv_store_op(*state_root)); - } - StoreOp::PutStateTemporaryFlag(state_root) => { key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root)); } @@ -588,8 +623,6 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutState(_, _) => (), - StoreOp::PutStateSummary(_, _) => (), - StoreOp::PutStateTemporaryFlag(_) => (), StoreOp::DeleteStateTemporaryFlag(_) => (), @@ -614,6 +647,19 @@ impl, Cold: ItemStore> HotColdDB state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { + // Put the state in the cache. 
+ // FIXME(sproul): could optimise out the block root + let block_root = state.get_latest_block_root(*state_root); + + if self + .state_cache + .lock() + .put_state(*state_root, block_root, state)? + { + // Already exists in database. + return Ok(()); + } + // On the epoch boundary, store the full state. if state.slot() % E::slots_per_epoch() == 0 { trace!( @@ -635,14 +681,43 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } - /// Load a post-finalization state from the hot database. - /// - /// Will replay blocks from the nearest epoch boundary. - pub fn load_hot_state( + /// Get a post-finalization state from the database or store. + pub fn get_hot_state( &self, state_root: &Hash256, state_root_strategy: StateRootStrategy, ) -> Result>, Error> { + if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) { + return Ok(Some(state)); + } + warn!( + self.log, + "State cache missed"; + "state_root" => ?state_root, + ); + + let state_from_disk = self.load_hot_state(state_root, state_root_strategy)?; + + if let Some((state, block_root)) = state_from_disk { + self.state_cache + .lock() + .put_state(*state_root, block_root, &state)?; + Ok(Some(state)) + } else { + Ok(None) + } + } + + /// Load a post-finalization state from the hot database. + /// + /// Will replay blocks from the nearest epoch boundary. + /// + /// Return the `(state, latest_block_root)` if found. + pub fn load_hot_state( + &self, + state_root: &Hash256, + state_root_strategy: StateRootStrategy, + ) -> Result, Hash256)>, Error> { metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT); // If the state is marked as temporary, do not return it. It will become visible @@ -678,7 +753,7 @@ impl, Cold: ItemStore> HotColdDB )? }; - Ok(Some(state)) + Ok(Some((state, latest_block_root))) } else { Ok(None) } @@ -1262,17 +1337,18 @@ impl, Cold: ItemStore> HotColdDB /// Advance the split point of the store, moving new finalized states to the freezer. 
pub fn migrate_database, Cold: ItemStore>( store: Arc>, - frozen_head_root: Hash256, - frozen_head: &BeaconState, + finalized_state_root: Hash256, + finalized_block_root: Hash256, + finalized_state: &BeaconState, ) -> Result<(), Error> { debug!( store.log, "Freezer migration started"; - "slot" => frozen_head.slot() + "slot" => finalized_state.slot() ); // 0. Check that the migration is sensible. - // The new frozen head must increase the current split slot, and lie on an epoch + // The new finalized state must increase the current split slot, and lie on an epoch // boundary (in order for the hot state summary scheme to work). let current_split_slot = store.split.read_recursive().slot; let anchor_slot = store @@ -1281,23 +1357,23 @@ pub fn migrate_database, Cold: ItemStore>( .as_ref() .map(|a| a.anchor_slot); - if frozen_head.slot() < current_split_slot { + if finalized_state.slot() < current_split_slot { return Err(HotColdDBError::FreezeSlotError { current_split_slot, - proposed_split_slot: frozen_head.slot(), + proposed_split_slot: finalized_state.slot(), } .into()); } - if frozen_head.slot() % E::slots_per_epoch() != 0 { - return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot()).into()); + if finalized_state.slot() % E::slots_per_epoch() != 0 { + return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into()); } let mut hot_db_ops: Vec> = Vec::new(); - // 1. Copy all of the states between the head and the split slot, from the hot DB - // to the cold DB. - let state_root_iter = StateRootsIterator::new(&store, frozen_head); + // 1. Copy all of the states between the new finalized state and the split slot, from the hot DB + // to the cold DB. 
+ let state_root_iter = StateRootsIterator::new(&store, finalized_state); for maybe_pair in state_root_iter.take_while(|result| match result { Ok((_, slot)) => { slot >= &current_split_slot @@ -1310,7 +1386,8 @@ pub fn migrate_database, Cold: ItemStore>( let mut cold_db_ops: Vec = Vec::new(); if slot % store.config.slots_per_restore_point == 0 { - let state: BeaconState = get_full_state(&store.hot_db, &state_root, &store.spec)? + let state: BeaconState = store + .get_hot_state(&state_root, StateRootStrategy::Accurate)? .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?; store.store_cold_state(&state_root, &state, &mut cold_db_ops)?; @@ -1367,8 +1444,8 @@ pub fn migrate_database, Cold: ItemStore>( // Before updating the in-memory split value, we flush it to disk first, so that should the // OS process die at this point, we pick up from the right place after a restart. let split = Split { - slot: frozen_head.slot(), - state_root: frozen_head_root, + slot: finalized_state.slot(), + state_root: finalized_state_root, }; store.hot_db.put_sync(&SPLIT_KEY, &split)?; @@ -1381,10 +1458,18 @@ pub fn migrate_database, Cold: ItemStore>( // Delete the states from the hot database if we got this far. store.do_atomically(hot_db_ops)?; + // Update the cache's view of the finalized state. 
+ store.update_finalized_state( + finalized_state_root, + finalized_block_root, + finalized_state.slot().epoch(E::slots_per_epoch()), + finalized_state.clone(), + )?; + debug!( store.log, "Freezer migration complete"; - "slot" => frozen_head.slot() + "slot" => finalized_state.slot() ); Ok(()) diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 145ba2d54c..a44cef4138 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -139,7 +139,6 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Box>), PutState(Hash256, &'a BeaconState), - PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), DeleteStateTemporaryFlag(Hash256), DeleteBlock(Hash256), diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 0b55f137a9..a4f64835c8 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -1,3 +1,165 @@ +use crate::Error; +use lru::LruCache; +use std::collections::{BTreeMap, HashMap, HashSet}; +use types::{BeaconState, Epoch, EthSpec, Hash256, Slot}; + +#[derive(Debug)] +pub struct FinalizedState { + state_root: Hash256, + epoch: Epoch, + state: BeaconState, +} + +/// Map from block_root -> slot -> state_root. +#[derive(Debug, Default)] +pub struct BlockMap { + blocks: HashMap, +} + +/// Map from slot -> state_root. 
+#[derive(Debug, Default)] +pub struct SlotMap { + slots: BTreeMap, +} + +#[derive(Debug)] +pub struct StateCache { + finalized_state: Option>, + states: LruCache>, + block_map: BlockMap, +} + +impl StateCache { + pub fn new(capacity: usize) -> Self { + StateCache { + finalized_state: None, + states: LruCache::new(capacity), + block_map: BlockMap::default(), + } + } + + pub fn len(&self) -> usize { + self.states.len() + } + + pub fn update_finalized_state( + &mut self, + state_root: Hash256, + block_root: Hash256, + epoch: Epoch, + state: BeaconState, + ) -> Result<(), Error> { + if self + .finalized_state + .as_ref() + .map_or(false, |finalized_state| epoch < finalized_state.epoch) + { + // FIXME(sproul): panic + panic!("decreasing epoch"); + } + + let finalized_slot = epoch.start_slot(E::slots_per_epoch()); + + // Add to block map. + self.block_map + .insert(block_root, finalized_slot, state_root); + + // Prune block map. + let state_roots_to_prune = self.block_map.prune(finalized_slot); + + // Delete states. + for state_root in state_roots_to_prune { + self.states.pop(&state_root); + } + + // Update finalized state. + self.finalized_state = Some(FinalizedState { + state_root, + epoch, + state, + }); + Ok(()) + } + + /// Return a bool indicating whether the state already existed in the cache. + pub fn put_state( + &mut self, + state_root: Hash256, + block_root: Hash256, + state: &BeaconState, + ) -> Result { + if self.states.peek(&state_root).is_some() { + return Ok(true); + } + + // Insert the full state into the cache. + self.states.put(state_root, state.clone()); + + // Record the connection from block root and slot to this state. 
+ let slot = state.slot(); + self.block_map.insert(block_root, slot, state_root); + + Ok(false) + } + + pub fn get_by_state_root(&mut self, state_root: Hash256) -> Option> { + if let Some(ref finalized_state) = self.finalized_state { + if state_root == finalized_state.state_root { + return Some(finalized_state.state.clone()); + } + } + self.states.get(&state_root).cloned() + } + + pub fn get_by_block_root( + &mut self, + block_root: Hash256, + slot: Slot, + ) -> Option<(Hash256, BeaconState)> { + let slot_map = self.block_map.blocks.get(&block_root)?; + + // Find the state at `slot`, or failing that the most recent ancestor. + let state_root = slot_map + .slots + .iter() + .rev() + .find_map(|(ancestor_slot, state_root)| { + (*ancestor_slot <= slot).then(|| *state_root) + })?; + + let state = self.get_by_state_root(state_root)?; + Some((state_root, state)) + } +} + +impl BlockMap { + fn insert(&mut self, block_root: Hash256, slot: Slot, state_root: Hash256) { + let slot_map = self + .blocks + .entry(block_root) + .or_insert_with(SlotMap::default); + slot_map.slots.insert(slot, state_root); + } + + fn prune(&mut self, finalized_slot: Slot) -> HashSet { + let mut pruned_states = HashSet::new(); + + self.blocks.retain(|_, slot_map| { + slot_map.slots.retain(|slot, state_root| { + let keep = *slot > finalized_slot; + if !keep { + pruned_states.insert(*state_root); + } + keep + }); + + !slot_map.slots.is_empty() + }); + + pruned_states + } +} + #[cfg(test)] mod test { use super::*; diff --git a/consensus/types/src/beacon_state.rs b/consensus/types/src/beacon_state.rs index f4954bac25..a7ec919562 100644 --- a/consensus/types/src/beacon_state.rs +++ b/consensus/types/src/beacon_state.rs @@ -1518,17 +1518,16 @@ impl BeaconState { /// Get the committee cache for some `slot`. /// /// Return an error if the cache for the slot's epoch is not initialized. 
- fn committee_cache_at_slot(&self, slot: Slot) -> Result<&CommitteeCache, Error> { + fn committee_cache_at_slot(&self, slot: Slot) -> Result<&Arc, Error> { let epoch = slot.epoch(T::slots_per_epoch()); let relative_epoch = RelativeEpoch::from_epoch(self.current_epoch(), epoch)?; self.committee_cache(relative_epoch) } /// Get the committee cache at a given index. - fn committee_cache_at_index(&self, index: usize) -> Result<&CommitteeCache, Error> { + fn committee_cache_at_index(&self, index: usize) -> Result<&Arc, Error> { self.committee_caches() .get(index) - .map(Arc::as_ref) .ok_or(Error::CommitteeCachesOutOfBounds(index)) } @@ -1544,7 +1543,10 @@ impl BeaconState { /// Returns the cache for some `RelativeEpoch`. Returns an error if the cache has not been /// initialized. - pub fn committee_cache(&self, relative_epoch: RelativeEpoch) -> Result<&CommitteeCache, Error> { + pub fn committee_cache( + &self, + relative_epoch: RelativeEpoch, + ) -> Result<&Arc, Error> { let i = Self::committee_cache_index(relative_epoch); let cache = self.committee_cache_at_index(i)?; diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index ee3d4c4614..0ec85ec5ae 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -19,7 +19,7 @@ spec-minimal = [] # Support Gnosis spec and Gnosis Beacon Chain. gnosis = [] # Use `milhouse` tree states. -tree-states = ["store/milhouse"] +tree-states = ["beacon_node/tree-states"] [dependencies] beacon_node = { "path" = "../beacon_node" }