From f3b79839a198223c0f0dc25ce2148e1f0d3f32da Mon Sep 17 00:00:00 2001 From: Mark Mackey Date: Mon, 20 Oct 2025 12:51:21 -0500 Subject: [PATCH] Added NewPayloadCache --- beacon_node/execution_layer/src/lib.rs | 195 ++++++++++++++---- .../execution_layer/src/versioned_hashes.rs | 2 +- 2 files changed, 159 insertions(+), 38 deletions(-) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 1983db57eb..d361b6840f 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -39,7 +39,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use strum::AsRefStr; use task_executor::TaskExecutor; use tokio::{ - sync::{Mutex, MutexGuard, RwLock}, + sync::{Mutex, MutexGuard, RwLock, broadcast}, time::sleep, }; use tokio_stream::wrappers::WatchStream; @@ -138,15 +138,15 @@ impl TryFrom> for ProvenancedPayload), + Builder(Arc), NoHeaderFromBuilder, CannotProduceHeader, - EngineError(Box), + EngineError(Arc), NotSynced, ShuttingDown, FeeRecipientUnspecified, @@ -177,7 +177,7 @@ impl From for Error { impl From for Error { fn from(e: ApiError) -> Self { - Error::ApiError(e) + Error::ApiError(Arc::new(e)) } } @@ -186,12 +186,18 @@ impl From for Error { match e { // This removes an unnecessary layer of indirection. // TODO (mark): consider refactoring these error enums - EngineError::Api { error } => Error::ApiError(error), - _ => Error::EngineError(Box::new(e)), + EngineError::Api { error } => Error::ApiError(Arc::new(error)), + _ => Error::EngineError(Arc::new(e)), } } } +impl From for Error { + fn from(e: builder_client::Error) -> Self { + Error::Builder(Arc::new(e)) + } +} + pub enum BlockProposalContentsType { Full(BlockProposalContents>), Blinded(BlockProposalContents>), @@ -418,6 +424,108 @@ pub enum SubmitBlindedBlockResponse { type PayloadContentsRefTuple<'a, E> = (ExecutionPayloadRef<'a, E>, Option<&'a BlobsBundle>); +/// Cache for deduplicating new payload requests. +/// +/// Handles both in-flight requests and recently completed requests to avoid +/// duplicate network calls to the execution engine. +struct NewPayloadCache { + inner: Mutex, +} + +struct NewPayloadCacheInner { + /// In-flight requests mapped by block hash + in_flight: HashMap>>, + /// Recently completed requests with their completion time + completed: LruCache)>, +} + +impl NewPayloadCache { + /// Cache TTL for completed requests (12 seconds) + const COMPLETED_TTL: Duration = Duration::from_secs(12); + /// Maximum number of completed requests to cache + const COMPLETED_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32); + + fn new() -> Self { + Self { + inner: Mutex::new(NewPayloadCacheInner { + in_flight: HashMap::new(), + completed: LruCache::new(Self::COMPLETED_CACHE_SIZE), + }), + } + } + + /// Get cached result or execute the provided function. + /// + /// Returns a future that resolves to the payload status. Handles: + /// 1. Returning cached completed results (if not expired) + /// 2. Joining in-flight requests + /// 3. Executing new requests and caching results + async fn get_or_execute( + &self, + block_hash: ExecutionBlockHash, + execute_fn: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: Future>, + { + let now = Instant::now(); + + // Single lock acquisition to handle all cases + let mut cache = self.inner.lock().await; + + // 1. Check completed cache first + if let Some((timestamp, result)) = cache.completed.get(&block_hash) { + if now.duration_since(*timestamp) < Self::COMPLETED_TTL { + return result.clone(); + } else { + // Entry expired, remove it + cache.completed.pop(&block_hash); + } + } + + // 2. Check in-flight requests + if let Some(sender) = cache.in_flight.get(&block_hash) { + let mut receiver = sender.subscribe(); + drop(cache); // Release lock early + + match receiver.recv().await { + Ok(result) => return result, + Err(_) => { + // Sender was dropped, fall through to execute new request + error!( + "NewPayloadCache: Sender was dropped for block hash {}. This shouldn't happen.", + block_hash + ); + // just call the execute_fn again + return execute_fn().await; + } + } + } + + // 3. Start new request + let (sender, _receiver) = broadcast::channel(1); + cache.in_flight.insert(block_hash, sender.clone()); + drop(cache); // Release lock for execution + + // Execute the function + let result = execute_fn().await; + + // Update cache with result + let mut cache = self.inner.lock().await; + cache.in_flight.remove(&block_hash); + cache + .completed + .put(block_hash, (Instant::now(), result.clone())); + drop(cache); + + // Broadcast result to any waiting receivers + let _ = sender.send(result.clone()); + + result + } +} + struct Inner { engine: Arc, builder: ArcSwapOption, @@ -433,6 +541,10 @@ struct Inner { /// This is used *only* in the informational sync status endpoint, so that a VC using this /// node can prefer another node with a healthier EL. last_new_payload_errored: RwLock, + /// Cache for deduplicating `notify_new_payload` requests. + /// + /// Handles both in-flight requests and recently completed requests. + new_payload_cache: NewPayloadCache, } #[derive(Debug, Default, Clone, Serialize, Deserialize)] @@ -523,8 +635,8 @@ impl ExecutionLayer { let engine: Engine = { let auth = Auth::new(jwt_key, jwt_id, jwt_version); debug!(endpoint = %execution_url, jwt_path = ?secret_file.as_path(),"Loaded execution endpoint"); - let api = HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier) - .map_err(Error::ApiError)?; + let api = + HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier)?; Engine::new(api, executor.clone()) }; @@ -539,6 +651,7 @@ impl ExecutionLayer { executor, payload_cache: PayloadCache::default(), last_new_payload_errored: RwLock::new(false), + new_payload_cache: NewPayloadCache::new(), }; let el = Self { @@ -582,7 +695,7 @@ impl ExecutionLayer { builder_header_timeout, disable_ssz, ) - .map_err(Error::Builder)?; + .map_err(Into::::into)?; info!( ?builder_url, local_user_agent = builder_client.get_user_agent(), @@ -1349,15 +1462,34 @@ impl ExecutionLayer { Ok(GetPayloadResponseType::Full(payload_response)) }) .await - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } /// Maps to the `engine_newPayload` JSON-RPC call. + /// + /// Deduplicates concurrent requests with the same block hash - if multiple threads + /// call this function with the same block hash simultaneously, only one request + /// is sent to the execution engine, but all threads receive the same response. + /// Also caches recent results for a short time to avoid duplicate requests. /// TODO(EIP-7732) figure out how and why Mark relaxed new_payload_request param's typ to NewPayloadRequest pub async fn notify_new_payload( &self, new_payload_request: NewPayloadRequest<'_, E>, + ) -> Result { + let block_hash = new_payload_request.block_hash(); + + self.inner + .new_payload_cache + .get_or_execute(block_hash, || { + self.notify_new_payload_impl(new_payload_request) + }) + .await + } + + /// Internal implementation of notify_new_payload without deduplication logic. + async fn notify_new_payload_impl( + &self, + new_payload_request: NewPayloadRequest<'_, E>, ) -> Result { let _timer = metrics::start_timer_vec( &metrics::EXECUTION_LAYER_REQUEST_TIMES, @@ -1391,9 +1523,7 @@ impl ExecutionLayer { } *self.inner.last_new_payload_errored.write().await = result.is_err(); - process_payload_status(block_hash, result) - .map_err(Box::new) - .map_err(Error::EngineError) + process_payload_status(block_hash, result).map_err(Into::into) } /// Update engine sync status. @@ -1529,8 +1659,7 @@ impl ExecutionLayer { head_block_hash, result.map(|response| response.payload_status), ) - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } /// Returns the execution engine capabilities resulting from a call to @@ -1622,9 +1751,7 @@ impl ExecutionLayer { } Ok(block.map(|b| b.block_hash)) }) - .await - .map_err(Box::new) - .map_err(Error::EngineError)?; + .await?; if let Some(hash) = &hash_opt { info!( @@ -1734,8 +1861,7 @@ impl ExecutionLayer { Ok(None) }) .await - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } /// This function should remain internal. @@ -1786,8 +1912,7 @@ impl ExecutionLayer { engine.api.get_payload_bodies_by_hash_v1(hashes).await }) .await - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } pub async fn get_payload_bodies_by_range( @@ -1804,8 +1929,7 @@ impl ExecutionLayer { .await }) .await - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } /// Fetch a full payload from the execution node. @@ -1867,8 +1991,7 @@ impl ExecutionLayer { self.engine() .request(|engine| async move { engine.api.get_blobs_v1(query).await }) .await - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } else { Err(Error::GetBlobsNotSupported) } @@ -1884,8 +2007,7 @@ impl ExecutionLayer { self.engine() .request(|engine| async move { engine.api.get_blobs_v2(query).await }) .await - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } else { Err(Error::GetBlobsNotSupported) } @@ -1898,8 +2020,7 @@ impl ExecutionLayer { self.engine() .request(|engine| async move { engine.api.get_block_by_number(query).await }) .await - .map_err(Box::new) - .map_err(Error::EngineError) + .map_err(Into::into) } pub async fn propose_blinded_beacon_block( @@ -1948,12 +2069,12 @@ impl ExecutionLayer { builder .post_builder_blinded_blocks_v1_ssz(block) .await - .map_err(Error::Builder) + .map_err(Into::into) } else { builder .post_builder_blinded_blocks_v1(block) .await - .map_err(Error::Builder) + .map_err(Into::into) .map(|d| d.data) } }) @@ -2018,12 +2139,12 @@ impl ExecutionLayer { builder .post_builder_blinded_blocks_v2_ssz(block) .await - .map_err(Error::Builder) + .map_err(Into::into) } else { builder .post_builder_blinded_blocks_v2(block) .await - .map_err(Error::Builder) + .map_err(Into::into) } }) .await; diff --git a/beacon_node/execution_layer/src/versioned_hashes.rs b/beacon_node/execution_layer/src/versioned_hashes.rs index 97c3100de9..b895db6388 100644 --- a/beacon_node/execution_layer/src/versioned_hashes.rs +++ b/beacon_node/execution_layer/src/versioned_hashes.rs @@ -2,7 +2,7 @@ use alloy_consensus::TxEnvelope; use alloy_rlp::Decodable; use types::{EthSpec, ExecutionPayloadRef, Hash256, Unsigned, VersionedHash}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Error { DecodingTransaction(String), LengthMismatch { expected: usize, found: usize },