proposer preferences SSE event

This commit is contained in:
Eitan Seri- Levi
2026-05-16 08:08:30 -07:00
parent 1a68631180
commit a5bd456a58
5 changed files with 45 additions and 0 deletions

View File

@@ -29,6 +29,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
execution_payload_gossip_tx: Sender<EventKind<E>>, execution_payload_gossip_tx: Sender<EventKind<E>>,
execution_payload_available_tx: Sender<EventKind<E>>, execution_payload_available_tx: Sender<EventKind<E>>,
execution_payload_bid_tx: Sender<EventKind<E>>, execution_payload_bid_tx: Sender<EventKind<E>>,
proposer_preferences_tx: Sender<EventKind<E>>,
payload_attestation_message_tx: Sender<EventKind<E>>, payload_attestation_message_tx: Sender<EventKind<E>>,
} }
@@ -60,6 +61,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (execution_payload_gossip_tx, _) = broadcast::channel(capacity); let (execution_payload_gossip_tx, _) = broadcast::channel(capacity);
let (execution_payload_available_tx, _) = broadcast::channel(capacity); let (execution_payload_available_tx, _) = broadcast::channel(capacity);
let (execution_payload_bid_tx, _) = broadcast::channel(capacity); let (execution_payload_bid_tx, _) = broadcast::channel(capacity);
let (proposer_preferences_tx, _) = broadcast::channel(capacity);
let (payload_attestation_message_tx, _) = broadcast::channel(capacity); let (payload_attestation_message_tx, _) = broadcast::channel(capacity);
Self { Self {
@@ -85,6 +87,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
execution_payload_gossip_tx, execution_payload_gossip_tx,
execution_payload_available_tx, execution_payload_available_tx,
execution_payload_bid_tx, execution_payload_bid_tx,
proposer_preferences_tx,
payload_attestation_message_tx, payload_attestation_message_tx,
} }
} }
@@ -186,6 +189,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.execution_payload_bid_tx .execution_payload_bid_tx
.send(kind) .send(kind)
.map(|count| log_count("execution payload bid", count)), .map(|count| log_count("execution payload bid", count)),
EventKind::ProposerPreferences(_) => self
.proposer_preferences_tx
.send(kind)
.map(|count| log_count("proposer preferences", count)),
EventKind::PayloadAttestationMessage(_) => self EventKind::PayloadAttestationMessage(_) => self
.payload_attestation_message_tx .payload_attestation_message_tx
.send(kind) .send(kind)
@@ -284,6 +291,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.execution_payload_bid_tx.subscribe() self.execution_payload_bid_tx.subscribe()
} }
pub fn subscribe_proposer_preferences(&self) -> Receiver<EventKind<E>> {
self.proposer_preferences_tx.subscribe()
}
pub fn subscribe_payload_attestation_message(&self) -> Receiver<EventKind<E>> { pub fn subscribe_payload_attestation_message(&self) -> Receiver<EventKind<E>> {
self.payload_attestation_message_tx.subscribe() self.payload_attestation_message_tx.subscribe()
} }
@@ -368,6 +379,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.execution_payload_bid_tx.receiver_count() > 0 self.execution_payload_bid_tx.receiver_count() > 0
} }
pub fn has_proposer_preferences_subscribers(&self) -> bool {
self.proposer_preferences_tx.receiver_count() > 0
}
pub fn has_payload_attestation_message_subscribers(&self) -> bool { pub fn has_payload_attestation_message_subscribers(&self) -> bool {
self.payload_attestation_message_tx.receiver_count() > 0 self.payload_attestation_message_tx.receiver_count() > 0
} }

View File

@@ -6,6 +6,7 @@ use crate::{
ProposerPreferencesError, proposer_preference_cache::GossipVerifiedProposerPreferenceCache, ProposerPreferencesError, proposer_preference_cache::GossipVerifiedProposerPreferenceCache,
}, },
}; };
use eth2::types::{EventKind, ForkVersionedResponse};
use slot_clock::SlotClock; use slot_clock::SlotClock;
use state_processing::signature_sets::{get_pubkey_from_state, proposer_preferences_signature_set}; use state_processing::signature_sets::{get_pubkey_from_state, proposer_preferences_signature_set};
use tracing::debug; use tracing::debug;
@@ -137,6 +138,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
%validator_index, %validator_index,
"Successfully verified gossip proposer preferences" "Successfully verified gossip proposer preferences"
); );
if let Some(event_handler) = self.event_handler.as_ref()
&& event_handler.has_proposer_preferences_subscribers()
{
event_handler.register(EventKind::ProposerPreferences(Box::new(
ForkVersionedResponse {
version: self.spec.fork_name_at_slot::<T::EthSpec>(proposal_slot),
metadata: Default::default(),
data: (*verified.signed_preferences).clone(),
},
)));
}
Ok(verified) Ok(verified)
} }
Err(e) => { Err(e) => {

View File

@@ -3241,6 +3241,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::ExecutionPayloadBid => { api_types::EventTopic::ExecutionPayloadBid => {
event_handler.subscribe_execution_payload_bid() event_handler.subscribe_execution_payload_bid()
} }
api_types::EventTopic::ProposerPreferences => {
event_handler.subscribe_proposer_preferences()
}
api_types::EventTopic::PayloadAttestationMessage => { api_types::EventTopic::PayloadAttestationMessage => {
event_handler.subscribe_payload_attestation_message() event_handler.subscribe_payload_attestation_message()
} }

View File

@@ -1164,6 +1164,7 @@ pub struct SseExtendedPayloadAttributesGeneric<T> {
pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric<SsePayloadAttributes>; pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric<SsePayloadAttributes>;
pub type VersionedSsePayloadAttributes = ForkVersionedResponse<SseExtendedPayloadAttributes>; pub type VersionedSsePayloadAttributes = ForkVersionedResponse<SseExtendedPayloadAttributes>;
pub type VersionedSseExecutionPayloadBid<E> = ForkVersionedResponse<SignedExecutionPayloadBid<E>>; pub type VersionedSseExecutionPayloadBid<E> = ForkVersionedResponse<SignedExecutionPayloadBid<E>>;
pub type VersionedSseProposerPreferences = ForkVersionedResponse<SignedProposerPreferences>;
pub type VersionedSsePayloadAttestationMessage = ForkVersionedResponse<PayloadAttestationMessage>; pub type VersionedSsePayloadAttestationMessage = ForkVersionedResponse<PayloadAttestationMessage>;
impl<'de> ContextDeserialize<'de, ForkName> for SsePayloadAttributes { impl<'de> ContextDeserialize<'de, ForkName> for SsePayloadAttributes {
@@ -1245,6 +1246,7 @@ pub enum EventKind<E: EthSpec> {
ExecutionPayloadGossip(SseExecutionPayloadGossip), ExecutionPayloadGossip(SseExecutionPayloadGossip),
ExecutionPayloadAvailable(SseExecutionPayloadAvailable), ExecutionPayloadAvailable(SseExecutionPayloadAvailable),
ExecutionPayloadBid(Box<VersionedSseExecutionPayloadBid<E>>), ExecutionPayloadBid(Box<VersionedSseExecutionPayloadBid<E>>),
ProposerPreferences(Box<VersionedSseProposerPreferences>),
PayloadAttestationMessage(Box<VersionedSsePayloadAttestationMessage>), PayloadAttestationMessage(Box<VersionedSsePayloadAttestationMessage>),
} }
@@ -1273,6 +1275,7 @@ impl<E: EthSpec> EventKind<E> {
EventKind::ExecutionPayloadGossip(_) => "execution_payload_gossip", EventKind::ExecutionPayloadGossip(_) => "execution_payload_gossip",
EventKind::ExecutionPayloadAvailable(_) => "execution_payload_available", EventKind::ExecutionPayloadAvailable(_) => "execution_payload_available",
EventKind::ExecutionPayloadBid(_) => "execution_payload_bid", EventKind::ExecutionPayloadBid(_) => "execution_payload_bid",
EventKind::ProposerPreferences(_) => "proposer_preferences",
EventKind::PayloadAttestationMessage(_) => "payload_attestation_message", EventKind::PayloadAttestationMessage(_) => "payload_attestation_message",
} }
} }
@@ -1389,6 +1392,11 @@ impl<E: EthSpec> EventKind<E> {
ServerError::InvalidServerSentEvent(format!("Execution Payload Bid: {:?}", e)) ServerError::InvalidServerSentEvent(format!("Execution Payload Bid: {:?}", e))
})?, })?,
))), ))),
"proposer_preferences" => Ok(EventKind::ProposerPreferences(Box::new(
serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!("Proposer Preferences: {:?}", e))
})?,
))),
"payload_attestation_message" => Ok(EventKind::PayloadAttestationMessage(Box::new( "payload_attestation_message" => Ok(EventKind::PayloadAttestationMessage(Box::new(
serde_json::from_str(data).map_err(|e| { serde_json::from_str(data).map_err(|e| {
ServerError::InvalidServerSentEvent(format!( ServerError::InvalidServerSentEvent(format!(
@@ -1436,6 +1444,7 @@ pub enum EventTopic {
ExecutionPayloadGossip, ExecutionPayloadGossip,
ExecutionPayloadAvailable, ExecutionPayloadAvailable,
ExecutionPayloadBid, ExecutionPayloadBid,
ProposerPreferences,
PayloadAttestationMessage, PayloadAttestationMessage,
} }
@@ -1466,6 +1475,7 @@ impl FromStr for EventTopic {
"execution_payload_gossip" => Ok(EventTopic::ExecutionPayloadGossip), "execution_payload_gossip" => Ok(EventTopic::ExecutionPayloadGossip),
"execution_payload_available" => Ok(EventTopic::ExecutionPayloadAvailable), "execution_payload_available" => Ok(EventTopic::ExecutionPayloadAvailable),
"execution_payload_bid" => Ok(EventTopic::ExecutionPayloadBid), "execution_payload_bid" => Ok(EventTopic::ExecutionPayloadBid),
"proposer_preferences" => Ok(EventTopic::ProposerPreferences),
"payload_attestation_message" => Ok(EventTopic::PayloadAttestationMessage), "payload_attestation_message" => Ok(EventTopic::PayloadAttestationMessage),
_ => Err("event topic cannot be parsed.".to_string()), _ => Err("event topic cannot be parsed.".to_string()),
} }
@@ -1499,6 +1509,7 @@ impl fmt::Display for EventTopic {
write!(f, "execution_payload_available") write!(f, "execution_payload_available")
} }
EventTopic::ExecutionPayloadBid => write!(f, "execution_payload_bid"), EventTopic::ExecutionPayloadBid => write!(f, "execution_payload_bid"),
EventTopic::ProposerPreferences => write!(f, "proposer_preferences"),
EventTopic::PayloadAttestationMessage => { EventTopic::PayloadAttestationMessage => {
write!(f, "payload_attestation_message") write!(f, "payload_attestation_message")
} }

View File

@@ -14,8 +14,10 @@ use tree_hash_derive::TreeHash;
pub struct ProposerPreferences { pub struct ProposerPreferences {
pub dependent_root: Hash256, pub dependent_root: Hash256,
pub proposal_slot: Slot, pub proposal_slot: Slot,
#[serde(with = "serde_utils::quoted_u64")]
pub validator_index: u64, pub validator_index: u64,
pub fee_recipient: Address, pub fee_recipient: Address,
#[serde(with = "serde_utils::quoted_u64")]
pub gas_limit: u64, pub gas_limit: u64,
} }