From da96733e6bdf944c959564d5b884b75779362a56 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Tue, 4 Jul 2023 15:05:31 +1000 Subject: [PATCH] Start adding `TaskSpawner` --- beacon_node/http_api/src/lib.rs | 1 + beacon_node/http_api/src/task_spawner.rs | 104 +++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 beacon_node/http_api/src/task_spawner.rs diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 3fff4e510d..4f4a57be29 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -18,6 +18,7 @@ mod standard_block_rewards; mod state_id; mod sync_committee_rewards; mod sync_committees; +mod task_spawner; pub mod test_utils; mod ui; mod validator_inclusion; diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs new file mode 100644 index 0000000000..2a847ecfa7 --- /dev/null +++ b/beacon_node/http_api/src/task_spawner.rs @@ -0,0 +1,104 @@ +use beacon_processor::{AsyncFn, BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent}; +use execution_layer::deposit_methods::Block; +use serde::Serialize; +use tokio::sync::{mpsc::error::TrySendError, oneshot}; +use types::EthSpec; +use warp::{ + http::StatusCode, + reply::{Reply, Response}, +}; + +#[derive(Clone, Copy)] +enum Priority { + P0, +} + +struct TaskSpawner { + beacon_processor_send: Option>, +} + +impl TaskSpawner { + pub fn new(beacon_processor_send: Option>) -> Self { + Self { + beacon_processor_send, + } + } + + pub async fn blocking_json_task( + &self, + priority: Priority, + func: F, + ) -> Result + where + F: FnOnce() -> Result + Send + Sync + 'static, + T: Serialize + Send + 'static, + { + if let Some(beacon_processor_send) = &self.beacon_processor_send { + // Create a closure that will execute `func` and send the result to + // a channel held by this thread. + let (tx, rx) = oneshot::channel(); + let process_fn = move || { + // Execute the function, collect the return value. + let func_result = func(); + // Send the result down the channel. Ignore any failures; the + // send can only fail if the receiver is dropped. + let _ = tx.send(func_result); + }; + + // Send the function to the beacon processor for execution at some arbitrary time. + send_to_beacon_processor( + beacon_processor_send, + priority, + BlockingOrAsync::Blocking(Box::new(process_fn)), + ); + + match rx.await { + // The beacon processor executed the task and sent a result. + Ok(func_result) => func_result.map(|r| warp::reply::json(&r).into_response()), + // The beacon processor dropped the channel without sending a + // result. The beacon processor dropped this task because its + // queues are full or it's shutting down. + Err(_) => Ok(warp::reply::with_status( + warp::reply::json( + &"The task did not execute. Server is overloaded or shutting down.", + ), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response()), + } + } else { + // There is no beacon processor, spawn a task directly on the tokio + // executor. + warp_utils::task::blocking_json_task(func).await + } + } +} + +fn send_to_beacon_processor( + beacon_processor_send: &BeaconProcessorSend, + priority: Priority, + process_fn: BlockingOrAsync, +) -> Response { + let work = match priority { + Priority::P0 => Work::ApiRequestP0(process_fn), + }; + let work_event = WorkEvent { + drop_during_sync: false, + work, + }; + + let error_message = match beacon_processor_send.try_send(work_event) { + Ok(()) => None, + Err(TrySendError::Full(_)) => Some("The node is overloaded and the task was dropped."), + Err(TrySendError::Closed(_)) => Some("The node is shutting down and the task was dropped."), + }; + if let Some(error_message) = error_message { + return warp::reply::with_status( + warp::reply::json(&error_message), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(); + } + + todo!() +}