mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-07 00:42:42 +00:00
Import execution payload envelope locally during HTTP API publication (#9226)
Fixes a bug where a proposer votes payload missing on its own block. The payload is published to the network but never imported locally. This PR adds gossip verification and import when a payload is sent to the http API Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
@@ -7,7 +7,7 @@ use crate::version::{
|
|||||||
execution_optimistic_finalized_beacon_response,
|
execution_optimistic_finalized_beacon_response,
|
||||||
};
|
};
|
||||||
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
|
||||||
use beacon_chain::{BeaconChain, BeaconChainTypes};
|
use beacon_chain::{BeaconChain, BeaconChainTypes, NotifyExecutionLayer};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use eth2::types as api_types;
|
use eth2::types as api_types;
|
||||||
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
|
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
|
||||||
@@ -18,7 +18,7 @@ use std::future::Future;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
use types::{EthSpec, SignedExecutionPayloadEnvelope};
|
use types::{BlockImportSource, EthSpec, SignedExecutionPayloadEnvelope};
|
||||||
use warp::{
|
use warp::{
|
||||||
Filter, Rejection, Reply,
|
Filter, Rejection, Reply,
|
||||||
hyper::{Body, Response},
|
hyper::{Body, Response},
|
||||||
@@ -99,14 +99,12 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
|||||||
let slot = envelope.slot();
|
let slot = envelope.slot();
|
||||||
let beacon_block_root = envelope.message.beacon_block_root;
|
let beacon_block_root = envelope.message.beacon_block_root;
|
||||||
|
|
||||||
// TODO(gloas): Replace this check once we have gossip validation.
|
|
||||||
if !chain.spec.is_gloas_scheduled() {
|
if !chain.spec.is_gloas_scheduled() {
|
||||||
return Err(warp_utils::reject::custom_bad_request(
|
return Err(warp_utils::reject::custom_bad_request(
|
||||||
"Execution payload envelopes are not supported before the Gloas fork".into(),
|
"Execution payload envelopes are not supported before the Gloas fork".into(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(gloas): We should probably add validation here i.e. BroadcastValidation::Gossip
|
|
||||||
info!(
|
info!(
|
||||||
%slot,
|
%slot,
|
||||||
%beacon_block_root,
|
%beacon_block_root,
|
||||||
@@ -118,7 +116,7 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
|||||||
|
|
||||||
// Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before
|
// Spawn the column-build task (CPU-bound KZG cell-and-proof computation) before
|
||||||
// publishing the envelope so it runs in parallel with envelope gossip, narrowing
|
// publishing the envelope so it runs in parallel with envelope gossip, narrowing
|
||||||
// the window in which peers see envelope-without-columns. If envelope publication
|
// the window in which peers see envelope-without-columns. If envelope import
|
||||||
// fails below, dropping this future drops the spawned `JoinHandle` (the running
|
// fails below, dropping this future drops the spawned `JoinHandle` (the running
|
||||||
// closure on the blocking pool finishes and is then discarded — no work cancellation).
|
// closure on the blocking pool finishes and is then discarded — no work cancellation).
|
||||||
let column_build_future = match blobs_and_proofs {
|
let column_build_future = match blobs_and_proofs {
|
||||||
@@ -131,17 +129,47 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
|
|||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Publish the envelope to the network.
|
// Gossip-verify the envelope before publishing.
|
||||||
|
let gossip_verified = chain
|
||||||
|
.verify_envelope_for_gossip(Arc::new(envelope))
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
warn!(%slot, error = ?e, "Execution payload envelope failed gossip verification");
|
||||||
|
warp_utils::reject::custom_bad_request(format!(
|
||||||
|
"envelope failed gossip verification: {e}"
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let network_tx_clone = network_tx.clone();
|
||||||
|
let envelope_for_gossip = gossip_verified.signed_envelope.as_ref().clone();
|
||||||
|
let publish_fn = || {
|
||||||
crate::utils::publish_pubsub_message(
|
crate::utils::publish_pubsub_message(
|
||||||
network_tx,
|
&network_tx_clone,
|
||||||
PubsubMessage::ExecutionPayload(Box::new(envelope)),
|
PubsubMessage::ExecutionPayload(Box::new(envelope_for_gossip)),
|
||||||
)
|
)
|
||||||
.map_err(|_| {
|
.map_err(|_| {
|
||||||
warn!(%slot, "Failed to publish execution payload envelope to network");
|
beacon_chain::payload_envelope_verification::EnvelopeError::BeaconChainError(Arc::new(
|
||||||
warp_utils::reject::custom_server_error(
|
beacon_chain::BeaconChainError::UnableToPublish,
|
||||||
"Unable to publish execution payload envelope to network".into(),
|
))
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let import_result = chain
|
||||||
|
.process_execution_payload_envelope(
|
||||||
|
beacon_block_root,
|
||||||
|
gossip_verified,
|
||||||
|
NotifyExecutionLayer::Yes,
|
||||||
|
BlockImportSource::HttpApi,
|
||||||
|
publish_fn,
|
||||||
)
|
)
|
||||||
})?;
|
.await;
|
||||||
|
|
||||||
|
if let Err(e) = import_result {
|
||||||
|
warn!(%slot, error = ?e, "Failed to import execution payload envelope");
|
||||||
|
return Err(warp_utils::reject::custom_server_error(format!(
|
||||||
|
"envelope import failed: {e}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
// From here on the envelope is on the wire. `take_blobs` already consumed the cache
|
// From here on the envelope is on the wire. `take_blobs` already consumed the cache
|
||||||
// entry, so a retry would not republish columns; returning Err would mislead the
|
// entry, so a retry would not republish columns; returning Err would mislead the
|
||||||
|
|||||||
@@ -4644,6 +4644,86 @@ impl ApiTester {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Regression test: publishing an envelope via the HTTP API must import it locally so
|
||||||
|
/// that `produce_payload_attestation_data` returns `payload_present = true`. Without
|
||||||
|
/// local import, the `envelope_times_cache` is never populated and PTC voters on the
|
||||||
|
/// same node incorrectly vote MISSING for their own payload.
|
||||||
|
pub async fn test_payload_attestation_present_after_envelope_publish(self) -> Self {
|
||||||
|
if !self.chain.spec.is_gloas_scheduled() {
|
||||||
|
return self;
|
||||||
|
}
|
||||||
|
|
||||||
|
let fork = self.chain.canonical_head.cached_head().head_fork();
|
||||||
|
let genesis_validators_root = self.chain.genesis_validators_root;
|
||||||
|
|
||||||
|
for _ in 0..E::slots_per_epoch() * 3 {
|
||||||
|
let slot = self.chain.slot().unwrap();
|
||||||
|
let epoch = self.chain.epoch().unwrap();
|
||||||
|
let fork_name = self.chain.spec.fork_name_at_slot::<E>(slot);
|
||||||
|
|
||||||
|
if !fork_name.gloas_enabled() {
|
||||||
|
self.chain.slot_clock.set_slot(slot.as_u64() + 1);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (sk, randao_reveal) = self
|
||||||
|
.proposer_setup(slot, epoch, &fork, genesis_validators_root)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Produce and publish a block.
|
||||||
|
let (response, _metadata) = self
|
||||||
|
.client
|
||||||
|
.get_validator_blocks_v4::<E>(slot, &randao_reveal, None, None, None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let block = response.data;
|
||||||
|
let block_root = block.tree_hash_root();
|
||||||
|
|
||||||
|
let signed_block = block.sign(&sk, &fork, genesis_validators_root, &self.chain.spec);
|
||||||
|
let signed_block_request =
|
||||||
|
PublishBlockRequest::try_from(Arc::new(signed_block)).unwrap();
|
||||||
|
self.client
|
||||||
|
.post_beacon_blocks_v2(&signed_block_request, None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Retrieve and publish the envelope.
|
||||||
|
let envelope = self
|
||||||
|
.client
|
||||||
|
.get_validator_execution_payload_envelope::<E>(slot, BUILDER_INDEX_SELF_BUILD)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.data;
|
||||||
|
|
||||||
|
let signed_envelope =
|
||||||
|
self.sign_envelope(envelope, &sk, epoch, &fork, genesis_validators_root);
|
||||||
|
self.client
|
||||||
|
.post_beacon_execution_payload_envelope(&signed_envelope, fork_name)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// The payload attestation data endpoint must now report the payload as present.
|
||||||
|
let pa_data = self
|
||||||
|
.client
|
||||||
|
.get_validator_payload_attestation_data(slot)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.into_data();
|
||||||
|
|
||||||
|
assert_eq!(pa_data.beacon_block_root, block_root);
|
||||||
|
assert_eq!(pa_data.slot, slot);
|
||||||
|
assert!(
|
||||||
|
pa_data.payload_present,
|
||||||
|
"payload attestation should report payload_present=true after publishing \
|
||||||
|
the envelope via the HTTP API (slot {slot})"
|
||||||
|
);
|
||||||
|
|
||||||
|
self.chain.slot_clock.set_slot(slot.as_u64() + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn test_get_validator_payload_attestation_data_pre_gloas(self) -> Self {
|
pub async fn test_get_validator_payload_attestation_data_pre_gloas(self) -> Self {
|
||||||
let slot = self.chain.slot().unwrap();
|
let slot = self.chain.slot().unwrap();
|
||||||
|
|
||||||
@@ -8333,6 +8413,14 @@ async fn get_validator_payload_attestation_data_pre_gloas() {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn payload_attestation_present_after_envelope_publish() {
|
||||||
|
ApiTester::new_with_hard_forks()
|
||||||
|
.await
|
||||||
|
.test_payload_attestation_present_after_envelope_publish()
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn post_beacon_pool_payload_attestations_valid() {
|
async fn post_beacon_pool_payload_attestations_valid() {
|
||||||
if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) {
|
if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) {
|
||||||
|
|||||||
Reference in New Issue
Block a user