Add duration histograms; minor fixes

This commit is contained in:
pawan
2020-05-20 17:58:05 +05:30
parent 8be6a4ecd5
commit f5d49287b9
4 changed files with 73 additions and 23 deletions

View File

@@ -118,7 +118,8 @@ pub fn start_server<T: BeaconChainTypes>(
inner_log,
"HTTP server failed to start, Unable to bind"; "address" => format!("{:?}", e)
)
});
})
.unwrap_or_else(|_| ());
info!(
log,
@@ -127,7 +128,7 @@ pub fn start_server<T: BeaconChainTypes>(
"port" => actual_listen_addr.port(),
);
executor.runtime_handle().spawn(server_future);
executor.spawn(server_future, "http_service");
Ok(actual_listen_addr)
}

View File

@@ -56,7 +56,9 @@
use prometheus::{HistogramOpts, HistogramTimer, Opts};
pub use prometheus::{Encoder, Gauge, Histogram, IntCounter, IntGauge, Result, TextEncoder};
pub use prometheus::{
Encoder, Gauge, Histogram, HistogramVec, IntCounter, IntGauge, Result, TextEncoder,
};
/// Collect all the metrics for reporting.
pub fn gather() -> Vec<prometheus::proto::MetricFamily> {
@@ -98,6 +100,25 @@ pub fn try_create_histogram(name: &str, help: &str) -> Result<Histogram> {
prometheus::register(Box::new(histogram.clone()))?;
Ok(histogram)
}
/// Attempts to build a `HistogramVec` from `name`, `help` and `label_names`, then registers it
/// via `prometheus::register`.
///
/// Returns an `Err` if either constructing the `HistogramVec` or registering it fails; on
/// success the registered `HistogramVec` is handed back to the caller.
pub fn try_create_histogram_vec(
    name: &str,
    help: &str,
    label_names: &[&str],
) -> Result<HistogramVec> {
    let vec = HistogramVec::new(HistogramOpts::new(name, help), label_names)?;
    // Register a clone so the caller keeps its own handle to the same metric family.
    let collector = Box::new(vec.clone());
    prometheus::register(collector)?;
    Ok(vec)
}
/// Looks up the `Histogram` for the label values `name` inside `histogram_vec`.
///
/// Returns `None` when `histogram_vec` is an `Err`, or when the lookup for `name` fails
/// (for example, when the number of label values does not match the labels the vector was
/// declared with). The fallible `get_metric_with_label_values` is used instead of
/// `with_label_values` so that a label mismatch yields `None` rather than a panic.
pub fn get_histogram(histogram_vec: &Result<HistogramVec>, name: &[&str]) -> Option<Histogram> {
    histogram_vec
        .as_ref()
        .ok()?
        .get_metric_with_label_values(name)
        .ok()
}
/// Starts a timer for the given `Histogram`, stopping when it gets dropped or given to `stop_timer(..)`.
pub fn start_timer(histogram: &Result<Histogram>) -> Option<HistogramTimer> {

View File

@@ -224,6 +224,9 @@ impl<E: EthSpec> RuntimeContext<E> {
}
}
/// A wrapper over a runtime handle which can spawn async and blocking tasks.
/// Spawned futures are wrapped in an exit_future which shuts down the task
/// when the corresponding exit future Signal fires or is dropped.
#[derive(Clone)]
pub struct TaskExecutor {
/// The handle to the runtime on which tasks are spawned
@@ -234,21 +237,29 @@ pub struct TaskExecutor {
}
impl TaskExecutor {
/// TODO: make docs better
/// Spawn a future on the async runtime, wrapped in an exit future.
/// This function also records metrics on the number of tasks and their duration.
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, _name: &'static str) {
let exit = self.exit.clone();
let log = self.log.clone();
let future = async move {
// TODO: construct a wrapped prometheus future
let _ = future::select(Box::pin(task), exit).await;
info!(log, "Service shutdown"; "name" => _name);
// TODO: increment a counter for completed tasks
};
// Start the timer for how long this task runs
if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[_name]) {
let timer = metric.start_timer();
let future = async move {
let _ = future::select(Box::pin(task), exit).await;
info!(log, "Service shutdown"; "name" => _name);
timer.observe_duration();
};
metrics::inc_counter(&metrics::ASYNC_TASKS_COUNT);
self.handle.spawn(future);
metrics::inc_counter(&metrics::ASYNC_TASKS_COUNT);
self.handle.spawn(future);
}
}
/// TODO: make docs better
/// Spawn a blocking task on a dedicated tokio thread pool, wrapped in an exit future.
/// This function also records metrics on the number of tasks and their duration.
pub fn spawn_blocking<F>(&self, task: F, _name: &'static str)
where
F: FnOnce() -> () + Send + 'static,
@@ -256,23 +267,30 @@ impl TaskExecutor {
let exit = self.exit.clone();
let log = self.log.clone();
let join_handle = self.handle.spawn_blocking(task);
// Start the timer for how long this task runs
if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[_name]) {
let timer = metric.start_timer();
let join_handle = self.handle.spawn_blocking(task);
let future = async move {
// TODO: construct a wrapped prometheus future
let _ = future::select(Box::pin(join_handle), exit).await;
info!(log, "Service shutdown"; "name" => _name);
// TODO: increment a counter for completed tasks
};
let future = async move {
// TODO: construct a wrapped prometheus future
let _ = future::select(Box::pin(join_handle), exit).await;
info!(log, "Service shutdown"; "name" => _name);
timer.observe_duration();
};
metrics::inc_counter(&metrics::BLOCKING_TASKS_COUNT);
self.handle.spawn(future);
metrics::inc_counter(&metrics::BLOCKING_TASKS_COUNT);
self.handle.spawn(future);
}
// TODO: handle the else case: when the histogram lookup returns `None`, the task is never spawned.
}
/// Returns the underlying runtime handle.
pub fn runtime_handle(&self) -> Handle {
self.handle.clone()
}
/// Returns the underlying exit future.
pub fn exit(&self) -> exit_future::Exit {
self.exit.clone()
}

View File

@@ -4,11 +4,21 @@ pub use lighthouse_metrics::*;
lazy_static! {
pub static ref ASYNC_TASKS_COUNT: Result<IntCounter> = try_create_int_counter(
"async_tasks",
"async_tasks_count",
"Total number of async tasks spawned using spawn"
);
pub static ref BLOCKING_TASKS_COUNT: Result<IntCounter> = try_create_int_counter(
"blocking_tasks",
"blocking_tasks_count",
"Total number of async tasks spawned using spawn_blocking"
);
pub static ref ASYNC_TASKS_HISTOGRAM: Result<HistogramVec> = try_create_histogram_vec(
"async_tasks_histogram",
"Time taken by async tasks",
&["task"]
);
pub static ref BLOCKING_TASKS_HISTOGRAM: Result<HistogramVec> = try_create_histogram_vec(
"blocking_tasks_histogram",
"Time taken by blocking tasks",
&["task"]
);
}