diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 8c11cbac73..f2cdeb4847 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1101,6 +1101,14 @@ impl GossipVerifiedBlock { }))); } + // Beacon API execution_payload_bid events + if let Some(event_handler) = chain.event_handler.as_ref() + && event_handler.has_execution_payload_bid_subscribers() + && let Ok(bid) = block.message().body().signed_execution_payload_bid() + { + event_handler.register(EventKind::ExecutionPayloadBid(Box::new(bid.clone()))); + } + // Having checked the proposer index and the block root we can cache them. let consensus_context = ConsensusContext::new(block.slot()) .set_current_block_root(block_root) diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 63be944eea..fbe8cc3ef9 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -26,6 +26,8 @@ pub struct ServerSentEventHandler { attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, block_gossip_tx: Sender>, + execution_payload_bid_tx: Sender>, + execution_payload_available_tx: Sender>, } impl ServerSentEventHandler { @@ -53,6 +55,8 @@ impl ServerSentEventHandler { let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); let (block_gossip_tx, _) = broadcast::channel(capacity); + let (execution_payload_bid_tx, _) = broadcast::channel(capacity); + let (execution_payload_available_tx, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -74,6 +78,8 @@ impl ServerSentEventHandler { attester_slashing_tx, bls_to_execution_change_tx, block_gossip_tx, + execution_payload_bid_tx, + execution_payload_available_tx, } } @@ -162,6 +168,14 @@ impl ServerSentEventHandler { .block_gossip_tx .send(kind) .map(|count| log_count("block gossip", count)), + EventKind::ExecutionPayloadBid(_) => self + .execution_payload_bid_tx + .send(kind) + .map(|count| log_count("execution payload bid", count)), + EventKind::ExecutionPayloadAvailable(_) => self + .execution_payload_available_tx + .send(kind) + .map(|count| log_count("execution payload available", count)), }; if let Err(SendError(event)) = result { trace!(?event, "No receivers registered to listen for event"); @@ -311,4 +325,20 @@ impl ServerSentEventHandler { pub fn has_block_gossip_subscribers(&self) -> bool { self.block_gossip_tx.receiver_count() > 0 } + + pub fn subscribe_execution_payload_bid(&self) -> Receiver> { + self.execution_payload_bid_tx.subscribe() + } + + pub fn subscribe_execution_payload_available(&self) -> Receiver> { + self.execution_payload_available_tx.subscribe() + } + + pub fn has_execution_payload_bid_subscribers(&self) -> bool { + self.execution_payload_bid_tx.receiver_count() > 0 + } + + pub fn has_execution_payload_available_subscribers(&self) -> bool { + self.execution_payload_available_tx.receiver_count() > 0 + } } diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index a2022f7d5e..3ac0b1fd11 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -387,9 +387,18 @@ pub fn get_execution_payload( let timestamp = compute_timestamp_at_slot(state, state.slot(), spec).map_err(BeaconStateError::from)?; let random = *state.get_randao_mix(current_epoch)?; - let latest_execution_payload_header = state.latest_execution_payload_header()?; - let latest_execution_payload_header_block_hash = latest_execution_payload_header.block_hash(); - let latest_execution_payload_header_gas_limit = latest_execution_payload_header.gas_limit(); + // In GLOAS (ePBS), the execution payload header is replaced by + // `latest_block_hash` and `latest_execution_payload_bid`. + let (latest_execution_payload_header_block_hash, latest_execution_payload_header_gas_limit) = + if state.fork_name_unchecked() == ForkName::Gloas { + ( + *state.latest_block_hash()?, + state.latest_execution_payload_bid()?.gas_limit, + ) + } else { + let header = state.latest_execution_payload_header()?; + (header.block_hash(), header.gas_limit()) + }; let withdrawals = if state.fork_name_unchecked().capella_enabled() { Some(Withdrawals::::from(get_expected_withdrawals(state, spec)?).into()) } else { diff --git a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs index 8a23c6050d..7d7a18b264 100644 --- a/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs +++ b/beacon_node/beacon_chain/src/payload_envelope_verification/import.rs @@ -19,6 +19,7 @@ use crate::{ metrics, validator_monitor::{get_slot_delay_ms, timestamp_now}, }; +use eth2::types::{EventKind, SseExecutionPayloadAvailable}; impl BeaconChain { /// Returns `Ok(block_root)` if the given `unverified_envelope` was successfully verified and @@ -357,6 +358,16 @@ impl BeaconChain { ); } - // TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks). + // Beacon API execution_payload_available events + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_execution_payload_available_subscribers() + { + event_handler.register(EventKind::ExecutionPayloadAvailable( + SseExecutionPayloadAvailable { + slot: envelope_slot, + block_root, + }, + )); + } } } diff --git a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs index 18d2e35fa4..ca66a8b52e 100644 --- a/beacon_node/http_api/src/beacon/execution_payload_envelope.rs +++ b/beacon_node/http_api/src/beacon/execution_payload_envelope.rs @@ -1,11 +1,17 @@ +use crate::block_id::BlockId; use crate::task_spawner::{Priority, TaskSpawner}; use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter}; +use crate::version::{ + ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header, + execution_optimistic_finalized_beacon_response, +}; use beacon_chain::{BeaconChain, BeaconChainTypes}; use bytes::Bytes; +use eth2::types as api_types; use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER}; use lighthouse_network::PubsubMessage; use network::NetworkMessage; -use ssz::Decode; +use ssz::{Decode, Encode}; use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; use tracing::{info, warn}; @@ -127,3 +133,67 @@ pub async fn publish_execution_payload_envelope( Ok(warp::reply().into_response()) } + +// GET beacon/execution_payload_envelope/{block_id} +pub(crate) fn get_beacon_execution_payload_envelope( + eth_v1: EthV1Filter, + block_id_or_err: impl Filter + Clone + Send + Sync + 'static, + task_spawner_filter: TaskSpawnerFilter, + chain_filter: ChainFilter, +) -> ResponseFilter { + eth_v1 + .and(warp::path("beacon")) + .and(warp::path("execution_payload_envelope")) + .and(block_id_or_err) + .and(warp::path::end()) + .and(task_spawner_filter) + .and(chain_filter) + .and(warp::header::optional::("accept")) + .then( + |block_id: BlockId, + task_spawner: TaskSpawner, + chain: Arc>, + accept_header: Option| { + task_spawner.blocking_response_task(Priority::P1, move || { + let (root, execution_optimistic, finalized) = block_id.root(&chain)?; + + let envelope = chain + .get_payload_envelope(&root) + .map_err(warp_utils::reject::unhandled_error)? + .ok_or_else(|| { + warp_utils::reject::custom_not_found(format!( + "execution payload envelope for block root {root}" + )) + })?; + + let fork_name = chain + .spec + .fork_name_at_slot::(envelope.message.slot); + + match accept_header { + Some(api_types::Accept::Ssz) => warp::http::Response::builder() + .status(200) + .body(warp::hyper::Body::from(envelope.as_ssz_bytes())) + .map(add_ssz_content_type_header) + .map_err(|e| { + warp_utils::reject::custom_server_error(format!( + "failed to create response: {}", + e + )) + }), + _ => { + let res = execution_optimistic_finalized_beacon_response( + ResponseIncludesVersion::Yes(fork_name), + execution_optimistic, + finalized, + &envelope, + )?; + Ok(warp::reply::json(&res).into_response()) + } + } + .map(|resp| add_consensus_version_header(resp, fork_name)) + }) + }, + ) + .boxed() +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 969755fbe5..f05fc9a907 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -37,7 +37,8 @@ mod validators; mod version; use crate::beacon::execution_payload_envelope::{ - post_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope_ssz, + get_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope, + post_beacon_execution_payload_envelope_ssz, }; use crate::beacon::pool::*; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; @@ -1506,6 +1507,14 @@ pub fn serve( network_tx_filter.clone(), ); + // GET beacon/execution_payload_envelope/{block_id} + let get_beacon_execution_payload_envelope = get_beacon_execution_payload_envelope( + eth_v1.clone(), + block_id_or_err, + task_spawner_filter.clone(), + chain_filter.clone(), + ); + let beacon_rewards_path = eth_v1 .clone() .and(warp::path("beacon")) @@ -3217,6 +3226,12 @@ pub fn serve( api_types::EventTopic::BlockGossip => { event_handler.subscribe_block_gossip() } + api_types::EventTopic::ExecutionPayloadBid => { + event_handler.subscribe_execution_payload_bid() + } + api_types::EventTopic::ExecutionPayloadAvailable => { + event_handler.subscribe_execution_payload_available() + } }; receivers.push( @@ -3341,6 +3356,7 @@ pub fn serve( .uor(get_beacon_block_root) .uor(get_blob_sidecars) .uor(get_blobs) + .uor(get_beacon_execution_payload_envelope) .uor(get_beacon_pool_attestations) .uor(get_beacon_pool_attester_slashings) .uor(get_beacon_pool_proposer_slashings) diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 7a340bda6b..afbf50139b 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -2655,6 +2655,55 @@ impl BeaconNodeHttpClient { Ok(()) } + /// Path for `v1/beacon/execution_payload_envelope/{block_id}` + pub fn get_beacon_execution_payload_envelope_path( + &self, + block_id: BlockId, + ) -> Result { + let mut path = self.eth_path(V1)?; + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("beacon") + .push("execution_payload_envelope") + .push(&block_id.to_string()); + Ok(path) + } + + /// `GET v1/beacon/execution_payload_envelope/{block_id}` + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_execution_payload_envelope( + &self, + block_id: BlockId, + ) -> Result>>, Error> + { + let path = self.get_beacon_execution_payload_envelope_path(block_id)?; + self.get_opt(path) + .await + .map(|opt| opt.map(BeaconResponse::ForkVersioned)) + } + + /// `GET v1/beacon/execution_payload_envelope/{block_id}` in SSZ format + /// + /// Returns `Ok(None)` on a 404 error. + pub async fn get_beacon_execution_payload_envelope_ssz( + &self, + block_id: BlockId, + ) -> Result>, Error> { + let path = self.get_beacon_execution_payload_envelope_path(block_id)?; + let opt_response = self + .get_bytes_opt_accept_header(path, Accept::Ssz, self.timeouts.get_beacon_blocks_ssz) + .await?; + match opt_response { + Some(bytes) => { + SignedExecutionPayloadEnvelope::from_ssz_bytes(&bytes) + .map(Some) + .map_err(Error::InvalidSsz) + } + None => Ok(None), + } + } + /// `GET v2/validator/blocks/{slot}` in ssz format pub async fn get_validator_blocks_ssz( &self, diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index ca16fcd28a..01291c7463 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1064,6 +1064,12 @@ pub struct BlockGossip { pub slot: Slot, pub block: Hash256, } + +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct SseExecutionPayloadAvailable { + pub slot: Slot, + pub block_root: Hash256, +} #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseChainReorg { pub slot: Slot, @@ -1206,6 +1212,8 @@ pub enum EventKind { AttesterSlashing(Box>), BlsToExecutionChange(Box), BlockGossip(Box), + ExecutionPayloadBid(Box>), + ExecutionPayloadAvailable(SseExecutionPayloadAvailable), } impl EventKind { @@ -1231,6 +1239,8 @@ impl EventKind { EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", EventKind::BlockGossip(_) => "block_gossip", + EventKind::ExecutionPayloadBid(_) => "execution_payload_bid", + EventKind::ExecutionPayloadAvailable(_) => "execution_payload_available", } } @@ -1324,6 +1334,22 @@ impl EventKind { "block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)), )?)), + "execution_payload_bid" => Ok(EventKind::ExecutionPayloadBid( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!( + "Execution Payload Bid: {:?}", + e + )) + })?, + )), + "execution_payload_available" => Ok(EventKind::ExecutionPayloadAvailable( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!( + "Execution Payload Available: {:?}", + e + )) + })?, + )), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -1361,6 +1387,8 @@ pub enum EventTopic { ProposerSlashing, BlsToExecutionChange, BlockGossip, + ExecutionPayloadBid, + ExecutionPayloadAvailable, } impl FromStr for EventTopic { @@ -1388,6 +1416,8 @@ impl FromStr for EventTopic { "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), "block_gossip" => Ok(EventTopic::BlockGossip), + "execution_payload_bid" => Ok(EventTopic::ExecutionPayloadBid), + "execution_payload_available" => Ok(EventTopic::ExecutionPayloadAvailable), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -1416,6 +1446,8 @@ impl fmt::Display for EventTopic { EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), EventTopic::BlockGossip => write!(f, "block_gossip"), + EventTopic::ExecutionPayloadBid => write!(f, "execution_payload_bid"), + EventTopic::ExecutionPayloadAvailable => write!(f, "execution_payload_available"), } } }