mirror of
https://github.com/sigp/lighthouse.git
synced 2026-07-04 21:34:36 +00:00
Emit NewHead SSE event earlier in block import (#8718)
Co-Authored-By: Jimmy Chen <jchen.tc@gmail.com>
This commit is contained in:
@@ -77,6 +77,7 @@ use bls::{PublicKey, PublicKeyBytes, Signature};
|
|||||||
use eth2::beacon_response::ForkVersionedResponse;
|
use eth2::beacon_response::ForkVersionedResponse;
|
||||||
use eth2::types::{
|
use eth2::types::{
|
||||||
EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes,
|
EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes,
|
||||||
|
SseHead,
|
||||||
};
|
};
|
||||||
use execution_layer::{
|
use execution_layer::{
|
||||||
BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer,
|
BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer,
|
||||||
@@ -3863,6 +3864,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Read the cached head prior to taking the fork choice lock to avoid potential deadlocks.
|
||||||
|
let old_head_slot = self.canonical_head.cached_head().head_slot();
|
||||||
|
|
||||||
// Take an upgradable read lock on fork choice so we can check if this block has already
|
// Take an upgradable read lock on fork choice so we can check if this block has already
|
||||||
// been imported. We don't want to repeat work importing a block that is already imported.
|
// been imported. We don't want to repeat work importing a block that is already imported.
|
||||||
let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
|
let fork_choice_reader = self.canonical_head.fork_choice_upgradable_read_lock();
|
||||||
@@ -3918,6 +3922,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
// This block became the head, add it to the early attester cache.
|
// This block became the head, add it to the early attester cache.
|
||||||
Ok(new_head_root) if new_head_root == block_root => {
|
Ok(new_head_root) if new_head_root == block_root => {
|
||||||
if let Some(proto_block) = fork_choice.get_block(&block_root) {
|
if let Some(proto_block) = fork_choice.get_block(&block_root) {
|
||||||
|
let new_head_is_optimistic =
|
||||||
|
proto_block.execution_status.is_optimistic_or_invalid();
|
||||||
|
|
||||||
if let Err(e) = self.early_attester_cache.add_head_block(
|
if let Err(e) = self.early_attester_cache.add_head_block(
|
||||||
block_root,
|
block_root,
|
||||||
&signed_block,
|
&signed_block,
|
||||||
@@ -3937,6 +3944,50 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
attestable_timestamp,
|
attestable_timestamp,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register a server-sent-event for a new head.
|
||||||
|
if let Some(event_handler) = self
|
||||||
|
.event_handler
|
||||||
|
.as_ref()
|
||||||
|
.filter(|handler| handler.has_head_subscribers())
|
||||||
|
{
|
||||||
|
let head_slot = state.slot();
|
||||||
|
let state_root = block.state_root();
|
||||||
|
let is_epoch_transition = state.current_epoch()
|
||||||
|
> old_head_slot.epoch(T::EthSpec::slots_per_epoch());
|
||||||
|
|
||||||
|
let dependent_root = state.attester_shuffling_decision_root(
|
||||||
|
self.genesis_block_root,
|
||||||
|
RelativeEpoch::Next,
|
||||||
|
);
|
||||||
|
let prev_dependent_root = state.attester_shuffling_decision_root(
|
||||||
|
self.genesis_block_root,
|
||||||
|
RelativeEpoch::Current,
|
||||||
|
);
|
||||||
|
|
||||||
|
match (dependent_root, prev_dependent_root) {
|
||||||
|
(
|
||||||
|
Ok(current_duty_dependent_root),
|
||||||
|
Ok(previous_duty_dependent_root),
|
||||||
|
) => {
|
||||||
|
event_handler.register(EventKind::Head(SseHead {
|
||||||
|
slot: head_slot,
|
||||||
|
block: block_root,
|
||||||
|
state: state_root,
|
||||||
|
current_duty_dependent_root,
|
||||||
|
previous_duty_dependent_root,
|
||||||
|
epoch_transition: is_epoch_transition,
|
||||||
|
execution_optimistic: new_head_is_optimistic,
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
(Err(e), _) | (_, Err(e)) => {
|
||||||
|
warn!(
|
||||||
|
error = ?e,
|
||||||
|
"Unable to find dependent roots, cannot register head event"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
warn!(?block_root, "Early attester block missing");
|
warn!(?block_root, "Early attester block missing");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ use crate::{
|
|||||||
metrics,
|
metrics,
|
||||||
validator_monitor::get_slot_delay_ms,
|
validator_monitor::get_slot_delay_ms,
|
||||||
};
|
};
|
||||||
use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead};
|
use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseLateHead};
|
||||||
use fork_choice::{
|
use fork_choice::{
|
||||||
ExecutionStatus, ForkChoiceStore, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock,
|
ExecutionStatus, ForkChoiceStore, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock,
|
||||||
ResetPayloadStatuses,
|
ResetPayloadStatuses,
|
||||||
@@ -824,15 +824,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
.slot()
|
.slot()
|
||||||
.epoch(T::EthSpec::slots_per_epoch());
|
.epoch(T::EthSpec::slots_per_epoch());
|
||||||
|
|
||||||
// These fields are used for server-sent events.
|
// This field is used for server-sent events.
|
||||||
let state_root = new_snapshot.beacon_state_root();
|
|
||||||
let head_slot = new_snapshot.beacon_state.slot();
|
let head_slot = new_snapshot.beacon_state.slot();
|
||||||
let dependent_root = new_snapshot
|
|
||||||
.beacon_state
|
|
||||||
.attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Next);
|
|
||||||
let prev_dependent_root = new_snapshot
|
|
||||||
.beacon_state
|
|
||||||
.attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Current);
|
|
||||||
|
|
||||||
match BlockShufflingIds::try_from_head(
|
match BlockShufflingIds::try_from_head(
|
||||||
new_snapshot.beacon_block_root,
|
new_snapshot.beacon_block_root,
|
||||||
@@ -870,33 +863,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
|
|||||||
self.op_pool.prune_attestations(self.epoch()?);
|
self.op_pool.prune_attestations(self.epoch()?);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register server-sent-events for a new head.
|
|
||||||
if let Some(event_handler) = self
|
|
||||||
.event_handler
|
|
||||||
.as_ref()
|
|
||||||
.filter(|handler| handler.has_head_subscribers())
|
|
||||||
{
|
|
||||||
match (dependent_root, prev_dependent_root) {
|
|
||||||
(Ok(current_duty_dependent_root), Ok(previous_duty_dependent_root)) => {
|
|
||||||
event_handler.register(EventKind::Head(SseHead {
|
|
||||||
slot: head_slot,
|
|
||||||
block: new_snapshot.beacon_block_root,
|
|
||||||
state: state_root,
|
|
||||||
current_duty_dependent_root,
|
|
||||||
previous_duty_dependent_root,
|
|
||||||
epoch_transition: is_epoch_transition,
|
|
||||||
execution_optimistic: new_head_is_optimistic,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
(Err(e), _) | (_, Err(e)) => {
|
|
||||||
warn!(
|
|
||||||
error = ?e,
|
|
||||||
"Unable to find dependent roots, cannot register head event"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register a server-sent-event for a reorg (if necessary).
|
// Register a server-sent-event for a reorg (if necessary).
|
||||||
if let Some(depth) = reorg_distance
|
if let Some(depth) = reorg_distance
|
||||||
&& let Some(event_handler) = self
|
&& let Some(event_handler) = self
|
||||||
|
|||||||
@@ -213,3 +213,45 @@ async fn data_column_sidecar_event_on_process_rpc_columns() {
|
|||||||
EventKind::DataColumnSidecar(expected_sse_data_column)
|
EventKind::DataColumnSidecar(expected_sse_data_column)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Verifies that a head event is emitted when a block is imported and becomes the head.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn head_event_on_block_import() {
|
||||||
|
let spec = Arc::new(test_spec::<E>());
|
||||||
|
let harness = BeaconChainHarness::builder(E::default())
|
||||||
|
.spec(spec.clone())
|
||||||
|
.deterministic_keypairs(8)
|
||||||
|
.fresh_ephemeral_store()
|
||||||
|
.mock_execution_layer()
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Subscribe to head events before importing the block
|
||||||
|
let event_handler = harness.chain.event_handler.as_ref().unwrap();
|
||||||
|
let mut head_event_receiver = event_handler.subscribe_head();
|
||||||
|
|
||||||
|
// Build and process a block that will become the new head
|
||||||
|
let head_state = harness.get_current_state();
|
||||||
|
let target_slot = head_state.slot() + 1;
|
||||||
|
harness.advance_slot();
|
||||||
|
let ((signed_block, blobs), _) = harness.make_block(head_state, target_slot).await;
|
||||||
|
|
||||||
|
let block_root = signed_block.canonical_root();
|
||||||
|
let state_root = signed_block.message().state_root();
|
||||||
|
|
||||||
|
harness
|
||||||
|
.process_block(target_slot, block_root, (signed_block, blobs))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Verify the head event was emitted with correct data
|
||||||
|
let head_event = head_event_receiver.try_recv().unwrap();
|
||||||
|
if let EventKind::Head(sse_head) = head_event {
|
||||||
|
assert_eq!(sse_head.slot, target_slot);
|
||||||
|
assert_eq!(sse_head.block, block_root);
|
||||||
|
assert_eq!(sse_head.state, state_root);
|
||||||
|
// execution_optimistic should be false since we're using mock execution layer
|
||||||
|
assert!(!sse_head.execution_optimistic);
|
||||||
|
} else {
|
||||||
|
panic!("Expected Head event, got {:?}", head_event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -6539,8 +6539,13 @@ impl ApiTester {
|
|||||||
block_events.as_slice(),
|
block_events.as_slice(),
|
||||||
&[
|
&[
|
||||||
expected_gossip,
|
expected_gossip,
|
||||||
expected_block,
|
// SSE `Head`` event is now emitted before `Block` event, because we only emit the block event
|
||||||
|
// after it's persisted to the database. We could consider changing this later, but
|
||||||
|
// we might have to serve http API requests for blocks from early_attester_cache
|
||||||
|
// before they're persisted to the database.
|
||||||
|
// https://github.com/sigp/lighthouse/pull/8718#issuecomment-3815593310
|
||||||
expected_head,
|
expected_head,
|
||||||
|
expected_block,
|
||||||
expected_finalized
|
expected_finalized
|
||||||
]
|
]
|
||||||
);
|
);
|
||||||
@@ -6797,7 +6802,12 @@ impl ApiTester {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let block_events = poll_events(&mut events_future, 2, Duration::from_millis(10000)).await;
|
let block_events = poll_events(&mut events_future, 2, Duration::from_millis(10000)).await;
|
||||||
assert_eq!(block_events.as_slice(), &[expected_block, expected_head]);
|
// SSE `Head`` event is now emitted before `Block` event, because we only emit the block event
|
||||||
|
// after it's persisted to the database. We could consider changing this later, but
|
||||||
|
// we might have to serve http API requests for blocks from early_attester_cache
|
||||||
|
// before they're persisted to the database.
|
||||||
|
// https://github.com/sigp/lighthouse/pull/8718#issuecomment-3815593310
|
||||||
|
assert_eq!(block_events.as_slice(), &[expected_head, expected_block]);
|
||||||
|
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user