Files
lighthouse/beacon_node/beacon_chain/src/events.rs
Eitan Seri-Levi 06329ec2d1 SingleAttestation implementation (#6488)
* First pass

* Add restrictions to RuntimeVariableList api

* Use empty_uninitialized and fix warnings

* Fix some todos

* Merge branch 'unstable' into max-blobs-preset

* Fix take impl on RuntimeFixedList

* cleanup

* Fix test compilations

* Fix some more tests

* Fix test from unstable

* Merge branch 'unstable' into max-blobs-preset

* SingleAttestation

* Add post attestation v2 endpoint logic to attestation service

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation

* Implement "Bugfix and more withdrawal tests"

* Implement "Add missed exit checks to consolidation processing"

* Implement "Update initial earliest_exit_epoch calculation"

* Implement "Limit consolidating balance by validator.effective_balance"

* Implement "Use 16-bit random value in validator filter"

* Implement "Do not change creds type on consolidation"

* some tests and fixed attestqtion calc

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation

* Rename PendingPartialWithdraw index field to validator_index

* Skip slots to get test to pass and add TODO

* Implement "Synchronously check all transactions to have non-zero length"

* Merge remote-tracking branch 'origin/unstable' into max-blobs-preset

* Remove footgun function

* Minor simplifications

* Move from preset to config

* Fix typo

* Revert "Remove footgun function"

This reverts commit de01f923c7.

* Try fixing tests

* Implement "bump minimal preset MAX_BLOB_COMMITMENTS_PER_BLOCK and KZG_COMMITMENT_INCLUSION_PROOF_DEPTH"

* Thread through ChainSpec

* Fix release tests

* Move RuntimeFixedVector into module and rename

* Add test

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation

* Added more test coverage, simplified Attestation conversion, and other minor refactors

* Removed unusued codepaths

* Fix failing test

* Implement "Remove post-altair `initialize_beacon_state_from_eth1` from specs"

* Update preset YAML

* Remove empty RuntimeVarList awefullness

* Make max_blobs_per_block a config parameter (#6329)

Squashed commit of the following:

commit 04b3743ec1
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 17:36:58 2025 +1100

    Add test

commit 440e854199
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 17:24:50 2025 +1100

    Move RuntimeFixedVector into module and rename

commit f66e179a40
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 17:17:17 2025 +1100

    Fix release tests

commit e4bfe71cd1
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 17:05:30 2025 +1100

    Thread through ChainSpec

commit 063b79c16a
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 15:32:16 2025 +1100

    Try fixing tests

commit 88bedf09bc
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 15:04:37 2025 +1100

    Revert "Remove footgun function"

    This reverts commit de01f923c7.

commit 32483d385b
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 15:04:32 2025 +1100

    Fix typo

commit 2e86585b47
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 15:04:15 2025 +1100

    Move from preset to config

commit 1095d60a40
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 14:38:40 2025 +1100

    Minor simplifications

commit de01f923c7
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 14:06:57 2025 +1100

    Remove footgun function

commit 0c2c8c4224
Merge: 21ecb58ff f51a292f7
Author: Michael Sproul <michael@sigmaprime.io>
Date:   Mon Jan 6 14:02:50 2025 +1100

    Merge remote-tracking branch 'origin/unstable' into max-blobs-preset

commit f51a292f77
Author: Daniel Knopik <107140945+dknopik@users.noreply.github.com>
Date:   Fri Jan 3 20:27:21 2025 +0100

    fully lint only explicitly to avoid unnecessary rebuilds (#6753)

    * fully lint only explicitly to avoid unnecessary rebuilds

commit 7e0cddef32
Author: Akihito Nakano <sora.akatsuki@gmail.com>
Date:   Tue Dec 24 10:38:56 2024 +0900

    Make sure we have fanout peers when publish (#6738)

    * Ensure that `fanout_peers` is always non-empty if it's `Some`

commit 21ecb58ff8
Merge: 2fcb2935e 9aefb5539
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Mon Oct 21 14:46:00 2024 -0700

    Merge branch 'unstable' into max-blobs-preset

commit 2fcb2935ec
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Fri Sep 6 18:28:31 2024 -0700

    Fix test from unstable

commit 12c6ef118a
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Wed Sep 4 16:16:36 2024 -0700

    Fix some more tests

commit d37733b846
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Wed Sep 4 12:47:36 2024 -0700

    Fix test compilations

commit 52bb581e07
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Tue Sep 3 18:38:19 2024 -0700

    cleanup

commit e71020e3e6
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Tue Sep 3 17:16:10 2024 -0700

    Fix take impl on RuntimeFixedList

commit 13f9bba647
Merge: 60100fc6b 4e675cf5d
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Tue Sep 3 16:08:59 2024 -0700

    Merge branch 'unstable' into max-blobs-preset

commit 60100fc6be
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Fri Aug 30 16:04:11 2024 -0700

    Fix some todos

commit a9cb329a22
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Fri Aug 30 15:54:00 2024 -0700

    Use empty_uninitialized and fix warnings

commit 4dc6e6515e
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Fri Aug 30 15:53:18 2024 -0700

    Add restrictions to RuntimeVariableList api

commit 25feedfde3
Author: Pawan Dhananjay <pawandhananjay@gmail.com>
Date:   Thu Aug 29 16:11:19 2024 -0700

    First pass

* Fix tests

* Implement max_blobs_per_block_electra

* Fix config issues

* Simplify BlobSidecarListFromRoot

* Disable PeerDAS tests

* Cleanup single attestation imports

* Fix some single attestation network plumbing

* Merge remote-tracking branch 'origin/unstable' into max-blobs-preset

* Bump quota to account for new target (6)

* Remove clone

* Fix issue from review

* Try to remove ugliness

* Merge branch 'unstable' into max-blobs-preset

* Merge remote-tracking branch 'origin/unstable' into electra-alpha10

* Merge commit '04b3743ec1e0b650269dd8e58b540c02430d1c0d' into electra-alpha10

* Merge remote-tracking branch 'pawan/max-blobs-preset' into electra-alpha10

* Update tests to v1.5.0-beta.0

* Merge remote-tracking branch 'origin/electra-alpha10' into single_attestation

* Fix some tests

* Cargo fmt

* lint

* fmt

* Resolve merge conflicts

* Merge branch 'electra-alpha10' of https://github.com/sigp/lighthouse into single_attestation

* lint

* Linting

* fmt

* Merge branch 'electra-alpha10' of https://github.com/sigp/lighthouse into single_attestation

* Fmt

* Fix test and add TODO

* Gracefully handle slashed proposers in fork choice tests

* Merge remote-tracking branch 'origin/unstable' into electra-alpha10

* Keep latest changes from max_blobs_per_block PR in codec.rs

* Revert a few more regressions and add a comment

* Merge branch 'electra-alpha10' of https://github.com/sigp/lighthouse into single_attestation

* Disable more DAS tests

* Improve validator monitor test a little

* Make test more robust

* Fix sync test that didn't understand blobs

* Fill out cropped comment

* Merge remote-tracking branch 'origin/electra-alpha10' into single_attestation

* Merge remote-tracking branch 'origin/unstable' into single_attestation

* Merge remote-tracking branch 'origin/unstable' into single_attestation

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into single_attestation

* publish_attestations should accept Either<Attestation,SingleAttestation>

* log an error when failing to convert to SingleAttestation

* Use Cow to avoid clone

* Avoid reconverting to SingleAttestation

* Tweak VC error message

* update comments

* update comments

* pass in single attestation as ref to subnetid calculation method

* Improved API, new error variants and other minor tweaks

* Fix single_attestation event topic boilerplate

* fix sse event failure

* Add single_attestation event topic test coverage
2025-01-16 18:27:08 +00:00

306 lines
11 KiB
Rust

pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use slog::{trace, Logger};
use tokio::sync::broadcast;
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
use types::EthSpec;
const DEFAULT_CHANNEL_CAPACITY: usize = 16;
pub struct ServerSentEventHandler<E: EthSpec> {
attestation_tx: Sender<EventKind<E>>,
single_attestation_tx: Sender<EventKind<E>>,
block_tx: Sender<EventKind<E>>,
blob_sidecar_tx: Sender<EventKind<E>>,
finalized_tx: Sender<EventKind<E>>,
head_tx: Sender<EventKind<E>>,
exit_tx: Sender<EventKind<E>>,
chain_reorg_tx: Sender<EventKind<E>>,
contribution_tx: Sender<EventKind<E>>,
payload_attributes_tx: Sender<EventKind<E>>,
late_head: Sender<EventKind<E>>,
light_client_finality_update_tx: Sender<EventKind<E>>,
light_client_optimistic_update_tx: Sender<EventKind<E>>,
block_reward_tx: Sender<EventKind<E>>,
proposer_slashing_tx: Sender<EventKind<E>>,
attester_slashing_tx: Sender<EventKind<E>>,
bls_to_execution_change_tx: Sender<EventKind<E>>,
block_gossip_tx: Sender<EventKind<E>>,
log: Logger,
}
impl<E: EthSpec> ServerSentEventHandler<E> {
pub fn new(log: Logger, capacity_multiplier: usize) -> Self {
Self::new_with_capacity(
log,
capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY),
)
}
pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
let (attestation_tx, _) = broadcast::channel(capacity);
let (single_attestation_tx, _) = broadcast::channel(capacity);
let (block_tx, _) = broadcast::channel(capacity);
let (blob_sidecar_tx, _) = broadcast::channel(capacity);
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
let (chain_reorg_tx, _) = broadcast::channel(capacity);
let (contribution_tx, _) = broadcast::channel(capacity);
let (payload_attributes_tx, _) = broadcast::channel(capacity);
let (late_head, _) = broadcast::channel(capacity);
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
let (block_reward_tx, _) = broadcast::channel(capacity);
let (proposer_slashing_tx, _) = broadcast::channel(capacity);
let (attester_slashing_tx, _) = broadcast::channel(capacity);
let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
let (block_gossip_tx, _) = broadcast::channel(capacity);
Self {
attestation_tx,
single_attestation_tx,
block_tx,
blob_sidecar_tx,
finalized_tx,
head_tx,
exit_tx,
chain_reorg_tx,
contribution_tx,
payload_attributes_tx,
late_head,
light_client_finality_update_tx,
light_client_optimistic_update_tx,
block_reward_tx,
proposer_slashing_tx,
attester_slashing_tx,
bls_to_execution_change_tx,
block_gossip_tx,
log,
}
}
pub fn register(&self, kind: EventKind<E>) {
let log_count = |name, count| {
trace!(
self.log,
"Registering server-sent event";
"kind" => name,
"receiver_count" => count
);
};
let result = match &kind {
EventKind::Attestation(_) => self
.attestation_tx
.send(kind)
.map(|count| log_count("attestation", count)),
EventKind::SingleAttestation(_) => self
.single_attestation_tx
.send(kind)
.map(|count| log_count("single_attestation", count)),
EventKind::Block(_) => self
.block_tx
.send(kind)
.map(|count| log_count("block", count)),
EventKind::BlobSidecar(_) => self
.blob_sidecar_tx
.send(kind)
.map(|count| log_count("blob sidecar", count)),
EventKind::FinalizedCheckpoint(_) => self
.finalized_tx
.send(kind)
.map(|count| log_count("finalized checkpoint", count)),
EventKind::Head(_) => self
.head_tx
.send(kind)
.map(|count| log_count("head", count)),
EventKind::VoluntaryExit(_) => self
.exit_tx
.send(kind)
.map(|count| log_count("exit", count)),
EventKind::ChainReorg(_) => self
.chain_reorg_tx
.send(kind)
.map(|count| log_count("chain reorg", count)),
EventKind::ContributionAndProof(_) => self
.contribution_tx
.send(kind)
.map(|count| log_count("contribution and proof", count)),
EventKind::PayloadAttributes(_) => self
.payload_attributes_tx
.send(kind)
.map(|count| log_count("payload attributes", count)),
EventKind::LateHead(_) => self
.late_head
.send(kind)
.map(|count| log_count("late head", count)),
EventKind::LightClientFinalityUpdate(_) => self
.light_client_finality_update_tx
.send(kind)
.map(|count| log_count("light client finality update", count)),
EventKind::LightClientOptimisticUpdate(_) => self
.light_client_optimistic_update_tx
.send(kind)
.map(|count| log_count("light client optimistic update", count)),
EventKind::BlockReward(_) => self
.block_reward_tx
.send(kind)
.map(|count| log_count("block reward", count)),
EventKind::ProposerSlashing(_) => self
.proposer_slashing_tx
.send(kind)
.map(|count| log_count("proposer slashing", count)),
EventKind::AttesterSlashing(_) => self
.attester_slashing_tx
.send(kind)
.map(|count| log_count("attester slashing", count)),
EventKind::BlsToExecutionChange(_) => self
.bls_to_execution_change_tx
.send(kind)
.map(|count| log_count("bls to execution change", count)),
EventKind::BlockGossip(_) => self
.block_gossip_tx
.send(kind)
.map(|count| log_count("block gossip", count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
}
}
pub fn subscribe_attestation(&self) -> Receiver<EventKind<E>> {
self.attestation_tx.subscribe()
}
pub fn subscribe_single_attestation(&self) -> Receiver<EventKind<E>> {
self.single_attestation_tx.subscribe()
}
pub fn subscribe_block(&self) -> Receiver<EventKind<E>> {
self.block_tx.subscribe()
}
pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<E>> {
self.blob_sidecar_tx.subscribe()
}
pub fn subscribe_finalized(&self) -> Receiver<EventKind<E>> {
self.finalized_tx.subscribe()
}
pub fn subscribe_head(&self) -> Receiver<EventKind<E>> {
self.head_tx.subscribe()
}
pub fn subscribe_exit(&self) -> Receiver<EventKind<E>> {
self.exit_tx.subscribe()
}
pub fn subscribe_reorgs(&self) -> Receiver<EventKind<E>> {
self.chain_reorg_tx.subscribe()
}
pub fn subscribe_contributions(&self) -> Receiver<EventKind<E>> {
self.contribution_tx.subscribe()
}
pub fn subscribe_payload_attributes(&self) -> Receiver<EventKind<E>> {
self.payload_attributes_tx.subscribe()
}
pub fn subscribe_late_head(&self) -> Receiver<EventKind<E>> {
self.late_head.subscribe()
}
pub fn subscribe_light_client_finality_update(&self) -> Receiver<EventKind<E>> {
self.light_client_finality_update_tx.subscribe()
}
pub fn subscribe_light_client_optimistic_update(&self) -> Receiver<EventKind<E>> {
self.light_client_optimistic_update_tx.subscribe()
}
pub fn subscribe_block_reward(&self) -> Receiver<EventKind<E>> {
self.block_reward_tx.subscribe()
}
pub fn subscribe_attester_slashing(&self) -> Receiver<EventKind<E>> {
self.attester_slashing_tx.subscribe()
}
pub fn subscribe_proposer_slashing(&self) -> Receiver<EventKind<E>> {
self.proposer_slashing_tx.subscribe()
}
pub fn subscribe_bls_to_execution_change(&self) -> Receiver<EventKind<E>> {
self.bls_to_execution_change_tx.subscribe()
}
pub fn subscribe_block_gossip(&self) -> Receiver<EventKind<E>> {
self.block_gossip_tx.subscribe()
}
pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
pub fn has_single_attestation_subscribers(&self) -> bool {
self.single_attestation_tx.receiver_count() > 0
}
pub fn has_block_subscribers(&self) -> bool {
self.block_tx.receiver_count() > 0
}
pub fn has_blob_sidecar_subscribers(&self) -> bool {
self.blob_sidecar_tx.receiver_count() > 0
}
pub fn has_finalized_subscribers(&self) -> bool {
self.finalized_tx.receiver_count() > 0
}
pub fn has_head_subscribers(&self) -> bool {
self.head_tx.receiver_count() > 0
}
pub fn has_exit_subscribers(&self) -> bool {
self.exit_tx.receiver_count() > 0
}
pub fn has_reorg_subscribers(&self) -> bool {
self.chain_reorg_tx.receiver_count() > 0
}
pub fn has_contribution_subscribers(&self) -> bool {
self.contribution_tx.receiver_count() > 0
}
pub fn has_payload_attributes_subscribers(&self) -> bool {
self.payload_attributes_tx.receiver_count() > 0
}
pub fn has_late_head_subscribers(&self) -> bool {
self.late_head.receiver_count() > 0
}
pub fn has_block_reward_subscribers(&self) -> bool {
self.block_reward_tx.receiver_count() > 0
}
pub fn has_proposer_slashing_subscribers(&self) -> bool {
self.proposer_slashing_tx.receiver_count() > 0
}
pub fn has_attester_slashing_subscribers(&self) -> bool {
self.attester_slashing_tx.receiver_count() > 0
}
pub fn has_bls_to_execution_change_subscribers(&self) -> bool {
self.bls_to_execution_change_tx.receiver_count() > 0
}
pub fn has_block_gossip_subscribers(&self) -> bool {
self.block_gossip_tx.receiver_count() > 0
}
}