Gloas publish data columns during local block building (#9182)

Make sure we are publishing columns during local block production


  


Co-Authored-By: Eitan Seri-Levi <eserilev@ucsc.edu>

Co-Authored-By: dapplion <35266934+dapplion@users.noreply.github.com>
This commit is contained in:
Eitan Seri-Levi
2026-04-28 15:19:47 +02:00
committed by GitHub
parent 4415cf0506
commit 6258eadc91
9 changed files with 545 additions and 57 deletions

View File

@@ -1,10 +1,12 @@
use crate::block_id::BlockId;
use crate::publish_blocks::publish_column_sidecars;
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter};
use crate::version::{
ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header,
execution_optimistic_finalized_beacon_response,
};
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use bytes::Bytes;
use eth2::types as api_types;
@@ -12,10 +14,11 @@ use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use ssz::{Decode, Encode};
use std::future::Future;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn};
use types::SignedExecutionPayloadEnvelope;
use tracing::{debug, error, info, warn};
use types::{EthSpec, SignedExecutionPayloadEnvelope};
use warp::{
Filter, Rejection, Reply,
hyper::{Body, Response},
@@ -85,7 +88,9 @@ pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
)
.boxed()
}
/// Publishes a signed execution payload envelope to the network.
/// Publishes a signed execution payload envelope to the network. Implements
/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR
/// <https://github.com/ethereum/beacon-APIs/pull/580>.
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
@@ -109,7 +114,24 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
"Publishing signed execution payload envelope to network"
);
// Publish to the network
let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot);
// Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before
// publishing the envelope so it runs in parallel with envelope gossip, narrowing
// the window in which peers see envelope-without-columns. If envelope publication
// fails below, dropping this future drops the spawned `JoinHandle` (the running
// closure on the blocking pool finishes and is then discarded — no work cancellation).
let column_build_future = match blobs_and_proofs {
Some(blobs) if !blobs.is_empty() => Some(spawn_build_gloas_data_columns_task(
&chain,
beacon_block_root,
slot,
blobs,
)?),
_ => None,
};
// Publish the envelope to the network.
crate::utils::publish_pubsub_message(
network_tx,
PubsubMessage::ExecutionPayload(Box::new(envelope)),
@@ -121,9 +143,130 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
)
})?;
// From here on the envelope is on the wire. `take_blobs` already consumed the cache
// entry, so a retry would not republish columns; returning Err would mislead the
// caller. Log column-build/publish failures and fall through to `Ok`.
if let Some(column_build_future) = column_build_future {
let gossip_verified_columns = match column_build_future.await {
Ok(columns) => columns,
Err(e) => {
error!(
%slot,
error = ?e,
"Failed to build data columns after envelope publication"
);
return Ok(warp::reply().into_response());
}
};
if !gossip_verified_columns.is_empty() {
if let Err(e) = publish_column_sidecars(network_tx, &gossip_verified_columns, &chain) {
error!(
%slot,
error = ?e,
"Failed to publish data column sidecars after envelope publication"
);
return Ok(warp::reply().into_response());
}
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let sampling_column_indices = chain.sampling_columns_for_epoch(epoch);
let sampling_columns = gossip_verified_columns
.into_iter()
.filter(|col| sampling_column_indices.contains(&col.index()))
.collect::<Vec<_>>();
// Local processing only — envelope already broadcast, so log and fall through.
if !sampling_columns.is_empty()
&& let Err(e) =
Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await
{
error!(
%slot,
error = ?e,
"Failed to process sampling data columns during envelope publication"
);
}
}
}
Ok(warp::reply().into_response())
}
fn spawn_build_gloas_data_columns_task<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
beacon_block_root: types::Hash256,
slot: types::Slot,
blobs: types::BlobsList<T::EthSpec>,
) -> Result<impl Future<Output = Result<Vec<GossipVerifiedDataColumn<T>>, Rejection>>, Rejection> {
let chain_for_build = chain.clone();
let handle = chain
.task_executor
.spawn_blocking_handle(
move || build_gloas_data_columns(&chain_for_build, beacon_block_root, slot, &blobs),
"build_gloas_data_columns",
)
.ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))?;
Ok(async move {
handle
.await
.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))?
})
}
fn build_gloas_data_columns<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
beacon_block_root: types::Hash256,
slot: types::Slot,
blobs: &types::BlobsList<T::EthSpec>,
) -> Result<Vec<GossipVerifiedDataColumn<T>>, Rejection> {
let blob_refs: Vec<_> = blobs.iter().collect();
let data_column_sidecars = beacon_chain::kzg_utils::blobs_to_data_column_sidecars_gloas(
&blob_refs,
beacon_block_root,
slot,
&chain.kzg,
&chain.spec,
)
.map_err(|e| {
error!(
error = ?e,
%slot,
"Failed to build data column sidecars for envelope"
);
warp_utils::reject::custom_server_error(format!("{e:?}"))
})?;
let gossip_verified_columns = data_column_sidecars
.into_iter()
.filter_map(|col| {
let index = *col.index();
match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) {
Ok(verified) => Some(verified),
Err(GossipDataColumnError::PriorKnownUnpublished) => None,
Err(e) => {
warn!(
%slot,
column_index = index,
error = ?e,
"Locally-built data column failed gossip verification"
);
None
}
}
})
.collect::<Vec<_>>();
debug!(
%slot,
column_count = gossip_verified_columns.len(),
"Built data columns for envelope publication"
);
Ok(gossip_verified_columns)
}
// TODO(gloas): add tests for this endpoint once we support importing payloads into the db
// GET beacon/execution_payload_envelope/{block_id}
pub(crate) fn get_beacon_execution_payload_envelope<T: BeaconChainTypes>(

View File

@@ -494,7 +494,7 @@ fn publish_blob_sidecars<T: BeaconChainTypes>(
.map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)))
}
fn publish_column_sidecars<T: BeaconChainTypes>(
pub(crate) fn publish_column_sidecars<T: BeaconChainTypes>(
sender_clone: &UnboundedSender<NetworkMessage<T::EthSpec>>,
data_column_sidecars: &[GossipVerifiedDataColumn<T>],
chain: &BeaconChain<T>,