diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 3ae495378e..26718ad294 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1231,7 +1231,11 @@ impl ApiTester { pub async fn test_post_beacon_blocks_valid(mut self) -> Self { let next_block = &self.next_block; - self.client.post_beacon_blocks(next_block).await.unwrap(); + self.client + .clone() + .post_beacon_blocks(next_block) + .await + .unwrap(); assert!( self.network_rx.network_recv.recv().await.is_some(), @@ -1270,7 +1274,13 @@ impl ApiTester { .await .0; - assert!(self.client.post_beacon_blocks(&block).await.is_err()); + + assert!(self + .client + .clone() + .post_beacon_blocks(&next_block) + .await + .is_err()); assert!( self.network_rx.network_recv.recv().await.is_some(), @@ -2419,7 +2429,11 @@ impl ApiTester { let signed_block = block.sign(&sk, &fork, genesis_validators_root, &self.chain.spec); - self.client.post_beacon_blocks(&signed_block).await.unwrap(); + self.client + .clone() + .post_beacon_blocks(&signed_block) + .await + .unwrap(); assert_eq!(self.chain.head_beacon_block().as_ref(), &signed_block); @@ -4281,6 +4295,7 @@ impl ApiTester { }); self.client + .clone() .post_beacon_blocks(&self.next_block) .await .unwrap(); @@ -4312,6 +4327,7 @@ impl ApiTester { self.harness.advance_slot(); self.client + .clone() .post_beacon_blocks(&self.reorg_block) .await .unwrap(); @@ -4389,6 +4405,7 @@ impl ApiTester { }); self.client + .clone() .post_beacon_blocks(&self.next_block) .await .unwrap(); @@ -5107,4 +5124,4 @@ async fn optimistic_responses() { .await .test_check_optimistic_responses() .await; -} +} \ No newline at end of file diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 146a832e38..1fc65f9301 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -108,7 +108,7 @@ impl fmt::Display for Error { /// A struct to define a variety of different timeouts for different validator tasks to ensure /// proper fallback behaviour. -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct Timeouts { pub attestation: Duration, pub attester_duties: Duration, @@ -141,13 +141,21 @@ impl Timeouts { /// A wrapper around `reqwest::Client` which provides convenience methods for interfacing with a /// Lighthouse Beacon Node HTTP server (`http_api`). -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct BeaconNodeHttpClient { client: reqwest::Client, server: SensitiveUrl, timeouts: Timeouts, } +impl PartialEq for BeaconNodeHttpClient { + fn eq(&self, other: &Self) -> bool { + self.server == other.server && self.timeouts == other.timeouts + } +} + +impl Eq for BeaconNodeHttpClient {} + impl fmt::Display for BeaconNodeHttpClient { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.server.fmt(f) @@ -686,7 +694,7 @@ impl BeaconNodeHttpClient { /// /// Returns `Ok(None)` on a 404 error. pub async fn post_beacon_blocks>( - &self, + self, block: &SignedBeaconBlock, ) -> Result<(), Error> { let mut path = self.eth_path(V1)?; diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs index 9bcfe2a1d5..400ca77711 100644 --- a/lighthouse/tests/validator_client.rs +++ b/lighthouse/tests/validator_client.rs @@ -487,7 +487,7 @@ fn monitoring_endpoint() { #[test] fn disable_run_on_all_default() { CommandLineTest::new().run().with_config(|config| { - assert!(!config.disable_run_on_all); + assert!(!config.beacon_node_fallback.disable_run_on_all); }); } @@ -497,7 +497,17 @@ fn disable_run_on_all() { .flag("disable-run-on-all", None) .run() .with_config(|config| { - assert!(config.disable_run_on_all); + assert!(config.beacon_node_fallback.disable_run_on_all); + }); +} + +#[test] +fn sync_tolerance_flag() { + CommandLineTest::new() + .flag("beacon-node-sync-tolerance", Some("8")) + .run() + .with_config(|config| { + assert_eq!(config.beacon_node_fallback.sync_tolerance, Some(8)); }); } diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index d34cdbc9ff..01e50eb7fb 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -1,5 +1,6 @@ use crate::local_network::LocalNetwork; use node_test_rig::eth2::types::{BlockId, StateId}; + use std::time::Duration; use types::{Epoch, EthSpec, ExecPayload, ExecutionBlockHash, Hash256, Slot, Unsigned}; @@ -243,3 +244,67 @@ pub async fn verify_transition_block_finalized( )) } } + +pub async fn disconnect_from_execution_layer( + network: LocalNetwork, + transition_epoch: Epoch, + slot_duration: Duration, +) -> Result<(), String> { + epoch_delay(transition_epoch + 1, slot_duration, E::slots_per_epoch()).await; + + eprintln!("Disabling Execution Layer"); + + // Take the execution node at position 0 and force it to return the `syncing` status. + network.execution_nodes.read()[0] + .server + .all_payloads_syncing(false); + + // Run for 2 epochs with the 0th execution node stalled. + epoch_delay( + transition_epoch + 1 + 2, + slot_duration, + E::slots_per_epoch(), + ) + .await; + + // Restore the functionality of the 0th execution node. + network.execution_nodes.read()[0] + .server + .all_payloads_valid(); + + eprintln!("Re-enabling Execution Layer"); + Ok(()) +} + +/// Ensure all validators have attested correctly. +pub async fn check_attestation_correctness( + network: LocalNetwork, + upto_epoch: Epoch, + slots_per_epoch: u64, + slot_duration: Duration, +) -> Result<(), String> { + let upto_slot = upto_epoch.start_slot(slots_per_epoch); + slot_delay(upto_slot, slot_duration).await; + + let remote_node = &network.remote_nodes()?[1]; + + let results = remote_node + .get_lighthouse_analysis_attestation_performance( + Epoch::new(2), + upto_epoch - 2, + "global".to_string(), + ) + .await + .map_err(|e| format!("Unable to get attestation performance: {e}"))?; + + for result in results { + for epochs in result.epochs.values() { + assert!(epochs.active); + assert!(epochs.head); + assert!(epochs.target); + assert!(epochs.source); + } + } + + Ok(()) +} diff --git a/testing/simulator/src/cli.rs b/testing/simulator/src/cli.rs index ff80201051..0b888314cf 100644 --- a/testing/simulator/src/cli.rs +++ b/testing/simulator/src/cli.rs @@ -119,7 +119,45 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) .default_value("all") .possible_values(&["one-node", "two-nodes", "mixed", "all"]) - .help("Sync verification strategy to run."), + .help("Sync verification strategy to run.")) + ) + .subcommand( + SubCommand::with_name("fallback-sim") + .about("Run the fallback simulation") + .arg(Arg::with_name("vc_count") + .short("c") + .long("vc-count") + .takes_value(true) + .default_value("4") + .help("Number of validator clients")) + .arg(Arg::with_name("bns_per_vc") + .short("b") + .long("bns_per_vc") + .takes_value(true) + .default_value("2") + .help("Number of beacon nodes to connect to each validator client")) + .arg(Arg::with_name("validators_per_vc") + .short("v") + .long("validators_per_vc") + .takes_value(true) + .default_value("20") + .help("Number of validators per client")) + .arg(Arg::with_name("speed_up_factor") + .short("s") + .long("speed_up_factor") + .takes_value(true) + .default_value("3") + .help("Speed up factor. Please use a divisor of 12.")) + .arg(Arg::with_name("post-merge") + .short("m") + .long("post-merge") + .takes_value(false) + .help("Simulate the merge transition")) + .arg(Arg::with_name("continue_after_checks") + .short("c") + .long("continue_after_checks") + .takes_value(false) + .help("Continue after checks (default false)") ), ) } diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 57c944cf1a..03a0205574 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -64,7 +64,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let mut env = EnvironmentBuilder::minimal() .initialize_logger(LoggerConfig { path: None, - debug_level: String::from("debug"), + debug_level: String::from("info"), logfile_debug_level: String::from("debug"), log_format: None, logfile_format: None, diff --git a/testing/simulator/src/fallback_sim.rs b/testing/simulator/src/fallback_sim.rs new file mode 100644 index 0000000000..63b50d5c3c --- /dev/null +++ b/testing/simulator/src/fallback_sim.rs @@ -0,0 +1,333 @@ +use futures::prelude::*; + +use std::cmp::max; +use std::net::Ipv4Addr; +use std::time::Duration; + +use crate::local_network::{EXECUTION_PORT, TERMINAL_BLOCK, TERMINAL_DIFFICULTY}; +use crate::{checks, LocalNetwork, E}; +use clap::ArgMatches; +use eth1::{Eth1Endpoint, DEFAULT_CHAIN_ID}; +use eth1_test_rig::AnvilEth1Instance; + +use execution_layer::http::deposit_methods::Eth1Id; + +use node_test_rig::{ + environment::{EnvironmentBuilder, LoggerConfig}, + testing_client_config, testing_validator_config, ClientGenesis, ValidatorFiles, +}; +use rayon::prelude::*; +use sensitive_url::SensitiveUrl; +use tokio::time::sleep; +use types::{Epoch, EthSpec, MinimalEthSpec}; + +const END_EPOCH: u64 = 20; +const ALTAIR_FORK_EPOCH: u64 = 1; +const BELLATRIX_FORK_EPOCH: u64 = 2; + +const SUGGESTED_FEE_RECIPIENT: [u8; 20] = + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]; + +pub fn run_fallback_sim(matches: &ArgMatches) -> Result<(), String> { + let speed_up_factor = + value_t!(matches, "speed_up_factor", u64).expect("missing speed_up_factor default"); + let vc_count = value_t!(matches, "vc_count", usize).expect("missing vc_count default"); + let validators_per_vc = + value_t!(matches, "validators_per_vc", usize).expect("missing validators_per_vc default"); + let bns_per_vc = value_t!(matches, "bns_per_vc", usize).expect("missing bns_per_vc default"); + let continue_after_checks = matches.is_present("continue_after_checks"); + //let post_merge_sim = matches.is_present("post-merge"); + let post_merge_sim = true; + + println!("Fallback Simulator:"); + println!(" Validator Clients: {}", vc_count); + println!(" Validators per Client: {}", validators_per_vc); + println!(" Beacon Nodes per Validator Client: {}", bns_per_vc); + println!(" speed up factor:{}", speed_up_factor); + + let log_level = "debug"; + + fallback_sim( + speed_up_factor, + vc_count, + validators_per_vc, + bns_per_vc, + post_merge_sim, + continue_after_checks, + log_level, + ) +} + +fn fallback_sim( + speed_up_factor: u64, + vc_count: usize, + validators_per_vc: usize, + bns_per_vc: usize, + post_merge_sim: bool, + continue_after_checks: bool, + log_level: &str, +) -> Result<(), String> { + // Generate the directories and keystores required for the validator clients. + let validator_files = (0..vc_count) + .into_par_iter() + .map(|i| { + println!( + "Generating keystores for validator {} of {}", + i + 1, + vc_count + ); + + let indices = (i * validators_per_vc..(i + 1) * validators_per_vc).collect::>(); + ValidatorFiles::with_keystores(&indices).unwrap() + }) + .collect::>(); + + let mut env = EnvironmentBuilder::minimal() + .initialize_logger(LoggerConfig { + path: None, + debug_level: String::from(log_level), + logfile_debug_level: String::from("debug"), + log_format: None, + logfile_format: None, + log_color: false, + disable_log_timestamp: false, + max_log_size: 0, + max_log_number: 0, + compression: false, + is_restricted: true, + sse_logging: false, + })? + .multi_threaded_tokio_runtime()? + .build()?; + + let eth1_block_time = Duration::from_millis(15_000 / speed_up_factor); + + let spec = &mut env.eth2_config.spec; + + let total_validator_count = validators_per_vc * vc_count; + let node_count = vc_count * bns_per_vc; + //let altair_fork_version = spec.altair_fork_version; + //let bellatrix_fork_version = spec.bellatrix_fork_version; + + spec.seconds_per_slot /= speed_up_factor; + spec.seconds_per_slot = max(1, spec.seconds_per_slot); + spec.eth1_follow_distance = 16; + spec.genesis_delay = eth1_block_time.as_secs() * spec.eth1_follow_distance * 2; + spec.min_genesis_time = 0; + spec.min_genesis_active_validator_count = total_validator_count as u64; + spec.seconds_per_eth1_block = eth1_block_time.as_secs(); + spec.altair_fork_epoch = Some(Epoch::new(ALTAIR_FORK_EPOCH)); + // Set these parameters only if we are doing a merge simulation + if post_merge_sim { + spec.terminal_total_difficulty = TERMINAL_DIFFICULTY.into(); + spec.bellatrix_fork_epoch = Some(Epoch::new(BELLATRIX_FORK_EPOCH)); + } + + let seconds_per_slot = spec.seconds_per_slot; + let slot_duration = Duration::from_secs(spec.seconds_per_slot); + let _initial_validator_count = spec.min_genesis_active_validator_count as usize; + let deposit_amount = env.eth2_config.spec.max_effective_balance; + + let context = env.core_context(); + + let main_future = async { + /* + * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit + * validators. + */ + let anvil_eth1_instance = AnvilEth1Instance::new(DEFAULT_CHAIN_ID.into()).await?; + let deposit_contract = anvil_eth1_instance.deposit_contract; + let chain_id = anvil_eth1_instance.anvil.chain_id(); + let anvil = anvil_eth1_instance.anvil; + let eth1_endpoint = SensitiveUrl::parse(anvil.endpoint().as_str()) + .expect("Unable to parse anvil endpoint."); + let deposit_contract_address = deposit_contract.address(); + + // Start a timer that produces eth1 blocks on an interval. + tokio::spawn(async move { + let mut interval = tokio::time::interval(eth1_block_time); + loop { + interval.tick().await; + let _ = anvil.evm_mine().await; + } + }); + + // Submit deposits to the deposit contract. + tokio::spawn(async move { + for i in 0..total_validator_count { + println!("Submitting deposit for validator {}...", i); + let _ = deposit_contract + .deposit_deterministic_async::(i, deposit_amount) + .await; + } + }); + + let mut beacon_config = testing_client_config(); + + beacon_config.genesis = ClientGenesis::DepositContract; + beacon_config.eth1.endpoint = Eth1Endpoint::NoAuth(eth1_endpoint); + beacon_config.eth1.deposit_contract_address = deposit_contract_address; + beacon_config.eth1.deposit_contract_deploy_block = 0; + beacon_config.eth1.lowest_cached_block_number = 0; + beacon_config.eth1.follow_distance = 1; + beacon_config.eth1.node_far_behind_seconds = 20; + beacon_config.dummy_eth1_backend = false; + beacon_config.sync_eth1_chain = true; + beacon_config.eth1.auto_update_interval_millis = eth1_block_time.as_millis() as u64; + beacon_config.eth1.chain_id = Eth1Id::from(chain_id); + beacon_config.network.target_peers = node_count - 1; + + beacon_config.network.enr_address = (Some(Ipv4Addr::LOCALHOST), None); + + if post_merge_sim { + let el_config = execution_layer::Config { + execution_endpoints: vec![SensitiveUrl::parse(&format!( + "http://localhost:{}", + EXECUTION_PORT + )) + .unwrap()], + ..Default::default() + }; + + beacon_config.execution_layer = Some(el_config); + } + + /* + * Create a new `LocalNetwork` with one beacon node. + */ + let network = LocalNetwork::new(context.clone(), beacon_config.clone()).await?; + + /* + * One by one, add beacon nodes to the network. + */ + for _ in 0..node_count - 1 { + network + .add_beacon_node(beacon_config.clone(), false) + .await?; + } + + /* + * One by one, add validators to the network. + */ + let executor = context.executor.clone(); + for (i, files) in validator_files.into_iter().enumerate() { + let network_1 = network.clone(); + let beacon_nodes = if i == vc_count { + vec![i, 0] + } else { + vec![i, i + 1] + }; + executor.spawn( + async move { + let mut validator_config = testing_validator_config(); + if post_merge_sim { + validator_config.fee_recipient = Some(SUGGESTED_FEE_RECIPIENT.into()); + } + println!("Adding validator client {}", i); + network_1 + .add_validator_client_with_fallbacks( + validator_config, + i, + beacon_nodes, + files, + ) + .await + .expect("should add validator"); + }, + "vc", + ); + } + + let duration_to_genesis = network.duration_to_genesis().await; + println!("Duration to genesis: {}", duration_to_genesis.as_secs()); + sleep(duration_to_genesis).await; + + if post_merge_sim { + let executor = executor.clone(); + let network_2 = network.clone(); + executor.spawn( + async move { + println!("Mining pow blocks"); + let mut interval = tokio::time::interval(Duration::from_secs(seconds_per_slot)); + for i in 1..=TERMINAL_BLOCK + 1 { + interval.tick().await; + let _ = network_2.mine_pow_blocks(i); + } + }, + "pow_mining", + ); + } + /* + * Start the checks that ensure the network performs as expected. + * + * We start these checks immediately after the validators have started. This means we're + * relying on the validator futures to all return immediately after genesis so that these + * tests start at the right time. Whilst this is works well for now, it's subject to + * breakage by changes to the VC. + */ + + let ( + //finalization, + //block_prod, + //validator_count, + //onboarding, + fallback, + check_attestations, + //fork, + //sync_aggregate, + //transition, + ) = futures::join!( + //checks::verify_first_finalization(network.clone(), slot_duration), + checks::disconnect_from_execution_layer( + network.clone(), + Epoch::new(BELLATRIX_FORK_EPOCH), + slot_duration + ), + checks::check_attestation_correctness( + network.clone(), + Epoch::new(END_EPOCH), + MinimalEthSpec::slots_per_epoch(), + slot_duration + ), + //checks::stall_node(network.clone(), 0, 30, seconds_per_slot), + ); + + //block_prod?; + //finalization?; + //validator_count?; + //onboarding?; + fallback?; + check_attestations?; + //fork?; + //sync_aggregate?; + //transition?; + + // The `final_future` either completes immediately or never completes, depending on the value + // of `continue_after_checks`. + + if continue_after_checks { + future::pending::<()>().await; + } + /* + * End the simulation by dropping the network. This will kill all running beacon nodes and + * validator clients. + */ + println!( + "Simulation complete. Finished with {} beacon nodes and {} validator clients", + network.beacon_node_count(), + network.validator_client_count() + ); + + // Be explicit about dropping the network, as this kills all the nodes. This ensures + // all the checks have adequate time to pass. + drop(network); + Ok::<(), String>(()) + }; + + env.runtime().block_on(main_future).unwrap(); + + env.fire_signal(); + env.shutdown_on_idle(); + + Ok(()) +} diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index e35870d126..b3d7afd7b9 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -265,6 +265,48 @@ impl LocalNetwork { Ok(()) } + pub async fn add_validator_client_with_fallbacks( + &self, + mut validator_config: ValidatorConfig, + validator_index: usize, + beacon_nodes: Vec, + validator_files: ValidatorFiles, + ) -> Result<(), String> { + let context = self + .context + .service_context(format!("validator_{}", validator_index)); + let self_1 = self.clone(); + let mut beacon_node_urls = vec![]; + for beacon_node in beacon_nodes { + let socket_addr = { + let read_lock = self.beacon_nodes.read(); + let beacon_node = read_lock + .get(beacon_node) + .ok_or_else(|| format!("No beacon node for index {}", beacon_node))?; + beacon_node + .client + .http_api_listen_addr() + .expect("Must have http started") + }; + let beacon_node = SensitiveUrl::parse( + format!("http://{}:{}", socket_addr.ip(), socket_addr.port()).as_str(), + ) + .unwrap(); + beacon_node_urls.push(beacon_node); + } + + validator_config.beacon_nodes = beacon_node_urls; + + let validator_client = LocalValidatorClient::production_with_insecure_keypairs( + context, + validator_config, + validator_files, + ) + .await?; + self_1.validator_clients.write().push(validator_client); + Ok(()) + } + /// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API. pub fn remote_nodes(&self) -> Result, String> { let beacon_nodes = self.beacon_nodes.read(); diff --git a/testing/simulator/src/main.rs b/testing/simulator/src/main.rs index e8af9c1806..83ca1135ef 100644 --- a/testing/simulator/src/main.rs +++ b/testing/simulator/src/main.rs @@ -19,6 +19,7 @@ extern crate clap; mod checks; mod cli; mod eth1_sim; +mod fallback_sim; mod local_network; mod no_eth1_sim; mod retry; @@ -58,6 +59,13 @@ fn main() { std::process::exit(1) } }, + ("fallback-sim", Some(matches)) => match fallback_sim::run_fallback_sim(matches) { + Ok(()) => println!("Simulation exited successfully"), + Err(e) => { + eprintln!("Simulation exited with an error: {}", e); + std::process::exit(1) + } + }, _ => { eprintln!("Invalid subcommand. Use --help to see available options"); std::process::exit(1) diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 531cec08ac..cb432271e4 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -2,13 +2,19 @@ //! "fallback" behaviour; it will try a request on all of the nodes until one or none of them //! succeed. -use crate::check_synced::check_synced; +use crate::beacon_node_health::{ + BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, SyncDistanceTier, +}; +use crate::check_synced::{check_node_health, check_synced}; use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_REQUESTS}; use environment::RuntimeContext; use eth2::BeaconNodeHttpClient; use futures::future; +use parking_lot::RwLock as PLRwLock; +use serde_derive::{Deserialize, Serialize}; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; +use std::cmp::Ordering; use std::fmt; use std::fmt::Debug; use std::future::Future; @@ -16,7 +22,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::{sync::RwLock, time::sleep}; -use types::{ChainSpec, Config, EthSpec}; +use types::{ChainSpec, Config as ConfigSpec, EthSpec}; /// Message emitted when the VC detects the BN is using a different spec. const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updating"; @@ -30,6 +36,16 @@ const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updati /// having the correct nodes up and running prior to the start of the slot. const SLOT_LOOKAHEAD: Duration = Duration::from_secs(2); +// Configuration for the Beacon Node fallback. +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] +pub struct Config { + /// Disables publishing http api requests to all beacon nodes for select api calls. + pub disable_run_on_all: bool, + /// Sets the number of slots behind the head a beacon node is allowed to be to still be + /// considered `synced`. + pub sync_tolerance: Option, +} + /// Indicates a measurement of latency between the VC and a BN. pub struct LatencyMeasurement { /// An identifier for the beacon node (e.g. the URL). @@ -139,21 +155,52 @@ pub enum CandidateError { Offline, Incompatible, NotSynced, + TimeDiscrepancy, } /// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used /// for a query. +#[derive(Debug)] pub struct CandidateBeaconNode { + id: usize, beacon_node: BeaconNodeHttpClient, + health: PLRwLock>, status: RwLock>, _phantom: PhantomData, } +impl PartialEq for CandidateBeaconNode { + fn eq(&self, other: &Self) -> bool { + self.id == other.id && self.beacon_node == other.beacon_node + } +} + +impl Eq for CandidateBeaconNode {} + +impl Ord for CandidateBeaconNode { + fn cmp(&self, other: &Self) -> Ordering { + match (&(*self.health.read()), &(*other.health.read())) { + (None, None) => Ordering::Equal, + (None, _) => Ordering::Greater, + (_, None) => Ordering::Less, + (Some(health_1), Some(health_2)) => health_1.cmp(health_2), + } + } +} + +impl PartialOrd for CandidateBeaconNode { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl CandidateBeaconNode { /// Instantiate a new node. - pub fn new(beacon_node: BeaconNodeHttpClient) -> Self { + pub fn new(beacon_node: BeaconNodeHttpClient, id: usize) -> Self { Self { + id, beacon_node, + health: PLRwLock::new(None), status: RwLock::new(Err(CandidateError::Uninitialized)), _phantom: PhantomData, } @@ -204,6 +251,64 @@ impl CandidateBeaconNode { new_status } + pub async fn refresh_health( + &self, + distance_tiers: &BeaconNodeSyncDistanceTiers, + slot_clock: Option<&T>, + spec: &ChainSpec, + log: &Logger, + ) -> Result<(), CandidateError> { + if let Err(e) = self.is_compatible(spec, log).await { + *self.status.write().await = Err(e); + return Ok(()); + } + + if let Some(slot_clock) = slot_clock { + match check_node_health(&self.beacon_node, log).await { + Ok((head, is_optimistic, el_offline)) => { + // Currently ExecutionEngineHealth is solely determined by online status. + let execution_status = if el_offline { + ExecutionEngineHealth::Unhealthy + } else { + ExecutionEngineHealth::Healthy + }; + + let new_health = BeaconNodeHealth::from_status( + self.id, + head, + is_optimistic, + execution_status, + distance_tiers, + slot_clock, + ); + + warn!( + log, + "Health of Beacon Node: {}, updated. Health tier: {}", + new_health.get_id(), + new_health.get_health_tier() + ); + + *self.health.write() = Some(new_health); + *self.status.write().await = Ok(()); + + Ok(()) + } + Err(status) => { + // Set the health as None which is sorted last in the list. + *self.health.write() = None; + *self.status.write().await = Err(status); + Ok(()) + } + } + } else { + // Slot clock will only be None at startup. + // Assume compatible nodes are available. + *self.status.write().await = Ok(()); + Ok(()) + } + } + /// Checks if the node is reachable. async fn is_online(&self, was_offline: bool, log: &Logger) -> Result<(), CandidateError> { let result = self @@ -240,7 +345,7 @@ impl CandidateBeaconNode { async fn is_compatible(&self, spec: &ChainSpec, log: &Logger) -> Result<(), CandidateError> { let config = self .beacon_node - .get_config_spec::() + .get_config_spec::() .await .map_err(|e| { error!( @@ -319,10 +424,12 @@ impl CandidateBeaconNode { /// A collection of `CandidateBeaconNode` that can be used to perform requests with "fallback" /// behaviour, where the failure of one candidate results in the next candidate receiving an /// identical query. +#[derive(Clone, Debug)] pub struct BeaconNodeFallback { - candidates: Vec>, - slot_clock: Option, + candidates: Arc>>>, disable_run_on_all: bool, + distance_tiers: BeaconNodeSyncDistanceTiers, + slot_clock: Option, spec: ChainSpec, log: Logger, } @@ -330,14 +437,16 @@ pub struct BeaconNodeFallback { impl BeaconNodeFallback { pub fn new( candidates: Vec>, - disable_run_on_all: bool, + config: Config, spec: ChainSpec, log: Logger, ) -> Self { + let distance_tiers = BeaconNodeSyncDistanceTiers::from_config(&config); Self { - candidates, + candidates: Arc::new(RwLock::new(candidates)), + disable_run_on_all: config.disable_run_on_all, + distance_tiers, slot_clock: None, - disable_run_on_all, spec, log, } @@ -353,16 +462,22 @@ impl BeaconNodeFallback { } /// The count of candidates, regardless of their state. - pub fn num_total(&self) -> usize { - self.candidates.len() + pub async fn num_total(&self) -> usize { + self.candidates.read().await.len() } /// The count of synced and ready candidates. pub async fn num_synced(&self) -> usize { let mut n = 0; - for candidate in &self.candidates { - if candidate.status(RequireSynced::Yes).await.is_ok() { - n += 1 + for candidate in self.candidates.read().await.iter() { + if let Some(cand) = candidate.health.read().as_ref() { + if self + .distance_tiers + .distance_tier(cand.health_tier.sync_distance) + == SyncDistanceTier::Synced + { + n += 1 + } } } n @@ -371,9 +486,15 @@ impl BeaconNodeFallback { /// The count of synced and ready fallbacks excluding the primary beacon node candidate. pub async fn num_synced_fallback(&self) -> usize { let mut n = 0; - for candidate in self.candidates.iter().skip(1) { - if candidate.status(RequireSynced::Yes).await.is_ok() { - n += 1 + for candidate in self.candidates.read().await.iter().skip(1) { + if let Some(cand) = candidate.health.read().as_ref() { + if self + .distance_tiers + .distance_tier(cand.health_tier.sync_distance) + == SyncDistanceTier::Synced + { + n += 1 + } } } n @@ -382,7 +503,7 @@ impl BeaconNodeFallback { /// The count of candidates that are online and compatible, but not necessarily synced. pub async fn num_available(&self) -> usize { let mut n = 0; - for candidate in &self.candidates { + for candidate in self.candidates.read().await.iter() { if candidate.status(RequireSynced::No).await.is_ok() { n += 1 } @@ -396,24 +517,36 @@ impl BeaconNodeFallback { /// low quality responses. To route around this it's best to poll all connected beacon nodes. /// A previous implementation of this function polled only the unavailable BNs. pub async fn update_all_candidates(&self) { - let futures = self - .candidates + let candidates = self.candidates.read().await; + + let futures = candidates .iter() .map(|candidate| { - candidate.refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log) + candidate.refresh_health( + &self.distance_tiers, + self.slot_clock.as_ref(), + &self.spec, + &self.log, + ) }) .collect::>(); - // run all updates concurrently and ignore errors + // Run all updates concurrently and ignore errors. let _ = future::join_all(futures).await; + + drop(candidates); + + // Sort the list to put the healthiest candidate first. + let mut write = self.candidates.write().await; + write.sort(); } /// Concurrently send a request to all candidates (regardless of /// offline/online) status and attempt to collect a rough reading on the /// latency between the VC and candidate. pub async fn measure_latency(&self) -> Vec { - let futures: Vec<_> = self - .candidates + let candidates = self.candidates.read().await; + let futures: Vec<_> = candidates .iter() .map(|candidate| async { let beacon_node_id = candidate.beacon_node.to_string(); @@ -455,20 +588,18 @@ impl BeaconNodeFallback { /// First this function will try all nodes with a suitable status. If no candidates are suitable /// or all the requests fail, it will try updating the status of all unsuitable nodes and /// re-running `func` again. - pub async fn first_success<'a, F, O, Err, R>( - &'a self, - require_synced: RequireSynced, - offline_on_failure: OfflineOnFailure, + pub async fn first_success( + &self, + _require_synced: RequireSynced, + _offline_on_failure: OfflineOnFailure, func: F, ) -> Result> where - F: Fn(&'a BeaconNodeHttpClient) -> R, + F: Fn(BeaconNodeHttpClient) -> R, R: Future>, Err: Debug, { let mut errors = vec![]; - let mut to_retry = vec![]; - let mut retry_unsynced = vec![]; let log = &self.log.clone(); // Run `func` using a `candidate`, returning the value or capturing errors. @@ -481,7 +612,7 @@ impl BeaconNodeFallback { // There exists a race condition where `func` may be called when the candidate is // actually not ready. We deem this an acceptable inefficiency. - match func(&$candidate.beacon_node).await { + match func($candidate.beacon_node.clone()).await { Ok(val) => return Ok(val), Err(e) => { debug!( @@ -495,9 +626,9 @@ impl BeaconNodeFallback { // There exists a race condition where the candidate may have been marked // as ready between the `func` call and now. We deem this an acceptable // inefficiency. - if matches!(offline_on_failure, OfflineOnFailure::Yes) { - $candidate.set_offline().await; - } + //if matches!(offline_on_failure, OfflineOnFailure::Yes) { + // $candidate.set_offline().await; + //} errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e))); inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]); } @@ -508,53 +639,9 @@ impl BeaconNodeFallback { // First pass: try `func` on all synced and ready candidates. // // This ensures that we always choose a synced node if it is available. - for candidate in &self.candidates { - match candidate.status(RequireSynced::Yes).await { - Err(e @ CandidateError::NotSynced) if require_synced == false => { - // This client is unsynced we will try it after trying all synced clients - retry_unsynced.push(candidate); - errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e))); - } - Err(e) => { - // This client was not ready on the first pass, we might try it again later. - to_retry.push(candidate); - errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e))); - } - _ => try_func!(candidate), - } - } - - // Second pass: try `func` on ready unsynced candidates. This only runs if we permit - // unsynced candidates. - // - // Due to async race-conditions, it is possible that we will send a request to a candidate - // that has been set to an offline/unready status. This is acceptable. - if require_synced == false { - for candidate in retry_unsynced { - try_func!(candidate); - } - } - - // Third pass: try again, attempting to make non-ready clients become ready. - for candidate in to_retry { - // If the candidate hasn't luckily transferred into the correct state in the meantime, - // force an update of the state. - let new_status = match candidate.status(require_synced).await { - Ok(()) => Ok(()), - Err(_) => { - candidate - .refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log) - .await - } - }; - - match new_status { - Ok(()) => try_func!(candidate), - Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate), - Err(e) => { - errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e))); - } - } + let candidates = self.candidates.read().await; + for candidate in candidates.iter() { + try_func!(candidate); } // There were no candidates already ready and we were unable to make any of them ready. @@ -571,19 +658,17 @@ impl BeaconNodeFallback { /// It returns a list of errors along with the beacon node id that failed for `func`. /// Since this ignores the actual result of `func`, this function should only be used for beacon /// node calls whose results we do not care about, only that they completed successfully. - pub async fn run_on_all<'a, F, O, Err, R>( - &'a self, - require_synced: RequireSynced, - offline_on_failure: OfflineOnFailure, + pub async fn run_on_all( + &self, + _require_synced: RequireSynced, + _offline_on_failure: OfflineOnFailure, func: F, ) -> Result<(), Errors> where - F: Fn(&'a BeaconNodeHttpClient) -> R, + F: Fn(BeaconNodeHttpClient) -> R, R: Future>, { let mut results = vec![]; - let mut to_retry = vec![]; - let mut retry_unsynced = vec![]; // Run `func` using a `candidate`, returning the value or capturing errors. // @@ -595,7 +680,7 @@ impl BeaconNodeFallback { // There exists a race condition where `func` may be called when the candidate is // actually not ready. We deem this an acceptable inefficiency. - match func(&$candidate.beacon_node).await { + match func($candidate.beacon_node.clone()).await { Ok(val) => results.push(Ok(val)), Err(e) => { // If we have an error on this function, make the client as not-ready. @@ -603,9 +688,9 @@ impl BeaconNodeFallback { // There exists a race condition where the candidate may have been marked // as ready between the `func` call and now. We deem this an acceptable // inefficiency. - if matches!(offline_on_failure, OfflineOnFailure::Yes) { - $candidate.set_offline().await; - } + //if matches!(offline_on_failure, OfflineOnFailure::Yes) { + // $candidate.set_offline().await; + //} results.push(Err(( $candidate.beacon_node.to_string(), Error::RequestFailed(e), @@ -619,54 +704,9 @@ impl BeaconNodeFallback { // First pass: try `func` on all synced and ready candidates. // // This ensures that we always choose a synced node if it is available. - for candidate in &self.candidates { - match candidate.status(RequireSynced::Yes).await { - Err(CandidateError::NotSynced) if require_synced == false => { - // This client is unsynced we will try it after trying all synced clients - retry_unsynced.push(candidate); - } - Err(_) => { - // This client was not ready on the first pass, we might try it again later. - to_retry.push(candidate); - } - Ok(_) => try_func!(candidate), - } - } - - // Second pass: try `func` on ready unsynced candidates. This only runs if we permit - // unsynced candidates. - // - // Due to async race-conditions, it is possible that we will send a request to a candidate - // that has been set to an offline/unready status. This is acceptable. - if require_synced == false { - for candidate in retry_unsynced { - try_func!(candidate); - } - } - - // Third pass: try again, attempting to make non-ready clients become ready. - for candidate in to_retry { - // If the candidate hasn't luckily transferred into the correct state in the meantime, - // force an update of the state. - let new_status = match candidate.status(require_synced).await { - Ok(()) => Ok(()), - Err(_) => { - candidate - .refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log) - .await - } - }; - - match new_status { - Ok(()) => try_func!(candidate), - Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate), - Err(e) => { - results.push(Err(( - candidate.beacon_node.to_string(), - Error::Unavailable(e), - ))); - } - } + let candidates = self.candidates.read().await; + for candidate in candidates.iter() { + try_func!(candidate); } let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect(); @@ -680,14 +720,14 @@ impl BeaconNodeFallback { /// Call `func` on first beacon node that returns success or on all beacon nodes /// depending on the value of `disable_run_on_all`. - pub async fn run<'a, F, Err, R>( - &'a self, + pub async fn run( + &self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, func: F, ) -> Result<(), Errors> where - F: Fn(&'a BeaconNodeHttpClient) -> R, + F: Fn(BeaconNodeHttpClient) -> R, R: Future>, Err: Debug, { @@ -701,3 +741,154 @@ impl BeaconNodeFallback { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::beacon_node_health::BeaconNodeHealthTier; + use crate::SensitiveUrl; + use eth2::Timeouts; + use types::{MainnetEthSpec, Slot}; + + type E = MainnetEthSpec; + + #[test] + fn check_candidate_order() { + let candidate_1: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_1.com").unwrap(), + Timeouts::set_all(Duration::from_secs(1)), + ), + 1, + ); + let expected_candidate_1: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_1.com").unwrap(), + Timeouts::set_all(Duration::from_secs(1)), + ), + 1, + ); + let candidate_2: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_2.com").unwrap(), + Timeouts::set_all(Duration::from_secs(2)), + ), + 2, + ); + let expected_candidate_2: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_2.com").unwrap(), + Timeouts::set_all(Duration::from_secs(2)), + ), + 2, + ); + let candidate_3: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_3.com").unwrap(), + Timeouts::set_all(Duration::from_secs(3)), + ), + 3, + ); + let expected_candidate_3: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_3.com").unwrap(), + Timeouts::set_all(Duration::from_secs(3)), + ), + 3, + ); + let candidate_4: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_4.com").unwrap(), + Timeouts::set_all(Duration::from_secs(4)), + ), + 3, + ); + let expected_candidate_4: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_4.com").unwrap(), + Timeouts::set_all(Duration::from_secs(4)), + ), + 3, + ); + let candidate_5: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_5.com").unwrap(), + Timeouts::set_all(Duration::from_secs(5)), + ), + 3, + ); + let expected_candidate_5: CandidateBeaconNode = CandidateBeaconNode::new( + BeaconNodeHttpClient::new( + SensitiveUrl::parse("http://example_5.com").unwrap(), + Timeouts::set_all(Duration::from_secs(5)), + ), + 3, + ); + + // All health parameters other than `health_tier` are irrelevant for ordering. + let health_1 = BeaconNodeHealth { + id: 1, + head: Slot::new(99), + optimistic_status: false, + execution_status: ExecutionEngineHealth::Healthy, + health_tier: BeaconNodeHealthTier::new(1, Slot::new(1)), + }; + + let health_2 = BeaconNodeHealth { + id: 2, + head: Slot::new(99), + optimistic_status: false, + execution_status: ExecutionEngineHealth::Healthy, + health_tier: BeaconNodeHealthTier::new(2, Slot::new(1)), + }; + + let health_3 = BeaconNodeHealth { + id: 3, + head: Slot::new(99), + optimistic_status: false, + execution_status: ExecutionEngineHealth::Healthy, + health_tier: BeaconNodeHealthTier::new(3, Slot::new(1)), + }; + + let health_4 = BeaconNodeHealth { + id: 4, + head: Slot::new(99), + optimistic_status: false, + execution_status: ExecutionEngineHealth::Healthy, + health_tier: BeaconNodeHealthTier::new(4, Slot::new(1)), + }; + + let health_5 = BeaconNodeHealth { + id: 5, + head: Slot::new(99), + optimistic_status: false, + execution_status: ExecutionEngineHealth::Unhealthy, + health_tier: BeaconNodeHealthTier::new(4, Slot::new(5)), + }; + + *candidate_1.health.write() = Some(health_1); + *candidate_2.health.write() = Some(health_2); + *candidate_3.health.write() = Some(health_3); + *candidate_4.health.write() = Some(health_4); + *candidate_5.health.write() = Some(health_5); + + let mut candidates = vec![ + candidate_3, + candidate_5, + candidate_1, + candidate_4, + candidate_2, + ]; + let expected_candidates = vec![ + expected_candidate_1, + expected_candidate_2, + expected_candidate_3, + expected_candidate_4, + expected_candidate_5, + ]; + + candidates.sort(); + + assert_eq!(candidates, expected_candidates); + } +} diff --git a/validator_client/src/beacon_node_health.rs b/validator_client/src/beacon_node_health.rs new file mode 100644 index 0000000000..3ab6b473e0 --- /dev/null +++ b/validator_client/src/beacon_node_health.rs @@ -0,0 +1,363 @@ +use crate::beacon_node_fallback::Config; +use slot_clock::SlotClock; +use std::cmp::Ordering; +use std::fmt::{Debug, Display, Formatter}; +use types::Slot; + +// Sync distances between 0 and DEFAULT_SYNC_TOLERANCE are considered `synced`. +// Sync distance tiers are determined by the different modifiers. +const DEFAULT_SYNC_TOLERANCE: Slot = Slot::new(4); +const SYNC_DISTANCE_SMALL_MODIFIER: Slot = Slot::new(7); +const SYNC_DISTANCE_MEDIUM_MODIFIER: Slot = Slot::new(31); + +type HealthTier = u8; +type SyncDistance = Slot; +type OptimisticStatus = bool; + +/// Helpful enum which is used when pattern matching to determine health tier. +#[derive(PartialEq, Debug)] +pub enum SyncDistanceTier { + Synced, + Small, + Medium, + Large, +} + +/// Contains the different sync distance tiers which are determined at runtime by the +/// `sync_tolerance` CLI flag. +#[derive(Clone, Debug)] +pub struct BeaconNodeSyncDistanceTiers { + synced: SyncDistance, + small: SyncDistance, + medium: SyncDistance, +} + +impl BeaconNodeSyncDistanceTiers { + pub fn from_config(config: &Config) -> Self { + if let Some(sync_tolerance) = config.sync_tolerance { + Self { + synced: Slot::new(sync_tolerance), + small: Slot::new(sync_tolerance) + SYNC_DISTANCE_SMALL_MODIFIER, + medium: Slot::new(sync_tolerance) + SYNC_DISTANCE_MEDIUM_MODIFIER, + } + } else { + Self::default() + } + } + + /// Takes a given sync distance and determines its tier based on the `sync_tolerance` defined by + /// the CLI. + pub fn distance_tier(&self, distance: SyncDistance) -> SyncDistanceTier { + let distance = distance.as_u64(); + // Add 1 since we are using exclusive ranges. + let synced = self.synced.as_u64() + 1; + let small = self.small.as_u64() + 1; + let medium = self.medium.as_u64() + 1; + + if (0..synced).contains(&distance) { + SyncDistanceTier::Synced + } else if (synced..small).contains(&distance) { + SyncDistanceTier::Small + } else if (small..medium).contains(&distance) { + SyncDistanceTier::Medium + } else { + SyncDistanceTier::Large + } + } +} + +impl Default for BeaconNodeSyncDistanceTiers { + fn default() -> Self { + Self { + synced: DEFAULT_SYNC_TOLERANCE, + small: DEFAULT_SYNC_TOLERANCE + SYNC_DISTANCE_SMALL_MODIFIER, + medium: DEFAULT_SYNC_TOLERANCE + SYNC_DISTANCE_MEDIUM_MODIFIER, + } + } +} + +/// Execution Node health metrics. +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[allow(dead_code)] +pub enum ExecutionEngineHealth { + Healthy, + Unhealthy, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub struct BeaconNodeHealthTier { + pub tier: HealthTier, + pub sync_distance: SyncDistance, +} + +impl Display for BeaconNodeHealthTier { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Tier{}({})", self.tier, self.sync_distance) + } +} + +impl Ord for BeaconNodeHealthTier { + fn cmp(&self, other: &Self) -> Ordering { + self.tier.cmp(&other.tier) + } +} + +impl PartialOrd for BeaconNodeHealthTier { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl BeaconNodeHealthTier { + pub fn new(tier: HealthTier, sync_distance: SyncDistance) -> Self { + Self { + tier, + sync_distance, + } + } +} + +/// Beacon Node Health metrics. +#[derive(Debug, PartialEq, Eq)] +pub struct BeaconNodeHealth { + // The ID of the Beacon Node. This should correspond with its position in the `--beacon-nodes` + // list. Note that the ID field is used to tie-break nodes with the same health so that nodes + // with a lower ID are preferred. + pub id: usize, + // The slot number of the head. + pub head: Slot, + // Whether the node is optimistically synced. + pub optimistic_status: OptimisticStatus, + // The status of the nodes connected Execution Engine. + pub execution_status: ExecutionEngineHealth, + // The overall health tier of the Beacon Node. Used to rank the nodes for the purposes of + // fallbacks. + pub health_tier: BeaconNodeHealthTier, +} + +impl Ord for BeaconNodeHealth { + fn cmp(&self, other: &Self) -> Ordering { + let ordering = self.health_tier.cmp(&other.health_tier); + if ordering == Ordering::Equal { + // Tie-break node health by ID. + self.id.cmp(&other.id) + } else { + ordering + } + } +} + +impl PartialOrd for BeaconNodeHealth { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl BeaconNodeHealth { + pub fn from_status( + id: usize, + head: Slot, + optimistic_status: OptimisticStatus, + execution_status: ExecutionEngineHealth, + distance_tiers: &BeaconNodeSyncDistanceTiers, + slot_clock: &T, + ) -> Self { + let sync_distance = BeaconNodeHealth::compute_sync_distance(head, slot_clock); + let health_tier = BeaconNodeHealth::compute_health_tier( + sync_distance, + optimistic_status, + execution_status, + distance_tiers, + ); + + Self { + id, + head, + optimistic_status, + execution_status, + health_tier, + } + } + + pub fn get_id(&self) -> usize { + self.id + } + + pub fn get_health_tier(&self) -> BeaconNodeHealthTier { + self.health_tier + } + + fn compute_sync_distance(head: Slot, slot_clock: &T) -> SyncDistance { + // TODO(mac) May be worth distinguishing between nodes that are ahead of the `slot_clock`. + slot_clock + .now() + .map(|head_slot| head_slot.saturating_sub(head)) + .unwrap_or(Slot::max_value()) + } + + fn compute_health_tier( + sync_distance: SyncDistance, + optimistic_status: OptimisticStatus, + execution_status: ExecutionEngineHealth, + sync_distance_tiers: &BeaconNodeSyncDistanceTiers, + ) -> BeaconNodeHealthTier { + let sync_distance_tier = sync_distance_tiers.distance_tier(sync_distance); + let health = (sync_distance_tier, optimistic_status, execution_status); + + match health { + (SyncDistanceTier::Synced, false, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(1, sync_distance) + } + (SyncDistanceTier::Small, false, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(2, sync_distance) + } + (SyncDistanceTier::Synced, false, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(3, sync_distance) + } + (SyncDistanceTier::Medium, false, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(4, sync_distance) + } + (SyncDistanceTier::Synced, true, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(5, sync_distance) + } + (SyncDistanceTier::Synced, true, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(6, sync_distance) + } + (SyncDistanceTier::Small, false, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(7, sync_distance) + } + (SyncDistanceTier::Small, true, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(8, sync_distance) + } + (SyncDistanceTier::Small, true, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(9, sync_distance) + } + (SyncDistanceTier::Large, false, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(10, sync_distance) + } + (SyncDistanceTier::Medium, false, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(11, sync_distance) + } + (SyncDistanceTier::Medium, true, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(12, sync_distance) + } + (SyncDistanceTier::Medium, true, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(13, sync_distance) + } + (SyncDistanceTier::Large, false, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(14, sync_distance) + } + (SyncDistanceTier::Large, true, ExecutionEngineHealth::Healthy) => { + BeaconNodeHealthTier::new(15, sync_distance) + } + (SyncDistanceTier::Large, true, ExecutionEngineHealth::Unhealthy) => { + BeaconNodeHealthTier::new(16, sync_distance) + } + } + } +} + +#[cfg(test)] +mod tests { + + use super::ExecutionEngineHealth::{Healthy, Unhealthy}; + use super::{BeaconNodeHealth, BeaconNodeSyncDistanceTiers, SyncDistanceTier}; + use crate::beacon_node_fallback::Config; + use slot_clock::{SlotClock, TestingSlotClock}; + use std::time::Duration; + use types::Slot; + + #[test] + fn all_possible_health_tiers() { + let current_head = Slot::new(64); + + let config = Config::default(); + let beacon_node_sync_distance_tiers = BeaconNodeSyncDistanceTiers::from_config(&config); + + let slot_clock = + TestingSlotClock::new(current_head, Duration::from_secs(0), Duration::from_secs(1)); + + let mut health_vec = vec![]; + + for head_slot in (0..=64).rev() { + for optimistic_status in &[false, true] { + for ee_health in &[Healthy, Unhealthy] { + let health = BeaconNodeHealth::from_status( + 0, + Slot::new(head_slot), + *optimistic_status, + *ee_health, + &beacon_node_sync_distance_tiers, + &slot_clock, + ); + health_vec.push(health); + } + } + } + + for health in health_vec { + let health_tier = health.get_health_tier(); + let tier = health_tier.tier; + let distance = health_tier.sync_distance; + + let distance_tier = beacon_node_sync_distance_tiers.distance_tier(distance); + + // Check sync distance. + if [1, 3, 5, 6].contains(&tier) { + assert!(distance_tier == SyncDistanceTier::Synced) + } else if [2, 7, 8, 9].contains(&tier) { + assert!(distance_tier == SyncDistanceTier::Small); + } else if [4, 11, 12, 13].contains(&tier) { + assert!(distance_tier == SyncDistanceTier::Medium); + } else { + assert!(distance_tier == SyncDistanceTier::Large); + } + + // Check optimistic status. + if [1, 2, 3, 4, 7, 10, 11, 14].contains(&tier) { + assert!(!health.optimistic_status); + } else { + assert!(health.optimistic_status); + } + + // Check execution health. + if [3, 6, 7, 9, 11, 13, 14, 16].contains(&tier) { + assert_eq!(health.execution_status, Unhealthy); + } else { + assert_eq!(health.execution_status, Healthy); + } + } + } + + #[test] + fn sync_tolerance() { + let config = Config { + disable_run_on_all: false, + sync_tolerance: Some(8), + }; + let distance_tiers = BeaconNodeSyncDistanceTiers::from_config(&config); + + let synced_low = + BeaconNodeHealth::compute_health_tier(Slot::new(0), false, Healthy, &distance_tiers); + let synced_high = + BeaconNodeHealth::compute_health_tier(Slot::new(8), false, Healthy, &distance_tiers); + let small_low = + BeaconNodeHealth::compute_health_tier(Slot::new(9), false, Healthy, &distance_tiers); + let small_high = + BeaconNodeHealth::compute_health_tier(Slot::new(15), false, Healthy, &distance_tiers); + let medium_low = + BeaconNodeHealth::compute_health_tier(Slot::new(16), false, Healthy, &distance_tiers); + let medium_high = + BeaconNodeHealth::compute_health_tier(Slot::new(39), false, Healthy, &distance_tiers); + let large = + BeaconNodeHealth::compute_health_tier(Slot::new(40), false, Healthy, &distance_tiers); + + assert!(synced_low.tier == 1); + assert!(synced_high.tier == 1); + assert!(small_low.tier == 2); + assert!(small_high.tier == 2); + assert!(medium_low.tier == 4); + assert!(medium_high.tier == 4); + assert!(large.tier == 10); + } +} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 2a09455b6f..8b0b5dcacb 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -145,14 +145,14 @@ pub struct ProposerFallback { impl ProposerFallback { // Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`. - pub async fn first_success_try_proposers_first<'a, F, O, Err, R>( - &'a self, + pub async fn first_success_try_proposers_first( + &self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, func: F, ) -> Result> where - F: Fn(&'a BeaconNodeHttpClient) -> R + Clone, + F: Fn(BeaconNodeHttpClient) -> R + Clone, R: Future>, Err: Debug, { @@ -173,14 +173,14 @@ impl ProposerFallback { } // Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`. - pub async fn first_success_try_proposers_last<'a, F, O, Err, R>( - &'a self, + pub async fn first_success_try_proposers_last( + &self, require_synced: RequireSynced, offline_on_failure: OfflineOnFailure, func: F, ) -> Result> where - F: Fn(&'a BeaconNodeHttpClient) -> R + Clone, + F: Fn(BeaconNodeHttpClient) -> R + Clone, R: Future>, Err: Debug, { diff --git a/validator_client/src/check_synced.rs b/validator_client/src/check_synced.rs index fb88d33dae..be6d591c9e 100644 --- a/validator_client/src/check_synced.rs +++ b/validator_client/src/check_synced.rs @@ -1,5 +1,5 @@ use crate::beacon_node_fallback::CandidateError; -use eth2::BeaconNodeHttpClient; +use eth2::{types::Slot, BeaconNodeHttpClient}; use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; @@ -70,6 +70,8 @@ pub async fn check_synced( "local_slot" => local_slot, "endpoint" => %beacon_node, ); + + return Err(CandidateError::TimeDiscrepancy); } } } @@ -80,3 +82,29 @@ pub async fn check_synced( Err(CandidateError::NotSynced) } } + +pub async fn check_node_health( + beacon_node: &BeaconNodeHttpClient, + log: &Logger, +) -> Result<(Slot, bool, bool), CandidateError> { + let resp = match beacon_node.get_node_syncing().await { + Ok(resp) => resp, + Err(e) => { + warn!( + log, + "Unable connect to beacon node"; + "error" => %e + ); + + return Err(CandidateError::Offline); + } + }; + + Ok(( + resp.data.head_slot, + // Note that optimistic and EL status will both default to their healthy variants which may + // be undesirable. + resp.data.is_optimistic.unwrap_or(false), + resp.data.el_offline.unwrap_or(false), + )) +} diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 0789ac78a0..012c31ff73 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -363,6 +363,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .default_value("500") .takes_value(true), ) + .arg( + Arg::with_name("beacon-node-sync-tolerance") + .long("beacon-node-sync-tolerance") + .help("Sets the number of slots behind the head that each connected Beacon Node can be \ + to still be considered synced. Effectively this gives more priority to the first \ + connected Beacon Node.") + .takes_value(true), + ) /* * Experimental/development options. */ diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 7c662db937..81e1ac1411 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -1,5 +1,5 @@ use crate::graffiti_file::GraffitiFile; -use crate::{http_api, http_metrics}; +use crate::{beacon_node_fallback, http_api, http_metrics}; use clap::ArgMatches; use clap_utils::{flags::DISABLE_MALLOC_TUNING_FLAG, parse_optional, parse_required}; use directory::{ @@ -19,7 +19,7 @@ use types::{Address, GRAFFITI_BYTES_LEN}; pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/"; /// Stores the core configuration for this validator instance. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Config { /// The data directory, which stores all validator databases pub validator_dir: PathBuf, @@ -50,6 +50,8 @@ pub struct Config { pub http_api: http_api::Config, /// Configuration for the HTTP REST API. pub http_metrics: http_metrics::Config, + /// Configuration for the Beacon Node fallback. + pub beacon_node_fallback: beacon_node_fallback::Config, /// Configuration for sending metrics to a remote explorer endpoint. pub monitoring_api: Option, /// If true, enable functionality that monitors the network for attestations or proposals from @@ -73,8 +75,6 @@ pub struct Config { /// /// This is *not* recommended in prod and should only be used for testing. pub block_delay: Option, - /// Disables publishing http api requests to all beacon nodes for select api calls. - pub disable_run_on_all: bool, /// Enables a service which attempts to measure latency between the VC and BNs. pub enable_latency_measurement_service: bool, /// Defines the number of validators per `validator/register_validator` request sent to the BN. @@ -109,6 +109,7 @@ impl Default for Config { fee_recipient: None, http_api: <_>::default(), http_metrics: <_>::default(), + beacon_node_fallback: <_>::default(), monitoring_api: None, enable_doppelganger_protection: false, enable_high_validator_count_metrics: false, @@ -117,7 +118,6 @@ impl Default for Config { builder_proposals: false, builder_registration_timestamp_override: None, gas_limit: None, - disable_run_on_all: false, enable_latency_measurement_service: true, validator_registration_batch_size: 500, } @@ -215,7 +215,6 @@ impl Config { "msg" => "it no longer has any effect", ); } - config.disable_run_on_all = cli_args.is_present("disable-run-on-all"); config.disable_auto_discover = cli_args.is_present("disable-auto-discover"); config.init_slashing_protection = cli_args.is_present("init-slashing-protection"); config.use_long_timeouts = cli_args.is_present("use-long-timeouts"); @@ -258,6 +257,20 @@ impl Config { config.beacon_nodes_tls_certs = Some(tls_certs.split(',').map(PathBuf::from).collect()); } + /* + * Beacon node fallback + */ + + config.beacon_node_fallback.disable_run_on_all = cli_args.is_present("disable-run-on-all"); + + if let Some(sync_tolerance) = cli_args.value_of("beacon-node-sync-tolerance") { + config.beacon_node_fallback.sync_tolerance = Some( + sync_tolerance + .parse::() + .map_err(|_| "beacon-node-sync-tolerance is not a valid u64.")?, + ); + } + /* * Http API server */ diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6f071055a4..de6dc55f5b 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -1,5 +1,6 @@ mod attestation_service; mod beacon_node_fallback; +mod beacon_node_health; mod block_service; mod check_synced; mod cli; @@ -334,15 +335,18 @@ impl ProductionValidatorClient { .collect::, String>>()?; let num_nodes = beacon_nodes.len(); + let candidates = beacon_nodes .into_iter() - .map(CandidateBeaconNode::new) + .zip(0..num_nodes) + .map(|(node, id)| CandidateBeaconNode::new(node, id)) .collect(); let proposer_nodes_num = proposer_nodes.len(); let proposer_candidates = proposer_nodes .into_iter() - .map(CandidateBeaconNode::new) + .zip(0..num_nodes) + .map(|(node, id)| CandidateBeaconNode::new(node, id)) .collect(); // Set the count for beacon node fallbacks excluding the primary beacon node. @@ -364,14 +368,14 @@ impl ProductionValidatorClient { let mut beacon_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new( candidates, - config.disable_run_on_all, + config.beacon_node_fallback, context.eth2_config.spec.clone(), log.clone(), ); let mut proposer_nodes: BeaconNodeFallback<_, T> = BeaconNodeFallback::new( proposer_candidates, - config.disable_run_on_all, + config.beacon_node_fallback, context.eth2_config.spec.clone(), log.clone(), ); @@ -625,10 +629,10 @@ async fn init_from_beacon_node( proposer_nodes.update_all_candidates().await; let num_available = beacon_nodes.num_available().await; - let num_total = beacon_nodes.num_total(); + let num_total = beacon_nodes.num_total().await; let proposer_available = proposer_nodes.num_available().await; - let proposer_total = proposer_nodes.num_total(); + let proposer_total = proposer_nodes.num_total().await; if proposer_total > 0 && proposer_available == 0 { warn!( diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 909e64a78a..e8b5f4c5ba 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -49,7 +49,7 @@ async fn notify( &http_metrics::metrics::SYNCED_BEACON_NODES_COUNT, num_synced as i64, ); - let num_total = duties_service.beacon_nodes.num_total(); + let num_total = duties_service.beacon_nodes.num_total().await; set_gauge( &http_metrics::metrics::TOTAL_BEACON_NODES_COUNT, num_total as i64,