Consolidate reqresp_pre_import_cache into data_availability_checker (#8045)

This PR consolidates the `reqresp_pre_import_cache` into the `data_availability_checker` for the following reasons:
- the `reqresp_pre_import_cache` suffers from the same TOCTOU bug we had with `data_availability_checker` earlier, and leads to unbounded memory leak, which we have observed over the last 6 months on some nodes.
- the `reqresp_pre_import_cache` is no longer necessary, because we now hold blocks in the `data_availability_checker` for longer since (#7961), and recent blocks can be served from the DA checker.

This PR also maintains the following functionalities
- Serving pre-executed blocks over RPC, and they're now served from the `data_availability_checker` instead.
- Using the cache for de-duplicating lookup requests.


  


Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>

Co-Authored-By: Jimmy Chen <jimmy@sigmaprime.io>
This commit is contained in:
Jimmy Chen
2025-09-19 17:01:13 +10:00
committed by GitHub
parent 4111bcb39b
commit 78d330e4b7
9 changed files with 239 additions and 208 deletions

View File

@@ -340,10 +340,6 @@ pub enum BlockProcessStatus<E: EthSpec> {
ExecutionValidated(Arc<SignedBeaconBlock<E>>),
}
pub struct BeaconChainMetrics {
pub reqresp_pre_import_cache_len: usize,
}
pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);
pub type BeaconForkChoice<T> = ForkChoice<
@@ -363,9 +359,6 @@ pub type BeaconStore<T> = Arc<
>,
>;
/// Cache gossip verified blocks to serve over ReqResp before they are imported
type ReqRespPreImportCache<E> = HashMap<Hash256, Arc<SignedBeaconBlock<E>>>;
/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
/// operations and chooses a canonical head.
pub struct BeaconChain<T: BeaconChainTypes> {
@@ -462,8 +455,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// Cache gossip verified blocks to serve over ReqResp before they are imported
pub reqresp_pre_import_cache: Arc<RwLock<ReqRespPreImportCache<T::EthSpec>>>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
@@ -1289,18 +1280,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
/// processing attempts.
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus<T::EthSpec> {
if let Some(block) = self
.data_availability_checker
.get_execution_valid_block(block_root)
{
return BlockProcessStatus::ExecutionValidated(block);
}
if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
// A block is on the `reqresp_pre_import_cache` but NOT in the
// `data_availability_checker` only if it is actively processing. We can expect a future
// event with the result of processing
return BlockProcessStatus::NotValidated(block.clone());
if let Some(cached_block) = self.data_availability_checker.get_cached_block(block_root) {
return cached_block;
}
BlockProcessStatus::Unknown
@@ -3054,8 +3035,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
self.check_gossip_blob_availability_and_import(blob).await
}
/// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
@@ -3092,15 +3072,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_columns.iter().map(|column| column.as_data_column()),
);
let r = self
.check_gossip_data_columns_availability_and_import(
slot,
block_root,
data_columns,
publish_fn,
)
.await;
self.remove_notified(&block_root, r)
self.check_gossip_data_columns_availability_and_import(
slot,
block_root,
data_columns,
publish_fn,
)
.await
}
/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
@@ -3139,10 +3117,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
self.remove_notified(&block_root, r)
self.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await
}
/// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`.
@@ -3174,10 +3150,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
let r = self
.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
.await;
self.remove_notified(&block_root, r)
self.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
.await
}
fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc<Self>, block_root: &Hash256, blobs_iter: I)
@@ -3270,10 +3244,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
custody_columns.iter().map(|column| column.as_ref()),
);
let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified(&block_root, r)
self.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await
}
pub async fn reconstruct_data_columns(
@@ -3320,10 +3292,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(None);
};
let r = self
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
self.process_availability(slot, availability, || Ok(()))
.await
.map(|availability_processing_status| {
Some((availability_processing_status, data_columns_to_publish))
})
@@ -3340,46 +3310,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
&self,
block_root: &Hash256,
r: Result<AvailabilityProcessingStatus, BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}
/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
unverified_block: B,
block_source: BlockImportSource,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.reqresp_pre_import_cache
.write()
.insert(block_root, unverified_block.block_cloned());
let r = self
.process_block(
block_root,
unverified_block,
notify_execution_layer,
block_source,
|| Ok(()),
)
.await;
self.remove_notified(&block_root, r)
}
/// Check for known and configured invalid block roots before processing.
pub fn check_invalid_block_roots(&self, block_root: Hash256) -> Result<(), BlockError> {
if self.config.invalid_block_roots.contains(&block_root) {
@@ -3411,12 +3341,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_source: BlockImportSource,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
let block_slot = unverified_block.block().slot();
// Set observed time if not already set. Usually this should be set by gossip or RPC,
@@ -3431,6 +3355,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
self.data_availability_checker
.put_pre_execution_block(block_root, unverified_block.block_cloned())?;
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
@@ -3448,7 +3381,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.set_time_consensus_verified(block_root, block_slot, timestamp)
}
let executed_block = chain.into_executed_block(execution_pending).await?;
let executed_block = chain
.into_executed_block(execution_pending)
.await
.inspect_err(|_| {
// If the block fails execution for whatever reason (e.g. engine offline),
// and we keep it in the cache, then the node will NOT perform lookup and
// reprocess this block until the block is evicted from DA checker, causing the
// chain to get stuck temporarily if the block is canonical. Therefore we remove
// it from the cache if execution fails.
self.data_availability_checker
.remove_block_on_execution_error(&block_root);
})?;
// Record the *additional* time it took to wait for execution layer verification.
if let Some(timestamp) = self.slot_clock.now_duration() {
@@ -3574,9 +3518,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let slot = block.block.slot();
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
let availability = self.data_availability_checker.put_executed_block(block)?;
self.process_availability(slot, availability, || Ok(()))
.await
}
@@ -7156,12 +7098,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}
pub fn metrics(&self) -> BeaconChainMetrics {
BeaconChainMetrics {
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
}
}
pub(crate) fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,