Use tokio RwLock and remove parking_lot

This commit is contained in:
Mac L
2024-08-19 22:15:33 +10:00
parent 9c206d5d6e
commit 526a894a2b
4 changed files with 105 additions and 88 deletions

View File

@@ -11,7 +11,6 @@ use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_RE
use environment::RuntimeContext;
use eth2::BeaconNodeHttpClient;
use futures::future;
use parking_lot::RwLock as PLRwLock;
use serde::{Deserialize, Serialize};
use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
@@ -142,11 +141,11 @@ pub struct CandidateInfo {
/// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used
/// for a query.
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct CandidateBeaconNode<E> {
pub index: usize,
pub beacon_node: BeaconNodeHttpClient,
pub health: PLRwLock<Result<BeaconNodeHealth, CandidateError>>,
pub health: Arc<RwLock<Result<BeaconNodeHealth, CandidateError>>>,
_phantom: PhantomData<E>,
}
@@ -158,37 +157,20 @@ impl<E: EthSpec> PartialEq for CandidateBeaconNode<E> {
impl<E: EthSpec> Eq for CandidateBeaconNode<E> {}
impl<E: EthSpec> Ord for CandidateBeaconNode<E> {
fn cmp(&self, other: &Self) -> Ordering {
match (&(self.health()), &(other.health())) {
(Err(_), Err(_)) => Ordering::Equal,
(Err(_), _) => Ordering::Greater,
(_, Err(_)) => Ordering::Less,
(Ok(health_1), Ok(health_2)) => health_1.cmp(health_2),
}
}
}
impl<E: EthSpec> PartialOrd for CandidateBeaconNode<E> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<E: EthSpec> CandidateBeaconNode<E> {
/// Instantiate a new node.
pub fn new(beacon_node: BeaconNodeHttpClient, index: usize) -> Self {
Self {
index,
beacon_node,
health: PLRwLock::new(Err(CandidateError::Uninitialized)),
health: Arc::new(RwLock::new(Err(CandidateError::Uninitialized))),
_phantom: PhantomData,
}
}
/// Returns the health of `self`.
pub fn health(&self) -> Result<BeaconNodeHealth, CandidateError> {
*self.health.read()
pub async fn health(&self) -> Result<BeaconNodeHealth, CandidateError> {
*self.health.read().await
}
pub async fn refresh_health<T: SlotClock>(
@@ -199,7 +181,7 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
log: &Logger,
) -> Result<(), CandidateError> {
if let Err(e) = self.is_compatible(spec, log).await {
*self.health.write() = Err(e);
*self.health.write().await = Err(e);
return Err(e);
}
@@ -240,12 +222,12 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
distance_tiers,
);
*self.health.write() = Ok(new_health);
*self.health.write().await = Ok(new_health);
Ok(())
}
Err(e) => {
// Set the health as `Err` which is sorted last in the list.
*self.health.write() = Err(e);
*self.health.write().await = Err(e);
Err(e)
}
}
@@ -388,7 +370,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
pub async fn num_available(&self) -> usize {
let mut n = 0;
for candidate in self.candidates.read().await.iter() {
match candidate.health() {
match candidate.health().await {
Ok(_) | Err(CandidateError::Uninitialized) => n += 1,
Err(_) => continue,
}
@@ -405,9 +387,9 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
let mut num_synced = 0;
for candidate in candidates.iter() {
let health = candidate.health();
let health = candidate.health().await;
match candidate.health() {
match candidate.health().await {
Ok(health) => {
if self
.distance_tiers
@@ -471,9 +453,8 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
drop(candidates);
// Sort the list to put the healthiest candidate first.
let mut write = self.candidates.write().await;
write.sort();
let mut candidates = self.candidates.write().await;
sort_nodes_by_health(&mut candidates).await;
}
/// Concurrently send a request to all candidates (regardless of
@@ -519,7 +500,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// Run `func` against each candidate in `self`, returning immediately if a result is found.
/// Otherwise, return all the errors encountered along the way.
pub async fn first_success<F, O, Err, R>(&self, func: F) -> Result<O, Errors<Err>>
pub async fn first_success<'a, F, O, Err, R>(&'a self, func: F) -> Result<O, Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
@@ -562,8 +543,8 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
try_func!(candidate);
}
// Second pass. No candidates returned successfully. Try again with the same order.
// This will duplicate errors.
//// Second pass. No candidates returned successfully. Try again with the same order.
//// This will duplicate errors.
for candidate in candidates.iter() {
try_func!(candidate);
}
@@ -579,7 +560,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// It returns a list of errors along with the beacon node id that failed for `func`.
/// Since this ignores the actual result of `func`, this function should only be used for beacon
/// node calls whose results we do not care about, only that they completed successfully.
pub async fn broadcast<F, O, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
pub async fn broadcast<'a, F, O, Err, R>(&'a self, func: F) -> Result<(), Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
@@ -637,7 +618,11 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
/// Call `func` on first beacon node that returns success or on all beacon nodes
/// depending on the `topic` and configuration.
pub async fn request<F, Err, R>(&self, topic: ApiTopic, func: F) -> Result<(), Errors<Err>>
pub async fn request<'a, F, Err, R>(
&'a self,
topic: ApiTopic,
func: F,
) -> Result<(), Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R,
R: Future<Output = Result<(), Err>>,
@@ -652,6 +637,32 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
}
}
/// Helper functions to allow sorting candidate nodes by health.
async fn sort_nodes_by_health<E: EthSpec>(nodes: &mut Vec<CandidateBeaconNode<E>>) {
// Fetch all health values.
let health_results: Vec<Result<BeaconNodeHealth, CandidateError>> =
future::join_all(nodes.iter().map(|node| node.health())).await;
// Pair health results with their indices.
let mut indices_with_health: Vec<(usize, Result<BeaconNodeHealth, CandidateError>)> =
health_results.into_iter().enumerate().collect();
// Sort indices based on their health.
indices_with_health.sort_by(|a, b| match (&a.1, &b.1) {
(Ok(health_a), Ok(health_b)) => health_a.cmp(health_b),
(Err(_), Ok(_)) => Ordering::Greater,
(Ok(_), Err(_)) => Ordering::Less,
(Err(_), Err(_)) => Ordering::Equal,
});
// Reorder candidates based on the sorted indices.
let sorted_nodes: Vec<CandidateBeaconNode<E>> = indices_with_health
.into_iter()
.map(|(index, _)| nodes[index].clone())
.collect();
*nodes = sorted_nodes;
}
/// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)]
#[strum(serialize_all = "kebab-case")]
@@ -691,8 +702,8 @@ mod tests {
.eq(all.into_iter()));
}
#[test]
fn check_candidate_order() {
#[tokio::test]
async fn check_candidate_order() {
// These fields is irrelvant for sorting. They are set to arbitrary values.
let head = Slot::new(99);
let optimistic_status = IsOptimistic::No;
@@ -773,12 +784,12 @@ mod tests {
health_tier: BeaconNodeHealthTier::new(4, Slot::new(10), small),
};
*candidate_1.health.write() = Ok(health_1);
*candidate_2.health.write() = Ok(health_2);
*candidate_3.health.write() = Ok(health_3);
*candidate_4.health.write() = Ok(health_4);
*candidate_5.health.write() = Ok(health_5);
*candidate_6.health.write() = Ok(health_6);
*candidate_1.health.write().await = Ok(health_1);
*candidate_2.health.write().await = Ok(health_2);
*candidate_3.health.write().await = Ok(health_3);
*candidate_4.health.write().await = Ok(health_4);
*candidate_5.health.write().await = Ok(health_5);
*candidate_6.health.write().await = Ok(health_6);
let mut candidates = vec![
candidate_3,
@@ -797,7 +808,7 @@ mod tests {
expected_candidate_6,
];
candidates.sort();
sort_nodes_by_health(&mut candidates).await;
assert_eq!(candidates, expected_candidates);
}

View File

@@ -140,7 +140,10 @@ pub struct ProposerFallback<T, E: EthSpec> {
impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
// Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`.
pub async fn request_proposers_first<F, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
pub async fn request_proposers_first<'a, F, Err, R>(
&'a self,
func: F,
) -> Result<(), Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<(), Err>>,
@@ -162,7 +165,10 @@ impl<T: SlotClock, E: EthSpec> ProposerFallback<T, E> {
}
// Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`.
pub async fn request_proposers_last<F, O, Err, R>(&self, func: F) -> Result<O, Errors<Err>>
pub async fn request_proposers_last<'a, F, O, Err, R>(
&'a self,
func: F,
) -> Result<O, Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<O, Err>>,

View File

@@ -174,25 +174,25 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
} else {
// Request the previous epoch liveness state from the beacon node.
beacon_nodes
.first_success(|beacon_node| async {
let owned_beacon_node = beacon_node.clone();
drop(beacon_node);
owned_beacon_node
.post_validator_liveness_epoch(previous_epoch, &validator_indices)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| {
result
.data
.into_iter()
.map(|response| LivenessResponseData {
index: response.index,
epoch: previous_epoch,
is_live: response.is_live,
})
.collect()
})
.first_success(|beacon_node| {
let validator_indices_ref = &validator_indices;
async move {
beacon_node
.post_validator_liveness_epoch(previous_epoch, validator_indices_ref)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| {
result
.data
.into_iter()
.map(|response| LivenessResponseData {
index: response.index,
epoch: previous_epoch,
is_live: response.is_live,
})
.collect()
})
}
})
.await
.unwrap_or_else(|e| {
@@ -210,25 +210,25 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>(
// Request the current epoch liveness state from the beacon node.
let current_epoch_responses = beacon_nodes
.first_success(|beacon_node| async {
let owned_beacon_node = beacon_node.clone();
drop(beacon_node);
owned_beacon_node
.post_validator_liveness_epoch(current_epoch, &validator_indices)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| {
result
.data
.into_iter()
.map(|response| LivenessResponseData {
index: response.index,
epoch: current_epoch,
is_live: response.is_live,
})
.collect()
})
.first_success(|beacon_node| {
let validator_indices_ref = &validator_indices;
async move {
beacon_node
.post_validator_liveness_epoch(current_epoch, validator_indices_ref)
.await
.map_err(|e| format!("Failed query for validator liveness: {:?}", e))
.map(|result| {
result
.data
.into_iter()
.map(|response| LivenessResponseData {
index: response.index,
epoch: current_epoch,
is_live: response.is_live,
})
.collect()
})
}
})
.await
.unwrap_or_else(|e| {

View File

@@ -424,14 +424,14 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
for node in &*block_filter.beacon_nodes.candidates.read().await {
result.insert(
(node.index, node.beacon_node.to_string()),
*node.health.read(),
*node.health.read().await,
);
}
if let Some(proposer_nodes) = &block_filter.proposer_nodes {
for node in &*proposer_nodes.candidates.read().await {
result.insert(
(node.index, node.beacon_node.to_string()),
*node.health.read(),
*node.health.read().await,
);
}
}