mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-03 00:31:50 +00:00
Remove state lru cache (#8724)
N/A. In https://github.com/sigp/lighthouse/pull/4801, we added a state LRU cache to avoid having too many states in memory, which was a concern with 200 MB+ states before tree-states. With https://github.com/sigp/lighthouse/pull/5891, we made the overflow cache a simpler in-memory LRU cache that can hold at most 32 pending states and doesn't flush anything to disk. As noted in #5891, we can always fetch older blocks which never became available over RPC if they become available later. Since we merged tree-states, I don't think the state LRU cache is relevant anymore. Instead of having the `DietAvailabilityPendingExecutedBlock` that stores only the state root, we can just store the full state in the `AvailabilityPendingExecutedBlock`. Given that entries in the cache can span at most 1 epoch (the cache size is 32), the underlying `BeaconState` objects in the cache share most of their memory. The `state_lru_cache` is one level of indirection that doesn't give us any benefit. Please check me on this. cc @dapplion. Co-Authored-By: Pawan Dhananjay <pawandhananjay@gmail.com>
This commit is contained in:
@@ -279,7 +279,7 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct BlockImportData<E: EthSpec> {
|
||||
pub block_root: Hash256,
|
||||
pub state: BeaconState<E>,
|
||||
|
||||
@@ -1049,7 +1049,6 @@ where
|
||||
complete_blob_backfill,
|
||||
slot_clock,
|
||||
self.kzg.clone(),
|
||||
store,
|
||||
Arc::new(custody_context),
|
||||
self.spec,
|
||||
)
|
||||
|
||||
@@ -5,9 +5,7 @@ use crate::block_verification_types::{AvailabilityPendingExecutedBlock, Availabl
|
||||
use crate::data_availability_checker::overflow_lru_cache::{
|
||||
DataAvailabilityCheckerInner, ReconstructColumnsDecision,
|
||||
};
|
||||
use crate::{
|
||||
BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics,
|
||||
};
|
||||
use crate::{BeaconChain, BeaconChainTypes, BlockProcessStatus, CustodyContext, metrics};
|
||||
use educe::Educe;
|
||||
use kzg::Kzg;
|
||||
use slot_clock::SlotClock;
|
||||
@@ -27,7 +25,6 @@ use types::{
|
||||
|
||||
mod error;
|
||||
mod overflow_lru_cache;
|
||||
mod state_lru_cache;
|
||||
|
||||
use crate::data_availability_checker::error::Error;
|
||||
use crate::data_column_verification::{
|
||||
@@ -53,7 +50,6 @@ use types::new_non_zero_usize;
|
||||
/// `PendingComponents` are now never removed from the cache manually and are only removed via LRU
|
||||
/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time.
|
||||
const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
|
||||
const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
|
||||
|
||||
/// Cache to hold fully valid data that can't be imported to fork-choice yet. After Dencun hard-fork
|
||||
/// blocks have a sidecar of data that is received separately from the network. We call the concept
|
||||
@@ -122,13 +118,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
complete_blob_backfill: bool,
|
||||
slot_clock: T::SlotClock,
|
||||
kzg: Arc<Kzg>,
|
||||
store: BeaconStore<T>,
|
||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||
spec: Arc<ChainSpec>,
|
||||
) -> Result<Self, AvailabilityCheckError> {
|
||||
let inner = DataAvailabilityCheckerInner::new(
|
||||
OVERFLOW_LRU_CAPACITY_NON_ZERO,
|
||||
store,
|
||||
custody_context.clone(),
|
||||
spec.clone(),
|
||||
)?;
|
||||
@@ -469,7 +463,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
/// Collects metrics from the data availability checker.
|
||||
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
|
||||
DataAvailabilityCheckerMetrics {
|
||||
state_cache_size: self.availability_cache.state_cache_size(),
|
||||
block_cache_size: self.availability_cache.block_cache_size(),
|
||||
}
|
||||
}
|
||||
@@ -565,7 +558,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
||||
|
||||
/// Helper struct to group data availability checker metrics.
|
||||
pub struct DataAvailabilityCheckerMetrics {
|
||||
pub state_cache_size: usize,
|
||||
pub block_cache_size: usize,
|
||||
}
|
||||
|
||||
@@ -912,7 +904,6 @@ mod test {
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use store::HotColdDB;
|
||||
use types::data::DataColumn;
|
||||
use types::{
|
||||
ChainSpec, ColumnIndex, DataColumnSidecarFulu, EthSpec, ForkName, MainnetEthSpec, Slot,
|
||||
@@ -1253,7 +1244,6 @@ mod test {
|
||||
spec.get_slot_duration(),
|
||||
);
|
||||
let kzg = get_kzg(&spec);
|
||||
let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap());
|
||||
let ordered_custody_column_indices = generate_data_column_indices_rand_order::<E>();
|
||||
let custody_context = Arc::new(CustodyContext::new(
|
||||
NodeCustodyType::Fullnode,
|
||||
@@ -1265,7 +1255,6 @@ mod test {
|
||||
complete_blob_backfill,
|
||||
slot_clock,
|
||||
kzg,
|
||||
store,
|
||||
custody_context,
|
||||
spec,
|
||||
)
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
use super::AvailableBlockData;
|
||||
use super::state_lru_cache::{DietAvailabilityPendingExecutedBlock, StateLRUCache};
|
||||
use crate::CustodyContext;
|
||||
use crate::beacon_chain::BeaconStore;
|
||||
use crate::blob_verification::KzgVerifiedBlob;
|
||||
use crate::block_verification_types::{
|
||||
AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
|
||||
@@ -23,10 +21,9 @@ use types::{
|
||||
DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum CachedBlock<E: EthSpec> {
|
||||
PreExecution(Arc<SignedBeaconBlock<E>>, BlockImportSource),
|
||||
Executed(Box<DietAvailabilityPendingExecutedBlock<E>>),
|
||||
Executed(Box<AvailabilityPendingExecutedBlock<E>>),
|
||||
}
|
||||
|
||||
impl<E: EthSpec> CachedBlock<E> {
|
||||
@@ -43,7 +40,7 @@ impl<E: EthSpec> CachedBlock<E> {
|
||||
fn as_block(&self) -> &SignedBeaconBlock<E> {
|
||||
match self {
|
||||
CachedBlock::PreExecution(b, _) => b,
|
||||
CachedBlock::Executed(b) => b.as_block(),
|
||||
CachedBlock::Executed(b) => b.block.as_ref(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,14 +81,6 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
&self.verified_blobs
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn get_diet_block(&self) -> Option<&DietAvailabilityPendingExecutedBlock<E>> {
|
||||
self.block.as_ref().and_then(|block| match block {
|
||||
CachedBlock::Executed(block) => Some(block.as_ref()),
|
||||
_ => None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns an immutable reference to the cached data column.
|
||||
pub fn get_cached_data_column(
|
||||
&self,
|
||||
@@ -129,7 +118,7 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
}
|
||||
|
||||
/// Inserts an executed block into the cache.
|
||||
pub fn insert_executed_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
|
||||
pub fn insert_executed_block(&mut self, block: AvailabilityPendingExecutedBlock<E>) {
|
||||
self.block = Some(CachedBlock::Executed(Box::new(block)))
|
||||
}
|
||||
|
||||
@@ -201,7 +190,7 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
/// Inserts a new block and revalidates the existing blobs against it.
|
||||
///
|
||||
/// Blobs that don't match the new block's commitments are evicted.
|
||||
pub fn merge_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
|
||||
pub fn merge_block(&mut self, block: AvailabilityPendingExecutedBlock<E>) {
|
||||
self.insert_executed_block(block);
|
||||
let reinsert = self.get_cached_blobs_mut().take();
|
||||
self.merge_blobs(reinsert);
|
||||
@@ -209,21 +198,11 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
|
||||
/// Returns Some if the block has received all its required data for import. The return value
|
||||
/// must be persisted in the DB along with the block.
|
||||
///
|
||||
/// WARNING: This function can potentially take a lot of time if the state needs to be
|
||||
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
|
||||
pub fn make_available<R>(
|
||||
pub fn make_available(
|
||||
&self,
|
||||
spec: &Arc<ChainSpec>,
|
||||
num_expected_columns_opt: Option<usize>,
|
||||
recover: R,
|
||||
) -> Result<Option<AvailableExecutedBlock<E>>, AvailabilityCheckError>
|
||||
where
|
||||
R: FnOnce(
|
||||
DietAvailabilityPendingExecutedBlock<E>,
|
||||
&Span,
|
||||
) -> Result<AvailabilityPendingExecutedBlock<E>, AvailabilityCheckError>,
|
||||
{
|
||||
) -> Result<Option<AvailableExecutedBlock<E>>, AvailabilityCheckError> {
|
||||
let Some(CachedBlock::Executed(block)) = &self.block else {
|
||||
// Block not available yet
|
||||
return Ok(None);
|
||||
@@ -266,7 +245,7 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
)));
|
||||
}
|
||||
Ordering::Equal => {
|
||||
let max_blobs = spec.max_blobs_per_block(block.epoch()) as usize;
|
||||
let max_blobs = spec.max_blobs_per_block(block.block.epoch()) as usize;
|
||||
let blobs_vec = self
|
||||
.verified_blobs
|
||||
.iter()
|
||||
@@ -311,11 +290,11 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
block,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
} = recover(*block.clone(), &self.span)?;
|
||||
} = block.as_ref();
|
||||
|
||||
let available_block = AvailableBlock {
|
||||
block_root: self.block_root,
|
||||
block,
|
||||
block: block.clone(),
|
||||
blob_data,
|
||||
blobs_available_timestamp,
|
||||
spec: spec.clone(),
|
||||
@@ -326,8 +305,8 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
});
|
||||
Ok(Some(AvailableExecutedBlock::new(
|
||||
available_block,
|
||||
import_data,
|
||||
payload_verification_outcome,
|
||||
import_data.clone(),
|
||||
payload_verification_outcome.clone(),
|
||||
)))
|
||||
}
|
||||
|
||||
@@ -399,9 +378,6 @@ impl<E: EthSpec> PendingComponents<E> {
|
||||
pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
|
||||
/// Contains all the data we keep in memory, protected by an RwLock
|
||||
critical: RwLock<LruCache<Hash256, PendingComponents<T::EthSpec>>>,
|
||||
/// This cache holds a limited number of states in memory and reconstructs them
|
||||
/// from disk when necessary. This is necessary until we merge tree-states
|
||||
state_cache: StateLRUCache<T>,
|
||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||
spec: Arc<ChainSpec>,
|
||||
}
|
||||
@@ -418,13 +394,11 @@ pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
|
||||
impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
pub fn new(
|
||||
capacity: NonZeroUsize,
|
||||
beacon_store: BeaconStore<T>,
|
||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||
spec: Arc<ChainSpec>,
|
||||
) -> Result<Self, AvailabilityCheckError> {
|
||||
Ok(Self {
|
||||
critical: RwLock::new(LruCache::new(capacity)),
|
||||
state_cache: StateLRUCache::new(beacon_store, spec.clone()),
|
||||
custody_context,
|
||||
spec,
|
||||
})
|
||||
@@ -441,7 +415,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
BlockProcessStatus::NotValidated(b.clone(), *source)
|
||||
}
|
||||
CachedBlock::Executed(b) => {
|
||||
BlockProcessStatus::ExecutionValidated(b.block_cloned())
|
||||
BlockProcessStatus::ExecutionValidated(b.block.clone())
|
||||
}
|
||||
})
|
||||
})
|
||||
@@ -580,11 +554,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
pending_components: MappedRwLockReadGuard<'_, PendingComponents<T::EthSpec>>,
|
||||
num_expected_columns_opt: Option<usize>,
|
||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||
if let Some(available_block) = pending_components.make_available(
|
||||
&self.spec,
|
||||
num_expected_columns_opt,
|
||||
|block, span| self.state_cache.recover_pending_executed_block(block, span),
|
||||
)? {
|
||||
if let Some(available_block) =
|
||||
pending_components.make_available(&self.spec, num_expected_columns_opt)?
|
||||
{
|
||||
// Explicitly drop read lock before acquiring write lock
|
||||
drop(pending_components);
|
||||
if let Some(components) = self.critical.write().get_mut(&block_root) {
|
||||
@@ -739,14 +711,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
let epoch = executed_block.as_block().epoch();
|
||||
let block_root = executed_block.import_data.block_root;
|
||||
|
||||
// register the block to get the diet block
|
||||
let diet_executed_block = self
|
||||
.state_cache
|
||||
.register_pending_executed_block(executed_block);
|
||||
|
||||
let pending_components =
|
||||
self.update_or_insert_pending_components(block_root, epoch, |pending_components| {
|
||||
pending_components.merge_block(diet_executed_block);
|
||||
pending_components.merge_block(executed_block);
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
@@ -780,9 +747,6 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
|
||||
/// maintain the cache
|
||||
pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
|
||||
// clean up any lingering states in the state cache
|
||||
self.state_cache.do_maintenance(cutoff_epoch);
|
||||
|
||||
// Collect keys of pending blocks from a previous epoch to cutoff
|
||||
let mut write_lock = self.critical.write();
|
||||
let mut keys_to_remove = vec![];
|
||||
@@ -801,17 +765,6 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
/// get the state cache for inspection (used only for tests)
|
||||
pub fn state_lru_cache(&self) -> &StateLRUCache<T> {
|
||||
&self.state_cache
|
||||
}
|
||||
|
||||
/// Number of states stored in memory in the cache.
|
||||
pub fn state_cache_size(&self) -> usize {
|
||||
self.state_cache.lru_cache().read().len()
|
||||
}
|
||||
|
||||
/// Number of pending component entries in memory in the cache.
|
||||
pub fn block_cache_size(&self) -> usize {
|
||||
self.critical.read().len()
|
||||
@@ -828,21 +781,18 @@ mod test {
|
||||
block_verification::PayloadVerificationOutcome,
|
||||
block_verification_types::{AsBlock, BlockImportData},
|
||||
custody_context::NodeCustodyType,
|
||||
data_availability_checker::STATE_LRU_CAPACITY_NON_ZERO,
|
||||
test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType},
|
||||
};
|
||||
use fork_choice::PayloadVerificationStatus;
|
||||
use logging::create_test_tracing_subscriber;
|
||||
use state_processing::ConsensusContext;
|
||||
use std::collections::VecDeque;
|
||||
use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend};
|
||||
use tempfile::{TempDir, tempdir};
|
||||
use tracing::{debug_span, info};
|
||||
use tracing::info;
|
||||
use types::new_non_zero_usize;
|
||||
use types::{ExecPayload, MinimalEthSpec};
|
||||
|
||||
const LOW_VALIDATOR_COUNT: usize = 32;
|
||||
const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();
|
||||
|
||||
fn get_store_with_spec<E: EthSpec>(
|
||||
db_path: &TempDir,
|
||||
@@ -1021,7 +971,6 @@ mod test {
|
||||
let chain_db_path = tempdir().expect("should get temp dir");
|
||||
let harness = get_deneb_chain(&chain_db_path).await;
|
||||
let spec = harness.spec.clone();
|
||||
let test_store = harness.chain.store.clone();
|
||||
let capacity_non_zero = new_non_zero_usize(capacity);
|
||||
let custody_context = Arc::new(CustodyContext::new(
|
||||
NodeCustodyType::Fullnode,
|
||||
@@ -1031,7 +980,6 @@ mod test {
|
||||
let cache = Arc::new(
|
||||
DataAvailabilityCheckerInner::<T>::new(
|
||||
capacity_non_zero,
|
||||
test_store,
|
||||
custody_context,
|
||||
spec.clone(),
|
||||
)
|
||||
@@ -1137,121 +1085,6 @@ mod test {
|
||||
"cache should still have available block"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// ensure the state cache keeps memory usage low and that it can properly recover states
|
||||
// THIS TEST CAN BE DELETED ONCE TREE STATES IS MERGED AND WE RIP OUT THE STATE CACHE
|
||||
async fn overflow_cache_test_state_cache() {
|
||||
type E = MinimalEthSpec;
|
||||
type T = DiskHarnessType<E>;
|
||||
let capacity = STATE_LRU_CAPACITY * 2;
|
||||
let (harness, cache, _path) = setup_harness_and_cache::<E, T>(capacity).await;
|
||||
|
||||
let mut pending_blocks = VecDeque::new();
|
||||
let mut states = Vec::new();
|
||||
let mut state_roots = Vec::new();
|
||||
// Get enough blocks to fill the cache to capacity, ensuring all blocks have blobs
|
||||
while pending_blocks.len() < capacity {
|
||||
let (mut pending_block, _) = availability_pending_block(&harness).await;
|
||||
if pending_block.num_blobs_expected() == 0 {
|
||||
// we need blocks with blobs
|
||||
continue;
|
||||
}
|
||||
let state_root = pending_block.import_data.state.canonical_root().unwrap();
|
||||
states.push(pending_block.import_data.state.clone());
|
||||
pending_blocks.push_back(pending_block);
|
||||
state_roots.push(state_root);
|
||||
}
|
||||
|
||||
let state_cache = cache.state_lru_cache().lru_cache();
|
||||
let mut pushed_diet_blocks = VecDeque::new();
|
||||
|
||||
for i in 0..capacity {
|
||||
let pending_block = pending_blocks.pop_front().expect("should have block");
|
||||
let block_root = pending_block.as_block().canonical_root();
|
||||
|
||||
assert_eq!(
|
||||
state_cache.read().len(),
|
||||
std::cmp::min(i, STATE_LRU_CAPACITY),
|
||||
"state cache should be empty at start"
|
||||
);
|
||||
|
||||
if i >= STATE_LRU_CAPACITY {
|
||||
let lru_root = state_roots[i - STATE_LRU_CAPACITY];
|
||||
assert_eq!(
|
||||
state_cache.read().peek_lru().map(|(root, _)| root),
|
||||
Some(&lru_root),
|
||||
"lru block should be in cache"
|
||||
);
|
||||
}
|
||||
|
||||
// put the block in the cache
|
||||
let availability = cache
|
||||
.put_executed_block(pending_block)
|
||||
.expect("should put block");
|
||||
|
||||
// grab the diet block from the cache for later testing
|
||||
let diet_block = cache
|
||||
.critical
|
||||
.read()
|
||||
.peek(&block_root)
|
||||
.and_then(|pending_components| pending_components.get_diet_block().cloned())
|
||||
.expect("should exist");
|
||||
pushed_diet_blocks.push_back(diet_block);
|
||||
|
||||
// should be unavailable since we made sure all blocks had blobs
|
||||
assert!(
|
||||
matches!(availability, Availability::MissingComponents(_)),
|
||||
"should be pending blobs"
|
||||
);
|
||||
|
||||
if i >= STATE_LRU_CAPACITY {
|
||||
let evicted_index = i - STATE_LRU_CAPACITY;
|
||||
let evicted_root = state_roots[evicted_index];
|
||||
assert!(
|
||||
state_cache.read().peek(&evicted_root).is_none(),
|
||||
"lru root should be evicted"
|
||||
);
|
||||
// get the diet block via direct conversion (testing only)
|
||||
let diet_block = pushed_diet_blocks.pop_front().expect("should have block");
|
||||
// reconstruct the pending block by replaying the block on the parent state
|
||||
let recovered_pending_block = cache
|
||||
.state_lru_cache()
|
||||
.recover_pending_executed_block(diet_block, &debug_span!("test"))
|
||||
.expect("should reconstruct pending block");
|
||||
|
||||
// assert the recovered state is the same as the original
|
||||
assert_eq!(
|
||||
recovered_pending_block.import_data.state, states[evicted_index],
|
||||
"recovered state should be the same as the original"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// now check the last block
|
||||
let last_block = pushed_diet_blocks.pop_back().expect("should exist").clone();
|
||||
// the state should still be in the cache
|
||||
assert!(
|
||||
state_cache
|
||||
.read()
|
||||
.peek(&last_block.as_block().state_root())
|
||||
.is_some(),
|
||||
"last block state should still be in cache"
|
||||
);
|
||||
// get the diet block via direct conversion (testing only)
|
||||
let diet_block = last_block.clone();
|
||||
// recover the pending block from the cache
|
||||
let recovered_pending_block = cache
|
||||
.state_lru_cache()
|
||||
.recover_pending_executed_block(diet_block, &debug_span!("test"))
|
||||
.expect("should reconstruct pending block");
|
||||
// assert the recovered state is the same as the original
|
||||
assert_eq!(
|
||||
Some(&recovered_pending_block.import_data.state),
|
||||
states.last(),
|
||||
"recovered state should be the same as the original"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -1307,7 +1140,7 @@ mod pending_components_tests {
|
||||
}
|
||||
|
||||
type PendingComponentsSetup<E> = (
|
||||
DietAvailabilityPendingExecutedBlock<E>,
|
||||
AvailabilityPendingExecutedBlock<E>,
|
||||
RuntimeFixedVector<Option<KzgVerifiedBlob<E>>>,
|
||||
RuntimeFixedVector<Option<KzgVerifiedBlob<E>>>,
|
||||
);
|
||||
@@ -1351,7 +1184,7 @@ mod pending_components_tests {
|
||||
is_valid_merge_transition_block: false,
|
||||
},
|
||||
};
|
||||
(block.into(), blobs, invalid_blobs)
|
||||
(block, blobs, invalid_blobs)
|
||||
}
|
||||
|
||||
pub fn assert_cache_consistent(cache: PendingComponents<E>, max_len: usize) {
|
||||
|
||||
@@ -1,215 +0,0 @@
|
||||
use crate::block_verification_types::AsBlock;
|
||||
use crate::{
|
||||
AvailabilityPendingExecutedBlock, BeaconChainTypes, BeaconStore, PayloadVerificationOutcome,
|
||||
block_verification_types::BlockImportData,
|
||||
data_availability_checker::{AvailabilityCheckError, STATE_LRU_CAPACITY_NON_ZERO},
|
||||
};
|
||||
use lru::LruCache;
|
||||
use parking_lot::RwLock;
|
||||
use state_processing::BlockReplayer;
|
||||
use std::sync::Arc;
|
||||
use store::OnDiskConsensusContext;
|
||||
use tracing::{Span, debug_span, instrument};
|
||||
use types::{BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
|
||||
|
||||
/// This mirrors everything in the `AvailabilityPendingExecutedBlock`, except
|
||||
/// that it is much smaller because it contains only a state root instead of
|
||||
/// a full `BeaconState`.
|
||||
#[derive(Clone)]
|
||||
pub struct DietAvailabilityPendingExecutedBlock<E: EthSpec> {
|
||||
/// The signed block itself, shared behind an `Arc`.
block: Arc<SignedBeaconBlock<E>>,
|
||||
/// Root standing in for the full state; `StateLRUCache` uses it as the lookup key when the
/// full block is recovered.
state_root: Hash256,
|
||||
/// Blinded parent block. `StateLRUCache::reconstruct_state` reads its root, slot and state
/// root to load the parent state when the cached state is gone.
parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
|
||||
/// Consensus context held in its `OnDiskConsensusContext` form; it is converted back with
/// `into_consensus_context()` on recovery.
consensus_context: OnDiskConsensusContext<E>,
|
||||
/// Payload verification outcome, carried over unchanged from the full block.
payload_verification_outcome: PayloadVerificationOutcome,
|
||||
}
|
||||
|
||||
/// just implementing the same methods as `AvailabilityPendingExecutedBlock`
|
||||
impl<E: EthSpec> DietAvailabilityPendingExecutedBlock<E> {
|
||||
pub fn as_block(&self) -> &SignedBeaconBlock<E> {
|
||||
&self.block
|
||||
}
|
||||
|
||||
pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
|
||||
self.block.clone()
|
||||
}
|
||||
|
||||
pub fn num_blobs_expected(&self) -> usize {
|
||||
self.block
|
||||
.message()
|
||||
.body()
|
||||
.blob_kzg_commitments()
|
||||
.map_or(0, |commitments| commitments.len())
|
||||
}
|
||||
|
||||
/// Returns the epoch corresponding to `self.slot()`.
|
||||
pub fn epoch(&self) -> Epoch {
|
||||
self.block.slot().epoch(E::slots_per_epoch())
|
||||
}
|
||||
}
|
||||
|
||||
/// This LRU cache holds BeaconStates used for block import. If the cache overflows,
|
||||
/// the least recently used state will be dropped. If the dropped state is needed
|
||||
/// later on, it will be recovered from the parent state and replaying the block.
|
||||
///
|
||||
/// WARNING: This cache assumes the parent block of any `AvailabilityPendingExecutedBlock`
|
||||
/// has already been imported into ForkChoice. If this is not the case, the cache
|
||||
/// will fail to recover the state when the cache overflows because it can't load
|
||||
/// the parent state!
|
||||
pub struct StateLRUCache<T: BeaconChainTypes> {
|
||||
/// States keyed by state root, bounded by `STATE_LRU_CAPACITY_NON_ZERO` entries.
states: RwLock<LruCache<Hash256, BeaconState<T::EthSpec>>>,
|
||||
/// Store the parent state is loaded from when a state has to be reconstructed.
store: BeaconStore<T>,
|
||||
/// Chain spec passed to the block replayer and used to rebuild the exit cache.
spec: Arc<ChainSpec>,
|
||||
}
|
||||
|
||||
impl<T: BeaconChainTypes> StateLRUCache<T> {
|
||||
pub fn new(store: BeaconStore<T>, spec: Arc<ChainSpec>) -> Self {
|
||||
Self {
|
||||
states: RwLock::new(LruCache::new(STATE_LRU_CAPACITY_NON_ZERO)),
|
||||
store,
|
||||
spec,
|
||||
}
|
||||
}
|
||||
|
||||
/// This will store the state in the LRU cache and return a
|
||||
/// `DietAvailabilityPendingExecutedBlock` which is much cheaper to
|
||||
/// keep around in memory.
|
||||
pub fn register_pending_executed_block(
|
||||
&self,
|
||||
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
|
||||
) -> DietAvailabilityPendingExecutedBlock<T::EthSpec> {
|
||||
let state = executed_block.import_data.state;
|
||||
let state_root = executed_block.block.state_root();
|
||||
self.states.write().put(state_root, state);
|
||||
|
||||
DietAvailabilityPendingExecutedBlock {
|
||||
block: executed_block.block,
|
||||
state_root,
|
||||
parent_block: executed_block.import_data.parent_block,
|
||||
consensus_context: OnDiskConsensusContext::from_consensus_context(
|
||||
executed_block.import_data.consensus_context,
|
||||
),
|
||||
payload_verification_outcome: executed_block.payload_verification_outcome,
|
||||
}
|
||||
}
|
||||
|
||||
/// Recover the `AvailabilityPendingExecutedBlock` from the diet version.
|
||||
/// This method will first check the cache and if the state is not found
|
||||
/// it will reconstruct the state by loading the parent state from disk and
|
||||
/// replaying the block.
|
||||
#[instrument(skip_all, parent = _span, level = "debug")]
|
||||
pub fn recover_pending_executed_block(
|
||||
&self,
|
||||
diet_executed_block: DietAvailabilityPendingExecutedBlock<T::EthSpec>,
|
||||
_span: &Span,
|
||||
) -> Result<AvailabilityPendingExecutedBlock<T::EthSpec>, AvailabilityCheckError> {
|
||||
// Keep the state in the cache to prevent reconstruction in race conditions
|
||||
let state = if let Some(state) = self.states.write().get(&diet_executed_block.state_root) {
|
||||
state.clone()
|
||||
} else {
|
||||
self.reconstruct_state(&diet_executed_block)?
|
||||
};
|
||||
let block_root = diet_executed_block.block.canonical_root();
|
||||
Ok(AvailabilityPendingExecutedBlock {
|
||||
block: diet_executed_block.block,
|
||||
import_data: BlockImportData {
|
||||
block_root,
|
||||
state,
|
||||
parent_block: diet_executed_block.parent_block,
|
||||
consensus_context: diet_executed_block
|
||||
.consensus_context
|
||||
.into_consensus_context(),
|
||||
},
|
||||
payload_verification_outcome: diet_executed_block.payload_verification_outcome,
|
||||
})
|
||||
}
|
||||
|
||||
/// Reconstruct the state by loading the parent state from disk and replaying
|
||||
/// the block.
|
||||
#[instrument(skip_all, level = "debug")]
|
||||
fn reconstruct_state(
|
||||
&self,
|
||||
diet_executed_block: &DietAvailabilityPendingExecutedBlock<T::EthSpec>,
|
||||
) -> Result<BeaconState<T::EthSpec>, AvailabilityCheckError> {
|
||||
let parent_block_root = diet_executed_block.parent_block.canonical_root();
|
||||
let parent_block_state_root = diet_executed_block.parent_block.state_root();
|
||||
let (parent_state_root, parent_state) = self
|
||||
.store
|
||||
.get_advanced_hot_state(
|
||||
parent_block_root,
|
||||
diet_executed_block.parent_block.slot(),
|
||||
parent_block_state_root,
|
||||
)
|
||||
.map_err(AvailabilityCheckError::StoreError)?
|
||||
.ok_or(AvailabilityCheckError::ParentStateMissing(
|
||||
parent_block_state_root,
|
||||
))?;
|
||||
|
||||
let state_roots = vec![
|
||||
Ok((parent_state_root, diet_executed_block.parent_block.slot())),
|
||||
Ok((
|
||||
diet_executed_block.state_root,
|
||||
diet_executed_block.block.slot(),
|
||||
)),
|
||||
];
|
||||
|
||||
let block_replayer: BlockReplayer<'_, T::EthSpec, AvailabilityCheckError, _> =
|
||||
BlockReplayer::new(parent_state, &self.spec)
|
||||
.no_signature_verification()
|
||||
.state_root_iter(state_roots.into_iter())
|
||||
.minimal_block_root_verification();
|
||||
|
||||
let block_replayer = debug_span!("reconstruct_state_apply_blocks").in_scope(|| {
|
||||
block_replayer.apply_blocks(vec![diet_executed_block.block.clone_as_blinded()], None)
|
||||
});
|
||||
|
||||
block_replayer
|
||||
.map(|block_replayer| block_replayer.into_state())
|
||||
.and_then(|mut state| {
|
||||
state
|
||||
.build_exit_cache(&self.spec)
|
||||
.map_err(AvailabilityCheckError::RebuildingStateCaches)?;
|
||||
state
|
||||
.update_tree_hash_cache()
|
||||
.map_err(AvailabilityCheckError::RebuildingStateCaches)?;
|
||||
Ok(state)
|
||||
})
|
||||
}
|
||||
|
||||
/// returns the state cache for inspection
|
||||
pub fn lru_cache(&self) -> &RwLock<LruCache<Hash256, BeaconState<T::EthSpec>>> {
|
||||
&self.states
|
||||
}
|
||||
|
||||
/// remove any states from the cache from before the given epoch
|
||||
pub fn do_maintenance(&self, cutoff_epoch: Epoch) {
|
||||
let mut write_lock = self.states.write();
|
||||
while let Some((_, state)) = write_lock.peek_lru() {
|
||||
if state.slot().epoch(T::EthSpec::slots_per_epoch()) < cutoff_epoch {
|
||||
write_lock.pop_lru();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Test-only conversion. Outside of tests, a `DietAvailabilityPendingExecutedBlock`
/// is obtained by calling `register_pending_executed_block` on the `StateLRUCache`.
#[cfg(test)]
impl<E: EthSpec> From<AvailabilityPendingExecutedBlock<E>>
    for DietAvailabilityPendingExecutedBlock<E>
{
    fn from(mut value: AvailabilityPendingExecutedBlock<E>) -> Self {
        // The root is computed from the state itself; the conversion panics if that fails.
        let state_root = value.import_data.state.canonical_root().unwrap();
        let consensus_context =
            OnDiskConsensusContext::from_consensus_context(value.import_data.consensus_context);

        Self {
            block: value.block,
            state_root,
            parent_block: value.import_data.parent_block,
            consensus_context,
            payload_verification_outcome: value.payload_verification_outcome,
        }
    }
}
|
||||
@@ -1869,13 +1869,6 @@ pub static DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE: LazyLock<Result<I
|
||||
"Number of entries in the data availability overflow block memory cache.",
|
||||
)
|
||||
});
|
||||
pub static DATA_AVAILABILITY_OVERFLOW_MEMORY_STATE_CACHE_SIZE: LazyLock<Result<IntGauge>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_int_gauge(
|
||||
"data_availability_overflow_memory_state_cache_size",
|
||||
"Number of entries in the data availability overflow state memory cache.",
|
||||
)
|
||||
});
|
||||
pub static DATA_AVAILABILITY_RECONSTRUCTION_TIME: LazyLock<Result<Histogram>> =
|
||||
LazyLock::new(|| {
|
||||
try_create_histogram(
|
||||
@@ -1983,10 +1976,6 @@ pub fn scrape_for_metrics<T: BeaconChainTypes>(beacon_chain: &BeaconChain<T>) {
|
||||
&DATA_AVAILABILITY_OVERFLOW_MEMORY_BLOCK_CACHE_SIZE,
|
||||
da_checker_metrics.block_cache_size,
|
||||
);
|
||||
set_gauge_by_usize(
|
||||
&DATA_AVAILABILITY_OVERFLOW_MEMORY_STATE_CACHE_SIZE,
|
||||
da_checker_metrics.state_cache_size,
|
||||
);
|
||||
|
||||
if let Some((size, num_lookups)) = beacon_chain.pre_finalization_block_cache.metrics() {
|
||||
set_gauge_by_usize(&PRE_FINALIZATION_BLOCK_CACHE_SIZE, size);
|
||||
|
||||
@@ -222,7 +222,6 @@ pub fn test_da_checker<E: EthSpec>(
|
||||
Duration::from_secs(spec.seconds_per_slot),
|
||||
);
|
||||
let kzg = get_kzg(&spec);
|
||||
let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap());
|
||||
let ordered_custody_column_indices = generate_data_column_indices_rand_order::<E>();
|
||||
let custody_context = Arc::new(CustodyContext::new(
|
||||
node_custody_type,
|
||||
@@ -234,7 +233,6 @@ pub fn test_da_checker<E: EthSpec>(
|
||||
complete_blob_backfill,
|
||||
slot_clock,
|
||||
kzg,
|
||||
store,
|
||||
custody_context,
|
||||
spec,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user