mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-31 13:17:09 +00:00
Remove state LRU cache for da checker v2
This commit is contained in:
@@ -996,7 +996,6 @@ where
|
|||||||
complete_blob_backfill,
|
complete_blob_backfill,
|
||||||
slot_clock.clone(),
|
slot_clock.clone(),
|
||||||
self.kzg.clone(),
|
self.kzg.clone(),
|
||||||
store.clone(),
|
|
||||||
custody_context.clone(),
|
custody_context.clone(),
|
||||||
self.spec.clone(),
|
self.spec.clone(),
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ pub enum Error {
|
|||||||
StoreError(store::Error),
|
StoreError(store::Error),
|
||||||
DecodeError(ssz::DecodeError),
|
DecodeError(ssz::DecodeError),
|
||||||
ParentStateMissing(Hash256),
|
ParentStateMissing(Hash256),
|
||||||
|
StateMissing(Hash256),
|
||||||
BlockReplayError(state_processing::BlockReplayError),
|
BlockReplayError(state_processing::BlockReplayError),
|
||||||
RebuildingStateCaches(BeaconStateError),
|
RebuildingStateCaches(BeaconStateError),
|
||||||
SlotClockError,
|
SlotClockError,
|
||||||
@@ -43,6 +44,7 @@ impl Error {
|
|||||||
| Error::DecodeError(_)
|
| Error::DecodeError(_)
|
||||||
| Error::Unexpected(_)
|
| Error::Unexpected(_)
|
||||||
| Error::ParentStateMissing(_)
|
| Error::ParentStateMissing(_)
|
||||||
|
| Error::StateMissing(_)
|
||||||
| Error::BlockReplayError(_)
|
| Error::BlockReplayError(_)
|
||||||
| Error::RebuildingStateCaches(_)
|
| Error::RebuildingStateCaches(_)
|
||||||
| Error::SlotClockError
|
| Error::SlotClockError
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ use crate::data_availability_router::DataColumnCache;
|
|||||||
use crate::payload_verification_types::{
|
use crate::payload_verification_types::{
|
||||||
AvailabilityPendingExecutedPayload, AvailableExecutedPayload, PayloadProcessStatus,
|
AvailabilityPendingExecutedPayload, AvailableExecutedPayload, PayloadProcessStatus,
|
||||||
};
|
};
|
||||||
use crate::{BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext, metrics};
|
use crate::{BeaconChain, BeaconChainTypes, CustodyContext, metrics};
|
||||||
use educe::Educe;
|
use educe::Educe;
|
||||||
use kzg::Kzg;
|
use kzg::Kzg;
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
@@ -25,7 +25,6 @@ use types::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
mod overflow_lru_cache;
|
mod overflow_lru_cache;
|
||||||
mod state_lru_cache;
|
|
||||||
|
|
||||||
use crate::data_column_verification::{
|
use crate::data_column_verification::{
|
||||||
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
|
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
|
||||||
@@ -46,7 +45,6 @@ use types::new_non_zero_usize;
|
|||||||
/// `PendingComponents` are now never removed from the cache manually are only removed via LRU
|
/// `PendingComponents` are now never removed from the cache manually are only removed via LRU
|
||||||
/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time.
|
/// eviction to prevent race conditions (#7961), so we expect this cache to be full all the time.
|
||||||
const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
|
const OVERFLOW_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
|
||||||
const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
|
|
||||||
|
|
||||||
/// Cache to hold fully valid data that can't be imported to fork-choice yet. After the Gloas hard-fork
|
/// Cache to hold fully valid data that can't be imported to fork-choice yet. After the Gloas hard-fork
|
||||||
/// beacon blocks can be immediately imported into fork choice. The execution payload is now separated out from
|
/// beacon blocks can be immediately imported into fork choice. The execution payload is now separated out from
|
||||||
@@ -314,13 +312,11 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
complete_blob_backfill: bool,
|
complete_blob_backfill: bool,
|
||||||
slot_clock: T::SlotClock,
|
slot_clock: T::SlotClock,
|
||||||
kzg: Arc<Kzg>,
|
kzg: Arc<Kzg>,
|
||||||
store: BeaconStore<T>,
|
|
||||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||||
spec: Arc<ChainSpec>,
|
spec: Arc<ChainSpec>,
|
||||||
) -> Result<Self, AvailabilityCheckError> {
|
) -> Result<Self, AvailabilityCheckError> {
|
||||||
let inner = DataAvailabilityCheckerInner::new(
|
let inner = DataAvailabilityCheckerInner::new(
|
||||||
OVERFLOW_LRU_CAPACITY_NON_ZERO,
|
OVERFLOW_LRU_CAPACITY_NON_ZERO,
|
||||||
store,
|
|
||||||
custody_context.clone(),
|
custody_context.clone(),
|
||||||
spec.clone(),
|
spec.clone(),
|
||||||
)?;
|
)?;
|
||||||
@@ -449,7 +445,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
/// Collects metrics from the data availability checker.
|
/// Collects metrics from the data availability checker.
|
||||||
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
|
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
|
||||||
DataAvailabilityCheckerMetrics {
|
DataAvailabilityCheckerMetrics {
|
||||||
state_cache_size: self.availability_cache.state_cache_size(),
|
|
||||||
payload_cache_size: self.availability_cache.payload_cache_size(),
|
payload_cache_size: self.availability_cache.payload_cache_size(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -457,7 +452,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
|
|||||||
|
|
||||||
/// Helper struct to group data availability checker metrics.
|
/// Helper struct to group data availability checker metrics.
|
||||||
pub struct DataAvailabilityCheckerMetrics {
|
pub struct DataAvailabilityCheckerMetrics {
|
||||||
pub state_cache_size: usize,
|
|
||||||
pub payload_cache_size: usize,
|
pub payload_cache_size: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
use super::state_lru_cache::{DietAvailabilityPendingExecutedPayload, StateLRUCache};
|
|
||||||
use crate::BeaconChainTypes;
|
use crate::BeaconChainTypes;
|
||||||
use crate::CustodyContext;
|
use crate::CustodyContext;
|
||||||
use crate::beacon_chain::BeaconStore;
|
|
||||||
use crate::data_availability_checker::AvailabilityCheckError;
|
use crate::data_availability_checker::AvailabilityCheckError;
|
||||||
use crate::data_availability_checker_v2::{Availability, AvailablePayload, AvailablePayloadData};
|
use crate::data_availability_checker_v2::{Availability, AvailablePayload, AvailablePayloadData};
|
||||||
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
|
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
|
||||||
@@ -24,7 +22,7 @@ use types::{
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum CachedPayload<E: EthSpec> {
|
pub enum CachedPayload<E: EthSpec> {
|
||||||
PreExecution(Arc<SignedExecutionPayloadEnvelope<E>>, BlockImportSource),
|
PreExecution(Arc<SignedExecutionPayloadEnvelope<E>>, BlockImportSource),
|
||||||
Executed(Box<DietAvailabilityPendingExecutedPayload<E>>),
|
Executed(Box<AvailabilityPendingExecutedPayload<E>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
@@ -37,13 +35,20 @@ impl<E: EthSpec> CachedPayload<E> {
|
|||||||
fn as_payload(&self) -> &SignedExecutionPayloadEnvelope<E> {
|
fn as_payload(&self) -> &SignedExecutionPayloadEnvelope<E> {
|
||||||
match self {
|
match self {
|
||||||
CachedPayload::PreExecution(p, _) => p,
|
CachedPayload::PreExecution(p, _) => p,
|
||||||
CachedPayload::Executed(p) => p.as_payload(),
|
CachedPayload::Executed(p) => &p.payload,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn num_blobs_expected(&self) -> usize {
|
pub fn num_blobs_expected(&self) -> usize {
|
||||||
self.as_payload().message.blob_kzg_commitments.len()
|
self.as_payload().message.blob_kzg_commitments.len()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn payload_cloned(&self) -> Arc<SignedExecutionPayloadEnvelope<E>> {
|
||||||
|
match self {
|
||||||
|
CachedPayload::PreExecution(p, _) => p.clone(),
|
||||||
|
CachedPayload::Executed(p) => p.payload.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This represents the components of a partially available payload
|
/// This represents the components of a partially available payload
|
||||||
@@ -61,14 +66,6 @@ pub struct PendingComponents<E: EthSpec> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl<E: EthSpec> PendingComponents<E> {
|
impl<E: EthSpec> PendingComponents<E> {
|
||||||
#[cfg(test)]
|
|
||||||
fn get_diet_payload(&self) -> Option<&DietAvailabilityPendingExecutedPayload<E>> {
|
|
||||||
self.payload.as_ref().and_then(|payload| match payload {
|
|
||||||
CachedPayload::Executed(payload) => Some(payload.as_ref()),
|
|
||||||
_ => None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns an immutable reference to the cached data column.
|
/// Returns an immutable reference to the cached data column.
|
||||||
pub fn get_cached_data_column(
|
pub fn get_cached_data_column(
|
||||||
&self,
|
&self,
|
||||||
@@ -89,7 +86,7 @@ impl<E: EthSpec> PendingComponents<E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Inserts an executed payload into the cache.
|
/// Inserts an executed payload into the cache.
|
||||||
pub fn insert_executed_payload(&mut self, payload: DietAvailabilityPendingExecutedPayload<E>) {
|
pub fn insert_executed_payload(&mut self, payload: AvailabilityPendingExecutedPayload<E>) {
|
||||||
self.payload = Some(CachedPayload::Executed(Box::new(payload)))
|
self.payload = Some(CachedPayload::Executed(Box::new(payload)))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,33 +117,23 @@ impl<E: EthSpec> PendingComponents<E> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Inserts a new payload.
|
/// Inserts a new payload.
|
||||||
pub fn merge_payload(&mut self, payload: DietAvailabilityPendingExecutedPayload<E>) {
|
pub fn merge_payload(&mut self, payload: AvailabilityPendingExecutedPayload<E>) {
|
||||||
self.insert_executed_payload(payload);
|
self.insert_executed_payload(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns Some if the payload has received all its required data for import. The return value
|
/// Returns Some if the payload has received all its required data for import. The return value
|
||||||
/// must be persisted in the DB along with the payload.
|
/// must be persisted in the DB along with the payload.
|
||||||
///
|
pub fn make_available(
|
||||||
/// WARNING: This function can potentially take a lot of time if the state needs to be
|
|
||||||
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
|
|
||||||
pub fn make_available<R>(
|
|
||||||
&self,
|
&self,
|
||||||
spec: &Arc<ChainSpec>,
|
spec: &Arc<ChainSpec>,
|
||||||
num_expected_columns: usize,
|
num_expected_columns: usize,
|
||||||
recover: R,
|
) -> Result<Option<AvailableExecutedPayload<E>>, AvailabilityCheckError> {
|
||||||
) -> Result<Option<AvailableExecutedPayload<E>>, AvailabilityCheckError>
|
let Some(CachedPayload::Executed(executed_payload)) = &self.payload else {
|
||||||
where
|
|
||||||
R: FnOnce(
|
|
||||||
DietAvailabilityPendingExecutedPayload<E>,
|
|
||||||
&Span,
|
|
||||||
) -> Result<AvailabilityPendingExecutedPayload<E>, AvailabilityCheckError>,
|
|
||||||
{
|
|
||||||
let Some(CachedPayload::Executed(payload)) = &self.payload else {
|
|
||||||
// Payload not available yet
|
// Payload not available yet
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
let num_expected_blobs = payload.num_blobs_expected();
|
let num_expected_blobs = executed_payload.num_blobs_expected();
|
||||||
let column_data = if num_expected_blobs == 0 {
|
let column_data = if num_expected_blobs == 0 {
|
||||||
Some(AvailablePayloadData::NoData)
|
Some(AvailablePayloadData::NoData)
|
||||||
} else {
|
} else {
|
||||||
@@ -198,7 +185,7 @@ impl<E: EthSpec> PendingComponents<E> {
|
|||||||
payload,
|
payload,
|
||||||
import_data,
|
import_data,
|
||||||
payload_verification_outcome,
|
payload_verification_outcome,
|
||||||
} = recover(*payload.clone(), &self.span)?;
|
} = executed_payload.as_ref().clone();
|
||||||
|
|
||||||
let available_payload = AvailablePayload {
|
let available_payload = AvailablePayload {
|
||||||
block_root: payload.message.beacon_block_root,
|
block_root: payload.message.beacon_block_root,
|
||||||
@@ -272,9 +259,6 @@ impl<E: EthSpec> PendingComponents<E> {
|
|||||||
pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
|
pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
|
||||||
/// Contains all the data we keep in memory, protected by an RwLock
|
/// Contains all the data we keep in memory, protected by an RwLock
|
||||||
critical: RwLock<LruCache<Hash256, PendingComponents<T::EthSpec>>>,
|
critical: RwLock<LruCache<Hash256, PendingComponents<T::EthSpec>>>,
|
||||||
/// This cache holds a limited number of states in memory and reconstructs them
|
|
||||||
/// from disk when necessary. This is necessary until we merge tree-states
|
|
||||||
state_cache: StateLRUCache<T>,
|
|
||||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||||
spec: Arc<ChainSpec>,
|
spec: Arc<ChainSpec>,
|
||||||
}
|
}
|
||||||
@@ -291,13 +275,11 @@ pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
|
|||||||
impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
capacity: NonZeroUsize,
|
capacity: NonZeroUsize,
|
||||||
beacon_store: BeaconStore<T>,
|
|
||||||
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
custody_context: Arc<CustodyContext<T::EthSpec>>,
|
||||||
spec: Arc<ChainSpec>,
|
spec: Arc<ChainSpec>,
|
||||||
) -> Result<Self, AvailabilityCheckError> {
|
) -> Result<Self, AvailabilityCheckError> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
critical: RwLock::new(LruCache::new(capacity)),
|
critical: RwLock::new(LruCache::new(capacity)),
|
||||||
state_cache: StateLRUCache::new(beacon_store, spec.clone()),
|
|
||||||
custody_context,
|
custody_context,
|
||||||
spec,
|
spec,
|
||||||
})
|
})
|
||||||
@@ -320,7 +302,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
|||||||
PayloadProcessStatus::NotValidated(p.clone(), *source)
|
PayloadProcessStatus::NotValidated(p.clone(), *source)
|
||||||
}
|
}
|
||||||
CachedPayload::Executed(p) => {
|
CachedPayload::Executed(p) => {
|
||||||
PayloadProcessStatus::ExecutionValidated(p.payload_cloned())
|
PayloadProcessStatus::ExecutionValidated(p.payload.clone())
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@@ -399,14 +381,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
|||||||
pending_components: MappedRwLockReadGuard<'_, PendingComponents<T::EthSpec>>,
|
pending_components: MappedRwLockReadGuard<'_, PendingComponents<T::EthSpec>>,
|
||||||
num_expected_columns: usize,
|
num_expected_columns: usize,
|
||||||
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
|
||||||
if let Some(available_payload) = pending_components.make_available(
|
if let Some(available_payload) =
|
||||||
&self.spec,
|
pending_components.make_available(&self.spec, num_expected_columns)?
|
||||||
num_expected_columns,
|
{
|
||||||
|payload, span| {
|
|
||||||
self.state_cache
|
|
||||||
.recover_pending_executed_payload(payload, span)
|
|
||||||
},
|
|
||||||
)? {
|
|
||||||
// Explicitly drop read lock before acquiring write lock
|
// Explicitly drop read lock before acquiring write lock
|
||||||
drop(pending_components);
|
drop(pending_components);
|
||||||
if let Some(components) = self.critical.write().get_mut(&block_root) {
|
if let Some(components) = self.critical.write().get_mut(&block_root) {
|
||||||
@@ -564,14 +541,9 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
|||||||
.epoch(T::EthSpec::slots_per_epoch());
|
.epoch(T::EthSpec::slots_per_epoch());
|
||||||
let block_root = executed_payload.payload.message.beacon_block_root;
|
let block_root = executed_payload.payload.message.beacon_block_root;
|
||||||
|
|
||||||
// register the payload to get the diet payload
|
|
||||||
let diet_executed_payload = self
|
|
||||||
.state_cache
|
|
||||||
.register_pending_executed_payload(executed_payload);
|
|
||||||
|
|
||||||
let pending_components =
|
let pending_components =
|
||||||
self.update_or_insert_pending_components(block_root, |pending_components| {
|
self.update_or_insert_pending_components(block_root, |pending_components| {
|
||||||
pending_components.merge_payload(diet_executed_payload);
|
pending_components.merge_payload(executed_payload);
|
||||||
Ok(())
|
Ok(())
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
@@ -599,9 +571,6 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
|||||||
|
|
||||||
/// maintain the cache
|
/// maintain the cache
|
||||||
pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
|
pub fn do_maintenance(&self, cutoff_epoch: Epoch) -> Result<(), AvailabilityCheckError> {
|
||||||
// clean up any lingering states in the state cache
|
|
||||||
self.state_cache.do_maintenance(cutoff_epoch);
|
|
||||||
|
|
||||||
// Collect keys of pending payloads from a previous epoch to cutoff
|
// Collect keys of pending payloads from a previous epoch to cutoff
|
||||||
let mut write_lock = self.critical.write();
|
let mut write_lock = self.critical.write();
|
||||||
let mut keys_to_remove = vec![];
|
let mut keys_to_remove = vec![];
|
||||||
@@ -620,17 +589,6 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
/// get the state cache for inspection (used only for tests)
|
|
||||||
pub fn state_lru_cache(&self) -> &StateLRUCache<T> {
|
|
||||||
&self.state_cache
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Number of states stored in memory in the cache.
|
|
||||||
pub fn state_cache_size(&self) -> usize {
|
|
||||||
self.state_cache.lru_cache().read().len()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Number of pending component entries in memory in the cache.
|
/// Number of pending component entries in memory in the cache.
|
||||||
pub fn payload_cache_size(&self) -> usize {
|
pub fn payload_cache_size(&self) -> usize {
|
||||||
self.critical.read().len()
|
self.critical.read().len()
|
||||||
@@ -646,19 +604,16 @@ mod test {
|
|||||||
use crate::{
|
use crate::{
|
||||||
block_verification_types::AsBlock,
|
block_verification_types::AsBlock,
|
||||||
custody_context::NodeCustodyType,
|
custody_context::NodeCustodyType,
|
||||||
data_availability_checker_v2::STATE_LRU_CAPACITY_NON_ZERO,
|
|
||||||
test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType},
|
test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType},
|
||||||
};
|
};
|
||||||
use logging::create_test_tracing_subscriber;
|
use logging::create_test_tracing_subscriber;
|
||||||
use std::collections::{HashSet, VecDeque};
|
use std::collections::HashSet;
|
||||||
use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend};
|
use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend};
|
||||||
use tempfile::{TempDir, tempdir};
|
use tempfile::{TempDir, tempdir};
|
||||||
use tracing::debug_span;
|
|
||||||
use types::MinimalEthSpec;
|
use types::MinimalEthSpec;
|
||||||
use types::new_non_zero_usize;
|
use types::new_non_zero_usize;
|
||||||
|
|
||||||
const LOW_VALIDATOR_COUNT: usize = 32;
|
const LOW_VALIDATOR_COUNT: usize = 32;
|
||||||
const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();
|
|
||||||
|
|
||||||
fn get_store_with_spec<E: EthSpec>(
|
fn get_store_with_spec<E: EthSpec>(
|
||||||
db_path: &TempDir,
|
db_path: &TempDir,
|
||||||
@@ -756,7 +711,7 @@ mod test {
|
|||||||
let chain_db_path = tempdir().expect("should get temp dir");
|
let chain_db_path = tempdir().expect("should get temp dir");
|
||||||
let harness = get_gloas_chain(&chain_db_path).await;
|
let harness = get_gloas_chain(&chain_db_path).await;
|
||||||
let spec = harness.spec.clone();
|
let spec = harness.spec.clone();
|
||||||
let test_store = harness.chain.store.clone();
|
let _test_store = harness.chain.store.clone();
|
||||||
let capacity_non_zero = new_non_zero_usize(capacity);
|
let capacity_non_zero = new_non_zero_usize(capacity);
|
||||||
let custody_context = Arc::new(CustodyContext::new(
|
let custody_context = Arc::new(CustodyContext::new(
|
||||||
NodeCustodyType::Fullnode,
|
NodeCustodyType::Fullnode,
|
||||||
@@ -764,12 +719,7 @@ mod test {
|
|||||||
&spec,
|
&spec,
|
||||||
));
|
));
|
||||||
let cache = Arc::new(
|
let cache = Arc::new(
|
||||||
DataAvailabilityCheckerInner::<T>::new(
|
DataAvailabilityCheckerInner::<T>::new(capacity_non_zero, custody_context, spec.clone())
|
||||||
capacity_non_zero,
|
|
||||||
test_store,
|
|
||||||
custody_context,
|
|
||||||
spec.clone(),
|
|
||||||
)
|
|
||||||
.expect("should create cache"),
|
.expect("should create cache"),
|
||||||
);
|
);
|
||||||
(harness, cache, chain_db_path)
|
(harness, cache, chain_db_path)
|
||||||
@@ -898,127 +848,6 @@ mod test {
|
|||||||
"cache should still have available payload"
|
"cache should still have available payload"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
#[ignore] // TODO(gloas): Implement availability_pending_payload
|
|
||||||
// ensure the state cache keeps memory usage low and that it can properly recover states
|
|
||||||
// THIS TEST CAN BE DELETED ONCE TREE STATES IS MERGED AND WE RIP OUT THE STATE CACHE
|
|
||||||
async fn overflow_cache_test_state_cache() {
|
|
||||||
type E = MinimalEthSpec;
|
|
||||||
type T = DiskHarnessType<E>;
|
|
||||||
let capacity = STATE_LRU_CAPACITY * 2;
|
|
||||||
let (harness, cache, _path) = setup_harness_and_cache::<E, T>(capacity).await;
|
|
||||||
|
|
||||||
let mut pending_payloads = VecDeque::new();
|
|
||||||
let mut states = Vec::new();
|
|
||||||
let mut state_roots = Vec::new();
|
|
||||||
// Get enough payload to fill the cache to capacity, ensuring all payloads have blobs
|
|
||||||
while pending_payloads.len() < capacity {
|
|
||||||
let (mut pending_payload, _) = availability_pending_payload(&harness).await;
|
|
||||||
if pending_payload.num_blobs_expected() == 0 {
|
|
||||||
// we need payloads with blobs
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let state_root = pending_payload.import_data.state.canonical_root().unwrap();
|
|
||||||
states.push(pending_payload.import_data.state.clone());
|
|
||||||
pending_payloads.push_back(pending_payload);
|
|
||||||
state_roots.push(state_root);
|
|
||||||
}
|
|
||||||
|
|
||||||
let state_cache = cache.state_lru_cache().lru_cache();
|
|
||||||
let mut pushed_diet_payloads = VecDeque::new();
|
|
||||||
|
|
||||||
for i in 0..capacity {
|
|
||||||
let pending_payload = pending_payloads.pop_front().expect("should have payload");
|
|
||||||
let block_root = pending_payload.as_payload().beacon_block_root();
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
state_cache.read().len(),
|
|
||||||
std::cmp::min(i, STATE_LRU_CAPACITY),
|
|
||||||
"state cache should be empty at start"
|
|
||||||
);
|
|
||||||
|
|
||||||
if i >= STATE_LRU_CAPACITY {
|
|
||||||
let lru_root = state_roots[i - STATE_LRU_CAPACITY];
|
|
||||||
assert_eq!(
|
|
||||||
state_cache.read().peek_lru().map(|(root, _)| root),
|
|
||||||
Some(&lru_root),
|
|
||||||
"lru payload should be in cache"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// put the payload in the cache
|
|
||||||
let availability = cache
|
|
||||||
.put_executed_payload(pending_payload)
|
|
||||||
.expect("should put payload");
|
|
||||||
|
|
||||||
// grab the diet payload from the cache for later testing
|
|
||||||
let diet_payload = cache
|
|
||||||
.critical
|
|
||||||
.read()
|
|
||||||
.peek(&block_root)
|
|
||||||
.and_then(|pending_components| pending_components.get_diet_payload().cloned())
|
|
||||||
.expect("should exist");
|
|
||||||
pushed_diet_payloads.push_back(diet_payload);
|
|
||||||
|
|
||||||
// should be unavailable since we made sure all payloads had blobs
|
|
||||||
assert!(
|
|
||||||
matches!(availability, Availability::MissingComponents(_)),
|
|
||||||
"should be pending blobs"
|
|
||||||
);
|
|
||||||
|
|
||||||
if i >= STATE_LRU_CAPACITY {
|
|
||||||
let evicted_index = i - STATE_LRU_CAPACITY;
|
|
||||||
let evicted_root = state_roots[evicted_index];
|
|
||||||
assert!(
|
|
||||||
state_cache.read().peek(&evicted_root).is_none(),
|
|
||||||
"lru root should be evicted"
|
|
||||||
);
|
|
||||||
// get the diet payload via direct conversion (testing only)
|
|
||||||
let diet_payload = pushed_diet_payloads
|
|
||||||
.pop_front()
|
|
||||||
.expect("should have payload");
|
|
||||||
// reconstruct the pending payload by replaying the payload on the parent state
|
|
||||||
let recovered_pending_payload = cache
|
|
||||||
.state_lru_cache()
|
|
||||||
.recover_pending_executed_payload(diet_payload, &debug_span!("test"))
|
|
||||||
.expect("should reconstruct pending payload");
|
|
||||||
|
|
||||||
// assert the recovered state is the same as the original
|
|
||||||
assert_eq!(
|
|
||||||
recovered_pending_payload.import_data.state, states[evicted_index],
|
|
||||||
"recovered state should be the same as the original"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// now check the last payload
|
|
||||||
let last_payload = pushed_diet_payloads
|
|
||||||
.pop_back()
|
|
||||||
.expect("should exist")
|
|
||||||
.clone();
|
|
||||||
// the state should still be in the cache
|
|
||||||
assert!(
|
|
||||||
state_cache
|
|
||||||
.read()
|
|
||||||
.peek(&last_payload.as_payload().message.state_root)
|
|
||||||
.is_some(),
|
|
||||||
"last payload state should still be in cache"
|
|
||||||
);
|
|
||||||
// get the diet payload via direct conversion (testing only)
|
|
||||||
let diet_payload = last_payload.clone();
|
|
||||||
// recover the pending payload from the cache
|
|
||||||
let recovered_pending_payload = cache
|
|
||||||
.state_lru_cache()
|
|
||||||
.recover_pending_executed_payload(diet_payload, &debug_span!("test"))
|
|
||||||
.expect("should reconstruct pending payload");
|
|
||||||
// assert the recovered state is the same as the original
|
|
||||||
assert_eq!(
|
|
||||||
Some(&recovered_pending_payload.import_data.state),
|
|
||||||
states.last(),
|
|
||||||
"recovered state should be the same as the original"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -1026,14 +855,16 @@ mod pending_components_tests {
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::PayloadVerificationOutcome;
|
use crate::PayloadVerificationOutcome;
|
||||||
use crate::data_column_verification::KzgVerifiedDataColumn;
|
use crate::data_column_verification::KzgVerifiedDataColumn;
|
||||||
|
use crate::payload_verification_types::PayloadImportData;
|
||||||
use crate::test_utils::{NumBlobs, generate_rand_payload_and_columns, test_spec};
|
use crate::test_utils::{NumBlobs, generate_rand_payload_and_columns, test_spec};
|
||||||
use fork_choice::PayloadVerificationStatus;
|
use fork_choice::PayloadVerificationStatus;
|
||||||
use kzg::KzgCommitment;
|
use kzg::KzgCommitment;
|
||||||
use rand::SeedableRng;
|
use rand::SeedableRng;
|
||||||
use rand::rngs::StdRng;
|
use rand::rngs::StdRng;
|
||||||
use ssz_types::VariableList;
|
use ssz_types::VariableList;
|
||||||
|
use state_processing::ConsensusContext;
|
||||||
use types::test_utils::TestRandom;
|
use types::test_utils::TestRandom;
|
||||||
use types::{ForkName, MainnetEthSpec, SignedExecutionPayloadEnvelope, Slot};
|
use types::{BeaconState, ForkName, MainnetEthSpec, SignedExecutionPayloadEnvelope, Slot};
|
||||||
|
|
||||||
type E = MainnetEthSpec;
|
type E = MainnetEthSpec;
|
||||||
|
|
||||||
@@ -1093,7 +924,7 @@ mod pending_components_tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type PendingComponentsSetup = (
|
type PendingComponentsSetup = (
|
||||||
DietAvailabilityPendingExecutedPayload<E>,
|
AvailabilityPendingExecutedPayload<E>,
|
||||||
Vec<KzgVerifiedCustodyDataColumn<E>>,
|
Vec<KzgVerifiedCustodyDataColumn<E>>,
|
||||||
Vec<KzgVerifiedCustodyDataColumn<E>>,
|
Vec<KzgVerifiedCustodyDataColumn<E>>,
|
||||||
);
|
);
|
||||||
@@ -1121,15 +952,19 @@ mod pending_components_tests {
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let diet_payload = DietAvailabilityPendingExecutedPayload::new_for_testing(
|
let executed_payload = AvailabilityPendingExecutedPayload::new(
|
||||||
Arc::new(payload),
|
Arc::new(payload.clone()),
|
||||||
|
PayloadImportData {
|
||||||
|
state: BeaconState::new(0, Default::default(), &test_spec::<E>()),
|
||||||
|
consensus_context: ConsensusContext::new(payload.message.slot),
|
||||||
|
},
|
||||||
PayloadVerificationOutcome {
|
PayloadVerificationOutcome {
|
||||||
payload_verification_status: PayloadVerificationStatus::Verified,
|
payload_verification_status: PayloadVerificationStatus::Verified,
|
||||||
is_valid_merge_transition_block: false,
|
is_valid_merge_transition_block: false,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
(diet_payload, columns, invalid_columns)
|
(executed_payload, columns, invalid_columns)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn assert_cache_consistent(cache: &PendingComponents<E>) {
|
fn assert_cache_consistent(cache: &PendingComponents<E>) {
|
||||||
@@ -1168,11 +1003,11 @@ mod pending_components_tests {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (payload, columns, invalid_columns) = pre_setup();
|
let (payload, columns, invalid_columns) = pre_setup();
|
||||||
let (diet_payload, columns, invalid_columns) =
|
let (executed_payload, columns, invalid_columns) =
|
||||||
setup_pending_components(payload, columns, invalid_columns);
|
setup_pending_components(payload, columns, invalid_columns);
|
||||||
let block_root = Hash256::ZERO;
|
let block_root = Hash256::ZERO;
|
||||||
let mut cache = <PendingComponents<E>>::empty(block_root);
|
let mut cache = <PendingComponents<E>>::empty(block_root);
|
||||||
cache.merge_payload(diet_payload);
|
cache.merge_payload(executed_payload);
|
||||||
cache
|
cache
|
||||||
.merge_data_columns(invalid_columns)
|
.merge_data_columns(invalid_columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
@@ -1191,14 +1026,14 @@ mod pending_components_tests {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (payload, columns, invalid_columns) = pre_setup();
|
let (payload, columns, invalid_columns) = pre_setup();
|
||||||
let (diet_payload, columns, invalid_columns) =
|
let (executed_payload, columns, invalid_columns) =
|
||||||
setup_pending_components(payload, columns, invalid_columns);
|
setup_pending_components(payload, columns, invalid_columns);
|
||||||
let block_root = Hash256::ZERO;
|
let block_root = Hash256::ZERO;
|
||||||
let mut cache = <PendingComponents<E>>::empty(block_root);
|
let mut cache = <PendingComponents<E>>::empty(block_root);
|
||||||
cache
|
cache
|
||||||
.merge_data_columns(invalid_columns)
|
.merge_data_columns(invalid_columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
cache.merge_payload(diet_payload);
|
cache.merge_payload(executed_payload);
|
||||||
cache
|
cache
|
||||||
.merge_data_columns(columns)
|
.merge_data_columns(columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
@@ -1213,7 +1048,7 @@ mod pending_components_tests {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (payload, columns, invalid_columns) = pre_setup();
|
let (payload, columns, invalid_columns) = pre_setup();
|
||||||
let (diet_payload, columns, invalid_columns) =
|
let (executed_payload, columns, invalid_columns) =
|
||||||
setup_pending_components(payload, columns, invalid_columns);
|
setup_pending_components(payload, columns, invalid_columns);
|
||||||
|
|
||||||
let block_root = Hash256::ZERO;
|
let block_root = Hash256::ZERO;
|
||||||
@@ -1224,7 +1059,7 @@ mod pending_components_tests {
|
|||||||
cache
|
cache
|
||||||
.merge_data_columns(columns)
|
.merge_data_columns(columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
cache.merge_payload(diet_payload);
|
cache.merge_payload(executed_payload);
|
||||||
|
|
||||||
// Invalid columns were first, valid ones deduplicated away.
|
// Invalid columns were first, valid ones deduplicated away.
|
||||||
assert!(!cache.verified_data_columns.is_empty());
|
assert!(!cache.verified_data_columns.is_empty());
|
||||||
@@ -1236,12 +1071,12 @@ mod pending_components_tests {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (payload, columns, invalid_columns) = pre_setup();
|
let (payload, columns, invalid_columns) = pre_setup();
|
||||||
let (diet_payload, columns, invalid_columns) =
|
let (executed_payload, columns, invalid_columns) =
|
||||||
setup_pending_components(payload, columns, invalid_columns);
|
setup_pending_components(payload, columns, invalid_columns);
|
||||||
|
|
||||||
let block_root = Hash256::ZERO;
|
let block_root = Hash256::ZERO;
|
||||||
let mut cache = <PendingComponents<E>>::empty(block_root);
|
let mut cache = <PendingComponents<E>>::empty(block_root);
|
||||||
cache.merge_payload(diet_payload);
|
cache.merge_payload(executed_payload);
|
||||||
cache
|
cache
|
||||||
.merge_data_columns(columns)
|
.merge_data_columns(columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
@@ -1259,7 +1094,7 @@ mod pending_components_tests {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (payload, columns, invalid_columns) = pre_setup();
|
let (payload, columns, invalid_columns) = pre_setup();
|
||||||
let (diet_payload, columns, invalid_columns) =
|
let (executed_payload, columns, invalid_columns) =
|
||||||
setup_pending_components(payload, columns, invalid_columns);
|
setup_pending_components(payload, columns, invalid_columns);
|
||||||
|
|
||||||
let block_root = Hash256::ZERO;
|
let block_root = Hash256::ZERO;
|
||||||
@@ -1267,7 +1102,7 @@ mod pending_components_tests {
|
|||||||
cache
|
cache
|
||||||
.merge_data_columns(columns)
|
.merge_data_columns(columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
cache.merge_payload(diet_payload);
|
cache.merge_payload(executed_payload);
|
||||||
cache
|
cache
|
||||||
.merge_data_columns(invalid_columns)
|
.merge_data_columns(invalid_columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
@@ -1282,7 +1117,7 @@ mod pending_components_tests {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (payload, columns, invalid_columns) = pre_setup();
|
let (payload, columns, invalid_columns) = pre_setup();
|
||||||
let (diet_payload, columns, invalid_columns) =
|
let (executed_payload, columns, invalid_columns) =
|
||||||
setup_pending_components(payload, columns, invalid_columns);
|
setup_pending_components(payload, columns, invalid_columns);
|
||||||
|
|
||||||
let block_root = Hash256::ZERO;
|
let block_root = Hash256::ZERO;
|
||||||
@@ -1293,7 +1128,7 @@ mod pending_components_tests {
|
|||||||
cache
|
cache
|
||||||
.merge_data_columns(invalid_columns)
|
.merge_data_columns(invalid_columns)
|
||||||
.expect("merge should succeed");
|
.expect("merge should succeed");
|
||||||
cache.merge_payload(diet_payload);
|
cache.merge_payload(executed_payload);
|
||||||
|
|
||||||
// Valid columns were inserted first, so they persist. Cache should be consistent.
|
// Valid columns were inserted first, so they persist. Cache should be consistent.
|
||||||
assert_cache_consistent(&cache);
|
assert_cache_consistent(&cache);
|
||||||
@@ -1305,7 +1140,7 @@ mod pending_components_tests {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let (payload, _columns, _invalid_columns) = pre_setup();
|
let (payload, _columns, _invalid_columns) = pre_setup();
|
||||||
let (diet_payload, _columns, _invalid_columns) =
|
let (executed_payload, _columns, _invalid_columns) =
|
||||||
setup_pending_components(payload.clone(), _columns, _invalid_columns);
|
setup_pending_components(payload.clone(), _columns, _invalid_columns);
|
||||||
|
|
||||||
let block_root = Hash256::ZERO;
|
let block_root = Hash256::ZERO;
|
||||||
@@ -1322,7 +1157,7 @@ mod pending_components_tests {
|
|||||||
"pre execution payload inserted"
|
"pre execution payload inserted"
|
||||||
);
|
);
|
||||||
|
|
||||||
pending_component.insert_executed_payload(diet_payload);
|
pending_component.insert_executed_payload(executed_payload);
|
||||||
assert!(
|
assert!(
|
||||||
matches!(pending_component.payload, Some(CachedPayload::Executed(_))),
|
matches!(pending_component.payload, Some(CachedPayload::Executed(_))),
|
||||||
"executed payload inserted"
|
"executed payload inserted"
|
||||||
|
|||||||
@@ -1,156 +0,0 @@
|
|||||||
#![allow(dead_code)]
|
|
||||||
use crate::payload_verification_types::{AvailabilityPendingExecutedPayload, PayloadImportData};
|
|
||||||
use crate::{
|
|
||||||
BeaconChainTypes, BeaconStore, PayloadVerificationOutcome,
|
|
||||||
data_availability_checker_v2::{AvailabilityCheckError, STATE_LRU_CAPACITY_NON_ZERO},
|
|
||||||
};
|
|
||||||
use lru::LruCache;
|
|
||||||
use parking_lot::RwLock;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use store::OnDiskConsensusContext;
|
|
||||||
use tracing::{Span, instrument};
|
|
||||||
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedExecutionPayloadEnvelope};
|
|
||||||
|
|
||||||
/// This mirrors everything in the `AvailabilityPendingExecutedBlock`, except
|
|
||||||
/// that it is much smaller because it contains only a state root instead of
|
|
||||||
/// a full `BeaconState`.
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct DietAvailabilityPendingExecutedPayload<E: EthSpec> {
|
|
||||||
payload: Arc<SignedExecutionPayloadEnvelope<E>>,
|
|
||||||
state_root: Hash256,
|
|
||||||
consensus_context: OnDiskConsensusContext<E>,
|
|
||||||
payload_verification_outcome: PayloadVerificationOutcome,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Implementing the same methods as `AvailabilityPendingExecutedPayload`
|
|
||||||
impl<E: EthSpec> DietAvailabilityPendingExecutedPayload<E> {
|
|
||||||
#[cfg(test)]
|
|
||||||
pub fn new_for_testing(
|
|
||||||
payload: Arc<SignedExecutionPayloadEnvelope<E>>,
|
|
||||||
payload_verification_outcome: PayloadVerificationOutcome,
|
|
||||||
) -> Self {
|
|
||||||
use state_processing::ConsensusContext;
|
|
||||||
use types::Slot;
|
|
||||||
Self {
|
|
||||||
payload,
|
|
||||||
state_root: Hash256::ZERO,
|
|
||||||
consensus_context: OnDiskConsensusContext::from_consensus_context(
|
|
||||||
ConsensusContext::new(Slot::new(0)),
|
|
||||||
),
|
|
||||||
payload_verification_outcome,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn as_payload(&self) -> &SignedExecutionPayloadEnvelope<E> {
|
|
||||||
&self.payload
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn payload_cloned(&self) -> Arc<SignedExecutionPayloadEnvelope<E>> {
|
|
||||||
self.payload.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn num_blobs_expected(&self) -> usize {
|
|
||||||
self.payload.message.blob_kzg_commitments.len()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This LRU cache holds BeaconStates used for payload import. If the cache overflows,
|
|
||||||
/// the least recently used state will be dropped. If the dropped state is needed
|
|
||||||
/// later on, it will be recovered from the parent state and replaying the payload.
|
|
||||||
///
|
|
||||||
/// WARNING: This cache assumes the parent block of any `AvailabilityPendingExecutedPayload`
|
|
||||||
/// has already been imported into ForkChoice. If this is not the case, the cache
|
|
||||||
/// will fail to recover the state when the cache overflows because it can't load
|
|
||||||
/// the parent state!
|
|
||||||
pub struct StateLRUCache<T: BeaconChainTypes> {
|
|
||||||
states: RwLock<LruCache<Hash256, BeaconState<T::EthSpec>>>,
|
|
||||||
store: BeaconStore<T>,
|
|
||||||
spec: Arc<ChainSpec>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: BeaconChainTypes> StateLRUCache<T> {
|
|
||||||
pub fn new(store: BeaconStore<T>, spec: Arc<ChainSpec>) -> Self {
|
|
||||||
Self {
|
|
||||||
states: RwLock::new(LruCache::new(STATE_LRU_CAPACITY_NON_ZERO)),
|
|
||||||
store,
|
|
||||||
spec,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This will store the state in the LRU cache and return a
|
|
||||||
/// `DietAvailabilityPendingExecutedPayload` which is much cheaper to
|
|
||||||
/// keep around in memory.
|
|
||||||
pub fn register_pending_executed_payload(
|
|
||||||
&self,
|
|
||||||
executed_payload: AvailabilityPendingExecutedPayload<T::EthSpec>,
|
|
||||||
) -> DietAvailabilityPendingExecutedPayload<T::EthSpec> {
|
|
||||||
let state = executed_payload.import_data.state;
|
|
||||||
let state_root = executed_payload.payload.message.state_root;
|
|
||||||
self.states.write().put(state_root, state);
|
|
||||||
|
|
||||||
DietAvailabilityPendingExecutedPayload {
|
|
||||||
payload: executed_payload.payload,
|
|
||||||
state_root,
|
|
||||||
consensus_context: OnDiskConsensusContext::from_consensus_context(
|
|
||||||
executed_payload.import_data.consensus_context,
|
|
||||||
),
|
|
||||||
payload_verification_outcome: executed_payload.payload_verification_outcome,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Recover the `AvailabilityPendingExecutedPayload` from the diet version.
|
|
||||||
/// This method will first check the cache and if the state is not found
|
|
||||||
/// it will reconstruct the state by loading the parent state from disk and
|
|
||||||
/// replaying the block.
|
|
||||||
#[instrument(skip_all, parent = _span, level = "debug")]
|
|
||||||
pub fn recover_pending_executed_payload(
|
|
||||||
&self,
|
|
||||||
diet_executed_payload: DietAvailabilityPendingExecutedPayload<T::EthSpec>,
|
|
||||||
_span: &Span,
|
|
||||||
) -> Result<AvailabilityPendingExecutedPayload<T::EthSpec>, AvailabilityCheckError> {
|
|
||||||
// Keep the state in the cache to prevent reconstruction in race conditions
|
|
||||||
let state = if let Some(state) = self.states.write().get(&diet_executed_payload.state_root)
|
|
||||||
{
|
|
||||||
state.clone()
|
|
||||||
} else {
|
|
||||||
self.reconstruct_state(&diet_executed_payload)?
|
|
||||||
};
|
|
||||||
Ok(AvailabilityPendingExecutedPayload {
|
|
||||||
payload: diet_executed_payload.payload,
|
|
||||||
import_data: PayloadImportData {
|
|
||||||
state,
|
|
||||||
consensus_context: diet_executed_payload
|
|
||||||
.consensus_context
|
|
||||||
.into_consensus_context(),
|
|
||||||
},
|
|
||||||
payload_verification_outcome: diet_executed_payload.payload_verification_outcome,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Reconstruct the state by loading the parent state from disk and replaying
|
|
||||||
/// the block.
|
|
||||||
#[instrument(skip_all, level = "debug")]
|
|
||||||
fn reconstruct_state(
|
|
||||||
&self,
|
|
||||||
_diet_executed_block: &DietAvailabilityPendingExecutedPayload<T::EthSpec>,
|
|
||||||
) -> Result<BeaconState<T::EthSpec>, AvailabilityCheckError> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// returns the state cache for inspection
|
|
||||||
pub fn lru_cache(&self) -> &RwLock<LruCache<Hash256, BeaconState<T::EthSpec>>> {
|
|
||||||
&self.states
|
|
||||||
}
|
|
||||||
|
|
||||||
/// remove any states from the cache from before the given epoch
|
|
||||||
pub fn do_maintenance(&self, cutoff_epoch: Epoch) {
|
|
||||||
let mut write_lock = self.states.write();
|
|
||||||
while let Some((_, state)) = write_lock.peek_lru() {
|
|
||||||
if state.slot().epoch(T::EthSpec::slots_per_epoch()) < cutoff_epoch {
|
|
||||||
write_lock.pop_lru();
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -5,7 +5,7 @@ use types::{BeaconState, BlockImportSource, EthSpec, SignedExecutionPayloadEnvel
|
|||||||
|
|
||||||
use crate::{PayloadVerificationOutcome, data_availability_checker_v2::AvailablePayload};
|
use crate::{PayloadVerificationOutcome, data_availability_checker_v2::AvailablePayload};
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub struct PayloadImportData<E: EthSpec> {
|
pub struct PayloadImportData<E: EthSpec> {
|
||||||
pub state: BeaconState<E>,
|
pub state: BeaconState<E>,
|
||||||
pub consensus_context: ConsensusContext<E>,
|
pub consensus_context: ConsensusContext<E>,
|
||||||
@@ -13,6 +13,7 @@ pub struct PayloadImportData<E: EthSpec> {
|
|||||||
|
|
||||||
/// A payload that has completed payload verification by an EL client but does not
|
/// A payload that has completed payload verification by an EL client but does not
|
||||||
/// have all requisite column data to get imported into fork choice.
|
/// have all requisite column data to get imported into fork choice.
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct AvailabilityPendingExecutedPayload<E: EthSpec> {
|
pub struct AvailabilityPendingExecutedPayload<E: EthSpec> {
|
||||||
pub payload: Arc<SignedExecutionPayloadEnvelope<E>>,
|
pub payload: Arc<SignedExecutionPayloadEnvelope<E>>,
|
||||||
pub import_data: PayloadImportData<E>,
|
pub import_data: PayloadImportData<E>,
|
||||||
|
|||||||
Reference in New Issue
Block a user