diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index 441f0e275a..bd9a1b981a 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] clap = "2.33.0" -tokio = "0.2.20" +tokio = "0.2.21" slog = { version = "2.5.2", features = ["max_level_trace"] } sloggers = "1.0.0" types = { "path" = "../../consensus/types" } @@ -20,6 +20,9 @@ ctrlc = { version = "3.1.4", features = ["termination"] } futures = "0.3.5" parking_lot = "0.10.2" slog-json = "2.3.0" +exit-future = "0.2.0" +lazy_static = "1.4.0" +lighthouse_metrics = { path = "../../common/lighthouse_metrics" } [dev-dependencies] beacon_node = { path = "../../beacon_node" } diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index e6076d49f1..edc6fae006 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -10,6 +10,8 @@ use eth2_config::Eth2Config; use eth2_testnet_config::Eth2TestnetConfig; use futures::channel::oneshot; +use futures::future; +use futures::prelude::*; use slog::{info, o, Drain, Level, Logger}; use sloggers::{null::NullLoggerBuilder, Build}; use std::cell::RefCell; @@ -19,6 +21,7 @@ use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime}; use types::{EthSpec, InteropEthSpec, MainnetEthSpec, MinimalEthSpec}; +mod metrics; pub const ETH2_CONFIG_FILENAME: &str = "eth2-spec.toml"; @@ -172,10 +175,13 @@ impl EnvironmentBuilder { /// Consumes the builder, returning an `Environment`. pub fn build(self) -> Result, String> { + let (signal, exit) = exit_future::signal(); Ok(Environment { runtime: self .runtime .ok_or_else(|| "Cannot build environment without runtime".to_string())?, + signal: Some(signal), + exit, log: self .log .ok_or_else(|| "Cannot build environment without log".to_string())?, @@ -192,7 +198,8 @@ impl EnvironmentBuilder { /// `Runtime`, instead it only has access to a `Runtime`. #[derive(Clone)] pub struct RuntimeContext { - pub runtime_handle: Handle, + pub runtime_handle: TaskExecutor, + // TODO: remove if TaskExecutor contains log pub log: Logger, pub eth_spec_instance: E, pub eth2_config: Eth2Config, @@ -217,10 +224,66 @@ impl RuntimeContext { } } +#[derive(Clone)] +pub struct TaskExecutor { + /// The handle to the runtime on which tasks are spawned + handle: Handle, + /// The receiver exit future which on receiving shuts down the task + exit: exit_future::Exit, + log: slog::Logger, +} + +impl TaskExecutor { + pub fn spawn(&self, task: impl Future + Send + 'static, _name: &'static str) { + let exit = self.exit.clone(); + let log = self.log.clone(); + + let future = async move { + // TODO: construct a wrapped prometheus future + let _ = future::select(Box::pin(task), exit).await; + info!(log, "Service shutdown"; "name" => _name); + // TODO: increment a counter for completed tasks + }; + + metrics::inc_counter(&metrics::ASYNC_TASKS_COUNT); + self.handle.spawn(future); + } + + pub fn spawn_blocking(&self, task: F, _name: &'static str) + where + F: FnOnce() -> () + Send + 'static, + { + let exit = self.exit.clone(); + let log = self.log.clone(); + + let join_handle = self.handle.spawn_blocking(task); + + let future = async move { + // TODO: construct a wrapped prometheus future + let _ = future::select(Box::pin(join_handle), exit).await; + info!(log, "Service shutdown"; "name" => _name); + // TODO: increment a counter for completed tasks + }; + + metrics::inc_counter(&metrics::BLOCKING_TASKS_COUNT); + self.handle.spawn(future); + } + + pub fn runtime_handle(&self) -> Handle { + self.handle.clone() + } + + pub fn exit(&self) -> exit_future::Exit { + self.exit.clone() + } +} + /// An environment where Lighthouse services can run. Used to start a production beacon node or /// validator client, or to run tests that involve logging and async task execution. pub struct Environment { runtime: Runtime, + signal: Option, + exit: exit_future::Exit, log: Logger, eth_spec_instance: E, pub eth2_config: Eth2Config, @@ -239,7 +302,11 @@ impl Environment { /// Returns a `Context` where no "service" has been added to the logger output. pub fn core_context(&mut self) -> RuntimeContext { RuntimeContext { - runtime_handle: self.runtime.handle().clone(), + runtime_handle: TaskExecutor { + exit: self.exit.clone(), + handle: self.runtime().handle().clone(), + log: self.log.clone(), + }, log: self.log.clone(), eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), @@ -249,7 +316,11 @@ impl Environment { /// Returns a `Context` where the `service_name` is added to the logger output. pub fn service_context(&mut self, service_name: String) -> RuntimeContext { RuntimeContext { - runtime_handle: self.runtime.handle().clone(), + runtime_handle: TaskExecutor { + exit: self.exit.clone(), + handle: self.runtime().handle().clone(), + log: self.log.new(o!("service" => service_name.clone())), + }, log: self.log.new(o!("service" => service_name)), eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), @@ -279,6 +350,13 @@ impl Environment { .shutdown_timeout(std::time::Duration::from_secs(5)) } + /// Fire exit signal which shuts down all spawned services + pub fn fire_signal(&mut self) { + if let Some(signal) = self.signal.take() { + let _ = signal.fire(); + } + } + /// Sets the logger (and all child loggers) to log to a file. pub fn log_to_json_file( &mut self, diff --git a/lighthouse/environment/src/metrics.rs b/lighthouse/environment/src/metrics.rs new file mode 100644 index 0000000000..0a4a0a2472 --- /dev/null +++ b/lighthouse/environment/src/metrics.rs @@ -0,0 +1,14 @@ +/// Handles async task metrics +use lazy_static::lazy_static; +pub use lighthouse_metrics::*; + +lazy_static! { + pub static ref ASYNC_TASKS_COUNT: Result = try_create_int_counter( + "async_tasks", + "Total number of async tasks spawned using spawn" + ); + pub static ref BLOCKING_TASKS_COUNT: Result = try_create_int_counter( + "blocking_tasks", + "Total number of async tasks spawned using spawn_blocking" + ); +}