mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-11 04:31:51 +00:00
Partially addresses https://github.com/sigp/lighthouse/issues/6291. This PR removes the reprocess event channel from being externally exposed. All work events are now sent through the single `BeaconProcessorSend` channel. I've introduced a new `Work::Reprocess` enum variant, which we then use to schedule jobs for reprocessing. I've also created a new scheduler module, which will eventually house the different scheduler impls. This is all needed as an initial step to generalize the beacon processor. A "full" implementation of the generalized beacon processor can be found here: https://github.com/sigp/lighthouse/pull/6448. I'm going to try to break up the full implementation into smaller PRs so it can actually be reviewed.
216 lines
8.1 KiB
Rust
216 lines
8.1 KiB
Rust
use beacon_processor::{BeaconProcessorSend, BlockingOrAsync, Work, WorkEvent};
|
|
use serde::Serialize;
|
|
use std::future::Future;
|
|
use tokio::sync::{mpsc::error::TrySendError, oneshot};
|
|
use types::EthSpec;
|
|
use warp::reply::{Reply, Response};
|
|
use warp_utils::reject::convert_rejection;
|
|
|
|
/// Selects which `BeaconProcessor` queue a request is routed to.
#[derive(Copy, Clone)]
pub enum Priority {
    /// The highest priority.
    P0,
    /// The lowest priority.
    P1,
}
|
|
|
|
impl Priority {
|
|
/// Wrap `self` in a `WorkEvent` with an appropriate priority.
|
|
fn work_event<E: EthSpec>(&self, process_fn: BlockingOrAsync) -> WorkEvent<E> {
|
|
let work = match self {
|
|
Priority::P0 => Work::ApiRequestP0(process_fn),
|
|
Priority::P1 => Work::ApiRequestP1(process_fn),
|
|
};
|
|
WorkEvent {
|
|
drop_during_sync: false,
|
|
work,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor.
|
|
#[derive(Clone)]
|
|
pub struct TaskSpawner<E: EthSpec> {
|
|
/// Used to send tasks to the `BeaconProcessor`. The tokio executor will be
|
|
/// used if this is `None`.
|
|
beacon_processor_send: Option<BeaconProcessorSend<E>>,
|
|
}
|
|
|
|
impl<E: EthSpec> TaskSpawner<E> {
|
|
pub fn new(beacon_processor_send: Option<BeaconProcessorSend<E>>) -> Self {
|
|
Self {
|
|
beacon_processor_send,
|
|
}
|
|
}
|
|
|
|
/// Executes a "blocking" (non-async) task which returns an arbitrary value.
|
|
pub async fn blocking_task<F, T>(
|
|
self,
|
|
priority: Priority,
|
|
func: F,
|
|
) -> Result<T, warp::Rejection>
|
|
where
|
|
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
|
|
T: Send + 'static,
|
|
{
|
|
if let Some(beacon_processor_send) = &self.beacon_processor_send {
|
|
// Create a closure that will execute `func` and send the result to
|
|
// a channel held by this thread.
|
|
let (tx, rx) = oneshot::channel();
|
|
let process_fn = move || {
|
|
// Execute the function, collect the return value.
|
|
let func_result = func();
|
|
// Send the result down the channel. Ignore any failures; the
|
|
// send can only fail if the receiver is dropped.
|
|
let _ = tx.send(func_result);
|
|
};
|
|
|
|
// Send the function to the beacon processor for execution at some arbitrary time.
|
|
send_to_beacon_processor(
|
|
beacon_processor_send,
|
|
priority,
|
|
BlockingOrAsync::Blocking(Box::new(process_fn)),
|
|
rx,
|
|
)
|
|
.await
|
|
.and_then(|x| x)
|
|
} else {
|
|
// There is no beacon processor so spawn a task directly on the
|
|
// tokio executor.
|
|
warp_utils::task::blocking_task(func).await
|
|
}
|
|
}
|
|
|
|
/// Executes a "blocking" (non-async) task which returns a `Response`.
|
|
pub async fn blocking_response_task<F, T>(self, priority: Priority, func: F) -> Response
|
|
where
|
|
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
|
|
T: Reply + Send + 'static,
|
|
{
|
|
let result = self.blocking_task(priority, func).await;
|
|
convert_rejection(result).await
|
|
}
|
|
|
|
/// Executes a "blocking" (non-async) task which returns a JSON-serializable
|
|
/// object.
|
|
pub async fn blocking_json_task<F, T>(self, priority: Priority, func: F) -> Response
|
|
where
|
|
F: FnOnce() -> Result<T, warp::Rejection> + Send + Sync + 'static,
|
|
T: Serialize + Send + 'static,
|
|
{
|
|
let func = || func().map(|t| warp::reply::json(&t).into_response());
|
|
self.blocking_response_task(priority, func).await
|
|
}
|
|
|
|
/// Executes an async task which may return a `Rejection`, which will be converted to a response.
|
|
pub async fn spawn_async_with_rejection(
|
|
self,
|
|
priority: Priority,
|
|
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
|
|
) -> Response {
|
|
let result = self
|
|
.spawn_async_with_rejection_no_conversion(priority, func)
|
|
.await;
|
|
convert_rejection(result).await
|
|
}
|
|
|
|
/// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection.
|
|
///
|
|
/// If you call this function you MUST convert the rejection to a response and not let it
|
|
/// propagate into Warp's filters. See `convert_rejection`.
|
|
pub async fn spawn_async_with_rejection_no_conversion(
|
|
self,
|
|
priority: Priority,
|
|
func: impl Future<Output = Result<Response, warp::Rejection>> + Send + Sync + 'static,
|
|
) -> Result<Response, warp::Rejection> {
|
|
if let Some(beacon_processor_send) = &self.beacon_processor_send {
|
|
// Create a wrapper future that will execute `func` and send the
|
|
// result to a channel held by this thread.
|
|
let (tx, rx) = oneshot::channel();
|
|
let process_fn = async move {
|
|
// Await the future, collect the return value.
|
|
let func_result = func.await;
|
|
// Send the result down the channel. Ignore any failures; the
|
|
// send can only fail if the receiver is dropped.
|
|
let _ = tx.send(func_result);
|
|
};
|
|
|
|
// Send the function to the beacon processor for execution at some arbitrary time.
|
|
send_to_beacon_processor(
|
|
beacon_processor_send,
|
|
priority,
|
|
BlockingOrAsync::Async(Box::pin(process_fn)),
|
|
rx,
|
|
)
|
|
.await
|
|
.and_then(|x| x)
|
|
} else {
|
|
// There is no beacon processor so spawn a task directly on the
|
|
// tokio executor.
|
|
tokio::task::spawn(func)
|
|
.await
|
|
.map_err(|_| {
|
|
warp_utils::reject::custom_server_error("Tokio failed to spawn task".into())
|
|
})
|
|
.and_then(|x| x)
|
|
}
|
|
}
|
|
|
|
pub fn try_send(&self, work_event: WorkEvent<E>) -> Result<(), warp::Rejection> {
|
|
if let Some(beacon_processor_send) = &self.beacon_processor_send {
|
|
let error_message = match beacon_processor_send.try_send(work_event) {
|
|
Ok(()) => None,
|
|
Err(TrySendError::Full(_)) => {
|
|
Some("The task was dropped. The server is overloaded.")
|
|
}
|
|
Err(TrySendError::Closed(_)) => {
|
|
Some("The task was dropped. The server is shutting down.")
|
|
}
|
|
};
|
|
|
|
if let Some(error_message) = error_message {
|
|
return Err(warp_utils::reject::custom_server_error(
|
|
error_message.to_string(),
|
|
));
|
|
};
|
|
|
|
Ok(())
|
|
} else {
|
|
Err(warp_utils::reject::custom_server_error(
|
|
"The beacon processor is unavailable".to_string(),
|
|
))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Send a task to the beacon processor and await execution.
|
|
///
|
|
/// If the task is not executed, return an `Err` with an error message
|
|
/// for the API consumer.
|
|
async fn send_to_beacon_processor<E: EthSpec, T>(
|
|
beacon_processor_send: &BeaconProcessorSend<E>,
|
|
priority: Priority,
|
|
process_fn: BlockingOrAsync,
|
|
rx: oneshot::Receiver<T>,
|
|
) -> Result<T, warp::Rejection> {
|
|
let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) {
|
|
Ok(()) => {
|
|
match rx.await {
|
|
// The beacon processor executed the task and sent a result.
|
|
Ok(func_result) => return Ok(func_result),
|
|
// The beacon processor dropped the channel without sending a
|
|
// result. The beacon processor dropped this task because its
|
|
// queues are full or it's shutting down.
|
|
Err(_) => "The task did not execute. The server is overloaded or shutting down.",
|
|
}
|
|
}
|
|
Err(TrySendError::Full(_)) => "The task was dropped. The server is overloaded.",
|
|
Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.",
|
|
};
|
|
|
|
Err(warp_utils::reject::custom_server_error(
|
|
error_message.to_string(),
|
|
))
|
|
}
|