Add block_gossip Beacon API events (#5864)

* Add bls event

* Update events and types

* Add bls in event

* Event bls

* tests..rs

* change order

* another tests.rs

* Signed BLS

* Revert "another tests.rs"

This reverts commit 7f54e9c1ce.

* Revert "Signed BLS"

This reverts commit 1146bc734b.

* withdrawal_keyparis

* Fix genesis

* block gossip

* Add definition for BlockGossip

* Fix block gossip

* Tests.rs

* Update block and events

* Add bls event

* Event bls

* tests..rs

* change order

* another tests.rs

* Signed BLS

* Revert "another tests.rs"

This reverts commit 7f54e9c1ce.

* Revert "Signed BLS"

This reverts commit 1146bc734b.

* block gossip

* Add definition for BlockGossip

* Fix block gossip

* Tests.rs

* Update block and events

* Merge branch 'BeaconAPI-events-block-gossip' of https://github.com/chong-he/lighthouse into BeaconAPI-events-block-gossip

* Remove tests

* Tests.rs

* Tests.rs

* Tests.rs

* Tests similar to block event

* Update common/eth2/src/types.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Merge remote-tracking branch 'origin/unstable' into BeaconAPI-events-block-gossip

* Fix tests
This commit is contained in:
chonghe
2024-07-16 12:39:55 +08:00
committed by GitHub
parent 6856134ded
commit 79680c886d
5 changed files with 55 additions and 3 deletions

View File

@@ -67,7 +67,7 @@ use crate::{
metrics, BeaconChain, BeaconChainError, BeaconChainTypes, metrics, BeaconChain, BeaconChainError, BeaconChainTypes,
}; };
use derivative::Derivative; use derivative::Derivative;
use eth2::types::{EventKind, PublishBlockRequest}; use eth2::types::{BlockGossip, EventKind, PublishBlockRequest};
use execution_layer::PayloadStatus; use execution_layer::PayloadStatus;
pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard; use parking_lot::RwLockReadGuard;
@@ -974,6 +974,16 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// Validate the block's execution_payload (if any). // Validate the block's execution_payload (if any).
validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?; validate_execution_payload_for_gossip(&parent_block, block.message(), chain)?;
// Beacon API block_gossip events
if let Some(event_handler) = chain.event_handler.as_ref() {
if event_handler.has_block_gossip_subscribers() {
event_handler.register(EventKind::BlockGossip(Box::new(BlockGossip {
slot: block.slot(),
block: block_root,
})));
}
}
// Having checked the proposer index and the block root we can cache them. // Having checked the proposer index and the block root we can cache them.
let consensus_context = ConsensusContext::new(block.slot()) let consensus_context = ConsensusContext::new(block.slot())
.set_current_block_root(block_root) .set_current_block_root(block_root)

View File

@@ -23,6 +23,7 @@ pub struct ServerSentEventHandler<E: EthSpec> {
proposer_slashing_tx: Sender<EventKind<E>>, proposer_slashing_tx: Sender<EventKind<E>>,
attester_slashing_tx: Sender<EventKind<E>>, attester_slashing_tx: Sender<EventKind<E>>,
bls_to_execution_change_tx: Sender<EventKind<E>>, bls_to_execution_change_tx: Sender<EventKind<E>>,
block_gossip_tx: Sender<EventKind<E>>,
log: Logger, log: Logger,
} }
@@ -51,6 +52,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
let (proposer_slashing_tx, _) = broadcast::channel(capacity); let (proposer_slashing_tx, _) = broadcast::channel(capacity);
let (attester_slashing_tx, _) = broadcast::channel(capacity); let (attester_slashing_tx, _) = broadcast::channel(capacity);
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity); let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
let (block_gossip_tx, _) = broadcast::channel(capacity);
Self { Self {
attestation_tx, attestation_tx,
@@ -69,6 +71,7 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
proposer_slashing_tx, proposer_slashing_tx,
attester_slashing_tx, attester_slashing_tx,
bls_to_execution_change_tx, bls_to_execution_change_tx,
block_gossip_tx,
log, log,
} }
} }
@@ -147,6 +150,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.bls_to_execution_change_tx .bls_to_execution_change_tx
.send(kind) .send(kind)
.map(|count| log_count("bls to execution change", count)), .map(|count| log_count("bls to execution change", count)),
EventKind::BlockGossip(_) => self
.block_gossip_tx
.send(kind)
.map(|count| log_count("block gossip", count)),
}; };
if let Err(SendError(event)) = result { if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event); trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
@@ -217,6 +224,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
self.bls_to_execution_change_tx.subscribe() self.bls_to_execution_change_tx.subscribe()
} }
pub fn subscribe_block_gossip(&self) -> Receiver<EventKind<E>> {
self.block_gossip_tx.subscribe()
}
pub fn has_attestation_subscribers(&self) -> bool { pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0 self.attestation_tx.receiver_count() > 0
} }
@@ -272,4 +283,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn has_bls_to_execution_change_subscribers(&self) -> bool { pub fn has_bls_to_execution_change_subscribers(&self) -> bool {
self.bls_to_execution_change_tx.receiver_count() > 0 self.bls_to_execution_change_tx.receiver_count() > 0
} }
pub fn has_block_gossip_subscribers(&self) -> bool {
self.block_gossip_tx.receiver_count() > 0
}
} }

