diff --git a/testing/simulator/src/checks.rs b/testing/simulator/src/checks.rs index 01e50eb7fb..ac672cbc21 100644 --- a/testing/simulator/src/checks.rs +++ b/testing/simulator/src/checks.rs @@ -1,6 +1,5 @@ use crate::local_network::LocalNetwork; use node_test_rig::eth2::types::{BlockId, StateId}; - use std::time::Duration; use types::{Epoch, EthSpec, ExecPayload, ExecutionBlockHash, Hash256, Slot, Unsigned}; diff --git a/testing/simulator/src/eth1_sim.rs b/testing/simulator/src/eth1_sim.rs index 03a0205574..57c944cf1a 100644 --- a/testing/simulator/src/eth1_sim.rs +++ b/testing/simulator/src/eth1_sim.rs @@ -64,7 +64,7 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let mut env = EnvironmentBuilder::minimal() .initialize_logger(LoggerConfig { path: None, - debug_level: String::from("info"), + debug_level: String::from("debug"), logfile_debug_level: String::from("debug"), log_format: None, logfile_format: None, diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 5d841791ef..73c7b7d0b0 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -137,18 +137,23 @@ pub enum CandidateError { Uninitialized, Offline, Incompatible, - NotSynced, TimeDiscrepancy, } +#[derive(Debug, Clone)] +pub struct CandidateInfo { + pub id: usize, + pub node: String, + pub health: Option, +} + /// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used /// for a query. #[derive(Debug)] pub struct CandidateBeaconNode { id: usize, beacon_node: BeaconNodeHttpClient, - health: PLRwLock>, - status: RwLock>, + health: PLRwLock>, _phantom: PhantomData, } @@ -162,11 +167,11 @@ impl Eq for CandidateBeaconNode {} impl Ord for CandidateBeaconNode { fn cmp(&self, other: &Self) -> Ordering { - match (&(*self.health.read()), &(*other.health.read())) { - (None, None) => Ordering::Equal, - (None, _) => Ordering::Greater, - (_, None) => Ordering::Less, - (Some(health_1), Some(health_2)) => health_1.cmp(health_2), + match (&(self.health()), &(other.health())) { + (Err(_), Err(_)) => Ordering::Equal, + (Err(_), _) => Ordering::Greater, + (_, Err(_)) => Ordering::Less, + (Ok(health_1), Ok(health_2)) => health_1.cmp(health_2), } } } @@ -183,15 +188,14 @@ impl CandidateBeaconNode { Self { id, beacon_node, - health: PLRwLock::new(None), - status: RwLock::new(Err(CandidateError::Uninitialized)), + health: PLRwLock::new(Err(CandidateError::Uninitialized)), _phantom: PhantomData, } } - /// Returns the status of `self`. - pub async fn status(&self) -> Result<(), CandidateError> { - *self.status.read().await + /// Returns the health of `self`. + pub fn health(&self) -> Result { + *self.health.read() } pub async fn refresh_health( @@ -202,7 +206,7 @@ impl CandidateBeaconNode { log: &Logger, ) -> Result<(), CandidateError> { if let Err(e) = self.is_compatible(spec, log).await { - *self.status.write().await = Err(e); + *self.health.write() = Err(e); return Ok(()); } @@ -231,29 +235,20 @@ impl CandidateBeaconNode { slot_clock, ); - warn!( - log, - "Health of Beacon Node: {}, updated. Health tier: {}", - new_health.get_id(), - new_health.get_health_tier() - ); - - *self.health.write() = Some(new_health); - *self.status.write().await = Ok(()); + // TODO(mac): Set metric here. + *self.health.write() = Ok(new_health); Ok(()) } - Err(status) => { - // Set the health as None which is sorted last in the list. - *self.health.write() = None; - *self.status.write().await = Err(status); + Err(e) => { + // Set the health as `Err` which is sorted last in the list. + *self.health.write() = Err(e); Ok(()) } } } else { - // Slot clock will only be None at startup. + // Slot clock will only be `None` at startup. // Assume compatible nodes are available. - *self.status.write().await = Ok(()); Ok(()) } } @@ -373,7 +368,7 @@ impl BeaconNodeFallback { pub async fn num_synced(&self) -> usize { let mut n = 0; for candidate in self.candidates.read().await.iter() { - if let Some(cand) = candidate.health.read().as_ref() { + if let Ok(cand) = candidate.health().as_ref() { if self .distance_tiers .distance_tier(cand.health_tier.sync_distance) @@ -390,7 +385,7 @@ impl BeaconNodeFallback { pub async fn num_synced_fallback(&self) -> usize { let mut n = 0; for candidate in self.candidates.read().await.iter().skip(1) { - if let Some(cand) = candidate.health.read().as_ref() { + if let Ok(cand) = candidate.health().as_ref() { if self .distance_tiers .distance_tier(cand.health_tier.sync_distance) @@ -407,13 +402,26 @@ impl BeaconNodeFallback { pub async fn num_available(&self) -> usize { let mut n = 0; for candidate in self.candidates.read().await.iter() { - if candidate.status().await.is_ok() { + if candidate.health().is_ok() { n += 1 } } n } + pub async fn get_all_candidate_info(&self) -> Vec { + let candidates = self.candidates.read().await; + let mut results = Vec::with_capacity(candidates.len()); + for candidate in candidates.iter() { + let id = candidate.id; + let node = candidate.beacon_node.to_string(); + let health = candidate.health().ok(); + let info = CandidateInfo { id, node, health }; + results.push(info); + } + results + } + /// Loop through ALL candidates in `self.candidates` and update their sync status. /// /// It is possible for a node to return an unsynced status while continuing to serve @@ -421,21 +429,33 @@ impl BeaconNodeFallback { /// A previous implementation of this function polled only the unavailable BNs. pub async fn update_all_candidates(&self) { let candidates = self.candidates.read().await; + let mut futures = Vec::with_capacity(candidates.len()); + let mut nodes = Vec::with_capacity(candidates.len()); - let futures = candidates - .iter() - .map(|candidate| { - candidate.refresh_health( - &self.distance_tiers, - self.slot_clock.as_ref(), - &self.spec, - &self.log, - ) - }) - .collect::>(); + for candidate in candidates.iter() { + futures.push(candidate.refresh_health( + &self.distance_tiers, + self.slot_clock.as_ref(), + &self.spec, + &self.log, + )); + nodes.push(candidate.beacon_node.to_string()); + } - // Run all updates concurrently and ignore errors. - let _ = future::join_all(futures).await; + // Run all updates concurrently. + let future_results = future::join_all(futures).await; + let results = future_results.iter().zip(nodes); + + for (result, node) in results { + if let Err(e) = result { + warn!( + self.log, + "A connected beacon node errored during routine health check."; + "error" => ?e, + "endpoint" => node, + ); + } + } drop(candidates); @@ -519,11 +539,7 @@ impl BeaconNodeFallback { "node" => $candidate.beacon_node.to_string(), "error" => ?e, ); - // If we have an error on this function, make the client as not-ready. - // - // There exists a race condition where the candidate may have been marked - // as ready between the `func` call and now. We deem this an acceptable - // inefficiency. + errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e))); inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]); } @@ -573,11 +589,6 @@ impl BeaconNodeFallback { match func($candidate.beacon_node.clone()).await { Ok(val) => results.push(Ok(val)), Err(e) => { - // If we have an error on this function, make the client as not-ready. - // - // There exists a race condition where the candidate may have been marked - // as ready between the `func` call and now. We deem this an acceptable - // inefficiency. results.push(Err(( $candidate.beacon_node.to_string(), Error::RequestFailed(e), @@ -714,12 +725,12 @@ mod tests { health_tier: BeaconNodeHealthTier::new(4, Slot::new(10), small), }; - *candidate_1.health.write() = Some(health_1); - *candidate_2.health.write() = Some(health_2); - *candidate_3.health.write() = Some(health_3); - *candidate_4.health.write() = Some(health_4); - *candidate_5.health.write() = Some(health_5); - *candidate_6.health.write() = Some(health_6); + *candidate_1.health.write() = Ok(health_1); + *candidate_2.health.write() = Ok(health_2); + *candidate_3.health.write() = Ok(health_3); + *candidate_4.health.write() = Ok(health_4); + *candidate_5.health.write() = Ok(health_5); + *candidate_6.health.write() = Ok(health_6); let mut candidates = vec![ candidate_3, diff --git a/validator_client/src/beacon_node_health.rs b/validator_client/src/beacon_node_health.rs index 1166123f56..f1a6c7bbec 100644 --- a/validator_client/src/beacon_node_health.rs +++ b/validator_client/src/beacon_node_health.rs @@ -158,7 +158,7 @@ impl BeaconNodeHealthTier { } /// Beacon Node Health metrics. -#[derive(Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct BeaconNodeHealth { // The ID of the Beacon Node. This should correspond with its position in the `--beacon-nodes` // list. Note that the ID field is used to tie-break nodes with the same health so that nodes diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index e8b5f4c5ba..984ffadf89 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -1,7 +1,7 @@ use crate::http_metrics; use crate::{DutiesService, ProductionValidatorClient}; use lighthouse_metrics::set_gauge; -use slog::{error, info, Logger}; +use slog::{debug, error, info, Logger}; use slot_clock::SlotClock; use tokio::time::{sleep, Duration}; use types::EthSpec; @@ -39,6 +39,7 @@ async fn notify( duties_service: &DutiesService, log: &Logger, ) { + let candidate_info = duties_service.beacon_nodes.get_all_candidate_info().await; let num_available = duties_service.beacon_nodes.num_available().await; set_gauge( &http_metrics::metrics::AVAILABLE_BEACON_NODES_COUNT, @@ -55,9 +56,14 @@ async fn notify( num_total as i64, ); if num_synced > 0 { + let primary = candidate_info + .get(0) + .map(|candidate| candidate.node.as_str()) + .unwrap_or("None"); info!( log, "Connected to beacon node(s)"; + "primary" => primary, "total" => num_total, "available" => num_available, "synced" => num_synced, @@ -78,6 +84,31 @@ async fn notify( set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 0); } + // TODO(mac) Store all connected node info into metrics. + for info in candidate_info { + if let Some(health) = info.health { + debug!( + log, + "Beacon node info"; + "status" => "Connected", + "id" => info.id, + "endpoint" => info.node, + "head_slot" => %health.head, + "is_optimistic" => ?health.optimistic_status, + "execution_engine_status" => ?health.execution_status, + "health_tier" => ?health.health_tier, + ); + } else { + debug!( + log, + "Beacon node info"; + "status" => "Disconnected", + "id" => info.id, + "endpoint" => info.node, + ); + } + } + if let Some(slot) = duties_service.slot_clock.now() { let epoch = slot.epoch(E::slots_per_epoch());