From bd1966353a873e7e40fe93e6d89d3b19cba60045 Mon Sep 17 00:00:00 2001 From: hopinheimer <48147533+hopinheimer@users.noreply.github.com> Date: Tue, 3 Feb 2026 20:40:16 -0500 Subject: [PATCH] Use events API to eager send attestations (#7892) Co-Authored-By: hopinheimer Co-Authored-By: hopinheimer <48147533+hopinheimer@users.noreply.github.com> Co-Authored-By: Eitan Seri-Levi Co-Authored-By: Michael Sproul Co-Authored-By: Michael Sproul --- beacon_node/http_api/src/lib.rs | 14 +- book/src/help_vc.md | 6 + common/eth2/src/error.rs | 3 + common/eth2/src/lib.rs | 14 +- lighthouse/tests/validator_client.rs | 18 + .../beacon_node_fallback/Cargo.toml | 2 +- .../src/beacon_head_monitor.rs | 423 ++++++++++++++++++ .../beacon_node_fallback/src/lib.rs | 135 +++++- validator_client/src/cli.rs | 11 + validator_client/src/config.rs | 4 + validator_client/src/lib.rs | 25 +- .../src/attestation_service.rs | 202 +++++++-- 12 files changed, 810 insertions(+), 47 deletions(-) create mode 100644 validator_client/beacon_node_fallback/src/beacon_head_monitor.rs diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4d7c76eb20..095c52fb29 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3226,7 +3226,19 @@ pub fn serve( let s = futures::stream::select_all(receivers); - Ok(warp::sse::reply(warp::sse::keep_alive().stream(s))) + let response = warp::sse::reply(warp::sse::keep_alive().stream(s)); + + // Set headers to bypass nginx caching and buffering, which breaks realtime + // delivery. 
+ let response = warp::reply::with_header(response, "X-Accel-Buffering", "no"); + let response = warp::reply::with_header(response, "X-Accel-Expires", "0"); + let response = warp::reply::with_header( + response, + "Cache-Control", + "no-cache, no-store, must-revalidate", + ); + + Ok(response) }) }, ); diff --git a/book/src/help_vc.md b/book/src/help_vc.md index 2a9936d1d2..4647780ea8 100644 --- a/book/src/help_vc.md +++ b/book/src/help_vc.md @@ -185,6 +185,12 @@ Flags: If present, do not attempt to discover new validators in the validators-dir. Validators will need to be manually added to the validator_definitions.yml file. + --disable-beacon-head-monitor + Disable the beacon head monitor which tries to attest as soon as any + of the configured beacon nodes sends a head event. Leaving the service + enabled is recommended, but disabling it can lead to reduced bandwidth + and more predictable usage of the primary beacon node (rather than the + fastest BN). --disable-latency-measurement-service Disables the service that periodically attempts to measure latency to BNs. diff --git a/common/eth2/src/error.rs b/common/eth2/src/error.rs index 1f21220b79..671a617c9e 100644 --- a/common/eth2/src/error.rs +++ b/common/eth2/src/error.rs @@ -17,6 +17,8 @@ pub enum Error { #[cfg(feature = "events")] /// The `reqwest_eventsource` client raised an error. SseClient(Box), + #[cfg(feature = "events")] + SseEventSource(reqwest_eventsource::CannotCloneRequestError), /// The server returned an error message where the body was able to be parsed. ServerMessage(ErrorMessage), /// The server returned an error message with an array of errors. 
@@ -100,6 +102,7 @@ impl Error { None } } + Error::SseEventSource(_) => None, Error::ServerMessage(msg) => StatusCode::try_from(msg.code).ok(), Error::ServerIndexedMessage(msg) => StatusCode::try_from(msg.code).ok(), Error::StatusCode(status) => Some(*status), diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 8746e3c063..10382b028a 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -40,7 +40,7 @@ use reqwest::{ header::{HeaderMap, HeaderValue}, }; #[cfg(feature = "events")] -use reqwest_eventsource::{Event, EventSource}; +use reqwest_eventsource::{Event, RequestBuilderExt}; use serde::{Serialize, de::DeserializeOwned}; use ssz::Encode; use std::fmt; @@ -76,6 +76,8 @@ const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4; const HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT: u32 = 4; +// Generally the timeout for events should be longer than a slot. +const HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER: u32 = 50; const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4; /// A struct to define a variety of different timeouts for different validator tasks to ensure @@ -96,6 +98,7 @@ pub struct Timeouts { pub get_debug_beacon_states: Duration, pub get_deposit_snapshot: Duration, pub get_validator_block: Duration, + pub events: Duration, pub default: Duration, } @@ -116,6 +119,7 @@ impl Timeouts { get_debug_beacon_states: timeout, get_deposit_snapshot: timeout, get_validator_block: timeout, + events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * timeout, default: timeout, } } @@ -138,6 +142,7 @@ impl Timeouts { get_debug_beacon_states: base_timeout / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT, get_deposit_snapshot: base_timeout / HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT, get_validator_block: base_timeout / HTTP_GET_VALIDATOR_BLOCK_TIMEOUT_QUOTIENT, + events: HTTP_GET_EVENTS_TIMEOUT_MULTIPLIER * base_timeout, default: base_timeout / HTTP_DEFAULT_TIMEOUT_QUOTIENT, } } @@ -2800,7 
+2805,12 @@ impl BeaconNodeHttpClient { .join(","); path.query_pairs_mut().append_pair("topics", &topic_string); - let mut es = EventSource::get(path); + let mut es = self + .client + .get(path) + .timeout(self.timeouts.events) + .eventsource() + .map_err(Error::SseEventSource)?; // If we don't await `Event::Open` here, then the consumer // will not get any Message events until they start awaiting the stream. // This is a way to register the stream with the sse server before diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index ee3e910b36..6fd5a6538c 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -758,3 +758,21 @@ fn validator_proposer_nodes() { ); }); } + +// Head monitor is enabled by default. +#[test] +fn head_monitor_default() { + CommandLineTest::new().run().with_config(|config| { + assert!(config.enable_beacon_head_monitor); + }); +} + +#[test] +fn head_monitor_disabled() { + CommandLineTest::new() + .flag("disable-beacon-head-monitor", None) + .run() + .with_config(|config| { + assert!(!config.enable_beacon_head_monitor); + }); +} diff --git a/validator_client/beacon_node_fallback/Cargo.toml b/validator_client/beacon_node_fallback/Cargo.toml index 481aece48b..bc1ac20d44 100644 --- a/validator_client/beacon_node_fallback/Cargo.toml +++ b/validator_client/beacon_node_fallback/Cargo.toml @@ -11,7 +11,7 @@ path = "src/lib.rs" [dependencies] bls = { workspace = true } clap = { workspace = true } -eth2 = { workspace = true } +eth2 = { workspace = true, features = ["events"] } futures = { workspace = true } itertools = { workspace = true } sensitive_url = { workspace = true } diff --git a/validator_client/beacon_node_fallback/src/beacon_head_monitor.rs b/validator_client/beacon_node_fallback/src/beacon_head_monitor.rs new file mode 100644 index 0000000000..bed107d856 --- /dev/null +++ b/validator_client/beacon_node_fallback/src/beacon_head_monitor.rs @@ -0,0 +1,423 @@ +use 
crate::BeaconNodeFallback; +use eth2::types::{EventKind, EventTopic, Hash256, SseHead}; +use futures::StreamExt; +use slot_clock::SlotClock; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, info, warn}; +use types::EthSpec; + +type CacheHashMap = HashMap; + +// This is used to send the index derived from `CandidateBeaconNode` to the +// `AttestationService` for further processing +#[derive(Debug)] +pub struct HeadEvent { + pub beacon_node_index: usize, + pub slot: types::Slot, + pub beacon_block_root: Hash256, +} + +/// Cache to maintain the latest head received from each of the beacon nodes +/// in the `BeaconNodeFallback`. +#[derive(Debug)] +pub struct BeaconHeadCache { + cache: RwLock, +} + +impl BeaconHeadCache { + /// Creates a new empty beacon head cache. + pub fn new() -> Self { + Self { + cache: RwLock::new(HashMap::new()), + } + } + + /// Retrieves the cached head for a specific beacon node. + /// Returns `None` if no head has been cached for that node yet. + pub async fn get(&self, beacon_node_index: usize) -> Option { + self.cache.read().await.get(&beacon_node_index).cloned() + } + + /// Stores or updates the head event for a specific beacon node. + /// Replaces any previously cached head for the given node. + pub async fn insert(&self, beacon_node_index: usize, head: SseHead) { + self.cache.write().await.insert(beacon_node_index, head); + } + + /// Checks if the given head is the latest among all cached heads. + /// Returns `true` if the head's slot is >= all cached heads' slots. + pub async fn is_latest(&self, head: &SseHead) -> bool { + let cache = self.cache.read().await; + cache + .values() + .all(|cache_head| head.slot >= cache_head.slot) + } + + /// Clears all cached heads, removing entries for all beacon nodes. + /// Useful when beacon node candidates are refreshed to avoid stale references. 
+ pub async fn purge_cache(&self) { + self.cache.write().await.clear(); + } +} + +impl Default for BeaconHeadCache { + fn default() -> Self { + Self::new() + } +} + +// Runs a non-terminating loop to update the `BeaconHeadCache` with the latest head received +// from the candidate beacon_nodes. This is an attempt to stream events from beacon nodes and +// potentially start attestation duties earlier, as soon as the latest head is received from any +// of the beacon nodes, in contrast to attesting at the 1/3rd mark in the slot. +// +// +// The cache and the candidate BNs list are refreshed/purged to avoid dangling reference conditions +// that arise due to `update_candidates_list`. +// +// Starts the service to perpetually stream head events from connected beacon_nodes +pub async fn poll_head_event_from_beacon_nodes( + beacon_nodes: Arc>, +) -> Result<(), String> { + let head_cache = beacon_nodes + .beacon_head_cache + .clone() + .ok_or("Unable to start head monitor without beacon_head_cache")?; + let head_monitor_send = beacon_nodes + .head_monitor_send + .clone() + .ok_or("Unable to start head monitor without head_monitor_send")?; + + info!("Starting head monitoring service"); + let candidates = { + let candidates_guard = beacon_nodes.candidates.read().await; + candidates_guard.clone() + }; + + // Clear the cache in case it contains stale data from a previous run. This function gets + // restarted if it fails (see monitoring in `start_fallback_updater_service`). + head_cache.purge_cache().await; + + // Create Vec of streams, which we will select over.
+ let mut streams = vec![]; + + for candidate in &candidates { + let head_event_stream = candidate + .beacon_node + .get_events::(&[EventTopic::Head]) + .await; + + let head_event_stream = match head_event_stream { + Ok(stream) => stream, + Err(e) => { + warn!(error = ?e, node_index = candidate.index, "Failed to get head event stream"); + continue; + } + }; + + streams.push(head_event_stream.map(|event| (candidate.index, event))); + } + + if streams.is_empty() { + return Err("No beacon nodes available for head event streaming".to_string()); + } + + // Combine streams into a single stream and poll events from any of them. + let mut combined_stream = futures::stream::select_all(streams); + + while let Some((candidate_index, event_result)) = combined_stream.next().await { + match event_result { + Ok(EventKind::Head(head)) => { + debug!( + candidate_index, + block_root = ?head.block, + slot = %head.slot, + "New head from beacon node" + ); + + // Skip optimistic heads - the beacon node can't produce valid + // attestation data when its execution layer is not verified + if head.execution_optimistic { + debug!( + candidate_index, + block_root = ?head.block, + slot = %head.slot, + "Skipping optimistic head" + ); + continue; + } + + head_cache.insert(candidate_index, head.clone()).await; + + if !head_cache.is_latest(&head).await { + debug!( + candidate_index, + block_root = ?head.block, + slot = %head.slot, + "Skipping stale head" + ); + continue; + } + + if head_monitor_send + .send(HeadEvent { + beacon_node_index: candidate_index, + slot: head.slot, + beacon_block_root: head.block, + }) + .await + .is_err() + { + return Err("Head monitoring service channel closed".into()); + } + } + Ok(event) => { + warn!( + event_kind = event.topic_name(), + candidate_index, "Received unexpected event from BN" + ); + continue; + } + Err(e) => { + return Err(format!( + "Head monitoring stream error, node: {candidate_index}, error: {e:?}" + )); + } + } + } + + Err("Stream ended 
unexpectedly".into()) +} + +#[cfg(test)] +mod tests { + use super::*; + use bls::FixedBytesExtended; + use types::{Hash256, Slot}; + + fn create_sse_head(slot: u64, block_root: u8) -> SseHead { + SseHead { + slot: types::Slot::new(slot), + block: Hash256::from_low_u64_be(block_root as u64), + state: Hash256::from_low_u64_be(block_root as u64), + epoch_transition: false, + previous_duty_dependent_root: Hash256::from_low_u64_be(block_root as u64), + current_duty_dependent_root: Hash256::from_low_u64_be(block_root as u64), + execution_optimistic: false, + } + } + + #[tokio::test] + async fn test_beacon_head_cache_insertion_and_retrieval() { + let cache = BeaconHeadCache::new(); + let head_1 = create_sse_head(1, 1); + let head_2 = create_sse_head(2, 2); + + cache.insert(0, head_1.clone()).await; + cache.insert(1, head_2.clone()).await; + + assert_eq!(cache.get(0).await, Some(head_1)); + assert_eq!(cache.get(1).await, Some(head_2)); + assert_eq!(cache.get(2).await, None); + } + + #[tokio::test] + async fn test_beacon_head_cache_update() { + let cache = BeaconHeadCache::new(); + let head_old = create_sse_head(1, 1); + let head_new = create_sse_head(2, 2); + + cache.insert(0, head_old).await; + cache.insert(0, head_new.clone()).await; + + assert_eq!(cache.get(0).await, Some(head_new)); + } + + #[tokio::test] + async fn test_is_latest_with_higher_slot() { + let cache = BeaconHeadCache::new(); + let head_1 = create_sse_head(1, 1); + let head_2 = create_sse_head(2, 2); + let head_3 = create_sse_head(3, 3); + + cache.insert(0, head_1).await; + cache.insert(1, head_2).await; + + assert!(cache.is_latest(&head_3).await); + } + + #[tokio::test] + async fn test_is_latest_with_lower_slot() { + let cache = BeaconHeadCache::new(); + let head_1 = create_sse_head(1, 1); + let head_2 = create_sse_head(2, 2); + let head_older = create_sse_head(1, 99); + + cache.insert(0, head_1).await; + cache.insert(1, head_2).await; + + assert!(!cache.is_latest(&head_older).await); + } + + 
#[tokio::test] + async fn test_is_latest_with_equal_slot() { + let cache = BeaconHeadCache::new(); + let head_1 = create_sse_head(5, 1); + let head_2 = create_sse_head(5, 2); + let head_equal = create_sse_head(5, 3); + + cache.insert(0, head_1).await; + cache.insert(1, head_2).await; + + assert!(cache.is_latest(&head_equal).await); + } + + #[tokio::test] + async fn test_is_latest_empty_cache() { + let cache = BeaconHeadCache::new(); + let head = create_sse_head(1, 1); + + assert!(cache.is_latest(&head).await); + } + + #[tokio::test] + async fn test_purge_cache_clears_all_entries() { + let cache = BeaconHeadCache::new(); + let head_1 = create_sse_head(1, 1); + let head_2 = create_sse_head(2, 2); + + cache.insert(0, head_1).await; + cache.insert(1, head_2).await; + + assert!(cache.get(0).await.is_some()); + assert!(cache.get(1).await.is_some()); + + cache.purge_cache().await; + + assert!(cache.get(0).await.is_none()); + assert!(cache.get(1).await.is_none()); + } + + #[tokio::test] + async fn test_head_event_creation() { + let block_root = Hash256::from_low_u64_be(99); + let event = HeadEvent { + beacon_node_index: 42, + slot: Slot::new(123), + beacon_block_root: block_root, + }; + assert_eq!(event.beacon_node_index, 42); + assert_eq!(event.slot, Slot::new(123)); + assert_eq!(event.beacon_block_root, block_root); + } + + #[tokio::test] + async fn test_cache_caches_multiple_heads_from_different_nodes() { + let cache = BeaconHeadCache::new(); + let head_1 = create_sse_head(10, 1); + let head_2 = create_sse_head(5, 2); + let head_3 = create_sse_head(8, 3); + + cache.insert(0, head_1.clone()).await; + cache.insert(1, head_2.clone()).await; + cache.insert(2, head_3.clone()).await; + + // Verify all are stored + assert_eq!(cache.get(0).await, Some(head_1)); + assert_eq!(cache.get(1).await, Some(head_2)); + assert_eq!(cache.get(2).await, Some(head_3)); + + // The latest should be slot 10 + let head_10 = create_sse_head(10, 99); + assert!(cache.is_latest(&head_10).await); + + 
// Anything with slot > 10 should be latest + let head_11 = create_sse_head(11, 99); + assert!(cache.is_latest(&head_11).await); + + // Anything with slot < 10 should not be latest + let head_9 = create_sse_head(9, 99); + assert!(!cache.is_latest(&head_9).await); + } + + #[tokio::test] + async fn test_cache_handles_concurrent_operations() { + let cache = Arc::new(BeaconHeadCache::new()); + let mut handles = vec![]; + + // Spawn multiple tasks that insert heads concurrently + for i in 0..10 { + let cache_clone = cache.clone(); + let handle = tokio::spawn(async move { + let head = create_sse_head(i as u64, (i % 256) as u8); + cache_clone.insert(i, head).await; + }); + handles.push(handle); + } + + // Wait for all tasks to complete + for handle in handles { + handle.await.unwrap(); + } + + // Verify all heads are cached + for i in 0..10 { + assert!(cache.get(i).await.is_some()); + } + } + + #[tokio::test] + async fn test_is_latest_after_cache_updates() { + let cache = BeaconHeadCache::new(); + + // Start with head at slot 5 + let head_5 = create_sse_head(5, 1); + cache.insert(0, head_5.clone()).await; + assert!(cache.is_latest(&head_5).await); + + // Add a higher slot + let head_10 = create_sse_head(10, 2); + cache.insert(1, head_10.clone()).await; + + // head_5 should no longer be latest + assert!(!cache.is_latest(&head_5).await); + // head_10 should be latest + assert!(cache.is_latest(&head_10).await); + + // Add an even higher slot + let head_15 = create_sse_head(15, 3); + cache.insert(2, head_15.clone()).await; + + // head_10 should no longer be latest + assert!(!cache.is_latest(&head_10).await); + // head_15 should be latest + assert!(cache.is_latest(&head_15).await); + } + + #[tokio::test] + async fn test_cache_default_is_empty() { + let cache = BeaconHeadCache::default(); + assert!(cache.get(0).await.is_none()); + assert!(cache.get(999).await.is_none()); + } + + #[tokio::test] + async fn test_is_latest_with_multiple_same_slot_heads() { + let cache = 
BeaconHeadCache::new(); + let head_slot_5_node1 = create_sse_head(5, 1); + let head_slot_5_node2 = create_sse_head(5, 2); + let head_slot_5_node3 = create_sse_head(5, 3); + + cache.insert(0, head_slot_5_node1).await; + cache.insert(1, head_slot_5_node2).await; + + // All heads with slot 5 should be considered latest + assert!(cache.is_latest(&head_slot_5_node3).await); + + // But heads with slot 4 should not be latest + let head_slot_4 = create_sse_head(4, 4); + assert!(!cache.is_latest(&head_slot_4).await); + } +} diff --git a/validator_client/beacon_node_fallback/src/lib.rs b/validator_client/beacon_node_fallback/src/lib.rs index 3c20e57200..b36ec70aa3 100644 --- a/validator_client/beacon_node_fallback/src/lib.rs +++ b/validator_client/beacon_node_fallback/src/lib.rs @@ -2,7 +2,10 @@ //! "fallback" behaviour; it will try a request on all of the nodes until one or none of them //! succeed. +pub mod beacon_head_monitor; pub mod beacon_node_health; + +use beacon_head_monitor::{BeaconHeadCache, HeadEvent, poll_head_event_from_beacon_nodes}; use beacon_node_health::{ BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic, SyncDistanceTier, check_node_health, @@ -22,7 +25,10 @@ use std::time::{Duration, Instant}; use std::vec::Vec; use strum::VariantNames; use task_executor::TaskExecutor; -use tokio::{sync::RwLock, time::sleep}; +use tokio::{ + sync::{RwLock, mpsc}, + time::sleep, +}; use tracing::{debug, error, warn}; use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot}; use validator_metrics::{ENDPOINT_ERRORS, ENDPOINT_REQUESTS, inc_counter_vec}; @@ -68,6 +74,31 @@ pub fn start_fallback_updater_service( return Err("Cannot start fallback updater without slot clock"); } + let beacon_nodes_ref = beacon_nodes.clone(); + + // the existence of head_monitor_send is overloaded with the predicate of + // requirement of starting the head monitoring service or not. 
+ if beacon_nodes_ref.head_monitor_send.is_some() { + let head_monitor_future = async move { + loop { + if let Err(error) = + poll_head_event_from_beacon_nodes::(beacon_nodes_ref.clone()).await + { + warn!(error, "Head service failed retrying starting next slot"); + + let sleep_time = beacon_nodes_ref + .slot_clock + .as_ref() + .and_then(|slot_clock| slot_clock.duration_to_next_slot()) + .unwrap_or_else(|| beacon_nodes_ref.spec.get_slot_duration()); + sleep(sleep_time).await + } + } + }; + + executor.spawn(head_monitor_future, "head_monitoring"); + } + let future = async move { loop { beacon_nodes.update_all_candidates::().await; @@ -96,12 +127,15 @@ pub fn start_fallback_updater_service( pub enum Error { /// We attempted to contact the node but it failed. RequestFailed(T), + /// The beacon node with the requested index was not available. + CandidateIndexUnknown(usize), } impl Error { pub fn request_failure(&self) -> Option<&T> { match self { Error::RequestFailed(e) => Some(e), + Error::CandidateIndexUnknown(_) => None, } } } @@ -380,6 +414,8 @@ pub struct BeaconNodeFallback { pub candidates: Arc>>, distance_tiers: BeaconNodeSyncDistanceTiers, slot_clock: Option, + beacon_head_cache: Option>, + head_monitor_send: Option>>, broadcast_topics: Vec, spec: Arc, } @@ -396,6 +432,8 @@ impl BeaconNodeFallback { candidates: Arc::new(RwLock::new(candidates)), distance_tiers, slot_clock: None, + beacon_head_cache: None, + head_monitor_send: None, broadcast_topics, spec, } @@ -410,6 +448,15 @@ impl BeaconNodeFallback { self.slot_clock = Some(slot_clock); } + /// This is the head monitor channel that streams events from all the beacon nodes that the + /// validator client is connected to in the `BeaconNodeFallback`. This also initializes the + /// beacon_head_cache under the assumption the beacon_head_cache will always be needed when + /// head_monitor_send is set.
+ pub fn set_head_send(&mut self, head_monitor_send: Arc>) { + self.head_monitor_send = Some(head_monitor_send); + self.beacon_head_cache = Some(Arc::new(BeaconHeadCache::new())); + } + /// The count of candidates, regardless of their state. pub async fn num_total(&self) -> usize { self.candidates.read().await.len() @@ -493,6 +540,10 @@ impl BeaconNodeFallback { let mut candidates = self.candidates.write().await; *candidates = new_candidates; + if let Some(cache) = &self.beacon_head_cache { + cache.purge_cache().await; + } + Ok(new_list) } @@ -646,6 +697,32 @@ impl BeaconNodeFallback { Err(Errors(errors)) } + /// Try `func` on a specific beacon node by index. + /// + /// Returns immediately if the preferred node succeeds, otherwise return an error. + pub async fn run_on_candidate_index( + &self, + candidate_index: usize, + func: F, + ) -> Result> + where + F: Fn(BeaconNodeHttpClient) -> R + Clone, + R: Future>, + Err: Debug, + { + // Find the requested beacon node or return an error. + let candidates = self.candidates.read().await; + let Some(candidate) = candidates.iter().find(|c| c.index == candidate_index) else { + return Err(Error::CandidateIndexUnknown(candidate_index)); + }; + let candidate_node = candidate.beacon_node.clone(); + drop(candidates); + + Self::run_on_candidate(candidate_node, &func) + .await + .map_err(|(_, err)| err) + } + /// Run the future `func` on `candidate` while reporting metrics. 
async fn run_on_candidate( candidate: BeaconNodeHttpClient, @@ -1073,4 +1150,60 @@ mod tests { mock1.expect(3).assert(); mock2.expect(3).assert(); } + + #[tokio::test] + async fn run_on_candidate_index_success() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let mock1 = mock_beacon_node_1.mock_offline_node(); + let _mock2 = mock_beacon_node_2.mock_online_node(); + let mock3 = mock_beacon_node_3.mock_online_node(); + + // Request with preferred_index=1 (beacon_node_2) + let result = beacon_node_fallback + .run_on_candidate_index(1, |client| async move { client.get_node_version().await }) + .await; + + // Should succeed since beacon_node_2 is online + assert!(result.is_ok()); + + // mock1 should not be called since preferred node succeeds + mock1.expect(0).assert(); + mock3.expect(0).assert(); + } + + #[tokio::test] + async fn run_on_candidate_index_error() { + let spec = Arc::new(MainnetEthSpec::default_spec()); + let (mut mock_beacon_node_1, beacon_node_1) = new_mock_beacon_node(0, &spec).await; + let (mut mock_beacon_node_2, beacon_node_2) = new_mock_beacon_node(1, &spec).await; + let (mut mock_beacon_node_3, beacon_node_3) = new_mock_beacon_node(2, &spec).await; + + let beacon_node_fallback = create_beacon_node_fallback( + vec![beacon_node_1, beacon_node_2, beacon_node_3], + vec![], + spec.clone(), + ); + + let _mock1 = mock_beacon_node_1.mock_online_node(); + let _mock2 = mock_beacon_node_2.mock_offline_node(); + let _mock3 = mock_beacon_node_3.mock_offline_node(); + + // Request with preferred_index=1 (beacon_node_2), but it's offline + let result = 
beacon_node_fallback + .run_on_candidate_index(1, |client| async move { client.get_node_version().await }) + .await; + + // Should fail. + assert!(result.is_err()); + } } diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 3e1c46097f..0eb0e9e5dd 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -476,6 +476,17 @@ pub struct ValidatorClient { )] pub beacon_nodes_sync_tolerances: Vec, + #[clap( + long, + help = "Disable the beacon head monitor which tries to attest as soon as any of the \ + configured beacon nodes sends a head event. Leaving the service enabled is \ + recommended, but disabling it can lead to reduced bandwidth and more predictable \ + usage of the primary beacon node (rather than the fastest BN).", + display_order = 0, + help_heading = FLAG_HEADER + )] + pub disable_beacon_head_monitor: bool, + #[clap( long, help = "Disable Lighthouse's slashing protection for all web3signer keys. This can \ diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 1a286a74dc..d68a78b705 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -82,6 +82,8 @@ pub struct Config { pub broadcast_topics: Vec, /// Enables a service which attempts to measure latency between the VC and BNs. pub enable_latency_measurement_service: bool, + /// Enables the beacon head monitor that reacts to head updates from connected beacon nodes. + pub enable_beacon_head_monitor: bool, /// Defines the number of validators per `validator/register_validator` request sent to the BN. pub validator_registration_batch_size: usize, /// Whether we are running with distributed network support. 
@@ -132,6 +134,7 @@ impl Default for Config { builder_registration_timestamp_override: None, broadcast_topics: vec![ApiTopic::Subscriptions], enable_latency_measurement_service: true, + enable_beacon_head_monitor: true, validator_registration_batch_size: 500, distributed: false, initialized_validators: <_>::default(), @@ -377,6 +380,7 @@ impl Config { config.validator_store.builder_boost_factor = validator_client_config.builder_boost_factor; config.enable_latency_measurement_service = !validator_client_config.disable_latency_measurement_service; + config.enable_beacon_head_monitor = !validator_client_config.disable_beacon_head_monitor; config.validator_registration_batch_size = validator_client_config.validator_registration_batch_size; diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 2b863715d2..c0d561b175 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -9,10 +9,12 @@ use metrics::set_gauge; use monitoring_api::{MonitoringHttpClient, ProcessType}; use sensitive_url::SensitiveUrl; use slashing_protection::{SLASHING_PROTECTION_FILENAME, SlashingDatabase}; +use tokio::sync::Mutex; use account_utils::validator_definitions::ValidatorDefinitions; use beacon_node_fallback::{ - BeaconNodeFallback, CandidateBeaconNode, start_fallback_updater_service, + BeaconNodeFallback, CandidateBeaconNode, beacon_head_monitor::HeadEvent, + start_fallback_updater_service, }; use clap::ArgMatches; use doppelganger_service::DoppelgangerService; @@ -70,6 +72,8 @@ pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2; /// Number of slots in advance to compute sync selection proofs when in `distributed` mode. 
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1; +const MAX_HEAD_EVENT_QUEUE_LEN: usize = 1_024; + type ValidatorStore = LighthouseValidatorStore; #[derive(Clone)] @@ -395,6 +399,17 @@ impl ProductionValidatorClient { beacon_nodes.set_slot_clock(slot_clock.clone()); proposer_nodes.set_slot_clock(slot_clock.clone()); + // Only the beacon_nodes are used for attestation duties, so the + // proposer_nodes do not need a head_send ref. + let head_monitor_rx = if config.enable_beacon_head_monitor { + let (head_monitor_tx, head_receiver) = + mpsc::channel::(MAX_HEAD_EVENT_QUEUE_LEN); + beacon_nodes.set_head_send(Arc::new(head_monitor_tx)); + Some(Mutex::new(head_receiver)) + } else { + None + }; + let beacon_nodes = Arc::new(beacon_nodes); start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?; @@ -505,15 +520,17 @@ impl ProductionValidatorClient { let block_service = block_service_builder.build()?; - let attestation_service = AttestationServiceBuilder::new() + let attestation_builder = AttestationServiceBuilder::new() .duties_service(duties_service.clone()) .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) .beacon_nodes(beacon_nodes.clone()) .executor(context.executor.clone()) + .head_monitor_rx(head_monitor_rx) .chain_spec(context.eth2_config.spec.clone()) - .disable(config.disable_attesting) - .build()?; + .disable(config.disable_attesting); + + let attestation_service = attestation_builder.build()?; let preparation_service = PreparationServiceBuilder::new() .slot_clock(slot_clock.clone()) diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 326ec6d01e..a9d5283312 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,5 +1,5 @@ use crate::duties_service::{DutiesService, DutyAndProof}; -use
beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; +use beacon_node_fallback::{ApiTopic, BeaconNodeFallback, beacon_head_monitor::HeadEvent}; use futures::future::join_all; use logging::crit; use slot_clock::SlotClock; @@ -7,10 +7,12 @@ use std::collections::HashMap; use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; +use tokio::sync::Mutex; +use tokio::sync::mpsc; use tokio::time::{Duration, Instant, sleep, sleep_until}; -use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; +use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; use tree_hash::TreeHash; -use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; +use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Hash256, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; /// Builds an `AttestationService`. @@ -22,6 +24,7 @@ pub struct AttestationServiceBuilder beacon_nodes: Option>>, executor: Option, chain_spec: Option>, + head_monitor_rx: Option>>, disable: bool, } @@ -34,6 +37,7 @@ impl AttestationServiceBuil beacon_nodes: None, executor: None, chain_spec: None, + head_monitor_rx: None, disable: false, } } @@ -73,6 +77,13 @@ impl AttestationServiceBuil self } + pub fn head_monitor_rx( + mut self, + head_monitor_rx: Option>>, + ) -> Self { + self.head_monitor_rx = head_monitor_rx; + self + } pub fn build(self) -> Result, String> { Ok(AttestationService { inner: Arc::new(Inner { @@ -94,7 +105,9 @@ impl AttestationServiceBuil chain_spec: self .chain_spec .ok_or("Cannot build AttestationService without chain_spec")?, + head_monitor_rx: self.head_monitor_rx, disable: self.disable, + latest_attested_slot: Mutex::new(Slot::default()), }), }) } @@ -108,10 +121,13 @@ pub struct Inner { beacon_nodes: Arc>, executor: TaskExecutor, chain_spec: Arc, + head_monitor_rx: Option>>, disable: bool, + latest_attested_slot: Mutex, } -/// Attempts to produce attestations for all 
known validators 1/3rd of the way through each slot. +/// Attempts to produce attestations for all known validators 1/3rd of the way through each slot +/// or when a head event is received from the BNs. /// /// If any validators are on the same committee, a single attestation will be downloaded and /// returned to the beacon node. This attestation will have a signature from each of the @@ -161,19 +177,42 @@ impl AttestationService None, + event = self.poll_for_head_events() => + event.map(|event| (event.beacon_node_index, event.beacon_block_root)), + } + } else { + sleep(duration + unaggregated_attestation_due).await; + None + }; + + let Some(current_slot) = self.slot_clock.now() else { + error!("Failed to read slot clock after trigger"); + continue; + }; + + let mut last_slot = self.latest_attested_slot.lock().await; + + if current_slot <= *last_slot { + debug!(%current_slot, "Attestation already initiated for the slot"); + continue; + } + + match self.spawn_attestation_tasks(beacon_node_data).await { + Ok(_) => { + *last_slot = current_slot; + } + Err(e) => { + crit!(error = e, "Failed to spawn attestation tasks") + } } } }; @@ -182,15 +221,38 @@ impl AttestationService Option { + let Some(receiver) = &self.head_monitor_rx else { + return None; + }; + let mut receiver = receiver.lock().await; + loop { + match receiver.recv().await { + Some(head_event) => { + // Only return head events for the current slot - this ensures the + // block for this slot has been produced before triggering attestation + let current_slot = self.slot_clock.now()?; + if head_event.slot == current_slot { + return Some(head_event); + } + // Head event is not for the current slot, keep waiting + } + None => { + warn!("Head monitor channel closed unexpectedly"); + return None; + } + } + } + } + /// Spawn only one new task for attestation post-Electra /// For each required aggregate, spawn a new task that downloads, signs and uploads the /// aggregates to the beacon node. 
- fn spawn_attestation_tasks(&self) -> Result<(), String> { + async fn spawn_attestation_tasks( + &self, + beacon_node_data: Option<(usize, Hash256)>, + ) -> Result<(), String> { let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; - let duration_to_next_slot = self - .slot_clock - .duration_to_next_slot() - .ok_or("Unable to determine duration to next slot")?; // Create and publish an `Attestation` for all validators only once // as the committee_index is not included in AttestationData post-Electra @@ -201,29 +263,89 @@ impl AttestationService attestation_data_from_head_event = Some(data), + Err(error) => { + warn!(?error, "Failed to attest based on head event"); + } + } + } + + // If the beacon node that sent us the head failed to attest, wait until the attestation + // deadline then try all BNs. + let attestation_data = if let Some(attestation_data) = attestation_data_from_head_event { + attestation_data + } else { + let duration_to_deadline = self + .slot_clock + .duration_to_slot(slot + 1) + .and_then(|duration_to_next_slot| { + duration_to_next_slot + .checked_add(self.chain_spec.get_unaggregated_attestation_due()) + }) + .map(|next_slot_deadline| { + next_slot_deadline.saturating_sub(self.chain_spec.get_slot_duration()) + }) + .unwrap_or(Duration::from_secs(0)); + sleep(duration_to_deadline).await; + + attestation_service + .beacon_nodes + .first_success(|beacon_node| async move { + let _timer = validator_metrics::start_timer_vec( + &validator_metrics::ATTESTATION_SERVICE_TIMES, + &[validator_metrics::ATTESTATIONS_HTTP_GET], + ); + let data = beacon_node + .get_validator_attestation_data(slot, 0) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e))? + .data; + Ok::(data) + }) + .await + .map_err(|e| e.to_string())? + }; + + // Sign and publish attestations. 
+ let publication_handle = self .inner .executor .spawn_handle( async move { - let attestation_data = attestation_service - .beacon_nodes - .first_success(|beacon_node| async move { - let _timer = validator_metrics::start_timer_vec( - &validator_metrics::ATTESTATION_SERVICE_TIMES, - &[validator_metrics::ATTESTATIONS_HTTP_GET], - ); - beacon_node - .get_validator_attestation_data(slot, 0) - .await - .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) - .map(|result| result.data) - }) - .await - .map_err(|e| e.to_string())?; - attestation_service .sign_and_publish_attestations( slot, @@ -241,12 +363,16 @@ impl AttestationService(attestation_data) }, - "unaggregated attestation production", + "unaggregated attestation publication", ) .ok_or("Failed to spawn attestation data task")?; // If a validator needs to publish an aggregate attestation, they must do so at 2/3 // through the slot. This delay triggers at this time + let duration_to_next_slot = self + .slot_clock + .duration_to_slot(slot + 1) + .ok_or("Unable to determine duration to next slot")?; let aggregate_production_instant = Instant::now() + duration_to_next_slot .checked_add(self.chain_spec.get_aggregate_attestation_due()) @@ -270,7 +396,7 @@ impl AttestationService data, Ok(Some(Err(err))) => { error!(?err, "Attestation production failed");