diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index 4224f00acc..d4faf1e4b8 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -10,3 +10,6 @@ futures = { workspace = true } metrics = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tracing = { workspace = true } + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ["cfg(tokio_unstable)"] } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index dbdac600f3..b47e30cb68 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -144,7 +144,7 @@ impl TaskExecutor { ) { let mut shutdown_sender = self.shutdown_sender(); if let Some(handle) = self.handle() { - handle.spawn(async move { + let fut = async move { let timer = metrics::start_timer_vec(&metrics::TASKS_HISTOGRAM, &[name]); if let Err(join_error) = task_handle.await { if let Ok(_panic) = join_error.try_into_panic() { @@ -153,7 +153,14 @@ impl TaskExecutor { } } drop(timer); - }); + }; + #[cfg(tokio_unstable)] + tokio::task::Builder::new() + .name(&format!("{name}-monitor")) + .spawn_on(fut, &handle) + .expect("Failed to spawn monitor task"); + #[cfg(not(tokio_unstable))] + handle.spawn(fut); } else { debug!("Couldn't spawn monitor task. Runtime shutting down") } @@ -199,6 +206,12 @@ impl TaskExecutor { int_gauge.inc(); if let Some(handle) = self.handle() { + #[cfg(tokio_unstable)] + tokio::task::Builder::new() + .name(name) + .spawn_on(future, &handle) + .expect("Failed to spawn task"); + #[cfg(not(tokio_unstable))] handle.spawn(future); } else { debug!("Couldn't spawn task. Runtime shutting down"); @@ -234,7 +247,7 @@ impl TaskExecutor { let int_gauge_1 = int_gauge.clone(); int_gauge.inc(); if let Some(handle) = self.handle() { - Some(handle.spawn(async move { + let fut = async move { futures::pin_mut!(exit); let result = match future::select(Box::pin(task), exit).await { future::Either::Left((value, _)) => Some(value), @@ -245,7 +258,16 @@ impl TaskExecutor { }; int_gauge_1.dec(); result - })) + }; + #[cfg(tokio_unstable)] + return Some( + tokio::task::Builder::new() + .name(name) + .spawn_on(fut, &handle) + .expect("Failed to spawn task"), + ); + #[cfg(not(tokio_unstable))] + Some(handle.spawn(fut)) } else { debug!("Couldn't spawn task. Runtime shutting down"); None