SSE and enw endpoint

This commit is contained in:
Eitan Seri- Levi
2026-02-13 17:21:22 -08:00
parent e5598d529c
commit 5466b8a241
8 changed files with 231 additions and 6 deletions

View File

@@ -1101,6 +1101,14 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
})));
}
// Beacon API execution_payload_bid events
if let Some(event_handler) = chain.event_handler.as_ref()
&& event_handler.has_execution_payload_bid_subscribers()
&& let Ok(bid) = block.message().body().signed_execution_payload_bid()
{
event_handler.register(EventKind::ExecutionPayloadBid(Box::new(bid.clone())));
}
// Having checked the proposer index and the block root we can cache them.
let consensus_context = ConsensusContext::new(block.slot())
.set_current_block_root(block_root)

View File

@@ -26,6 +26,8 @@ pub struct ServerSentEventHandler<E: EthSpec> {
attester_slashing_tx: Sender<EventKind<E>>,
bls_to_execution_change_tx: Sender<EventKind<E>>,
block_gossip_tx: Sender<EventKind<E>>,
execution_payload_bid_tx: Sender<EventKind<E>>,
execution_payload_available_tx: Sender<EventKind<E>>,
}
impl<E: EthSpec> ServerSentEventHandler<E> {
@@ -53,6 +55,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (attester_slashing_tx, _) = broadcast::channel(capacity);
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
let (block_gossip_tx, _) = broadcast::channel(capacity);
let (execution_payload_bid_tx, _) = broadcast::channel(capacity);
let (execution_payload_available_tx, _) = broadcast::channel(capacity);
Self {
attestation_tx,
@@ -74,6 +78,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
attester_slashing_tx,
bls_to_execution_change_tx,
block_gossip_tx,
execution_payload_bid_tx,
execution_payload_available_tx,
}
}
@@ -162,6 +168,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.block_gossip_tx
.send(kind)
.map(|count| log_count("block gossip", count)),
EventKind::ExecutionPayloadBid(_) => self
.execution_payload_bid_tx
.send(kind)
.map(|count| log_count("execution payload bid", count)),
EventKind::ExecutionPayloadAvailable(_) => self
.execution_payload_available_tx
.send(kind)
.map(|count| log_count("execution payload available", count)),
};
if let Err(SendError(event)) = result {
trace!(?event, "No receivers registered to listen for event");
@@ -311,4 +325,20 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn has_block_gossip_subscribers(&self) -> bool {
self.block_gossip_tx.receiver_count() > 0
}
pub fn subscribe_execution_payload_bid(&self) -> Receiver<EventKind<E>> {
self.execution_payload_bid_tx.subscribe()
}
pub fn subscribe_execution_payload_available(&self) -> Receiver<EventKind<E>> {
self.execution_payload_available_tx.subscribe()
}
pub fn has_execution_payload_bid_subscribers(&self) -> bool {
self.execution_payload_bid_tx.receiver_count() > 0
}
pub fn has_execution_payload_available_subscribers(&self) -> bool {
self.execution_payload_available_tx.receiver_count() > 0
}
}

View File

