Refactor to avoid repetition

This commit is contained in:
Paul Hauner
2023-07-04 15:11:42 +10:00
parent da96733e6b
commit 5e19518321

View File

@@ -1,12 +1,8 @@
use beacon_processor::{AsyncFn, BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent};
use execution_layer::deposit_methods::Block;
use serde::Serialize;
use tokio::sync::{mpsc::error::TrySendError, oneshot};
use types::EthSpec;
use warp::{
http::StatusCode,
reply::{Reply, Response},
};
use warp::reply::{Reply, Response};
#[derive(Clone, Copy)]
enum Priority {
@@ -50,22 +46,9 @@ impl<E: EthSpec> TaskSpawner<E> {
beacon_processor_send,
priority,
BlockingOrAsync::Blocking(Box::new(process_fn)),
);
match rx.await {
// The beacon processor executed the task and sent a result.
Ok(func_result) => func_result.map(|r| warp::reply::json(&r).into_response()),
// The beacon processor dropped the channel without sending a
// result. The beacon processor dropped this task because its
// queues are full or it's shutting down.
Err(_) => Ok(warp::reply::with_status(
warp::reply::json(
&"The task did not execute. Server is overloaded or shutting down.",
),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response()),
}
rx,
)
.await
} else {
// There is no beacon processor, spawn a task directly on the tokio
// executor.
@@ -74,11 +57,15 @@ impl<E: EthSpec> TaskSpawner<E> {
}
}
fn send_to_beacon_processor<E: EthSpec>(
async fn send_to_beacon_processor<E: EthSpec, T>(
beacon_processor_send: &BeaconProcessorSend<E>,
priority: Priority,
process_fn: BlockingOrAsync,
) -> Response {
rx: oneshot::Receiver<Result<T, warp::Rejection>>,
) -> Result<Response, warp::Rejection>
where
T: Serialize + Send + 'static,
{
let work = match priority {
Priority::P0 => Work::ApiRequestP0(process_fn),
};
@@ -88,17 +75,26 @@ fn send_to_beacon_processor<E: EthSpec>(
};
let error_message = match beacon_processor_send.try_send(work_event) {
Ok(()) => None,
Err(TrySendError::Full(_)) => Some("The node is overloaded and the task was dropped."),
Err(TrySendError::Closed(_)) => Some("The node is shutting down and the task was dropped."),
Ok(()) => {
match rx.await {
// The beacon processor executed the task and sent a result.
Ok(func_result) => {
return func_result.map(|r| warp::reply::json(&r).into_response())
}
// The beacon processor dropped the channel without sending a
// result. The beacon processor dropped this task because its
// queues are full or it's shutting down.
Err(_) => "The task did not execute. Server is overloaded or shutting down.",
}
}
Err(TrySendError::Full(_)) => "The node is overloaded and the task was dropped.",
Err(TrySendError::Closed(_)) => "The node is shutting down and the task was dropped.",
};
if let Some(error_message) = error_message {
return warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
}
todo!()
let error_response = warp::reply::with_status(
warp::reply::json(&error_message),
eth2::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response();
Ok(error_response)
}