diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 2cd7db557c..59d3368d34 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -4,8 +4,12 @@ pub mod beacon_head_monitor; pub mod beacon_node_health; +pub mod payload_envelope_monitor; use beacon_head_monitor::{BeaconHeadCache, HeadEvent, poll_head_event_from_beacon_nodes}; +use payload_envelope_monitor::{ + PayloadEnvelopeEvent, poll_payload_envelope_event_from_beacon_nodes, +}; use beacon_node_health::{ BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic, SyncDistanceTier, check_node_health, @@ -99,6 +103,35 @@ pub fn start_fallback_updater_service( executor.spawn(head_monitor_future, "head_monitoring"); } + // Start the payload envelope monitoring service if configured. + let beacon_nodes_ref2 = beacon_nodes.clone(); + if beacon_nodes_ref2.payload_envelope_send.is_some() { + let payload_envelope_future = async move { + loop { + if let Err(error) = + poll_payload_envelope_event_from_beacon_nodes::( + beacon_nodes_ref2.clone(), + ) + .await + { + warn!( + error, + "Payload envelope service failed, retrying next slot" + ); + + let sleep_time = beacon_nodes_ref2 + .slot_clock + .as_ref() + .and_then(|slot_clock| slot_clock.duration_to_next_slot()) + .unwrap_or_else(|| beacon_nodes_ref2.spec.get_slot_duration()); + sleep(sleep_time).await + } + } + }; + + executor.spawn(payload_envelope_future, "payload_envelope_monitoring"); + } + let future = async move { loop { beacon_nodes.update_all_candidates::().await; @@ -423,6 +456,7 @@ pub struct BeaconNodeFallback { slot_clock: Option, beacon_head_cache: Option>, head_monitor_send: Option>>, + pub payload_envelope_send: Option>>, broadcast_topics: Vec, spec: Arc, } @@ -441,6 +475,7 @@ impl BeaconNodeFallback { slot_clock: None, beacon_head_cache: None, head_monitor_send: None, + payload_envelope_send: None, broadcast_topics, spec, } @@ -464,6 +499,15 @@ impl BeaconNodeFallback { self.beacon_head_cache = Some(Arc::new(BeaconHeadCache::new())); } + /// Sets the payload envelope monitor channel that streams `ExecutionPayloadAvailable` + /// events from all the beacon nodes that the validator client is connected to. + pub fn set_payload_envelope_send( + &mut self, + payload_envelope_send: Arc>, + ) { + self.payload_envelope_send = Some(payload_envelope_send); + } + /// The count of candidates, regardless of their state. pub async fn num_total(&self) -> usize { self.candidates.read().await.len() diff --git a/validator_client/beacon_node_fallback/src/payload_envelope_monitor.rs b/validator_client/beacon_node_fallback/src/payload_envelope_monitor.rs new file mode 100644 index 0000000000..be8c00da44 --- /dev/null +++ b/validator_client/beacon_node_fallback/src/payload_envelope_monitor.rs @@ -0,0 +1,100 @@ +use crate::BeaconNodeFallback; +use eth2::types::{EventKind, EventTopic, Hash256}; +use futures::StreamExt; +use slot_clock::SlotClock; +use std::sync::Arc; +use tracing::{debug, info, warn}; +use types::{EthSpec, Slot}; + +/// Event emitted when an execution payload envelope becomes available for a slot. +#[derive(Debug, Clone)] +pub struct PayloadEnvelopeEvent { + pub slot: Slot, + pub block_root: Hash256, +} + +/// Runs a non-terminating loop that subscribes to `ExecutionPayloadAvailable` SSE events +/// from all connected beacon nodes and forwards them over an mpsc channel. +/// +/// This follows the same pattern as `poll_head_event_from_beacon_nodes`. +pub async fn poll_payload_envelope_event_from_beacon_nodes( + beacon_nodes: Arc>, +) -> Result<(), String> { + let payload_envelope_send = beacon_nodes + .payload_envelope_send + .clone() + .ok_or("Unable to start payload envelope monitor without payload_envelope_send")?; + + info!("Starting payload envelope monitoring service"); + let candidates = { + let candidates_guard = beacon_nodes.candidates.read().await; + candidates_guard.clone() + }; + + // Create Vec of streams, which we will select over. + let mut streams = vec![]; + + for candidate in &candidates { + let event_stream = candidate + .beacon_node + .get_events::(&[EventTopic::ExecutionPayloadAvailable]) + .await; + + let event_stream = match event_stream { + Ok(stream) => stream, + Err(e) => { + warn!(error = ?e, node_index = candidate.index, "Failed to get execution payload available event stream"); + continue; + } + }; + + streams.push(event_stream.map(|event| (candidate.index, event))); + } + + if streams.is_empty() { + return Err( + "No beacon nodes available for execution payload available event streaming".to_string(), + ); + } + + // Combine streams into a single stream and poll events from any of them. + let mut combined_stream = futures::stream::select_all(streams); + + while let Some((candidate_index, event_result)) = combined_stream.next().await { + match event_result { + Ok(EventKind::ExecutionPayloadAvailable(payload_event)) => { + debug!( + candidate_index, + block_root = ?payload_event.block_root, + slot = %payload_event.slot, + "Execution payload available from beacon node" + ); + + if payload_envelope_send + .send(PayloadEnvelopeEvent { + slot: payload_event.slot, + block_root: payload_event.block_root, + }) + .await + .is_err() + { + return Err("Payload envelope monitoring service channel closed".into()); + } + } + Ok(event) => { + warn!( + event_kind = event.topic_name(), + candidate_index, "Received unexpected event from BN in payload envelope monitor" + ); + continue; + } + Err(e) => { + return Err(format!( + "Payload envelope monitoring stream error, node: {candidate_index}, error: {e:?}" + )); + } + } + } + + Err("Payload envelope stream ended unexpectedly".into()) +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6f0163a7b6..fbec0d56c3 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -14,7 +14,7 @@ use tokio::sync::Mutex; use account_utils::validator_definitions::ValidatorDefinitions; use beacon_node_fallback::{ BeaconNodeFallback, CandidateBeaconNode, beacon_head_monitor::HeadEvent, - start_fallback_updater_service, + payload_envelope_monitor::PayloadEnvelopeEvent, start_fallback_updater_service, }; use clap::ArgMatches; use doppelganger_service::DoppelgangerService; @@ -418,6 +418,13 @@ impl ProductionValidatorClient { None }; + // Create the payload envelope monitor channel for the inclusion list service. + // This allows the IL service to fire early when a payload envelope is available. + let (payload_envelope_tx, payload_envelope_rx) = + mpsc::channel::(MAX_HEAD_EVENT_QUEUE_LEN); + beacon_nodes.set_payload_envelope_send(Arc::new(payload_envelope_tx)); + let payload_envelope_rx = Some(Mutex::new(payload_envelope_rx)); + let beacon_nodes = Arc::new(beacon_nodes); start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?; @@ -564,6 +571,7 @@ impl ProductionValidatorClient { .beacon_nodes(beacon_nodes.clone()) .executor(context.executor.clone()) .chain_spec(context.eth2_config.spec.clone()) + .payload_envelope_rx(payload_envelope_rx) // TODO(focil) make config driven .disable(false) .build()?; diff --git a/validator_client/validator_services/src/inclusion_list_service.rs b/validator_client/validator_services/src/inclusion_list_service.rs index 5cf9bd79f7..e6fd5c303f 100644 --- a/validator_client/validator_services/src/inclusion_list_service.rs +++ b/validator_client/validator_services/src/inclusion_list_service.rs @@ -1,11 +1,14 @@ use crate::duties_service::DutiesService; -use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; +use beacon_node_fallback::{ + ApiTopic, BeaconNodeFallback, payload_envelope_monitor::PayloadEnvelopeEvent, +}; use futures::future::join_all; use logging::crit; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; +use tokio::sync::{Mutex, mpsc}; use tokio::time::{Duration, sleep}; use tracing::{debug, error, info, trace, warn}; use types::{ChainSpec, EthSpec, InclusionList, InclusionListDuty, Slot, Transactions}; @@ -20,6 +23,7 @@ pub struct InclusionListServiceBuilder>>, executor: Option, chain_spec: Option>, + payload_envelope_rx: Option>>, disable: bool, } @@ -32,6 +36,7 @@ impl InclusionListServiceBuilder InclusionListServiceBuilder>>, + ) -> Self { + self.payload_envelope_rx = payload_envelope_rx; + self + } + pub fn disable(mut self, disable: bool) -> Self { self.disable = disable; self @@ -92,6 +105,7 @@ impl InclusionListServiceBuilder { // TODO(focil) #[allow(dead_code)] chain_spec: Arc, + payload_envelope_rx: Option>>, #[allow(dead_code)] disable: bool, } @@ -157,7 +172,22 @@ impl InclusionListService InclusionListService { + debug!( + slot = %target_slot, + "IL deadline reached without payload envelope event" + ); + return; + } + event = rx_guard.recv() => { + match event { + Some(envelope_event) => { + if envelope_event.slot == target_slot { + debug!( + slot = %target_slot, + block_root = ?envelope_event.block_root, + "Payload envelope received for target slot, triggering IL production" + ); + return; + } else { + // Stale event for a different slot, keep waiting + debug!( + event_slot = %envelope_event.slot, + target_slot = %target_slot, + "Ignoring payload envelope event for non-target slot" + ); + continue; + } + } + None => { + // Channel closed, fall back to deadline + warn!("Payload envelope channel closed, falling back to deadline"); + deadline.await; + return; + } + } + } + } + } + } + /// Spawn a task that downloads, signs and uploads the inclusion lists to the beacon node. // TODO(focil) I don't think we need `slot_duration` here, unless we need to make some calculation // related to the freeze deadline.