Futher simplify TaskSpawner

This commit is contained in:
Paul Hauner
2023-07-04 17:50:06 +10:00
parent a7b259d354
commit 10dc153ea1

View File

@@ -104,35 +104,16 @@ impl<E: EthSpec> TaskSpawner<E> {
// send can only fail if the receiver is dropped.
let _ = tx.send(func_result);
};
let process_fn = BlockingOrAsync::Async(Box::pin(process_fn));
// Send the function to the beacon processor for execution at some arbitrary time.
let error_message =
match beacon_processor_send.try_send(priority.work_event(process_fn)) {
Ok(()) => {
match rx.await {
// The beacon processor executed the task and sent a result.
Ok(func_result) => {
return func_result
}
// The beacon processor dropped the channel without sending a
// result. The beacon processor dropped this task because its
// queues are full or it's shutting down.
Err(_) => "The task did not execute. The server is overloaded or shutting down.",
}
}
Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.",
Err(TrySendError::Closed(_)) => {
"The task was dropped. The server is shutting down."
}
};
let error_response = warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Async(Box::pin(process_fn)),
rx,
)
.into_response();
Ok(error_response)
.await
.unwrap_or_else(Result::Ok)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.
@@ -163,34 +144,16 @@ impl<E: EthSpec> TaskSpawner<E> {
// send can only fail if the receiver is dropped.
let _ = tx.send(func_result);
};
let process_fn = BlockingOrAsync::Async(Box::pin(process_fn));
// Send the function to the beacon processor for execution at some arbitrary time.
let error_message =
match beacon_processor_send.try_send(priority.work_event(process_fn)) {
Ok(()) => {
match rx.await {
// The beacon processor executed the task and sent a result.
Ok(func_result) => {
return func_result
}
// The beacon processor dropped the channel without sending a
// result. The beacon processor dropped this task because its
// queues are full or it's shutting down.
Err(_) => "The task did not execute. The server is overloaded or shutting down.",
}
}
Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.",
Err(TrySendError::Closed(_)) => {
"The task was dropped. The server is shutting down."
}
};
warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
send_to_beacon_processor(
beacon_processor_send,
priority,
BlockingOrAsync::Async(Box::pin(process_fn)),
rx,
)
.into_response()
.await
.unwrap_or_else(|error_response| error_response)
} else {
// There is no beacon processor so spawn a task directly on the
// tokio executor.