merge block production

This commit is contained in:
Eitan Seri- Levi
2026-02-13 00:05:59 -08:00
26 changed files with 2391 additions and 303 deletions

View File

@@ -0,0 +1,129 @@
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use bytes::Bytes;
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use ssz::Decode;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn};
use types::SignedExecutionPayloadEnvelope;
use warp::{Filter, Rejection, Reply, reply::Response};
// POST beacon/execution_payload_envelope (SSZ)
pub(crate) fn post_beacon_execution_payload_envelope_ssz<T: BeaconChainTypes>(
eth_v1: EthV1Filter,
task_spawner_filter: TaskSpawnerFilter<T>,
chain_filter: ChainFilter<T>,
network_tx_filter: NetworkTxFilter<T>,
) -> ResponseFilter {
eth_v1
.and(warp::path("beacon"))
.and(warp::path("execution_payload_envelope"))
.and(warp::path::end())
.and(warp::header::exact(
CONTENT_TYPE_HEADER,
SSZ_CONTENT_TYPE_HEADER,
))
.and(warp::body::bytes())
.and(task_spawner_filter)
.and(chain_filter)
.and(network_tx_filter)
.then(
|body_bytes: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let envelope =
SignedExecutionPayloadEnvelope::<T::EthSpec>::from_ssz_bytes(&body_bytes)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
publish_execution_payload_envelope(envelope, chain, &network_tx).await
})
},
)
.boxed()
}
// POST beacon/execution_payload_envelope
pub(crate) fn post_beacon_execution_payload_envelope<T: BeaconChainTypes>(
eth_v1: EthV1Filter,
task_spawner_filter: TaskSpawnerFilter<T>,
chain_filter: ChainFilter<T>,
network_tx_filter: NetworkTxFilter<T>,
) -> ResponseFilter {
eth_v1
.clone()
.and(warp::path("beacon"))
.and(warp::path("execution_payload_envelope"))
.and(warp::path::end())
.and(warp::body::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.then(
|envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
publish_execution_payload_envelope(envelope, chain, &network_tx).await
})
},
)
.boxed()
}
/// Publishes a signed execution payload envelope to the network.
pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
envelope: SignedExecutionPayloadEnvelope<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
) -> Result<Response, Rejection> {
let slot = envelope.message.slot;
let beacon_block_root = envelope.message.beacon_block_root;
// Basic validation: check that the slot is reasonable
let current_slot = chain.slot().map_err(|_| {
warp_utils::reject::custom_server_error("Unable to get current slot".into())
})?;
// Don't accept envelopes too far in the future
if slot > current_slot + 1 {
return Err(warp_utils::reject::custom_bad_request(format!(
"Envelope slot {} is too far in the future (current slot: {})",
slot, current_slot
)));
}
// TODO(gloas): Do we want to add more validation like:
// - Verify the signature
// - Check builder_index is valid
// - Verify the envelope references a known block
//
// If we do, then we must post the signed execution payload envelope to the BN that originally produced it.
info!(
%slot,
%beacon_block_root,
builder_index = envelope.message.builder_index,
"Publishing signed execution payload envelope to network"
);
// Publish to the network
crate::utils::publish_pubsub_message(
network_tx,
PubsubMessage::ExecutionPayload(Box::new(envelope)),
)
.map_err(|_| {
warn!(%slot, "Failed to publish execution payload envelope to network");
warp_utils::reject::custom_server_error(
"Unable to publish execution payload envelope to network".into(),
)
})?;
Ok(warp::reply().into_response())
}

View File

@@ -1,2 +1,3 @@
pub mod execution_payload_envelope;
pub mod pool;
pub mod states;

View File

