Broadcast VC requests in parallel and fix subscription error (#6223)

* Broadcast VC requests in parallel

* Remove outdated comment

* Try some things

* Fix subscription error

* Remove junk logging
This commit is contained in:
Michael Sproul
2024-08-08 09:31:35 +10:00
committed by GitHub
parent 42a1cd81fb
commit a68f34a014
4 changed files with 94 additions and 59 deletions

View File

@@ -134,6 +134,12 @@ impl<T: Debug> fmt::Display for Errors<T> {
}
}
impl<T> Errors<T> {
pub fn num_errors(&self) -> usize {
self.0.len()
}
}
/// Reasons why a candidate might not be ready.
#[derive(Debug, Clone, Copy)]
pub enum CandidateError {
@@ -599,46 +605,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
F: Fn(&'a BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
{
let mut results = vec![];
let mut to_retry = vec![];
let mut retry_unsynced = vec![];
// Run `func` using a `candidate`, returning the value or capturing errors.
//
// We use a macro instead of a closure here since it is not trivial to move `func` into a
// closure.
macro_rules! try_func {
($candidate: ident) => {{
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);
// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&$candidate.beacon_node).await {
Ok(val) => results.push(Ok(val)),
Err(e) => {
// If we have an error on this function, make the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
$candidate.set_offline().await;
}
results.push(Err((
$candidate.beacon_node.to_string(),
Error::RequestFailed(e),
)));
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&candidate.beacon_node).await {
Ok(val) => Ok(val),
Err(e) => {
// If we have an error on this function, mark the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
candidate.set_offline().await;
}
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
}
}};
}
}
};
// First pass: try `func` on all synced and ready candidates.
//
// This ensures that we always choose a synced node if it is available.
let mut first_batch_futures = vec![];
for candidate in &self.candidates {
match candidate.status(RequireSynced::Yes).await {
Ok(_) => {
first_batch_futures.push(run_on_candidate(candidate));
}
Err(CandidateError::NotSynced) if require_synced == false => {
// This client is unsynced; we will try it after trying all synced clients.
retry_unsynced.push(candidate);
@@ -647,22 +648,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
// This client was not ready on the first pass, we might try it again later.
to_retry.push(candidate);
}
Ok(_) => try_func!(candidate),
}
}
let first_batch_results = futures::future::join_all(first_batch_futures).await;
// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
// unsynced candidates.
//
// Due to async race-conditions, it is possible that we will send a request to a candidate
// that has been set to an offline/unready status. This is acceptable.
if require_synced == false {
for candidate in retry_unsynced {
try_func!(candidate);
}
}
let second_batch_results = if require_synced == false {
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
} else {
vec![]
};
// Third pass: try again, attempting to make non-ready clients become ready.
let mut third_batch_futures = vec![];
let mut third_batch_results = vec![];
for candidate in to_retry {
// If the candidate hasn't luckily transferred into the correct state in the meantime,
// force an update of the state.
@@ -676,16 +679,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
};
match new_status {
Ok(()) => try_func!(candidate),
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
Err(e) => {
results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
)));
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
Err(CandidateError::NotSynced) if require_synced == false => {
third_batch_futures.push(run_on_candidate(candidate))
}
Err(e) => third_batch_results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
))),
}
}
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);
let mut results = first_batch_results;
results.extend(second_batch_results);
results.extend(third_batch_results);
let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();