Revamp state advance, delete snapshot cache

This commit is contained in:
Michael Sproul
2022-02-14 16:16:12 +11:00
parent 42e4675c97
commit f888a08f15
18 changed files with 422 additions and 851 deletions

View File

@@ -13,6 +13,7 @@ node_test_rig = { path = "../testing/node_test_rig" }
[features]
write_ssz_files = ["beacon_chain/write_ssz_files"] # Writes debugging .ssz files to /tmp during block processing.
tree-states = ["beacon_chain/tree-states"]
[dependencies]
eth2_config = { path = "../common/eth2_config" }

View File

@@ -10,6 +10,7 @@ default = ["participation_metrics"]
write_ssz_files = [] # Writes debugging .ssz files to /tmp during block processing.
participation_metrics = [] # Exposes validator participation metrics to Prometheus.
fork_from_env = [] # Initialise the harness chain spec from the FORK_NAME env variable
tree-states = ["store/milhouse"]
[dev-dependencies]
maplit = "1.0.2"

View File

@@ -36,7 +36,6 @@ use crate::persisted_beacon_chain::{PersistedBeaconChain, DUMMY_CANONICAL_HEAD_B
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::SnapshotCache;
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
@@ -92,9 +91,6 @@ pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
/// head.
pub const HEAD_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the block
/// processing cache.
pub const BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
/// The time-out before failure during an operation to take a read/write RwLock on the
/// attestation cache.
pub const ATTESTATION_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1);
@@ -323,8 +319,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// A cache dedicated to block processing.
pub(crate) snapshot_cache: TimeoutRwLock<SnapshotCache<T::EthSpec>>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub(crate) shuffling_cache: TimeoutRwLock<ShufflingCache>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
@@ -2530,7 +2524,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Do not import a block that doesn't descend from the finalized root.
let signed_block =
check_block_is_finalized_descendant::<T, _>(signed_block, &fork_choice, &self.store)?;
let (block, block_signature) = signed_block.clone().deconstruct();
let (block, _) = signed_block.clone().deconstruct();
// compare the existing finalized checkpoint with the incoming block's finalized checkpoint
let old_finalized_checkpoint = fork_choice.finalized_checkpoint();
@@ -2784,30 +2778,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let parent_root = block.parent_root();
let slot = block.slot();
let signed_block = SignedBeaconBlock::from_block(block, block_signature);
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(Error::SnapshotCacheLockTimeout)
.map(|mut snapshot_cache| {
snapshot_cache.insert(
BeaconSnapshot {
beacon_state: state,
beacon_block: signed_block,
beacon_block_root: block_root,
},
None,
&self.spec,
)
})
.unwrap_or_else(|e| {
error!(
self.log,
"Failed to insert snapshot";
"error" => ?e,
"task" => "process block"
);
});
self.head_tracker
.register_block(block_root, parent_root, slot);
@@ -2888,28 +2858,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.head_info()
.map_err(BlockProductionError::UnableToGetHeadInfo)?;
let (state, state_root_opt) = if head_info.slot < slot {
// Normal case: proposing a block atop the current head. Use the snapshot cache.
if let Some(pre_state) = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_state_for_block_production(head_info.block_root)
})
{
(pre_state.pre_state, pre_state.state_root)
} else {
warn!(
self.log,
"Block production cache miss";
"message" => "this block is more likely to be orphaned",
"slot" => slot,
);
let state = self
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
let state = self
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
(state, None)
}
(state, None)
} else {
warn!(
self.log,
@@ -3206,40 +3159,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// At this point we know that the new head block is not the same as the previous one
metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD);
// Try and obtain the snapshot for `beacon_block_root` from the snapshot cache, falling
// back to a database read if that fails.
let new_head = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_cloned(beacon_block_root, CloneConfig::committee_caches_only())
})
.map::<Result<_, Error>, _>(Ok)
.unwrap_or_else(|| {
let beacon_block = self
.get_block(&beacon_block_root)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let new_head = {
let beacon_block = self
.get_block(&beacon_block_root)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root();
let beacon_state: BeaconState<T::EthSpec> = self
.get_state(&beacon_state_root, Some(beacon_block.slot()))?
.ok_or(Error::MissingBeaconState(beacon_state_root))?;
let beacon_state_root = beacon_block.state_root();
let mut beacon_state: BeaconState<T::EthSpec> = self
.get_state(&beacon_state_root, Some(beacon_block.slot()))?
.ok_or(Error::MissingBeaconState(beacon_state_root))?;
beacon_state.build_all_committee_caches(&self.spec)?;
Ok(BeaconSnapshot {
beacon_block,
beacon_block_root,
beacon_state,
})
})
.and_then(|mut snapshot| {
// Regardless of where we got the state from, attempt to build the committee
// caches.
snapshot
.beacon_state
.build_all_committee_caches(&self.spec)
.map_err(Into::into)
.map(|()| snapshot)
})?;
BeaconSnapshot {
beacon_block,
beacon_block_root,
beacon_state,
}
};
// Attempt to detect if the new head is not on the same chain as the previous block
// (i.e., a re-org).
@@ -3428,20 +3364,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut snapshot_cache| {
snapshot_cache.update_head(beacon_block_root);
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "snapshot_cache",
"task" => "update head"
);
});
if is_epoch_transition || is_reorg {
self.persist_head_and_fork_choice()?;
self.op_pool.prune_attestations(self.epoch()?);
@@ -3742,26 +3664,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.start_slot(T::EthSpec::slots_per_epoch()),
);
self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut snapshot_cache| {
snapshot_cache.prune(new_finalized_checkpoint.epoch);
debug!(
self.log,
"Snapshot cache pruned";
"new_len" => snapshot_cache.len(),
"remaining_roots" => ?snapshot_cache.beacon_block_roots(),
);
})
.unwrap_or_else(|| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "snapshot_cache",
"task" => "prune"
);
});
self.op_pool.prune_all(head_state, self.epoch()?);
self.store_migrator.process_finalization(

View File

@@ -10,6 +10,19 @@ pub struct BeaconSnapshot<E: EthSpec> {
pub beacon_state: BeaconState<E>,
}
/// This snapshot is to be used for verifying a child of `self.beacon_block`.
#[derive(Debug)]
pub struct PreProcessingSnapshot<T: EthSpec> {
/// This state is equivalent to the `self.beacon_block.state_root()` state that has been
/// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for
/// the application of another block.
///
/// Per the `beacon_state_root` field below, the state may also be held *without* having been
/// advanced; in that case `beacon_state_root` is `Some`.
pub pre_state: BeaconState<T>,
/// This value is only set to `Some` if the `pre_state` was *not* advanced forward.
pub beacon_state_root: Option<Hash256>,
/// The block whose child is to be verified against `pre_state`.
pub beacon_block: SignedBeaconBlock<T>,
/// Root identifying `beacon_block` (presumably its canonical block root; confirm at the
/// construction sites).
pub beacon_block_root: Hash256,
}
impl<E: EthSpec> BeaconSnapshot<E> {
/// Create a new checkpoint.
pub fn new(

View File

@@ -40,17 +40,14 @@
//! END
//!
//! ```
use crate::beacon_snapshot::PreProcessingSnapshot;
use crate::execution_payload::{
execute_payload, validate_execution_payload_for_gossip, validate_merge_block,
};
use crate::snapshot_cache::PreProcessingSnapshot;
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{
beacon_chain::{
BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT, MAXIMUM_GOSSIP_CLOCK_DISPARITY,
VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT,
},
beacon_chain::{MAXIMUM_GOSSIP_CLOCK_DISPARITY, VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT},
metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use eth2::types::EventKind;
@@ -72,7 +69,7 @@ use std::borrow::Cow;
use std::fs;
use std::io::Write;
use std::time::Duration;
use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp};
use store::{Error as DBError, HotColdDB, KeyValueStore, StoreOp};
use tree_hash::TreeHash;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, Hash256,
@@ -1033,7 +1030,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
// Perform a sanity check on the pre-state.
let parent_slot = parent.beacon_block.slot();
if state.slot() < parent_slot || state.slot() > parent_slot + 1 {
if state.slot() < parent_slot {
return Err(BeaconChainError::BadPreState {
parent_root: parent.beacon_block_root,
parent_slot,
@@ -1061,29 +1058,10 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();
let state_batch = if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
vec![]
} else {
vec![
if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else {
StoreOp::PutStateSummary(
state_root,
HotStateSummary::new(&state_root, &state)?,
)
},
StoreOp::PutStateTemporaryFlag(state_root),
]
};
chain.store.do_atomically(state_batch)?;
chain.store.do_atomically(vec![
StoreOp::PutState(state_root, &state),
StoreOp::PutStateTemporaryFlag(state_root),
])?;
drop(txn_lock);
confirmation_db_batch.push(StoreOp::DeleteStateTemporaryFlag(state_root));
@@ -1435,8 +1413,6 @@ fn load_parent<T: BeaconChainTypes>(
),
BlockError<T::EthSpec>,
> {
let spec = &chain.spec;
// Reject any block if its parent is not known to fork choice.
//
// A block that is not in fork choice is either:
@@ -1455,44 +1431,9 @@ fn load_parent<T: BeaconChainTypes>(
return Err(BlockError::ParentUnknown(Box::new(block)));
}
let block_delay = chain
.block_times_cache
.read()
.get_block_delays(
block.canonical_root(),
chain
.slot_clock
.start_of(block.slot())
.unwrap_or_else(|| Duration::from_secs(0)),
)
.observed;
let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ);
let result = if let Some((snapshot, cloned)) = chain
.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|mut snapshot_cache| {
snapshot_cache.get_state_for_block_processing(
block.parent_root(),
block.slot(),
block_delay,
spec,
)
}) {
if cloned {
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES);
debug!(
chain.log,
"Cloned snapshot for late block/skipped slot";
"slot" => %block.slot(),
"parent_slot" => %snapshot.beacon_block.slot(),
"parent_root" => ?block.parent_root(),
"block_delay" => ?block_delay,
);
}
Ok((snapshot, block))
} else {
let result = {
// Load the blocks parent block from the database, returning invalid if that block is not
// found.
//
@@ -1515,28 +1456,25 @@ fn load_parent<T: BeaconChainTypes>(
// Load the parent blocks state from the database, returning an error if it is not found.
// It is an error because if we know the parent block we should also know the parent state.
let parent_state_root = parent_block.state_root();
let parent_state = chain
.get_state(&parent_state_root, Some(parent_block.slot()))?
let (advanced_state_root, state) = chain
.store
.get_advanced_state(block.parent_root(), block.slot(), parent_state_root)?
.ok_or_else(|| {
BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root))
})?;
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES);
debug!(
chain.log,
"Missed snapshot cache";
"slot" => block.slot(),
"parent_slot" => parent_block.slot(),
"parent_root" => ?block.parent_root(),
"block_delay" => ?block_delay,
);
let beacon_state_root = if parent_state_root == advanced_state_root {
Some(parent_state_root)
} else {
None
};
Ok((
PreProcessingSnapshot {
beacon_block: parent_block,
beacon_block_root: root,
pre_state: parent_state,
beacon_state_root: Some(parent_state_root),
pre_state: state,
beacon_state_root,
},
block,
))

View File

@@ -5,7 +5,6 @@ use crate::head_tracker::HeadTracker;
use crate::migrate::{BackgroundMigrator, MigratorConfig};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::ValidatorMonitor;
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
@@ -29,8 +28,8 @@ use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::ShutdownReason;
use types::{
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
Signature, SignedBeaconBlock, Slot,
BeaconBlock, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, Graffiti, Hash256,
PublicKeyBytes, Signature, SignedBeaconBlock, Slot,
};
/// An empty struct used to "witness" all the `BeaconChainTypes` traits. It has no user-facing
@@ -303,6 +302,15 @@ where
let beacon_state_root = beacon_block.message().state_root();
let beacon_block_root = beacon_block.canonical_root();
store
.update_finalized_state(
beacon_state_root,
beacon_block_root,
Epoch::new(0),
beacon_state.clone(),
)
.map_err(|e| format!("Failed to set genesis state as finalized state: {:?}", e))?;
store
.put_state(&beacon_state_root, &beacon_state)
.map_err(|e| format!("Failed to store genesis state: {:?}", e))?;
@@ -425,6 +433,14 @@ where
// Write the state and block non-atomically, it doesn't matter if they're forgotten
// about on a crash restart.
store
.update_finalized_state(
weak_subj_state_root,
weak_subj_block_root,
weak_subj_slot.epoch(TEthSpec::slots_per_epoch()),
weak_subj_state.clone(),
)
.map_err(|e| format!("Failed to set genesis state as finalized state: {:?}", e))?;
store
.put_state(&weak_subj_state_root, &weak_subj_state)
.map_err(|e| format!("Failed to store weak subjectivity state: {:?}", e))?;
@@ -751,10 +767,6 @@ where
fork_choice: RwLock::new(fork_choice),
event_handler: self.event_handler,
head_tracker,
snapshot_cache: TimeoutRwLock::new(SnapshotCache::new(
DEFAULT_SNAPSHOT_CACHE_SIZE,
canonical_head,
)),
shuffling_cache: TimeoutRwLock::new(ShufflingCache::new()),
beacon_proposer_cache: <_>::default(),
block_times_cache: <_>::default(),

View File

@@ -30,7 +30,6 @@ mod persisted_fork_choice;
mod pre_finalization_cache;
pub mod schema_change;
mod shuffling_cache;
mod snapshot_cache;
pub mod state_advance_timer;
pub mod sync_committee_verification;
pub mod test_utils;

View File

@@ -7,9 +7,6 @@ use slot_clock::SlotClock;
use std::time::Duration;
use types::{BeaconState, Epoch, EthSpec, Hash256, Slot};
/// The maximum time to wait for the snapshot cache lock during a metrics scrape.
const SNAPSHOT_CACHE_TIMEOUT: Duration = Duration::from_millis(100);
lazy_static! {
/*
* Block Processing
@@ -935,15 +932,10 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
let attestation_stats = beacon_chain.op_pool.attestation_stats();
if let Some(snapshot_cache) = beacon_chain
.snapshot_cache
.try_write_for(SNAPSHOT_CACHE_TIMEOUT)
{
set_gauge(
&BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
snapshot_cache.len() as i64,
)
}
set_gauge_by_usize(
&BLOCK_PROCESSING_SNAPSHOT_CACHE_SIZE,
beacon_chain.store.state_cache_len(),
);
if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() {
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size);

View File

@@ -197,6 +197,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
debug!(log, "Database consolidation started");
let finalized_state_root = notif.finalized_state_root;
let finalized_block_root = notif.finalized_checkpoint.root;
let finalized_state = match db.get_state(&finalized_state_root.into(), None) {
Ok(Some(state)) => state,
@@ -237,7 +238,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
};
match migrate_database(db.clone(), finalized_state_root.into(), &finalized_state) {
match migrate_database(
db.clone(),
finalized_state_root.into(),
finalized_block_root,
&finalized_state,
) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(

View File

@@ -1,520 +0,0 @@
use crate::BeaconSnapshot;
use itertools::process_results;
use std::cmp;
use std::time::Duration;
use types::{
beacon_state::CloneConfig, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock,
Slot,
};
/// The default size of the cache.
///
/// This is a count of snapshots (it is the `max_len` handed to `SnapshotCache::new`).
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
/// The minimum block delay to clone the state in the cache instead of removing it.
/// This helps keep block processing fast during re-orgs from late blocks.
///
/// `SnapshotCache::get_state_for_block_processing` only clones on this basis while the delay is
/// also no greater than four slot durations.
const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6);
/// This snapshot is to be used for verifying a child of `self.beacon_block`.
#[derive(Debug)]
pub struct PreProcessingSnapshot<T: EthSpec> {
/// This state is equivalent to the `self.beacon_block.state_root()` state that has been
/// advanced forward one slot using `per_slot_processing`. This state is "primed and ready" for
/// the application of another block.
///
/// Per the `beacon_state_root` field below, the state may also be held *without* having been
/// advanced; in that case `beacon_state_root` is `Some`.
pub pre_state: BeaconState<T>,
/// This value is only set to `Some` if the `pre_state` was *not* advanced forward.
pub beacon_state_root: Option<Hash256>,
/// The block whose child is to be verified against `pre_state`.
pub beacon_block: SignedBeaconBlock<T>,
/// Root identifying `beacon_block` (presumably its canonical block root; confirm at the
/// construction sites).
pub beacon_block_root: Hash256,
}
impl<T: EthSpec> From<BeaconSnapshot<T>> for PreProcessingSnapshot<T> {
fn from(snapshot: BeaconSnapshot<T>) -> Self {
let beacon_state_root = Some(snapshot.beacon_state_root());
Self {
pre_state: snapshot.beacon_state,
beacon_state_root,
beacon_block: snapshot.beacon_block,
beacon_block_root: snapshot.beacon_block_root,
}
}
}
impl<T: EthSpec> CacheItem<T> {
    /// Wraps `snapshot` in a cache entry that does not (yet) hold an advanced state.
    pub fn new_without_pre_state(snapshot: BeaconSnapshot<T>) -> Self {
        let BeaconSnapshot {
            beacon_block,
            beacon_block_root,
            beacon_state,
        } = snapshot;

        Self {
            beacon_block,
            beacon_block_root,
            beacon_state,
            pre_state: None,
        }
    }

    /// Produces a `BeaconSnapshot` copy of this entry, cloning the state according to
    /// `clone_config`. Any advanced `pre_state` is not part of the result.
    fn clone_to_snapshot_with(&self, clone_config: CloneConfig) -> BeaconSnapshot<T> {
        let beacon_state = self.beacon_state.clone_with(clone_config);
        let beacon_block = self.beacon_block.clone();

        BeaconSnapshot {
            beacon_state,
            beacon_block,
            beacon_block_root: self.beacon_block_root,
        }
    }

    /// Consumes the entry and yields the snapshot used to process a child block.
    ///
    /// The advanced state is preferred when present; otherwise the un-advanced state is used and
    /// its root (taken from the block) is supplied alongside it.
    pub fn into_pre_state(self) -> PreProcessingSnapshot<T> {
        let Self {
            beacon_block,
            beacon_block_root,
            beacon_state,
            pre_state,
        } = self;

        // Do not include the beacon state root if the state has been advanced.
        let beacon_state_root = if pre_state.is_none() {
            Some(beacon_block.state_root())
        } else {
            None
        };

        PreProcessingSnapshot {
            beacon_block,
            beacon_block_root,
            pre_state: pre_state.unwrap_or(beacon_state),
            beacon_state_root,
        }
    }

    /// Same as `into_pre_state`, but leaves the entry intact by cloning the block and whichever
    /// state is selected.
    pub fn clone_as_pre_state(&self) -> PreProcessingSnapshot<T> {
        // Do not include the beacon state root if the state has been advanced.
        let beacon_state_root = if self.pre_state.is_none() {
            Some(self.beacon_block.state_root())
        } else {
            None
        };
        let chosen_state = self.pre_state.as_ref().unwrap_or(&self.beacon_state);

        PreProcessingSnapshot {
            beacon_block: self.beacon_block.clone(),
            beacon_block_root: self.beacon_block_root,
            pre_state: chosen_state.clone(),
            beacon_state_root,
        }
    }
}
/// The information required for block production.
///
/// Returned by `SnapshotCache::get_state_for_block_production`, which fills `pre_state` with a
/// `CloneConfig::all()` clone of the cached state.
pub struct BlockProductionPreState<T: EthSpec> {
/// This state may or may not have been advanced forward a single slot.
///
/// See the documentation in the `crate::state_advance_timer` module for more information.
pub pre_state: BeaconState<T>,
/// This value will only be `Some` if `self.pre_state` was **not** advanced forward a single
/// slot.
///
/// This value can be used to avoid tree-hashing the state during the first call to
/// `per_slot_processing`.
pub state_root: Option<Hash256>,
}
/// Outcome of asking the cache for a state to advance (see
/// `SnapshotCache::get_for_state_advance`).
pub enum StateAdvance<T: EthSpec> {
/// The cache does not contain the supplied block root.
BlockNotFound,
/// The cache contains the supplied block root but the state has already been advanced.
AlreadyAdvanced,
/// The cache contains the supplied block root and the state has not yet been advanced.
State {
// The cached (un-advanced) state, moved out of the cache entry; the entry keeps a
// committee-caches-only clone in its place.
state: Box<BeaconState<T>>,
// The state root recorded in the cached block (`beacon_block.state_root()`).
state_root: Hash256,
// The slot of the cached block.
block_slot: Slot,
},
}
/// The item stored in the `SnapshotCache`.
pub struct CacheItem<T: EthSpec> {
/// The block this entry was created for.
beacon_block: SignedBeaconBlock<T>,
/// The root the cache uses to look this entry up.
beacon_block_root: Hash256,
/// This state is equivalent to `self.beacon_block.state_root()`.
beacon_state: BeaconState<T>,
/// This state is equivalent to `self.beacon_state` that has had `per_slot_processing` applied
/// to it. This state assists in optimizing block processing.
///
/// `None` until an advanced state is supplied (e.g. via `SnapshotCache::update_pre_state`).
pre_state: Option<BeaconState<T>>,
}
/// Converts a cache entry back into a plain `BeaconSnapshot`.
///
/// Only the block, its root and the un-advanced `beacon_state` are carried over; an advanced
/// `pre_state`, if any, is dropped.
///
/// This is written as `From` rather than a hand-rolled `Into`: the standard library's blanket
/// implementation still provides `CacheItem<T>: Into<BeaconSnapshot<T>>`, so existing `.into()`
/// call sites keep working, and `BeaconSnapshot::from(item)` becomes available as well.
impl<T: EthSpec> From<CacheItem<T>> for BeaconSnapshot<T> {
    fn from(item: CacheItem<T>) -> Self {
        Self {
            beacon_state: item.beacon_state,
            beacon_block: item.beacon_block,
            beacon_block_root: item.beacon_block_root,
        }
    }
}
/// Provides a cache of `BeaconSnapshot` that is intended primarily for block processing.
///
/// ## Cache Queuing
///
/// The cache has a non-standard queue mechanism (specifically, it is not LRU).
///
/// The cache has a max number of elements (`max_len`). Until `max_len` is achieved, all snapshots
/// are simply added to the queue. Once `max_len` is achieved, adding a new snapshot will cause an
/// existing snapshot to be ejected. The ejected snapshot will:
///
/// - Never be the `head_block_root`.
/// - Be the snapshot with the lowest `state.slot` (ties broken arbitrarily).
pub struct SnapshotCache<T: EthSpec> {
// Capacity of `snapshots`; `Self::new` clamps it to at least 1.
max_len: usize,
// Root of the snapshot that `Self::insert` must never eject; changed via `Self::update_head`.
head_block_root: Hash256,
// The cached entries, in no particular order.
snapshots: Vec<CacheItem<T>>,
}
impl<T: EthSpec> SnapshotCache<T> {
    /// Builds a cache seeded with the `head` snapshot, whose root is recorded as the head root.
    ///
    /// A `max_len` of `0` is treated as `1`.
    pub fn new(max_len: usize, head: BeaconSnapshot<T>) -> Self {
        let head_block_root = head.beacon_block_root;
        let head_item = CacheItem::new_without_pre_state(head);

        Self {
            max_len: cmp::max(1, max_len),
            head_block_root,
            snapshots: vec![head_item],
        }
    }

    /// Lists the block root of every cached snapshot.
    pub fn beacon_block_roots(&self) -> Vec<Hash256> {
        let mut roots = Vec::with_capacity(self.snapshots.len());
        roots.extend(self.snapshots.iter().map(|item| item.beacon_block_root));
        roots
    }

    /// How many snapshots are currently cached.
    pub fn len(&self) -> usize {
        self.snapshots.len()
    }

    /// Adds a snapshot (optionally with its advanced `pre_state`), ejecting an existing entry
    /// when the cache is full. See the struct-level documentation for the ejection policy.
    pub fn insert(
        &mut self,
        snapshot: BeaconSnapshot<T>,
        pre_state: Option<BeaconState<T>>,
        spec: &ChainSpec,
    ) {
        let parent_root = snapshot.beacon_block.message().parent_root();
        let own_root = snapshot.beacon_block_root;
        let new_item = CacheItem {
            beacon_block: snapshot.beacon_block,
            beacon_block_root: own_root,
            beacon_state: snapshot.beacon_state,
            pre_state,
        };

        // Walk back through the new state's block roots to the first root that is neither the
        // new block nor its parent: that is the grandparent.
        //
        // Assuming re-orgs deeper than one block are unlikely, dropping the grandparent keeps
        // the cache small. It is dropped *before* inserting so that room is freed up first.
        let grandparent_lookup = process_results(
            new_item.beacon_state.rev_iter_block_roots(spec),
            |mut roots| {
                roots
                    .find(|(_slot, root)| *root != own_root && *root != parent_root)
                    .map(|(_slot, root)| root)
            },
        );
        if let Ok(Some(grandparent_root)) = grandparent_lookup {
            let head_root = self.head_block_root;
            // The head is never removed, even if it happens to be the grandparent.
            self.snapshots.retain(|cached| {
                let root = cached.beacon_block_root;
                !(root == grandparent_root && root != head_root)
            });
        }

        if self.snapshots.len() < self.max_len {
            self.snapshots.push(new_item);
            return;
        }

        // At capacity: overwrite the non-head entry with the lowest state slot. If every entry
        // is the head, the new item is not stored.
        let victim = self
            .snapshots
            .iter()
            .enumerate()
            .filter(|(_index, cached)| cached.beacon_block_root != self.head_block_root)
            .min_by_key(|(_index, cached)| cached.beacon_state.slot())
            .map(|(index, _cached)| index);
        if let Some(index) = victim {
            self.snapshots[index] = new_item;
        }
    }

    /// Looks up the snapshot for `block_root` for use when importing/processing a child block.
    ///
    /// Normally the entry is removed from the cache and handed over, together with `false`.
    /// The entry is cloned instead (and `true` is returned) when either:
    ///
    /// - the slot after the cached block was skipped (`block_slot` is more than one slot past
    ///   it), so that later blocks can still use the cache; or
    /// - the block was observed late (delay of at least `MINIMUM_BLOCK_DELAY_FOR_CLONE` and at
    ///   most four slot durations), so that the next block can be processed quickly if there is
    ///   a re-org.
    ///
    /// Returns `None` when no entry matches `block_root`.
    pub fn get_state_for_block_processing(
        &mut self,
        block_root: Hash256,
        block_slot: Slot,
        block_delay: Option<Duration>,
        spec: &ChainSpec,
    ) -> Option<(PreProcessingSnapshot<T>, bool)> {
        let index = self
            .snapshots
            .iter()
            .position(|cached| cached.beacon_block_root == block_root)?;
        let cached = &self.snapshots[index];

        let keep_in_cache = block_slot > cached.beacon_block.slot() + 1
            || block_delay.map_or(false, |delay| {
                delay >= MINIMUM_BLOCK_DELAY_FOR_CLONE
                    && delay <= Duration::from_secs(spec.seconds_per_slot) * 4
            });

        if keep_in_cache {
            Some((cached.clone_as_pre_state(), true))
        } else {
            Some((self.snapshots.remove(index).into_pre_state(), false))
        }
    }

    /// Returns a clone of the state to build a block on top of `block_root`, if it is cached.
    /// The clone uses `CloneConfig::all()`, so any tree-hash cache is cloned too.
    ///
    /// ## Note
    ///
    /// The state is cloned rather than removed because a block we produce is expected to come
    /// back shortly for import, and keeping the state cached greatly helps import times.
    pub fn get_state_for_block_production(
        &self,
        block_root: Hash256,
    ) -> Option<BlockProductionPreState<T>> {
        let cached = self
            .snapshots
            .iter()
            .find(|cached| cached.beacon_block_root == block_root)?;

        let production_state = match cached.pre_state.as_ref() {
            // Already advanced: the state root is not known without hashing.
            Some(advanced) => BlockProductionPreState {
                pre_state: advanced.clone_with(CloneConfig::all()),
                state_root: None,
            },
            None => BlockProductionPreState {
                pre_state: cached.beacon_state.clone_with(CloneConfig::all()),
                state_root: Some(cached.beacon_block.state_root()),
            },
        };
        Some(production_state)
    }

    /// Clones and returns the snapshot for `block_root`, if one is cached.
    pub fn get_cloned(
        &self,
        block_root: Hash256,
        clone_config: CloneConfig,
    ) -> Option<BeaconSnapshot<T>> {
        let cached = self
            .snapshots
            .iter()
            .find(|cached| cached.beacon_block_root == block_root)?;
        Some(cached.clone_to_snapshot_with(clone_config))
    }

    /// Hands out the un-advanced state for `block_root` so that it can be advanced.
    ///
    /// The full state is moved out of the cache entry and a committee-caches-only clone is left
    /// in its place.
    pub fn get_for_state_advance(&mut self, block_root: Hash256) -> StateAdvance<T> {
        let cached = match self
            .snapshots
            .iter_mut()
            .find(|cached| cached.beacon_block_root == block_root)
        {
            Some(cached) => cached,
            None => return StateAdvance::BlockNotFound,
        };

        if cached.pre_state.is_some() {
            return StateAdvance::AlreadyAdvanced;
        }

        let placeholder = cached
            .beacon_state
            .clone_with(CloneConfig::committee_caches_only());
        let full_state = std::mem::replace(&mut cached.beacon_state, placeholder);
        StateAdvance::State {
            state: Box::new(full_state),
            state_root: cached.beacon_block.state_root(),
            block_slot: cached.beacon_block.slot(),
        }
    }

    /// Stores `state` as the advanced state of the entry for `block_root`.
    ///
    /// Returns `None` (and drops `state`) when there is no such entry.
    pub fn update_pre_state(&mut self, block_root: Hash256, state: BeaconState<T>) -> Option<()> {
        let cached = self
            .snapshots
            .iter_mut()
            .find(|cached| cached.beacon_block_root == block_root)?;
        cached.pre_state = Some(state);
        Some(())
    }

    /// Drops every snapshot whose state slot is at or before the start slot of
    /// `finalized_epoch`.
    pub fn prune(&mut self, finalized_epoch: Epoch) {
        let finalized_slot = finalized_epoch.start_slot(T::slots_per_epoch());
        self.snapshots
            .retain(|cached| cached.beacon_state.slot() > finalized_slot)
    }

    /// Records a new head root.
    ///
    /// The snapshot with this root will never be ejected by `Self::insert`.
    pub fn update_head(&mut self, head_block_root: Hash256) {
        self.head_block_root = head_block_root
    }
}
#[cfg(test)]
mod test {
use super::*;
use crate::test_utils::{BeaconChainHarness, EphemeralHarnessType};
use types::{
test_utils::generate_deterministic_keypair, BeaconBlock, Epoch, MainnetEthSpec,
SignedBeaconBlock, Slot,
};
// Builds a fresh single-validator harness on an ephemeral store and advances it one slot.
fn get_harness() -> BeaconChainHarness<EphemeralHarnessType<MainnetEthSpec>> {
let harness = BeaconChainHarness::builder(MainnetEthSpec)
.default_spec()
.deterministic_keypairs(1)
.fresh_ephemeral_store()
.build();
harness.advance_slot();
harness
}
// Capacity used for the cache under test.
const CACHE_SIZE: usize = 4;
// Builds a snapshot whose block root is `i`. The block is an empty block with an arbitrary
// signature, and the state is the head state of a freshly built harness.
fn get_snapshot(i: u64) -> BeaconSnapshot<MainnetEthSpec> {
let spec = MainnetEthSpec::default_spec();
let beacon_state = get_harness().chain.head_beacon_state().unwrap();
let signed_beacon_block = SignedBeaconBlock::from_block(
BeaconBlock::empty(&spec),
generate_deterministic_keypair(0)
.sk
.sign(Hash256::from_low_u64_be(42)),
);
BeaconSnapshot {
beacon_state,
beacon_block: signed_beacon_block,
beacon_block_root: Hash256::from_low_u64_be(i),
}
}
// Exercises insert, lookup, pruning and head updates in sequence; each step depends on the
// cache contents left behind by the previous one.
#[test]
fn insert_get_prune_update() {
let spec = MainnetEthSpec::default_spec();
let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0));
// Insert a bunch of entries in the cache. It should look like this:
//
// Index    Root
// 0        0     <--head
// 1        1
// 2        2
// 3        3
for i in 1..CACHE_SIZE as u64 {
let mut snapshot = get_snapshot(i);
// Each snapshot should be one slot into an epoch, with each snapshot one epoch apart.
*snapshot.beacon_state.slot_mut() =
Slot::from(i * MainnetEthSpec::slots_per_epoch() + 1);
cache.insert(snapshot, None, &spec);
assert_eq!(
cache.snapshots.len(),
i as usize + 1,
"cache length should be as expected"
);
assert_eq!(cache.head_block_root, Hash256::from_low_u64_be(0));
}
// Insert a new value in the cache. Afterwards it should look like:
//
// Index    Root
// 0        0     <--head
// 1        42
// 2        2
// 3        3
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
cache.insert(get_snapshot(42), None, &spec);
assert_eq!(cache.snapshots.len(), CACHE_SIZE);
assert!(
cache
.get_state_for_block_processing(
Hash256::from_low_u64_be(1),
Slot::new(0),
None,
&spec
)
.is_none(),
"the snapshot with the lowest slot should have been removed during the insert function"
);
assert!(cache
.get_cloned(Hash256::from_low_u64_be(1), CloneConfig::none())
.is_none());
assert_eq!(
cache
.get_cloned(Hash256::from_low_u64_be(0), CloneConfig::none())
.expect("the head should still be in the cache")
.beacon_block_root,
Hash256::from_low_u64_be(0),
"get_cloned should get the correct snapshot"
);
// With no block delay and a block slot of 0, this lookup removes the entry rather than
// cloning it, which is what the length assertion below relies on.
assert_eq!(
cache
.get_state_for_block_processing(
Hash256::from_low_u64_be(0),
Slot::new(0),
None,
&spec
)
.expect("the head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(0),
"get_state_for_block_processing should get the correct snapshot"
);
assert_eq!(
cache.snapshots.len(),
CACHE_SIZE - 1,
"get_state_for_block_processing should shorten the cache"
);
// Prune the cache. Afterwards it should look like:
//
// Index    Root
// 0        2
// 1        3
cache.prune(Epoch::new(2));
assert_eq!(cache.snapshots.len(), 2);
cache.update_head(Hash256::from_low_u64_be(2));
// Over-fill the cache so it needs to eject some old values on insert.
for i in 0..CACHE_SIZE as u64 {
cache.insert(get_snapshot(u64::max_value() - i), None, &spec);
}
// Ensure that the new head value was not removed from the cache.
assert_eq!(
cache
.get_state_for_block_processing(
Hash256::from_low_u64_be(2),
Slot::new(0),
None,
&spec
)
.expect("the new head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(2),
"get_state_for_block_processing should get the correct snapshot"
);
}
}

View File

@@ -15,9 +15,7 @@
//! 2. There's a possibility that the head block is never built upon, causing wasted CPU cycles.
use crate::validator_monitor::HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS;
use crate::{
beacon_chain::{ATTESTATION_CACHE_LOCK_TIMEOUT, BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT},
snapshot_cache::StateAdvance,
BeaconChain, BeaconChainError, BeaconChainTypes,
beacon_chain::ATTESTATION_CACHE_LOCK_TIMEOUT, BeaconChain, BeaconChainError, BeaconChainTypes,
};
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
@@ -28,7 +26,7 @@ use std::sync::{
};
use task_executor::TaskExecutor;
use tokio::time::sleep;
use types::{AttestationShufflingId, EthSpec, Hash256, RelativeEpoch, Slot};
use types::{AttestationShufflingId, BeaconStateError, EthSpec, Hash256, RelativeEpoch, Slot};
/// If the head slot is more than `MAX_ADVANCE_DISTANCE` from the current slot, then don't perform
/// the state advancement.
@@ -40,6 +38,8 @@ const MAX_ADVANCE_DISTANCE: u64 = 4;
#[derive(Debug)]
enum Error {
BeaconChain(BeaconChainError),
BeaconState(BeaconStateError),
Store(store::Error),
HeadMissingFromSnapshotCache(Hash256),
MaxDistanceExceeded {
current_slot: Slot,
@@ -60,6 +60,18 @@ impl From<BeaconChainError> for Error {
}
}
impl From<BeaconStateError> for Error {
fn from(e: BeaconStateError) -> Self {
Self::BeaconState(e)
}
}
impl From<store::Error> for Error {
fn from(e: store::Error) -> Self {
Self::Store(e)
}
}
/// Provides a simple thread-safe lock to be used for task co-ordination. Practically equivalent to
/// `Mutex<()>`.
#[derive(Clone)]
@@ -166,11 +178,6 @@ async fn state_advance_timer<T: BeaconChainTypes>(
}
}
/// Reads the `snapshot_cache` from the `beacon_chain` and attempts to take a clone of the
/// `BeaconState` of the head block. If it obtains this clone, the state will be advanced a single
/// slot then placed back in the `snapshot_cache` to be used for block verification.
///
/// See the module-level documentation for rationale.
fn advance_head<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
log: &Logger,
@@ -200,46 +207,37 @@ fn advance_head<T: BeaconChainTypes>(
// majority of attestations.
beacon_chain.fork_choice()?;
let head_root = beacon_chain.head_info()?.block_root;
let head_info = beacon_chain.head_info()?;
let head_block_root = head_info.block_root;
let (head_slot, head_state_root, mut state) = match beacon_chain
.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::SnapshotCacheLockTimeout)?
.get_for_state_advance(head_root)
{
StateAdvance::AlreadyAdvanced => {
return Err(Error::StateAlreadyAdvanced {
block_root: head_root,
})
}
StateAdvance::BlockNotFound => return Err(Error::HeadMissingFromSnapshotCache(head_root)),
StateAdvance::State {
state,
state_root,
block_slot,
} => (block_slot, state_root, *state),
};
let (head_state_root, mut state) = beacon_chain
.store
.get_advanced_state(head_block_root, current_slot, head_info.state_root)?
.ok_or(Error::HeadMissingFromSnapshotCache(head_block_root))?;
let initial_slot = state.slot();
let initial_epoch = state.current_epoch();
let state_root = if state.slot() == head_slot {
Some(head_state_root)
} else {
if state.slot() == current_slot {
return Err(Error::StateAlreadyAdvanced {
block_root: head_block_root,
});
} else if state.slot() + 1 != current_slot {
// Protect against advancing a state more than a single slot.
//
// Advancing more than one slot without storing the intermediate state would corrupt the
// database. Future works might store temporary, intermediate states inside this function.
return Err(Error::BadStateSlot {
_block_slot: head_slot,
// FIXME(sproul): wrong
_block_slot: state.slot(),
_state_slot: state.slot(),
});
};
}
let initial_slot = state.slot();
let initial_epoch = state.current_epoch();
// Advance the state a single slot.
if let Some(summary) = per_slot_processing(&mut state, state_root, &beacon_chain.spec)
.map_err(BeaconChainError::from)?
if let Some(summary) =
per_slot_processing(&mut state, Some(head_state_root), &beacon_chain.spec)
.map_err(BeaconChainError::from)?
{
// Expose Prometheus metrics.
if let Err(e) = summary.observe_metrics() {
@@ -273,7 +271,7 @@ fn advance_head<T: BeaconChainTypes>(
debug!(
log,
"Advanced head state one slot";
"head_root" => ?head_root,
"head_block_root" => ?head_block_root,
"state_slot" => state.slot(),
"current_slot" => current_slot,
);
@@ -292,14 +290,14 @@ fn advance_head<T: BeaconChainTypes>(
if initial_epoch < state.current_epoch() {
// Update the proposer cache.
//
// We supply the `head_root` as the decision block since the prior `if` statement guarantees
// We supply the `head_block_root` as the decision block since the prior `if` statement guarantees
// the head root is the latest block from the prior epoch.
beacon_chain
.beacon_proposer_cache
.lock()
.insert(
state.current_epoch(),
head_root,
head_block_root,
state
.get_beacon_proposer_indices(&beacon_chain.spec)
.map_err(BeaconChainError::from)?,
@@ -308,8 +306,9 @@ fn advance_head<T: BeaconChainTypes>(
.map_err(BeaconChainError::from)?;
// Update the attester cache.
let shuffling_id = AttestationShufflingId::new(head_root, &state, RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
let shuffling_id =
AttestationShufflingId::new(head_block_root, &state, RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
let committee_cache = state
.committee_cache(RelativeEpoch::Next)
.map_err(BeaconChainError::from)?;
@@ -322,7 +321,7 @@ fn advance_head<T: BeaconChainTypes>(
debug!(
log,
"Primed proposer and attester caches";
"head_root" => ?head_root,
"head_block_root" => ?head_block_root,
"next_epoch_shuffling_root" => ?shuffling_id.shuffling_decision_block,
"state_epoch" => state.current_epoch(),
"current_epoch" => current_slot.epoch(T::EthSpec::slots_per_epoch()),
@@ -332,44 +331,19 @@ fn advance_head<T: BeaconChainTypes>(
// Apply the state to the attester cache, if the cache deems it interesting.
beacon_chain
.attester_cache
.maybe_cache_state(&state, head_root, &beacon_chain.spec)
.maybe_cache_state(&state, head_block_root, &beacon_chain.spec)
.map_err(BeaconChainError::from)?;
let final_slot = state.slot();
// Insert the advanced state back into the snapshot cache.
beacon_chain
.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::SnapshotCacheLockTimeout)?
.update_pre_state(head_root, state)
.ok_or(Error::HeadMissingFromSnapshotCache(head_root))?;
// If we have moved into the next slot whilst processing the state then this function is going
// to become ineffective and likely become a hindrance as we're stealing the tree hash cache
// from the snapshot cache (which may force the next block to rebuild a new one).
//
// If this warning occurs very frequently on well-resourced machines then we should consider
// starting it earlier in the slot. Otherwise, it's a good indication that the machine is too
// slow/overloaded and will be useful information for the user.
let starting_slot = current_slot;
let current_slot = beacon_chain.slot()?;
if starting_slot < current_slot {
warn!(
log,
"State advance too slow";
"head_root" => %head_root,
"advanced_slot" => final_slot,
"current_slot" => current_slot,
"starting_slot" => starting_slot,
"msg" => "system resources may be overloaded",
);
}
// Write the advanced state to the database.
let advanced_state_root = state.update_tree_hash_cache()?;
beacon_chain.store.put_state(&advanced_state_root, &state)?;
debug!(
log,
"Completed state advance";
"head_root" => ?head_root,
"head_block_root" => ?head_block_root,
"advanced_slot" => final_slot,
"initial_slot" => initial_slot,
);

View File

@@ -529,6 +529,7 @@ where
.store
.load_hot_state(&state_hash.into(), StateRootStrategy::Accurate)
.unwrap()
.map(|(state, _)| state)
}
pub fn get_cold_state(&self, state_hash: BeaconStateHash) -> Option<BeaconState<E>> {

View File

@@ -5,7 +5,8 @@ use ssz_derive::{Decode, Encode};
use types::{EthSpec, MinimalEthSpec};
pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 5;
pub const DEFAULT_BLOCK_CACHE_SIZE: usize = 64;
pub const DEFAULT_STATE_CACHE_SIZE: usize = 128;
/// Database configuration parameters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -14,6 +15,8 @@ pub struct StoreConfig {
pub slots_per_restore_point: u64,
/// Maximum number of blocks to store in the in-memory block cache.
pub block_cache_size: usize,
/// Maximum number of states to store in the in-memory state cache.
pub state_cache_size: usize,
/// Whether to compact the database on initialization.
pub compact_on_init: bool,
/// Whether to compact the database during database pruning.
@@ -37,6 +40,7 @@ impl Default for StoreConfig {
// Safe default for tests, shouldn't ever be read by a CLI node.
slots_per_restore_point: MinimalEthSpec::slots_per_historical_root() as u64,
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
state_cache_size: DEFAULT_STATE_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
}

View File

@@ -14,6 +14,7 @@ use crate::metadata::{
SCHEMA_VERSION_KEY, SPLIT_KEY,
};
use crate::metrics;
use crate::state_cache::StateCache;
use crate::{
get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem,
StoreOp,
@@ -58,6 +59,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
pub hot_db: Hot,
/// LRU cache of deserialized blocks. Updated whenever a block is loaded.
block_cache: Mutex<LruCache<Hash256, SignedBeaconBlock<E>>>,
/// Cache of beacon states.
state_cache: Mutex<StateCache<E>>,
/// Chain spec.
pub(crate) spec: ChainSpec,
/// Logger.
@@ -123,6 +126,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
cold_db: MemoryStore::open(),
hot_db: MemoryStore::open(),
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
config,
spec,
log,
@@ -156,6 +160,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
cold_db: LevelDB::open(cold_path)?,
hot_db: LevelDB::open(hot_path)?,
block_cache: Mutex::new(LruCache::new(config.block_cache_size)),
state_cache: Mutex::new(StateCache::new(config.state_cache_size)),
config,
spec,
log,
@@ -233,6 +238,22 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
}
impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold> {
/// Hand a newly finalized state to the in-memory state cache.
///
/// Takes the state cache lock and forwards every argument unchanged to the cache's own
/// `update_finalized_state`, returning its result.
pub fn update_finalized_state(
    &self,
    state_root: Hash256,
    block_root: Hash256,
    epoch: Epoch,
    state: BeaconState<E>,
) -> Result<(), Error> {
    let mut cache = self.state_cache.lock();
    cache.update_finalized_state(state_root, block_root, epoch, state)
}
/// Report the value of `len()` on the in-memory state cache, read under the cache lock.
pub fn state_cache_len(&self) -> usize {
    let cache = self.state_cache.lock();
    cache.len()
}
/// Store a block and update the LRU cache.
pub fn put_block(
&self,
@@ -362,16 +383,34 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// chain. This way we avoid returning a state that doesn't match `state_root`.
self.load_cold_state(state_root)
} else {
self.load_hot_state(state_root, StateRootStrategy::Accurate)
self.get_hot_state(state_root, StateRootStrategy::Accurate)
}
} else {
match self.load_hot_state(state_root, StateRootStrategy::Accurate)? {
match self.get_hot_state(state_root, StateRootStrategy::Accurate)? {
Some(state) => Ok(Some(state)),
None => self.load_cold_state(state_root),
}
}
}
/// Get a state with `latest_block_root == block_root` advanced through to at most `slot`.
///
/// The state cache is consulted first, keyed by `(block_root, slot)`. On a miss the block's
/// un-advanced state is fetched by `state_root` instead, and `state_root` itself is returned
/// as the accompanying root.
///
/// Returns `Ok(None)` when neither lookup finds a state.
pub fn get_advanced_state(
    &self,
    block_root: Hash256,
    slot: Slot,
    state_root: Hash256,
) -> Result<Option<(Hash256, BeaconState<E>)>, Error> {
    // Bind the lookup in its own statement so the cache lock is released before
    // `get_hot_state` runs: that method takes the same lock again.
    let cached = self.state_cache.lock().get_by_block_root(block_root, slot);

    match cached {
        Some(hit) => Ok(Some(hit)),
        None => {
            let maybe_state = self.get_hot_state(&state_root, StateRootStrategy::Accurate)?;
            Ok(maybe_state.map(|state| (state_root, state)))
        }
    }
}
/// Fetch a state from the store, but don't compute all of the values when replaying blocks
/// upon that state (e.g., state roots). Additionally, only states from the hot store are
/// returned.
@@ -403,7 +442,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
.into())
} else {
self.load_hot_state(state_root, StateRootStrategy::Inconsistent)
self.get_hot_state(state_root, StateRootStrategy::Inconsistent)
}
}
@@ -496,7 +535,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// `StateRootStrategy` should be irrelevant here since we never replay blocks for an epoch
// boundary state in the hot DB.
let state = self
.load_hot_state(&epoch_boundary_state_root, StateRootStrategy::Accurate)?
.get_hot_state(&epoch_boundary_state_root, StateRootStrategy::Accurate)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(
epoch_boundary_state_root,
))?;
@@ -539,10 +578,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.store_hot_state(state_root, state, &mut key_value_batch)?;
}
StoreOp::PutStateSummary(state_root, summary) => {
key_value_batch.push(summary.as_kv_store_op(*state_root));
}
StoreOp::PutStateTemporaryFlag(state_root) => {
key_value_batch.push(TemporaryFlag.as_kv_store_op(*state_root));
}
@@ -588,8 +623,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
StoreOp::PutState(_, _) => (),
StoreOp::PutStateSummary(_, _) => (),
StoreOp::PutStateTemporaryFlag(_) => (),
StoreOp::DeleteStateTemporaryFlag(_) => (),
@@ -614,6 +647,19 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
// Put the state in the cache.
// FIXME(sproul): could optimise out the block root
let block_root = state.get_latest_block_root(*state_root);
if self
.state_cache
.lock()
.put_state(*state_root, block_root, state)?
{
// Already exists in database.
return Ok(());
}
// On the epoch boundary, store the full state.
if state.slot() % E::slots_per_epoch() == 0 {
trace!(
@@ -635,14 +681,43 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}
/// Load a post-finalization state from the hot database.
///
/// Will replay blocks from the nearest epoch boundary.
pub fn load_hot_state(
/// Get a post-finalization state, consulting the in-memory state cache before the hot database.
///
/// On a cache miss a warning is logged and the state is loaded with `load_hot_state`. A state
/// found that way is inserted into the cache (keyed by `state_root`, alongside the latest
/// block root reported by the loader) before it is returned.
///
/// Returns `Ok(None)` if the state is in neither the cache nor the hot database.
pub fn get_hot_state(
    &self,
    state_root: &Hash256,
    state_root_strategy: StateRootStrategy,
) -> Result<Option<BeaconState<E>>, Error> {
    // Hold the lock for this statement only; it is taken again below when caching the state.
    let cached = self.state_cache.lock().get_by_state_root(*state_root);
    if cached.is_some() {
        return Ok(cached);
    }

    warn!(
        self.log,
        "State cache missed";
        "state_root" => ?state_root,
    );

    match self.load_hot_state(state_root, state_root_strategy)? {
        Some((state, block_root)) => {
            self.state_cache
                .lock()
                .put_state(*state_root, block_root, &state)?;
            Ok(Some(state))
        }
        None => Ok(None),
    }
}
/// Load a post-finalization state from the hot database.
///
/// Will replay blocks from the nearest epoch boundary.
///
/// Return the `(state, latest_block_root)` if found.
pub fn load_hot_state(
&self,
state_root: &Hash256,
state_root_strategy: StateRootStrategy,
) -> Result<Option<(BeaconState<E>, Hash256)>, Error> {
metrics::inc_counter(&metrics::BEACON_STATE_HOT_GET_COUNT);
// If the state is marked as temporary, do not return it. It will become visible
@@ -678,7 +753,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)?
};
Ok(Some(state))
Ok(Some((state, latest_block_root)))
} else {
Ok(None)
}
@@ -1262,17 +1337,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
/// Advance the split point of the store, moving new finalized states to the freezer.
pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
store: Arc<HotColdDB<E, Hot, Cold>>,
frozen_head_root: Hash256,
frozen_head: &BeaconState<E>,
finalized_state_root: Hash256,
finalized_block_root: Hash256,
finalized_state: &BeaconState<E>,
) -> Result<(), Error> {
debug!(
store.log,
"Freezer migration started";
"slot" => frozen_head.slot()
"slot" => finalized_state.slot()
);
// 0. Check that the migration is sensible.
// The new frozen head must increase the current split slot, and lie on an epoch
// The new finalized state must increase the current split slot, and lie on an epoch
// boundary (in order for the hot state summary scheme to work).
let current_split_slot = store.split.read_recursive().slot;
let anchor_slot = store
@@ -1281,23 +1357,23 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
.as_ref()
.map(|a| a.anchor_slot);
if frozen_head.slot() < current_split_slot {
if finalized_state.slot() < current_split_slot {
return Err(HotColdDBError::FreezeSlotError {
current_split_slot,
proposed_split_slot: frozen_head.slot(),
proposed_split_slot: finalized_state.slot(),
}
.into());
}
if frozen_head.slot() % E::slots_per_epoch() != 0 {
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot()).into());
if finalized_state.slot() % E::slots_per_epoch() != 0 {
return Err(HotColdDBError::FreezeSlotUnaligned(finalized_state.slot()).into());
}
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();
// 1. Copy all of the states between the head and the split slot, from the hot DB
// to the cold DB.
let state_root_iter = StateRootsIterator::new(&store, frozen_head);
// 1. Copy all of the states between the new finalized state and the split slot, from the hot DB
// to the cold DB.
let state_root_iter = StateRootsIterator::new(&store, finalized_state);
for maybe_pair in state_root_iter.take_while(|result| match result {
Ok((_, slot)) => {
slot >= &current_split_slot
@@ -1310,7 +1386,8 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();
if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root, &store.spec)?
let state: BeaconState<E> = store
.get_hot_state(&state_root, StateRootStrategy::Accurate)?
.ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
@@ -1367,8 +1444,8 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Before updating the in-memory split value, we flush it to disk first, so that should the
// OS process die at this point, we pick up from the right place after a restart.
let split = Split {
slot: frozen_head.slot(),
state_root: frozen_head_root,
slot: finalized_state.slot(),
state_root: finalized_state_root,
};
store.hot_db.put_sync(&SPLIT_KEY, &split)?;
@@ -1381,10 +1458,18 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Delete the states from the hot database if we got this far.
store.do_atomically(hot_db_ops)?;
// Update the cache's view of the finalized state.
store.update_finalized_state(
finalized_state_root,
finalized_block_root,
finalized_state.slot().epoch(E::slots_per_epoch()),
finalized_state.clone(),
)?;
debug!(
store.log,
"Freezer migration complete";
"slot" => frozen_head.slot()
"slot" => finalized_state.slot()
);
Ok(())

View File

@@ -139,7 +139,6 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'stati
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Box<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),

View File

@@ -1,3 +1,165 @@
use crate::Error;
use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet};
use types::{BeaconState, Epoch, EthSpec, Hash256, Slot};
/// The finalized state held by a `StateCache`, kept in its own slot rather than in the LRU of
/// other states.
#[derive(Debug)]
pub struct FinalizedState<E: EthSpec> {
/// Root of `state`; `StateCache::get_by_state_root` compares requested roots against it.
state_root: Hash256,
/// Epoch supplied when this state was recorded; later updates are checked against it.
epoch: Epoch,
/// The state itself; lookups hand out clones of it.
state: BeaconState<E>,
}
/// Map from block_root -> slot -> state_root.
///
/// Used by `StateCache` to resolve a `(block_root, slot)` query to the root of a cached state.
#[derive(Debug, Default)]
pub struct BlockMap {
/// One `SlotMap` per block root; `prune` removes a block's entry once its map is empty.
blocks: HashMap<Hash256, SlotMap>,
}
/// Map from slot -> state_root.
///
/// Holds the entries recorded for a single block root inside a `BlockMap`.
#[derive(Debug, Default)]
pub struct SlotMap {
/// Ordered by slot, which lets callers search for the newest entry at or before a given slot.
slots: BTreeMap<Slot, Hash256>,
}
/// In-memory cache of beacon states, addressable by state root or by `(block_root, slot)`.
#[derive(Debug)]
pub struct StateCache<E: EthSpec> {
/// State most recently passed to `update_finalized_state`; `None` until the first call.
finalized_state: Option<FinalizedState<E>>,
/// LRU of cloned states keyed by state root.
states: LruCache<Hash256, BeaconState<E>>,
/// Index from block root and slot to the state roots used as keys of `states`.
block_map: BlockMap,
}
impl<E: EthSpec> StateCache<E> {
    /// Create an empty cache with no finalized state and an LRU constructed with `capacity`.
    pub fn new(capacity: usize) -> Self {
        StateCache {
            finalized_state: None,
            states: LruCache::new(capacity),
            block_map: BlockMap::default(),
        }
    }

    /// Number of states currently held in the LRU. The separately stored finalized state is
    /// not counted.
    pub fn len(&self) -> usize {
        self.states.len()
    }

    /// Record `state` as the finalized state and discard cached states it makes obsolete.
    ///
    /// Every block-map entry at or before the first slot of `epoch` is removed, and the states
    /// those entries pointed to are evicted from the LRU. The finalized state itself is then
    /// re-registered in the block map so that it stays reachable via `get_by_block_root`.
    ///
    /// # Panics
    ///
    /// Panics if `epoch` is lower than the epoch of the finalized state already stored.
    pub fn update_finalized_state(
        &mut self,
        state_root: Hash256,
        block_root: Hash256,
        epoch: Epoch,
        state: BeaconState<E>,
    ) -> Result<(), Error> {
        if self
            .finalized_state
            .as_ref()
            .map_or(false, |finalized_state| epoch < finalized_state.epoch)
        {
            // FIXME(sproul): panic
            panic!("decreasing epoch");
        }

        let finalized_slot = epoch.start_slot(E::slots_per_epoch());

        // Prune the block map *before* registering the finalized state: pruning drops every
        // entry with `slot <= finalized_slot`, so inserting first would immediately discard the
        // finalized state's own entry.
        let state_roots_to_prune = self.block_map.prune(finalized_slot);

        // Delete the states that the pruned entries referred to.
        for pruned_root in state_roots_to_prune {
            self.states.pop(&pruned_root);
        }

        // Add the finalized state to the block map. Its body lives in `finalized_state` rather
        // than the LRU, which `get_by_state_root` checks first.
        self.block_map
            .insert(block_root, finalized_slot, state_root);

        // Update finalized state.
        self.finalized_state = Some(FinalizedState {
            state_root,
            epoch,
            state,
        });
        Ok(())
    }

    /// Insert a clone of `state` under `state_root` and index it by `(block_root, state.slot())`.
    ///
    /// Return a bool indicating whether the state already existed in the cache. When it did,
    /// nothing is modified (the existing entry's LRU position is left untouched as well).
    pub fn put_state(
        &mut self,
        state_root: Hash256,
        block_root: Hash256,
        state: &BeaconState<E>,
    ) -> Result<bool, Error> {
        // `peek` rather than `get`: a duplicate insert should not refresh recency.
        if self.states.peek(&state_root).is_some() {
            return Ok(true);
        }

        // Insert the full state into the cache.
        self.states.put(state_root, state.clone());

        // Record the connection from block root and slot to this state.
        let slot = state.slot();
        self.block_map.insert(block_root, slot, state_root);

        Ok(false)
    }

    /// Return a clone of the state with root `state_root`, if cached.
    ///
    /// The finalized state is checked first, then the LRU.
    pub fn get_by_state_root(&mut self, state_root: Hash256) -> Option<BeaconState<E>> {
        if let Some(ref finalized_state) = self.finalized_state {
            if state_root == finalized_state.state_root {
                return Some(finalized_state.state.clone());
            }
        }
        self.states.get(&state_root).cloned()
    }

    /// Return the cached state recorded for `block_root` at `slot`, or failing that at the most
    /// recent earlier slot, together with its state root.
    ///
    /// Returns `None` if `block_root` is unknown, if it has no entry at or before `slot`, or if
    /// the selected state root is no longer held by the cache.
    pub fn get_by_block_root(
        &mut self,
        block_root: Hash256,
        slot: Slot,
    ) -> Option<(Hash256, BeaconState<E>)> {
        // The slot map is ordered, so the last key in `..=slot` is the entry at `slot` if it
        // exists and otherwise its closest predecessor.
        let state_root = self
            .block_map
            .blocks
            .get(&block_root)?
            .slots
            .range(..=slot)
            .next_back()
            .map(|(_, state_root)| *state_root)?;

        let state = self.get_by_state_root(state_root)?;
        Some((state_root, state))
    }
}
impl BlockMap {
fn insert(&mut self, block_root: Hash256, slot: Slot, state_root: Hash256) {
let slot_map = self
.blocks
.entry(block_root)
.or_insert_with(SlotMap::default);
slot_map.slots.insert(slot, state_root);
}
fn prune(&mut self, finalized_slot: Slot) -> HashSet<Hash256> {
let mut pruned_states = HashSet::new();
self.blocks.retain(|_, slot_map| {
slot_map.slots.retain(|slot, state_root| {
let keep = *slot > finalized_slot;
if !keep {
pruned_states.insert(*state_root);
}
keep
});
!slot_map.slots.is_empty()
});
pruned_states
}
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -1518,17 +1518,16 @@ impl<T: EthSpec> BeaconState<T> {
/// Get the committee cache for some `slot`.
///
/// Return an error if the cache for the slot's epoch is not initialized.
fn committee_cache_at_slot(&self, slot: Slot) -> Result<&CommitteeCache, Error> {
fn committee_cache_at_slot(&self, slot: Slot) -> Result<&Arc<CommitteeCache>, Error> {
let epoch = slot.epoch(T::slots_per_epoch());
let relative_epoch = RelativeEpoch::from_epoch(self.current_epoch(), epoch)?;
self.committee_cache(relative_epoch)
}
/// Get the committee cache at a given index.
fn committee_cache_at_index(&self, index: usize) -> Result<&CommitteeCache, Error> {
fn committee_cache_at_index(&self, index: usize) -> Result<&Arc<CommitteeCache>, Error> {
self.committee_caches()
.get(index)
.map(Arc::as_ref)
.ok_or(Error::CommitteeCachesOutOfBounds(index))
}
@@ -1544,7 +1543,10 @@ impl<T: EthSpec> BeaconState<T> {
/// Returns the cache for some `RelativeEpoch`. Returns an error if the cache has not been
/// initialized.
pub fn committee_cache(&self, relative_epoch: RelativeEpoch) -> Result<&CommitteeCache, Error> {
pub fn committee_cache(
&self,
relative_epoch: RelativeEpoch,
) -> Result<&Arc<CommitteeCache>, Error> {
let i = Self::committee_cache_index(relative_epoch);
let cache = self.committee_cache_at_index(i)?;

View File

@@ -19,7 +19,7 @@ spec-minimal = []
# Support Gnosis spec and Gnosis Beacon Chain.
gnosis = []
# Use `milhouse` tree states.
tree-states = ["store/milhouse"]
tree-states = ["beacon_node/tree-states"]
[dependencies]
beacon_node = { "path" = "../beacon_node" }