View File

@@ -4474,6 +4474,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::BlsToExecutionChange => { api_types::EventTopic::BlsToExecutionChange => {
event_handler.subscribe_bls_to_execution_change() event_handler.subscribe_bls_to_execution_change()
} }
api_types::EventTopic::BlockGossip => {
event_handler.subscribe_block_gossip()
}
}; };
receivers.push( receivers.push(

View File

@@ -5461,6 +5461,7 @@ impl ApiTester {
EventTopic::Attestation, EventTopic::Attestation,
EventTopic::VoluntaryExit, EventTopic::VoluntaryExit,
EventTopic::Block, EventTopic::Block,
EventTopic::BlockGossip,
EventTopic::Head, EventTopic::Head,
EventTopic::FinalizedCheckpoint, EventTopic::FinalizedCheckpoint,
EventTopic::AttesterSlashing, EventTopic::AttesterSlashing,
@@ -5576,10 +5577,20 @@ impl ApiTester {
.await .await
.unwrap(); .unwrap();
let block_events = poll_events(&mut events_future, 3, Duration::from_millis(10000)).await; let expected_gossip = EventKind::BlockGossip(Box::new(BlockGossip {
slot: next_slot,
block: block_root,
}));
let block_events = poll_events(&mut events_future, 4, Duration::from_millis(10000)).await;
assert_eq!( assert_eq!(
block_events.as_slice(), block_events.as_slice(),
&[expected_block, expected_head, expected_finalized] &[
expected_gossip,
expected_block,
expected_head,
expected_finalized
]
); );
// Test a reorg event // Test a reorg event

View File

@@ -971,6 +971,11 @@ pub struct SseHead {
pub execution_optimistic: bool, pub execution_optimistic: bool,
} }
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct BlockGossip {
pub slot: Slot,
pub block: Hash256,
}
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
pub struct SseChainReorg { pub struct SseChainReorg {
pub slot: Slot, pub slot: Slot,
@@ -1100,6 +1105,7 @@ pub enum EventKind<E: EthSpec> {
ProposerSlashing(Box<ProposerSlashing>), ProposerSlashing(Box<ProposerSlashing>),
AttesterSlashing(Box<AttesterSlashing<E>>), AttesterSlashing(Box<AttesterSlashing<E>>),
BlsToExecutionChange(Box<SignedBlsToExecutionChange>), BlsToExecutionChange(Box<SignedBlsToExecutionChange>),
BlockGossip(Box<BlockGossip>),
} }
impl<E: EthSpec> EventKind<E> { impl<E: EthSpec> EventKind<E> {
@@ -1122,6 +1128,7 @@ impl<E: EthSpec> EventKind<E> {
EventKind::ProposerSlashing(_) => "proposer_slashing", EventKind::ProposerSlashing(_) => "proposer_slashing",
EventKind::AttesterSlashing(_) => "attester_slashing", EventKind::AttesterSlashing(_) => "attester_slashing",
EventKind::BlsToExecutionChange(_) => "bls_to_execution_change", EventKind::BlsToExecutionChange(_) => "bls_to_execution_change",
EventKind::BlockGossip(_) => "block_gossip",
} }
} }
@@ -1217,6 +1224,9 @@ impl<E: EthSpec> EventKind<E> {
ServerError::InvalidServerSentEvent(format!("Bls To Execution Change: {:?}", e)) ServerError::InvalidServerSentEvent(format!("Bls To Execution Change: {:?}", e))
})?, })?,
)), )),
"block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err(
|e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)),
)?)),
_ => Err(ServerError::InvalidServerSentEvent( _ => Err(ServerError::InvalidServerSentEvent(
"Could not parse event tag".to_string(), "Could not parse event tag".to_string(),
)), )),
@@ -1251,6 +1261,7 @@ pub enum EventTopic {
AttesterSlashing, AttesterSlashing,
ProposerSlashing, ProposerSlashing,
BlsToExecutionChange, BlsToExecutionChange,
BlockGossip,
} }
impl FromStr for EventTopic { impl FromStr for EventTopic {
@@ -1275,6 +1286,7 @@ impl FromStr for EventTopic {
"attester_slashing" => Ok(EventTopic::AttesterSlashing), "attester_slashing" => Ok(EventTopic::AttesterSlashing),
"proposer_slashing" => Ok(EventTopic::ProposerSlashing), "proposer_slashing" => Ok(EventTopic::ProposerSlashing),
"bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange), "bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange),
"block_gossip" => Ok(EventTopic::BlockGossip),
_ => Err("event topic cannot be parsed.".to_string()), _ => Err("event topic cannot be parsed.".to_string()),
} }
} }
@@ -1300,6 +1312,7 @@ impl fmt::Display for EventTopic {
EventTopic::AttesterSlashing => write!(f, "attester_slashing"), EventTopic::AttesterSlashing => write!(f, "attester_slashing"),
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"), EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"), EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"),
EventTopic::BlockGossip => write!(f, "block_gossip"),
} }
} }
} }