From 2ab1ffe10dfef6442ce77314e7acbe966c6868fd Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Tue, 28 Apr 2026 22:34:45 +0200 Subject: [PATCH] fix --- .../src/beacon/execution_payload_envelope.rs | 122 +++++------------- 1 file changed, 33 insertions(+), 89 deletions(-) diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index ad0c46c6ad..326943068e 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -7,7 +7,9 @@ use crate::version::{ execution_optimistic_finalized_beacon_response, }; use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; -use beacon_chain::{BeaconChain, BeaconChainTypes}; +use beacon_chain::payload_envelope_verification::EnvelopeError; +use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope; +use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer}; use bytes::Bytes; use eth2::types as api_types; use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; @@ -19,7 +21,7 @@ use std::future::Future; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, error, info, warn}; -use types::{EthSpec, SignedExecutionPayloadEnvelope}; +use types::{BlockImportSource, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope}; use warp::{ Filter, Rejection, Reply, hyper::{Body, Response}, @@ -89,9 +91,8 @@ pub(crate) fn post_beacon_execution_payload_envelope( ) .boxed() } -/// Publishes a signed execution payload envelope to the network. Implements -/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR -/// . + +/// Publishes a signed execution payload envelope to the network. pub async fn publish_execution_payload_envelope( envelope: SignedExecutionPayloadEnvelope, chain: Arc>, @@ -111,30 +112,20 @@ pub async fn publish_execution_payload_envelope( let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot); - // Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before - // publishing the envelope so it runs in parallel with envelope gossip, narrowing - // the window in which peers see envelope-without-columns. If envelope publication - // fails below, dropping this future drops the spawned `JoinHandle` (the running - // closure on the blocking pool finishes and is then discarded — no work cancellation). - let column_build_future = match blobs_and_proofs { - Some(blobs) if !blobs.is_empty() => Some(spawn_build_gloas_data_columns_task( - &chain, - beacon_block_root, - slot, - blobs, - )?), - _ => None, - }; - - // Publish the envelope to the network. - crate::utils::publish_pubsub_message( - network_tx, - PubsubMessage::ExecutionPayload(Box::new(envelope)), - ) - .map_err(|_| { - warn!(%slot, "Failed to publish execution payload envelope to network"); - warp_utils::reject::custom_server_error( - "Unable to publish execution payload envelope to network".into(), + // The publish_fn is called inside process_execution_payload_envelope after consensus + // verification but before the EL call. + let envelope_for_publish = signed_envelope.clone(); + let sender = network_tx.clone(); + let publish_fn = move || { + info!( + %slot, + %beacon_block_root, + builder_index, + "Publishing signed execution payload envelope to network" + ); + crate::utils::publish_pubsub_message( + &sender, + PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())), ) .map_err(|_| { warn!(%slot, "Failed to publish execution payload envelope to network"); @@ -174,7 +165,7 @@ pub async fn publish_execution_payload_envelope( })?; // Build and publish data column sidecars from the blobs. - if let Some((blobs, _kzg_proofs)) = blobs_and_proofs + if let Some(blobs) = blobs_and_proofs && !blobs.is_empty() { let gossip_verified_columns = spawn_build_gloas_data_columns_task( @@ -215,81 +206,34 @@ pub async fn publish_execution_payload_envelope( } } - // From here on the envelope is on the wire. `take_blobs` already consumed the cache - // entry, so a retry would not republish columns; returning Err would mislead the - // caller. Log column-build/publish failures and fall through to `Ok`. - if let Some(column_build_future) = column_build_future { - let gossip_verified_columns = match column_build_future.await { - Ok(columns) => columns, - Err(e) => { - error!( - %slot, - error = ?e, - "Failed to build data columns after envelope publication" - ); - return Ok(warp::reply().into_response()); - } - }; - - if !gossip_verified_columns.is_empty() { - if let Err(e) = publish_column_sidecars(network_tx, &gossip_verified_columns, &chain) { - error!( - %slot, - error = ?e, - "Failed to publish data column sidecars after envelope publication" - ); - return Ok(warp::reply().into_response()); - } - - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let sampling_column_indices = chain.sampling_columns_for_epoch(epoch); - let sampling_columns = gossip_verified_columns - .into_iter() - .filter(|col| sampling_column_indices.contains(&col.index())) - .collect::>(); - - // Local processing only — envelope already broadcast, so log and fall through. - if !sampling_columns.is_empty() - && let Err(e) = - Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await - { - error!( - %slot, - error = ?e, - "Failed to process sampling data columns during envelope publication" - ); - } - } - } - Ok(warp::reply().into_response()) } fn spawn_build_gloas_data_columns_task( - chain: &Arc>, + chain: Arc>, beacon_block_root: types::Hash256, + block: Arc>, slot: types::Slot, blobs: types::BlobsList, ) -> Result>, Rejection>>, Rejection> { - let chain_for_build = chain.clone(); - let handle = chain + chain + .clone() .task_executor .spawn_blocking_handle( - move || build_gloas_data_columns(&chain_for_build, beacon_block_root, slot, &blobs), + move || build_gloas_data_columns(&chain, beacon_block_root, block, slot, &blobs), "build_gloas_data_columns", ) - .ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))?; - - Ok(async move { - handle - .await - .map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))? - }) + .ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string())) + .map(|r| { + r.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string())) + .and_then(|output| async move { output }) + }) } fn build_gloas_data_columns( chain: &BeaconChain, beacon_block_root: types::Hash256, + block: Arc>, slot: types::Slot, blobs: &types::BlobsList, ) -> Result>, Rejection> { @@ -314,7 +258,7 @@ fn build_gloas_data_columns( .into_iter() .filter_map(|col| { let index = *col.index(); - match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) { + match GossipVerifiedDataColumn::new_for_block_publishing(col, &block, chain) { Ok(verified) => Some(verified), Err(GossipDataColumnError::PriorKnownUnpublished) => None, Err(e) => {