mirror of
https://github.com/sigp/lighthouse.git
synced 2026-04-24 00:08:27 +00:00
306 lines
11 KiB
Rust
pub use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use slog::{trace, Logger};
use tokio::sync::broadcast;
use tokio::sync::broadcast::{error::SendError, Receiver, Sender};
use types::EthSpec;

/// Base capacity of each broadcast channel; `new` scales it by a caller-supplied multiplier.
const DEFAULT_CHANNEL_CAPACITY: usize = 16;

/// Holds one broadcast sender per server-sent event kind.
pub struct ServerSentEventHandler<E: EthSpec> {
    attestation_tx: Sender<EventKind<E>>,
    single_attestation_tx: Sender<EventKind<E>>,
    block_tx: Sender<EventKind<E>>,
    blob_sidecar_tx: Sender<EventKind<E>>,
    finalized_tx: Sender<EventKind<E>>,
    head_tx: Sender<EventKind<E>>,
    exit_tx: Sender<EventKind<E>>,
    chain_reorg_tx: Sender<EventKind<E>>,
    contribution_tx: Sender<EventKind<E>>,
    payload_attributes_tx: Sender<EventKind<E>>,
    late_head: Sender<EventKind<E>>,
    light_client_finality_update_tx: Sender<EventKind<E>>,
    light_client_optimistic_update_tx: Sender<EventKind<E>>,
    block_reward_tx: Sender<EventKind<E>>,
    proposer_slashing_tx: Sender<EventKind<E>>,
    attester_slashing_tx: Sender<EventKind<E>>,
    bls_to_execution_change_tx: Sender<EventKind<E>>,
    block_gossip_tx: Sender<EventKind<E>>,
    log: Logger,
}

impl<E: EthSpec> ServerSentEventHandler<E> {
    /// Creates a handler whose channels each hold
    /// `capacity_multiplier * DEFAULT_CHANNEL_CAPACITY` events (saturating on overflow).
    pub fn new(log: Logger, capacity_multiplier: usize) -> Self {
        Self::new_with_capacity(
            log,
            capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY),
        )
    }

    /// Creates a handler whose channels each hold `capacity` events.
    /// The initial receivers are dropped; callers obtain receivers via the `subscribe_*` methods.
    pub fn new_with_capacity(log: Logger, capacity: usize) -> Self {
        let (attestation_tx, _) = broadcast::channel(capacity);
        let (single_attestation_tx, _) = broadcast::channel(capacity);
        let (block_tx, _) = broadcast::channel(capacity);
        let (blob_sidecar_tx, _) = broadcast::channel(capacity);
        let (finalized_tx, _) = broadcast::channel(capacity);
        let (head_tx, _) = broadcast::channel(capacity);
        let (exit_tx, _) = broadcast::channel(capacity);
        let (chain_reorg_tx, _) = broadcast::channel(capacity);
        let (contribution_tx, _) = broadcast::channel(capacity);
        let (payload_attributes_tx, _) = broadcast::channel(capacity);
        let (late_head, _) = broadcast::channel(capacity);
        let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
        let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
        let (block_reward_tx, _) = broadcast::channel(capacity);
        let (proposer_slashing_tx, _) = broadcast::channel(capacity);
        let (attester_slashing_tx, _) = broadcast::channel(capacity);
        let (bls_to_execution_change_tx, _) = broadcast::channel(capacity);
        let (block_gossip_tx, _) = broadcast::channel(capacity);

        Self {
            attestation_tx,
            single_attestation_tx,
            block_tx,
            blob_sidecar_tx,
            finalized_tx,
            head_tx,
            exit_tx,
            chain_reorg_tx,
            contribution_tx,
            payload_attributes_tx,
            late_head,
            light_client_finality_update_tx,
            light_client_optimistic_update_tx,
            block_reward_tx,
            proposer_slashing_tx,
            attester_slashing_tx,
            bls_to_execution_change_tx,
            block_gossip_tx,
            log,
        }
    }

    /// Sends `kind` on the channel matching its variant and trace-logs the receiver count.
    /// A `SendError` means the channel currently has no receivers; the event is logged and dropped.
    pub fn register(&self, kind: EventKind<E>) {
        let log_count = |name, count| {
            trace!(
                self.log,
                "Registering server-sent event";
                "kind" => name,
                "receiver_count" => count
            );
        };
        let result = match &kind {
            EventKind::Attestation(_) => self
                .attestation_tx
                .send(kind)
                .map(|count| log_count("attestation", count)),
            EventKind::SingleAttestation(_) => self
                .single_attestation_tx
                .send(kind)
                .map(|count| log_count("single_attestation", count)),
            EventKind::Block(_) => self
                .block_tx
                .send(kind)
                .map(|count| log_count("block", count)),
            EventKind::BlobSidecar(_) => self
                .blob_sidecar_tx
                .send(kind)
                .map(|count| log_count("blob sidecar", count)),
            EventKind::FinalizedCheckpoint(_) => self
                .finalized_tx
                .send(kind)
                .map(|count| log_count("finalized checkpoint", count)),
            EventKind::Head(_) => self
                .head_tx
                .send(kind)
                .map(|count| log_count("head", count)),
            EventKind::VoluntaryExit(_) => self
                .exit_tx
                .send(kind)
                .map(|count| log_count("exit", count)),
            EventKind::ChainReorg(_) => self
                .chain_reorg_tx
                .send(kind)
                .map(|count| log_count("chain reorg", count)),
            EventKind::ContributionAndProof(_) => self
                .contribution_tx
                .send(kind)
                .map(|count| log_count("contribution and proof", count)),
            EventKind::PayloadAttributes(_) => self
                .payload_attributes_tx
                .send(kind)
                .map(|count| log_count("payload attributes", count)),
            EventKind::LateHead(_) => self
                .late_head
                .send(kind)
                .map(|count| log_count("late head", count)),
            EventKind::LightClientFinalityUpdate(_) => self
                .light_client_finality_update_tx
                .send(kind)
                .map(|count| log_count("light client finality update", count)),
            EventKind::LightClientOptimisticUpdate(_) => self
                .light_client_optimistic_update_tx
                .send(kind)
                .map(|count| log_count("light client optimistic update", count)),
            EventKind::BlockReward(_) => self
                .block_reward_tx
                .send(kind)
                .map(|count| log_count("block reward", count)),
            EventKind::ProposerSlashing(_) => self
                .proposer_slashing_tx
                .send(kind)
                .map(|count| log_count("proposer slashing", count)),
            EventKind::AttesterSlashing(_) => self
                .attester_slashing_tx
                .send(kind)
                .map(|count| log_count("attester slashing", count)),
            EventKind::BlsToExecutionChange(_) => self
                .bls_to_execution_change_tx
                .send(kind)
                .map(|count| log_count("bls to execution change", count)),
            EventKind::BlockGossip(_) => self
                .block_gossip_tx
                .send(kind)
                .map(|count| log_count("block gossip", count)),
        };
        if let Err(SendError(event)) = result {
            trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
        }
    }

    pub fn subscribe_attestation(&self) -> Receiver<EventKind<E>> {
        self.attestation_tx.subscribe()
    }

    pub fn subscribe_single_attestation(&self) -> Receiver<EventKind<E>> {
        self.single_attestation_tx.subscribe()
    }

    pub fn subscribe_block(&self) -> Receiver<EventKind<E>> {
        self.block_tx.subscribe()
    }

    pub fn subscribe_blob_sidecar(&self) -> Receiver<EventKind<E>> {
        self.blob_sidecar_tx.subscribe()
    }

    pub fn subscribe_finalized(&self) -> Receiver<EventKind<E>> {
        self.finalized_tx.subscribe()
    }

    pub fn subscribe_head(&self) -> Receiver<EventKind<E>> {
        self.head_tx.subscribe()
    }

    pub fn subscribe_exit(&self) -> Receiver<EventKind<E>> {
        self.exit_tx.subscribe()
    }

    pub fn subscribe_reorgs(&self) -> Receiver<EventKind<E>> {
        self.chain_reorg_tx.subscribe()
    }

    pub fn subscribe_contributions(&self) -> Receiver<EventKind<E>> {
        self.contribution_tx.subscribe()
    }

    pub fn subscribe_payload_attributes(&self) -> Receiver<EventKind<E>> {
        self.payload_attributes_tx.subscribe()
    }

    pub fn subscribe_late_head(&self) -> Receiver<EventKind<E>> {
        self.late_head.subscribe()
    }

    pub fn subscribe_light_client_finality_update(&self) -> Receiver<EventKind<E>> {
        self.light_client_finality_update_tx.subscribe()
    }

    pub fn subscribe_light_client_optimistic_update(&self) -> Receiver<EventKind<E>> {
        self.light_client_optimistic_update_tx.subscribe()
    }

    pub fn subscribe_block_reward(&self) -> Receiver<EventKind<E>> {
        self.block_reward_tx.subscribe()
    }

    pub fn subscribe_attester_slashing(&self) -> Receiver<EventKind<E>> {
        self.attester_slashing_tx.subscribe()
    }

    pub fn subscribe_proposer_slashing(&self) -> Receiver<EventKind<E>> {
        self.proposer_slashing_tx.subscribe()
    }

    pub fn subscribe_bls_to_execution_change(&self) -> Receiver<EventKind<E>> {
        self.bls_to_execution_change_tx.subscribe()
    }

    pub fn subscribe_block_gossip(&self) -> Receiver<EventKind<E>> {
        self.block_gossip_tx.subscribe()
    }

    pub fn has_attestation_subscribers(&self) -> bool {
        self.attestation_tx.receiver_count() > 0
    }

    pub fn has_single_attestation_subscribers(&self) -> bool {
        self.single_attestation_tx.receiver_count() > 0
    }

    pub fn has_block_subscribers(&self) -> bool {
        self.block_tx.receiver_count() > 0
    }

    pub fn has_blob_sidecar_subscribers(&self) -> bool {
        self.blob_sidecar_tx.receiver_count() > 0
    }

    pub fn has_finalized_subscribers(&self) -> bool {
        self.finalized_tx.receiver_count() > 0
    }

    pub fn has_head_subscribers(&self) -> bool {
        self.head_tx.receiver_count() > 0
    }

    pub fn has_exit_subscribers(&self) -> bool {
        self.exit_tx.receiver_count() > 0
    }

    pub fn has_reorg_subscribers(&self) -> bool {
        self.chain_reorg_tx.receiver_count() > 0
    }

    pub fn has_contribution_subscribers(&self) -> bool {
        self.contribution_tx.receiver_count() > 0
    }

    pub fn has_payload_attributes_subscribers(&self) -> bool {
        self.payload_attributes_tx.receiver_count() > 0
    }

    pub fn has_late_head_subscribers(&self) -> bool {
        self.late_head.receiver_count() > 0
    }

    pub fn has_block_reward_subscribers(&self) -> bool {
        self.block_reward_tx.receiver_count() > 0
    }

    pub fn has_proposer_slashing_subscribers(&self) -> bool {
        self.proposer_slashing_tx.receiver_count() > 0
    }

    pub fn has_attester_slashing_subscribers(&self) -> bool {
        self.attester_slashing_tx.receiver_count() > 0
    }

    pub fn has_bls_to_execution_change_subscribers(&self) -> bool {
        self.bls_to_execution_change_tx.receiver_count() > 0
    }

    pub fn has_block_gossip_subscribers(&self) -> bool {
        self.block_gossip_tx.receiver_count() > 0
    }
}
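
The file above sizes every channel as `capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY)` and routes each event to the sender that matches its variant. The sketch below models both ideas using only the standard library so it can be compiled without tokio. It is an illustration under stated assumptions, not the Lighthouse implementation: `channel_capacity`, `ToyEvent`, and `ToyHandler` are hypothetical names, and `std::sync::mpsc` stands in for `tokio::sync::broadcast`, so it gives each subscriber its own unbounded queue rather than a shared bounded one.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Same value as the constant in the file above.
const DEFAULT_CHANNEL_CAPACITY: usize = 16;

// Hypothetical helper: the capacity computation that `new` performs inline.
// `saturating_mul` clamps to `usize::MAX` instead of overflowing.
fn channel_capacity(capacity_multiplier: usize) -> usize {
    capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY)
}

// Hypothetical two-variant stand-in for `EventKind<E>`.
#[derive(Clone, Debug, PartialEq)]
enum ToyEvent {
    Head(u64),
    Block(u64),
}

// Std-only model of the handler: one subscriber list per event kind.
#[derive(Default)]
struct ToyHandler {
    head_subs: Vec<Sender<ToyEvent>>,
    block_subs: Vec<Sender<ToyEvent>>,
}

impl ToyHandler {
    // Registers a new subscriber for head events and returns its receiving end.
    fn subscribe_head(&mut self) -> Receiver<ToyEvent> {
        let (tx, rx) = channel();
        self.head_subs.push(tx);
        rx
    }

    // Routes the event by variant. Returns `Ok(n)` with the number of sends that
    // succeeded, or hands the event back in `Err` when nobody has subscribed.
    fn register(&self, event: ToyEvent) -> Result<usize, ToyEvent> {
        let subs = match &event {
            ToyEvent::Head(_) => &self.head_subs,
            ToyEvent::Block(_) => &self.block_subs,
        };
        if subs.is_empty() {
            return Err(event);
        }
        let mut delivered = 0;
        for tx in subs {
            if tx.send(event.clone()).is_ok() {
                delivered += 1;
            }
        }
        Ok(delivered)
    }
}
```

In this model a multiplier of 4 yields a capacity of 64, and registering a `ToyEvent::Block` before anyone subscribes returns the event in `Err`, which corresponds to the `SendError` branch in `register` above. The tokio documentation for `broadcast::channel` lists a capacity of zero as a panic condition, so a multiplier of 0 is worth checking against the tokio version in use.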