From be7f32f5eb3fa2dffcdc95634d09465467bd8424 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 4 Jul 2023 15:32:01 +1000 Subject: [PATCH] Add async fns to task spawner --- beacon_node/http_api/src/task_spawner.rs | 62 +++++++++++++++++++++--- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index 57286cce8b..d170d31118 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -1,15 +1,16 @@ -use beacon_processor::{AsyncFn, BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent}; +use beacon_processor::{BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent}; use serde::Serialize; +use std::future::Future; use tokio::sync::{mpsc::error::TrySendError, oneshot}; use types::EthSpec; use warp::reply::{Reply, Response}; #[derive(Clone, Copy)] -enum Priority { +pub enum Priority { P0, } -struct TaskSpawner { +pub struct TaskSpawner { beacon_processor_send: Option>, } @@ -50,11 +51,56 @@ impl TaskSpawner { ) .await } else { - // There is no beacon processor, spawn a task directly on the tokio - // executor. + // There is no beacon processor so spawn a task directly on the + // tokio executor. warp_utils::task::blocking_json_task(func).await } } + + pub async fn async_task( + &self, + priority: Priority, + func: impl Future> + Send + Sync + 'static, + ) -> Result + where + T: Serialize + Send + 'static, + { + if let Some(beacon_processor_send) = &self.beacon_processor_send { + // Create a wrapper future that will execute `func` and send the + // result to a channel held by this thread. + let (tx, rx) = oneshot::channel(); + let process_fn = async move { + // Await the future, collect the return value. + let func_result = func.await; + // Send the result down the channel. Ignore any failures; the + // send can only fail if the receiver is dropped. + let _ = tx.send(func_result); + }; + + // Send the function to the beacon processor for execution at some arbitrary time. + send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Async(Box::pin(process_fn)), + rx, + ) + .await + } else { + // There is no beacon processor so spawn a task directly on the + // tokio executor. + tokio::task::spawn(func) + .await + .map(|r| r.map(|r| warp::reply::json(&r).into_response())) + .unwrap_or_else(|e| { + let response = warp::reply::with_status( + warp::reply::json(&format!("Tokio did not execute task: {e:?}")), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(); + Ok(response) + }) + } + } } async fn send_to_beacon_processor( @@ -84,11 +130,11 @@ where // The beacon processor dropped the channel without sending a // result. The beacon processor dropped this task because its // queues are full or it's shutting down. - Err(_) => "The task did not execute. Server is overloaded or shutting down.", + Err(_) => "The task did not execute. The server is overloaded or shutting down.", } } - Err(TrySendError::Full(_)) => "The node is overloaded and the task was dropped.", - Err(TrySendError::Closed(_)) => "The node is shutting down and the task was dropped.", + Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.", + Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.", }; let error_response = warp::reply::with_status(