mirror of
https://github.com/sigp/lighthouse.git
synced 2026-05-30 20:57:10 +00:00
feat(focil): payload envelope event trigger for IL service
Add a payload envelope monitor that subscribes to the ExecutionPayloadAvailable SSE event (following the existing beacon_head_monitor pattern). The inclusion list service now races the IL deadline (66.67% into slot) against the payload envelope event, matching Lodestar's Promise.race approach. This ensures IL production fires as soon as the envelope is imported (when the EL has fresh state) rather than at a fixed offset that may be too early or too late.
This commit is contained in:
@@ -4,8 +4,12 @@
|
|||||||
|
|
||||||
pub mod beacon_head_monitor;
|
pub mod beacon_head_monitor;
|
||||||
pub mod beacon_node_health;
|
pub mod beacon_node_health;
|
||||||
|
pub mod payload_envelope_monitor;
|
||||||
|
|
||||||
use beacon_head_monitor::{BeaconHeadCache, HeadEvent, poll_head_event_from_beacon_nodes};
|
use beacon_head_monitor::{BeaconHeadCache, HeadEvent, poll_head_event_from_beacon_nodes};
|
||||||
|
use payload_envelope_monitor::{
|
||||||
|
PayloadEnvelopeEvent, poll_payload_envelope_event_from_beacon_nodes,
|
||||||
|
};
|
||||||
use beacon_node_health::{
|
use beacon_node_health::{
|
||||||
BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic,
|
BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic,
|
||||||
SyncDistanceTier, check_node_health,
|
SyncDistanceTier, check_node_health,
|
||||||
@@ -99,6 +103,35 @@ pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(
|
|||||||
executor.spawn(head_monitor_future, "head_monitoring");
|
executor.spawn(head_monitor_future, "head_monitoring");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start the payload envelope monitoring service if configured.
|
||||||
|
let beacon_nodes_ref2 = beacon_nodes.clone();
|
||||||
|
if beacon_nodes_ref2.payload_envelope_send.is_some() {
|
||||||
|
let payload_envelope_future = async move {
|
||||||
|
loop {
|
||||||
|
if let Err(error) =
|
||||||
|
poll_payload_envelope_event_from_beacon_nodes::<E, T>(
|
||||||
|
beacon_nodes_ref2.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
warn!(
|
||||||
|
error,
|
||||||
|
"Payload envelope service failed, retrying next slot"
|
||||||
|
);
|
||||||
|
|
||||||
|
let sleep_time = beacon_nodes_ref2
|
||||||
|
.slot_clock
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|slot_clock| slot_clock.duration_to_next_slot())
|
||||||
|
.unwrap_or_else(|| beacon_nodes_ref2.spec.get_slot_duration());
|
||||||
|
sleep(sleep_time).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
executor.spawn(payload_envelope_future, "payload_envelope_monitoring");
|
||||||
|
}
|
||||||
|
|
||||||
let future = async move {
|
let future = async move {
|
||||||
loop {
|
loop {
|
||||||
beacon_nodes.update_all_candidates::<E>().await;
|
beacon_nodes.update_all_candidates::<E>().await;
|
||||||
@@ -423,6 +456,7 @@ pub struct BeaconNodeFallback<T> {
|
|||||||
slot_clock: Option<T>,
|
slot_clock: Option<T>,
|
||||||
beacon_head_cache: Option<Arc<BeaconHeadCache>>,
|
beacon_head_cache: Option<Arc<BeaconHeadCache>>,
|
||||||
head_monitor_send: Option<Arc<mpsc::Sender<HeadEvent>>>,
|
head_monitor_send: Option<Arc<mpsc::Sender<HeadEvent>>>,
|
||||||
|
pub payload_envelope_send: Option<Arc<mpsc::Sender<PayloadEnvelopeEvent>>>,
|
||||||
broadcast_topics: Vec<ApiTopic>,
|
broadcast_topics: Vec<ApiTopic>,
|
||||||
spec: Arc<ChainSpec>,
|
spec: Arc<ChainSpec>,
|
||||||
}
|
}
|
||||||
@@ -441,6 +475,7 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
|
|||||||
slot_clock: None,
|
slot_clock: None,
|
||||||
beacon_head_cache: None,
|
beacon_head_cache: None,
|
||||||
head_monitor_send: None,
|
head_monitor_send: None,
|
||||||
|
payload_envelope_send: None,
|
||||||
broadcast_topics,
|
broadcast_topics,
|
||||||
spec,
|
spec,
|
||||||
}
|
}
|
||||||
@@ -464,6 +499,15 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
|
|||||||
self.beacon_head_cache = Some(Arc::new(BeaconHeadCache::new()));
|
self.beacon_head_cache = Some(Arc::new(BeaconHeadCache::new()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Sets the payload envelope monitor channel that streams `ExecutionPayloadAvailable`
|
||||||
|
/// events from all the beacon nodes that the validator client is connected to.
|
||||||
|
pub fn set_payload_envelope_send(
|
||||||
|
&mut self,
|
||||||
|
payload_envelope_send: Arc<mpsc::Sender<PayloadEnvelopeEvent>>,
|
||||||
|
) {
|
||||||
|
self.payload_envelope_send = Some(payload_envelope_send);
|
||||||
|
}
|
||||||
|
|
||||||
/// The count of candidates, regardless of their state.
|
/// The count of candidates, regardless of their state.
|
||||||
pub async fn num_total(&self) -> usize {
|
pub async fn num_total(&self) -> usize {
|
||||||
self.candidates.read().await.len()
|
self.candidates.read().await.len()
|
||||||
|
|||||||
@@ -0,0 +1,100 @@
|
|||||||
|
use crate::BeaconNodeFallback;
|
||||||
|
use eth2::types::{EventKind, EventTopic, Hash256};
|
||||||
|
use futures::StreamExt;
|
||||||
|
use slot_clock::SlotClock;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tracing::{debug, info, warn};
|
||||||
|
use types::{EthSpec, Slot};
|
||||||
|
|
||||||
|
/// Event emitted when an execution payload envelope becomes available for a slot.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct PayloadEnvelopeEvent {
|
||||||
|
pub slot: Slot,
|
||||||
|
pub block_root: Hash256,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs a non-terminating loop that subscribes to `ExecutionPayloadAvailable` SSE events
|
||||||
|
/// from all connected beacon nodes and forwards them over an mpsc channel.
|
||||||
|
///
|
||||||
|
/// This follows the same pattern as `poll_head_event_from_beacon_nodes`.
|
||||||
|
pub async fn poll_payload_envelope_event_from_beacon_nodes<E: EthSpec, T: SlotClock + 'static>(
|
||||||
|
beacon_nodes: Arc<BeaconNodeFallback<T>>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let payload_envelope_send = beacon_nodes
|
||||||
|
.payload_envelope_send
|
||||||
|
.clone()
|
||||||
|
.ok_or("Unable to start payload envelope monitor without payload_envelope_send")?;
|
||||||
|
|
||||||
|
info!("Starting payload envelope monitoring service");
|
||||||
|
let candidates = {
|
||||||
|
let candidates_guard = beacon_nodes.candidates.read().await;
|
||||||
|
candidates_guard.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create Vec of streams, which we will select over.
|
||||||
|
let mut streams = vec![];
|
||||||
|
|
||||||
|
for candidate in &candidates {
|
||||||
|
let event_stream = candidate
|
||||||
|
.beacon_node
|
||||||
|
.get_events::<E>(&[EventTopic::ExecutionPayloadAvailable])
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let event_stream = match event_stream {
|
||||||
|
Ok(stream) => stream,
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = ?e, node_index = candidate.index, "Failed to get execution payload available event stream");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
streams.push(event_stream.map(|event| (candidate.index, event)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if streams.is_empty() {
|
||||||
|
return Err(
|
||||||
|
"No beacon nodes available for execution payload available event streaming".to_string(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Combine streams into a single stream and poll events from any of them.
|
||||||
|
let mut combined_stream = futures::stream::select_all(streams);
|
||||||
|
|
||||||
|
while let Some((candidate_index, event_result)) = combined_stream.next().await {
|
||||||
|
match event_result {
|
||||||
|
Ok(EventKind::ExecutionPayloadAvailable(payload_event)) => {
|
||||||
|
debug!(
|
||||||
|
candidate_index,
|
||||||
|
block_root = ?payload_event.block_root,
|
||||||
|
slot = %payload_event.slot,
|
||||||
|
"Execution payload available from beacon node"
|
||||||
|
);
|
||||||
|
|
||||||
|
if payload_envelope_send
|
||||||
|
.send(PayloadEnvelopeEvent {
|
||||||
|
slot: payload_event.slot,
|
||||||
|
block_root: payload_event.block_root,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.is_err()
|
||||||
|
{
|
||||||
|
return Err("Payload envelope monitoring service channel closed".into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(event) => {
|
||||||
|
warn!(
|
||||||
|
event_kind = event.topic_name(),
|
||||||
|
candidate_index, "Received unexpected event from BN in payload envelope monitor"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(format!(
|
||||||
|
"Payload envelope monitoring stream error, node: {candidate_index}, error: {e:?}"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err("Payload envelope stream ended unexpectedly".into())
|
||||||
|
}
|
||||||
@@ -14,7 +14,7 @@ use tokio::sync::Mutex;
|
|||||||
use account_utils::validator_definitions::ValidatorDefinitions;
|
use account_utils::validator_definitions::ValidatorDefinitions;
|
||||||
use beacon_node_fallback::{
|
use beacon_node_fallback::{
|
||||||
BeaconNodeFallback, CandidateBeaconNode, beacon_head_monitor::HeadEvent,
|
BeaconNodeFallback, CandidateBeaconNode, beacon_head_monitor::HeadEvent,
|
||||||
start_fallback_updater_service,
|
payload_envelope_monitor::PayloadEnvelopeEvent, start_fallback_updater_service,
|
||||||
};
|
};
|
||||||
use clap::ArgMatches;
|
use clap::ArgMatches;
|
||||||
use doppelganger_service::DoppelgangerService;
|
use doppelganger_service::DoppelgangerService;
|
||||||
@@ -418,6 +418,13 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Create the payload envelope monitor channel for the inclusion list service.
|
||||||
|
// This allows the IL service to fire early when a payload envelope is available.
|
||||||
|
let (payload_envelope_tx, payload_envelope_rx) =
|
||||||
|
mpsc::channel::<PayloadEnvelopeEvent>(MAX_HEAD_EVENT_QUEUE_LEN);
|
||||||
|
beacon_nodes.set_payload_envelope_send(Arc::new(payload_envelope_tx));
|
||||||
|
let payload_envelope_rx = Some(Mutex::new(payload_envelope_rx));
|
||||||
|
|
||||||
let beacon_nodes = Arc::new(beacon_nodes);
|
let beacon_nodes = Arc::new(beacon_nodes);
|
||||||
start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?;
|
start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?;
|
||||||
|
|
||||||
@@ -564,6 +571,7 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
|
|||||||
.beacon_nodes(beacon_nodes.clone())
|
.beacon_nodes(beacon_nodes.clone())
|
||||||
.executor(context.executor.clone())
|
.executor(context.executor.clone())
|
||||||
.chain_spec(context.eth2_config.spec.clone())
|
.chain_spec(context.eth2_config.spec.clone())
|
||||||
|
.payload_envelope_rx(payload_envelope_rx)
|
||||||
// TODO(focil) make config driven
|
// TODO(focil) make config driven
|
||||||
.disable(false)
|
.disable(false)
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|||||||
@@ -1,11 +1,14 @@
|
|||||||
use crate::duties_service::DutiesService;
|
use crate::duties_service::DutiesService;
|
||||||
use beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
|
use beacon_node_fallback::{
|
||||||
|
ApiTopic, BeaconNodeFallback, payload_envelope_monitor::PayloadEnvelopeEvent,
|
||||||
|
};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use logging::crit;
|
use logging::crit;
|
||||||
use slot_clock::SlotClock;
|
use slot_clock::SlotClock;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use task_executor::TaskExecutor;
|
use task_executor::TaskExecutor;
|
||||||
|
use tokio::sync::{Mutex, mpsc};
|
||||||
use tokio::time::{Duration, sleep};
|
use tokio::time::{Duration, sleep};
|
||||||
use tracing::{debug, error, info, trace, warn};
|
use tracing::{debug, error, info, trace, warn};
|
||||||
use types::{ChainSpec, EthSpec, InclusionList, InclusionListDuty, Slot, Transactions};
|
use types::{ChainSpec, EthSpec, InclusionList, InclusionListDuty, Slot, Transactions};
|
||||||
@@ -20,6 +23,7 @@ pub struct InclusionListServiceBuilder<S: ValidatorStore, T: SlotClock + 'static
|
|||||||
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
|
beacon_nodes: Option<Arc<BeaconNodeFallback<T>>>,
|
||||||
executor: Option<TaskExecutor>,
|
executor: Option<TaskExecutor>,
|
||||||
chain_spec: Option<Arc<ChainSpec>>,
|
chain_spec: Option<Arc<ChainSpec>>,
|
||||||
|
payload_envelope_rx: Option<Mutex<mpsc::Receiver<PayloadEnvelopeEvent>>>,
|
||||||
disable: bool,
|
disable: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -32,6 +36,7 @@ impl<S: ValidatorStore, T: SlotClock + 'static> InclusionListServiceBuilder<S, T
|
|||||||
beacon_nodes: None,
|
beacon_nodes: None,
|
||||||
executor: None,
|
executor: None,
|
||||||
chain_spec: None,
|
chain_spec: None,
|
||||||
|
payload_envelope_rx: None,
|
||||||
disable: true,
|
disable: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -66,6 +71,14 @@ impl<S: ValidatorStore, T: SlotClock + 'static> InclusionListServiceBuilder<S, T
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn payload_envelope_rx(
|
||||||
|
mut self,
|
||||||
|
payload_envelope_rx: Option<Mutex<mpsc::Receiver<PayloadEnvelopeEvent>>>,
|
||||||
|
) -> Self {
|
||||||
|
self.payload_envelope_rx = payload_envelope_rx;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
pub fn disable(mut self, disable: bool) -> Self {
|
pub fn disable(mut self, disable: bool) -> Self {
|
||||||
self.disable = disable;
|
self.disable = disable;
|
||||||
self
|
self
|
||||||
@@ -92,6 +105,7 @@ impl<S: ValidatorStore, T: SlotClock + 'static> InclusionListServiceBuilder<S, T
|
|||||||
chain_spec: self
|
chain_spec: self
|
||||||
.chain_spec
|
.chain_spec
|
||||||
.ok_or("Cannot build AttestationService without chain_spec")?,
|
.ok_or("Cannot build AttestationService without chain_spec")?,
|
||||||
|
payload_envelope_rx: self.payload_envelope_rx,
|
||||||
disable: self.disable,
|
disable: self.disable,
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
@@ -108,6 +122,7 @@ pub struct Inner<S, T> {
|
|||||||
// TODO(focil)
|
// TODO(focil)
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
chain_spec: Arc<ChainSpec>,
|
chain_spec: Arc<ChainSpec>,
|
||||||
|
payload_envelope_rx: Option<Mutex<mpsc::Receiver<PayloadEnvelopeEvent>>>,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
disable: bool,
|
disable: bool,
|
||||||
}
|
}
|
||||||
@@ -157,7 +172,22 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> InclusionListService<S
|
|||||||
// Wait until the start of the next slot, then sleep to
|
// Wait until the start of the next slot, then sleep to
|
||||||
// inclusion_list_due_bps fraction into that slot.
|
// inclusion_list_due_bps fraction into that slot.
|
||||||
let il_offset = slot_duration * il_due_fraction / 10000;
|
let il_offset = slot_duration * il_due_fraction / 10000;
|
||||||
sleep(duration_to_next_slot + il_offset).await;
|
|
||||||
|
// Compute the target slot (the slot we'll be in after waiting)
|
||||||
|
let target_slot = self
|
||||||
|
.slot_clock
|
||||||
|
.now()
|
||||||
|
.map(|s| s + 1)
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
// Race: wait for either the IL deadline or a payload envelope event
|
||||||
|
// for the target slot.
|
||||||
|
self.wait_for_il_trigger(
|
||||||
|
duration_to_next_slot + il_offset,
|
||||||
|
target_slot,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
if let Err(e) = self.spawn_inclusion_list_task(slot_duration, &chain_spec) {
|
if let Err(e) = self.spawn_inclusion_list_task(slot_duration, &chain_spec) {
|
||||||
crit!(
|
crit!(
|
||||||
error = ?e,
|
error = ?e,
|
||||||
@@ -179,6 +209,63 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> InclusionListService<S
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Waits for either the IL deadline timeout or a payload envelope event for the target slot.
|
||||||
|
/// If no payload_envelope_rx is configured, falls back to just sleeping the full duration.
|
||||||
|
async fn wait_for_il_trigger(&self, timeout_duration: Duration, target_slot: Slot) {
|
||||||
|
let Some(receiver) = &self.payload_envelope_rx else {
|
||||||
|
// No payload envelope monitor configured, just sleep
|
||||||
|
sleep(timeout_duration).await;
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut rx_guard = receiver.lock().await;
|
||||||
|
|
||||||
|
// Use select! to race between the timeout and receiving a payload envelope event
|
||||||
|
// for the target slot.
|
||||||
|
let deadline = sleep(timeout_duration);
|
||||||
|
tokio::pin!(deadline);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = &mut deadline => {
|
||||||
|
debug!(
|
||||||
|
slot = %target_slot,
|
||||||
|
"IL deadline reached without payload envelope event"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
event = rx_guard.recv() => {
|
||||||
|
match event {
|
||||||
|
Some(envelope_event) => {
|
||||||
|
if envelope_event.slot == target_slot {
|
||||||
|
debug!(
|
||||||
|
slot = %target_slot,
|
||||||
|
block_root = ?envelope_event.block_root,
|
||||||
|
"Payload envelope received for target slot, triggering IL production"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
// Stale event for a different slot, keep waiting
|
||||||
|
debug!(
|
||||||
|
event_slot = %envelope_event.slot,
|
||||||
|
target_slot = %target_slot,
|
||||||
|
"Ignoring payload envelope event for non-target slot"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Channel closed, fall back to deadline
|
||||||
|
warn!("Payload envelope channel closed, falling back to deadline");
|
||||||
|
deadline.await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn a task that downloads, signs and uploads the inclusion lists to the beacon node.
|
/// Spawn a task that downloads, signs and uploads the inclusion lists to the beacon node.
|
||||||
// TODO(focil) I don't think we need `slot_duration` here, unless we need to make some calculation
|
// TODO(focil) I don't think we need `slot_duration` here, unless we need to make some calculation
|
||||||
// related to the freeze deadline.
|
// related to the freeze deadline.
|
||||||
|
|||||||
Reference in New Issue
Block a user