diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index eef1d70294..ed88199e69 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -118,7 +118,8 @@ pub fn start_server( inner_log, "HTTP server failed to start, Unable to bind"; "address" => format!("{:?}", e) ) - }); + }) + .unwrap_or_else(|_| ()); info!( log, @@ -127,7 +128,7 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - executor.runtime_handle().spawn(server_future); + executor.spawn(server_future, "http_service"); Ok(actual_listen_addr) } diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index 2874ddc503..a363deba65 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -56,7 +56,9 @@ use prometheus::{HistogramOpts, HistogramTimer, Opts}; -pub use prometheus::{Encoder, Gauge, Histogram, IntCounter, IntGauge, Result, TextEncoder}; +pub use prometheus::{ + Encoder, Gauge, Histogram, HistogramVec, IntCounter, IntGauge, Result, TextEncoder, +}; /// Collect all the metrics for reporting. pub fn gather() -> Vec { @@ -98,6 +100,25 @@ pub fn try_create_histogram(name: &str, help: &str) -> Result { prometheus::register(Box::new(histogram.clone()))?; Ok(histogram) } +pub fn try_create_histogram_vec( + name: &str, + help: &str, + label_names: &[&str], +) -> Result { + let opts = HistogramOpts::new(name, help); + let histogram_vec = HistogramVec::new(opts, label_names)?; + prometheus::register(Box::new(histogram_vec.clone()))?; + Ok(histogram_vec) +} + +pub fn get_histogram(histogram_vec: &Result, name: &[&str]) -> Option { + if let Ok(histogram_vec) = histogram_vec { + // TODO: handle panic + Some(histogram_vec.with_label_values(name)) + } else { + None + } +} /// Starts a timer for the given `Histogram`, stopping when it gets dropped or given to `stop_timer(..)`. pub fn start_timer(histogram: &Result) -> Option { diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index dd15f6358e..6a9c59ec34 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -224,6 +224,9 @@ impl RuntimeContext { } } +/// A wrapper over a runtime handle which can spawn async and blocking tasks. +/// Spawned futures are wrapped in an exit_future which shuts down the task +/// when the corresponding exit future Signal fires or is dropped. #[derive(Clone)] pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned @@ -234,21 +237,29 @@ pub struct TaskExecutor { } impl TaskExecutor { + /// TODO: make docs better + /// Spawn a future on the async runtime wrapped in an exit future + /// This function also generates some metrics on number of tasks and task duration. pub fn spawn(&self, task: impl Future + Send + 'static, _name: &'static str) { let exit = self.exit.clone(); let log = self.log.clone(); - let future = async move { - // TODO: construct a wrapped prometheus future - let _ = future::select(Box::pin(task), exit).await; - info!(log, "Service shutdown"; "name" => _name); - // TODO: increment a counter for completed tasks - }; + // Start the timer for how long this task runs + if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[_name]) { + let timer = metric.start_timer(); + let future = async move { + let _ = future::select(Box::pin(task), exit).await; + info!(log, "Service shutdown"; "name" => _name); + timer.observe_duration(); + }; - metrics::inc_counter(&metrics::ASYNC_TASKS_COUNT); - self.handle.spawn(future); + metrics::inc_counter(&metrics::ASYNC_TASKS_COUNT); + self.handle.spawn(future); + } } - + /// TODO: make docs better + /// Spawn a blocking task on a dedicated tokio thread pool wrapped in an exit future. + /// This function also generates some metrics on number of tasks and task duration. pub fn spawn_blocking(&self, task: F, _name: &'static str) where F: FnOnce() -> () + Send + 'static, @@ -256,23 +267,30 @@ impl TaskExecutor { let exit = self.exit.clone(); let log = self.log.clone(); - let join_handle = self.handle.spawn_blocking(task); + // Start the timer for how long this task runs + if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[_name]) { + let timer = metric.start_timer(); + let join_handle = self.handle.spawn_blocking(task); - let future = async move { - // TODO: construct a wrapped prometheus future - let _ = future::select(Box::pin(join_handle), exit).await; - info!(log, "Service shutdown"; "name" => _name); - // TODO: increment a counter for completed tasks - }; + let future = async move { + // TODO: construct a wrapped prometheus future + let _ = future::select(Box::pin(join_handle), exit).await; + info!(log, "Service shutdown"; "name" => _name); + timer.observe_duration(); + }; - metrics::inc_counter(&metrics::BLOCKING_TASKS_COUNT); - self.handle.spawn(future); + metrics::inc_counter(&metrics::BLOCKING_TASKS_COUNT); + self.handle.spawn(future); + } + // TODO: handle else case } + /// Returns the underlying runtime handle. pub fn runtime_handle(&self) -> Handle { self.handle.clone() } + /// Returns the underlying exit future. pub fn exit(&self) -> exit_future::Exit { self.exit.clone() } diff --git a/lighthouse/environment/src/metrics.rs b/lighthouse/environment/src/metrics.rs index 0a4a0a2472..9b057ac7a8 100644 --- a/lighthouse/environment/src/metrics.rs +++ b/lighthouse/environment/src/metrics.rs @@ -4,11 +4,21 @@ pub use lighthouse_metrics::*; lazy_static! { pub static ref ASYNC_TASKS_COUNT: Result = try_create_int_counter( - "async_tasks", + "async_tasks_count", "Total number of async tasks spawned using spawn" ); pub static ref BLOCKING_TASKS_COUNT: Result = try_create_int_counter( - "blocking_tasks", + "blocking_tasks_count", "Total number of async tasks spawned using spawn_blocking" ); + pub static ref ASYNC_TASKS_HISTOGRAM: Result = try_create_histogram_vec( + "async_tasks_histogram", + "Time taken by async tasks", + &["task"] + ); + pub static ref BLOCKING_TASKS_HISTOGRAM: Result = try_create_histogram_vec( + "blocking_tasks_histogram", + "Time taken by blocking tasks", + &["task"] + ); }