@@ -28,7 +28,6 @@ pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("pending_consolidations"))
.and(warp::path::end())
.then(

View File

@@ -36,6 +36,9 @@ mod validator_inclusion;
mod validators;
mod version;
use crate::beacon::execution_payload_envelope::{
post_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope_ssz,
};
use crate::beacon::pool::*;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::utils::{AnyVersionFilter, EthV1Filter};
@@ -92,6 +95,7 @@ use types::{
BeaconStateError, Checkpoint, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256,
SignedBlindedBeaconBlock, Slot,
};
use validator::execution_payload_envelope::get_validator_execution_payload_envelope;
use version::{
ResponseIncludesVersion, V1, V2, add_consensus_version_header, add_ssz_content_type_header,
execution_optimistic_finalized_beacon_response, inconsistent_fork_rejection,
@@ -1486,6 +1490,22 @@ pub fn serve<T: BeaconChainTypes>(
let post_beacon_pool_bls_to_execution_changes =
post_beacon_pool_bls_to_execution_changes(&network_tx_filter, &beacon_pool_path);
// POST beacon/execution_payload_envelope
let post_beacon_execution_payload_envelope = post_beacon_execution_payload_envelope(
eth_v1.clone(),
task_spawner_filter.clone(),
chain_filter.clone(),
network_tx_filter.clone(),
);
// POST beacon/execution_payload_envelope (SSZ)
let post_beacon_execution_payload_envelope_ssz = post_beacon_execution_payload_envelope_ssz(
eth_v1.clone(),
task_spawner_filter.clone(),
chain_filter.clone(),
network_tx_filter.clone(),
);
let beacon_rewards_path = eth_v1
.clone()
.and(warp::path("beacon"))
@@ -2466,6 +2486,14 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner_filter.clone(),
);
// GET validator/execution_payload_envelope/{slot}/{builder_index}
let get_validator_execution_payload_envelope = get_validator_execution_payload_envelope(
eth_v1.clone().clone(),
chain_filter.clone(),
not_while_syncing_filter.clone(),
task_spawner_filter.clone(),
);
// GET validator/attestation_data?slot,committee_index
let get_validator_attestation_data = get_validator_attestation_data(
eth_v1.clone().clone(),
@@ -3336,6 +3364,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_validator_duties_proposer)
.uor(get_validator_blocks)
.uor(get_validator_blinded_blocks)
.uor(get_validator_execution_payload_envelope)
.uor(get_validator_attestation_data)
.uor(get_validator_aggregate_attestation)
.uor(get_validator_sync_committee_contribution)
@@ -3374,7 +3403,8 @@ pub fn serve<T: BeaconChainTypes>(
post_beacon_blocks_ssz
.uor(post_beacon_blocks_v2_ssz)
.uor(post_beacon_blinded_blocks_ssz)
.uor(post_beacon_blinded_blocks_v2_ssz),
.uor(post_beacon_blinded_blocks_v2_ssz)
.uor(post_beacon_execution_payload_envelope_ssz),
)
.uor(post_beacon_blocks)
.uor(post_beacon_blinded_blocks)
@@ -3386,6 +3416,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_beacon_pool_voluntary_exits)
.uor(post_beacon_pool_sync_committees)
.uor(post_beacon_pool_bls_to_execution_changes)
.uor(post_beacon_execution_payload_envelope)
.uor(post_beacon_state_validators)
.uor(post_beacon_state_validator_balances)
.uor(post_beacon_state_validator_identities)

View File

@@ -43,6 +43,49 @@ pub fn get_randao_verification(
Ok(randao_verification)
}
#[instrument(
name = "lh_produce_block_v4",
skip_all,
fields(%slot)
)]
pub async fn produce_block_v4<T: BeaconChainTypes>(
accept_header: Option<api_types::Accept>,
chain: Arc<BeaconChain<T>>,
slot: Slot,
query: api_types::ValidatorBlocksQuery,
) -> Result<Response<Body>, warp::Rejection> {
let randao_reveal = query.randao_reveal.decompress().map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"randao reveal is not a valid BLS signature: {:?}",
e
))
})?;
let randao_verification = get_randao_verification(&query, randao_reveal.is_infinity())?;
let builder_boost_factor = if query.builder_boost_factor == Some(DEFAULT_BOOST_FACTOR) {
None
} else {
query.builder_boost_factor
};
let graffiti_settings = GraffitiSettings::new(query.graffiti, query.graffiti_policy);
let (block, consensus_block_value) = chain
.produce_block_with_verification_gloas(
randao_reveal,
slot,
graffiti_settings,
randao_verification,
builder_boost_factor,
)
.await
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("failed to fetch a block: {:?}", e))
})?;
build_response_v4::<T>(block, consensus_block_value, accept_header, &chain.spec)
}
#[instrument(
name = "lh_produce_block_v3",
skip_all,
@@ -87,6 +130,39 @@ pub async fn produce_block_v3<T: BeaconChainTypes>(
build_response_v3(chain, block_response_type, accept_header)
}
pub fn build_response_v4<T: BeaconChainTypes>(
block: BeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>,
consensus_block_value: u64,
accept_header: Option<api_types::Accept>,
spec: &ChainSpec,
) -> Result<Response<Body>, warp::Rejection> {
let fork_name = block
.to_ref()
.fork_name(spec)
.map_err(inconsistent_fork_rejection)?;
let consensus_block_value_wei =
Uint256::from(consensus_block_value) * Uint256::from(1_000_000_000u64);
match accept_header {
Some(api_types::Accept::Ssz) => Response::builder()
.status(200)
.body(block.as_ssz_bytes().into())
.map(|res: Response<Body>| add_ssz_content_type_header(res))
.map(|res: Response<Body>| add_consensus_version_header(res, fork_name))
.map(|res| add_consensus_block_value_header(res, consensus_block_value_wei))
.map_err(|e| -> warp::Rejection {
warp_utils::reject::custom_server_error(format!("failed to create response: {}", e))
}),
_ => Ok(warp::reply::json(&beacon_response(
ResponseIncludesVersion::Yes(fork_name),
block,
))
.into_response())
.map(|res| add_consensus_version_header(res, fork_name))
.map(|res| add_consensus_block_value_header(res, consensus_block_value_wei)),
}
}
pub fn build_response_v3<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_response: BeaconBlockResponseWrapper<T::EthSpec>,

