mirror of
https://github.com/sigp/lighthouse.git
synced 2026-03-02 16:21:42 +00:00
remove exit-future (#5183)
* remove exit-future usage, as it is non maintained, and replace with async-channel which is already in the repo. * Merge branch 'unstable' of https://github.com/sigp/lighthouse into remove-exit-future * Merge branch 'unstable' of https://github.com/sigp/lighthouse into remove-exit-future
This commit is contained in:
@@ -5,10 +5,10 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
|
||||
edition = { workspace = true }
|
||||
|
||||
[dependencies]
|
||||
async-channel = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
slog = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
exit-future = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
lighthouse_metrics = { workspace = true }
|
||||
sloggers = { workspace = true }
|
||||
|
||||
@@ -73,7 +73,7 @@ pub struct TaskExecutor {
|
||||
/// The handle to the runtime on which tasks are spawned
|
||||
handle_provider: HandleProvider,
|
||||
/// The receiver exit future which on receiving shuts down the task
|
||||
exit: exit_future::Exit,
|
||||
exit: async_channel::Receiver<()>,
|
||||
/// Sender given to tasks, so that if they encounter a state in which execution cannot
|
||||
/// continue they can request that everything shuts down.
|
||||
///
|
||||
@@ -93,7 +93,7 @@ impl TaskExecutor {
|
||||
/// crate).
|
||||
pub fn new<T: Into<HandleProvider>>(
|
||||
handle: T,
|
||||
exit: exit_future::Exit,
|
||||
exit: async_channel::Receiver<()>,
|
||||
log: slog::Logger,
|
||||
signal_tx: Sender<ShutdownReason>,
|
||||
) -> Self {
|
||||
@@ -159,8 +159,8 @@ impl TaskExecutor {
|
||||
|
||||
/// Spawn a future on the tokio runtime.
|
||||
///
|
||||
/// The future is wrapped in an `exit_future::Exit`. The task is cancelled when the corresponding
|
||||
/// exit_future `Signal` is fired/dropped.
|
||||
/// The future is wrapped in an `async-channel::Receiver`. The task is cancelled when the corresponding
|
||||
/// Sender is dropped.
|
||||
///
|
||||
/// The future is monitored via another spawned future to ensure that it doesn't panic. In case
|
||||
/// of a panic, the executor will be shut down via `self.signal_tx`.
|
||||
@@ -172,9 +172,9 @@ impl TaskExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit`
|
||||
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `async-channel::Receiver`
|
||||
/// like [spawn](#method.spawn).
|
||||
/// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to
|
||||
/// The caller of this function is responsible for wrapping up the task with an `async-channel::Receiver` to
|
||||
/// ensure that the task gets canceled appropriately.
|
||||
/// This function generates prometheus metrics on number of tasks and task duration.
|
||||
///
|
||||
@@ -213,9 +213,9 @@ impl TaskExecutor {
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional
|
||||
/// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional
|
||||
/// join handle to the future.
|
||||
/// The task is canceled when the corresponding exit_future `Signal` is fired/dropped.
|
||||
/// The task is canceled when the corresponding async-channel is dropped.
|
||||
///
|
||||
/// This function generates prometheus metrics on number of tasks and task duration.
|
||||
pub fn spawn_handle<R: Send + 'static>(
|
||||
@@ -223,30 +223,29 @@ impl TaskExecutor {
|
||||
task: impl Future<Output = R> + Send + 'static,
|
||||
name: &'static str,
|
||||
) -> Option<tokio::task::JoinHandle<Option<R>>> {
|
||||
let exit = self.exit.clone();
|
||||
let exit = self.exit();
|
||||
let log = self.log.clone();
|
||||
|
||||
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
|
||||
// Task is shutdown before it completes if `exit` receives
|
||||
let int_gauge_1 = int_gauge.clone();
|
||||
let future = future::select(Box::pin(task), exit).then(move |either| {
|
||||
let result = match either {
|
||||
future::Either::Left((value, _)) => {
|
||||
trace!(log, "Async task completed"; "task" => name);
|
||||
Some(value)
|
||||
}
|
||||
future::Either::Right(_) => {
|
||||
debug!(log, "Async task shutdown, exit received"; "task" => name);
|
||||
None
|
||||
}
|
||||
};
|
||||
int_gauge_1.dec();
|
||||
futures::future::ready(result)
|
||||
});
|
||||
|
||||
int_gauge.inc();
|
||||
if let Some(handle) = self.handle() {
|
||||
Some(handle.spawn(future))
|
||||
Some(handle.spawn(async move {
|
||||
futures::pin_mut!(exit);
|
||||
let result = match future::select(Box::pin(task), exit).await {
|
||||
future::Either::Left((value, _)) => {
|
||||
trace!(log, "Async task completed"; "task" => name);
|
||||
Some(value)
|
||||
}
|
||||
future::Either::Right(_) => {
|
||||
debug!(log, "Async task shutdown, exit received"; "task" => name);
|
||||
None
|
||||
}
|
||||
};
|
||||
int_gauge_1.dec();
|
||||
result
|
||||
}))
|
||||
} else {
|
||||
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
|
||||
None
|
||||
@@ -324,7 +323,7 @@ impl TaskExecutor {
|
||||
metrics::inc_gauge_vec(&metrics::BLOCK_ON_TASKS_COUNT, &[name]);
|
||||
let log = self.log.clone();
|
||||
let handle = self.handle()?;
|
||||
let exit = self.exit.clone();
|
||||
let exit = self.exit();
|
||||
|
||||
debug!(
|
||||
log,
|
||||
@@ -362,9 +361,13 @@ impl TaskExecutor {
|
||||
self.handle_provider.handle()
|
||||
}
|
||||
|
||||
/// Returns a copy of the `exit_future::Exit`.
|
||||
pub fn exit(&self) -> exit_future::Exit {
|
||||
self.exit.clone()
|
||||
/// Returns a future that completes when `async-channel::Sender` is dropped or () is sent,
|
||||
/// which translates to the exit signal being triggered.
|
||||
pub fn exit(&self) -> impl Future<Output = ()> {
|
||||
let exit = self.exit.clone();
|
||||
async move {
|
||||
let _ = exit.recv().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get a channel to request shutting down.
|
||||
|
||||
@@ -14,7 +14,7 @@ use tokio::runtime;
|
||||
/// This struct should never be used in production, only testing.
|
||||
pub struct TestRuntime {
|
||||
runtime: Option<Arc<tokio::runtime::Runtime>>,
|
||||
_runtime_shutdown: exit_future::Signal,
|
||||
_runtime_shutdown: async_channel::Sender<()>,
|
||||
pub task_executor: TaskExecutor,
|
||||
pub log: Logger,
|
||||
}
|
||||
@@ -24,7 +24,7 @@ impl Default for TestRuntime {
|
||||
/// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the
|
||||
/// `Self` is dropped.
|
||||
fn default() -> Self {
|
||||
let (runtime_shutdown, exit) = exit_future::signal();
|
||||
let (runtime_shutdown, exit) = async_channel::bounded(1);
|
||||
let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
|
||||
let log = null_logger().unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user