mirror of
https://github.com/sigp/lighthouse.git
synced 2026-06-10 01:26:44 +00:00
fix
This commit is contained in:
@@ -7,7 +7,9 @@ use crate::version::{
|
||||
execution_optimistic_finalized_beacon_response,
|
||||
};
|
||||
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
||||
use beacon_chain::payload_envelope_verification::EnvelopeError;
|
||||
use beacon_chain::payload_envelope_verification::gossip_verified_envelope::GossipVerifiedEnvelope;
|
||||
use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer};
|
||||
use bytes::Bytes;
|
||||
use eth2::types as api_types;
|
||||
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
|
||||
@@ -19,7 +21,7 @@ use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use types::{EthSpec, SignedExecutionPayloadEnvelope};
|
||||
use types::{BlockImportSource, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope};
|
||||
use warp::{
|
||||
Filter, Rejection, Reply,
|
||||
hyper::{Body, Response},
|
||||
@@ -89,9 +91,8 @@ pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
)
|
||||
.boxed()
|
||||
}
|
||||
/// Publishes a signed execution payload envelope to the network. Implements
|
||||
/// `POST /eth/v1/beacon/execution_payload_envelope` per the in-flight beacon-APIs PR
|
||||
/// <https://github.com/ethereum/beacon-APIs/pull/580>.
|
||||
|
||||
/// Publishes a signed execution payload envelope to the network.
|
||||
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
@@ -111,30 +112,20 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
|
||||
let blobs_and_proofs = chain.pending_payload_envelopes.write().take_blobs(slot);
|
||||
|
||||
// Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before
|
||||
// publishing the envelope so it runs in parallel with envelope gossip, narrowing
|
||||
// the window in which peers see envelope-without-columns. If envelope publication
|
||||
// fails below, dropping this future drops the spawned `JoinHandle` (the running
|
||||
// closure on the blocking pool finishes and is then discarded — no work cancellation).
|
||||
let column_build_future = match blobs_and_proofs {
|
||||
Some(blobs) if !blobs.is_empty() => Some(spawn_build_gloas_data_columns_task(
|
||||
&chain,
|
||||
beacon_block_root,
|
||||
slot,
|
||||
blobs,
|
||||
)?),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// Publish the envelope to the network.
|
||||
crate::utils::publish_pubsub_message(
|
||||
network_tx,
|
||||
PubsubMessage::ExecutionPayload(Box::new(envelope)),
|
||||
)
|
||||
.map_err(|_| {
|
||||
warn!(%slot, "Failed to publish execution payload envelope to network");
|
||||
warp_utils::reject::custom_server_error(
|
||||
"Unable to publish execution payload envelope to network".into(),
|
||||
// The publish_fn is called inside process_execution_payload_envelope after consensus
|
||||
// verification but before the EL call.
|
||||
let envelope_for_publish = signed_envelope.clone();
|
||||
let sender = network_tx.clone();
|
||||
let publish_fn = move || {
|
||||
info!(
|
||||
%slot,
|
||||
%beacon_block_root,
|
||||
builder_index,
|
||||
"Publishing signed execution payload envelope to network"
|
||||
);
|
||||
crate::utils::publish_pubsub_message(
|
||||
&sender,
|
||||
PubsubMessage::ExecutionPayload(Box::new((*envelope_for_publish).clone())),
|
||||
)
|
||||
.map_err(|_| {
|
||||
warn!(%slot, "Failed to publish execution payload envelope to network");
|
||||
@@ -174,7 +165,7 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
})?;
|
||||
|
||||
// Build and publish data column sidecars from the blobs.
|
||||
if let Some((blobs, _kzg_proofs)) = blobs_and_proofs
|
||||
if let Some(blobs) = blobs_and_proofs
|
||||
&& !blobs.is_empty()
|
||||
{
|
||||
let gossip_verified_columns = spawn_build_gloas_data_columns_task(
|
||||
@@ -215,81 +206,34 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
||||
}
|
||||
}
|
||||
|
||||
// From here on the envelope is on the wire. `take_blobs` already consumed the cache
|
||||
// entry, so a retry would not republish columns; returning Err would mislead the
|
||||
// caller. Log column-build/publish failures and fall through to `Ok`.
|
||||
if let Some(column_build_future) = column_build_future {
|
||||
let gossip_verified_columns = match column_build_future.await {
|
||||
Ok(columns) => columns,
|
||||
Err(e) => {
|
||||
error!(
|
||||
%slot,
|
||||
error = ?e,
|
||||
"Failed to build data columns after envelope publication"
|
||||
);
|
||||
return Ok(warp::reply().into_response());
|
||||
}
|
||||
};
|
||||
|
||||
if !gossip_verified_columns.is_empty() {
|
||||
if let Err(e) = publish_column_sidecars(network_tx, &gossip_verified_columns, &chain) {
|
||||
error!(
|
||||
%slot,
|
||||
error = ?e,
|
||||
"Failed to publish data column sidecars after envelope publication"
|
||||
);
|
||||
return Ok(warp::reply().into_response());
|
||||
}
|
||||
|
||||
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
|
||||
let sampling_column_indices = chain.sampling_columns_for_epoch(epoch);
|
||||
let sampling_columns = gossip_verified_columns
|
||||
.into_iter()
|
||||
.filter(|col| sampling_column_indices.contains(&col.index()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Local processing only — envelope already broadcast, so log and fall through.
|
||||
if !sampling_columns.is_empty()
|
||||
&& let Err(e) =
|
||||
Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await
|
||||
{
|
||||
error!(
|
||||
%slot,
|
||||
error = ?e,
|
||||
"Failed to process sampling data columns during envelope publication"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(warp::reply().into_response())
|
||||
}
|
||||
|
||||
fn spawn_build_gloas_data_columns_task<T: BeaconChainTypes>(
|
||||
chain: &Arc<BeaconChain<T>>,
|
||||
chain: Arc<BeaconChain<T>>,
|
||||
beacon_block_root: types::Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
slot: types::Slot,
|
||||
blobs: types::BlobsList<T::EthSpec>,
|
||||
) -> Result<impl Future<Output = Result<Vec<GossipVerifiedDataColumn<T>>, Rejection>>, Rejection> {
|
||||
let chain_for_build = chain.clone();
|
||||
let handle = chain
|
||||
chain
|
||||
.clone()
|
||||
.task_executor
|
||||
.spawn_blocking_handle(
|
||||
move || build_gloas_data_columns(&chain_for_build, beacon_block_root, slot, &blobs),
|
||||
move || build_gloas_data_columns(&chain, beacon_block_root, block, slot, &blobs),
|
||||
"build_gloas_data_columns",
|
||||
)
|
||||
.ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))?;
|
||||
|
||||
Ok(async move {
|
||||
handle
|
||||
.await
|
||||
.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))?
|
||||
})
|
||||
.ok_or_else(|| warp_utils::reject::custom_server_error("runtime shutdown".to_string()))
|
||||
.map(|r| {
|
||||
r.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string()))
|
||||
.and_then(|output| async move { output })
|
||||
})
|
||||
}
|
||||
|
||||
fn build_gloas_data_columns<T: BeaconChainTypes>(
|
||||
chain: &BeaconChain<T>,
|
||||
beacon_block_root: types::Hash256,
|
||||
block: Arc<SignedBeaconBlock<T::EthSpec>>,
|
||||
slot: types::Slot,
|
||||
blobs: &types::BlobsList<T::EthSpec>,
|
||||
) -> Result<Vec<GossipVerifiedDataColumn<T>>, Rejection> {
|
||||
@@ -314,7 +258,7 @@ fn build_gloas_data_columns<T: BeaconChainTypes>(
|
||||
.into_iter()
|
||||
.filter_map(|col| {
|
||||
let index = *col.index();
|
||||
match GossipVerifiedDataColumn::new_for_block_publishing(col, chain) {
|
||||
match GossipVerifiedDataColumn::new_for_block_publishing(col, &block, chain) {
|
||||
Ok(verified) => Some(verified),
|
||||
Err(GossipDataColumnError::PriorKnownUnpublished) => None,
|
||||
Err(e) => {
|
||||
|
||||
Reference in New Issue
Block a user