This commit is contained in:
Eitan Seri-Levi
2026-06-07 16:59:13 +03:00
parent d236a53d9a
commit c1cfcfebf7
4 changed files with 47 additions and 26 deletions

View File

@@ -115,9 +115,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
// TODO(gloas) do we need to send a `PayloadImported` event to the reprocess queue?
// TODO(gloas) do we need to recompute head?
// should canonical_head return the block and the payload now?
self.recompute_head_at_current_slot().await;
metrics::inc_counter(&metrics::ENVELOPE_PROCESSING_SUCCESSES);

View File

@@ -8,7 +8,9 @@ use crate::version::{
};
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::payload_envelope_verification::EnvelopeError;
use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer};
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, NotifyExecutionLayer,
};
use bytes::Bytes;
use eth2::types as api_types;
use lighthouse_network::PubsubMessage;
@@ -160,12 +162,16 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
)
.await;
if let Err(e) = import_result {
warn!(%slot, error = ?e, "Failed to import execution payload envelope");
return Err(warp_utils::reject::custom_server_error(format!(
"envelope import failed: {e}"
)));
}
let mut envelope_imported = match &import_result {
Ok(AvailabilityProcessingStatus::Imported(_)) => true,
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => false,
Err(e) => {
warn!(%slot, error = ?e, "Failed to import execution payload envelope");
return Err(warp_utils::reject::custom_server_error(format!(
"envelope import failed: {e}"
)));
}
};
// From here on the envelope is on the wire. `take_blobs` already consumed the cache
// entry, so a retry would not republish columns; returning Err would mislead the
@@ -201,19 +207,27 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
.collect::<Vec<_>>();
// Local processing only — envelope already broadcast, so log and fall through.
if !sampling_columns.is_empty()
&& let Err(e) =
Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await
{
error!(
%slot,
error = ?e,
"Failed to process sampling data columns during envelope publication"
);
if !sampling_columns.is_empty() {
match Box::pin(chain.process_gossip_data_columns(sampling_columns, || Ok(()))).await
{
Ok(AvailabilityProcessingStatus::Imported(_)) => envelope_imported = true,
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {}
Err(e) => {
error!(
%slot,
error = ?e,
"Failed to process sampling data columns during envelope publication"
);
}
}
}
}
}
if envelope_imported {
chain.recompute_head_at_current_slot().await;
}
Ok(warp::reply().into_response())
}

View File

@@ -3794,13 +3794,19 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// TODO(gloas) metrics
// register_process_result_metrics(&result, metrics::BlockSource::Gossip, "envelope");
if let Err(e) = &result {
debug!(
?beacon_block_root,
%peer_id,
error = ?e,
"Execution payload envelope processing failed"
);
match &result {
Ok(AvailabilityProcessingStatus::Imported(_)) => {
self.chain.recompute_head_at_current_slot().await;
}
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {}
Err(e) => {
debug!(
?beacon_block_root,
%peer_id,
error = ?e,
"Execution payload envelope processing failed"
);
}
}
}

View File

@@ -376,6 +376,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let result: Result<AvailabilityProcessingStatus, BlockError> =
result.map_err(|e| BlockError::InternalError(format!("envelope: {e}")));
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.chain.recompute_head_at_current_slot().await;
}
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: result.into(),