diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs index 509dc968f5..25183f706d 100644 --- a/validator_client/src/beacon_node_fallback.rs +++ b/validator_client/src/beacon_node_fallback.rs @@ -11,7 +11,6 @@ use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_RE use environment::RuntimeContext; use eth2::BeaconNodeHttpClient; use futures::future; -use parking_lot::RwLock as PLRwLock; use serde::{Deserialize, Serialize}; use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; @@ -142,11 +141,11 @@ pub struct CandidateInfo { /// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used /// for a query. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct CandidateBeaconNode { pub index: usize, pub beacon_node: BeaconNodeHttpClient, - pub health: PLRwLock>, + pub health: Arc>>, _phantom: PhantomData, } @@ -158,37 +157,20 @@ impl PartialEq for CandidateBeaconNode { impl Eq for CandidateBeaconNode {} -impl Ord for CandidateBeaconNode { - fn cmp(&self, other: &Self) -> Ordering { - match (&(self.health()), &(other.health())) { - (Err(_), Err(_)) => Ordering::Equal, - (Err(_), _) => Ordering::Greater, - (_, Err(_)) => Ordering::Less, - (Ok(health_1), Ok(health_2)) => health_1.cmp(health_2), - } - } -} - -impl PartialOrd for CandidateBeaconNode { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - impl CandidateBeaconNode { /// Instantiate a new node. pub fn new(beacon_node: BeaconNodeHttpClient, index: usize) -> Self { Self { index, beacon_node, - health: PLRwLock::new(Err(CandidateError::Uninitialized)), + health: Arc::new(RwLock::new(Err(CandidateError::Uninitialized))), _phantom: PhantomData, } } /// Returns the health of `self`. - pub fn health(&self) -> Result { - *self.health.read() + pub async fn health(&self) -> Result { + *self.health.read().await } pub async fn refresh_health( @@ -199,7 +181,7 @@ impl CandidateBeaconNode { log: &Logger, ) -> Result<(), CandidateError> { if let Err(e) = self.is_compatible(spec, log).await { - *self.health.write() = Err(e); + *self.health.write().await = Err(e); return Err(e); } @@ -240,12 +222,12 @@ impl CandidateBeaconNode { distance_tiers, ); - *self.health.write() = Ok(new_health); + *self.health.write().await = Ok(new_health); Ok(()) } Err(e) => { // Set the health as `Err` which is sorted last in the list. - *self.health.write() = Err(e); + *self.health.write().await = Err(e); Err(e) } } @@ -388,7 +370,7 @@ impl BeaconNodeFallback { pub async fn num_available(&self) -> usize { let mut n = 0; for candidate in self.candidates.read().await.iter() { - match candidate.health() { + match candidate.health().await { Ok(_) | Err(CandidateError::Uninitialized) => n += 1, Err(_) => continue, } @@ -405,9 +387,9 @@ impl BeaconNodeFallback { let mut num_synced = 0; for candidate in candidates.iter() { - let health = candidate.health(); + let health = candidate.health().await; - match candidate.health() { + match candidate.health().await { Ok(health) => { if self .distance_tiers @@ -471,9 +453,8 @@ impl BeaconNodeFallback { drop(candidates); - // Sort the list to put the healthiest candidate first. - let mut write = self.candidates.write().await; - write.sort(); + let mut candidates = self.candidates.write().await; + sort_nodes_by_health(&mut candidates).await; } /// Concurrently send a request to all candidates (regardless of @@ -519,7 +500,7 @@ impl BeaconNodeFallback { /// Run `func` against each candidate in `self`, returning immediately if a result is found. /// Otherwise, return all the errors encountered along the way. - pub async fn first_success(&self, func: F) -> Result> + pub async fn first_success<'a, F, O, Err, R>(&'a self, func: F) -> Result> where F: Fn(BeaconNodeHttpClient) -> R, R: Future>, @@ -562,8 +543,8 @@ impl BeaconNodeFallback { try_func!(candidate); } - // Second pass. No candidates returned successfully. Try again with the same order. - // This will duplicate errors. + //// Second pass. No candidates returned successfully. Try again with the same order. + //// This will duplicate errors. for candidate in candidates.iter() { try_func!(candidate); } @@ -579,7 +560,7 @@ impl BeaconNodeFallback { /// It returns a list of errors along with the beacon node id that failed for `func`. /// Since this ignores the actual result of `func`, this function should only be used for beacon /// node calls whose results we do not care about, only that they completed successfully. - pub async fn broadcast(&self, func: F) -> Result<(), Errors> + pub async fn broadcast<'a, F, O, Err, R>(&'a self, func: F) -> Result<(), Errors> where F: Fn(BeaconNodeHttpClient) -> R, R: Future>, @@ -637,7 +618,11 @@ impl BeaconNodeFallback { /// Call `func` on first beacon node that returns success or on all beacon nodes /// depending on the `topic` and configuration. - pub async fn request(&self, topic: ApiTopic, func: F) -> Result<(), Errors> + pub async fn request<'a, F, Err, R>( + &'a self, + topic: ApiTopic, + func: F, + ) -> Result<(), Errors> where F: Fn(BeaconNodeHttpClient) -> R, R: Future>, @@ -652,6 +637,32 @@ impl BeaconNodeFallback { } } +/// Helper functions to allow sorting candidate nodes by health. +async fn sort_nodes_by_health(nodes: &mut Vec>) { + // Fetch all health values. + let health_results: Vec> = + future::join_all(nodes.iter().map(|node| node.health())).await; + + // Pair health results with their indices. + let mut indices_with_health: Vec<(usize, Result)> = + health_results.into_iter().enumerate().collect(); + + // Sort indices based on their health. + indices_with_health.sort_by(|a, b| match (&a.1, &b.1) { + (Ok(health_a), Ok(health_b)) => health_a.cmp(health_b), + (Err(_), Ok(_)) => Ordering::Greater, + (Ok(_), Err(_)) => Ordering::Less, + (Err(_), Err(_)) => Ordering::Equal, + }); + + // Reorder candidates based on the sorted indices. + let sorted_nodes: Vec> = indices_with_health + .into_iter() + .map(|(index, _)| nodes[index].clone()) + .collect(); + *nodes = sorted_nodes; +} + /// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted. #[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)] #[strum(serialize_all = "kebab-case")] @@ -691,8 +702,8 @@ mod tests { .eq(all.into_iter())); } - #[test] - fn check_candidate_order() { + #[tokio::test] + async fn check_candidate_order() { // These fields is irrelvant for sorting. They are set to arbitrary values. let head = Slot::new(99); let optimistic_status = IsOptimistic::No; @@ -773,12 +784,12 @@ mod tests { health_tier: BeaconNodeHealthTier::new(4, Slot::new(10), small), }; - *candidate_1.health.write() = Ok(health_1); - *candidate_2.health.write() = Ok(health_2); - *candidate_3.health.write() = Ok(health_3); - *candidate_4.health.write() = Ok(health_4); - *candidate_5.health.write() = Ok(health_5); - *candidate_6.health.write() = Ok(health_6); + *candidate_1.health.write().await = Ok(health_1); + *candidate_2.health.write().await = Ok(health_2); + *candidate_3.health.write().await = Ok(health_3); + *candidate_4.health.write().await = Ok(health_4); + *candidate_5.health.write().await = Ok(health_5); + *candidate_6.health.write().await = Ok(health_6); let mut candidates = vec![ candidate_3, @@ -797,7 +808,7 @@ mod tests { expected_candidate_6, ]; - candidates.sort(); + sort_nodes_by_health(&mut candidates).await; assert_eq!(candidates, expected_candidates); } diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index fff8b0d0f3..b7a9df4c5d 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -140,7 +140,10 @@ pub struct ProposerFallback { impl ProposerFallback { // Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`. - pub async fn request_proposers_first(&self, func: F) -> Result<(), Errors> + pub async fn request_proposers_first<'a, F, Err, R>( + &'a self, + func: F, + ) -> Result<(), Errors> where F: Fn(BeaconNodeHttpClient) -> R + Clone, R: Future>, @@ -162,7 +165,10 @@ impl ProposerFallback { } // Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`. - pub async fn request_proposers_last(&self, func: F) -> Result> + pub async fn request_proposers_last<'a, F, O, Err, R>( + &'a self, + func: F, + ) -> Result> where F: Fn(BeaconNodeHttpClient) -> R + Clone, R: Future>, diff --git a/validator_client/src/doppelganger_service.rs b/validator_client/src/doppelganger_service.rs index 94cc9f9724..42a2eb84d3 100644 --- a/validator_client/src/doppelganger_service.rs +++ b/validator_client/src/doppelganger_service.rs @@ -174,25 +174,25 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( } else { // Request the previous epoch liveness state from the beacon node. beacon_nodes - .first_success(|beacon_node| async { - let owned_beacon_node = beacon_node.clone(); - drop(beacon_node); - - owned_beacon_node - .post_validator_liveness_epoch(previous_epoch, &validator_indices) - .await - .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| { - result - .data - .into_iter() - .map(|response| LivenessResponseData { - index: response.index, - epoch: previous_epoch, - is_live: response.is_live, - }) - .collect() - }) + .first_success(|beacon_node| { + let validator_indices_ref = &validator_indices; + async move { + beacon_node + .post_validator_liveness_epoch(previous_epoch, validator_indices_ref) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| { + result + .data + .into_iter() + .map(|response| LivenessResponseData { + index: response.index, + epoch: previous_epoch, + is_live: response.is_live, + }) + .collect() + }) + } }) .await .unwrap_or_else(|e| { @@ -210,25 +210,25 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( // Request the current epoch liveness state from the beacon node. let current_epoch_responses = beacon_nodes - .first_success(|beacon_node| async { - let owned_beacon_node = beacon_node.clone(); - drop(beacon_node); - - owned_beacon_node - .post_validator_liveness_epoch(current_epoch, &validator_indices) - .await - .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) - .map(|result| { - result - .data - .into_iter() - .map(|response| LivenessResponseData { - index: response.index, - epoch: current_epoch, - is_live: response.is_live, - }) - .collect() - }) + .first_success(|beacon_node| { + let validator_indices_ref = &validator_indices; + async move { + beacon_node + .post_validator_liveness_epoch(current_epoch, validator_indices_ref) + .await + .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) + .map(|result| { + result + .data + .into_iter() + .map(|response| LivenessResponseData { + index: response.index, + epoch: current_epoch, + is_live: response.is_live, + }) + .collect() + }) + } }) .await .unwrap_or_else(|e| { diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index 2625baa5c9..950d4f9f04 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -424,14 +424,14 @@ pub fn serve( for node in &*block_filter.beacon_nodes.candidates.read().await { result.insert( (node.index, node.beacon_node.to_string()), - *node.health.read(), + *node.health.read().await, ); } if let Some(proposer_nodes) = &block_filter.proposer_nodes { for node in &*proposer_nodes.candidates.read().await { result.insert( (node.index, node.beacon_node.to_string()), - *node.health.read(), + *node.health.read().await, ); } }