diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index 2a847ecfa7..57286cce8b 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -1,12 +1,8 @@ use beacon_processor::{AsyncFn, BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent}; -use execution_layer::deposit_methods::Block; use serde::Serialize; use tokio::sync::{mpsc::error::TrySendError, oneshot}; use types::EthSpec; -use warp::{ - http::StatusCode, - reply::{Reply, Response}, -}; +use warp::reply::{Reply, Response}; #[derive(Clone, Copy)] enum Priority { @@ -50,22 +46,9 @@ impl TaskSpawner { beacon_processor_send, priority, BlockingOrAsync::Blocking(Box::new(process_fn)), - ); - - match rx.await { - // The beacon processor executed the task and sent a result. - Ok(func_result) => func_result.map(|r| warp::reply::json(&r).into_response()), - // The beacon processor dropped the channel without sending a - // result. The beacon processor dropped this task because its - // queues are full or it's shutting down. - Err(_) => Ok(warp::reply::with_status( - warp::reply::json( - &"The task did not execute. Server is overloaded or shutting down.", - ), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response()), - } + rx, + ) + .await } else { // There is no beacon processor, spawn a task directly on the tokio // executor. @@ -74,11 +57,15 @@ impl TaskSpawner { } } -fn send_to_beacon_processor( +async fn send_to_beacon_processor( beacon_processor_send: &BeaconProcessorSend, priority: Priority, process_fn: BlockingOrAsync, -) -> Response { + rx: oneshot::Receiver>, +) -> Result +where + T: Serialize + Send + 'static, +{ let work = match priority { Priority::P0 => Work::ApiRequestP0(process_fn), }; @@ -88,17 +75,26 @@ fn send_to_beacon_processor( }; let error_message = match beacon_processor_send.try_send(work_event) { - Ok(()) => None, - Err(TrySendError::Full(_)) => Some("The node is overloaded and the task was dropped."), - Err(TrySendError::Closed(_)) => Some("The node is shutting down and the task was dropped."), + Ok(()) => { + match rx.await { + // The beacon processor executed the task and sent a result. + Ok(func_result) => { + return func_result.map(|r| warp::reply::json(&r).into_response()) + } + // The beacon processor dropped the channel without sending a + // result. The beacon processor dropped this task because its + // queues are full or it's shutting down. + Err(_) => "The task did not execute. Server is overloaded or shutting down.", + } + } + Err(TrySendError::Full(_)) => "The node is overloaded and the task was dropped.", + Err(TrySendError::Closed(_)) => "The node is shutting down and the task was dropped.", }; - if let Some(error_message) = error_message { - return warp::reply::with_status( - warp::reply::json(&error_message), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(); - } - todo!() + let error_response = warp::reply::with_status( + warp::reply::json(&error_message), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(); + Ok(error_response) }