View File

@@ -0,0 +1,105 @@
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{
ChainFilter, EthV1Filter, NotWhileSyncingFilter, ResponseFilter, TaskSpawnerFilter,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2::types::{Accept, GenericResponse};
use ssz::Encode;
use std::sync::Arc;
use tracing::debug;
use types::Slot;
use warp::http::Response;
use warp::{Filter, Rejection};
// GET validator/execution_payload_envelope/{slot}/{builder_index}
pub fn get_validator_execution_payload_envelope<T: BeaconChainTypes>(
eth_v1: EthV1Filter,
chain_filter: ChainFilter<T>,
not_while_syncing_filter: NotWhileSyncingFilter,
task_spawner_filter: TaskSpawnerFilter<T>,
) -> ResponseFilter {
eth_v1
.and(warp::path("validator"))
.and(warp::path("execution_payload_envelope"))
.and(warp::path::param::<Slot>().or_else(|_| async {
Err(warp_utils::reject::custom_bad_request(
"Invalid slot".to_string(),
))
}))
.and(warp::path::param::<u64>().or_else(|_| async {
Err(warp_utils::reject::custom_bad_request(
"Invalid builder_index".to_string(),
))
}))
.and(warp::path::end())
.and(warp::header::optional::<Accept>("accept"))
.and(not_while_syncing_filter)
.and(task_spawner_filter)
.and(chain_filter)
.then(
|slot: Slot,
// TODO(gloas) we're only doing local building
// we'll need to implement builder index logic
// eventually.
_builder_index: u64,
accept_header: Option<Accept>,
not_synced_filter: Result<(), Rejection>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
debug!(?slot, "Execution payload envelope request from HTTP API");
not_synced_filter?;
// Get the envelope from the pending cache (local building only)
let envelope = chain
.pending_payload_envelopes
.read()
.get(slot)
.cloned()
.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!(
"Execution payload envelope not available for slot {slot}"
))
})?;
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(slot);
match accept_header {
Some(Accept::Ssz) => Response::builder()
.status(200)
.header("Content-Type", "application/octet-stream")
.header("Eth-Consensus-Version", fork_name.to_string())
.body(envelope.as_ssz_bytes().into())
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"Failed to build SSZ response: {e}"
))
}),
_ => {
let json_response = GenericResponse { data: envelope };
Response::builder()
.status(200)
.header("Content-Type", "application/json")
.header("Eth-Consensus-Version", fork_name.to_string())
.body(
serde_json::to_string(&json_response)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"Failed to serialize response: {e}"
))
})?
.into(),
)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"Failed to build JSON response: {e}"
))
})
}
}
})
},
)
.boxed()
}

View File

@@ -1,4 +1,6 @@
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::produce_block::{
produce_blinded_block_v2, produce_block_v2, produce_block_v3, produce_block_v4,
};
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{
AnyVersionFilter, ChainFilter, EthV1Filter, NetworkTxFilter, NotWhileSyncingFilter,
@@ -31,6 +33,8 @@ use types::{
use warp::{Filter, Rejection, Reply};
use warp_utils::reject::convert_rejection;
pub mod execution_payload_envelope;
/// Uses the `chain.validator_pubkey_cache` to resolve a pubkey to a validator
/// index and then ensures that the validator exists in the given `state`.
pub fn pubkey_to_validator_index<T: BeaconChainTypes>(
@@ -316,7 +320,11 @@ pub fn get_validator_blocks<T: BeaconChainTypes>(
not_synced_filter?;
if endpoint_version == V3 {
// Use V4 block production for Gloas fork
let fork_name = chain.spec.fork_name_at_slot::<T::EthSpec>(slot);
if fork_name.gloas_enabled() {
produce_block_v4(accept_header, chain, slot, query).await
} else if endpoint_version == V3 {
produce_block_v3(accept_header, chain, slot, query).await
} else {
produce_block_v2(accept_header, chain, slot, query).await