diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 01ccaecc28..c77f0c968f 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -87,10 +87,10 @@ impl Router { // spawn handler task and move the message handler instance into the spawned thread executor.spawn( async move { + debug!(log, "Network message router started"); handler_recv .for_each(move |msg| future::ready(handler.handle_message(msg))) .await; - debug!(log, "Network message router terminated."); }, "router", ); diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index a81b1cc6df..a2d0e17382 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -225,8 +225,8 @@ impl RuntimeContext { } /// A wrapper over a runtime handle which can spawn async and blocking tasks. -/// Spawned futures are wrapped in an exit_future which shuts down the task -/// when the corresponding exit future Signal fires or is dropped. +/// Spawned futures are wrapped in an exit_future which cancels the task +/// when the corresponding exit_future `Signal` is fired/dropped. #[derive(Clone)] pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned @@ -237,19 +237,22 @@ pub struct TaskExecutor { } impl TaskExecutor { - /// TODO: make docs better - /// Spawn a future on the async runtime wrapped in an exit future + /// Spawn a future on the tokio runtime wrapped in an exit_future `Exit`. The task is canceled + /// when the corresponding exit_future `Signal` is fired/dropped. + /// /// This function also generates some metrics on number of tasks and task duration. - pub fn spawn(&self, task: impl Future + Send + 'static, nam: &'static str) { + pub fn spawn(&self, task: impl Future + Send + 'static, name: &'static str) { let exit = self.exit.clone(); let log = self.log.clone(); // Start the timer for how long this task runs - if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[nam]) { + if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[name]) { let timer = metric.start_timer(); let future = async move { - let _ = future::select(Box::pin(task), exit).await; - info!(log, "Async task shutdown"; "name" => nam); + // Task is shutdown before it completes if `exit` receives + if let future::Either::Right(_) = future::select(Box::pin(task), exit).await { + info!(log, "Async task shutdown"; "task" => name); + } timer.observe_duration(); }; @@ -273,16 +276,17 @@ impl TaskExecutor { let join_handle = self.handle.spawn_blocking(task); let future = async move { - // TODO: construct a wrapped prometheus future - let _ = future::select(Box::pin(join_handle), exit).await; - info!(log, "Blocking task shutdown"; "name" => name); + // Task is shutdown before it completes if `exit` receives + if let future::Either::Right(_) = future::select(Box::pin(join_handle), exit).await + { + info!(log, "Blocking task shutdown"; "task" => name); + } timer.observe_duration(); }; metrics::inc_counter(&metrics::BLOCKING_TASKS_COUNT); self.handle.spawn(future); } - // TODO: handle else case } /// Returns the underlying runtime handle. @@ -290,7 +294,7 @@ impl TaskExecutor { self.handle.clone() } - /// Returns the underlying exit future. + /// Returns a copy of the `exit_future::Exit`. pub fn exit(&self) -> exit_future::Exit { self.exit.clone() }