mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 18:04:18 +00:00
Proposer and attester slashing sse events (#5327)
* default vc to block v3 endpoint and deprecate block-v3 flag * Merge branch 'unstable' of https://github.com/sigp/lighthouse into unstable * add proposer and attester event variants * add TOOOs * add tests, event triggers * Merge branch 'unstable' of https://github.com/sigp/lighthouse into proposer-and-attester-slashing-sse-events * revert * revert * remove double event tracking * Merge branch 'unstable' into proposer-and-attester-slashing-sse-events * remove todo, fix test * resolve merge conflicts * Merge branch 'proposer-and-attester-slashing-sse-events' of https://github.com/eserilev/lighthouse into proposer-and-attester-slashing-sse-events * leftover debugging * Merge branch 'unstable' of https://github.com/sigp/lighthouse into proposer-and-attester-slashing-sse-events * Merge branch 'unstable' of https://github.com/sigp/lighthouse into proposer-and-attester-slashing-sse-events
This commit is contained in:
@@ -2422,6 +2422,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
proposer_slashing: ProposerSlashing,
|
||||
) -> Result<ObservationOutcome<ProposerSlashing, T::EthSpec>, Error> {
|
||||
let wall_clock_state = self.wall_clock_state()?;
|
||||
|
||||
Ok(self.observed_proposer_slashings.lock().verify_and_observe(
|
||||
proposer_slashing,
|
||||
&wall_clock_state,
|
||||
@@ -2434,6 +2435,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
&self,
|
||||
proposer_slashing: SigVerifiedOp<ProposerSlashing, T::EthSpec>,
|
||||
) {
|
||||
if let Some(event_handler) = self.event_handler.as_ref() {
|
||||
if event_handler.has_proposer_slashing_subscribers() {
|
||||
event_handler.register(EventKind::ProposerSlashing(Box::new(
|
||||
proposer_slashing.clone().into_inner(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if self.eth1_chain.is_some() {
|
||||
self.op_pool.insert_proposer_slashing(proposer_slashing)
|
||||
}
|
||||
@@ -2445,6 +2454,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
attester_slashing: AttesterSlashing<T::EthSpec>,
|
||||
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>, T::EthSpec>, Error> {
|
||||
let wall_clock_state = self.wall_clock_state()?;
|
||||
|
||||
Ok(self.observed_attester_slashings.lock().verify_and_observe(
|
||||
attester_slashing,
|
||||
&wall_clock_state,
|
||||
@@ -2465,6 +2475,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
||||
.fork_choice_write_lock()
|
||||
.on_attester_slashing(attester_slashing.as_inner());
|
||||
|
||||
if let Some(event_handler) = self.event_handler.as_ref() {
|
||||
if event_handler.has_attester_slashing_subscribers() {
|
||||
event_handler.register(EventKind::AttesterSlashing(Box::new(
|
||||
attester_slashing.clone().into_inner(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
// Add to the op pool (if we have the ability to propose blocks).
|
||||
if self.eth1_chain.is_some() {
|
||||
self.op_pool.insert_attester_slashing(attester_slashing)
|
||||
|
||||
@@ -708,8 +708,8 @@ where
|
||||
.ok_or("Cannot build without a genesis state root")?;
|
||||
let validator_monitor_config = self.validator_monitor_config.unwrap_or_default();
|
||||
let head_tracker = Arc::new(self.head_tracker.unwrap_or_default());
|
||||
|
||||
let beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>> = <_>::default();
|
||||
|
||||
let mut validator_monitor = ValidatorMonitor::new(
|
||||
validator_monitor_config,
|
||||
beacon_proposer_cache.clone(),
|
||||
|
||||
@@ -20,6 +20,8 @@ pub struct ServerSentEventHandler<E: EthSpec> {
|
||||
light_client_finality_update_tx: Sender<EventKind<E>>,
|
||||
light_client_optimistic_update_tx: Sender<EventKind<E>>,
|
||||
block_reward_tx: Sender<EventKind<E>>,
|
||||
proposer_slashing_tx: Sender<EventKind<E>>,
|
||||
attester_slashing_tx: Sender<EventKind<E>>,
|
||||
log: Logger,
|
||||
}
|
||||
|
||||
@@ -45,6 +47,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
||||
let (light_client_finality_update_tx, _) = broadcast::channel(capacity);
|
||||
let (light_client_optimistic_update_tx, _) = broadcast::channel(capacity);
|
||||
let (block_reward_tx, _) = broadcast::channel(capacity);
|
||||
let (proposer_slashing_tx, _) = broadcast::channel(capacity);
|
||||
let (attester_slashing_tx, _) = broadcast::channel(capacity);
|
||||
|
||||
Self {
|
||||
attestation_tx,
|
||||
@@ -60,6 +64,8 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
||||
light_client_finality_update_tx,
|
||||
light_client_optimistic_update_tx,
|
||||
block_reward_tx,
|
||||
proposer_slashing_tx,
|
||||
attester_slashing_tx,
|
||||
log,
|
||||
}
|
||||
}
|
||||
@@ -126,6 +132,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
||||
.block_reward_tx
|
||||
.send(kind)
|
||||
.map(|count| log_count("block reward", count)),
|
||||
EventKind::ProposerSlashing(_) => self
|
||||
.proposer_slashing_tx
|
||||
.send(kind)
|
||||
.map(|count| log_count("proposer slashing", count)),
|
||||
EventKind::AttesterSlashing(_) => self
|
||||
.attester_slashing_tx
|
||||
.send(kind)
|
||||
.map(|count| log_count("attester slashing", count)),
|
||||
};
|
||||
if let Err(SendError(event)) = result {
|
||||
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
|
||||
@@ -184,6 +198,14 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
||||
self.block_reward_tx.subscribe()
|
||||
}
|
||||
|
||||
pub fn subscribe_attester_slashing(&self) -> Receiver<EventKind<E>> {
|
||||
self.attester_slashing_tx.subscribe()
|
||||
}
|
||||
|
||||
pub fn subscribe_proposer_slashing(&self) -> Receiver<EventKind<E>> {
|
||||
self.proposer_slashing_tx.subscribe()
|
||||
}
|
||||
|
||||
pub fn has_attestation_subscribers(&self) -> bool {
|
||||
self.attestation_tx.receiver_count() > 0
|
||||
}
|
||||
@@ -227,4 +249,12 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
|
||||
pub fn has_block_reward_subscribers(&self) -> bool {
|
||||
self.block_reward_tx.receiver_count() > 0
|
||||
}
|
||||
|
||||
pub fn has_proposer_slashing_subscribers(&self) -> bool {
|
||||
self.proposer_slashing_tx.receiver_count() > 0
|
||||
}
|
||||
|
||||
pub fn has_attester_slashing_subscribers(&self) -> bool {
|
||||
self.attester_slashing_tx.receiver_count() > 0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4358,6 +4358,12 @@ pub fn serve<T: BeaconChainTypes>(
|
||||
api_types::EventTopic::BlockReward => {
|
||||
event_handler.subscribe_block_reward()
|
||||
}
|
||||
api_types::EventTopic::AttesterSlashing => {
|
||||
event_handler.subscribe_attester_slashing()
|
||||
}
|
||||
api_types::EventTopic::ProposerSlashing => {
|
||||
event_handler.subscribe_proposer_slashing()
|
||||
}
|
||||
};
|
||||
|
||||
receivers.push(
|
||||
|
||||
@@ -5214,6 +5214,8 @@ impl ApiTester {
|
||||
EventTopic::Block,
|
||||
EventTopic::Head,
|
||||
EventTopic::FinalizedCheckpoint,
|
||||
EventTopic::AttesterSlashing,
|
||||
EventTopic::ProposerSlashing,
|
||||
];
|
||||
let mut events_future = self
|
||||
.client
|
||||
@@ -5353,6 +5355,42 @@ impl ApiTester {
|
||||
.await;
|
||||
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);
|
||||
|
||||
// Test attester slashing event
|
||||
let mut attester_slashing_event_future = self
|
||||
.client
|
||||
.get_events::<E>(&[EventTopic::AttesterSlashing])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
self.harness.add_attester_slashing(vec![1, 2, 3]).unwrap();
|
||||
|
||||
let attester_slashing_event = poll_events(
|
||||
&mut attester_slashing_event_future,
|
||||
1,
|
||||
Duration::from_millis(10000),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(attester_slashing_event.len() == 1);
|
||||
|
||||
// Test proposer slashing event
|
||||
let mut proposer_slashing_event_future = self
|
||||
.client
|
||||
.get_events::<E>(&[EventTopic::ProposerSlashing])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
self.harness.add_proposer_slashing(1).unwrap();
|
||||
|
||||
let proposer_slashing_event = poll_events(
|
||||
&mut proposer_slashing_event_future,
|
||||
1,
|
||||
Duration::from_millis(10000),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(proposer_slashing_event.len() == 1);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user