diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index 8915f91298..cf4fcbbc5c 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -35,42 +35,6 @@ impl TaskSpawner { } } - pub async fn blocking_json_task( - self, - priority: Priority, - func: F, - ) -> Result - where - F: FnOnce() -> Result + Send + Sync + 'static, - T: Serialize + Send + 'static, - { - if let Some(beacon_processor_send) = &self.beacon_processor_send { - // Create a closure that will execute `func` and send the result to - // a channel held by this thread. - let (tx, rx) = oneshot::channel(); - let process_fn = move || { - // Execute the function, collect the return value. - let func_result = func(); - // Send the result down the channel. Ignore any failures; the - // send can only fail if the receiver is dropped. - let _ = tx.send(func_result); - }; - - // Send the function to the beacon processor for execution at some arbitrary time. - send_to_beacon_processor( - beacon_processor_send, - priority, - BlockingOrAsync::Blocking(Box::new(process_fn)), - rx, - ) - .await - } else { - // There is no beacon processor so spawn a task directly on the - // tokio executor. - warp_utils::task::blocking_json_task(func).await - } - } - pub async fn blocking_response_task( self, priority: Priority, @@ -91,34 +55,19 @@ impl TaskSpawner { // send can only fail if the receiver is dropped. let _ = tx.send(func_result); }; - let process_fn = BlockingOrAsync::Blocking(Box::new(process_fn)); - let error_message = - match beacon_processor_send.try_send(priority.work_event(process_fn)) { - Ok(()) => { - match rx.await { - // The beacon processor executed the task and sent a result. - Ok(func_result) => { - return func_result.map(Reply::into_response) - } - // The beacon processor dropped the channel without sending a - // result. The beacon processor dropped this task because its - // queues are full or it's shutting down. - Err(_) => "The task did not execute. The server is overloaded or shutting down.", - } - } - Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.", - Err(TrySendError::Closed(_)) => { - "The task was dropped. The server is shutting down." - } - }; - - let error_response = warp::reply::with_status( - warp::reply::json(&error_message), - eth2::StatusCode::INTERNAL_SERVER_ERROR, + // Send the function to the beacon processor for execution at some arbitrary time. + match send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Blocking(Box::new(process_fn)), + rx, ) - .into_response(); - Ok(error_response) + .await + { + Ok(result) => result.map(Reply::into_response), + Err(error_response) => Ok(error_response), + } } else { // There is no beacon processor so spawn a task directly on the // tokio executor. @@ -126,6 +75,19 @@ impl TaskSpawner { } } + pub async fn blocking_json_task( + self, + priority: Priority, + func: F, + ) -> Result + where + F: FnOnce() -> Result + Send + Sync + 'static, + T: Serialize + Send + 'static, + { + let func = || func().map(|t| warp::reply::json(&t).into_response()); + self.blocking_response_task(priority, func).await + } + pub async fn spawn_async_with_rejection( self, priority: Priority, @@ -247,18 +209,13 @@ async fn send_to_beacon_processor( beacon_processor_send: &BeaconProcessorSend, priority: Priority, process_fn: BlockingOrAsync, - rx: oneshot::Receiver>, -) -> Result -where - T: Serialize + Send + 'static, -{ + rx: oneshot::Receiver, +) -> Result { let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) { Ok(()) => { match rx.await { // The beacon processor executed the task and sent a result. - Ok(func_result) => { - return func_result.map(|r| warp::reply::json(&r).into_response()) - } + Ok(func_result) => return Ok(func_result), // The beacon processor dropped the channel without sending a // result. The beacon processor dropped this task because its // queues are full or it's shutting down. @@ -274,5 +231,5 @@ where eth2::StatusCode::INTERNAL_SERVER_ERROR, ) .into_response(); - Ok(error_response) + Err(error_response) }