diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 734b12ca83..33605c6b1d 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -67,7 +67,7 @@ use crate::{ metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; use derivative::Derivative; -use eth2::types::{EventKind, PublishBlockRequest}; +use eth2::types::{BlockGossip, EventKind, PublishBlockRequest}; use execution_layer::PayloadStatus; pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; use parking_lot::RwLockReadGuard; @@ -974,6 +974,16 @@ impl GossipVerifiedBlock { // Validate the block's execution_payload (if any). validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; + // Beacon API block_gossip events + if let Some(event_handler) = chain.event_handler.as_ref() { + if event_handler.has_block_gossip_subscribers() { + event_handler.register(EventKind::BlockGossip(Box::new(BlockGossip { + slot: block.slot(), + block: block_root, + }))); + } + } + // Having checked the proposer index and the block root we can cache them. let consensus_context = ConsensusContext::new(block.slot()) .set_current_block_root(block_root) diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index 5f91fe5d0c..267d56220c 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -23,6 +23,7 @@ pub struct ServerSentEventHandler { proposer_slashing_tx: Sender>, attester_slashing_tx: Sender>, bls_to_execution_change_tx: Sender>, + block_gossip_tx: Sender>, log: Logger, } @@ -51,6 +52,7 @@ impl ServerSentEventHandler { let (proposer_slashing_tx, _) = broadcast::channel(capacity); let (attester_slashing_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); + let (block_gossip_tx, _) = broadcast::channel(capacity); Self { attestation_tx, @@ -69,6 +71,7 @@ impl ServerSentEventHandler { proposer_slashing_tx, attester_slashing_tx, bls_to_execution_change_tx, + block_gossip_tx, log, } } @@ -147,6 +150,10 @@ impl ServerSentEventHandler { .bls_to_execution_change_tx .send(kind) .map(|count| log_count("bls to execution change", count)), + EventKind::BlockGossip(_) => self + .block_gossip_tx + .send(kind) + .map(|count| log_count("block gossip", count)), }; if let Err(SendError(event)) = result { trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); @@ -217,6 +224,10 @@ impl ServerSentEventHandler { self.bls_to_execution_change_tx.subscribe() } + pub fn subscribe_block_gossip(&self) -> Receiver> { + self.block_gossip_tx.subscribe() + } + pub fn has_attestation_subscribers(&self) -> bool { self.attestation_tx.receiver_count() > 0 } @@ -272,4 +283,8 @@ impl ServerSentEventHandler { pub fn has_bls_to_execution_change_subscribers(&self) -> bool { self.bls_to_execution_change_tx.receiver_count() > 0 } + + pub fn has_block_gossip_subscribers(&self) -> bool { + self.block_gossip_tx.receiver_count() > 0 + } } diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 2d017d6539..59bf367b4c 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -4474,6 +4474,9 @@ pub fn serve( api_types::EventTopic::BlsToExecutionChange => { event_handler.subscribe_bls_to_execution_change() } + api_types::EventTopic::BlockGossip => { + event_handler.subscribe_block_gossip() + } }; receivers.push( diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index f511f25d32..633baaf6f4 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -5461,6 +5461,7 @@ impl ApiTester { EventTopic::Attestation, EventTopic::VoluntaryExit, EventTopic::Block, + EventTopic::BlockGossip, EventTopic::Head, EventTopic::FinalizedCheckpoint, EventTopic::AttesterSlashing, @@ -5576,10 +5577,20 @@ impl ApiTester { .await .unwrap(); - let block_events = poll_events(&mut events_future, 3, Duration::from_millis(10000)).await; + let expected_gossip = EventKind::BlockGossip(Box::new(BlockGossip { + slot: next_slot, + block: block_root, + })); + + let block_events = poll_events(&mut events_future, 4, Duration::from_millis(10000)).await; assert_eq!( block_events.as_slice(), - &[expected_block, expected_head, expected_finalized] + &[ + expected_gossip, + expected_block, + expected_head, + expected_finalized + ] ); // Test a reorg event diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index d399bc2bd0..bbcbda3ae5 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -971,6 +971,11 @@ pub struct SseHead { pub execution_optimistic: bool, } +#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] +pub struct BlockGossip { + pub slot: Slot, + pub block: Hash256, +} #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] pub struct SseChainReorg { pub slot: Slot, @@ -1100,6 +1105,7 @@ pub enum EventKind { ProposerSlashing(Box), AttesterSlashing(Box>), BlsToExecutionChange(Box), + BlockGossip(Box), } impl EventKind { @@ -1122,6 +1128,7 @@ impl EventKind { EventKind::ProposerSlashing(_) => "proposer_slashing", EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", + EventKind::BlockGossip(_) => "block_gossip", } } @@ -1217,6 +1224,9 @@ impl EventKind { ServerError::InvalidServerSentEvent(format!("Bls To Execution Change: {:?}", e)) })?, )), + "block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err( + |e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)), + )?)), _ => Err(ServerError::InvalidServerSentEvent( "Could not parse event tag".to_string(), )), @@ -1251,6 +1261,7 @@ pub enum EventTopic { AttesterSlashing, ProposerSlashing, BlsToExecutionChange, + BlockGossip, } impl FromStr for EventTopic { @@ -1275,6 +1286,7 @@ impl FromStr for EventTopic { "attester_slashing" => Ok(EventTopic::AttesterSlashing), "proposer_slashing" => Ok(EventTopic::ProposerSlashing), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), + "block_gossip" => Ok(EventTopic::BlockGossip), _ => Err("event topic cannot be parsed.".to_string()), } } @@ -1300,6 +1312,7 @@ impl fmt::Display for EventTopic { EventTopic::AttesterSlashing => write!(f, "attester_slashing"), EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), + EventTopic::BlockGossip => write!(f, "block_gossip"), } } }