mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-04 13:24:39 +00:00
Add Gloas SSE event boilerplate (#9053)
Implement boilerplate for new SSE events as specified in - https://github.com/ethereum/beacon-APIs/pull/588 While that one is not merged yet, I believe the SSE events might be utilized in Dora already. Implement the boilerplate, i.e. subscription tracking and publish queues. A PR to implement to fully implement already implementable events will follow. Co-Authored-By: Daniel Knopik <daniel@dknopik.de>
This commit is contained in:
@@ -25,6 +25,11 @@ pub struct ServerSentEventHandler<E: EthSpec> {
|
|||||||
attester_slashing_tx: Sender<EventKind<E>>,
|
attester_slashing_tx: Sender<EventKind<E>>,
|
||||||
bls_to_execution_change_tx: Sender<EventKind<E>>,
|
bls_to_execution_change_tx: Sender<EventKind<E>>,
|
||||||
block_gossip_tx: Sender<EventKind<E>>,
|
block_gossip_tx: Sender<EventKind<E>>,
|
||||||
|
execution_payload_tx: Sender<EventKind<E>>,
|
||||||
|
execution_payload_gossip_tx: Sender<EventKind<E>>,
|
||||||
|
execution_payload_available_tx: Sender<EventKind<E>>,
|
||||||
|
execution_payload_bid_tx: Sender<EventKind<E>>,
|
||||||
|
payload_attestation_message_tx: Sender<EventKind<E>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: EthSpec> ServerSentEventHandler<E> {
|
impl<E: EthSpec> ServerSentEventHandler<E> {
|
||||||
@@ -51,6 +56,11 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
|||||||
let (attester_slashing_tx, _) = broadcast::channel(capacity);
|
let (attester_slashing_tx, _) = broadcast::channel(capacity);
|
||||||
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
|
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
|
||||||
let (block_gossip_tx, _) = broadcast::channel(capacity);
|
let (block_gossip_tx, _) = broadcast::channel(capacity);
|
||||||
|
let (execution_payload_tx, _) = broadcast::channel(capacity);
|
||||||
|
let (execution_payload_gossip_tx, _) = broadcast::channel(capacity);
|
||||||
|
let (execution_payload_available_tx, _) = broadcast::channel(capacity);
|
||||||
|
let (execution_payload_bid_tx, _) = broadcast::channel(capacity);
|
||||||
|
let (payload_attestation_message_tx, _) = broadcast::channel(capacity);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
attestation_tx,
|
attestation_tx,
|
||||||
@@ -71,6 +81,11 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
|||||||
attester_slashing_tx,
|
attester_slashing_tx,
|
||||||
bls_to_execution_change_tx,
|
bls_to_execution_change_tx,
|
||||||
block_gossip_tx,
|
block_gossip_tx,
|
||||||
|
execution_payload_tx,
|
||||||
|
execution_payload_gossip_tx,
|
||||||
|
execution_payload_available_tx,
|
||||||
|
execution_payload_bid_tx,
|
||||||
|
payload_attestation_message_tx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -155,6 +170,26 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
|||||||
.block_gossip_tx
|
.block_gossip_tx
|
||||||
.send(kind)
|
.send(kind)
|
||||||
.map(|count| log_count("block gossip", count)),
|
.map(|count| log_count("block gossip", count)),
|
||||||
|
EventKind::ExecutionPayload(_) => self
|
||||||
|
.execution_payload_tx
|
||||||
|
.send(kind)
|
||||||
|
.map(|count| log_count("execution payload", count)),
|
||||||
|
EventKind::ExecutionPayloadGossip(_) => self
|
||||||
|
.execution_payload_gossip_tx
|
||||||
|
.send(kind)
|
||||||
|
.map(|count| log_count("execution payload gossip", count)),
|
||||||
|
EventKind::ExecutionPayloadAvailable(_) => self
|
||||||
|
.execution_payload_available_tx
|
||||||
|
.send(kind)
|
||||||
|
.map(|count| log_count("execution payload available", count)),
|
||||||
|
EventKind::ExecutionPayloadBid(_) => self
|
||||||
|
.execution_payload_bid_tx
|
||||||
|
.send(kind)
|
||||||
|
.map(|count| log_count("execution payload bid", count)),
|
||||||
|
EventKind::PayloadAttestationMessage(_) => self
|
||||||
|
.payload_attestation_message_tx
|
||||||
|
.send(kind)
|
||||||
|
.map(|count| log_count("payload attestation message", count)),
|
||||||
};
|
};
|
||||||
if let Err(SendError(event)) = result {
|
if let Err(SendError(event)) = result {
|
||||||
trace!(?event, "No receivers registered to listen for event");
|
trace!(?event, "No receivers registered to listen for event");
|
||||||
@@ -233,6 +268,26 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
|||||||
self.block_gossip_tx.subscribe()
|
self.block_gossip_tx.subscribe()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn subscribe_execution_payload(&self) -> Receiver<EventKind<E>> {
|
||||||
|
self.execution_payload_tx.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe_execution_payload_gossip(&self) -> Receiver<EventKind<E>> {
|
||||||
|
self.execution_payload_gossip_tx.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe_execution_payload_available(&self) -> Receiver<EventKind<E>> {
|
||||||
|
self.execution_payload_available_tx.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe_execution_payload_bid(&self) -> Receiver<EventKind<E>> {
|
||||||
|
self.execution_payload_bid_tx.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe_payload_attestation_message(&self) -> Receiver<EventKind<E>> {
|
||||||
|
self.payload_attestation_message_tx.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn has_attestation_subscribers(&self) -> bool {
|
pub fn has_attestation_subscribers(&self) -> bool {
|
||||||
self.attestation_tx.receiver_count() > 0
|
self.attestation_tx.receiver_count() > 0
|
||||||
}
|
}
|
||||||
@@ -296,4 +351,24 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
|||||||
pub fn has_block_gossip_subscribers(&self) -> bool {
|
pub fn has_block_gossip_subscribers(&self) -> bool {
|
||||||
self.block_gossip_tx.receiver_count() > 0
|
self.block_gossip_tx.receiver_count() > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn has_execution_payload_subscribers(&self) -> bool {
|
||||||
|
self.execution_payload_tx.receiver_count() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_execution_payload_gossip_subscribers(&self) -> bool {
|
||||||
|
self.execution_payload_gossip_tx.receiver_count() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_execution_payload_available_subscribers(&self) -> bool {
|
||||||
|
self.execution_payload_available_tx.receiver_count() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_execution_payload_bid_subscribers(&self) -> bool {
|
||||||
|
self.execution_payload_bid_tx.receiver_count() > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn has_payload_attestation_message_subscribers(&self) -> bool {
|
||||||
|
self.payload_attestation_message_tx.receiver_count() > 0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3158,6 +3158,21 @@ pub fn serve<T: BeaconChainTypes>(
|
|||||||
api_types::EventTopic::BlockGossip => {
|
api_types::EventTopic::BlockGossip => {
|
||||||
event_handler.subscribe_block_gossip()
|
event_handler.subscribe_block_gossip()
|
||||||
}
|
}
|
||||||
|
api_types::EventTopic::ExecutionPayload => {
|
||||||
|
event_handler.subscribe_execution_payload()
|
||||||
|
}
|
||||||
|
api_types::EventTopic::ExecutionPayloadGossip => {
|
||||||
|
event_handler.subscribe_execution_payload_gossip()
|
||||||
|
}
|
||||||
|
api_types::EventTopic::ExecutionPayloadAvailable => {
|
||||||
|
event_handler.subscribe_execution_payload_available()
|
||||||
|
}
|
||||||
|
api_types::EventTopic::ExecutionPayloadBid => {
|
||||||
|
event_handler.subscribe_execution_payload_bid()
|
||||||
|
}
|
||||||
|
api_types::EventTopic::PayloadAttestationMessage => {
|
||||||
|
event_handler.subscribe_payload_attestation_message()
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
receivers.push(
|
receivers.push(
|
||||||
|
|||||||
@@ -1070,6 +1070,33 @@ pub struct BlockGossip {
|
|||||||
pub slot: Slot,
|
pub slot: Slot,
|
||||||
pub block: Hash256,
|
pub block: Hash256,
|
||||||
}
|
}
|
||||||
|
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct SseExecutionPayload {
|
||||||
|
pub slot: Slot,
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub builder_index: u64,
|
||||||
|
pub block_hash: ExecutionBlockHash,
|
||||||
|
pub block_root: Hash256,
|
||||||
|
pub state_root: Hash256,
|
||||||
|
pub execution_optimistic: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct SseExecutionPayloadGossip {
|
||||||
|
pub slot: Slot,
|
||||||
|
#[serde(with = "serde_utils::quoted_u64")]
|
||||||
|
pub builder_index: u64,
|
||||||
|
pub block_hash: ExecutionBlockHash,
|
||||||
|
pub block_root: Hash256,
|
||||||
|
pub state_root: Hash256,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
||||||
|
pub struct SseExecutionPayloadAvailable {
|
||||||
|
pub slot: Slot,
|
||||||
|
pub block_root: Hash256,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
|
||||||
pub struct SseChainReorg {
|
pub struct SseChainReorg {
|
||||||
pub slot: Slot,
|
pub slot: Slot,
|
||||||
@@ -1134,6 +1161,8 @@ pub struct SseExtendedPayloadAttributesGeneric<T> {
|
|||||||
|
|
||||||
pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric<SsePayloadAttributes>;
|
pub type SseExtendedPayloadAttributes = SseExtendedPayloadAttributesGeneric<SsePayloadAttributes>;
|
||||||
pub type VersionedSsePayloadAttributes = ForkVersionedResponse<SseExtendedPayloadAttributes>;
|
pub type VersionedSsePayloadAttributes = ForkVersionedResponse<SseExtendedPayloadAttributes>;
|
||||||
|
pub type VersionedSseExecutionPayloadBid<E> = ForkVersionedResponse<SignedExecutionPayloadBid<E>>;
|
||||||
|
pub type VersionedSsePayloadAttestationMessage = ForkVersionedResponse<PayloadAttestationMessage>;
|
||||||
|
|
||||||
impl<'de> ContextDeserialize<'de, ForkName> for SsePayloadAttributes {
|
impl<'de> ContextDeserialize<'de, ForkName> for SsePayloadAttributes {
|
||||||
fn context_deserialize<D>(deserializer: D, context: ForkName) -> Result<Self, D::Error>
|
fn context_deserialize<D>(deserializer: D, context: ForkName) -> Result<Self, D::Error>
|
||||||
@@ -1210,6 +1239,11 @@ pub enum EventKind<E: EthSpec> {
|
|||||||
AttesterSlashing(Box<AttesterSlashing<E>>),
|
AttesterSlashing(Box<AttesterSlashing<E>>),
|
||||||
BlsToExecutionChange(Box<SignedBlsToExecutionChange>),
|
BlsToExecutionChange(Box<SignedBlsToExecutionChange>),
|
||||||
BlockGossip(Box<BlockGossip>),
|
BlockGossip(Box<BlockGossip>),
|
||||||
|
ExecutionPayload(SseExecutionPayload),
|
||||||
|
ExecutionPayloadGossip(SseExecutionPayloadGossip),
|
||||||
|
ExecutionPayloadAvailable(SseExecutionPayloadAvailable),
|
||||||
|
ExecutionPayloadBid(Box<VersionedSseExecutionPayloadBid<E>>),
|
||||||
|
PayloadAttestationMessage(Box<VersionedSsePayloadAttestationMessage>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<E: EthSpec> EventKind<E> {
|
impl<E: EthSpec> EventKind<E> {
|
||||||
@@ -1233,6 +1267,11 @@ impl<E: EthSpec> EventKind<E> {
|
|||||||
EventKind::AttesterSlashing(_) => "attester_slashing",
|
EventKind::AttesterSlashing(_) => "attester_slashing",
|
||||||
EventKind::BlsToExecutionChange(_) => "bls_to_execution_change",
|
EventKind::BlsToExecutionChange(_) => "bls_to_execution_change",
|
||||||
EventKind::BlockGossip(_) => "block_gossip",
|
EventKind::BlockGossip(_) => "block_gossip",
|
||||||
|
EventKind::ExecutionPayload(_) => "execution_payload",
|
||||||
|
EventKind::ExecutionPayloadGossip(_) => "execution_payload_gossip",
|
||||||
|
EventKind::ExecutionPayloadAvailable(_) => "execution_payload_available",
|
||||||
|
EventKind::ExecutionPayloadBid(_) => "execution_payload_bid",
|
||||||
|
EventKind::PayloadAttestationMessage(_) => "payload_attestation_message",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1322,6 +1361,40 @@ impl<E: EthSpec> EventKind<E> {
|
|||||||
"block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err(
|
"block_gossip" => Ok(EventKind::BlockGossip(serde_json::from_str(data).map_err(
|
||||||
|e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)),
|
|e| ServerError::InvalidServerSentEvent(format!("Block Gossip: {:?}", e)),
|
||||||
)?)),
|
)?)),
|
||||||
|
"execution_payload" => Ok(EventKind::ExecutionPayload(
|
||||||
|
serde_json::from_str(data).map_err(|e| {
|
||||||
|
ServerError::InvalidServerSentEvent(format!("Execution Payload: {:?}", e))
|
||||||
|
})?,
|
||||||
|
)),
|
||||||
|
"execution_payload_gossip" => Ok(EventKind::ExecutionPayloadGossip(
|
||||||
|
serde_json::from_str(data).map_err(|e| {
|
||||||
|
ServerError::InvalidServerSentEvent(format!(
|
||||||
|
"Execution Payload Gossip: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?,
|
||||||
|
)),
|
||||||
|
"execution_payload_available" => Ok(EventKind::ExecutionPayloadAvailable(
|
||||||
|
serde_json::from_str(data).map_err(|e| {
|
||||||
|
ServerError::InvalidServerSentEvent(format!(
|
||||||
|
"Execution Payload Available: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?,
|
||||||
|
)),
|
||||||
|
"execution_payload_bid" => Ok(EventKind::ExecutionPayloadBid(Box::new(
|
||||||
|
serde_json::from_str(data).map_err(|e| {
|
||||||
|
ServerError::InvalidServerSentEvent(format!("Execution Payload Bid: {:?}", e))
|
||||||
|
})?,
|
||||||
|
))),
|
||||||
|
"payload_attestation_message" => Ok(EventKind::PayloadAttestationMessage(Box::new(
|
||||||
|
serde_json::from_str(data).map_err(|e| {
|
||||||
|
ServerError::InvalidServerSentEvent(format!(
|
||||||
|
"Payload Attestation Message: {:?}",
|
||||||
|
e
|
||||||
|
))
|
||||||
|
})?,
|
||||||
|
))),
|
||||||
_ => Err(ServerError::InvalidServerSentEvent(
|
_ => Err(ServerError::InvalidServerSentEvent(
|
||||||
"Could not parse event tag".to_string(),
|
"Could not parse event tag".to_string(),
|
||||||
)),
|
)),
|
||||||
@@ -1357,6 +1430,11 @@ pub enum EventTopic {
|
|||||||
ProposerSlashing,
|
ProposerSlashing,
|
||||||
BlsToExecutionChange,
|
BlsToExecutionChange,
|
||||||
BlockGossip,
|
BlockGossip,
|
||||||
|
ExecutionPayload,
|
||||||
|
ExecutionPayloadGossip,
|
||||||
|
ExecutionPayloadAvailable,
|
||||||
|
ExecutionPayloadBid,
|
||||||
|
PayloadAttestationMessage,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FromStr for EventTopic {
|
impl FromStr for EventTopic {
|
||||||
@@ -1382,6 +1460,11 @@ impl FromStr for EventTopic {
|
|||||||
"proposer_slashing" => Ok(EventTopic::ProposerSlashing),
|
"proposer_slashing" => Ok(EventTopic::ProposerSlashing),
|
||||||
"bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange),
|
"bls_to_execution_change" => Ok(EventTopic::BlsToExecutionChange),
|
||||||
"block_gossip" => Ok(EventTopic::BlockGossip),
|
"block_gossip" => Ok(EventTopic::BlockGossip),
|
||||||
|
"execution_payload" => Ok(EventTopic::ExecutionPayload),
|
||||||
|
"execution_payload_gossip" => Ok(EventTopic::ExecutionPayloadGossip),
|
||||||
|
"execution_payload_available" => Ok(EventTopic::ExecutionPayloadAvailable),
|
||||||
|
"execution_payload_bid" => Ok(EventTopic::ExecutionPayloadBid),
|
||||||
|
"payload_attestation_message" => Ok(EventTopic::PayloadAttestationMessage),
|
||||||
_ => Err("event topic cannot be parsed.".to_string()),
|
_ => Err("event topic cannot be parsed.".to_string()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1408,6 +1491,15 @@ impl fmt::Display for EventTopic {
|
|||||||
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
|
EventTopic::ProposerSlashing => write!(f, "proposer_slashing"),
|
||||||
EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"),
|
EventTopic::BlsToExecutionChange => write!(f, "bls_to_execution_change"),
|
||||||
EventTopic::BlockGossip => write!(f, "block_gossip"),
|
EventTopic::BlockGossip => write!(f, "block_gossip"),
|
||||||
|
EventTopic::ExecutionPayload => write!(f, "execution_payload"),
|
||||||
|
EventTopic::ExecutionPayloadGossip => write!(f, "execution_payload_gossip"),
|
||||||
|
EventTopic::ExecutionPayloadAvailable => {
|
||||||
|
write!(f, "execution_payload_available")
|
||||||
|
}
|
||||||
|
EventTopic::ExecutionPayloadBid => write!(f, "execution_payload_bid"),
|
||||||
|
EventTopic::PayloadAttestationMessage => {
|
||||||
|
write!(f, "payload_attestation_message")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user