From a62dc65ca49676e4f2d812bca21294aed6ef2d9c Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 18 Dec 2020 09:17:03 +0000 Subject: [PATCH] BN Fallback v2 (#2080) ## Issue Addressed - Resolves #1883 ## Proposed Changes This follows on from @blacktemplar's work in #2018. - Allows the VC to connect to multiple BN for redundancy. - Update the simulator so some nodes always need to rely on their fallback. - Adds some extra deprecation warnings for `--eth1-endpoint` - Pass `SignatureBytes` as a reference instead of by value. ## Additional Info NA Co-authored-by: blacktemplar --- Cargo.lock | 1 + beacon_node/http_api/tests/tests.rs | 2 +- beacon_node/src/config.rs | 5 + common/eth2/src/lib.rs | 14 +- lcli/src/eth1_genesis.rs | 5 +- testing/simulator/src/eth1_sim.rs | 11 +- testing/simulator/src/local_network.rs | 10 +- testing/simulator/src/no_eth1_sim.rs | 2 +- testing/simulator/src/sync_sim.rs | 2 +- validator_client/Cargo.toml | 1 + validator_client/src/attestation_service.rs | 75 ++- validator_client/src/beacon_node_fallback.rs | 463 ++++++++++++++++++ validator_client/src/block_service.rs | 58 ++- .../src/{is_synced.rs => check_synced.rs} | 32 +- validator_client/src/cli.rs | 17 +- validator_client/src/config.rs | 28 +- validator_client/src/duties_service.rs | 91 ++-- validator_client/src/fork_service.rs | 103 ++-- validator_client/src/http_api/tests.rs | 6 +- validator_client/src/http_metrics/metrics.rs | 13 + validator_client/src/lib.rs | 186 +++---- validator_client/src/notifier.rs | 31 +- validator_client/src/validator_store.rs | 7 +- 23 files changed, 882 insertions(+), 281 deletions(-) create mode 100644 validator_client/src/beacon_node_fallback.rs rename validator_client/src/{is_synced.rs => check_synced.rs} (67%) diff --git a/Cargo.lock b/Cargo.lock index 81fdf1dcd8..a0bb005933 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7141,6 +7141,7 @@ dependencies = [ "eth2_ssz", "eth2_ssz_derive", "exit-future", + "fallback", "futures 0.3.8", "hex", "hyper 0.13.9", 
diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 3a426b3915..18d6c332d9 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1643,7 +1643,7 @@ impl ApiTester { let block = self .client - .get_validator_blocks::(slot, randao_reveal, None) + .get_validator_blocks::(slot, &randao_reveal, None) .await .unwrap() .data; diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 699e8d4189..437a946208 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -170,6 +170,11 @@ pub fn get_config( // Defines the URL to reach the eth1 node. if let Some(val) = cli_args.value_of("eth1-endpoint") { + warn!( + log, + "The --eth1-endpoint flag is deprecated"; + "msg" => "please use --eth1-endpoints instead" + ); client_config.sync_eth1_chain = true; client_config.eth1.endpoints = vec![val.to_string()]; } else if let Some(val) = cli_args.value_of("eth1-endpoints") { diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index d5104b0798..902d5fd28a 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -83,6 +83,18 @@ pub struct BeaconNodeHttpClient { server: Url, } +impl fmt::Display for BeaconNodeHttpClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.server.fmt(f) + } +} + +impl AsRef for BeaconNodeHttpClient { + fn as_ref(&self) -> &str { + self.server.as_str() + } +} + impl BeaconNodeHttpClient { pub fn new(server: Url) -> Self { Self { @@ -848,7 +860,7 @@ impl BeaconNodeHttpClient { pub async fn get_validator_blocks( &self, slot: Slot, - randao_reveal: SignatureBytes, + randao_reveal: &SignatureBytes, graffiti: Option<&Graffiti>, ) -> Result>, Error> { let mut path = self.eth_path()?; diff --git a/lcli/src/eth1_genesis.rs b/lcli/src/eth1_genesis.rs index 7b91d11410..bd064abe68 100644 --- a/lcli/src/eth1_genesis.rs +++ b/lcli/src/eth1_genesis.rs @@ -15,7 +15,10 @@ pub const ETH1_GENESIS_UPDATE_INTERVAL: Duration = 
Duration::from_millis(7_000); pub fn run(mut env: Environment, matches: &ArgMatches<'_>) -> Result<(), String> { let endpoints = matches .value_of("eth1-endpoint") - .map(|e| vec![String::from(e)]) + .map(|e| { + warn!("The --eth1-endpoint flag is deprecated. Please use --eth1-endpoints instead"); + vec![String::from(e)] + }) .or_else(|| { matches .value_of("eth1-endpoints") diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index acabc11d2e..f54a59db89 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -1,3 +1,4 @@ +use crate::local_network::INVALID_ADDRESS; use crate::{checks, LocalNetwork, E}; use clap::ArgMatches; use eth1::http::Eth1Id; @@ -128,8 +129,12 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { /* * One by one, add beacon nodes to the network. */ - for _ in 0..node_count - 1 { - network.add_beacon_node(beacon_config.clone()).await?; + for i in 0..node_count - 1 { + let mut config = beacon_config.clone(); + if i % 2 == 0 { + config.eth1.endpoints.insert(0, INVALID_ADDRESS.to_string()); + } + network.add_beacon_node(config).await?; } /* @@ -137,7 +142,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { */ for (i, files) in validator_files.into_iter().enumerate() { network - .add_validator_client(testing_validator_config(), i, files) + .add_validator_client(testing_validator_config(), i, files, i % 2 == 0) .await?; } diff --git a/testing/simulator/src/local_network.rs b/testing/simulator/src/local_network.rs index 3efccea417..d4f1aa6ca6 100644 --- a/testing/simulator/src/local_network.rs +++ b/testing/simulator/src/local_network.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use types::{Epoch, EthSpec}; const BOOTNODE_PORT: u16 = 42424; +pub const INVALID_ADDRESS: &str = "http://127.0.0.1:42423"; /// Helper struct to reduce `Arc` usage. 
pub struct Inner { @@ -118,6 +119,7 @@ impl LocalNetwork { mut validator_config: ValidatorConfig, beacon_node: usize, validator_files: ValidatorFiles, + invalid_first_beacon_node: bool, //to test beacon node fallbacks ) -> Result<(), String> { let index = self.validator_clients.read().len(); let context = self.context.service_context(format!("validator_{}", index)); @@ -133,8 +135,12 @@ impl LocalNetwork { .expect("Must have http started") }; - validator_config.beacon_node = - format!("http://{}:{}", socket_addr.ip(), socket_addr.port()); + let beacon_node = format!("http://{}:{}", socket_addr.ip(), socket_addr.port()); + validator_config.beacon_nodes = if invalid_first_beacon_node { + vec![INVALID_ADDRESS.to_string(), beacon_node] + } else { + vec![beacon_node] + }; let validator_client = LocalValidatorClient::production_with_insecure_keypairs( context, validator_config, diff --git a/testing/simulator/src/no_eth1_sim.rs b/testing/simulator/src/no_eth1_sim.rs index 56296ed8d8..fb2054608b 100644 --- a/testing/simulator/src/no_eth1_sim.rs +++ b/testing/simulator/src/no_eth1_sim.rs @@ -103,7 +103,7 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let add_validators_fut = async { for (i, files) in validator_files.into_iter().enumerate() { network - .add_validator_client(testing_validator_config(), i, files) + .add_validator_client(testing_validator_config(), i, files, i % 2 == 0) .await?; } diff --git a/testing/simulator/src/sync_sim.rs b/testing/simulator/src/sync_sim.rs index 22ac0432d8..37eb7a1f0b 100644 --- a/testing/simulator/src/sync_sim.rs +++ b/testing/simulator/src/sync_sim.rs @@ -94,7 +94,7 @@ fn syncing_sim( * Add a validator client which handles all validators from the genesis state. */ network - .add_validator_client(testing_validator_config(), 0, validator_files) + .add_validator_client(testing_validator_config(), 0, validator_files, true) .await?; // Check all syncing strategies one after other. 
diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 0af1dccee2..c06bc0e75e 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -63,3 +63,4 @@ rand = "0.7.3" scrypt = { version = "0.5.0", default-features = false } lighthouse_metrics = { path = "../common/lighthouse_metrics" } lazy_static = "1.4.0" +fallback = { path = "../common/fallback" } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 794b59e384..5d244c1635 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -1,10 +1,10 @@ +use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::{ duties_service::{DutiesService, DutyAndProof}, http_metrics::metrics, validator_store::ValidatorStore, }; use environment::RuntimeContext; -use eth2::BeaconNodeHttpClient; use futures::future::FutureExt; use futures::StreamExt; use slog::{crit, error, info, trace}; @@ -24,7 +24,7 @@ pub struct AttestationServiceBuilder { duties_service: Option>, validator_store: Option>, slot_clock: Option, - beacon_node: Option, + beacon_nodes: Option>>, context: Option>, } @@ -34,7 +34,7 @@ impl AttestationServiceBuilder { duties_service: None, validator_store: None, slot_clock: None, - beacon_node: None, + beacon_nodes: None, context: None, } } @@ -54,8 +54,8 @@ impl AttestationServiceBuilder { self } - pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self { - self.beacon_node = Some(beacon_node); + pub fn beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { + self.beacon_nodes = Some(beacon_nodes); self } @@ -76,9 +76,9 @@ impl AttestationServiceBuilder { slot_clock: self .slot_clock .ok_or("Cannot build AttestationService without slot_clock")?, - beacon_node: self - .beacon_node - .ok_or("Cannot build AttestationService without beacon_node")?, + beacon_nodes: self + .beacon_nodes + .ok_or("Cannot build AttestationService without 
beacon_nodes")?, context: self .context .ok_or("Cannot build AttestationService without runtime_context")?, @@ -92,7 +92,7 @@ pub struct Inner { duties_service: DutiesService, validator_store: ValidatorStore, slot_clock: T, - beacon_node: BeaconNodeHttpClient, + beacon_nodes: Arc>, context: RuntimeContext, } @@ -337,11 +337,16 @@ impl AttestationService { .epoch(E::slots_per_epoch()); let attestation_data = self - .beacon_node - .get_validator_attestation_data(slot, committee_index) + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .get_validator_attestation_data(slot, committee_index) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }) .await - .map_err(|e| format!("Failed to produce attestation data: {:?}", e))? - .data; + .map_err(|e| e.to_string())?; let mut attestations = Vec::with_capacity(validator_duties.len()); @@ -408,9 +413,14 @@ impl AttestationService { } } + let attestations_slice = attestations.as_slice(); match self - .beacon_node - .post_beacon_pool_attestations(attestations.as_slice()) + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .post_beacon_pool_attestations(attestations_slice) + .await + }) .await { Ok(()) => info!( @@ -425,7 +435,7 @@ impl AttestationService { Err(e) => error!( log, "Unable to publish attestations"; - "error" => ?e, + "error" => %e, "committee_index" => attestation_data.index, "slot" => slot.as_u64(), "type" => "unaggregated", @@ -455,16 +465,22 @@ impl AttestationService { ) -> Result<(), String> { let log = self.context.log(); + let attestation_data_ref = &attestation_data; let aggregated_attestation = self - .beacon_node - .get_validator_aggregate_attestation( - attestation_data.slot, - attestation_data.tree_hash_root(), - ) + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .get_validator_aggregate_attestation( + 
attestation_data_ref.slot, + attestation_data_ref.tree_hash_root(), + ) + .await + .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))? + .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data_ref)) + .map(|result| result.data) + }) .await - .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))? - .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))? - .data; + .map_err(|e| e.to_string())?; let mut signed_aggregate_and_proofs = Vec::new(); @@ -507,9 +523,14 @@ impl AttestationService { } if !signed_aggregate_and_proofs.is_empty() { + let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice(); match self - .beacon_node - .post_validator_aggregate_and_proof(signed_aggregate_and_proofs.as_slice()) + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) + .await + }) .await { Ok(()) => { @@ -533,7 +554,7 @@ impl AttestationService { crit!( log, "Failed to publish attestation"; - "error" => e.to_string(), + "error" => %e, "committee_index" => attestation.data.index, "slot" => attestation.data.slot.as_u64(), "type" => "aggregated", diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs new file mode 100644 index 0000000000..7bc9eeca92 --- /dev/null +++ b/validator_client/src/beacon_node_fallback.rs @@ -0,0 +1,463 @@ +//! Allows for a list of `BeaconNodeHttpClient` to appear as a single entity which exhibits +//! "fallback" behaviour; it will try a request on all of the nodes until one or none of them +//! succeed. 
+ +use crate::check_synced::check_synced; +use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_REQUESTS}; +use environment::RuntimeContext; +use eth2::BeaconNodeHttpClient; +use futures::future; +use slog::{error, info, warn, Logger}; +use slot_clock::SlotClock; +use std::fmt; +use std::fmt::Debug; +use std::future::Future; +use std::marker::PhantomData; +use std::sync::Arc; +use std::time::Duration; +use tokio::{sync::RwLock, time::sleep}; +use types::{ChainSpec, EthSpec}; + +/// The number of seconds *prior* to slot start that we will try and update the state of fallback +/// nodes. +/// +/// Ideally this should be somewhere between 2/3rds through the slot and the end of it. If we set it +/// too early, we risk switching nodes between the time of publishing an attestation and publishing +/// an aggregate; this may result in a missed aggregation. If we set this time too late, we risk not +/// having the correct nodes up and running prior to the start of the slot. +const SLOT_LOOKAHEAD: Duration = Duration::from_secs(1); + +/// Starts a service that will routinely try and update the status of the provided `beacon_nodes`. +/// +/// See `SLOT_LOOKAHEAD` for information about when this should run. 
+pub fn start_fallback_updater_service( + context: RuntimeContext, + beacon_nodes: Arc>, +) -> Result<(), &'static str> { + let executor = context.executor; + if beacon_nodes.slot_clock.is_none() { + return Err("Cannot start fallback updater without slot clock"); + } + + let future = async move { + loop { + beacon_nodes.update_unready_candidates().await; + + let sleep_time = beacon_nodes + .slot_clock + .as_ref() + .and_then(|slot_clock| { + let slot = slot_clock.now()?; + let till_next_slot = slot_clock.duration_to_slot(slot + 1)?; + + till_next_slot.checked_sub(SLOT_LOOKAHEAD) + }) + .unwrap_or_else(|| Duration::from_secs(1)); + + sleep(sleep_time).await + } + }; + + executor.spawn(future, "fallback"); + + Ok(()) +} + +/// Indicates if a beacon node must be synced before some action is performed on it. +#[derive(PartialEq, Clone, Copy)] +pub enum RequireSynced { + Yes, + No, +} + +impl PartialEq for RequireSynced { + fn eq(&self, other: &bool) -> bool { + if *other { + *self == RequireSynced::Yes + } else { + *self == RequireSynced::No + } + } +} + +#[derive(Debug)] +pub enum Error { + /// The node was unavailable and we didn't attempt to contact it. + Unavailable(CandidateError), + /// We attempted to contact the node but it failed. + RequestFailed(E), +} + +impl Error { + pub fn request_failure(&self) -> Option<&E> { + match self { + Error::RequestFailed(e) => Some(e), + _ => None, + } + } +} + +/// The list of errors encountered whilst attempting to perform a query. +pub struct AllErrored(pub Vec<(String, Error)>); + +impl fmt::Display for AllErrored { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "All endpoints failed")?; + for (i, (id, error)) in self.0.iter().enumerate() { + let comma = if i + 1 < self.0.len() { "," } else { "" }; + + write!(f, " {} => {:?}{}", id, error, comma)?; + } + Ok(()) + } +} + +/// Reasons why a candidate might not be ready. 
+#[derive(Debug, Clone, Copy)] +pub enum CandidateError { + Uninitialized, + Offline, + Incompatible, + NotSynced, +} + +/// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used +/// for a query. +pub struct CandidateBeaconNode { + beacon_node: BeaconNodeHttpClient, + status: RwLock>, + _phantom: PhantomData, +} + +impl CandidateBeaconNode { + /// Instantiate a new node. + pub fn new(beacon_node: BeaconNodeHttpClient) -> Self { + Self { + beacon_node, + status: RwLock::new(Err(CandidateError::Uninitialized)), + _phantom: PhantomData, + } + } + + /// Returns the status of `self`. + /// + /// If `RequireSynced::No`, a `NotSynced` status will be ignored and mapped to `Ok(())`. + pub async fn status(&self, synced: RequireSynced) -> Result<(), CandidateError> { + match *self.status.read().await { + Err(CandidateError::NotSynced) if synced == false => Ok(()), + other => other, + } + } + + /// Indicate that `self` is offline. + pub async fn set_offline(&self) { + *self.status.write().await = Err(CandidateError::Offline) + } + + /// Perform some queries against the node to determine if it is a good candidate, updating + /// `self.status` and returning that result. + pub async fn refresh_status( + &self, + slot_clock: Option<&T>, + spec: &ChainSpec, + log: &Logger, + ) -> Result<(), CandidateError> { + let mut status = self.status.write().await; + + if let Err(e) = self.is_online(log).await { + *status = Err(e); + } else if let Err(e) = self.is_compatible(spec, log).await { + *status = Err(e); + } else if let Err(e) = self.is_synced(slot_clock, log).await { + *status = Err(e); + } else { + *status = Ok(()) + } + + *status + } + + /// Checks if the node is reachable. 
+ async fn is_online(&self, log: &Logger) -> Result<(), CandidateError> { + let result = self + .beacon_node + .get_node_version() + .await + .map(|body| body.data.version); + + match result { + Ok(version) => { + info!( + log, + "Connected to beacon node"; + "version" => version, + "endpoint" => %self.beacon_node, + ); + Ok(()) + } + Err(e) => { + warn!( + log, + "Offline beacon node"; + "error" => %e, + "endpoint" => %self.beacon_node, + ); + Err(CandidateError::Offline) + } + } + } + + /// Checks if the node has the correct specification. + async fn is_compatible(&self, spec: &ChainSpec, log: &Logger) -> Result<(), CandidateError> { + let yaml_config = self + .beacon_node + .get_config_spec() + .await + .map_err(|e| { + error!( + log, + "Unable to read spec from beacon node"; + "error" => %e, + "endpoint" => %self.beacon_node, + ); + CandidateError::Offline + })? + .data; + + let beacon_node_spec = yaml_config + .apply_to_chain_spec::(&E::default_spec()) + .ok_or_else(|| { + error!( + log, + "The minimal/mainnet spec type of the beacon node does not match the validator \ + client. See the --network command."; + "endpoint" => %self.beacon_node, + ); + CandidateError::Incompatible + })?; + + if *spec == beacon_node_spec { + Ok(()) + } else { + error!( + log, + "The beacon node is using a different Eth2 specification to this validator client. \ + See the --network command."; + "endpoint" => %self.beacon_node, + ); + Err(CandidateError::Incompatible) + } + } + + /// Checks if the beacon node is synced. + async fn is_synced( + &self, + slot_clock: Option<&T>, + log: &Logger, + ) -> Result<(), CandidateError> { + if let Some(slot_clock) = slot_clock { + match check_synced(&self.beacon_node, slot_clock, Some(log)).await { + r @ Err(CandidateError::NotSynced) => { + warn!( + log, + "Beacon node is not synced"; + "endpoint" => %self.beacon_node, + ); + r + } + result => result, + } + } else { + // Skip this check if we don't supply a slot clock. 
+ Ok(()) + } + } +} + +/// A collection of `CandidateBeaconNode` that can be used to perform requests with "fallback" +/// behaviour, where the failure of one candidate results in the next candidate receiving an +/// identical query. +pub struct BeaconNodeFallback { + candidates: Vec>, + slot_clock: Option, + spec: ChainSpec, + log: Logger, +} + +impl BeaconNodeFallback { + pub fn new(candidates: Vec>, spec: ChainSpec, log: Logger) -> Self { + Self { + candidates, + slot_clock: None, + spec, + log, + } + } + + /// Used to update the slot clock post-instantiation. + /// + /// This is the result of a chicken-and-egg issue where `Self` needs a slot clock for some + /// operations, but `Self` is required to obtain the slot clock since we need the genesis time + /// from a beacon node. + pub fn set_slot_clock(&mut self, slot_clock: T) { + self.slot_clock = Some(slot_clock); + } + + /// The count of candidates, regardless of their state. + pub async fn num_total(&self) -> usize { + self.candidates.len() + } + + /// The count of synced and ready candidates. + pub async fn num_synced(&self) -> usize { + let mut n = 0; + for candidate in &self.candidates { + if candidate.status(RequireSynced::Yes).await.is_ok() { + n += 1 + } + } + n + } + + /// The count of candidates that are online and compatible, but not necessarily synced. + pub async fn num_available(&self) -> usize { + let mut n = 0; + for candidate in &self.candidates { + if candidate.status(RequireSynced::No).await.is_ok() { + n += 1 + } + } + n + } + + /// Loop through any `self.candidates` that we don't think are online, compatible or synced and + /// poll them to see if their status has changed. + /// + /// We do not poll nodes that are synced to avoid sending additional requests when everything is + /// going smoothly. 
+ pub async fn update_unready_candidates(&self) { + let mut futures = Vec::new(); + for candidate in &self.candidates { + // There is a potential race condition between having the read lock and the write + // lock. The worst case of this race is running `refresh_status` twice, which is + // acceptable. + // + // Note: `RequireSynced::Yes` is always used here. This forces us to recheck the sync + // status of nodes that were previously not-synced. + if candidate.status(RequireSynced::Yes).await.is_err() { + // There exists a race-condition that could result in `refresh_status` being called + // when the status does not require refreshing anymore. This is deemed an + // acceptable inefficiency. + futures.push(candidate.refresh_status( + self.slot_clock.as_ref(), + &self.spec, + &self.log, + )); + } + } + + //run all updates concurrently and ignore results + let _ = future::join_all(futures).await; + } + + /// Run `func` against each candidate in `self`, returning immediately if a result is found. + /// Otherwise, return all the errors encountered along the way. + /// + /// First this function will try all nodes with a suitable status. If no candidates are suitable + /// or all the requests fail, it will try updating the status of all unsuitable nodes and + /// re-running `func` again. + pub async fn first_success<'a, F, O, Err, R>( + &'a self, + require_synced: RequireSynced, + func: F, + ) -> Result> + where + F: Fn(&'a BeaconNodeHttpClient) -> R, + R: Future>, + { + let mut errors = vec![]; + let mut to_retry = vec![]; + let mut retry_unsynced = vec![]; + + // Run `func` using a `candidate`, returning the value or capturing errors. + // + // We use a macro instead of a closure here since it is not trivial to move `func` into a + // closure. + macro_rules! 
try_func { + ($candidate: ident) => {{ + inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]); + + // There exists a race condition where `func` may be called when the candidate is + // actually not ready. We deem this an acceptable inefficiency. + match func(&$candidate.beacon_node).await { + Ok(val) => return Ok(val), + Err(e) => { + // If we have an error on this function, mark the client as not-ready. + // + // There exists a race condition where the candidate may have been marked + // as ready between the `func` call and now. We deem this an acceptable + // inefficiency. + $candidate.set_offline().await; + errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e))); + inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]); + } + } + }}; + } + + // First pass: try `func` on all synced and ready candidates. + // + // This ensures that we always choose a synced node if it is available. + for candidate in &self.candidates { + match candidate.status(RequireSynced::Yes).await { + Err(e @ CandidateError::NotSynced) if require_synced == false => { + // This client is unsynced; we will try it after trying all synced clients. + retry_unsynced.push(candidate); + errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e))); + } + Err(e) => { + // This client was not ready on the first pass, we might try it again later. + to_retry.push(candidate); + errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e))); + } + _ => try_func!(candidate), + } + } + + // Second pass: try `func` on ready unsynced candidates. This only runs if we permit + // unsynced candidates. + // + // Due to async race-conditions, it is possible that we will send a request to a candidate + // that has been set to an offline/unready status. This is acceptable. 
+ if require_synced == false { + for candidate in retry_unsynced { + try_func!(candidate); + } + } + + // Third pass: try again, attempting to make non-ready clients become ready. + for candidate in to_retry { + // If the candidate hasn't luckily transferred into the correct state in the meantime, + // force an update of the state. + let new_status = match candidate.status(require_synced).await { + Ok(()) => Ok(()), + Err(_) => { + candidate + .refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log) + .await + } + }; + + match new_status { + Ok(()) => try_func!(candidate), + Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate), + Err(e) => { + errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e))); + } + } + } + + // There were no candidates already ready and we were unable to make any of them ready. + Err(AllErrored(errors)) + } +} diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index d34947a139..d4e42efc67 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,6 +1,7 @@ +use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::{http_metrics::metrics, validator_store::ValidatorStore}; use environment::RuntimeContext; -use eth2::{types::Graffiti, BeaconNodeHttpClient}; +use eth2::types::Graffiti; use futures::channel::mpsc::Receiver; use futures::{StreamExt, TryFutureExt}; use slog::{crit, debug, error, info, trace, warn}; @@ -13,7 +14,7 @@ use types::{EthSpec, PublicKey, Slot}; pub struct BlockServiceBuilder { validator_store: Option>, slot_clock: Option>, - beacon_node: Option, + beacon_nodes: Option>>, context: Option>, graffiti: Option, } @@ -23,7 +24,7 @@ impl BlockServiceBuilder { Self { validator_store: None, slot_clock: None, - beacon_node: None, + beacon_nodes: None, context: None, graffiti: None, } @@ -39,8 +40,8 @@ impl BlockServiceBuilder { self } - pub fn beacon_node(mut self, 
beacon_node: BeaconNodeHttpClient) -> Self { - self.beacon_node = Some(beacon_node); + pub fn beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { + self.beacon_nodes = Some(beacon_nodes); self } @@ -63,8 +64,8 @@ impl BlockServiceBuilder { slot_clock: self .slot_clock .ok_or("Cannot build BlockService without slot_clock")?, - beacon_node: self - .beacon_node + beacon_nodes: self + .beacon_nodes .ok_or("Cannot build BlockService without beacon_node")?, context: self .context @@ -79,7 +80,7 @@ impl BlockServiceBuilder { pub struct Inner { validator_store: ValidatorStore, slot_clock: Arc, - beacon_node: BeaconNodeHttpClient, + beacon_nodes: Arc>, context: RuntimeContext, graffiti: Option, } @@ -222,24 +223,37 @@ impl BlockService { let randao_reveal = self .validator_store .randao_reveal(&validator_pubkey, slot.epoch(E::slots_per_epoch())) - .ok_or("Unable to produce randao reveal")?; - - let block = self - .beacon_node - .get_validator_blocks(slot, randao_reveal.into(), self.graffiti.as_ref()) - .await - .map_err(|e| format!("Error from beacon node when producing block: {:?}", e))? - .data; + .ok_or("Unable to produce randao reveal")? + .into(); + let randao_reveal_ref = &randao_reveal; + let self_ref = &self; + let validator_pubkey_ref = &validator_pubkey; let signed_block = self - .validator_store - .sign_block(&validator_pubkey, block, current_slot) - .ok_or("Unable to sign block")?; + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + let block = beacon_node + .get_validator_blocks(slot, randao_reveal_ref, self_ref.graffiti.as_ref()) + .await + .map_err(|e| format!("Error from beacon node when producing block: {:?}", e))? 
+ .data; - self.beacon_node - .post_beacon_blocks(&signed_block) + let signed_block = self_ref + .validator_store + .sign_block(validator_pubkey_ref, block, current_slot) + .ok_or("Unable to sign block")?; + + beacon_node + .post_beacon_blocks(&signed_block) + .await + .map_err(|e| { + format!("Error from beacon node when publishing block: {:?}", e) + })?; + + Ok::<_, String>(signed_block) + }) .await - .map_err(|e| format!("Error from beacon node when publishing block: {:?}", e))?; + .map_err(|e| e.to_string())?; info!( log, diff --git a/validator_client/src/is_synced.rs b/validator_client/src/check_synced.rs similarity index 67% rename from validator_client/src/is_synced.rs rename to validator_client/src/check_synced.rs index f967d629c1..9717742ba0 100644 --- a/validator_client/src/is_synced.rs +++ b/validator_client/src/check_synced.rs @@ -1,3 +1,4 @@ +use crate::beacon_node_fallback::CandidateError; use eth2::BeaconNodeHttpClient; use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; @@ -5,33 +6,33 @@ use slot_clock::SlotClock; /// A distance in slots. const SYNC_TOLERANCE: u64 = 4; -/// Returns `true` if the beacon node is synced and ready for action. +/// Returns /// -/// Returns `false` if: -/// -/// - The beacon node is unreachable. -/// - The beacon node indicates that it is syncing **AND** it is more than `SYNC_TOLERANCE` behind -/// the highest known slot. +/// `Ok(())` if the beacon node is synced and ready for action, +/// `Err(CandidateError::Offline)` if the beacon node is unreachable, +/// `Err(CandidateError::NotSynced)` if the beacon node indicates that it is syncing **AND** +/// it is more than `SYNC_TOLERANCE` behind the highest +/// known slot. /// /// The second condition means the even if the beacon node thinks that it's syncing, we'll still /// try to use it if it's close enough to the head. 
-pub async fn is_synced( +pub async fn check_synced( beacon_node: &BeaconNodeHttpClient, slot_clock: &T, log_opt: Option<&Logger>, -) -> bool { +) -> Result<(), CandidateError> { let resp = match beacon_node.get_node_syncing().await { Ok(resp) => resp, Err(e) => { if let Some(log) = log_opt { - error!( + warn!( log, "Unable connect to beacon node"; - "error" => e.to_string() + "error" => %e ) } - return false; + return Err(CandidateError::Offline); } }; @@ -48,9 +49,9 @@ pub async fn is_synced( warn!( log, "Beacon node is syncing"; - "msg" => "not receiving new duties", "sync_distance" => resp.data.sync_distance.as_u64(), "head_slot" => resp.data.head_slot.as_u64(), + "endpoint" => %beacon_node, ); } @@ -63,10 +64,15 @@ pub async fn is_synced( "msg" => "check the system time on this host and the beacon node", "beacon_node_slot" => remote_slot, "local_slot" => local_slot, + "endpoint" => %beacon_node, ); } } } - is_synced + if is_synced { + Ok(()) + } else { + Err(CandidateError::NotSynced) + } } diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index 99bf643f21..2e888e12eb 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -9,22 +9,31 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { "When connected to a beacon node, performs the duties of a staked \ validator (e.g., proposing blocks and attestations).", ) + // This argument is deprecated, use `--beacon-nodes` instead. .arg( Arg::with_name("beacon-node") .long("beacon-node") .value_name("NETWORK_ADDRESS") - .help("Address to a beacon node HTTP API") + .help("Deprecated. Use --beacon-nodes.") + .takes_value(true) + .conflicts_with("beacon-nodes"), + ) + .arg( + Arg::with_name("beacon-nodes") + .long("beacon-nodes") + .value_name("NETWORK_ADDRESSES") + .help("Comma-separated addresses to one or more beacon node HTTP APIs") .default_value(&DEFAULT_BEACON_NODE) .takes_value(true), ) - // This argument is deprecated, use `--beacon-node` instead. 
+ // This argument is deprecated, use `--beacon-nodes` instead. .arg( Arg::with_name("server") .long("server") .value_name("NETWORK_ADDRESS") - .help("Deprecated. Use --beacon-node.") + .help("Deprecated. Use --beacon-nodes.") .takes_value(true) - .conflicts_with("beacon-node"), + .conflicts_with_all(&["beacon-node", "beacon-nodes"]), ) .arg( Arg::with_name("validators-dir") diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index fcba726ad8..ad23cdb5d9 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -22,10 +22,10 @@ pub struct Config { pub validator_dir: PathBuf, /// The directory containing the passwords to unlock validator keystores. pub secrets_dir: PathBuf, - /// The http endpoint of the beacon node API. + /// The http endpoints of the beacon node APIs. /// - /// Should be similar to `http://localhost:8080` - pub beacon_node: String, + /// Should be similar to `["http://localhost:8080"]` + pub beacon_nodes: Vec, /// If true, the validator client will still poll for duties and produce blocks even if the /// beacon node is not synced at startup. pub allow_unsynced_beacon_node: bool, @@ -55,7 +55,7 @@ impl Default for Config { Self { validator_dir, secrets_dir, - beacon_node: DEFAULT_BEACON_NODE.to_string(), + beacon_nodes: vec![DEFAULT_BEACON_NODE.to_string()], allow_unsynced_beacon_node: false, disable_auto_discover: false, init_slashing_protection: false, @@ -106,18 +106,26 @@ impl Config { .map_err(|e| format!("Failed to create {:?}: {:?}", config.validator_dir, e))?; } - if let Some(beacon_node) = parse_optional(cli_args, "beacon-node")? { - config.beacon_node = beacon_node; + if let Some(beacon_nodes) = parse_optional::(cli_args, "beacon-nodes")? { + config.beacon_nodes = beacon_nodes.as_str().split(',').map(String::from).collect() } - // To be deprecated. - if let Some(server) = parse_optional(cli_args, "server")? { + else if let Some(beacon_node) = parse_optional(cli_args, "beacon-node")? 
{ + warn!( + log, + "The --beacon-node flag is deprecated"; + "msg" => "please use --beacon-nodes instead" + ); + config.beacon_nodes = vec![beacon_node]; + } + // To be deprecated. + else if let Some(server) = parse_optional(cli_args, "server")? { warn!( log, "The --server flag is deprecated"; - "msg" => "please use --beacon-node instead" + "msg" => "please use --beacon-nodes instead" ); - config.beacon_node = server; + config.beacon_nodes = vec![server]; } if cli_args.is_present("delete-lockfiles") { diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 52d47d3fec..eec9893ddb 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -1,9 +1,9 @@ +use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::{ - block_service::BlockServiceNotification, http_metrics::metrics, is_synced::is_synced, - validator_duty::ValidatorDuty, validator_store::ValidatorStore, + block_service::BlockServiceNotification, http_metrics::metrics, validator_duty::ValidatorDuty, + validator_store::ValidatorStore, }; use environment::RuntimeContext; -use eth2::BeaconNodeHttpClient; use futures::channel::mpsc::Sender; use futures::{SinkExt, StreamExt}; use parking_lot::RwLock; @@ -324,12 +324,12 @@ impl DutiesStore { } } -pub struct DutiesServiceBuilder { +pub struct DutiesServiceBuilder { validator_store: Option>, slot_clock: Option, - beacon_node: Option, - context: Option>, + beacon_nodes: Option>>, allow_unsynced_beacon_node: bool, + context: Option>, } impl DutiesServiceBuilder { @@ -337,9 +337,9 @@ impl DutiesServiceBuilder { Self { validator_store: None, slot_clock: None, - beacon_node: None, - context: None, + beacon_nodes: None, allow_unsynced_beacon_node: false, + context: None, } } @@ -353,8 +353,13 @@ impl DutiesServiceBuilder { self } - pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self { - self.beacon_node = Some(beacon_node); + pub fn 
beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { + self.beacon_nodes = Some(beacon_nodes); + self + } + + pub fn allow_unsynced_beacon_node(mut self, allow_unsynced_beacon_node: bool) -> Self { + self.allow_unsynced_beacon_node = allow_unsynced_beacon_node; self } @@ -363,12 +368,6 @@ impl DutiesServiceBuilder { self } - /// Set to `true` to allow polling for duties when the beacon node is not synced. - pub fn allow_unsynced_beacon_node(mut self, allow_unsynced_beacon_node: bool) -> Self { - self.allow_unsynced_beacon_node = allow_unsynced_beacon_node; - self - } - pub fn build(self) -> Result, String> { Ok(DutiesService { inner: Arc::new(Inner { @@ -379,13 +378,13 @@ impl DutiesServiceBuilder { slot_clock: self .slot_clock .ok_or("Cannot build DutiesService without slot_clock")?, - beacon_node: self - .beacon_node + beacon_nodes: self + .beacon_nodes .ok_or("Cannot build DutiesService without beacon_node")?, + allow_unsynced_beacon_node: self.allow_unsynced_beacon_node, context: self .context .ok_or("Cannot build DutiesService without runtime_context")?, - allow_unsynced_beacon_node: self.allow_unsynced_beacon_node, }), }) } @@ -396,11 +395,9 @@ pub struct Inner { store: Arc, validator_store: ValidatorStore, pub(crate) slot_clock: T, - pub(crate) beacon_node: BeaconNodeHttpClient, - context: RuntimeContext, - /// If true, the duties service will poll for duties from the beacon node even if it is not - /// synced. + pub(crate) beacon_nodes: Arc>, allow_unsynced_beacon_node: bool, + context: RuntimeContext, } /// Maintains a store of the duties for all voting validators in the `validator_store`. 
@@ -513,12 +510,6 @@ impl DutiesService { let _timer = metrics::start_timer_vec(&metrics::DUTIES_SERVICE_TIMES, &[metrics::FULL_UPDATE]); - if !is_synced(&self.beacon_node, &self.slot_clock, None).await - && !self.allow_unsynced_beacon_node - { - return; - } - let slot = if let Some(slot) = self.slot_clock.now() { slot } else { @@ -594,6 +585,12 @@ impl DutiesService { ) -> Result<(), String> { let log = self.context.log(); + let maybe_require_synced = if self.allow_unsynced_beacon_node { + RequireSynced::No + } else { + RequireSynced::Yes + }; + let mut new_validator = 0; let mut new_epoch = 0; let mut new_proposal_slots = 0; @@ -614,21 +611,27 @@ impl DutiesService { .collect(); let mut validator_subscriptions = vec![]; - let remote_duties: Vec = match ValidatorDuty::download( - &self.beacon_node, - current_epoch, - request_epoch, - pubkeys, - &log, - ) - .await + let pubkeys_ref = &pubkeys; + let remote_duties: Vec = match self + .beacon_nodes + .first_success(maybe_require_synced, |beacon_node| async move { + ValidatorDuty::download( + &beacon_node, + current_epoch, + request_epoch, + pubkeys_ref.clone(), + &log, + ) + .await + }) + .await { Ok(duties) => duties, Err(e) => { error!( log, "Failed to download validator duties"; - "error" => e + "error" => %e ); vec![] } @@ -720,10 +723,16 @@ impl DutiesService { if count == 0 { debug!(log, "No new subscriptions required"); } else { - self.beacon_node - .post_validator_beacon_committee_subscriptions(&validator_subscriptions) + let validator_subscriptions_ref = &validator_subscriptions; + self.beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .post_validator_beacon_committee_subscriptions(validator_subscriptions_ref) + .await + }) .await - .map_err(|e| format!("Failed to subscribe validators: {:?}", e))?; + .map_err(|e| format!("Failed to subscribe validators: {}", e))?; + debug!( log, "Successfully subscribed validators"; diff --git a/validator_client/src/fork_service.rs 
b/validator_client/src/fork_service.rs index 27149892ab..7c3b456d85 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -1,6 +1,7 @@ +use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; use crate::http_metrics::metrics; use environment::RuntimeContext; -use eth2::{types::StateId, BeaconNodeHttpClient}; +use eth2::types::StateId; use futures::future::FutureExt; use futures::StreamExt; use parking_lot::RwLock; @@ -16,19 +17,19 @@ use types::{EthSpec, Fork}; const TIME_DELAY_FROM_SLOT: Duration = Duration::from_millis(80); /// Builds a `ForkService`. -pub struct ForkServiceBuilder { +pub struct ForkServiceBuilder { fork: Option, slot_clock: Option, - beacon_node: Option, + beacon_nodes: Option>>, log: Option, } -impl ForkServiceBuilder { +impl ForkServiceBuilder { pub fn new() -> Self { Self { fork: None, slot_clock: None, - beacon_node: None, + beacon_nodes: None, log: None, } } @@ -38,8 +39,8 @@ impl ForkServiceBuilder { self } - pub fn beacon_node(mut self, beacon_node: BeaconNodeHttpClient) -> Self { - self.beacon_node = Some(beacon_node); + pub fn beacon_nodes(mut self, beacon_nodes: Arc>) -> Self { + self.beacon_nodes = Some(beacon_nodes); self } @@ -48,15 +49,15 @@ impl ForkServiceBuilder { self } - pub fn build(self) -> Result, String> { + pub fn build(self) -> Result, String> { Ok(ForkService { inner: Arc::new(Inner { fork: RwLock::new(self.fork), slot_clock: self .slot_clock .ok_or("Cannot build ForkService without slot_clock")?, - beacon_node: self - .beacon_node + beacon_nodes: self + .beacon_nodes .ok_or("Cannot build ForkService without beacon_node")?, log: self .log @@ -69,8 +70,21 @@ impl ForkServiceBuilder { #[cfg(test)] #[allow(dead_code)] -impl ForkServiceBuilder { - pub fn testing_only(log: Logger) -> Self { +impl ForkServiceBuilder { + pub fn testing_only(spec: types::ChainSpec, log: Logger) -> Self { + use crate::beacon_node_fallback::CandidateBeaconNode; + + let slot_clock = 
slot_clock::TestingSlotClock::new( + types::Slot::new(0), + std::time::Duration::from_secs(42), + std::time::Duration::from_secs(42), + ); + let candidates = vec![CandidateBeaconNode::new(eth2::BeaconNodeHttpClient::new( + eth2::Url::parse("http://127.0.0.1").unwrap(), + ))]; + let mut beacon_nodes = BeaconNodeFallback::new(candidates, spec, log.clone()); + beacon_nodes.set_slot_clock(slot_clock.clone()); + Self { fork: Some(types::Fork::default()), slot_clock: Some(slot_clock::TestingSlotClock::new( @@ -78,28 +92,26 @@ impl ForkServiceBuilder { std::time::Duration::from_secs(42), std::time::Duration::from_secs(42), )), - beacon_node: Some(eth2::BeaconNodeHttpClient::new( - eth2::Url::parse("http://127.0.0.1").unwrap(), - )), + beacon_nodes: Some(Arc::new(beacon_nodes)), log: Some(log), } } } /// Helper to minimise `Arc` usage. -pub struct Inner { +pub struct Inner { fork: RwLock>, - beacon_node: BeaconNodeHttpClient, + beacon_nodes: Arc>, log: Logger, slot_clock: T, } /// Attempts to download the `Fork` struct from the beacon node at the start of each epoch. -pub struct ForkService { - inner: Arc>, +pub struct ForkService { + inner: Arc>, } -impl Clone for ForkService { +impl Clone for ForkService { fn clone(&self) -> Self { Self { inner: self.inner.clone(), @@ -107,25 +119,22 @@ impl Clone for ForkService { } } -impl Deref for ForkService { - type Target = Inner; +impl Deref for ForkService { + type Target = Inner; fn deref(&self) -> &Self::Target { self.inner.deref() } } -impl ForkService { +impl ForkService { /// Returns the last fork downloaded from the beacon node, if any. pub fn fork(&self) -> Option { *self.fork.read() } /// Starts the service that periodically polls for the `Fork`. 
- pub fn start_update_service( - self, - context: &RuntimeContext, - ) -> Result<(), String> { + pub fn start_update_service(self, context: &RuntimeContext) -> Result<(), String> { let spec = &context.eth2_config.spec; let duration_to_next_epoch = self @@ -165,26 +174,32 @@ impl ForkService { let _timer = metrics::start_timer_vec(&metrics::FORK_SERVICE_TIMES, &[metrics::FULL_UPDATE]); + let log = &self.log; let fork = self .inner - .beacon_node - .get_beacon_states_fork(StateId::Head) + .beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node + .get_beacon_states_fork(StateId::Head) + .await + .map_err(|e| { + trace!( + log, + "Fork update failed"; + "error" => format!("Error retrieving fork: {:?}", e) + ) + })? + .ok_or_else(|| { + trace!( + log, + "Fork update failed"; + "error" => "The beacon head fork is unknown" + ) + }) + .map(|result| result.data) + }) .await - .map_err(|e| { - trace!( - self.log, - "Fork update failed"; - "error" => format!("Error retrieving fork: {:?}", e) - ) - })? - .ok_or_else(|| { - trace!( - self.log, - "Fork update failed"; - "error" => "The beacon head fork is unknown" - ) - })? 
- .data; + .map_err(|_| ())?; if self.fork.read().as_ref() != Some(&fork) { *(self.fork.write()) = Some(fork); diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index 6df6bfc3ed..ce0a1ef690 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -73,7 +73,9 @@ impl ApiTester { config.validator_dir = validator_dir.path().into(); config.secrets_dir = secrets_dir.path().into(); - let fork_service = ForkServiceBuilder::testing_only(log.clone()) + let spec = E::default_spec(); + + let fork_service = ForkServiceBuilder::testing_only(spec.clone(), log.clone()) .build() .unwrap(); @@ -84,7 +86,7 @@ impl ApiTester { initialized_validators, slashing_protection, Hash256::repeat_byte(42), - E::default_spec(), + spec, fork_service.clone(), log.clone(), ); diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index beb9e2d164..d60592f1b0 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -80,6 +80,19 @@ lazy_static::lazy_static! 
{ "Number of attesters on this host", &["task"] ); + /* + * Endpoint metrics + */ + pub static ref ENDPOINT_ERRORS: Result = try_create_int_counter_vec( + "bn_endpoint_errors", + "The number of beacon node request errors for each endpoint", + &["endpoint"] + ); + pub static ref ENDPOINT_REQUESTS: Result = try_create_int_counter_vec( + "bn_endpoint_requests", + "The number of beacon node requests for each endpoint", + &["endpoint"] + ); } pub fn gather_prometheus_metrics( diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index cbe42aaf45..65256b5f5b 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -1,12 +1,13 @@ mod attestation_service; +mod beacon_node_fallback; mod block_service; +mod check_synced; mod cli; mod config; mod duties_service; mod fork_service; mod http_metrics; mod initialized_validators; -mod is_synced; mod key_cache; mod notifier; mod validator_duty; @@ -17,6 +18,9 @@ pub mod http_api; pub use cli::cli_app; pub use config::Config; +use crate::beacon_node_fallback::{ + start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, RequireSynced, +}; use account_utils::validator_definitions::ValidatorDefinitions; use attestation_service::{AttestationService, AttestationServiceBuilder}; use block_service::{BlockService, BlockServiceBuilder}; @@ -55,7 +59,7 @@ const HTTP_TIMEOUT: Duration = Duration::from_secs(12); pub struct ProductionValidatorClient { context: RuntimeContext, duties_service: DutiesService, - fork_service: ForkService, + fork_service: ForkService, block_service: BlockService, attestation_service: AttestationService, validator_store: ValidatorStore, @@ -84,7 +88,7 @@ impl ProductionValidatorClient { info!( log, "Starting validator client"; - "beacon_node" => &config.beacon_node, + "beacon_nodes" => format!("{:?}", &config.beacon_nodes), "validator_dir" => format!("{:?}", config.validator_dir), ); @@ -202,20 +206,36 @@ impl ProductionValidatorClient { })?; } - let 
beacon_node_url: Url = config - .beacon_node - .parse() + let beacon_node_urls: Vec = config + .beacon_nodes + .iter() + .map(|s| s.parse()) + .collect::>() .map_err(|e| format!("Unable to parse beacon node URL: {:?}", e))?; - let beacon_node_http_client = ClientBuilder::new() - .timeout(HTTP_TIMEOUT) - .build() - .map_err(|e| format!("Unable to build HTTP client: {:?}", e))?; - let beacon_node = - BeaconNodeHttpClient::from_components(beacon_node_url, beacon_node_http_client); + let beacon_nodes: Vec = beacon_node_urls + .into_iter() + .map(|url| { + let beacon_node_http_client = ClientBuilder::new() + .timeout(HTTP_TIMEOUT) + .build() + .map_err(|e| format!("Unable to build HTTP client: {:?}", e))?; + Ok(BeaconNodeHttpClient::from_components( + url, + beacon_node_http_client, + )) + }) + .collect::, String>>()?; + + let candidates = beacon_nodes + .into_iter() + .map(CandidateBeaconNode::new) + .collect(); + let mut beacon_nodes: BeaconNodeFallback<_, T> = + BeaconNodeFallback::new(candidates, context.eth2_config.spec.clone(), log.clone()); // Perform some potentially long-running initialization tasks. let (genesis_time, genesis_validators_root) = tokio::select! 
{ - tuple = init_from_beacon_node(&beacon_node, &context) => tuple?, + tuple = init_from_beacon_node(&beacon_nodes, &context) => tuple?, () = context.executor.exit() => return Err("Shutting down".to_string()) }; @@ -230,9 +250,13 @@ impl ProductionValidatorClient { Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot), ); + beacon_nodes.set_slot_clock(slot_clock.clone()); + let beacon_nodes = Arc::new(beacon_nodes); + start_fallback_updater_service(context.clone(), beacon_nodes.clone())?; + let fork_service = ForkServiceBuilder::new() .slot_clock(slot_clock.clone()) - .beacon_node(beacon_node.clone()) + .beacon_nodes(beacon_nodes.clone()) .log(log.clone()) .build()?; @@ -254,9 +278,9 @@ impl ProductionValidatorClient { let duties_service = DutiesServiceBuilder::new() .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) - .beacon_node(beacon_node.clone()) - .runtime_context(context.service_context("duties".into())) + .beacon_nodes(beacon_nodes.clone()) .allow_unsynced_beacon_node(config.allow_unsynced_beacon_node) + .runtime_context(context.service_context("duties".into())) .build()?; // Update the metrics server. 
@@ -268,7 +292,7 @@ impl ProductionValidatorClient { let block_service = BlockServiceBuilder::new() .slot_clock(slot_clock.clone()) .validator_store(validator_store.clone()) - .beacon_node(beacon_node.clone()) + .beacon_nodes(beacon_nodes.clone()) .runtime_context(context.service_context("block".into())) .graffiti(config.graffiti) .build()?; @@ -277,7 +301,7 @@ impl ProductionValidatorClient { .duties_service(duties_service.clone()) .slot_clock(slot_clock) .validator_store(validator_store.clone()) - .beacon_node(beacon_node.clone()) + .beacon_nodes(beacon_nodes.clone()) .runtime_context(context.service_context("attestation".into())) .build()?; @@ -285,7 +309,7 @@ impl ProductionValidatorClient { // // It seems most sensible to move this into the `start_service` function, but I'm caution // of making too many changes this close to genesis (<1 week). - wait_for_genesis(&beacon_node, genesis_time, &context).await?; + wait_for_genesis(&beacon_nodes, genesis_time, &context).await?; Ok(Self { context, @@ -368,40 +392,50 @@ impl ProductionValidatorClient { } async fn init_from_beacon_node( - beacon_node: &BeaconNodeHttpClient, + beacon_nodes: &BeaconNodeFallback, context: &RuntimeContext, ) -> Result<(u64, Hash256), String> { - // Wait for the beacon node to come online. - wait_for_connectivity(beacon_node, context.log()).await?; - - let yaml_config = beacon_node - .get_config_spec() - .await - .map_err(|e| format!("Unable to read spec from beacon node: {:?}", e))? - .data; - - let beacon_node_spec = yaml_config - .apply_to_chain_spec::(&E::default_spec()) - .ok_or_else(|| { - "The minimal/mainnet spec type of the beacon node does not match the validator client. \ - See the --network command." - .to_string() - })?; - - if context.eth2_config.spec != beacon_node_spec { - return Err( - "The beacon node is using a different Eth2 specification to this validator client. \ - See the --network command." 
- .to_string(), - ); + loop { + beacon_nodes.update_unready_candidates().await; + let num_available = beacon_nodes.num_available().await; + let num_total = beacon_nodes.num_total().await; + if num_available > 0 { + info!( + context.log(), + "Initialized beacon node connections"; + "total" => num_total, + "available" => num_available, + ); + break; + } else { + warn!( + context.log(), + "Unable to connect to a beacon node"; + "retry in" => format!("{} seconds", RETRY_DELAY.as_secs()), + "total" => num_total, + "available" => num_available, + ); + sleep(RETRY_DELAY).await; + } } let genesis = loop { - match beacon_node.get_beacon_genesis().await { + match beacon_nodes + .first_success(RequireSynced::No, |node| async move { + node.get_beacon_genesis().await + }) + .await + { Ok(genesis) => break genesis.data, - Err(e) => { - // A 404 error on the genesis endpoint indicates that genesis has not yet occurred. - if e.status() == Some(StatusCode::NOT_FOUND) { + Err(errors) => { + // Search for a 404 error which indicates that genesis has not yet + // occurred. + if errors + .0 + .iter() + .filter_map(|(_, e)| e.request_failure()) + .any(|e| e.status() == Some(StatusCode::NOT_FOUND)) + { info!( context.log(), "Waiting for genesis"; @@ -409,8 +443,8 @@ async fn init_from_beacon_node( } else { error!( context.log(), - "Error polling beacon node"; - "error" => format!("{:?}", e) + "Errors polling beacon node"; + "error" => %errors ); } } @@ -423,7 +457,7 @@ async fn init_from_beacon_node( } async fn wait_for_genesis( - beacon_node: &BeaconNodeHttpClient, + beacon_nodes: &BeaconNodeFallback, genesis_time: u64, context: &RuntimeContext, ) -> Result<(), String> { @@ -447,7 +481,7 @@ async fn wait_for_genesis( // Start polling the node for pre-genesis information, cancelling the polling as soon as the // timer runs out. tokio::select! 
{ - result = poll_whilst_waiting_for_genesis(beacon_node, genesis_time, context.log()) => result?, + result = poll_whilst_waiting_for_genesis(beacon_nodes, genesis_time, context.log()) => result?, () = sleep(genesis_time - now) => () }; @@ -469,50 +503,18 @@ async fn wait_for_genesis( /// Request the version from the node, looping back and trying again on failure. Exit once the node /// has been contacted. -async fn wait_for_connectivity( - beacon_node: &BeaconNodeHttpClient, - log: &Logger, -) -> Result<(), String> { - // Try to get the version string from the node, looping until success is returned. - loop { - let log = log.clone(); - let result = beacon_node - .get_node_version() - .await - .map_err(|e| format!("{:?}", e)) - .map(|body| body.data.version); - - match result { - Ok(version) => { - info!( - log, - "Connected to beacon node"; - "version" => version, - ); - - return Ok(()); - } - Err(e) => { - error!( - log, - "Unable to connect to beacon node"; - "error" => format!("{:?}", e), - ); - sleep(RETRY_DELAY).await; - } - } - } -} - -/// Request the version from the node, looping back and trying again on failure. Exit once the node -/// has been contacted. 
-async fn poll_whilst_waiting_for_genesis( - beacon_node: &BeaconNodeHttpClient, +async fn poll_whilst_waiting_for_genesis( + beacon_nodes: &BeaconNodeFallback, genesis_time: Duration, log: &Logger, ) -> Result<(), String> { loop { - match beacon_node.get_lighthouse_staking().await { + match beacon_nodes + .first_success(RequireSynced::No, |beacon_node| async move { + beacon_node.get_lighthouse_staking().await + }) + .await + { Ok(is_staking) => { let now = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -542,7 +544,7 @@ async fn poll_whilst_waiting_for_genesis( error!( log, "Error polling beacon node"; - "error" => format!("{:?}", e) + "error" => %e ); } } diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 944537d172..e7b587103d 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -1,4 +1,4 @@ -use crate::{is_synced::is_synced, ProductionValidatorClient}; +use crate::ProductionValidatorClient; use futures::StreamExt; use slog::{error, info}; use slot_clock::SlotClock; @@ -10,7 +10,6 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu let context = client.context.service_context("notifier".into()); let executor = context.executor.clone(); let duties_service = client.duties_service.clone(); - let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node; let slot_duration = Duration::from_millis(context.eth2_config.spec.milliseconds_per_slot); let duration_to_next_slot = duties_service @@ -26,15 +25,25 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu let log = context.log(); while interval.next().await.is_some() { - if !is_synced( - &duties_service.beacon_node, - &duties_service.slot_clock, - Some(&log), - ) - .await - && !allow_unsynced_beacon_node - { - continue; + let num_available = duties_service.beacon_nodes.num_available().await; + let num_synced = duties_service.beacon_nodes.num_synced().await; + let num_total = 
duties_service.beacon_nodes.num_total().await; + if num_synced > 0 { + info!( + log, + "Connected to beacon node(s)"; + "total" => num_total, + "available" => num_available, + "synced" => num_synced, + ) + } else { + error!( + log, + "No synced beacon nodes"; + "total" => num_total, + "available" => num_available, + "synced" => num_synced, + ) } if let Some(slot) = duties_service.slot_clock.now() { diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 7cc46c9cda..167d1cd58a 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -6,7 +6,6 @@ use parking_lot::RwLock; use slashing_protection::{NotSafe, Safe, SlashingDatabase}; use slog::{crit, error, warn, Logger}; use slot_clock::SlotClock; -use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; use tempdir::TempDir; @@ -49,8 +48,7 @@ pub struct ValidatorStore { spec: Arc, log: Logger, temp_dir: Option>, - fork_service: ForkService, - _phantom: PhantomData, + fork_service: ForkService, } impl ValidatorStore { @@ -59,7 +57,7 @@ impl ValidatorStore { slashing_protection: SlashingDatabase, genesis_validators_root: Hash256, spec: ChainSpec, - fork_service: ForkService, + fork_service: ForkService, log: Logger, ) -> Self { Self { @@ -70,7 +68,6 @@ impl ValidatorStore { log, temp_dir: None, fork_service, - _phantom: PhantomData, } }