mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-18 04:13:00 +00:00
* Use `E` for `EthSpec` globally * Fix tests * Merge branch 'unstable' into e-ethspec * Merge branch 'unstable' into e-ethspec # Conflicts: # beacon_node/execution_layer/src/engine_api.rs # beacon_node/execution_layer/src/engine_api/http.rs # beacon_node/execution_layer/src/engine_api/json_structures.rs # beacon_node/execution_layer/src/test_utils/handle_rpc.rs # beacon_node/store/src/partial_beacon_state.rs # consensus/types/src/beacon_block.rs # consensus/types/src/beacon_block_body.rs # consensus/types/src/beacon_state.rs # consensus/types/src/config_and_preset.rs # consensus/types/src/execution_payload.rs # consensus/types/src/execution_payload_header.rs # consensus/types/src/light_client_optimistic_update.rs # consensus/types/src/payload.rs # lcli/src/parse_ssz.rs
231 lines
7.8 KiB
Rust
231 lines
7.8 KiB
Rust
pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
|
|
use slog::{trace, Logger};
|
|
use tokio::sync::broadcast;
|
|
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
|
|
use types::EthSpec;
|
|
|
|
const DEFAULT_CHANNEL_CAPACITY: usize = 16;
|
|
|
|
pub struct ServerSentEventHandler<E: EthSpec> {
|
|
attestation_tx: Sender<EventKind<E>>,
|
|
block_tx: Sender<EventKind<E>>,
|
|
blob_sidecar_tx: Sender<EventKind<E>>,
|
|
finalized_tx: Sender<EventKind<E>>,
|
|
head_tx: Sender<EventKind<E>>,
|
|
exit_tx: Sender<EventKind<E>>,
|
|
chain_reorg_tx: Sender<EventKind<E>>,
|
|
contribution_tx: Sender<EventKind<E>>,
|
|
payload_attributes_tx: Sender<EventKind<E>>,
|
|
late_head: Sender<EventKind<E>>,
|
|
light_client_finality_update_tx: Sender<EventKind<E>>,
|
|
light_client_optimistic_update_tx: Sender<EventKind<E>>,
|
|
block_reward_tx: Sender<EventKind<E>>,
|
|
log: Logger,
|
|
}
|
|
|
|
impl<E: EthSpec> ServerSentEventHandler<E> {
|
|
pub fn new(log: Logger, capacity_multiplier: usize) -> Self {
|
|
Self::new_with_capacity(
|
|
log,
|
|
capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY),
|
|
)
|
|
}
|
|
|
|
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
|
|
let (attestation_tx, _) = broadcast::channel(capacity);
|
|
let (block_tx, _) = broadcast::channel(capacity);
|
|
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
|
|
let (finalized_tx, _) = broadcast::channel(capacity);
|
|
let (head_tx, _) = broadcast::channel(capacity);
|
|
let (exit_tx, _) = broadcast::channel(capacity);
|
|
let (chain_reorg_tx, _) = broadcast::channel(capacity);
|
|
let (contribution_tx, _) = broadcast::channel(capacity);
|
|
let (payload_attributes_tx, _) = broadcast::channel(capacity);
|
|
let (late_head, _) = broadcast::channel(capacity);
|
|
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
|
|
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
|
|
let (block_reward_tx, _) = broadcast::channel(capacity);
|
|
|
|
Self {
|
|
attestation_tx,
|
|
block_tx,
|
|
blob_sidecar_tx,
|
|
finalized_tx,
|
|
head_tx,
|
|
exit_tx,
|
|
chain_reorg_tx,
|
|
contribution_tx,
|
|
payload_attributes_tx,
|
|
late_head,
|
|
light_client_finality_update_tx,
|
|
light_client_optimistic_update_tx,
|
|
block_reward_tx,
|
|
log,
|
|
}
|
|
}
|
|
|
|
pub fn register(&self, kind: EventKind<E>) {
|
|
let log_count = |name, count| {
|
|
trace!(
|
|
self.log,
|
|
"Registering server-sent event";
|
|
"kind" => name,
|
|
"receiver_count" => count
|
|
);
|
|
};
|
|
let result = match &kind {
|
|
EventKind::Attestation(_) => self
|
|
.attestation_tx
|
|
.send(kind)
|
|
.map(|count| log_count("attestation", count)),
|
|
EventKind::Block(_) => self
|
|
.block_tx
|
|
.send(kind)
|
|
.map(|count| log_count("block", count)),
|
|
EventKind::BlobSidecar(_) => self
|
|
.blob_sidecar_tx
|
|
.send(kind)
|
|
.map(|count| log_count("blob sidecar", count)),
|
|
EventKind::FinalizedCheckpoint(_) => self
|
|
.finalized_tx
|
|
.send(kind)
|
|
.map(|count| log_count("finalized checkpoint", count)),
|
|
EventKind::Head(_) => self
|
|
.head_tx
|
|
.send(kind)
|
|
.map(|count| log_count("head", count)),
|
|
EventKind::VoluntaryExit(_) => self
|
|
.exit_tx
|
|
.send(kind)
|
|
.map(|count| log_count("exit", count)),
|
|
EventKind::ChainReorg(_) => self
|
|
.chain_reorg_tx
|
|
.send(kind)
|
|
.map(|count| log_count("chain reorg", count)),
|
|
EventKind::ContributionAndProof(_) => self
|
|
.contribution_tx
|
|
.send(kind)
|
|
.map(|count| log_count("contribution and proof", count)),
|
|
EventKind::PayloadAttributes(_) => self
|
|
.payload_attributes_tx
|
|
.send(kind)
|
|
.map(|count| log_count("payload attributes", count)),
|
|
EventKind::LateHead(_) => self
|
|
.late_head
|
|
.send(kind)
|
|
.map(|count| log_count("late head", count)),
|
|
EventKind::LightClientFinalityUpdate(_) => self
|
|
.light_client_finality_update_tx
|
|
.send(kind)
|
|
.map(|count| log_count("light client finality update", count)),
|
|
EventKind::LightClientOptimisticUpdate(_) => self
|
|
.light_client_optimistic_update_tx
|
|
.send(kind)
|
|
.map(|count| log_count("light client optimistic update", count)),
|
|
EventKind::BlockReward(_) => self
|
|
.block_reward_tx
|
|
.send(kind)
|
|
.map(|count| log_count("block reward", count)),
|
|
};
|
|
if let Err(SendError(event)) = result {
|
|
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
|
|
}
|
|
}
|
|
|
|
pub fn subscribe_attestation(&self) -> Receiver<EventKind<E>> {
|
|
self.attestation_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_block(&self) -> Receiver<EventKind<E>> {
|
|
self.block_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<E>> {
|
|
self.blob_sidecar_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_finalized(&self) -> Receiver<EventKind<E>> {
|
|
self.finalized_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_head(&self) -> Receiver<EventKind<E>> {
|
|
self.head_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_exit(&self) -> Receiver<EventKind<E>> {
|
|
self.exit_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_reorgs(&self) -> Receiver<EventKind<E>> {
|
|
self.chain_reorg_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_contributions(&self) -> Receiver<EventKind<E>> {
|
|
self.contribution_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_payload_attributes(&self) -> Receiver<EventKind<E>> {
|
|
self.payload_attributes_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_late_head(&self) -> Receiver<EventKind<E>> {
|
|
self.late_head.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_light_client_finality_update(&self) -> Receiver<EventKind<E>> {
|
|
self.light_client_finality_update_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_light_client_optimistic_update(&self) -> Receiver<EventKind<E>> {
|
|
self.light_client_optimistic_update_tx.subscribe()
|
|
}
|
|
|
|
pub fn subscribe_block_reward(&self) -> Receiver<EventKind<E>> {
|
|
self.block_reward_tx.subscribe()
|
|
}
|
|
|
|
pub fn has_attestation_subscribers(&self) -> bool {
|
|
self.attestation_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_block_subscribers(&self) -> bool {
|
|
self.block_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_blob_sidecar_subscribers(&self) -> bool {
|
|
self.blob_sidecar_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_finalized_subscribers(&self) -> bool {
|
|
self.finalized_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_head_subscribers(&self) -> bool {
|
|
self.head_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_exit_subscribers(&self) -> bool {
|
|
self.exit_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_reorg_subscribers(&self) -> bool {
|
|
self.chain_reorg_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_contribution_subscribers(&self) -> bool {
|
|
self.contribution_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_payload_attributes_subscribers(&self) -> bool {
|
|
self.payload_attributes_tx.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_late_head_subscribers(&self) -> bool {
|
|
self.late_head.receiver_count() > 0
|
|
}
|
|
|
|
pub fn has_block_reward_subscribers(&self) -> bool {
|
|
self.block_reward_tx.receiver_count() > 0
|
|
}
|
|
}
|