diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 26718ad294..1ab519dd90 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -1274,7 +1274,6 @@ impl ApiTester { .await .0; - assert!(self .client .clone() @@ -5124,4 +5123,4 @@ async fn optimistic_responses() { .await .test_check_optimistic_responses() .await; -} \ No newline at end of file +} diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index f0a9258c74..9869fb8c3f 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -1,9 +1,8 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::BeaconNodeFallback; use crate::{ duties_service::{DutiesService, DutyAndProof}, http_metrics::metrics, validator_store::{Error as ValidatorStoreError, ValidatorStore}, - OfflineOnFailure, }; use environment::RuntimeContext; use futures::future::join_all; @@ -338,21 +337,17 @@ impl AttestationService { let attestation_data = self .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::ATTESTATIONS_HTTP_GET], - ); - beacon_node - .get_validator_attestation_data(slot, committee_index) - .await - .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) - .map(|result| result.data) - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::ATTESTATIONS_HTTP_GET], + ); + beacon_node + .get_validator_attestation_data(slot, committee_index) + .await + .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) + .map(|result| result.data) + }) .await .map_err(|e| e.to_string())?; @@ -433,19 +428,15 @@ impl AttestationService { // Post the attestations to the BN. match self .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::ATTESTATIONS_HTTP_POST], - ); - beacon_node - .post_beacon_pool_attestations(attestations) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::ATTESTATIONS_HTTP_POST], + ); + beacon_node + .post_beacon_pool_attestations(attestations) + .await + }) .await { Ok(()) => info!( @@ -493,27 +484,21 @@ impl AttestationService { let aggregated_attestation = &self .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::AGGREGATES_HTTP_GET], - ); - beacon_node - .get_validator_aggregate_attestation( - attestation_data.slot, - attestation_data.tree_hash_root(), - ) - .await - .map_err(|e| { - format!("Failed to produce an aggregate attestation: {:?}", e) - })? - .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data)) - .map(|result| result.data) - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::AGGREGATES_HTTP_GET], + ); + beacon_node + .get_validator_aggregate_attestation( + attestation_data.slot, + attestation_data.tree_hash_root(), + ) + .await + .map_err(|e| format!("Failed to produce an aggregate attestation: {:?}", e))? + .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data)) + .map(|result| result.data) + }) .await .map_err(|e| e.to_string())?; @@ -574,19 +559,15 @@ impl AttestationService { let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice(); match self .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::ATTESTATION_SERVICE_TIMES, - &[metrics::AGGREGATES_HTTP_POST], - ); - beacon_node - .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::ATTESTATION_SERVICE_TIMES, + &[metrics::AGGREGATES_HTTP_POST], + ); + beacon_node + .post_validator_aggregate_and_proof(signed_aggregate_and_proofs_slice) + .await + }) .await { Ok(()) => { diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index cb432271e4..e3d5da9585 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -5,14 +5,14 @@ use crate::beacon_node_health::{ BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, SyncDistanceTier, }; -use crate::check_synced::{check_node_health, check_synced}; +use crate::check_synced::check_node_health; use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_REQUESTS}; use environment::RuntimeContext; use eth2::BeaconNodeHttpClient; use futures::future; use parking_lot::RwLock as PLRwLock; use serde_derive::{Deserialize, Serialize}; -use slog::{debug, error, info, warn, Logger}; +use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; use std::cmp::Ordering; use std::fmt; @@ -90,30 +90,6 @@ pub fn start_fallback_updater_service( Ok(()) } -/// Indicates if a beacon node must be synced before some action is performed on it. -#[derive(PartialEq, Clone, Copy)] -pub enum RequireSynced { - Yes, - No, -} - -/// Indicates if a beacon node should be set to `Offline` if a request fails. -#[derive(PartialEq, Clone, Copy)] -pub enum OfflineOnFailure { - Yes, - No, -} - -impl PartialEq for RequireSynced { - fn eq(&self, other: &bool) -> bool { - if *other { - *self == RequireSynced::Yes - } else { - *self == RequireSynced::No - } - } -} - #[derive(Debug)] pub enum Error { /// The node was unavailable and we didn't attempt to contact it. @@ -207,48 +183,8 @@ impl CandidateBeaconNode { } /// Returns the status of `self`. - /// - /// If `RequiredSynced::No`, any `NotSynced` node will be ignored and mapped to `Ok(())`. - pub async fn status(&self, synced: RequireSynced) -> Result<(), CandidateError> { - match *self.status.read().await { - Err(CandidateError::NotSynced) if synced == false => Ok(()), - other => other, - } - } - - /// Indicate that `self` is offline. - pub async fn set_offline(&self) { - *self.status.write().await = Err(CandidateError::Offline) - } - - /// Perform some queries against the node to determine if it is a good candidate, updating - /// `self.status` and returning that result. - pub async fn refresh_status( - &self, - slot_clock: Option<&T>, - spec: &ChainSpec, - log: &Logger, - ) -> Result<(), CandidateError> { - let previous_status = self.status(RequireSynced::Yes).await; - let was_offline = matches!(previous_status, Err(CandidateError::Offline)); - - let new_status = if let Err(e) = self.is_online(was_offline, log).await { - Err(e) - } else if let Err(e) = self.is_compatible(spec, log).await { - Err(e) - } else if let Err(e) = self.is_synced(slot_clock, log).await { - Err(e) - } else { - Ok(()) - }; - - // In case of concurrent use, the latest value will always be used. It's possible that a - // long time out might over-ride a recent successful response, leading to a falsely-offline - // status. I deem this edge-case acceptable in return for the concurrency benefits of not - // holding a write-lock whilst we check the online status of the node. - *self.status.write().await = new_status; - - new_status + pub async fn status(&self) -> Result<(), CandidateError> { + *self.status.read().await } pub async fn refresh_health( @@ -309,38 +245,6 @@ impl CandidateBeaconNode { } } - /// Checks if the node is reachable. - async fn is_online(&self, was_offline: bool, log: &Logger) -> Result<(), CandidateError> { - let result = self - .beacon_node - .get_node_version() - .await - .map(|body| body.data.version); - - match result { - Ok(version) => { - if was_offline { - info!( - log, - "Connected to beacon node"; - "version" => version, - "endpoint" => %self.beacon_node, - ); - } - Ok(()) - } - Err(e) => { - warn!( - log, - "Offline beacon node"; - "error" => %e, - "endpoint" => %self.beacon_node, - ); - Err(CandidateError::Offline) - } - } - } - /// Checks if the node has the correct specification. async fn is_compatible(&self, spec: &ChainSpec, log: &Logger) -> Result<(), CandidateError> { let config = self @@ -405,20 +309,6 @@ impl CandidateBeaconNode { Ok(()) } - - /// Checks if the beacon node is synced. - async fn is_synced( - &self, - slot_clock: Option<&T>, - log: &Logger, - ) -> Result<(), CandidateError> { - if let Some(slot_clock) = slot_clock { - check_synced(&self.beacon_node, slot_clock, Some(log)).await - } else { - // Skip this check if we don't supply a slot clock. - Ok(()) - } - } } /// A collection of `CandidateBeaconNode` that can be used to perform requests with "fallback" @@ -504,7 +394,7 @@ impl BeaconNodeFallback { pub async fn num_available(&self) -> usize { let mut n = 0; for candidate in self.candidates.read().await.iter() { - if candidate.status(RequireSynced::No).await.is_ok() { + if candidate.status().await.is_ok() { n += 1 } } @@ -588,12 +478,7 @@ impl BeaconNodeFallback { /// First this function will try all nodes with a suitable status. If no candidates are suitable /// or all the requests fail, it will try updating the status of all unsuitable nodes and /// re-running `func` again. - pub async fn first_success( - &self, - _require_synced: RequireSynced, - _offline_on_failure: OfflineOnFailure, - func: F, - ) -> Result> + pub async fn first_success(&self, func: F) -> Result> where F: Fn(BeaconNodeHttpClient) -> R, R: Future>, @@ -626,9 +511,6 @@ impl BeaconNodeFallback { // There exists a race condition where the candidate may have been marked // as ready between the `func` call and now. We deem this an acceptable // inefficiency. - //if matches!(offline_on_failure, OfflineOnFailure::Yes) { - // $candidate.set_offline().await; - //} errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e))); inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]); } @@ -658,12 +540,7 @@ impl BeaconNodeFallback { /// It returns a list of errors along with the beacon node id that failed for `func`. /// Since this ignores the actual result of `func`, this function should only be used for beacon /// node calls whose results we do not care about, only that they completed successfully. - pub async fn run_on_all( - &self, - _require_synced: RequireSynced, - _offline_on_failure: OfflineOnFailure, - func: F, - ) -> Result<(), Errors> + pub async fn run_on_all(&self, func: F) -> Result<(), Errors> where F: Fn(BeaconNodeHttpClient) -> R, R: Future>, @@ -688,9 +565,6 @@ impl BeaconNodeFallback { // There exists a race condition where the candidate may have been marked // as ready between the `func` call and now. We deem this an acceptable // inefficiency. - //if matches!(offline_on_failure, OfflineOnFailure::Yes) { - // $candidate.set_offline().await; - //} results.push(Err(( $candidate.beacon_node.to_string(), Error::RequestFailed(e), @@ -720,24 +594,17 @@ impl BeaconNodeFallback { /// Call `func` on first beacon node that returns success or on all beacon nodes /// depending on the value of `disable_run_on_all`. - pub async fn run( - &self, - require_synced: RequireSynced, - offline_on_failure: OfflineOnFailure, - func: F, - ) -> Result<(), Errors> + pub async fn run(&self, func: F) -> Result<(), Errors> where F: Fn(BeaconNodeHttpClient) -> R, R: Future>, Err: Debug, { if self.disable_run_on_all { - self.first_success(require_synced, offline_on_failure, func) - .await?; + self.first_success(func).await?; Ok(()) } else { - self.run_on_all(require_synced, offline_on_failure, func) - .await + self.run_on_all(func).await } } } diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 8b0b5dcacb..255b532862 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,9 +1,6 @@ use crate::beacon_node_fallback::{Error as FallbackError, Errors}; use crate::{ - beacon_node_fallback::{BeaconNodeFallback, RequireSynced}, - determine_graffiti, - graffiti_file::GraffitiFile, - OfflineOnFailure, + beacon_node_fallback::BeaconNodeFallback, determine_graffiti, graffiti_file::GraffitiFile, }; use crate::{ http_metrics::metrics, @@ -147,8 +144,6 @@ impl ProposerFallback { // Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`. pub async fn first_success_try_proposers_first( &self, - require_synced: RequireSynced, - offline_on_failure: OfflineOnFailure, func: F, ) -> Result> where @@ -158,25 +153,18 @@ impl ProposerFallback { { // If there are proposer nodes, try calling `func` on them and return early if they are successful. if let Some(proposer_nodes) = &self.proposer_nodes { - if let Ok(result) = proposer_nodes - .first_success(require_synced, offline_on_failure, func.clone()) - .await - { + if let Ok(result) = proposer_nodes.first_success(func.clone()).await { return Ok(result); } } // If the proposer nodes failed, try on the non-proposer nodes. - self.beacon_nodes - .first_success(require_synced, offline_on_failure, func) - .await + self.beacon_nodes.first_success(func).await } // Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`. pub async fn first_success_try_proposers_last( &self, - require_synced: RequireSynced, - offline_on_failure: OfflineOnFailure, func: F, ) -> Result> where @@ -185,10 +173,7 @@ impl ProposerFallback { Err: Debug, { // Try running `func` on the non-proposer beacon nodes. - let beacon_nodes_result = self - .beacon_nodes - .first_success(require_synced, offline_on_failure, func.clone()) - .await; + let beacon_nodes_result = self.beacon_nodes.first_success(func.clone()).await; match (beacon_nodes_result, &self.proposer_nodes) { // The non-proposer node call succeed, return the result. @@ -196,11 +181,7 @@ impl ProposerFallback { // The non-proposer node call failed, but we don't have any proposer nodes. Return an error. (Err(e), None) => Err(e), // The non-proposer node call failed, try the same call on the proposer nodes. - (Err(_), Some(proposer_nodes)) => { - proposer_nodes - .first_success(require_synced, offline_on_failure, func) - .await - } + (Err(_), Some(proposer_nodes)) => proposer_nodes.first_success(func).await, } } } @@ -474,68 +455,64 @@ impl BlockService { // Try the proposer nodes last, since it's likely that they don't have a // great view of attestations on the network. let block = proposer_fallback - .first_success_try_proposers_last( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let block = match Payload::block_type() { - BlockType::Full => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - BlockType::Blinded => { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], - ); - beacon_node - .get_validator_blinded_blocks::( - slot, - randao_reveal_ref, - graffiti.as_ref(), - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - })? - .data - } - }; - - info!( - log, - "Received unsigned block"; - "slot" => slot.as_u64(), - ); - if proposer_index != Some(block.proposer_index()) { - return Err(BlockError::Recoverable( - "Proposer index does not match block proposer. Beacon chain re-orged" - .to_string(), - )); + .first_success_try_proposers_last(|beacon_node| async move { + let block = match Payload::block_type() { + BlockType::Full => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data } + BlockType::Blinded => { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_GET], + ); + beacon_node + .get_validator_blinded_blocks::( + slot, + randao_reveal_ref, + graffiti.as_ref(), + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + })? + .data + } + }; - Ok::<_, BlockError>(block) - }, - ) + info!( + log, + "Received unsigned block"; + "slot" => slot.as_u64(), + ); + if proposer_index != Some(block.proposer_index()) { + return Err(BlockError::Recoverable( + "Proposer index does not match block proposer. Beacon chain re-orged" + .to_string(), + )); + } + + Ok::<_, BlockError>(block) + }) .await?; let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); @@ -580,45 +557,41 @@ impl BlockService { // protect them from DoS attacks and they're most likely to successfully // publish a block. proposer_fallback - .first_success_try_proposers_first( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async { - match Payload::block_type() { - BlockType::Full => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } - BlockType::Blinded => { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], - ); - beacon_node - .post_beacon_blinded_blocks(&signed_block) - .await - .map_err(|e| { - BlockError::Irrecoverable(format!( - "Error from beacon node when publishing block: {:?}", - e - )) - })? - } + .first_success_try_proposers_first(|beacon_node| async { + match Payload::block_type() { + BlockType::Full => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? } - Ok::<_, BlockError>(()) - }, - ) + BlockType::Blinded => { + let _post_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BLINDED_BEACON_BLOCK_HTTP_POST], + ); + beacon_node + .post_beacon_blinded_blocks(&signed_block) + .await + .map_err(|e| { + BlockError::Irrecoverable(format!( + "Error from beacon node when publishing block: {:?}", + e + )) + })? + } + } + Ok::<_, BlockError>(()) + }) .await?; info!( diff --git a/validator_client/src/check_synced.rs b/validator_client/src/check_synced.rs index be6d591c9e..a7b81df9d8 100644 --- a/validator_client/src/check_synced.rs +++ b/validator_client/src/check_synced.rs @@ -1,87 +1,6 @@ use crate::beacon_node_fallback::CandidateError; use eth2::{types::Slot, BeaconNodeHttpClient}; -use slog::{debug, error, warn, Logger}; -use slot_clock::SlotClock; - -/// A distance in slots. -const SYNC_TOLERANCE: u64 = 4; - -/// Returns -/// -/// `Ok(())` if the beacon node is synced and ready for action, -/// `Err(CandidateError::Offline)` if the beacon node is unreachable, -/// `Err(CandidateError::NotSynced)` if the beacon node indicates that it is syncing **AND** -/// it is more than `SYNC_TOLERANCE` behind the highest -/// known slot. -/// -/// The second condition means the even if the beacon node thinks that it's syncing, we'll still -/// try to use it if it's close enough to the head. -pub async fn check_synced( - beacon_node: &BeaconNodeHttpClient, - slot_clock: &T, - log_opt: Option<&Logger>, -) -> Result<(), CandidateError> { - let resp = match beacon_node.get_node_syncing().await { - Ok(resp) => resp, - Err(e) => { - if let Some(log) = log_opt { - warn!( - log, - "Unable connect to beacon node"; - "error" => %e - ) - } - - return Err(CandidateError::Offline); - } - }; - - // Default EL status to "online" for backwards-compatibility with BNs that don't include it. - let el_offline = resp.data.el_offline.unwrap_or(false); - let bn_is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE); - let is_synced = bn_is_synced && !el_offline; - - if let Some(log) = log_opt { - if !is_synced { - debug!( - log, - "Beacon node sync status"; - "status" => format!("{:?}", resp), - ); - - warn!( - log, - "Beacon node is not synced"; - "sync_distance" => resp.data.sync_distance.as_u64(), - "head_slot" => resp.data.head_slot.as_u64(), - "endpoint" => %beacon_node, - "el_offline" => el_offline, - ); - } - - if let Some(local_slot) = slot_clock.now() { - let remote_slot = resp.data.head_slot + resp.data.sync_distance; - if remote_slot + 1 < local_slot || local_slot + 1 < remote_slot { - error!( - log, - "Time discrepancy with beacon node"; - "msg" => "check the system time on this host and the beacon node", - "beacon_node_slot" => remote_slot, - "local_slot" => local_slot, - "endpoint" => %beacon_node, - ); - - return Err(CandidateError::TimeDiscrepancy); - } - } - } - - if is_synced { - Ok(()) - } else { - Err(CandidateError::NotSynced) - } -} +use slog::{warn, Logger}; pub async fn check_node_health( beacon_node: &BeaconNodeHttpClient, diff --git a/validator_client/src/doppelganger_service.rs b/validator_client/src/doppelganger_service.rs index 558b9e199f..efb14babdb 100644 --- a/validator_client/src/doppelganger_service.rs +++ b/validator_client/src/doppelganger_service.rs @@ -29,9 +29,8 @@ //! //! Doppelganger protection is a best-effort, last-line-of-defence mitigation. Do not rely upon it. -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::BeaconNodeFallback; use crate::validator_store::ValidatorStore; -use crate::OfflineOnFailure; use environment::RuntimeContext; use eth2::types::LivenessResponseData; use parking_lot::RwLock; @@ -177,17 +176,13 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( } else { // Request the previous epoch liveness state from the beacon node. beacon_nodes - .first_success( - RequireSynced::Yes, - OfflineOnFailure::Yes, - |beacon_node| async move { - beacon_node - .post_lighthouse_liveness(validator_indices, previous_epoch) - .await - .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| result.data) - }, - ) + .first_success(|beacon_node| async move { + beacon_node + .post_lighthouse_liveness(validator_indices, previous_epoch) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| result.data) + }) .await .unwrap_or_else(|e| { crit!( @@ -204,17 +199,13 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( // Request the current epoch liveness state from the beacon node. let current_epoch_responses = beacon_nodes - .first_success( - RequireSynced::Yes, - OfflineOnFailure::Yes, - |beacon_node| async move { - beacon_node - .post_lighthouse_liveness(validator_indices, current_epoch) - .await - .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| result.data) - }, - ) + .first_success(|beacon_node| async move { + beacon_node + .post_lighthouse_liveness(validator_indices, current_epoch) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| result.data) + }) .await .unwrap_or_else(|e| { crit!( diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index a3b3cabccc..da697025f1 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -8,7 +8,7 @@ mod sync; -use crate::beacon_node_fallback::{BeaconNodeFallback, OfflineOnFailure, RequireSynced}; +use crate::beacon_node_fallback::BeaconNodeFallback; use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY}; use crate::{ block_service::BlockServiceNotification, @@ -415,22 +415,18 @@ async fn poll_validator_indices( // Query the remote BN to resolve a pubkey to a validator index. let download_result = duties_service .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::VALIDATOR_ID_HTTP_GET], - ); - beacon_node - .get_beacon_states_validator_id( - StateId::Head, - &ValidatorId::PublicKey(pubkey), - ) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::VALIDATOR_ID_HTTP_GET], + ); + beacon_node + .get_beacon_states_validator_id( + StateId::Head, + &ValidatorId::PublicKey(pubkey), + ) + .await + }) .await; let fee_recipient = duties_service @@ -612,19 +608,15 @@ async fn poll_beacon_attesters( let subscriptions_ref = &subscriptions; if let Err(e) = duties_service .beacon_nodes - .run( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::SUBSCRIPTIONS_HTTP_POST], - ); - beacon_node - .post_validator_beacon_committee_subscriptions(subscriptions_ref) - .await - }, - ) + .run(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::SUBSCRIPTIONS_HTTP_POST], + ); + beacon_node + .post_validator_beacon_committee_subscriptions(subscriptions_ref) + .await + }) .await { error!( @@ -850,19 +842,15 @@ async fn post_validator_duties_attester( ) -> Result>, Error> { duties_service .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::ATTESTER_DUTIES_HTTP_POST], - ); - beacon_node - .post_validator_duties_attester(epoch, validator_indices) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::ATTESTER_DUTIES_HTTP_POST], + ); + beacon_node + .post_validator_duties_attester(epoch, validator_indices) + .await + }) .await .map_err(|e| Error::FailedToDownloadAttesters(e.to_string())) } @@ -1071,19 +1059,15 @@ async fn poll_beacon_proposers( if !local_pubkeys.is_empty() { let download_result = duties_service .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::PROPOSER_DUTIES_HTTP_GET], - ); - beacon_node - .get_validator_duties_proposer(current_epoch) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::PROPOSER_DUTIES_HTTP_GET], + ); + beacon_node + .get_validator_duties_proposer(current_epoch) + .await + }) .await; match download_result { diff --git a/validator_client/src/duties_service/sync.rs b/validator_client/src/duties_service/sync.rs index cf63d8ac62..bf21097385 100644 --- a/validator_client/src/duties_service/sync.rs +++ b/validator_client/src/duties_service/sync.rs @@ -1,4 +1,3 @@ -use crate::beacon_node_fallback::{OfflineOnFailure, RequireSynced}; use crate::{ doppelganger_service::DoppelgangerStatus, duties_service::{DutiesService, Error}, @@ -424,19 +423,15 @@ pub async fn poll_sync_committee_duties_for_period( let genesis = loop { match beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |node| async move { node.get_beacon_genesis().await }, - ) + .first_success(|node| async move { node.get_beacon_genesis().await }) .await { Ok(genesis) => break genesis.data, @@ -769,11 +764,7 @@ async fn poll_whilst_waiting_for_genesis( ) -> Result<(), String> { loop { match beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { beacon_node.get_lighthouse_staking().await }, - ) + .first_success(|beacon_node| async move { beacon_node.get_lighthouse_staking().await }) .await { Ok(is_staking) => { diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index 2d2221680f..c250a124d2 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -1,6 +1,5 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::BeaconNodeFallback; use crate::validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}; -use crate::OfflineOnFailure; use bls::PublicKeyBytes; use environment::RuntimeContext; use parking_lot::RwLock; @@ -342,15 +341,11 @@ impl PreparationService { let preparation_entries = preparation_data.as_slice(); match self .beacon_nodes - .run( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - beacon_node - .post_validator_prepare_beacon_proposer(preparation_entries) - .await - }, - ) + .run(|beacon_node| async move { + beacon_node + .post_validator_prepare_beacon_proposer(preparation_entries) + .await + }) .await { Ok(()) => debug!( @@ -476,13 +471,9 @@ impl PreparationService { for batch in signed.chunks(self.validator_registration_batch_size) { match self .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::No, - |beacon_node| async move { - beacon_node.post_validator_register_validator(batch).await - }, - ) + .first_success(|beacon_node| async move { + beacon_node.post_validator_register_validator(batch).await + }) .await { Ok(()) => info!( diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index e01bf09cf2..0318a1d5bf 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -1,8 +1,7 @@ -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::BeaconNodeFallback; use crate::{ duties_service::DutiesService, validator_store::{Error as ValidatorStoreError, ValidatorStore}, - OfflineOnFailure, }; use environment::RuntimeContext; use eth2::types::BlockId; @@ -182,8 +181,6 @@ impl SyncCommitteeService { let response = self .beacon_nodes .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, |beacon_node| async move { match beacon_node.get_beacon_blocks_root(BlockId::Head).await { Ok(Some(block)) if block.execution_optimistic == Some(false) => { @@ -301,15 +298,11 @@ impl SyncCommitteeService { .collect::>(); self.beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - beacon_node - .post_beacon_pool_sync_committee_signatures(committee_signatures) - .await - }, - ) + .first_success(|beacon_node| async move { + beacon_node + .post_beacon_pool_sync_committee_signatures(committee_signatures) + .await + }) .await .map_err(|e| { error!( @@ -372,21 +365,17 @@ impl SyncCommitteeService { let contribution = &self .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let sync_contribution_data = SyncContributionData { - slot, - beacon_block_root, - subcommittee_index: subnet_id.into(), - }; + .first_success(|beacon_node| async move { + let sync_contribution_data = SyncContributionData { + slot, + beacon_block_root, + subcommittee_index: subnet_id.into(), + }; - beacon_node - .get_validator_sync_committee_contribution::(&sync_contribution_data) - .await - }, - ) + beacon_node + .get_validator_sync_committee_contribution::(&sync_contribution_data) + .await + }) .await .map_err(|e| { crit!( @@ -454,15 +443,11 @@ impl SyncCommitteeService { // Publish to the beacon node. self.beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - beacon_node - .post_validator_contribution_and_proofs(signed_contributions) - .await - }, - ) + .first_success(|beacon_node| async move { + beacon_node + .post_validator_contribution_and_proofs(signed_contributions) + .await + }) .await .map_err(|e| { error!( @@ -596,15 +581,11 @@ impl SyncCommitteeService { if let Err(e) = self .beacon_nodes - .run( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - beacon_node - .post_validator_sync_committee_subscriptions(subscriptions_slice) - .await - }, - ) + .run(|beacon_node| async move { + beacon_node + .post_validator_sync_committee_subscriptions(subscriptions_slice) + .await + }) .await { error!(