diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs new file mode 100644 index 0000000000..13495d4d6d --- /dev/null +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -0,0 +1,127 @@ +use crate::task_spawner::{Priority, TaskSpawner}; +use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use bytes::Bytes; +use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; +use lighthouse_network::PubsubMessage; +use network::NetworkMessage; +use ssz::Decode; +use std::sync::Arc; +use tokio::sync::mpsc::UnboundedSender; +use tracing::{info, warn}; +use types::SignedExecutionPayloadEnvelope; +use warp::{Filter, Rejection, Reply, reply::Response}; + +// POST beacon/execution_payload_envelope (SSZ) +pub(crate) fn post_beacon_execution_payload_envelope_ssz( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("beacon")) + .and(warp::path("execution_payload_envelope")) + .and(warp::path::end()) + .and(warp::header::exact( + CONTENT_TYPE_HEADER, + SSZ_CONTENT_TYPE_HEADER, + )) + .and(warp::body::bytes()) + .and(task_spawner_filter) + .and(chain_filter) + .and(network_tx_filter) + .then( + |body_bytes: Bytes, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let envelope = + SignedExecutionPayloadEnvelope::::from_ssz_bytes(&body_bytes) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) + })?; + publish_execution_payload_envelope(envelope, chain, &network_tx).await + }) + }, + ) + .boxed() +} + +// POST beacon/execution_payload_envelope +pub(crate) fn post_beacon_execution_payload_envelope( + eth_v1: EthV1Filter, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, + network_tx_filter: NetworkTxFilter, +) -> ResponseFilter { + eth_v1 + .clone() + .and(warp::path("beacon")) + .and(warp::path("execution_payload_envelope")) + .and(warp::path::end()) + .and(warp::body::json()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .then( + |envelope: SignedExecutionPayloadEnvelope, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + publish_execution_payload_envelope(envelope, chain, &network_tx).await + }) + }, + ) + .boxed() +} +/// Publishes a signed execution payload envelope to the network. +pub async fn publish_execution_payload_envelope( + envelope: SignedExecutionPayloadEnvelope, + chain: Arc>, + network_tx: &UnboundedSender>, +) -> Result { + let slot = envelope.message.slot; + let beacon_block_root = envelope.message.beacon_block_root; + + // Basic validation: check that the slot is reasonable + let current_slot = chain.slot().map_err(|_| { + warp_utils::reject::custom_server_error("Unable to get current slot".into()) + })?; + + // Don't accept envelopes too far in the future + if slot > current_slot + 1 { + return Err(warp_utils::reject::custom_bad_request(format!( + "Envelope slot {} is too far in the future (current slot: {})", + slot, current_slot + ))); + } + + // TODO(gloas): Add more validation: + // - Verify the signature + // - Check builder_index is valid + // - Verify the envelope references a known block + + info!( + %slot, + %beacon_block_root, + builder_index = envelope.message.builder_index, + "Publishing signed execution payload envelope to network" + ); + + // Publish to the network + crate::utils::publish_pubsub_message( + network_tx, + PubsubMessage::ExecutionPayload(Box::new(envelope)), + ) + .map_err(|_| { + warn!(%slot, "Failed to publish execution payload envelope to network"); + warp_utils::reject::custom_server_error( + "Unable to publish execution payload envelope to network".into(), + ) + })?; + + Ok(warp::reply().into_response()) +} diff --git a/beacon_node/http_api/src/beacon/mod.rs b/beacon_node/http_api/src/beacon/mod.rs index df5e6eee5c..9ec1c476f6 100644 --- a/beacon_node/http_api/src/beacon/mod.rs +++ b/beacon_node/http_api/src/beacon/mod.rs @@ -1,2 +1,3 @@ +pub mod execution_payload_envelope; pub mod pool; pub mod states; diff --git a/beacon_node/http_api/src/beacon/states.rs b/beacon_node/http_api/src/beacon/states.rs index 828efb86a7..50be7211d8 100644 --- a/beacon_node/http_api/src/beacon/states.rs +++ b/beacon_node/http_api/src/beacon/states.rs @@ -28,7 +28,6 @@ pub fn get_beacon_state_pending_consolidations( beacon_states_path: BeaconStatesPath, ) -> ResponseFilter { beacon_states_path - .clone() .and(warp::path("pending_consolidations")) .and(warp::path::end()) .then( diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 35885e2091..9e09ef65b7 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -23,7 +23,6 @@ mod produce_block; mod proposer_duties; mod publish_attestations; mod publish_blocks; -mod publish_execution_payload_envelope; mod standard_block_rewards; mod state_id; mod sync_committee_rewards; @@ -37,9 +36,15 @@ mod validator_inclusion; mod validators; mod version; +use crate::beacon::execution_payload_envelope::{ + post_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope_ssz, +}; use crate::beacon::pool::*; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; -use crate::utils::{AnyVersionFilter, EthV1Filter}; +use crate::utils::{ + AnyVersionFilter, EthV1Filter, +}; +use crate::validator::execution_payload_bid::get_validator_execution_payload_bid; use crate::validator::post_validator_liveness_epoch; use crate::validator::*; use crate::version::beacon_response; @@ -72,7 +77,7 @@ pub use publish_blocks::{ }; use serde::{Deserialize, Serialize}; use slot_clock::SlotClock; -use ssz::{Decode, Encode}; +use ssz::Encode; pub use state_id::StateId; use std::future::Future; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; @@ -91,8 +96,9 @@ use tokio_stream::{ use tracing::{debug, info, warn}; use types::{ BeaconStateError, Checkpoint, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, - SignedBlindedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, + SignedBlindedBeaconBlock, Slot, }; +use validator::execution_payload_envelope::get_validator_execution_payload_envelope; use version::{ ResponseIncludesVersion, V1, V2, add_consensus_version_header, add_ssz_content_type_header, execution_optimistic_finalized_beacon_response, inconsistent_fork_rejection, @@ -1488,65 +1494,20 @@ pub fn serve( post_beacon_pool_bls_to_execution_changes(&network_tx_filter, &beacon_pool_path); // POST beacon/execution_payload_envelope - let post_beacon_execution_payload_envelope = eth_v1 - .clone() - .and(warp::path("beacon")) - .and(warp::path("execution_payload_envelope")) - .and(warp::path::end()) - .and(warp::body::json()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(network_tx_filter.clone()) - .then( - |envelope: SignedExecutionPayloadEnvelope, - task_spawner: TaskSpawner, - chain: Arc>, - network_tx: UnboundedSender>| { - task_spawner.spawn_async_with_rejection(Priority::P0, async move { - publish_execution_payload_envelope::publish_execution_payload_envelope( - envelope, - chain, - &network_tx, - ) - .await - }) - }, - ); + let post_beacon_execution_payload_envelope = post_beacon_execution_payload_envelope( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); // POST beacon/execution_payload_envelope (SSZ) - let post_beacon_execution_payload_envelope_ssz = eth_v1 - .clone() - .and(warp::path("beacon")) - .and(warp::path("execution_payload_envelope")) - .and(warp::path::end()) - .and(warp::header::exact( - CONTENT_TYPE_HEADER, - SSZ_CONTENT_TYPE_HEADER, - )) - .and(warp::body::bytes()) - .and(task_spawner_filter.clone()) - .and(chain_filter.clone()) - .and(network_tx_filter.clone()) - .then( - |body_bytes: Bytes, - task_spawner: TaskSpawner, - chain: Arc>, - network_tx: UnboundedSender>| { - task_spawner.spawn_async_with_rejection(Priority::P0, async move { - let envelope = - SignedExecutionPayloadEnvelope::::from_ssz_bytes(&body_bytes) - .map_err(|e| { - warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) - })?; - publish_execution_payload_envelope::publish_execution_payload_envelope( - envelope, - chain, - &network_tx, - ) - .await - }) - }, - ); + let post_beacon_execution_payload_envelope_ssz = post_beacon_execution_payload_envelope_ssz( + eth_v1.clone(), + task_spawner_filter.clone(), + chain_filter.clone(), + network_tx_filter.clone(), + ); let beacon_rewards_path = eth_v1 .clone() @@ -2539,6 +2500,14 @@ pub fn serve( task_spawner_filter.clone(), ); + // GET validator/execution_payload_bid/ + let get_validator_execution_payload_bid = get_validator_execution_payload_bid( + eth_v1.clone(), + chain_filter.clone(), + not_while_syncing_filter.clone(), + task_spawner_filter.clone(), + ); + // GET validator/attestation_data?slot,committee_index let get_validator_attestation_data = get_validator_attestation_data( eth_v1.clone().clone(), @@ -3398,6 +3367,7 @@ pub fn serve( .uor(get_validator_blocks) .uor(get_validator_blinded_blocks) .uor(get_validator_execution_payload_envelope) + .uor(get_validator_execution_payload_bid) .uor(get_validator_attestation_data) .uor(get_validator_aggregate_attestation) .uor(get_validator_sync_committee_contribution) diff --git a/beacon_node/http_api/src/produce_block.rs b/beacon_node/http_api/src/produce_block.rs index f97df399d7..c5338475b4 100644 --- a/beacon_node/http_api/src/produce_block.rs +++ b/beacon_node/http_api/src/produce_block.rs @@ -138,7 +138,7 @@ pub fn build_response_v4( ) -> Result, warp::Rejection> { let fork_name = block .to_ref() - .fork_name(&spec) + .fork_name(spec) .map_err(inconsistent_fork_rejection)?; let consensus_block_value_wei = Uint256::from(consensus_block_value) * Uint256::from(1_000_000_000u64); diff --git a/beacon_node/http_api/src/publish_execution_payload_envelope.rs b/beacon_node/http_api/src/publish_execution_payload_envelope.rs deleted file mode 100644 index 1e4225505c..0000000000 --- a/beacon_node/http_api/src/publish_execution_payload_envelope.rs +++ /dev/null @@ -1,57 +0,0 @@ -use beacon_chain::{BeaconChain, BeaconChainTypes}; -use lighthouse_network::PubsubMessage; -use network::NetworkMessage; -use std::sync::Arc; -use tokio::sync::mpsc::UnboundedSender; -use tracing::{info, warn}; -use types::SignedExecutionPayloadEnvelope; -use warp::{Rejection, Reply, reply::Response}; - -/// Publishes a signed execution payload envelope to the network. -pub async fn publish_execution_payload_envelope( - envelope: SignedExecutionPayloadEnvelope, - chain: Arc>, - network_tx: &UnboundedSender>, -) -> Result { - let slot = envelope.message.slot; - let beacon_block_root = envelope.message.beacon_block_root; - - // Basic validation: check that the slot is reasonable - let current_slot = chain.slot().map_err(|_| { - warp_utils::reject::custom_server_error("Unable to get current slot".into()) - })?; - - // Don't accept envelopes too far in the future - if slot > current_slot + 1 { - return Err(warp_utils::reject::custom_bad_request(format!( - "Envelope slot {} is too far in the future (current slot: {})", - slot, current_slot - ))); - } - - // TODO(gloas): Add more validation: - // - Verify the signature - // - Check builder_index is valid - // - Verify the envelope references a known block - - info!( - %slot, - %beacon_block_root, - builder_index = envelope.message.builder_index, - "Publishing signed execution payload envelope to network" - ); - - // Publish to the network - crate::utils::publish_pubsub_message( - network_tx, - PubsubMessage::ExecutionPayload(Box::new(envelope)), - ) - .map_err(|_| { - warn!(%slot, "Failed to publish execution payload envelope to network"); - warp_utils::reject::custom_server_error( - "Unable to publish execution payload envelope to network".into(), - ) - })?; - - Ok(warp::reply().into_response()) -} diff --git a/beacon_node/http_api/src/validator/execution_payload_bid.rs b/beacon_node/http_api/src/validator/execution_payload_bid.rs new file mode 100644 index 0000000000..8e1235d0b0 --- /dev/null +++ b/beacon_node/http_api/src/validator/execution_payload_bid.rs @@ -0,0 +1,52 @@ +use crate::task_spawner::{Priority, TaskSpawner}; +use crate::utils::{ + ChainFilter, EthV1Filter, NotWhileSyncingFilter, ResponseFilter, TaskSpawnerFilter, +}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2::types::Accept; +use std::sync::Arc; +use tracing::debug; +use types::Slot; +use warp::{Filter, Rejection}; + +// GET validator/execution_payload_bid/ +#[allow(dead_code)] +pub fn get_validator_execution_payload_bid( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("execution_payload_bid")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid slot".to_string(), + )) + })) + .and(warp::path::end()) + .and(warp::header::optional::("accept")) + .and(not_while_syncing_filter) + .and(task_spawner_filter) + .and(chain_filter) + .then( + |slot: Slot, + _accept_header: Option, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + _chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + debug!( + ?slot, + "Execution paylaod bid production request from HTTP API" + ); + + not_synced_filter?; + + todo!() + }) + }, + ) + .boxed() +} diff --git a/beacon_node/http_api/src/validator/execution_payload_envelope.rs b/beacon_node/http_api/src/validator/execution_payload_envelope.rs new file mode 100644 index 0000000000..16df922781 --- /dev/null +++ b/beacon_node/http_api/src/validator/execution_payload_envelope.rs @@ -0,0 +1,105 @@ +use crate::task_spawner::{Priority, TaskSpawner}; +use crate::utils::{ + ChainFilter, EthV1Filter, NotWhileSyncingFilter, ResponseFilter, TaskSpawnerFilter, +}; +use beacon_chain::{BeaconChain, BeaconChainTypes}; +use eth2::types::{Accept, GenericResponse}; +use ssz::Encode; +use std::sync::Arc; +use tracing::debug; +use types::Slot; +use warp::http::Response; +use warp::{Filter, Rejection}; + +// GET validator/execution_payload_envelope/{slot}/{builder_index} +pub fn get_validator_execution_payload_envelope( + eth_v1: EthV1Filter, + chain_filter: ChainFilter, + not_while_syncing_filter: NotWhileSyncingFilter, + task_spawner_filter: TaskSpawnerFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("validator")) + .and(warp::path("execution_payload_envelope")) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid slot".to_string(), + )) + })) + .and(warp::path::param::().or_else(|_| async { + Err(warp_utils::reject::custom_bad_request( + "Invalid builder_index".to_string(), + )) + })) + .and(warp::path::end()) + .and(warp::header::optional::("accept")) + .and(not_while_syncing_filter) + .and(task_spawner_filter) + .and(chain_filter) + .then( + |slot: Slot, + // TODO(gloas) we're only doing local building + // we'll need to implement builder index logic + // eventually. + _builder_index: u64, + accept_header: Option, + not_synced_filter: Result<(), Rejection>, + task_spawner: TaskSpawner, + chain: Arc>| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + debug!(?slot, "Execution payload envelope request from HTTP API"); + + not_synced_filter?; + + // Get the envelope from the pending cache (local building only) + let envelope = chain + .pending_payload_envelopes + .read() + .get(slot) + .cloned() + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "Execution payload envelope not available for slot {slot}" + )) + })?; + + let fork_name = chain.spec.fork_name_at_slot::(slot); + + match accept_header { + Some(Accept::Ssz) => Response::builder() + .status(200) + .header("Content-Type", "application/octet-stream") + .header("Eth-Consensus-Version", fork_name.to_string()) + .body(envelope.as_ssz_bytes().into()) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to build SSZ response: {e}" + )) + }), + _ => { + let json_response = GenericResponse { data: envelope }; + Response::builder() + .status(200) + .header("Content-Type", "application/json") + .header("Eth-Consensus-Version", fork_name.to_string()) + .body( + serde_json::to_string(&json_response) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to serialize response: {e}" + )) + })? + .into(), + ) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "Failed to build JSON response: {e}" + )) + }) + } + } + }) + }, + ) + .boxed() +} diff --git a/beacon_node/http_api/src/validator/mod.rs b/beacon_node/http_api/src/validator/mod.rs index 612b1fafec..90cca33018 100644 --- a/beacon_node/http_api/src/validator/mod.rs +++ b/beacon_node/http_api/src/validator/mod.rs @@ -21,7 +21,6 @@ use eth2::types::{ use lighthouse_network::PubsubMessage; use network::{NetworkMessage, ValidatorSubscriptionMessage}; use slot_clock::SlotClock; -use ssz::Encode; use std::sync::Arc; use tokio::sync::mpsc::{Sender, UnboundedSender}; use tokio::sync::oneshot; @@ -31,10 +30,12 @@ use types::{ SignedContributionAndProof, SignedValidatorRegistrationData, Slot, SyncContributionData, ValidatorSubscription, }; -use warp::http::Response; use warp::{Filter, Rejection, Reply}; use warp_utils::reject::convert_rejection; +pub mod execution_payload_bid; +pub mod execution_payload_envelope; + /// Uses the `chain.validator_pubkey_cache` to resolve a pubkey to a validator /// index and then ensures that the validator exists in the given `state`. pub fn pubkey_to_validator_index( @@ -335,141 +336,6 @@ pub fn get_validator_blocks( .boxed() } -// GET validator/execution_payload_bid/ -#[allow(dead_code)] -pub fn get_validator_execution_payload_bid( - eth_v1: EthV1Filter, - chain_filter: ChainFilter, - not_while_syncing_filter: NotWhileSyncingFilter, - task_spawner_filter: TaskSpawnerFilter, -) -> ResponseFilter { - eth_v1 - .and(warp::path("validator")) - .and(warp::path("execution_payload_bid")) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid slot".to_string(), - )) - })) - .and(warp::path::end()) - .and(warp::header::optional::("accept")) - .and(not_while_syncing_filter) - .and(task_spawner_filter) - .and(chain_filter) - .then( - |slot: Slot, - _accept_header: Option, - not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - _chain: Arc>| { - task_spawner.spawn_async_with_rejection(Priority::P0, async move { - debug!( - ?slot, - "Execution paylaod bid production request from HTTP API" - ); - - not_synced_filter?; - - todo!() - }) - }, - ) - .boxed() -} - -// GET validator/execution_payload_envelope/{slot}/{builder_index} -pub fn get_validator_execution_payload_envelope( - eth_v1: EthV1Filter, - chain_filter: ChainFilter, - not_while_syncing_filter: NotWhileSyncingFilter, - task_spawner_filter: TaskSpawnerFilter, -) -> ResponseFilter { - eth_v1 - .and(warp::path("validator")) - .and(warp::path("execution_payload_envelope")) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid slot".to_string(), - )) - })) - .and(warp::path::param::().or_else(|_| async { - Err(warp_utils::reject::custom_bad_request( - "Invalid builder_index".to_string(), - )) - })) - .and(warp::path::end()) - .and(warp::header::optional::("accept")) - .and(not_while_syncing_filter) - .and(task_spawner_filter) - .and(chain_filter) - .then( - |slot: Slot, - // TODO(gloas) we're only doing local building - // we'll need to implement builder index logic - // eventually. - _builder_index: u64, - accept_header: Option, - not_synced_filter: Result<(), Rejection>, - task_spawner: TaskSpawner, - chain: Arc>| { - task_spawner.spawn_async_with_rejection(Priority::P0, async move { - debug!(?slot, "Execution payload envelope request from HTTP API"); - - not_synced_filter?; - - // Get the envelope from the pending cache (local building only) - let envelope = chain - .pending_payload_envelopes - .read() - .get(slot) - .cloned() - .ok_or_else(|| { - warp_utils::reject::custom_not_found(format!( - "Execution payload envelope not available for slot {slot}" - )) - })?; - - let fork_name = chain.spec.fork_name_at_slot::(slot); - - match accept_header { - Some(Accept::Ssz) => Response::builder() - .status(200) - .header("Content-Type", "application/octet-stream") - .header("Eth-Consensus-Version", fork_name.to_string()) - .body(envelope.as_ssz_bytes().into()) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "Failed to build SSZ response: {e}" - )) - }), - _ => { - let json_response = GenericResponse { data: envelope }; - Response::builder() - .status(200) - .header("Content-Type", "application/json") - .header("Eth-Consensus-Version", fork_name.to_string()) - .body( - serde_json::to_string(&json_response) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "Failed to serialize response: {e}" - )) - })? - .into(), - ) - .map_err(|e| { - warp_utils::reject::custom_server_error(format!( - "Failed to build JSON response: {e}" - )) - }) - } - } - }) - }, - ) - .boxed() -} - // POST validator/liveness/{epoch} pub fn post_validator_liveness_epoch( eth_v1: EthV1Filter, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 853afe9ece..af29df42d0 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1718,7 +1718,7 @@ pub type JsonProduceBlockV3Response = pub enum FullBlockContents { /// This is a full deneb variant with block and blobs. BlockContents(BlockContents), - /// This variant is for all pre-deneb full blocks. + /// This variant is for all pre-deneb full blocks or post-gloas beacon block. Block(BeaconBlock), }