From 77e86b102d4abed27344b927a71cc75373228fa5 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Thu, 20 Mar 2025 23:59:32 -0600 Subject: [PATCH] Add inclusion list sse event --- beacon_node/beacon_chain/src/beacon_chain.rs | 9 ++++++--- beacon_node/beacon_chain/src/events.rs | 15 +++++++++++++++ .../src/engine_api/new_payload_request.rs | 2 +- beacon_node/http_api/src/lib.rs | 3 +++ common/eth2/src/types.rs | 10 ++++++++++ consensus/types/src/chain_spec.rs | 10 ++++------ 6 files changed, 39 insertions(+), 10 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 5b98490247..87e0ce2bcd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2391,10 +2391,13 @@ impl BeaconChain { let _timer = metrics::start_timer(&metrics::UNAGGREGATED_ATTESTATION_GOSSIP_VERIFICATION_TIMES); - GossipVerifiedInclusionList::verify(inclusion_list, self).inspect(|_v| { - // TODO(focil) emit event - if let Some(_event_handler) = self.event_handler.as_ref() {} + GossipVerifiedInclusionList::verify(inclusion_list, self).inspect(|v| { metrics::inc_counter(&metrics::INCLUSION_LIST_PROCESSING_SUCCESSES); + if let Some(event_handler) = self.event_handler.as_ref() { + event_handler.register(EventKind::InclusionList(Box::new( + v.signed_il.clone(), + ))); + } }) } diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 8c342893ae..b8713b8e01 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -25,6 +25,7 @@ pub struct ServerSentEventHandler { attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, block_gossip_tx: Sender>, + inclusion_list_tx: Sender>, log: Logger, } @@ -55,6 +56,7 @@ impl ServerSentEventHandler { let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); let (block_gossip_tx, _) = broadcast::channel(capacity); + let (inclusion_list_tx, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -75,6 +77,7 @@ impl ServerSentEventHandler { attester_slashing_tx, bls_to_execution_change_tx, block_gossip_tx, + inclusion_list_tx, log, } } @@ -161,6 +164,10 @@ impl ServerSentEventHandler { .block_gossip_tx .send(kind) .map(|count| log_count("block gossip", count)), + EventKind::InclusionList(_) => self + .inclusion_list_tx + .send(kind) + .map(|count| log_count("inclusion list", count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -239,6 +246,10 @@ impl ServerSentEventHandler { self.block_gossip_tx.subscribe() } + pub fn subscribe_inclusion_list(&self) -> Receiver> { + self.inclusion_list_tx.subscribe() + } + pub fn has_attestation_subscribers(&self) -> bool { self.attestation_tx.receiver_count() > 0 } @@ -302,4 +313,8 @@ impl ServerSentEventHandler { pub fn has_block_gossip_subscribers(&self) -> bool { self.block_gossip_tx.receiver_count() > 0 } + + pub fn has_inclusion_list_subscribers(&self) -> bool { + self.inclusion_list_tx.receiver_count() > 0 + } } diff --git a/beacon_node/execution_layer/src/engine_api/new_payload_request.rs b/beacon_node/execution_layer/src/engine_api/new_payload_request.rs index 0b36c54594..c622d34855 100644 --- a/beacon_node/execution_layer/src/engine_api/new_payload_request.rs +++ b/beacon_node/execution_layer/src/engine_api/new_payload_request.rs @@ -204,7 +204,7 @@ impl<'a, E: EthSpec> NewPayloadRequest<'a, E> { .collect(), parent_beacon_block_root: block_ref.parent_root, execution_requests: &block_ref.body.execution_requests, - il_transactions + il_transactions, })), BeaconBlockRef::Fulu(block_ref) => Ok(Self::Fulu(NewPayloadRequestFulu { execution_payload: &block_ref.body.execution_payload.execution_payload, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 86afedca7b..6a1c986ef7 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4699,6 +4699,9 @@ pub fn serve( api_types::EventTopic::BlockGossip => { event_handler.subscribe_block_gossip() } + api_types::EventTopic::InclusionList => { + event_handler.subscribe_inclusion_list() + } }; receivers.push( diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index b0beab86f6..e00c2db8fc 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1144,6 +1144,7 @@ pub enum EventKind { AttesterSlashing(Box>), BlsToExecutionChange(Box), BlockGossip(Box), + InclusionList(Box>), } impl EventKind { @@ -1168,6 +1169,7 @@ impl EventKind { EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", EventKind::BlockGossip(_) => "block_gossip", + EventKind::InclusionList(_) => "inclusion_list", } } @@ -1254,6 +1256,11 @@ impl EventKind { "block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err( |e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)), )?)), + "inclusion_list" => Ok(EventKind::InclusionList( + serde_json::from_str(data).map_err(|e| { + ServerError::InvalidServerSentEvent(format!("Inclusion List {:?}", e)) + })?, + )), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -1290,6 +1297,7 @@ pub enum EventTopic { ProposerSlashing, BlsToExecutionChange, BlockGossip, + InclusionList, } impl FromStr for EventTopic { @@ -1316,6 +1324,7 @@ impl FromStr for EventTopic { "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), "block_gossip" => Ok(EventTopic::BlockGossip), + "inclusion_list" => Ok(EventTopic::InclusionList), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -1343,6 +1352,7 @@ impl fmt::Display for EventTopic { EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), EventTopic::BlockGossip => write!(f, "block_gossip"), + EventTopic::InclusionList => write!(f, "inclusion_list",), } } } diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 102a61595c..b62f425c8c 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -456,16 +456,14 @@ impl ChainSpec { /// Returns true if the given epoch is greater than or equal to the `EIP7805_FORK_EPOCH`. pub fn is_focil_enabled_for_epoch(&self, block_epoch: Epoch) -> bool { - self.eip7805_fork_epoch.is_some_and(|eip7805_fork_epoch| { - block_epoch >= eip7805_fork_epoch - }) + self.eip7805_fork_epoch + .is_some_and(|eip7805_fork_epoch| block_epoch >= eip7805_fork_epoch) } /// Returns true if `EIP7805_FORK_EPOCH` is set and is not set to `FAR_FUTURE_EPOCH`. pub fn is_focil_scheduled(&self) -> bool { - self.eip7805_fork_epoch.is_some_and(|eip7805_fork_epoch| { - eip7805_fork_epoch != self.far_future_epoch - }) + self.eip7805_fork_epoch + .is_some_and(|eip7805_fork_epoch| eip7805_fork_epoch != self.far_future_epoch) } /// Returns a full `Fork` struct for a given epoch.