Added NewPayloadCache

This commit is contained in:
Mark Mackey
2025-10-20 12:51:21 -05:00
parent 29e5a1f599
commit f3b79839a1
2 changed files with 159 additions and 38 deletions

View File

@@ -39,7 +39,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use strum::AsRefStr;
use task_executor::TaskExecutor;
use tokio::{
sync::{Mutex, MutexGuard, RwLock},
sync::{Mutex, MutexGuard, RwLock, broadcast},
time::sleep,
};
use tokio_stream::wrappers::WatchStream;
@@ -138,15 +138,15 @@ impl<E: EthSpec> TryFrom<BuilderBid<E>> for ProvenancedPayload<BlockProposalCont
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Error {
NoEngine,
NoPayloadBuilder,
ApiError(ApiError),
Builder(builder_client::Error),
ApiError(Arc<ApiError>),
Builder(Arc<builder_client::Error>),
NoHeaderFromBuilder,
CannotProduceHeader,
EngineError(Box<EngineError>),
EngineError(Arc<EngineError>),
NotSynced,
ShuttingDown,
FeeRecipientUnspecified,
@@ -177,7 +177,7 @@ impl From<BeaconStateError> for Error {
impl From<ApiError> for Error {
fn from(e: ApiError) -> Self {
Error::ApiError(e)
Error::ApiError(Arc::new(e))
}
}
@@ -186,12 +186,18 @@ impl From<EngineError> for Error {
match e {
// This removes an unnecessary layer of indirection.
// TODO (mark): consider refactoring these error enums
EngineError::Api { error } => Error::ApiError(error),
_ => Error::EngineError(Box::new(e)),
EngineError::Api { error } => Error::ApiError(Arc::new(error)),
_ => Error::EngineError(Arc::new(e)),
}
}
}
impl From<builder_client::Error> for Error {
fn from(e: builder_client::Error) -> Self {
Error::Builder(Arc::new(e))
}
}
pub enum BlockProposalContentsType<E: EthSpec> {
Full(BlockProposalContents<E, FullPayload<E>>),
Blinded(BlockProposalContents<E, BlindedPayload<E>>),
@@ -418,6 +424,108 @@ pub enum SubmitBlindedBlockResponse<E: EthSpec> {
type PayloadContentsRefTuple<'a, E> = (ExecutionPayloadRef<'a, E>, Option<&'a BlobsBundle<E>>);
/// Cache for deduplicating new payload requests.
///
/// Handles both in-flight requests and recently completed requests to avoid
/// duplicate network calls to the execution engine.
struct NewPayloadCache {
inner: Mutex<NewPayloadCacheInner>,
}
struct NewPayloadCacheInner {
/// In-flight requests mapped by block hash
in_flight: HashMap<ExecutionBlockHash, broadcast::Sender<Result<PayloadStatus, Error>>>,
/// Recently completed requests with their completion time
completed: LruCache<ExecutionBlockHash, (Instant, Result<PayloadStatus, Error>)>,
}
impl NewPayloadCache {
/// Cache TTL for completed requests (12 seconds)
const COMPLETED_TTL: Duration = Duration::from_secs(12);
/// Maximum number of completed requests to cache
const COMPLETED_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(32);
fn new() -> Self {
Self {
inner: Mutex::new(NewPayloadCacheInner {
in_flight: HashMap::new(),
completed: LruCache::new(Self::COMPLETED_CACHE_SIZE),
}),
}
}
/// Get cached result or execute the provided function.
///
/// Returns a future that resolves to the payload status. Handles:
/// 1. Returning cached completed results (if not expired)
/// 2. Joining in-flight requests
/// 3. Executing new requests and caching results
async fn get_or_execute<F, Fut>(
&self,
block_hash: ExecutionBlockHash,
execute_fn: F,
) -> Result<PayloadStatus, Error>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<PayloadStatus, Error>>,
{
let now = Instant::now();
// Single lock acquisition to handle all cases
let mut cache = self.inner.lock().await;
// 1. Check completed cache first
if let Some((timestamp, result)) = cache.completed.get(&block_hash) {
if now.duration_since(*timestamp) < Self::COMPLETED_TTL {
return result.clone();
} else {
// Entry expired, remove it
cache.completed.pop(&block_hash);
}
}
// 2. Check in-flight requests
if let Some(sender) = cache.in_flight.get(&block_hash) {
let mut receiver = sender.subscribe();
drop(cache); // Release lock early
match receiver.recv().await {
Ok(result) => return result,
Err(_) => {
// Sender was dropped, fall through to execute new request
error!(
"NewPayloadCache: Sender was dropped for block hash {}. This shouldn't happen.",
block_hash
);
// just call the execute_fn again
return execute_fn().await;
}
}
}
// 3. Start new request
let (sender, _receiver) = broadcast::channel(1);
cache.in_flight.insert(block_hash, sender.clone());
drop(cache); // Release lock for execution
// Execute the function
let result = execute_fn().await;
// Update cache with result
let mut cache = self.inner.lock().await;
cache.in_flight.remove(&block_hash);
cache
.completed
.put(block_hash, (Instant::now(), result.clone()));
drop(cache);
// Broadcast result to any waiting receivers
let _ = sender.send(result.clone());
result
}
}
struct Inner<E: EthSpec> {
engine: Arc<Engine>,
builder: ArcSwapOption<BuilderHttpClient>,
@@ -433,6 +541,10 @@ struct Inner<E: EthSpec> {
/// This is used *only* in the informational sync status endpoint, so that a VC using this
/// node can prefer another node with a healthier EL.
last_new_payload_errored: RwLock<bool>,
/// Cache for deduplicating `notify_new_payload` requests.
///
/// Handles both in-flight requests and recently completed requests.
new_payload_cache: NewPayloadCache,
}
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
@@ -523,8 +635,8 @@ impl<E: EthSpec> ExecutionLayer<E> {
let engine: Engine = {
let auth = Auth::new(jwt_key, jwt_id, jwt_version);
debug!(endpoint = %execution_url, jwt_path = ?secret_file.as_path(),"Loaded execution endpoint");
let api = HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier)
.map_err(Error::ApiError)?;
let api =
HttpJsonRpc::new_with_auth(execution_url, auth, execution_timeout_multiplier)?;
Engine::new(api, executor.clone())
};
@@ -539,6 +651,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
executor,
payload_cache: PayloadCache::default(),
last_new_payload_errored: RwLock::new(false),
new_payload_cache: NewPayloadCache::new(),
};
let el = Self {
@@ -582,7 +695,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
builder_header_timeout,
disable_ssz,
)
.map_err(Error::Builder)?;
.map_err(Into::<Error>::into)?;
info!(
?builder_url,
local_user_agent = builder_client.get_user_agent(),
@@ -1349,15 +1462,34 @@ impl<E: EthSpec> ExecutionLayer<E> {
Ok(GetPayloadResponseType::Full(payload_response))
})
.await
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
}
/// Maps to the `engine_newPayload` JSON-RPC call.
///
/// Deduplicates concurrent requests with the same block hash - if multiple threads
/// call this function with the same block hash simultaneously, only one request
/// is sent to the execution engine, but all threads receive the same response.
/// Also caches recent results for a short time to avoid duplicate requests.
/// TODO(EIP-7732) figure out how and why Mark relaxed new_payload_request param's typ to NewPayloadRequest<E>
pub async fn notify_new_payload(
&self,
new_payload_request: NewPayloadRequest<'_, E>,
) -> Result<PayloadStatus, Error> {
let block_hash = new_payload_request.block_hash();
self.inner
.new_payload_cache
.get_or_execute(block_hash, || {
self.notify_new_payload_impl(new_payload_request)
})
.await
}
/// Internal implementation of notify_new_payload without deduplication logic.
async fn notify_new_payload_impl(
&self,
new_payload_request: NewPayloadRequest<'_, E>,
) -> Result<PayloadStatus, Error> {
let _timer = metrics::start_timer_vec(
&metrics::EXECUTION_LAYER_REQUEST_TIMES,
@@ -1391,9 +1523,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
}
*self.inner.last_new_payload_errored.write().await = result.is_err();
process_payload_status(block_hash, result)
.map_err(Box::new)
.map_err(Error::EngineError)
process_payload_status(block_hash, result).map_err(Into::into)
}
/// Update engine sync status.
@@ -1529,8 +1659,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
head_block_hash,
result.map(|response| response.payload_status),
)
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
}
/// Returns the execution engine capabilities resulting from a call to
@@ -1622,9 +1751,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
}
Ok(block.map(|b| b.block_hash))
})
.await
.map_err(Box::new)
.map_err(Error::EngineError)?;
.await?;
if let Some(hash) = &hash_opt {
info!(
@@ -1734,8 +1861,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
Ok(None)
})
.await
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
}
/// This function should remain internal.
@@ -1786,8 +1912,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
engine.api.get_payload_bodies_by_hash_v1(hashes).await
})
.await
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
}
pub async fn get_payload_bodies_by_range(
@@ -1804,8 +1929,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
.await
})
.await
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
}
/// Fetch a full payload from the execution node.
@@ -1867,8 +1991,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
self.engine()
.request(|engine| async move { engine.api.get_blobs_v1(query).await })
.await
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
} else {
Err(Error::GetBlobsNotSupported)
}
@@ -1884,8 +2007,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
self.engine()
.request(|engine| async move { engine.api.get_blobs_v2(query).await })
.await
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
} else {
Err(Error::GetBlobsNotSupported)
}
@@ -1898,8 +2020,7 @@ impl<E: EthSpec> ExecutionLayer<E> {
self.engine()
.request(|engine| async move { engine.api.get_block_by_number(query).await })
.await
.map_err(Box::new)
.map_err(Error::EngineError)
.map_err(Into::into)
}
pub async fn propose_blinded_beacon_block(
@@ -1948,12 +2069,12 @@ impl<E: EthSpec> ExecutionLayer<E> {
builder
.post_builder_blinded_blocks_v1_ssz(block)
.await
.map_err(Error::Builder)
.map_err(Into::into)
} else {
builder
.post_builder_blinded_blocks_v1(block)
.await
.map_err(Error::Builder)
.map_err(Into::into)
.map(|d| d.data)
}
})
@@ -2018,12 +2139,12 @@ impl<E: EthSpec> ExecutionLayer<E> {
builder
.post_builder_blinded_blocks_v2_ssz(block)
.await
.map_err(Error::Builder)
.map_err(Into::into)
} else {
builder
.post_builder_blinded_blocks_v2(block)
.await
.map_err(Error::Builder)
.map_err(Into::into)
}
})
.await;

View File

@@ -2,7 +2,7 @@ use alloy_consensus::TxEnvelope;
use alloy_rlp::Decodable;
use types::{EthSpec, ExecutionPayloadRef, Hash256, Unsigned, VersionedHash};
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Error {
DecodingTransaction(String),
LengthMismatch { expected: usize, found: usize },