@@ -387,9 +387,18 @@ pub fn get_execution_payload<T: BeaconChainTypes>(
let timestamp =
compute_timestamp_at_slot(state, state.slot(), spec).map_err(BeaconStateError::from)?;
let random = *state.get_randao_mix(current_epoch)?;
let latest_execution_payload_header = state.latest_execution_payload_header()?;
let latest_execution_payload_header_block_hash = latest_execution_payload_header.block_hash();
let latest_execution_payload_header_gas_limit = latest_execution_payload_header.gas_limit();
// In GLOAS (ePBS), the execution payload header is replaced by
// `latest_block_hash` and `latest_execution_payload_bid`.
let (latest_execution_payload_header_block_hash, latest_execution_payload_header_gas_limit) =
if state.fork_name_unchecked() == ForkName::Gloas {
(
*state.latest_block_hash()?,
state.latest_execution_payload_bid()?.gas_limit,
)
} else {
let header = state.latest_execution_payload_header()?;
(header.block_hash(), header.gas_limit())
};
let withdrawals = if state.fork_name_unchecked().capella_enabled() {
Some(Withdrawals::<T::EthSpec>::from(get_expected_withdrawals(state, spec)?).into())
} else {

View File

@@ -19,6 +19,7 @@ use crate::{
metrics,
validator_monitor::{get_slot_delay_ms, timestamp_now},
};
use eth2::types::{EventKind, SseExecutionPayloadAvailable};
impl<T: BeaconChainTypes> BeaconChain<T> {
/// Returns `Ok(block_root)` if the given `unverified_envelope` was successfully verified and
@@ -357,6 +358,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
// TODO(gloas) emit SSE event for envelope import (similar to SseBlock for blocks).
// Beacon API execution_payload_available events
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_execution_payload_available_subscribers()
{
event_handler.register(EventKind::ExecutionPayloadAvailable(
SseExecutionPayloadAvailable {
slot: envelope_slot,
block_root,
},
));
}
}
}

View File

@@ -1,11 +1,17 @@
use crate::block_id::BlockId;
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::{ChainFilter, EthV1Filter, NetworkTxFilter, ResponseFilter, TaskSpawnerFilter};
use crate::version::{
ResponseIncludesVersion, add_consensus_version_header, add_ssz_content_type_header,
execution_optimistic_finalized_beacon_response,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use bytes::Bytes;
use eth2::types as api_types;
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use ssz::Decode;
use ssz::{Decode, Encode};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
use tracing::{info, warn};
@@ -127,3 +133,67 @@ pub async fn publish_execution_payload_envelope<T: BeaconChainTypes>(
Ok(warp::reply().into_response())
}
// GET beacon/execution_payload_envelope/{block_id}
pub(crate) fn get_beacon_execution_payload_envelope<T: BeaconChainTypes>(
eth_v1: EthV1Filter,
block_id_or_err: impl Filter<Extract = (BlockId,), Error = Rejection> + Clone + Send + Sync + 'static,
task_spawner_filter: TaskSpawnerFilter<T>,
chain_filter: ChainFilter<T>,
) -> ResponseFilter {
eth_v1
.and(warp::path("beacon"))
.and(warp::path("execution_payload_envelope"))
.and(block_id_or_err)
.and(warp::path::end())
.and(task_spawner_filter)
.and(chain_filter)
.and(warp::header::optional::<api_types::Accept>("accept"))
.then(
|block_id: BlockId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
accept_header: Option<api_types::Accept>| {
task_spawner.blocking_response_task(Priority::P1, move || {
let (root, execution_optimistic, finalized) = block_id.root(&chain)?;
let envelope = chain
.get_payload_envelope(&root)
.map_err(warp_utils::reject::unhandled_error)?
.ok_or_else(|| {
warp_utils::reject::custom_not_found(format!(
"execution payload envelope for block root {root}"
))
})?;
let fork_name = chain
.spec
.fork_name_at_slot::<T::EthSpec>(envelope.message.slot);
match accept_header {
Some(api_types::Accept::Ssz) => warp::http::Response::builder()
.status(200)
.body(warp::hyper::Body::from(envelope.as_ssz_bytes()))
.map(add_ssz_content_type_header)
.map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to create response: {}",
e
))
}),
_ => {
let res = execution_optimistic_finalized_beacon_response(
ResponseIncludesVersion::Yes(fork_name),
execution_optimistic,
finalized,
&envelope,
)?;
Ok(warp::reply::json(&res).into_response())
}
}
.map(|resp| add_consensus_version_header(resp, fork_name))
})
},
)
.boxed()
}

View File

@@ -37,7 +37,8 @@ mod validators;
mod version;
use crate::beacon::execution_payload_envelope::{
post_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope_ssz,
get_beacon_execution_payload_envelope, post_beacon_execution_payload_envelope,
post_beacon_execution_payload_envelope_ssz,
};
use crate::beacon::pool::*;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
@@ -1506,6 +1507,14 @@ pub fn serve<T: BeaconChainTypes>(
network_tx_filter.clone(),
);
// GET beacon/execution_payload_envelope/{block_id}
let get_beacon_execution_payload_envelope = get_beacon_execution_payload_envelope(
eth_v1.clone(),
block_id_or_err,
task_spawner_filter.clone(),
chain_filter.clone(),
);
let beacon_rewards_path = eth_v1
.clone()
.and(warp::path("beacon"))
@@ -3217,6 +3226,12 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlockGossip => {
event_handler.subscribe_block_gossip()
}
api_types::EventTopic::ExecutionPayloadBid => {
event_handler.subscribe_execution_payload_bid()
}
api_types::EventTopic::ExecutionPayloadAvailable => {
event_handler.subscribe_execution_payload_available()
}
};
receivers.push(
@@ -3341,6 +3356,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_beacon_block_root)
.uor(get_blob_sidecars)
.uor(get_blobs)
.uor(get_beacon_execution_payload_envelope)
.uor(get_beacon_pool_attestations)
.uor(get_beacon_pool_attester_slashings)
.uor(get_beacon_pool_proposer_slashings)

View File

