From 10dc153ea13df3cad75be98f779acd8214594d28 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 4 Jul 2023 17:50:06 +1000 Subject: [PATCH] Futher simplify `TaskSpawner` --- beacon_node/http_api/src/task_spawner.rs | 65 +++++------------------- 1 file changed, 14 insertions(+), 51 deletions(-) diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index cf4fcbbc5c..21fbce8b34 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -104,35 +104,16 @@ impl TaskSpawner { // send can only fail if the receiver is dropped. let _ = tx.send(func_result); }; - let process_fn = BlockingOrAsync::Async(Box::pin(process_fn)); // Send the function to the beacon processor for execution at some arbitrary time. - let error_message = - match beacon_processor_send.try_send(priority.work_event(process_fn)) { - Ok(()) => { - match rx.await { - // The beacon processor executed the task and sent a result. - Ok(func_result) => { - return func_result - } - // The beacon processor dropped the channel without sending a - // result. The beacon processor dropped this task because its - // queues are full or it's shutting down. - Err(_) => "The task did not execute. The server is overloaded or shutting down.", - } - } - Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.", - Err(TrySendError::Closed(_)) => { - "The task was dropped. The server is shutting down." - } - }; - - let error_response = warp::reply::with_status( - warp::reply::json(&error_message), - eth2::StatusCode::INTERNAL_SERVER_ERROR, + send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Async(Box::pin(process_fn)), + rx, ) - .into_response(); - Ok(error_response) + .await + .unwrap_or_else(Result::Ok) } else { // There is no beacon processor so spawn a task directly on the // tokio executor. @@ -163,34 +144,16 @@ impl TaskSpawner { // send can only fail if the receiver is dropped. let _ = tx.send(func_result); }; - let process_fn = BlockingOrAsync::Async(Box::pin(process_fn)); // Send the function to the beacon processor for execution at some arbitrary time. - let error_message = - match beacon_processor_send.try_send(priority.work_event(process_fn)) { - Ok(()) => { - match rx.await { - // The beacon processor executed the task and sent a result. - Ok(func_result) => { - return func_result - } - // The beacon processor dropped the channel without sending a - // result. The beacon processor dropped this task because its - // queues are full or it's shutting down. - Err(_) => "The task did not execute. The server is overloaded or shutting down.", - } - } - Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.", - Err(TrySendError::Closed(_)) => { - "The task was dropped. The server is shutting down." - } - }; - - warp::reply::with_status( - warp::reply::json(&error_message), - eth2::StatusCode::INTERNAL_SERVER_ERROR, + send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Async(Box::pin(process_fn)), + rx, ) - .into_response() + .await + .unwrap_or_else(|error_response| error_response) } else { // There is no beacon processor so spawn a task directly on the // tokio executor.