Modularize tracing executor and metrics rename (#6424)

* Tracing executor and metrics rename

* Appease clippy

* Merge branch 'unstable' into modularise-task-executor
This commit is contained in:
Age Manning
2024-10-28 20:41:45 +11:00
committed by GitHub
parent 8188e036a0
commit e31ac508d4
59 changed files with 364 additions and 323 deletions

View File

@@ -4,11 +4,17 @@ version = "0.1.0"
authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = { workspace = true }
[features]
default = ["slog"]
slog = ["dep:slog", "dep:sloggers", "dep:logging"]
tracing = ["dep:tracing"]
[dependencies]
async-channel = { workspace = true }
tokio = { workspace = true }
slog = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
slog = { workspace = true, optional = true }
futures = { workspace = true }
lighthouse_metrics = { workspace = true }
sloggers = { workspace = true }
logging = { workspace = true }
metrics = { workspace = true }
sloggers = { workspace = true, optional = true }
logging = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }

View File

@@ -1,14 +1,20 @@
mod metrics;
#[cfg(not(feature = "tracing"))]
pub mod test_utils;
use futures::channel::mpsc::Sender;
use futures::prelude::*;
use slog::{debug, o, trace};
use std::sync::Weak;
use tokio::runtime::{Handle, Runtime};
pub use tokio::task::JoinHandle;
// Set up logging framework
#[cfg(not(feature = "tracing"))]
use slog::{debug, o};
#[cfg(feature = "tracing")]
use tracing::debug;
/// Provides a reason when Lighthouse is shut down.
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum ShutdownReason {
@@ -79,7 +85,7 @@ pub struct TaskExecutor {
///
/// The task must provide a reason for shutting down.
signal_tx: Sender<ShutdownReason>,
#[cfg(not(feature = "tracing"))]
log: slog::Logger,
}
@@ -94,18 +100,20 @@ impl TaskExecutor {
pub fn new<T: Into<HandleProvider>>(
handle: T,
exit: async_channel::Receiver<()>,
log: slog::Logger,
#[cfg(not(feature = "tracing"))] log: slog::Logger,
signal_tx: Sender<ShutdownReason>,
) -> Self {
Self {
handle_provider: handle.into(),
exit,
signal_tx,
#[cfg(not(feature = "tracing"))]
log,
}
}
/// Clones the task executor adding a service name.
#[cfg(not(feature = "tracing"))]
pub fn clone_with_name(&self, service_name: String) -> Self {
TaskExecutor {
handle_provider: self.handle_provider.clone(),
@@ -115,6 +123,16 @@ impl TaskExecutor {
}
}
/// Clones the task executor adding a service name.
#[cfg(feature = "tracing")]
pub fn clone(&self) -> Self {
TaskExecutor {
handle_provider: self.handle_provider.clone(),
exit: self.exit.clone(),
signal_tx: self.signal_tx.clone(),
}
}
/// A convenience wrapper for `Self::spawn` which ignores a `Result` as long as both `Ok`/`Err`
/// are of type `()`.
///
@@ -150,10 +168,13 @@ impl TaskExecutor {
drop(timer);
});
} else {
#[cfg(not(feature = "tracing"))]
debug!(
self.log,
"Couldn't spawn monitor task. Runtime shutting down"
)
);
#[cfg(feature = "tracing")]
debug!("Couldn't spawn monitor task. Runtime shutting down");
}
}
@@ -175,7 +196,7 @@ impl TaskExecutor {
/// Spawn a future on the tokio runtime. This function does not wrap the task in an `async-channel::Receiver`
/// like [spawn](#method.spawn).
/// The caller of this function is responsible for wrapping up the task with an `async-channel::Receiver` to
/// ensure that the task gets canceled appropriately.
/// ensure that the task gets cancelled appropriately.
/// This function generates prometheus metrics on number of tasks and task duration.
///
/// This is useful in cases where the future to be spawned needs to do additional cleanup work when
@@ -197,7 +218,10 @@ impl TaskExecutor {
if let Some(handle) = self.handle() {
handle.spawn(future);
} else {
#[cfg(not(feature = "tracing"))]
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
#[cfg(feature = "tracing")]
debug!("Couldn't spawn task. Runtime shutting down");
}
}
}
@@ -215,7 +239,7 @@ impl TaskExecutor {
/// Spawn a future on the tokio runtime wrapped in an `async-channel::Receiver` returning an optional
/// join handle to the future.
/// The task is canceled when the corresponding async-channel is dropped.
/// The task is cancelled when the corresponding async-channel is dropped.
///
/// This function generates prometheus metrics on number of tasks and task duration.
pub fn spawn_handle<R: Send + 'static>(
@@ -224,6 +248,8 @@ impl TaskExecutor {
name: &'static str,
) -> Option<tokio::task::JoinHandle<Option<R>>> {
let exit = self.exit();
#[cfg(not(feature = "tracing"))]
let log = self.log.clone();
if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
@@ -234,12 +260,12 @@ impl TaskExecutor {
Some(handle.spawn(async move {
futures::pin_mut!(exit);
let result = match future::select(Box::pin(task), exit).await {
future::Either::Left((value, _)) => {
trace!(log, "Async task completed"; "task" => name);
Some(value)
}
future::Either::Left((value, _)) => Some(value),
future::Either::Right(_) => {
#[cfg(not(feature = "tracing"))]
debug!(log, "Async task shutdown, exit received"; "task" => name);
#[cfg(feature = "tracing")]
debug!(task = name, "Async task shutdown, exit received");
None
}
};
@@ -247,7 +273,10 @@ impl TaskExecutor {
result
}))
} else {
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
#[cfg(not(feature = "tracing"))]
debug!(log, "Couldn't spawn task. Runtime shutting down");
#[cfg(feature = "tracing")]
debug!("Couldn't spawn task. Runtime shutting down");
None
}
} else {
@@ -270,6 +299,7 @@ impl TaskExecutor {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(not(feature = "tracing"))]
let log = self.log.clone();
let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]);
@@ -278,19 +308,22 @@ impl TaskExecutor {
let join_handle = if let Some(handle) = self.handle() {
handle.spawn_blocking(task)
} else {
#[cfg(not(feature = "tracing"))]
debug!(self.log, "Couldn't spawn task. Runtime shutting down");
#[cfg(feature = "tracing")]
debug!("Couldn't spawn task. Runtime shutting down");
return None;
};
let future = async move {
let result = match join_handle.await {
Ok(result) => {
trace!(log, "Blocking task completed"; "task" => name);
Ok(result)
}
Err(e) => {
debug!(log, "Blocking task ended unexpectedly"; "error" => %e);
Err(e)
Ok(result) => Ok(result),
Err(error) => {
#[cfg(not(feature = "tracing"))]
debug!(log, "Blocking task ended unexpectedly"; "error" => %error);
#[cfg(feature = "tracing")]
debug!(%error, "Blocking task ended unexpectedly");
Err(error)
}
};
drop(timer);
@@ -321,32 +354,48 @@ impl TaskExecutor {
) -> Option<F::Output> {
let timer = metrics::start_timer_vec(&metrics::BLOCK_ON_TASKS_HISTOGRAM, &[name]);
metrics::inc_gauge_vec(&metrics::BLOCK_ON_TASKS_COUNT, &[name]);
#[cfg(not(feature = "tracing"))]
let log = self.log.clone();
let handle = self.handle()?;
let exit = self.exit();
#[cfg(not(feature = "tracing"))]
debug!(
log,
"Starting block_on task";
"name" => name
);
#[cfg(feature = "tracing")]
debug!(name, "Starting block_on task");
handle.block_on(async {
let output = tokio::select! {
output = future => {
#[cfg(not(feature = "tracing"))]
debug!(
log,
"Completed block_on task";
"name" => name
);
#[cfg(feature = "tracing")]
debug!(
name,
"Completed block_on task"
);
Some(output)
},
_ = exit => {
#[cfg(not(feature = "tracing"))]
debug!(
log,
"Cancelled block_on task";
"name" => name,
);
#[cfg(feature = "tracing")]
debug!(
name,
"Cancelled block_on task"
);
None
}
};
@@ -376,6 +425,7 @@ impl TaskExecutor {
}
/// Returns a reference to the logger.
#[cfg(not(feature = "tracing"))]
pub fn log(&self) -> &slog::Logger {
&self.log
}

View File

@@ -1,5 +1,5 @@
/// Handles async task metrics
pub use lighthouse_metrics::*;
pub use metrics::*;
use std::sync::LazyLock;
pub static ASYNC_TASKS_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {