diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 80667cd399..dd08c59c76 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -29,6 +29,7 @@ pub struct ServerSentEventHandler { execution_payload_gossip_tx: Sender>, execution_payload_available_tx: Sender>, execution_payload_bid_tx: Sender>, + proposer_preferences_tx: Sender>, payload_attestation_message_tx: Sender>, } @@ -60,6 +61,7 @@ impl ServerSentEventHandler { let (execution_payload_gossip_tx, _) = broadcast::channel(capacity); let (execution_payload_available_tx, _) = broadcast::channel(capacity); let (execution_payload_bid_tx, _) = broadcast::channel(capacity); + let (proposer_preferences_tx, _) = broadcast::channel(capacity); let (payload_attestation_message_tx, _) = broadcast::channel(capacity); Self { @@ -85,6 +87,7 @@ impl ServerSentEventHandler { execution_payload_gossip_tx, execution_payload_available_tx, execution_payload_bid_tx, + proposer_preferences_tx, payload_attestation_message_tx, } } @@ -186,6 +189,10 @@ impl ServerSentEventHandler { .execution_payload_bid_tx .send(kind) .map(|count| log_count("execution payload bid", count)), + EventKind::ProposerPreferences(_) => self + .proposer_preferences_tx + .send(kind) + .map(|count| log_count("proposer preferences", count)), EventKind::PayloadAttestationMessage(_) => self .payload_attestation_message_tx .send(kind) @@ -284,6 +291,10 @@ impl ServerSentEventHandler { self.execution_payload_bid_tx.subscribe() } + pub fn subscribe_proposer_preferences(&self) -> Receiver> { + self.proposer_preferences_tx.subscribe() + } + pub fn subscribe_payload_attestation_message(&self) -> Receiver> { self.payload_attestation_message_tx.subscribe() } @@ -368,6 +379,10 @@ impl ServerSentEventHandler { self.execution_payload_bid_tx.receiver_count() > 0 } + pub fn has_proposer_preferences_subscribers(&self) -> bool { + self.proposer_preferences_tx.receiver_count() > 0 + } + pub fn has_payload_attestation_message_subscribers(&self) -> bool { self.payload_attestation_message_tx.receiver_count() > 0 } diff --git a/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs b/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs index 586721d8c1..4dc1646ec4 100644 --- a/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs +++ b/beacon_node/beacon_chain/src/proposer_preferences_verification/gossip_verified_proposer_preferences.rs @@ -6,6 +6,7 @@ use crate::{ ProposerPreferencesError, proposer_preference_cache::GossipVerifiedProposerPreferenceCache, }, }; +use eth2::types::{EventKind, ForkVersionedResponse}; use slot_clock::SlotClock; use state_processing::signature_sets::{get_pubkey_from_state, proposer_preferences_signature_set}; use tracing::debug; @@ -145,6 +146,19 @@ impl BeaconChain { %validator_index, "Successfully verified gossip proposer preferences" ); + + if let Some(event_handler) = self.event_handler.as_ref() + && event_handler.has_proposer_preferences_subscribers() + { + event_handler.register(EventKind::ProposerPreferences(Box::new( + ForkVersionedResponse { + version: self.spec.fork_name_at_slot::(proposal_slot), + metadata: Default::default(), + data: (*verified.signed_preferences).clone(), + }, + ))); + } + Ok(verified) } Err(e) => { diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index baa6975303..9f0b3675f3 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -7,9 +7,9 @@ use eth2::types::{EventKind, SseBlobSidecar, SseDataColumnSidecar}; use std::sync::Arc; use types::data::FixedBlobSidecarList; use types::{ - BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, EthSpec, - MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, SignedExecutionPayloadBid, - SignedRoot, Slot, + Address, BlobSidecar, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, Domain, + EthSpec, Hash256, MinimalEthSpec, PayloadAttestationData, PayloadAttestationMessage, + ProposerPreferences, SignedExecutionPayloadBid, SignedProposerPreferences, SignedRoot, Slot, }; type E = MinimalEthSpec; @@ -401,3 +401,80 @@ async fn payload_attestation_message_event_on_gossip_verification() { panic!("Expected PayloadAttestationMessage event, got {:?}", event); } } + +/// Verifies that a `proposer_preferences` SSE event is emitted when signed proposer preferences +/// pass gossip verification. +#[tokio::test] +async fn proposer_preferences_event_on_gossip_verification() { + if !fork_name_from_env().is_some_and(|f| f.gloas_enabled()) { + return; + } + + let harness = BeaconChainHarness::builder(E::default()) + .default_spec() + .deterministic_keypairs(64) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + let head = harness.chain.canonical_head.cached_head(); + let head_state = &head.snapshot.beacon_state; + let genesis_validators_root = harness.chain.genesis_validators_root; + + // Pick a proposal slot in the next epoch so it is always a valid, future slot. The lookahead + // covers 2 epochs: index = epoch_offset * slots_per_epoch + slot_in_epoch. + let slots_per_epoch = E::slots_per_epoch() as usize; + let proposer_lookahead = head_state + .proposer_lookahead() + .expect("gloas state should have proposer lookahead"); + let next_epoch_start = (head_state.current_epoch() + 1).start_slot(E::slots_per_epoch()); + let proposal_slot = next_epoch_start + 1; + let lookahead_index = slots_per_epoch + 1; + let validator_index = *proposer_lookahead + .get(lookahead_index) + .expect("lookahead index should be in range"); + + // Build and sign proposer preferences for the proposer of `proposal_slot`. + let preferences = ProposerPreferences { + dependent_root: Hash256::ZERO, + proposal_slot, + validator_index, + fee_recipient: Address::repeat_byte(0xaa), + target_gas_limit: 30_000_000, + }; + let domain = harness.spec.get_domain( + proposal_slot.epoch(E::slots_per_epoch()), + Domain::ProposerPreferences, + &head_state.fork(), + genesis_validators_root, + ); + let signature = harness.validator_keypairs[validator_index as usize] + .sk + .sign(preferences.signing_root(domain)); + let signed = SignedProposerPreferences { + message: preferences.clone(), + signature, + }; + + // Subscribe before verification. + let event_handler = harness.chain.event_handler.as_ref().unwrap(); + let mut receiver = event_handler.subscribe_proposer_preferences(); + + // Verify the preferences through the gossip path. + harness + .chain + .verify_proposer_preferences_for_gossip(Arc::new(signed)) + .expect("verification should succeed"); + + // Assert the event was emitted with the expected data. + let event = receiver.try_recv().expect("should receive event"); + if let EventKind::ProposerPreferences(versioned) = event { + assert_eq!(versioned.data.message, preferences); + assert_eq!( + versioned.version, + harness.spec.fork_name_at_slot::(proposal_slot) + ); + } else { + panic!("Expected ProposerPreferences event, got {:?}", event); + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 94f2e3f1df..7c0959acb9 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3273,6 +3273,9 @@ pub fn serve( api_types::EventTopic::ExecutionPayloadBid => { event_handler.subscribe_execution_payload_bid() } + api_types::EventTopic::ProposerPreferences => { + event_handler.subscribe_proposer_preferences() + } api_types::EventTopic::PayloadAttestationMessage => { event_handler.subscribe_payload_attestation_message() } diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 049ffba657..4d0bb48f54 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1164,6 +1164,7 @@ pub struct SseExtendedPayloadAttributesGeneric { pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric; pub type VersionedSsePayloadAttributes = ForkVersionedResponse; pub type VersionedSseExecutionPayloadBid = ForkVersionedResponse>; +pub type VersionedSseProposerPreferences = ForkVersionedResponse; pub type VersionedSsePayloadAttestationMessage = ForkVersionedResponse; impl<'de> ContextDeserialize<'de, ForkName> for SsePayloadAttributes { @@ -1245,6 +1246,7 @@ pub enum EventKind { ExecutionPayloadGossip(SseExecutionPayloadGossip), ExecutionPayloadAvailable(SseExecutionPayloadAvailable), ExecutionPayloadBid(Box>), + ProposerPreferences(Box), PayloadAttestationMessage(Box), } @@ -1273,6 +1275,7 @@ impl EventKind { EventKind::ExecutionPayloadGossip(_) => "execution_payload_gossip", EventKind::ExecutionPayloadAvailable(_) => "execution_payload_available", EventKind::ExecutionPayloadBid(_) => "execution_payload_bid", + EventKind::ProposerPreferences(_) => "proposer_preferences", EventKind::PayloadAttestationMessage(_) => "payload_attestation_message", } } @@ -1389,6 +1392,11 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Execution Payload Bid: {:?}", e)) })?, ))), + "proposer_preferences" => Ok(EventKind::ProposerPreferences(Box::new( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Proposer Preferences: {:?}", e)) + })?, + ))), "payload_attestation_message" => Ok(EventKind::PayloadAttestationMessage(Box::new( serde_json::from_str(data).map_err(|e| { ServerError::InvalidServerSentEvent(format!( @@ -1436,6 +1444,7 @@ pub enum EventTopic { ExecutionPayloadGossip, ExecutionPayloadAvailable, ExecutionPayloadBid, + ProposerPreferences, PayloadAttestationMessage, } @@ -1466,6 +1475,7 @@ impl FromStr for EventTopic { "execution_payload_gossip" => Ok(EventTopic::ExecutionPayloadGossip), "execution_payload_available" => Ok(EventTopic::ExecutionPayloadAvailable), "execution_payload_bid" => Ok(EventTopic::ExecutionPayloadBid), + "proposer_preferences" => Ok(EventTopic::ProposerPreferences), "payload_attestation_message" => Ok(EventTopic::PayloadAttestationMessage), _ => Err("event topic cannot be parsed.".to_string()), } @@ -1499,6 +1509,7 @@ impl fmt::Display for EventTopic { write!(f, "execution_payload_available") } EventTopic::ExecutionPayloadBid => write!(f, "execution_payload_bid"), + EventTopic::ProposerPreferences => write!(f, "proposer_preferences"), EventTopic::PayloadAttestationMessage => { write!(f, "payload_attestation_message") } diff --git a/consensus/types/src/builder/proposer_preferences.rs b/consensus/types/src/builder/proposer_preferences.rs index 4f27020105..97ba980d6c 100644 --- a/consensus/types/src/builder/proposer_preferences.rs +++ b/consensus/types/src/builder/proposer_preferences.rs @@ -14,8 +14,10 @@ use tree_hash_derive::TreeHash; pub struct ProposerPreferences { pub dependent_root: Hash256, pub proposal_slot: Slot, + #[serde(with = "serde_utils::quoted_u64")] pub validator_index: u64, pub fee_recipient: Address, + #[serde(with = "serde_utils::quoted_u64")] pub target_gas_limit: u64, } @@ -45,4 +47,24 @@ mod tests { use super::*; ssz_and_tree_hash_tests!(ProposerPreferences); + + /// `validator_index` and `target_gas_limit` must serialize as quoted JSON strings (Beacon API + /// convention) and round-trip back to their numeric values. + #[test] + fn quoted_u64_json_serde() { + let preferences = ProposerPreferences { + dependent_root: Hash256::ZERO, + proposal_slot: Slot::new(7), + validator_index: 42, + fee_recipient: Address::ZERO, + target_gas_limit: 30_000_000, + }; + + let value = serde_json::to_value(&preferences).unwrap(); + assert_eq!(value["validator_index"], serde_json::json!("42")); + assert_eq!(value["target_gas_limit"], serde_json::json!("30000000")); + + let decoded: ProposerPreferences = serde_json::from_value(value).unwrap(); + assert_eq!(decoded, preferences); + } }