From 2388accc7846bd79e3f17b9fdf6093183a15f410 Mon Sep 17 00:00:00 2001 From: Devnet Bot Date: Wed, 6 May 2026 07:49:02 +0000 Subject: [PATCH] feat(focil): payload envelope event trigger for IL service Add a payload envelope monitor that subscribes to the ExecutionPayloadAvailable SSE event (following the existing beacon_head_monitor pattern). The inclusion list service now races the IL deadline (66.67% into slot) against the payload envelope event, matching Lodestar's Promise.race approach. This ensures IL production fires as soon as the envelope is imported (when the EL has fresh state) rather than at a fixed offset that may be too early or too late. --- .../beacon_node_fallback/src/lib.rs | 44 ++++++++ .../src/payload_envelope_monitor.rs | 100 ++++++++++++++++++ validator_client/src/lib.rs | 10 +- .../src/inclusion_list_service.rs | 91 +++++++++++++++- 4 files changed, 242 insertions(+), 3 deletions(-) create mode 100644 validator_client/beacon_node_fallback/src/payload_envelope_monitor.rs diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 2cd7db557c..59d3368d34 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -4,8 +4,12 @@ pub mod beacon_head_monitor; pub mod beacon_node_health; +pub mod payload_envelope_monitor; use beacon_head_monitor::{BeaconHeadCache, HeadEvent, poll_head_event_from_beacon_nodes}; +use payload_envelope_monitor::{ + PayloadEnvelopeEvent, poll_payload_envelope_event_from_beacon_nodes, +}; use beacon_node_health::{ BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic, SyncDistanceTier, check_node_health, @@ -99,6 +103,35 @@ pub fn start_fallback_updater_service( executor.spawn(head_monitor_future, "head_monitoring"); } + // Start the payload envelope monitoring service if configured. + let beacon_nodes_ref2 = beacon_nodes.clone(); + if beacon_nodes_ref2.payload_envelope_send.is_some() { + let payload_envelope_future = async move { + loop { + if let Err(error) = + poll_payload_envelope_event_from_beacon_nodes::( + beacon_nodes_ref2.clone(), + ) + .await + { + warn!( + error, + "Payload envelope service failed, retrying next slot" + ); + + let sleep_time = beacon_nodes_ref2 + .slot_clock + .as_ref() + .and_then(|slot_clock| slot_clock.duration_to_next_slot()) + .unwrap_or_else(|| beacon_nodes_ref2.spec.get_slot_duration()); + sleep(sleep_time).await + } + } + }; + + executor.spawn(payload_envelope_future, "payload_envelope_monitoring"); + } + let future = async move { loop { beacon_nodes.update_all_candidates::().await; @@ -423,6 +456,7 @@ pub struct BeaconNodeFallback { slot_clock: Option, beacon_head_cache: Option>, head_monitor_send: Option>>, + pub payload_envelope_send: Option>>, broadcast_topics: Vec, spec: Arc, } @@ -441,6 +475,7 @@ impl BeaconNodeFallback { slot_clock: None, beacon_head_cache: None, head_monitor_send: None, + payload_envelope_send: None, broadcast_topics, spec, } @@ -464,6 +499,15 @@ impl BeaconNodeFallback { self.beacon_head_cache = Some(Arc::new(BeaconHeadCache::new())); } + /// Sets the payload envelope monitor channel that streams `ExecutionPayloadAvailable` + /// events from all the beacon nodes that the validator client is connected to. + pub fn set_payload_envelope_send( + &mut self, + payload_envelope_send: Arc>, + ) { + self.payload_envelope_send = Some(payload_envelope_send); + } + /// The count of candidates, regardless of their state. pub async fn num_total(&self) -> usize { self.candidates.read().await.len() diff --git a/validator_client/beacon_node_fallback/src/payload_envelope_monitor.rs b/validator_client/beacon_node_fallback/src/payload_envelope_monitor.rs new file mode 100644 index 0000000000..be8c00da44 --- /dev/null +++ b/validator_client/beacon_node_fallback/src/payload_envelope_monitor.rs @@ -0,0 +1,100 @@ +use crate::BeaconNodeFallback; +use eth2::types::{EventKind, EventTopic, Hash256}; +use futures::StreamExt; +use slot_clock::SlotClock; +use std::sync::Arc; +use tracing::{debug, info, warn}; +use types::{EthSpec, Slot}; + +/// Event emitted when an execution payload envelope becomes available for a slot. +#[derive(Debug, Clone)] +pub struct PayloadEnvelopeEvent { + pub slot: Slot, + pub block_root: Hash256, +} + +/// Runs a non-terminating loop that subscribes to `ExecutionPayloadAvailable` SSE events +/// from all connected beacon nodes and forwards them over an mpsc channel. +/// +/// This follows the same pattern as `poll_head_event_from_beacon_nodes`. +pub async fn poll_payload_envelope_event_from_beacon_nodes( + beacon_nodes: Arc>, +) -> Result<(), String> { + let payload_envelope_send = beacon_nodes + .payload_envelope_send + .clone() + .ok_or("Unable to start payload envelope monitor without payload_envelope_send")?; + + info!("Starting payload envelope monitoring service"); + let candidates = { + let candidates_guard = beacon_nodes.candidates.read().await; + candidates_guard.clone() + }; + + // Create Vec of streams, which we will select over. + let mut streams = vec![]; + + for candidate in &candidates { + let event_stream = candidate + .beacon_node + .get_events::(&[EventTopic::ExecutionPayloadAvailable]) + .await; + + let event_stream = match event_stream { + Ok(stream) => stream, + Err(e) => { + warn!(error = ?e, node_index = candidate.index, "Failed to get execution payload available event stream"); + continue; + } + }; + + streams.push(event_stream.map(|event| (candidate.index, event))); + } + + if streams.is_empty() { + return Err( + "No beacon nodes available for execution payload available event streaming".to_string(), + ); + } + + // Combine streams into a single stream and poll events from any of them. + let mut combined_stream = futures::stream::select_all(streams); + + while let Some((candidate_index, event_result)) = combined_stream.next().await { + match event_result { + Ok(EventKind::ExecutionPayloadAvailable(payload_event)) => { + debug!( + candidate_index, + block_root = ?payload_event.block_root, + slot = %payload_event.slot, + "Execution payload available from beacon node" + ); + + if payload_envelope_send + .send(PayloadEnvelopeEvent { + slot: payload_event.slot, + block_root: payload_event.block_root, + }) + .await + .is_err() + { + return Err("Payload envelope monitoring service channel closed".into()); + } + } + Ok(event) => { + warn!( + event_kind = event.topic_name(), + candidate_index, "Received unexpected event from BN in payload envelope monitor" + ); + continue; + } + Err(e) => { + return Err(format!( + "Payload envelope monitoring stream error, node: {candidate_index}, error: {e:?}" + )); + } + } + } + + Err("Payload envelope stream ended unexpectedly".into()) +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6f0163a7b6..fbec0d56c3 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -14,7 +14,7 @@ use tokio::sync::Mutex; use account_utils::validator_definitions::ValidatorDefinitions; use beacon_node_fallback::{ BeaconNodeFallback, CandidateBeaconNode, beacon_head_monitor::HeadEvent, - start_fallback_updater_service, + payload_envelope_monitor::PayloadEnvelopeEvent, start_fallback_updater_service, }; use clap::ArgMatches; use doppelganger_service::DoppelgangerService; @@ -418,6 +418,13 @@ impl ProductionValidatorClient { None }; + // Create the payload envelope monitor channel for the inclusion list service. + // This allows the IL service to fire early when a payload envelope is available. + let (payload_envelope_tx, payload_envelope_rx) = + mpsc::channel::(MAX_HEAD_EVENT_QUEUE_LEN); + beacon_nodes.set_payload_envelope_send(Arc::new(payload_envelope_tx)); + let payload_envelope_rx = Some(Mutex::new(payload_envelope_rx)); + let beacon_nodes = Arc::new(beacon_nodes); start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?; @@ -564,6 +571,7 @@ impl ProductionValidatorClient { .beacon_nodes(beacon_nodes.clone()) .executor(context.executor.clone()) .chain_spec(context.eth2_config.spec.clone()) + .payload_envelope_rx(payload_envelope_rx) // TODO(focil) make config driven .disable(false) .build()?; diff --git a/validator_client/validator_services/src/inclusion_list_service.rs b/validator_client/validator_services/src/inclusion_list_service.rs index 5cf9bd79f7..e6fd5c303f 100644 --- a/validator_client/validator_services/src/inclusion_list_service.rs +++ b/validator_client/validator_services/src/inclusion_list_service.rs @@ -1,11 +1,14 @@ use crate::duties_service::DutiesService; -use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; +use beacon_node_fallback::{ + ApiTopic, BeaconNodeFallback, payload_envelope_monitor::PayloadEnvelopeEvent, +}; use futures::future::join_all; use logging::crit; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; +use tokio::sync::{Mutex, mpsc}; use tokio::time::{Duration, sleep}; use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec, InclusionList, InclusionListDuty, Slot, Transactions}; @@ -20,6 +23,7 @@ pub struct InclusionListServiceBuilder>>, executor: Option, chain_spec: Option>, + payload_envelope_rx: Option>>, disable: bool, } @@ -32,6 +36,7 @@ impl InclusionListServiceBuilder InclusionListServiceBuilder>>, + ) -> Self { + self.payload_envelope_rx = payload_envelope_rx; + self + } + pub fn disable(mut self, disable: bool) -> Self { self.disable = disable; self @@ -92,6 +105,7 @@ impl InclusionListServiceBuilder { // TODO(focil) #[allow(dead_code)] chain_spec: Arc, + payload_envelope_rx: Option>>, #[allow(dead_code)] disable: bool, } @@ -157,7 +172,22 @@ impl InclusionListService InclusionListService { + debug!( + slot = %target_slot, + "IL deadline reached without payload envelope event" + ); + return; + } + event = rx_guard.recv() => { + match event { + Some(envelope_event) => { + if envelope_event.slot == target_slot { + debug!( + slot = %target_slot, + block_root = ?envelope_event.block_root, + "Payload envelope received for target slot, triggering IL production" + ); + return; + } else { + // Stale event for a different slot, keep waiting + debug!( + event_slot = %envelope_event.slot, + target_slot = %target_slot, + "Ignoring payload envelope event for non-target slot" + ); + continue; + } + } + None => { + // Channel closed, fall back to deadline + warn!("Payload envelope channel closed, falling back to deadline"); + deadline.await; + return; + } + } + } + } + } + } + /// Spawn a task that downloads, signs and uploads the inclusion lists to the beacon node. // TODO(focil) I don't think we need `slot_duration` here, unless we need to make some calculation // related to the freeze deadline.