diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index db15214318..ecc55b19a9 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -996,7 +996,6 @@ where complete_blob_backfill, slot_clock.clone(), self.kzg.clone(), - store.clone(), custody_context.clone(), self.spec.clone(), ) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index af3cb72c03..881cbe8569 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -19,6 +19,7 @@ pub enum Error { StoreError(store::Error), DecodeError(ssz::DecodeError), ParentStateMissing(Hash256), + StateMissing(Hash256), BlockReplayError(state_processing::BlockReplayError), RebuildingStateCaches(BeaconStateError), SlotClockError, @@ -43,6 +44,7 @@ impl Error { | Error::DecodeError(_) | Error::Unexpected(_) | Error::ParentStateMissing(_) + | Error::StateMissing(_) | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) | Error::SlotClockError diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2.rs index 098bdc7916..187cfc54b0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2.rs @@ -7,7 +7,7 @@ use crate::data_availability_router::DataColumnCache; use crate::payload_verification_types::{ AvailabilityPendingExecutedPayload, AvailableExecutedPayload, PayloadProcessStatus, }; -use crate::{BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext, metrics}; +use crate::{BeaconChain, BeaconChainTypes, CustodyContext, metrics}; use educe::Educe; use kzg::Kzg; use slot_clock::SlotClock; @@ -25,7 +25,6 @@ use types::{ }; mod overflow_lru_cache; -mod state_lru_cache; use crate::data_column_verification::{ GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, @@ -46,7 +45,6 @@ use types::new_non_zero_usize; /// `PendingComponents` are now never removed from the cache manually are only removed via LRU /// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time. const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32); -const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32); /// Cache to hold fully valid data that can't be imported to fork-choice yet. After the Gloas hard-fork /// beacon blocks can be immediately imported into fork choice. The execution payload is now separated out from @@ -314,13 +312,11 @@ impl DataAvailabilityChecker { complete_blob_backfill: bool, slot_clock: T::SlotClock, kzg: Arc, - store: BeaconStore, custody_context: Arc>, spec: Arc, ) -> Result { let inner = DataAvailabilityCheckerInner::new( OVERFLOW_LRU_CAPACITY_NON_ZERO, - store, custody_context.clone(), spec.clone(), )?; @@ -449,7 +445,6 @@ impl DataAvailabilityChecker { /// Collects metrics from the data availability checker. pub fn metrics(&self) -> DataAvailabilityCheckerMetrics { DataAvailabilityCheckerMetrics { - state_cache_size: self.availability_cache.state_cache_size(), payload_cache_size: self.availability_cache.payload_cache_size(), } } @@ -457,7 +452,6 @@ impl DataAvailabilityChecker { /// Helper struct to group data availability checker metrics. pub struct DataAvailabilityCheckerMetrics { - pub state_cache_size: usize, pub payload_cache_size: usize, } diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs index 258bc10875..fe5c0860f0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker_v2/overflow_lru_cache.rs @@ -1,7 +1,5 @@ -use super::state_lru_cache::{DietAvailabilityPendingExecutedPayload, StateLRUCache}; use crate::BeaconChainTypes; use crate::CustodyContext; -use crate::beacon_chain::BeaconStore; use crate::data_availability_checker::AvailabilityCheckError; use crate::data_availability_checker_v2::{Availability, AvailablePayload, AvailablePayloadData}; use crate::data_column_verification::KzgVerifiedCustodyDataColumn; @@ -24,7 +22,7 @@ use types::{ #[derive(Clone)] pub enum CachedPayload { PreExecution(Arc>, BlockImportSource), - Executed(Box>), + Executed(Box>), } #[allow(dead_code)] @@ -37,13 +35,20 @@ impl CachedPayload { fn as_payload(&self) -> &SignedExecutionPayloadEnvelope { match self { CachedPayload::PreExecution(p, _) => p, - CachedPayload::Executed(p) => p.as_payload(), + CachedPayload::Executed(p) => &p.payload, } } pub fn num_blobs_expected(&self) -> usize { self.as_payload().message.blob_kzg_commitments.len() } + + pub fn payload_cloned(&self) -> Arc> { + match self { + CachedPayload::PreExecution(p, _) => p.clone(), + CachedPayload::Executed(p) => p.payload.clone(), + } + } } /// This represents the components of a partially available payload @@ -61,14 +66,6 @@ pub struct PendingComponents { } impl PendingComponents { - #[cfg(test)] - fn get_diet_payload(&self) -> Option<&DietAvailabilityPendingExecutedPayload> { - self.payload.as_ref().and_then(|payload| match payload { - CachedPayload::Executed(payload) => Some(payload.as_ref()), - _ => None, - }) - } - /// Returns an immutable reference to the cached data column. pub fn get_cached_data_column( &self, @@ -89,7 +86,7 @@ impl PendingComponents { } /// Inserts an executed payload into the cache. - pub fn insert_executed_payload(&mut self, payload: DietAvailabilityPendingExecutedPayload) { + pub fn insert_executed_payload(&mut self, payload: AvailabilityPendingExecutedPayload) { self.payload = Some(CachedPayload::Executed(Box::new(payload))) } @@ -120,33 +117,23 @@ impl PendingComponents { } /// Inserts a new payload. - pub fn merge_payload(&mut self, payload: DietAvailabilityPendingExecutedPayload) { + pub fn merge_payload(&mut self, payload: AvailabilityPendingExecutedPayload) { self.insert_executed_payload(payload); } /// Returns Some if the payload has received all its required data for import. The return value /// must be persisted in the DB along with the payload. - /// - /// WARNING: This function can potentially take a lot of time if the state needs to be - /// reconstructed from disk. Ensure you are not holding any write locks while calling this. - pub fn make_available( + pub fn make_available( &self, spec: &Arc, num_expected_columns: usize, - recover: R, - ) -> Result>, AvailabilityCheckError> - where - R: FnOnce( - DietAvailabilityPendingExecutedPayload, - &Span, - ) -> Result, AvailabilityCheckError>, - { - let Some(CachedPayload::Executed(payload)) = &self.payload else { + ) -> Result>, AvailabilityCheckError> { + let Some(CachedPayload::Executed(executed_payload)) = &self.payload else { // Payload not available yet return Ok(None); }; - let num_expected_blobs = payload.num_blobs_expected(); + let num_expected_blobs = executed_payload.num_blobs_expected(); let column_data = if num_expected_blobs == 0 { Some(AvailablePayloadData::NoData) } else { @@ -198,7 +185,7 @@ impl PendingComponents { payload, import_data, payload_verification_outcome, - } = recover(*payload.clone(), &self.span)?; + } = executed_payload.as_ref().clone(); let available_payload = AvailablePayload { block_root: payload.message.beacon_block_root, @@ -272,9 +259,6 @@ impl PendingComponents { pub struct DataAvailabilityCheckerInner { /// Contains all the data we keep in memory, protected by an RwLock critical: RwLock>>, - /// This cache holds a limited number of states in memory and reconstructs them - /// from disk when necessary. This is necessary until we merge tree-states - state_cache: StateLRUCache, custody_context: Arc>, spec: Arc, } @@ -291,13 +275,11 @@ pub(crate) enum ReconstructColumnsDecision { impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, - beacon_store: BeaconStore, custody_context: Arc>, spec: Arc, ) -> Result { Ok(Self { critical: RwLock::new(LruCache::new(capacity)), - state_cache: StateLRUCache::new(beacon_store, spec.clone()), custody_context, spec, }) @@ -320,7 +302,7 @@ impl DataAvailabilityCheckerInner { PayloadProcessStatus::NotValidated(p.clone(), *source) } CachedPayload::Executed(p) => { - PayloadProcessStatus::ExecutionValidated(p.payload_cloned()) + PayloadProcessStatus::ExecutionValidated(p.payload.clone()) } }) }) @@ -399,14 +381,9 @@ impl DataAvailabilityCheckerInner { pending_components: MappedRwLockReadGuard<'_, PendingComponents>, num_expected_columns: usize, ) -> Result, AvailabilityCheckError> { - if let Some(available_payload) = pending_components.make_available( - &self.spec, - num_expected_columns, - |payload, span| { - self.state_cache - .recover_pending_executed_payload(payload, span) - }, - )? { + if let Some(available_payload) = + pending_components.make_available(&self.spec, num_expected_columns)? + { // Explicitly drop read lock before acquiring write lock drop(pending_components); if let Some(components) = self.critical.write().get_mut(&block_root) { @@ -564,14 +541,9 @@ impl DataAvailabilityCheckerInner { .epoch(T::EthSpec::slots_per_epoch()); let block_root = executed_payload.payload.message.beacon_block_root; - // register the payload to get the diet payload - let diet_executed_payload = self - .state_cache - .register_pending_executed_payload(executed_payload); - let pending_components = self.update_or_insert_pending_components(block_root, |pending_components| { - pending_components.merge_payload(diet_executed_payload); + pending_components.merge_payload(executed_payload); Ok(()) })?; @@ -599,9 +571,6 @@ impl DataAvailabilityCheckerInner { /// maintain the cache pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> { - // clean up any lingering states in the state cache - self.state_cache.do_maintenance(cutoff_epoch); - // Collect keys of pending payloads from a previous epoch to cutoff let mut write_lock = self.critical.write(); let mut keys_to_remove = vec![]; @@ -620,17 +589,6 @@ impl DataAvailabilityCheckerInner { Ok(()) } - #[cfg(test)] - /// get the state cache for inspection (used only for tests) - pub fn state_lru_cache(&self) -> &StateLRUCache { - &self.state_cache - } - - /// Number of states stored in memory in the cache. - pub fn state_cache_size(&self) -> usize { - self.state_cache.lru_cache().read().len() - } - /// Number of pending component entries in memory in the cache. pub fn payload_cache_size(&self) -> usize { self.critical.read().len() @@ -646,19 +604,16 @@ mod test { use crate::{ block_verification_types::AsBlock, custody_context::NodeCustodyType, - data_availability_checker_v2::STATE_LRU_CAPACITY_NON_ZERO, test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType}, }; use logging::create_test_tracing_subscriber; - use std::collections::{HashSet, VecDeque}; + use std::collections::HashSet; use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend}; use tempfile::{TempDir, tempdir}; - use tracing::debug_span; use types::MinimalEthSpec; use types::new_non_zero_usize; const LOW_VALIDATOR_COUNT: usize = 32; - const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get(); fn get_store_with_spec( db_path: &TempDir, @@ -756,7 +711,7 @@ mod test { let chain_db_path = tempdir().expect("should get temp dir"); let harness = get_gloas_chain(&chain_db_path).await; let spec = harness.spec.clone(); - let test_store = harness.chain.store.clone(); + let _test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); let custody_context = Arc::new(CustodyContext::new( NodeCustodyType::Fullnode, @@ -764,13 +719,8 @@ mod test { &spec, )); let cache = Arc::new( - DataAvailabilityCheckerInner::::new( - capacity_non_zero, - test_store, - custody_context, - spec.clone(), - ) - .expect("should create cache"), + DataAvailabilityCheckerInner::::new(capacity_non_zero, custody_context, spec.clone()) + .expect("should create cache"), ); (harness, cache, chain_db_path) } @@ -898,127 +848,6 @@ mod test { "cache should still have available payload" ); } - - #[tokio::test] - #[ignore] // TODO(gloas): Implement availability_pending_payload - // ensure the state cache keeps memory usage low and that it can properly recover states - // THIS TEST CAN BE DELETED ONCE TREE STATES IS MERGED AND WE RIP OUT THE STATE CACHE - async fn overflow_cache_test_state_cache() { - type E = MinimalEthSpec; - type T = DiskHarnessType; - let capacity = STATE_LRU_CAPACITY * 2; - let (harness, cache, _path) = setup_harness_and_cache::(capacity).await; - - let mut pending_payloads = VecDeque::new(); - let mut states = Vec::new(); - let mut state_roots = Vec::new(); - // Get enough payload to fill the cache to capacity, ensuring all payloads have blobs - while pending_payloads.len() < capacity { - let (mut pending_payload, _) = availability_pending_payload(&harness).await; - if pending_payload.num_blobs_expected() == 0 { - // we need payloads with blobs - continue; - } - let state_root = pending_payload.import_data.state.canonical_root().unwrap(); - states.push(pending_payload.import_data.state.clone()); - pending_payloads.push_back(pending_payload); - state_roots.push(state_root); - } - - let state_cache = cache.state_lru_cache().lru_cache(); - let mut pushed_diet_payloads = VecDeque::new(); - - for i in 0..capacity { - let pending_payload = pending_payloads.pop_front().expect("should have payload"); - let block_root = pending_payload.as_payload().beacon_block_root(); - - assert_eq!( - state_cache.read().len(), - std::cmp::min(i, STATE_LRU_CAPACITY), - "state cache should be empty at start" - ); - - if i >= STATE_LRU_CAPACITY { - let lru_root = state_roots[i - STATE_LRU_CAPACITY]; - assert_eq!( - state_cache.read().peek_lru().map(|(root, _)| root), - Some(&lru_root), - "lru payload should be in cache" - ); - } - - // put the payload in the cache - let availability = cache - .put_executed_payload(pending_payload) - .expect("should put payload"); - - // grab the diet payload from the cache for later testing - let diet_payload = cache - .critical - .read() - .peek(&block_root) - .and_then(|pending_components| pending_components.get_diet_payload().cloned()) - .expect("should exist"); - pushed_diet_payloads.push_back(diet_payload); - - // should be unavailable since we made sure all payloads had blobs - assert!( - matches!(availability, Availability::MissingComponents(_)), - "should be pending blobs" - ); - - if i >= STATE_LRU_CAPACITY { - let evicted_index = i - STATE_LRU_CAPACITY; - let evicted_root = state_roots[evicted_index]; - assert!( - state_cache.read().peek(&evicted_root).is_none(), - "lru root should be evicted" - ); - // get the diet payload via direct conversion (testing only) - let diet_payload = pushed_diet_payloads - .pop_front() - .expect("should have payload"); - // reconstruct the pending payload by replaying the payload on the parent state - let recovered_pending_payload = cache - .state_lru_cache() - .recover_pending_executed_payload(diet_payload, &debug_span!("test")) - .expect("should reconstruct pending payload"); - - // assert the recovered state is the same as the original - assert_eq!( - recovered_pending_payload.import_data.state, states[evicted_index], - "recovered state should be the same as the original" - ); - } - } - - // now check the last payload - let last_payload = pushed_diet_payloads - .pop_back() - .expect("should exist") - .clone(); - // the state should still be in the cache - assert!( - state_cache - .read() - .peek(&last_payload.as_payload().message.state_root) - .is_some(), - "last payload state should still be in cache" - ); - // get the diet payload via direct conversion (testing only) - let diet_payload = last_payload.clone(); - // recover the pending payload from the cache - let recovered_pending_payload = cache - .state_lru_cache() - .recover_pending_executed_payload(diet_payload, &debug_span!("test")) - .expect("should reconstruct pending payload"); - // assert the recovered state is the same as the original - assert_eq!( - Some(&recovered_pending_payload.import_data.state), - states.last(), - "recovered state should be the same as the original" - ); - } } #[cfg(test)] @@ -1026,14 +855,16 @@ mod pending_components_tests { use super::*; use crate::PayloadVerificationOutcome; use crate::data_column_verification::KzgVerifiedDataColumn; + use crate::payload_verification_types::PayloadImportData; use crate::test_utils::{NumBlobs, generate_rand_payload_and_columns, test_spec}; use fork_choice::PayloadVerificationStatus; use kzg::KzgCommitment; use rand::SeedableRng; use rand::rngs::StdRng; use ssz_types::VariableList; + use state_processing::ConsensusContext; use types::test_utils::TestRandom; - use types::{ForkName, MainnetEthSpec, SignedExecutionPayloadEnvelope, Slot}; + use types::{BeaconState, ForkName, MainnetEthSpec, SignedExecutionPayloadEnvelope, Slot}; type E = MainnetEthSpec; @@ -1093,7 +924,7 @@ mod pending_components_tests { } type PendingComponentsSetup = ( - DietAvailabilityPendingExecutedPayload, + AvailabilityPendingExecutedPayload, Vec>, Vec>, ); @@ -1121,15 +952,19 @@ mod pending_components_tests { }) .collect(); - let diet_payload = DietAvailabilityPendingExecutedPayload::new_for_testing( - Arc::new(payload), + let executed_payload = AvailabilityPendingExecutedPayload::new( + Arc::new(payload.clone()), + PayloadImportData { + state: BeaconState::new(0, Default::default(), &test_spec::()), + consensus_context: ConsensusContext::new(payload.message.slot), + }, PayloadVerificationOutcome { payload_verification_status: PayloadVerificationStatus::Verified, is_valid_merge_transition_block: false, }, ); - (diet_payload, columns, invalid_columns) + (executed_payload, columns, invalid_columns) } fn assert_cache_consistent(cache: &PendingComponents) { @@ -1168,11 +1003,11 @@ mod pending_components_tests { return; } let (payload, columns, invalid_columns) = pre_setup(); - let (diet_payload, columns, invalid_columns) = + let (executed_payload, columns, invalid_columns) = setup_pending_components(payload, columns, invalid_columns); let block_root = Hash256::ZERO; let mut cache = >::empty(block_root); - cache.merge_payload(diet_payload); + cache.merge_payload(executed_payload); cache .merge_data_columns(invalid_columns) .expect("merge should succeed"); @@ -1191,14 +1026,14 @@ mod pending_components_tests { return; } let (payload, columns, invalid_columns) = pre_setup(); - let (diet_payload, columns, invalid_columns) = + let (executed_payload, columns, invalid_columns) = setup_pending_components(payload, columns, invalid_columns); let block_root = Hash256::ZERO; let mut cache = >::empty(block_root); cache .merge_data_columns(invalid_columns) .expect("merge should succeed"); - cache.merge_payload(diet_payload); + cache.merge_payload(executed_payload); cache .merge_data_columns(columns) .expect("merge should succeed"); @@ -1213,7 +1048,7 @@ mod pending_components_tests { return; } let (payload, columns, invalid_columns) = pre_setup(); - let (diet_payload, columns, invalid_columns) = + let (executed_payload, columns, invalid_columns) = setup_pending_components(payload, columns, invalid_columns); let block_root = Hash256::ZERO; @@ -1224,7 +1059,7 @@ mod pending_components_tests { cache .merge_data_columns(columns) .expect("merge should succeed"); - cache.merge_payload(diet_payload); + cache.merge_payload(executed_payload); // Invalid columns were first, valid ones deduplicated away. assert!(!cache.verified_data_columns.is_empty()); @@ -1236,12 +1071,12 @@ mod pending_components_tests { return; } let (payload, columns, invalid_columns) = pre_setup(); - let (diet_payload, columns, invalid_columns) = + let (executed_payload, columns, invalid_columns) = setup_pending_components(payload, columns, invalid_columns); let block_root = Hash256::ZERO; let mut cache = >::empty(block_root); - cache.merge_payload(diet_payload); + cache.merge_payload(executed_payload); cache .merge_data_columns(columns) .expect("merge should succeed"); @@ -1259,7 +1094,7 @@ mod pending_components_tests { return; } let (payload, columns, invalid_columns) = pre_setup(); - let (diet_payload, columns, invalid_columns) = + let (executed_payload, columns, invalid_columns) = setup_pending_components(payload, columns, invalid_columns); let block_root = Hash256::ZERO; @@ -1267,7 +1102,7 @@ mod pending_components_tests { cache .merge_data_columns(columns) .expect("merge should succeed"); - cache.merge_payload(diet_payload); + cache.merge_payload(executed_payload); cache .merge_data_columns(invalid_columns) .expect("merge should succeed"); @@ -1282,7 +1117,7 @@ mod pending_components_tests { return; } let (payload, columns, invalid_columns) = pre_setup(); - let (diet_payload, columns, invalid_columns) = + let (executed_payload, columns, invalid_columns) = setup_pending_components(payload, columns, invalid_columns); let block_root = Hash256::ZERO; @@ -1293,7 +1128,7 @@ mod pending_components_tests { cache .merge_data_columns(invalid_columns) .expect("merge should succeed"); - cache.merge_payload(diet_payload); + cache.merge_payload(executed_payload); // Valid columns were inserted first, so they persist. Cache should be consistent. assert_cache_consistent(&cache); @@ -1305,7 +1140,7 @@ mod pending_components_tests { return; } let (payload, _columns, _invalid_columns) = pre_setup(); - let (diet_payload, _columns, _invalid_columns) = + let (executed_payload, _columns, _invalid_columns) = setup_pending_components(payload.clone(), _columns, _invalid_columns); let block_root = Hash256::ZERO; @@ -1322,7 +1157,7 @@ mod pending_components_tests { "pre execution payload inserted" ); - pending_component.insert_executed_payload(diet_payload); + pending_component.insert_executed_payload(executed_payload); assert!( matches!(pending_component.payload, Some(CachedPayload::Executed(_))), "executed payload inserted" diff --git a/beacon_node/beacon_chain/src/data_availability_checker_v2/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker_v2/state_lru_cache.rs deleted file mode 100644 index 4d1e02990c..0000000000 --- a/beacon_node/beacon_chain/src/data_availability_checker_v2/state_lru_cache.rs +++ /dev/null @@ -1,156 +0,0 @@ -#![allow(dead_code)] -use crate::payload_verification_types::{AvailabilityPendingExecutedPayload, PayloadImportData}; -use crate::{ - BeaconChainTypes, BeaconStore, PayloadVerificationOutcome, - data_availability_checker_v2::{AvailabilityCheckError, STATE_LRU_CAPACITY_NON_ZERO}, -}; -use lru::LruCache; -use parking_lot::RwLock; -use std::sync::Arc; -use store::OnDiskConsensusContext; -use tracing::{Span, instrument}; -use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedExecutionPayloadEnvelope}; - -/// This mirrors everything in the `AvailabilityPendingExecutedBlock`, except -/// that it is much smaller because it contains only a state root instead of -/// a full `BeaconState`. -#[derive(Clone)] -pub struct DietAvailabilityPendingExecutedPayload { - payload: Arc>, - state_root: Hash256, - consensus_context: OnDiskConsensusContext, - payload_verification_outcome: PayloadVerificationOutcome, -} - -/// Implementing the same methods as `AvailabilityPendingExecutedPayload` -impl DietAvailabilityPendingExecutedPayload { - #[cfg(test)] - pub fn new_for_testing( - payload: Arc>, - payload_verification_outcome: PayloadVerificationOutcome, - ) -> Self { - use state_processing::ConsensusContext; - use types::Slot; - Self { - payload, - state_root: Hash256::ZERO, - consensus_context: OnDiskConsensusContext::from_consensus_context( - ConsensusContext::new(Slot::new(0)), - ), - payload_verification_outcome, - } - } - - pub fn as_payload(&self) -> &SignedExecutionPayloadEnvelope { - &self.payload - } - - pub fn payload_cloned(&self) -> Arc> { - self.payload.clone() - } - - pub fn num_blobs_expected(&self) -> usize { - self.payload.message.blob_kzg_commitments.len() - } -} - -/// This LRU cache holds BeaconStates used for payload import. If the cache overflows, -/// the least recently used state will be dropped. If the dropped state is needed -/// later on, it will be recovered from the parent state and replaying the payload. -/// -/// WARNING: This cache assumes the parent block of any `AvailabilityPendingExecutedPayload` -/// has already been imported into ForkChoice. If this is not the case, the cache -/// will fail to recover the state when the cache overflows because it can't load -/// the parent state! -pub struct StateLRUCache { - states: RwLock>>, - store: BeaconStore, - spec: Arc, -} - -impl StateLRUCache { - pub fn new(store: BeaconStore, spec: Arc) -> Self { - Self { - states: RwLock::new(LruCache::new(STATE_LRU_CAPACITY_NON_ZERO)), - store, - spec, - } - } - - /// This will store the state in the LRU cache and return a - /// `DietAvailabilityPendingExecutedPayload` which is much cheaper to - /// keep around in memory. - pub fn register_pending_executed_payload( - &self, - executed_payload: AvailabilityPendingExecutedPayload, - ) -> DietAvailabilityPendingExecutedPayload { - let state = executed_payload.import_data.state; - let state_root = executed_payload.payload.message.state_root; - self.states.write().put(state_root, state); - - DietAvailabilityPendingExecutedPayload { - payload: executed_payload.payload, - state_root, - consensus_context: OnDiskConsensusContext::from_consensus_context( - executed_payload.import_data.consensus_context, - ), - payload_verification_outcome: executed_payload.payload_verification_outcome, - } - } - - /// Recover the `AvailabilityPendingExecutedPayload` from the diet version. - /// This method will first check the cache and if the state is not found - /// it will reconstruct the state by loading the parent state from disk and - /// replaying the block. - #[instrument(skip_all, parent = _span, level = "debug")] - pub fn recover_pending_executed_payload( - &self, - diet_executed_payload: DietAvailabilityPendingExecutedPayload, - _span: &Span, - ) -> Result, AvailabilityCheckError> { - // Keep the state in the cache to prevent reconstruction in race conditions - let state = if let Some(state) = self.states.write().get(&diet_executed_payload.state_root) - { - state.clone() - } else { - self.reconstruct_state(&diet_executed_payload)? - }; - Ok(AvailabilityPendingExecutedPayload { - payload: diet_executed_payload.payload, - import_data: PayloadImportData { - state, - consensus_context: diet_executed_payload - .consensus_context - .into_consensus_context(), - }, - payload_verification_outcome: diet_executed_payload.payload_verification_outcome, - }) - } - - /// Reconstruct the state by loading the parent state from disk and replaying - /// the block. - #[instrument(skip_all, level = "debug")] - fn reconstruct_state( - &self, - _diet_executed_block: &DietAvailabilityPendingExecutedPayload, - ) -> Result, AvailabilityCheckError> { - todo!() - } - - /// returns the state cache for inspection - pub fn lru_cache(&self) -> &RwLock>> { - &self.states - } - - /// remove any states from the cache from before the given epoch - pub fn do_maintenance(&self, cutoff_epoch: Epoch) { - let mut write_lock = self.states.write(); - while let Some((_, state)) = write_lock.peek_lru() { - if state.slot().epoch(T::EthSpec::slots_per_epoch()) < cutoff_epoch { - write_lock.pop_lru(); - } else { - break; - } - } - } -} diff --git a/beacon_node/beacon_chain/src/payload_verification_types.rs b/beacon_node/beacon_chain/src/payload_verification_types.rs index f54c67ecb1..02868bb597 100644 --- a/beacon_node/beacon_chain/src/payload_verification_types.rs +++ b/beacon_node/beacon_chain/src/payload_verification_types.rs @@ -5,7 +5,7 @@ use types::{BeaconState, BlockImportSource, EthSpec, SignedExecutionPayloadEnvel use crate::{PayloadVerificationOutcome, data_availability_checker_v2::AvailablePayload}; -#[derive(Debug, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub struct PayloadImportData { pub state: BeaconState, pub consensus_context: ConsensusContext, @@ -13,6 +13,7 @@ pub struct PayloadImportData { /// A payload that has completed payload verification by an EL client but does not /// have all requisite column data to get imported into fork choice. +#[derive(Clone)] pub struct AvailabilityPendingExecutedPayload { pub payload: Arc>, pub import_data: PayloadImportData,