@@ -2655,6 +2655,55 @@ impl BeaconNodeHttpClient {
Ok(())
}
/// Path for `v1/beacon/execution_payload_envelope/{block_id}`
pub fn get_beacon_execution_payload_envelope_path(
&self,
block_id: BlockId,
) -> Result<Url, Error> {
let mut path = self.eth_path(V1)?;
path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("beacon")
.push("execution_payload_envelope")
.push(&block_id.to_string());
Ok(path)
}
/// `GET v1/beacon/execution_payload_envelope/{block_id}`
///
/// Returns `Ok(None)` on a 404 error.
pub async fn get_beacon_execution_payload_envelope<E: EthSpec>(
&self,
block_id: BlockId,
) -> Result<Option<ExecutionOptimisticFinalizedBeaconResponse<SignedExecutionPayloadEnvelope<E>>>, Error>
{
let path = self.get_beacon_execution_payload_envelope_path(block_id)?;
self.get_opt(path)
.await
.map(|opt| opt.map(BeaconResponse::ForkVersioned))
}
/// `GET v1/beacon/execution_payload_envelope/{block_id}` in SSZ format
///
/// Returns `Ok(None)` on a 404 error.
pub async fn get_beacon_execution_payload_envelope_ssz<E: EthSpec>(
&self,
block_id: BlockId,
) -> Result<Option<SignedExecutionPayloadEnvelope<E>>, Error> {
let path = self.get_beacon_execution_payload_envelope_path(block_id)?;
let opt_response = self
.get_bytes_opt_accept_header(path, Accept::Ssz, self.timeouts.get_beacon_blocks_ssz)
.await?;
match opt_response {
Some(bytes) => {
SignedExecutionPayloadEnvelope::from_ssz_bytes(&bytes)
.map(Some)
.map_err(Error::InvalidSsz)
}
None => Ok(None),
}
}
/// `GET v2/validator/blocks/{slot}` in ssz format
pub async fn get_validator_blocks_ssz<E: EthSpec>(
&self,

View File

@@ -1064,6 +1064,12 @@ pub struct BlockGossip {
pub slot: Slot,
pub block: Hash256,
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct SseExecutionPayloadAvailable {
pub slot: Slot,
pub block_root: Hash256,
}
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseChainReorg {
pub slot: Slot,
@@ -1206,6 +1212,8 @@ pub enum EventKind<E: EthSpec> {
AttesterSlashing(Box<AttesterSlashing<E>>),
BlsToExecutionChange(Box<SignedBlsToExecutionChange>),
BlockGossip(Box<BlockGossip>),
ExecutionPayloadBid(Box<SignedExecutionPayloadBid<E>>),
ExecutionPayloadAvailable(SseExecutionPayloadAvailable),
}
impl<E: EthSpec> EventKind<E> {
@@ -1231,6 +1239,8 @@ impl<E: EthSpec> EventKind<E> {
EventKind::AttesterSlashing(_) => "attester_slashing",
EventKind::BlsToExecutionChange(_) => "bls_to_execution_change",
EventKind::BlockGossip(_) => "block_gossip",
EventKind::ExecutionPayloadBid(_) => "execution_payload_bid",
EventKind::ExecutionPayloadAvailable(_) => "execution_payload_available",
}
}
@@ -1324,6 +1334,22 @@ impl<E: EthSpec> EventKind<E> {
"block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)),
)?)),
"execution_payload_bid" => Ok(EventKind::ExecutionPayloadBid(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!(
"Execution Payload Bid: {:?}",
e
))
})?,
)),
"execution_payload_available" => Ok(EventKind::ExecutionPayloadAvailable(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!(
"Execution Payload Available: {:?}",
e
))
})?,
)),
_ => Err(ServerError::InvalidServerSentEvent(
"Could not parse event tag".to_string(),
)),
@@ -1361,6 +1387,8 @@ pub enum EventTopic {
ProposerSlashing,
BlsToExecutionChange,
BlockGossip,
ExecutionPayloadBid,
ExecutionPayloadAvailable,
}
impl FromStr for EventTopic {
@@ -1388,6 +1416,8 @@ impl FromStr for EventTopic {
"proposer_slashing" => Ok(EventTopic::ProposerSlashing),
"bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange),
"block_gossip" => Ok(EventTopic::BlockGossip),
"execution_payload_bid" => Ok(EventTopic::ExecutionPayloadBid),
"execution_payload_available" => Ok(EventTopic::ExecutionPayloadAvailable),
_ => Err("event topic cannot be parsed.".to_string()),
}
}
@@ -1416,6 +1446,8 @@ impl fmt::Display for EventTopic {
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"),
EventTopic::BlockGossip => write!(f, "block_gossip"),
EventTopic::ExecutionPayloadBid => write!(f, "execution_payload_bid"),
EventTopic::ExecutionPayloadAvailable => write!(f, "execution_payload_available"),
}
}
}