diff --git a/Cargo.lock b/Cargo.lock index eefda513ca..3c3fa5840d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,7 +1161,10 @@ dependencies = [ "env_logger", "eth2_config", "eth2_testnet_config", + "exit-future", "futures 0.3.5", + "lazy_static", + "lighthouse_metrics", "logging", "parking_lot 0.10.2", "slog", @@ -1231,10 +1234,12 @@ dependencies = [ "base64 0.12.1", "dirs", "discv5", + "environment", "error-chain", "eth2_ssz", "eth2_ssz_derive", "eth2_ssz_types", + "exit-future", "fnv", "futures 0.3.5", "hashset_delay", @@ -1365,7 +1370,7 @@ dependencies = [ name = "eth2_testnet_config" version = "0.2.0" dependencies = [ - "eth2-libp2p", + "enr", "eth2_ssz", "reqwest", "serde", @@ -2932,9 +2937,11 @@ name = "network" version = "0.1.2" dependencies = [ "beacon_chain", + "environment", "error-chain", "eth2-libp2p", "eth2_ssz", + "exit-future", "fnv", "futures 0.3.5", "genesis", @@ -3808,6 +3815,7 @@ dependencies = [ "assert_matches", "beacon_chain", "bls", + "environment", "eth2-libp2p", "eth2_config", "eth2_ssz", @@ -4853,6 +4861,7 @@ name = "timer" version = "0.1.2" dependencies = [ "beacon_chain", + "environment", "futures 0.3.5", "parking_lot 0.10.2", "slog", @@ -5823,6 +5832,7 @@ dependencies = [ name = "websocket_server" version = "0.1.2" dependencies = [ + "environment", "futures 0.3.5", "serde", "serde_derive", diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index 005ff28787..f53bf52a5c 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -31,5 +31,5 @@ eth2_wallet = { path = "../crypto/eth2_wallet" } eth2_wallet_manager = { path = "../common/eth2_wallet_manager" } rand = "0.7.2" validator_dir = { path = "../common/validator_dir", features = ["unencrypted_keys"] } -tokio = {version = "0.2.20", features = ["full"]} +tokio = { version = "0.2.21", features = ["full"] } eth2_keystore = { path = "../crypto/eth2_keystore" } diff --git a/account_manager/src/validator/deposit.rs b/account_manager/src/validator/deposit.rs index aa54039f81..0cd970030f 100644 --- a/account_manager/src/validator/deposit.rs +++ b/account_manager/src/validator/deposit.rs @@ -83,7 +83,7 @@ pub fn cli_run( matches: &ArgMatches<'_>, mut env: Environment, ) -> Result<(), String> { - let log = env.core_context().log; + let log = env.core_context().log().clone(); let data_dir = clap_utils::parse_path_with_default_in_home_dir( matches, diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 6ffa70df86..b58b1ce51e 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -27,7 +27,7 @@ slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_tr slog-term = "2.5.0" slog-async = "2.5.0" ctrlc = { version = "3.1.4", features = ["termination"] } -tokio = {version = "0.2.20", features = ["time"] } +tokio = { version = "0.2.21", features = ["time"] } exit-future = "0.2.0" env_logger = "0.7.1" dirs = "2.0.2" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 796ca4b6da..9b6d80194f 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -33,7 +33,7 @@ eth2_ssz_derive = "0.1.0" state_processing = { path = "../../consensus/state_processing" } tree_hash = "0.1.0" types = { path = "../../consensus/types" } -tokio = "0.2.20" +tokio = "0.2.21" eth1 = { path = "../eth1" } websocket_server = { path = "../websocket_server" } futures = "0.3.5" @@ -46,7 +46,8 @@ tempfile = "3.1.0" bitvec = "0.17.4" bls = { path = "../../crypto/bls" } safe_arith = { path = "../../consensus/safe_arith" } +environment = { path = "../../lighthouse/environment" } [dev-dependencies] lazy_static = "1.4.0" -environment = { path = "../../lighthouse/environment" } + diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 53a3104f1b..2133444139 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -1,4 +1,5 @@ use crate::metrics; +use environment::TaskExecutor; use eth1::{Config as Eth1Config, Eth1Block, Service as HttpService}; use eth2_hashing::hash; use slog::{debug, error, trace, Logger}; @@ -285,10 +286,8 @@ impl> CachingEth1Backend { } /// Starts the routine which connects to the external eth1 node and updates the caches. - pub fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { - // don't need to spawn as a task is being spawned in auto_update - // TODO: check if this is correct - HttpService::auto_update(self.core.clone(), exit); + pub fn start(&self, handle: TaskExecutor) { + HttpService::auto_update(self.core.clone(), handle); } /// Instantiates `self` from an existing service. diff --git a/beacon_node/client/Cargo.toml b/beacon_node/client/Cargo.toml index 144d8f0c4f..28ad6d1b56 100644 --- a/beacon_node/client/Cargo.toml +++ b/beacon_node/client/Cargo.toml @@ -28,7 +28,7 @@ error-chain = "0.12.2" serde_yaml = "0.8.11" slog = { version = "2.5.2", features = ["max_level_trace"] } slog-async = "2.5.0" -tokio = "0.2.20" +tokio = "0.2.21" dirs = "2.0.2" futures = "0.3.5" reqwest = "0.10.4" diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 717471ec69..4b68f93a8e 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -21,6 +21,7 @@ use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; use std::time::Duration; +use timer::spawn_timer; use tokio::sync::mpsc::UnboundedSender; use types::{test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec}; use websocket_server::{Config as WebSocketConfig, WebSocketSender}; @@ -50,7 +51,6 @@ pub struct ClientBuilder { beacon_chain_builder: Option>, beacon_chain: Option>>, eth1_service: Option, - exit_channels: Vec>, event_handler: Option, network_globals: Option>>, network_send: Option>>, @@ -84,7 +84,6 @@ where beacon_chain_builder: None, beacon_chain: None, eth1_service: None, - exit_channels: vec![], event_handler: None, network_globals: None, network_send: None, @@ -132,7 +131,7 @@ where .ok_or_else(|| "beacon_chain_start_method requires a chain spec".to_string())?; let builder = BeaconChainBuilder::new(eth_spec_instance) - .logger(context.log.clone()) + .logger(context.log().clone()) .store(store) .store_migrator(store_migrator) .data_dir(data_dir) @@ -150,7 +149,7 @@ where // Alternatively, if there's a beacon chain in the database then always resume // using it. let client_genesis = if client_genesis == ClientGenesis::FromStore && !chain_exists { - info!(context.log, "Defaulting to deposit contract genesis"); + info!(context.log(), "Defaulting to deposit contract genesis"); ClientGenesis::DepositContract } else if chain_exists { @@ -172,7 +171,7 @@ where genesis_state_bytes, } => { info!( - context.log, + context.log(), "Starting from known genesis state"; ); @@ -183,14 +182,14 @@ where } ClientGenesis::DepositContract => { info!( - context.log, + context.log(), "Waiting for eth2 genesis from eth1"; "eth1_endpoint" => &config.eth1.endpoint, "contract_deploy_block" => config.eth1.deposit_contract_deploy_block, "deposit_contract" => &config.eth1.deposit_contract_address ); - let genesis_service = Eth1GenesisService::new(config.eth1, context.log.clone()); + let genesis_service = Eth1GenesisService::new(config.eth1, context.log().clone()); let genesis_state = genesis_service .wait_for_genesis_state( @@ -223,19 +222,18 @@ where .ok_or_else(|| "network requires a runtime_context")? .clone(); - let (network_globals, network_send, network_exit) = - NetworkService::start(beacon_chain, config, &context.runtime_handle, context.log) + let (network_globals, network_send) = + NetworkService::start(beacon_chain, config, context.executor) .map_err(|e| format!("Failed to start network: {:?}", e))?; self.network_globals = Some(network_globals); self.network_send = Some(network_send); - self.exit_channels.push(network_exit); Ok(self) } /// Immediately starts the timer service. - fn timer(mut self) -> Result { + fn timer(self) -> Result { let context = self .runtime_context .as_ref() @@ -251,13 +249,9 @@ where .ok_or_else(|| "node timer requires a chain spec".to_string())? .milliseconds_per_slot; - let timer_exit = context - .runtime_handle - .enter(|| timer::spawn(beacon_chain, milliseconds_per_slot)) + spawn_timer(context.executor, beacon_chain, milliseconds_per_slot) .map_err(|e| format!("Unable to start node timer: {}", e))?; - self.exit_channels.push(timer_exit); - Ok(self) } @@ -290,32 +284,28 @@ where network_chan: network_send, }; - let log = context.log.clone(); - let (exit_channel, listening_addr) = context.runtime_handle.enter(|| { - rest_api::start_server( - &client_config.rest_api, - beacon_chain, - network_info, - client_config - .create_db_path() - .map_err(|_| "unable to read data dir")?, - client_config - .create_freezer_db_path() - .map_err(|_| "unable to read freezer DB dir")?, - eth2_config.clone(), - log, - ) - .map_err(|e| format!("Failed to start HTTP API: {:?}", e)) - })?; + let listening_addr = rest_api::start_server( + context.executor, + &client_config.rest_api, + beacon_chain, + network_info, + client_config + .create_db_path() + .map_err(|_| "unable to read data dir")?, + client_config + .create_freezer_db_path() + .map_err(|_| "unable to read freezer DB dir")?, + eth2_config.clone(), + ) + .map_err(|e| format!("Failed to start HTTP API: {:?}", e))?; - self.exit_channels.push(exit_channel); self.http_listen_addr = Some(listening_addr); Ok(self) } /// Immediately starts the service that periodically logs information each slot. - pub fn notifier(mut self) -> Result { + pub fn notifier(self) -> Result { let context = self .runtime_context .as_ref() @@ -335,19 +325,13 @@ where .ok_or_else(|| "slot_notifier requires a chain spec".to_string())? .milliseconds_per_slot; - let exit_channel = context - .runtime_handle - .enter(|| { - spawn_notifier( - beacon_chain, - network_globals, - milliseconds_per_slot, - context.log.clone(), - ) - }) - .map_err(|e| format!("Unable to start slot notifier: {}", e))?; - - self.exit_channels.push(exit_channel); + spawn_notifier( + context.executor, + beacon_chain, + network_globals, + milliseconds_per_slot, + ) + .map_err(|e| format!("Unable to start slot notifier: {}", e))?; Ok(self) } @@ -365,7 +349,6 @@ where network_globals: self.network_globals, http_listen_addr: self.http_listen_addr, websocket_listen_addr: self.websocket_listen_addr, - _exit_channels: self.exit_channels, } } } @@ -436,22 +419,14 @@ where .ok_or_else(|| "websocket_event_handler requires a runtime_context")? .service_context("ws".into()); - let (sender, exit_channel, listening_addr): ( - WebSocketSender, - Option<_>, - Option<_>, - ) = if config.enabled { - let (sender, exit, listening_addr) = context - .runtime_handle - .enter(|| websocket_server::start_server(&config, &context.log))?; - (sender, Some(exit), Some(listening_addr)) + let (sender, listening_addr): (WebSocketSender, Option<_>) = if config.enabled { + let (sender, listening_addr) = + websocket_server::start_server(context.executor, &config)?; + (sender, Some(listening_addr)) } else { - (WebSocketSender::dummy(), None, None) + (WebSocketSender::dummy(), None) }; - if let Some(channel) = exit_channel { - self.exit_channels.push(channel); - } self.event_handler = Some(sender); self.websocket_listen_addr = listening_addr; @@ -494,7 +469,7 @@ where .clone() .ok_or_else(|| "disk_store requires a chain spec".to_string())?; - let store = HotColdDB::open(hot_path, cold_path, config, spec, context.log) + let store = HotColdDB::open(hot_path, cold_path, config, spec, context.log().clone()) .map_err(|e| format!("Unable to open database: {:?}", e))?; self.store = Some(Arc::new(store)); Ok(self) @@ -555,7 +530,7 @@ where let store = self.store.clone().ok_or_else(|| { "background_migrator requires the store to be initialized".to_string() })?; - self.store_migrator = Some(BackgroundMigrator::new(store, context.log.clone())); + self.store_migrator = Some(BackgroundMigrator::new(store, context.log().clone())); Ok(self) } } @@ -617,25 +592,23 @@ where &persisted, config.clone(), store.clone(), - &context.log, + &context.log().clone(), ) .map(|chain| chain.into_backend()) }) .unwrap_or_else(|| { - Ok(CachingEth1Backend::new(config, context.log.clone(), store)) + Ok(CachingEth1Backend::new( + config, + context.log().clone(), + store, + )) })? }; self.eth1_service = None; - let exit = { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.exit_channels.push(tx); - rx - }; - // Starts the service that connects to an eth1 node and periodically updates caches. - context.runtime_handle.enter(|| backend.start(exit)); + backend.start(context.executor); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 7f665b9cb1..da670ff134 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -25,8 +25,6 @@ pub struct Client { network_globals: Option>>, http_listen_addr: Option, websocket_listen_addr: Option, - /// Exit channels will complete/error when dropped, causing each service to exit gracefully. - _exit_channels: Vec>, } impl Client { diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 9375de0a1f..644e13c94f 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -23,11 +23,11 @@ const SPEEDO_OBSERVATIONS: usize = 4; /// Spawns a notifier service which periodically logs information about the node. pub fn spawn_notifier( + executor: environment::TaskExecutor, beacon_chain: Arc>, network: Arc>, milliseconds_per_slot: u64, - log: slog::Logger, -) -> Result, String> { +) -> Result<(), String> { let slot_duration = Duration::from_millis(milliseconds_per_slot); let duration_to_next_slot = beacon_chain .slot_clock @@ -41,6 +41,7 @@ pub fn spawn_notifier( let interval_duration = slot_duration; let speedo = Mutex::new(Speedo::default()); + let log = executor.log().clone(); let mut interval = tokio::time::interval_at(start_instant, interval_duration); let interval_future = async move { @@ -163,12 +164,10 @@ pub fn spawn_notifier( Ok::<(), ()>(()) }; - let (exit_signal, exit) = tokio::sync::oneshot::channel(); - // run the notifier on the current executor - tokio::spawn(futures::future::select(Box::pin(interval_future), exit)); + executor.spawn(interval_future.unwrap_or_else(|_| ()), "notifier"); - Ok(exit_signal) + Ok(()) } /// Returns the peer count, returning something helpful if it's `usize::max_value` (effectively a diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index 695bdb1cb9..ba211a4e85 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dev-dependencies] eth1_test_rig = { path = "../../testing/eth1_test_rig" } -environment = { path = "../../lighthouse/environment" } toml = "0.5.6" web3 = "0.11.0" sloggers = "1.0.0" @@ -25,8 +24,9 @@ tree_hash = "0.1.0" eth2_hashing = "0.1.0" parking_lot = "0.10.2" slog = "2.5.2" -tokio = { version = "0.2.20", features = ["full"] } +tokio = { version = "0.2.21", features = ["full"] } state_processing = { path = "../../consensus/state_processing" } libflate = "1.0.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics"} lazy_static = "1.4.0" +environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 8f8e7db133..8c299251d5 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -290,7 +290,7 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub fn auto_update(service: Self, exit: tokio::sync::oneshot::Receiver<()>) { + pub fn auto_update(service: Self, handle: environment::TaskExecutor) { let update_interval = Duration::from_millis(service.config().auto_update_interval_millis); let mut interval = interval_at(Instant::now(), update_interval); @@ -303,9 +303,7 @@ impl Service { } }; - let future = futures::future::select(Box::pin(update_future), exit); - - tokio::task::spawn(future); + handle.spawn(update_future, "eth1"); } async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { diff --git a/beacon_node/eth2-libp2p/Cargo.toml b/beacon_node/eth2-libp2p/Cargo.toml index e3a3164d03..af9eafcf8c 100644 --- a/beacon_node/eth2-libp2p/Cargo.toml +++ b/beacon_node/eth2-libp2p/Cargo.toml @@ -35,6 +35,7 @@ tokio-util = { version = "0.3.1", features = ["codec", "compat"] } # Patched for quick updates discv5 = { git = "https://github.com/sigp/discv5", rev = "7b3bd40591b62b8c002ffdb85de008aa9f82e2e5" } tiny-keccak = "2.0.2" +environment = { path = "../../lighthouse/environment" } libp2p-tcp = { version = "0.19.1", default-features = false, features = ["tokio"] } [dependencies.libp2p] @@ -49,3 +50,4 @@ slog-stdlog = "4.0.0" slog-term = "2.5.0" slog-async = "2.5.0" tempdir = "0.3.7" +exit-future = "0.2.0" \ No newline at end of file diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index efb2c062ad..4542855e82 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -84,6 +84,7 @@ pub struct Service { impl Service { pub fn new( + executor: environment::TaskExecutor, config: &NetworkConfig, enr_fork_id: EnrForkId, log: &slog::Logger, @@ -122,15 +123,15 @@ impl Service { let behaviour = Behaviour::new(&local_keypair, config, network_globals.clone(), &log)?; // use the executor for libp2p - struct Executor(tokio::runtime::Handle); + struct Executor(environment::TaskExecutor); impl libp2p::core::Executor for Executor { fn exec(&self, f: Pin + Send>>) { - self.0.spawn(f); + self.0.spawn(f, "libp2p"); } } SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) .peer_connection_limit(MAX_CONNECTIONS_PER_PEER) - .executor(Box::new(Executor(tokio::runtime::Handle::current()))) + .executor(Box::new(Executor(executor))) .build() }; diff --git a/beacon_node/eth2-libp2p/tests/common/mod.rs b/beacon_node/eth2-libp2p/tests/common/mod.rs index 99857cb1a3..e26381f51c 100644 --- a/beacon_node/eth2-libp2p/tests/common/mod.rs +++ b/beacon_node/eth2-libp2p/tests/common/mod.rs @@ -12,6 +12,21 @@ use types::{EnrForkId, MinimalEthSpec}; type E = MinimalEthSpec; use tempdir::TempDir; +pub struct Libp2pInstance(LibP2PService, exit_future::Signal); + +impl std::ops::Deref for Libp2pInstance { + type Target = LibP2PService; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for Libp2pInstance { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + pub fn build_log(level: slog::Level, enabled: bool) -> slog::Logger { let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator).build().fuse(); @@ -82,13 +97,20 @@ pub fn build_libp2p_instance( boot_nodes: Vec, secret_key: Option, log: slog::Logger, -) -> LibP2PService { +) -> Libp2pInstance { let port = unused_port("tcp").unwrap(); let config = build_config(port, boot_nodes, secret_key); // launch libp2p service - LibP2PService::new(&config, EnrForkId::default(), &log) - .expect("should build libp2p instance") - .1 + + let (signal, exit) = exit_future::signal(); + let executor = + environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); + Libp2pInstance( + LibP2PService::new(executor, &config, EnrForkId::default(), &log) + .expect("should build libp2p instance") + .1, + signal, + ) } #[allow(dead_code)] @@ -99,8 +121,8 @@ pub fn get_enr(node: &LibP2PService) -> Enr { // Returns `n` libp2p peers in fully connected topology. #[allow(dead_code)] -pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec> { - let mut nodes: Vec> = (0..n) +pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec { + let mut nodes: Vec<_> = (0..n) .map(|_| build_libp2p_instance(vec![], None, log.clone())) .collect(); let multiaddrs: Vec = nodes @@ -124,7 +146,7 @@ pub fn build_full_mesh(log: slog::Logger, n: usize) -> Vec> { // Constructs a pair of nodes with separate loggers. The sender dials the receiver. // This returns a (sender, receiver) pair. #[allow(dead_code)] -pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService, LibP2PService) { +pub async fn build_node_pair(log: &slog::Logger) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); @@ -168,8 +190,8 @@ pub async fn build_node_pair(log: &slog::Logger) -> (LibP2PService, LibP2PSer // Returns `n` peers in a linear topology #[allow(dead_code)] -pub fn build_linear(log: slog::Logger, n: usize) -> Vec> { - let mut nodes: Vec> = (0..n) +pub fn build_linear(log: slog::Logger, n: usize) -> Vec { + let mut nodes: Vec<_> = (0..n) .map(|_| build_libp2p_instance(vec![], None, log.clone())) .collect(); let multiaddrs: Vec = nodes diff --git a/beacon_node/eth2-libp2p/tests/noise.rs b/beacon_node/eth2-libp2p/tests/noise.rs index ba12c7346d..973fb425c6 100644 --- a/beacon_node/eth2-libp2p/tests/noise.rs +++ b/beacon_node/eth2-libp2p/tests/noise.rs @@ -136,7 +136,10 @@ async fn test_secio_noise_fallback() { let port = common::unused_port("tcp").unwrap(); let noisy_config = common::build_config(port, vec![], None); - let mut noisy_node = Service::new(&noisy_config, EnrForkId::default(), &log) + let (_signal, exit) = exit_future::signal(); + let executor = + environment::TaskExecutor::new(tokio::runtime::Handle::current(), exit, log.clone()); + let mut noisy_node = Service::new(executor, &noisy_config, EnrForkId::default(), &log) .expect("should build a libp2p instance") .1; diff --git a/beacon_node/genesis/Cargo.toml b/beacon_node/genesis/Cargo.toml index 8b01f7fdea..3100f060ae 100644 --- a/beacon_node/genesis/Cargo.toml +++ b/beacon_node/genesis/Cargo.toml @@ -18,7 +18,7 @@ merkle_proof = { path = "../../consensus/merkle_proof" } eth2_ssz = "0.1.2" eth2_hashing = "0.1.0" tree_hash = "0.1.0" -tokio = { version = "0.2.20", features = ["full"] } +tokio = { version = "0.2.21", features = ["full"] } parking_lot = "0.10.2" slog = "2.5.2" exit-future = "0.2.0" diff --git a/beacon_node/genesis/tests/tests.rs b/beacon_node/genesis/tests/tests.rs index 7e2131f511..b4f72d8c8f 100644 --- a/beacon_node/genesis/tests/tests.rs +++ b/beacon_node/genesis/tests/tests.rs @@ -24,7 +24,7 @@ pub fn new_env() -> Environment { #[test] fn basic() { let mut env = new_env(); - let log = env.core_context().log.clone(); + let log = env.core_context().log().clone(); let mut spec = env.eth2_config().spec.clone(); env.runtime().block_on(async { diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 56a6b37002..e3963efcb1 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -10,6 +10,7 @@ genesis = { path = "../genesis" } lazy_static = "1.4.0" matches = "0.1.8" tempfile = "3.1.0" +exit-future = "0.2.0" [dependencies] beacon_chain = { path = "../beacon_chain" } @@ -25,7 +26,7 @@ eth2_ssz = "0.1.2" tree_hash = "0.1.0" futures = "0.3.5" error-chain = "0.12.2" -tokio = { version = "0.2.20", features = ["full"] } +tokio = { version = "0.2.21", features = ["full"] } parking_lot = "0.10.2" smallvec = "1.4.0" # TODO: Remove rand crate for mainnet @@ -34,3 +35,4 @@ fnv = "1.0.6" rlp = "0.4.5" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } +environment = { path = "../../lighthouse/environment" } \ No newline at end of file diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 78e1b8b2a1..44e1014d79 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -58,7 +58,7 @@ impl Router { beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, - runtime_handle: &tokio::runtime::Handle, + executor: environment::TaskExecutor, log: slog::Logger, ) -> error::Result>> { let message_handler_log = log.new(o!("service"=> "router")); @@ -68,7 +68,7 @@ impl Router { // Initialise a message instance, which itself spawns the syncing thread. let processor = Processor::new( - runtime_handle, + executor.clone(), beacon_chain, network_globals.clone(), network_send.clone(), @@ -84,12 +84,15 @@ impl Router { }; // spawn handler task and move the message handler instance into the spawned thread - runtime_handle.spawn(async move { - handler_recv - .for_each(move |msg| future::ready(handler.handle_message(msg))) - .await; - debug!(log, "Network message handler terminated."); - }); + executor.spawn( + async move { + debug!(log, "Network message router started"); + handler_recv + .for_each(move |msg| future::ready(handler.handle_message(msg))) + .await; + }, + "router", + ); Ok(handler_send) } diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 7cab8ad316..3ed9358781 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -14,7 +14,7 @@ use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; use store::Store; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use types::{ Attestation, ChainSpec, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, Slot, @@ -33,8 +33,6 @@ pub struct Processor { chain: Arc>, /// A channel to the syncing thread. sync_send: mpsc::UnboundedSender>, - /// A oneshot channel for destroying the sync thread. - _sync_exit: oneshot::Sender<()>, /// A network context to return and handle RPC requests. network: HandlerNetworkContext, /// The `RPCHandler` logger. @@ -44,7 +42,7 @@ pub struct Processor { impl Processor { /// Instantiate a `Processor` instance pub fn new( - runtime_handle: &tokio::runtime::Handle, + executor: environment::TaskExecutor, beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, @@ -53,8 +51,8 @@ impl Processor { let sync_logger = log.new(o!("service"=> "sync")); // spawn the sync thread - let (sync_send, _sync_exit) = crate::sync::manager::spawn( - runtime_handle, + let sync_send = crate::sync::manager::spawn( + executor, beacon_chain.clone(), network_globals, network_send.clone(), @@ -64,7 +62,6 @@ impl Processor { Processor { chain: beacon_chain, sync_send, - _sync_exit, network: HandlerNetworkContext::new(network_send, log.clone()), log: log.clone(), } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ff9063ea50..856198ae6b 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,8 +14,7 @@ use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace}; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Handle; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio::time::Delay; use types::EthSpec; @@ -53,13 +52,12 @@ impl NetworkService { pub fn start( beacon_chain: Arc>, config: &NetworkConfig, - runtime_handle: &Handle, - network_log: slog::Logger, + executor: environment::TaskExecutor, ) -> error::Result<( Arc>, mpsc::UnboundedSender>, - oneshot::Sender<()>, )> { + let network_log = executor.log().clone(); // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::>(); // get a reference to the beacon chain store @@ -75,7 +73,7 @@ impl NetworkService { // launch libp2p service let (network_globals, mut libp2p) = - runtime_handle.enter(|| LibP2PService::new(config, enr_fork_id, &network_log))?; + LibP2PService::new(executor.clone(), config, enr_fork_id, &network_log)?; for enr in load_dht::(store.clone()) { libp2p.swarm.add_enr(enr); @@ -88,7 +86,7 @@ impl NetworkService { beacon_chain.clone(), network_globals.clone(), network_send.clone(), - runtime_handle, + executor.clone(), network_log.clone(), )?; @@ -111,19 +109,20 @@ impl NetworkService { propagation_percentage, }; - let network_exit = runtime_handle.enter(|| spawn_service(network_service))?; + spawn_service(executor, network_service)?; - Ok((network_globals, network_send, network_exit)) + Ok((network_globals, network_send)) } } fn spawn_service( + executor: environment::TaskExecutor, mut service: NetworkService, -) -> error::Result> { - let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel(); +) -> error::Result<()> { + let mut exit_rx = executor.exit(); // spawn on the current executor - tokio::spawn(async move { + executor.spawn_without_exit(async move { loop { // build the futures to check simultaneously tokio::select! { @@ -361,9 +360,9 @@ fn spawn_service( } } } - }); + }, "network"); - Ok(network_exit) + Ok(()) } /// Returns a `Delay` that triggers shortly after the next change in the beacon chain fork version. diff --git a/beacon_node/network/src/service/tests.rs b/beacon_node/network/src/service/tests.rs index a33bd9eeb3..48aa038c8b 100644 --- a/beacon_node/network/src/service/tests.rs +++ b/beacon_node/network/src/service/tests.rs @@ -32,7 +32,9 @@ mod tests { let enrs = vec![enr1, enr2]; let runtime = Runtime::new().unwrap(); - let handle = runtime.handle().clone(); + + let (signal, exit) = exit_future::signal(); + let executor = environment::TaskExecutor::new(runtime.handle().clone(), exit, log.clone()); let mut config = NetworkConfig::default(); config.libp2p_port = 21212; @@ -42,8 +44,8 @@ mod tests { // Create a new network service which implicitly gets dropped at the // end of the block. - let _ = - NetworkService::start(beacon_chain.clone(), &config, &handle, log.clone()).unwrap(); + let _ = NetworkService::start(beacon_chain.clone(), &config, executor).unwrap(); + drop(signal); }); runtime.shutdown_timeout(tokio::time::Duration::from_millis(300)); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ce8e26b3ee..99abc9495d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -48,7 +48,7 @@ use smallvec::SmallVec; use std::boxed::Box; use std::ops::Sub; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -181,17 +181,12 @@ impl SingleBlockRequest { /// chain. This allows the chain to be /// dropped during the syncing process which will gracefully end the `SyncManager`. pub fn spawn( - runtime_handle: &tokio::runtime::Handle, + executor: environment::TaskExecutor, beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, log: slog::Logger, -) -> ( - mpsc::UnboundedSender>, - oneshot::Sender<()>, -) { - // generate the exit channel - let (sync_exit, exit_rx) = tokio::sync::oneshot::channel(); +) -> mpsc::UnboundedSender> { // generate the message channel let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); @@ -215,11 +210,8 @@ pub fn spawn( // spawn the sync manager thread debug!(log, "Sync Manager started"); - runtime_handle.spawn(async move { - futures::future::select(Box::pin(sync_manager.main()), exit_rx).await; - info!(log.clone(), "Sync Manager shutdown"); - }); - (sync_send, sync_exit) + executor.spawn(async move { Box::pin(sync_manager.main()).await }, "sync"); + sync_send } impl SyncManager { diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index 382f1c30c6..18db2094f5 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -25,7 +25,7 @@ state_processing = { path = "../../consensus/state_processing" } types = { path = "../../consensus/types" } http = "0.2.1" hyper = "0.13.5" -tokio = { version = "0.2", features = ["sync"] } +tokio = { version = "0.2.21", features = ["sync"] } url = "2.1.1" lazy_static = "1.4.0" eth2_config = { path = "../../common/eth2_config" } @@ -36,6 +36,7 @@ parking_lot = "0.10.2" futures = "0.3.5" operation_pool = { path = "../operation_pool" } rayon = "1.3.0" +environment = { path = "../../lighthouse/environment" } [dev-dependencies] assert_matches = "1.3.0" diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index c05cb66ab4..0862b3e04d 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -35,7 +35,7 @@ use std::net::SocketAddr; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use url_query::UrlQuery; pub use crate::helpers::parse_pubkey_bytes; @@ -51,14 +51,15 @@ pub struct NetworkInfo { // Allowing more than 7 arguments. #[allow(clippy::too_many_arguments)] pub fn start_server( + executor: environment::TaskExecutor, config: &Config, beacon_chain: Arc>, network_info: NetworkInfo, db_path: PathBuf, freezer_db_path: PathBuf, eth2_config: Eth2Config, - log: slog::Logger, -) -> Result<(oneshot::Sender<()>, SocketAddr), hyper::Error> { +) -> Result { + let log = executor.log(); let inner_log = log.clone(); let eth2_config = Arc::new(eth2_config); @@ -98,7 +99,7 @@ pub fn start_server( let actual_listen_addr = server.local_addr(); // Build a channel to kill the HTTP server. - let (exit_signal, exit) = oneshot::channel::<()>(); + let exit = executor.exit(); let inner_log = log.clone(); let server_exit = async move { let _ = exit.await; @@ -116,7 +117,8 @@ pub fn start_server( inner_log, "HTTP server failed to start, Unable to bind"; "address" => format!("{:?}", e) ) - }); + }) + .unwrap_or_else(|_| ()); info!( log, @@ -125,9 +127,9 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - tokio::spawn(server_future); + executor.spawn_without_exit(server_future, "http"); - Ok((exit_signal, actual_listen_addr)) + Ok(actual_listen_addr) } #[derive(Clone)] diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index ca3162a129..cba45edff0 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -58,7 +58,7 @@ impl ProductionBeaconNode { &matches, &context.eth2_config.spec_constants, &context.eth2_config().spec, - context.log.clone(), + context.log().clone(), )?; Self::new(context, client_config).await } @@ -75,7 +75,7 @@ impl ProductionBeaconNode { let client_config_1 = client_config.clone(); let client_genesis = client_config.genesis.clone(); let store_config = client_config.store.clone(); - let log = context.log.clone(); + let log = context.log().clone(); let db_path = client_config.create_db_path()?; let freezer_db_path_res = client_config.create_freezer_db_path(); diff --git a/beacon_node/tests/test.rs b/beacon_node/tests/test.rs index 7a7f5a759b..a845acf04d 100644 --- a/beacon_node/tests/test.rs +++ b/beacon_node/tests/test.rs @@ -51,4 +51,5 @@ fn http_server_genesis_state() { api_state, db_state, "genesis state from api should match that from the DB" ); + env.fire_signal(); } diff --git a/beacon_node/timer/Cargo.toml b/beacon_node/timer/Cargo.toml index 2c183db53d..4ac22ad2e1 100644 --- a/beacon_node/timer/Cargo.toml +++ b/beacon_node/timer/Cargo.toml @@ -8,7 +8,8 @@ edition = "2018" beacon_chain = { path = "../beacon_chain" } types = { path = "../../consensus/types" } slot_clock = { path = "../../common/slot_clock" } -tokio = { version = "0.2.20", features = ["full"] } +tokio = { version = "0.2.21", features = ["full"] } slog = "2.5.2" parking_lot = "0.10.2" futures = "0.3.5" +environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index 26f8bb60ea..67aca9c27c 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -3,23 +3,20 @@ //! This service allows task execution on the beacon node for various functionality. use beacon_chain::{BeaconChain, BeaconChainTypes}; -use futures::future; use futures::stream::StreamExt; +use slog::info; use slot_clock::SlotClock; use std::sync::Arc; use std::time::Duration; use tokio::time::{interval_at, Instant}; /// Spawns a timer service which periodically executes tasks for the beacon chain -/// TODO: We might not need a `Handle` to the runtime since this function should be -/// called from the context of a runtime and we can simply spawn using task::spawn. -/// Check for issues without the Handle. -pub fn spawn( +pub fn spawn_timer( + executor: environment::TaskExecutor, beacon_chain: Arc>, milliseconds_per_slot: u64, -) -> Result, &'static str> { - let (exit_signal, exit) = tokio::sync::oneshot::channel(); - +) -> Result<(), &'static str> { + let log = executor.log(); let start_instant = Instant::now() + beacon_chain .slot_clock @@ -27,14 +24,15 @@ pub fn spawn( .ok_or_else(|| "slot_notifier unable to determine time to next slot")?; // Warning: `interval_at` panics if `milliseconds_per_slot` = 0. - let timer_future = interval_at(start_instant, Duration::from_millis(milliseconds_per_slot)) - .for_each(move |_| { + let mut interval = interval_at(start_instant, Duration::from_millis(milliseconds_per_slot)); + let timer_future = async move { + while interval.next().await.is_some() { beacon_chain.per_slot_task(); - future::ready(()) - }); + } + }; - let future = futures::future::select(timer_future, exit); - tokio::spawn(future); + executor.spawn(timer_future, "timer"); + info!(log, "Timer service started"); - Ok(exit_signal) + Ok(()) } diff --git a/beacon_node/websocket_server/Cargo.toml b/beacon_node/websocket_server/Cargo.toml index a470a427de..8e22fa7d44 100644 --- a/beacon_node/websocket_server/Cargo.toml +++ b/beacon_node/websocket_server/Cargo.toml @@ -12,6 +12,7 @@ serde = "1.0.110" serde_derive = "1.0.110" serde_json = "1.0.52" slog = "2.5.2" -tokio = { version = "0.2.20", features = ["full"] } +tokio = { version = "0.2.21", features = ["full"] } types = { path = "../../consensus/types" } ws = "0.9.1" +environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index 7ffff1b89b..f9ed3e97ea 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -1,4 +1,4 @@ -use slog::{debug, error, info, warn, Logger}; +use slog::{debug, error, info, warn}; use std::marker::PhantomData; use std::net::SocketAddr; use types::EthSpec; @@ -34,16 +34,10 @@ impl WebSocketSender { } pub fn start_server( + executor: environment::TaskExecutor, config: &Config, - log: &Logger, -) -> Result< - ( - WebSocketSender, - tokio::sync::oneshot::Sender<()>, - SocketAddr, - ), - String, -> { +) -> Result<(WebSocketSender, SocketAddr), String> { + let log = executor.log(); let server_string = format!("{}:{}", config.listen_address, config.port); // Create a server that simply ignores any incoming messages. @@ -67,31 +61,26 @@ pub fn start_server( let broadcaster = server.broadcaster(); // Produce a signal/channel that can gracefully shutdown the websocket server. - let exit_channel = { - let (exit_channel, exit) = tokio::sync::oneshot::channel(); - - let log_inner = log.clone(); - let broadcaster_inner = server.broadcaster(); - let exit_future = async move { - let _ = exit.await; - if let Err(e) = broadcaster_inner.shutdown() { - warn!( - log_inner, - "Websocket server errored on shutdown"; - "error" => format!("{:?}", e) - ); - } else { - info!(log_inner, "Websocket server shutdown"); - } - }; - - // Place a future on the handle that will shutdown the websocket server when the - // application exits. - tokio::spawn(exit_future); - - exit_channel + let exit = executor.exit(); + let log_inner = log.clone(); + let broadcaster_inner = server.broadcaster(); + let exit_future = async move { + let _ = exit.await; + if let Err(e) = broadcaster_inner.shutdown() { + warn!( + log_inner, + "Websocket server errored on shutdown"; + "error" => format!("{:?}", e) + ); + } else { + info!(log_inner, "Websocket server shutdown"); + } }; + // Place a future on the handle that will shutdown the websocket server when the + // application exits. + executor.runtime_handle().spawn(exit_future); + let log_inner = log.clone(); let _ = std::thread::spawn(move || match server.run() { @@ -122,7 +111,6 @@ pub fn start_server( sender: Some(broadcaster), _phantom: PhantomData, }, - exit_channel, actual_listen_addr, )) } diff --git a/common/eth2_testnet_config/Cargo.toml b/common/eth2_testnet_config/Cargo.toml index a531cb7547..e90d9413f2 100644 --- a/common/eth2_testnet_config/Cargo.toml +++ b/common/eth2_testnet_config/Cargo.toml @@ -16,5 +16,5 @@ tempdir = "0.3.7" serde = "1.0.110" serde_yaml = "0.8.11" types = { path = "../../consensus/types"} -eth2-libp2p = { path = "../../beacon_node/eth2-libp2p"} +enr = { version = "0.1.0", features = ["libsecp256k1", "ed25519"] } eth2_ssz = "0.1.2" diff --git a/common/eth2_testnet_config/src/lib.rs b/common/eth2_testnet_config/src/lib.rs index 969dcab5fd..5895fce353 100644 --- a/common/eth2_testnet_config/src/lib.rs +++ b/common/eth2_testnet_config/src/lib.rs @@ -7,7 +7,7 @@ //! //! https://github.com/sigp/lighthouse/pull/605 -use eth2_libp2p::Enr; +use enr::{CombinedKey, Enr}; use ssz::{Decode, Encode}; use std::fs::{create_dir_all, File}; use std::io::{Read, Write}; @@ -36,7 +36,7 @@ pub const HARDCODED_BOOT_ENR: &[u8] = include_bytes!("../witti-v0-11-3/boot_enr. pub struct Eth2TestnetConfig { pub deposit_contract_address: String, pub deposit_contract_deploy_block: u64, - pub boot_enr: Option>, + pub boot_enr: Option>>, pub genesis_state: Option>, pub yaml_config: Option, } @@ -239,7 +239,7 @@ mod tests { } fn do_test( - boot_enr: Option>, + boot_enr: Option>>, genesis_state: Option>, yaml_config: Option, ) { diff --git a/common/hashset_delay/Cargo.toml b/common/hashset_delay/Cargo.toml index 6470479b04..cba08662d4 100644 --- a/common/hashset_delay/Cargo.toml +++ b/common/hashset_delay/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] futures = "0.3.5" -tokio = { version = "0.2.20", features = ["time"] } +tokio = { version = "0.2.21", features = ["time"] } [dev-dependencies] -tokio = { version = "0.2.20", features = ["time", "rt-threaded", "macros"] } +tokio = { version = "0.2.21", features = ["time", "rt-threaded", "macros"] } diff --git a/common/lighthouse_metrics/src/lib.rs b/common/lighthouse_metrics/src/lib.rs index 2874ddc503..c6314bdb46 100644 --- a/common/lighthouse_metrics/src/lib.rs +++ b/common/lighthouse_metrics/src/lib.rs @@ -56,7 +56,9 @@ use prometheus::{HistogramOpts, HistogramTimer, Opts}; -pub use prometheus::{Encoder, Gauge, Histogram, IntCounter, IntGauge, Result, TextEncoder}; +pub use prometheus::{ + Encoder, Gauge, Histogram, HistogramVec, IntCounter, IntGauge, IntGaugeVec, Result, TextEncoder, +}; /// Collect all the metrics for reporting. pub fn gather() -> Vec { @@ -99,6 +101,48 @@ pub fn try_create_histogram(name: &str, help: &str) -> Result { Ok(histogram) } +/// Attempts to crate a `HistogramVec`, returning `Err` if the registry does not accept the counter +/// (potentially due to naming conflict). +pub fn try_create_histogram_vec( + name: &str, + help: &str, + label_names: &[&str], +) -> Result { + let opts = HistogramOpts::new(name, help); + let histogram_vec = HistogramVec::new(opts, label_names)?; + prometheus::register(Box::new(histogram_vec.clone()))?; + Ok(histogram_vec) +} + +/// Attempts to crate a `IntGaugeVec`, returning `Err` if the registry does not accept the gauge +/// (potentially due to naming conflict). +pub fn try_create_int_gauge_vec( + name: &str, + help: &str, + label_names: &[&str], +) -> Result { + let opts = Opts::new(name, help); + let counter_vec = IntGaugeVec::new(opts, label_names)?; + prometheus::register(Box::new(counter_vec.clone()))?; + Ok(counter_vec) +} + +pub fn get_int_gauge(int_gauge_vec: &Result, name: &[&str]) -> Option { + if let Ok(int_gauge_vec) = int_gauge_vec { + Some(int_gauge_vec.get_metric_with_label_values(name).ok()?) + } else { + None + } +} + +pub fn get_histogram(histogram_vec: &Result, name: &[&str]) -> Option { + if let Ok(histogram_vec) = histogram_vec { + Some(histogram_vec.get_metric_with_label_values(name).ok()?) + } else { + None + } +} + /// Starts a timer for the given `Histogram`, stopping when it gets dropped or given to `stop_timer(..)`. pub fn start_timer(histogram: &Result) -> Option { if let Ok(histogram) = histogram { @@ -133,6 +177,18 @@ pub fn set_gauge(gauge: &Result, value: i64) { } } +pub fn inc_gauge(gauge: &Result) { + if let Ok(gauge) = gauge { + gauge.inc(); + } +} + +pub fn dec_gauge(gauge: &Result) { + if let Ok(gauge) = gauge { + gauge.dec(); + } +} + pub fn maybe_set_gauge(gauge: &Result, value_opt: Option) { if let Some(value) = value_opt { set_gauge(gauge, value) diff --git a/lcli/Cargo.toml b/lcli/Cargo.toml index 9bc1bbdd32..afd8d1f38b 100644 --- a/lcli/Cargo.toml +++ b/lcli/Cargo.toml @@ -27,7 +27,7 @@ dirs = "2.0.2" genesis = { path = "../beacon_node/genesis" } deposit_contract = { path = "../common/deposit_contract" } tree_hash = "0.1.0" -tokio = { version = "0.2.20", features = ["full"] } +tokio = { version = "0.2.21", features = ["full"] } clap_utils = { path = "../common/clap_utils" } eth2-libp2p = { path = "../beacon_node/eth2-libp2p" } validator_dir = { path = "../common/validator_dir", features = ["insecure_keys"] } diff --git a/lcli/src/eth1_genesis.rs b/lcli/src/eth1_genesis.rs index 1d267e3266..2e4fce0e0f 100644 --- a/lcli/src/eth1_genesis.rs +++ b/lcli/src/eth1_genesis.rs @@ -46,7 +46,7 @@ pub fn run(mut env: Environment, matches: &ArgMatches<'_>) -> Res config.lowest_cached_block_number = eth2_testnet_config.deposit_contract_deploy_block; config.follow_distance = spec.eth1_follow_distance / 2; - let genesis_service = Eth1GenesisService::new(config, env.core_context().log.clone()); + let genesis_service = Eth1GenesisService::new(config, env.core_context().log().clone()); env.runtime().block_on(async { let _ = genesis_service diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index 296162ce00..f4abcddb6d 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -9,7 +9,7 @@ write_ssz_files = ["beacon_node/write_ssz_files"] # Writes debugging .ssz files [dependencies] beacon_node = { "path" = "../beacon_node" } -tokio = "0.2.20" +tokio = "0.2.21" slog = { version = "2.5.2", features = ["max_level_trace"] } sloggers = "1.0.0" types = { "path" = "../consensus/types" } diff --git a/lighthouse/environment/Cargo.toml b/lighthouse/environment/Cargo.toml index 441f0e275a..bd9a1b981a 100644 --- a/lighthouse/environment/Cargo.toml +++ b/lighthouse/environment/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] clap = "2.33.0" -tokio = "0.2.20" +tokio = "0.2.21" slog = { version = "2.5.2", features = ["max_level_trace"] } sloggers = "1.0.0" types = { "path" = "../../consensus/types" } @@ -20,6 +20,9 @@ ctrlc = { version = "3.1.4", features = ["termination"] } futures = "0.3.5" parking_lot = "0.10.2" slog-json = "2.3.0" +exit-future = "0.2.0" +lazy_static = "1.4.0" +lighthouse_metrics = { path = "../../common/lighthouse_metrics" } [dev-dependencies] beacon_node = { path = "../../beacon_node" } diff --git a/lighthouse/environment/src/executor.rs b/lighthouse/environment/src/executor.rs new file mode 100644 index 0000000000..feb95eeb14 --- /dev/null +++ b/lighthouse/environment/src/executor.rs @@ -0,0 +1,128 @@ +use crate::metrics; +use futures::prelude::*; +use slog::{debug, trace}; +use tokio::runtime::Handle; + +/// A wrapper over a runtime handle which can spawn async and blocking tasks. +#[derive(Clone)] +pub struct TaskExecutor { + /// The handle to the runtime on which tasks are spawned + pub(crate) handle: Handle, + /// The receiver exit future which on receiving shuts down the task + pub(crate) exit: exit_future::Exit, + pub(crate) log: slog::Logger, +} + +impl TaskExecutor { + /// Create a new task executor. + /// + /// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from + /// a [`RuntimeContext`](struct.RuntimeContext.html) + pub fn new(handle: Handle, exit: exit_future::Exit, log: slog::Logger) -> Self { + Self { handle, exit, log } + } + + /// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit`. The task is canceled + /// when the corresponding exit_future `Signal` is fired/dropped. + /// + /// This function generates prometheus metrics on number of tasks and task duration. + pub fn spawn(&self, task: impl Future + Send + 'static, name: &'static str) { + let exit = self.exit.clone(); + let log = self.log.clone(); + + if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) { + // Task is shutdown before it completes if `exit` receives + let int_gauge_1 = int_gauge.clone(); + let future = future::select(Box::pin(task), exit).then(move |either| { + match either { + future::Either::Left(_) => trace!(log, "Async task completed"; "task" => name), + future::Either::Right(_) => { + debug!(log, "Async task shutdown, exit received"; "task" => name) + } + } + int_gauge_1.dec(); + futures::future::ready(()) + }); + + int_gauge.inc(); + self.handle.spawn(future); + } + } + + /// Spawn a future on the tokio runtime. This function does not wrap the task in an `exit_future::Exit` + /// like [spawn](#method.spawn). + /// The caller of this function is responsible for wrapping up the task with an `exit_future::Exit` to + /// ensure that the task gets canceled appropriately. + /// This function generates prometheus metrics on number of tasks and task duration. + /// + /// This is useful in cases where the future to be spawned needs to do additional cleanup work when + /// the task is completed/canceled (e.g. writing local variables to disk) or the task is created from + /// some framework which does its own cleanup (e.g. a hyper server). + pub fn spawn_without_exit( + &self, + task: impl Future + Send + 'static, + name: &'static str, + ) { + if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) { + let int_gauge_1 = int_gauge.clone(); + let future = task.then(move |_| { + int_gauge_1.dec(); + futures::future::ready(()) + }); + + int_gauge.inc(); + self.handle.spawn(future); + } + } + + /// Spawn a blocking task on a dedicated tokio thread pool wrapped in an exit future. + /// This function generates prometheus metrics on number of tasks and task duration. + pub fn spawn_blocking(&self, task: F, name: &'static str) + where + F: FnOnce() -> () + Send + 'static, + { + let exit = self.exit.clone(); + let log = self.log.clone(); + + if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) { + if let Some(int_gauge) = metrics::get_int_gauge(&metrics::BLOCKING_TASKS_COUNT, &[name]) + { + let int_gauge_1 = int_gauge.clone(); + let timer = metric.start_timer(); + let join_handle = self.handle.spawn_blocking(task); + + let future = future::select(join_handle, exit).then(move |either| { + match either { + future::Either::Left(_) => { + trace!(log, "Blocking task completed"; "task" => name) + } + future::Either::Right(_) => { + debug!(log, "Blocking task shutdown, exit received"; "task" => name) + } + } + timer.observe_duration(); + int_gauge_1.dec(); + futures::future::ready(()) + }); + + int_gauge.inc(); + self.handle.spawn(future); + } + } + } + + /// Returns the underlying runtime handle. + pub fn runtime_handle(&self) -> Handle { + self.handle.clone() + } + + /// Returns a copy of the `exit_future::Exit`. + pub fn exit(&self) -> exit_future::Exit { + self.exit.clone() + } + + /// Returns a reference to the logger. + pub fn log(&self) -> &slog::Logger { + &self.log + } +} diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 8ba106d998..119f3247ad 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -10,6 +10,8 @@ use eth2_config::Eth2Config; use eth2_testnet_config::Eth2TestnetConfig; use futures::channel::oneshot; + +pub use executor::TaskExecutor; use slog::{info, o, Drain, Level, Logger}; use sloggers::{null::NullLoggerBuilder, Build}; use std::cell::RefCell; @@ -17,8 +19,10 @@ use std::ffi::OsStr; use std::fs::{rename as FsRename, OpenOptions}; use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime}; +use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; use types::{EthSpec, InteropEthSpec, MainnetEthSpec, MinimalEthSpec}; +mod executor; +mod metrics; pub const ETH2_CONFIG_FILENAME: &str = "eth2-spec.toml"; @@ -172,10 +176,13 @@ impl EnvironmentBuilder { /// Consumes the builder, returning an `Environment`. pub fn build(self) -> Result, String> { + let (signal, exit) = exit_future::signal(); Ok(Environment { runtime: self .runtime .ok_or_else(|| "Cannot build environment without runtime".to_string())?, + signal: Some(signal), + exit, log: self .log .ok_or_else(|| "Cannot build environment without log".to_string())?, @@ -192,8 +199,7 @@ impl EnvironmentBuilder { /// `Runtime`, instead it only has access to a `Runtime`. #[derive(Clone)] pub struct RuntimeContext { - pub runtime_handle: Handle, - pub log: Logger, + pub executor: TaskExecutor, pub eth_spec_instance: E, pub eth2_config: Eth2Config, } @@ -204,8 +210,11 @@ impl RuntimeContext { /// The generated service will have the `service_name` in all it's logs. pub fn service_context(&self, service_name: String) -> Self { Self { - runtime_handle: self.runtime_handle.clone(), - log: self.log.new(o!("service" => service_name)), + executor: TaskExecutor { + handle: self.executor.handle.clone(), + exit: self.executor.exit.clone(), + log: self.executor.log.new(o!("service" => service_name)), + }, eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), } @@ -215,12 +224,19 @@ impl RuntimeContext { pub fn eth2_config(&self) -> &Eth2Config { &self.eth2_config } + + /// Returns a reference to the logger for this service. + pub fn log(&self) -> &slog::Logger { + self.executor.log() + } } /// An environment where Lighthouse services can run. Used to start a production beacon node or /// validator client, or to run tests that involve logging and async task execution. pub struct Environment { runtime: Runtime, + signal: Option, + exit: exit_future::Exit, log: Logger, eth_spec_instance: E, pub eth2_config: Eth2Config, @@ -239,8 +255,11 @@ impl Environment { /// Returns a `Context` where no "service" has been added to the logger output. pub fn core_context(&mut self) -> RuntimeContext { RuntimeContext { - runtime_handle: self.runtime.handle().clone(), - log: self.log.clone(), + executor: TaskExecutor { + exit: self.exit.clone(), + handle: self.runtime().handle().clone(), + log: self.log.clone(), + }, eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), } @@ -249,8 +268,11 @@ impl Environment { /// Returns a `Context` where the `service_name` is added to the logger output. pub fn service_context(&mut self, service_name: String) -> RuntimeContext { RuntimeContext { - runtime_handle: self.runtime.handle().clone(), - log: self.log.new(o!("service" => service_name)), + executor: TaskExecutor { + exit: self.exit.clone(), + handle: self.runtime().handle().clone(), + log: self.log.new(o!("service" => service_name.clone())), + }, eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), } @@ -279,6 +301,13 @@ impl Environment { .shutdown_timeout(std::time::Duration::from_secs(2)) } + /// Fire exit signal which shuts down all spawned services + pub fn fire_signal(&mut self) { + if let Some(signal) = self.signal.take() { + let _ = signal.fire(); + } + } + /// Sets the logger (and all child loggers) to log to a file. pub fn log_to_json_file( &mut self, diff --git a/lighthouse/environment/src/metrics.rs b/lighthouse/environment/src/metrics.rs new file mode 100644 index 0000000000..54f4b93c8a --- /dev/null +++ b/lighthouse/environment/src/metrics.rs @@ -0,0 +1,21 @@ +/// Handles async task metrics +use lazy_static::lazy_static; +pub use lighthouse_metrics::*; + +lazy_static! { + pub static ref ASYNC_TASKS_COUNT: Result = try_create_int_gauge_vec( + "async_tasks_count", + "Total number of async tasks spawned using spawn", + &["async_task_count"] + ); + pub static ref BLOCKING_TASKS_COUNT: Result = try_create_int_gauge_vec( + "blocking_tasks_count", + "Total number of async tasks spawned using spawn_blocking", + &["blocking_task_count"] + ); + pub static ref BLOCKING_TASKS_HISTOGRAM: Result = try_create_histogram_vec( + "blocking_tasks_histogram", + "Time taken by blocking tasks", + &["blocking_task_hist"] + ); +} diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index b037f6d786..2a8b98855d 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -141,7 +141,7 @@ fn run( .eth2_testnet_config(eth2_testnet_config)? .build()?; - let log = environment.core_context().log; + let log = environment.core_context().log().clone(); if let Some(log_path) = matches.value_of("logfile") { let path = log_path @@ -216,11 +216,15 @@ fn run( )) .map_err(|e| format!("Failed to init validator client: {}", e))?; - environment.core_context().runtime_handle.enter(|| { - validator - .start_service() - .map_err(|e| format!("Failed to start validator client service: {}", e)) - })?; + environment + .core_context() + .executor + .runtime_handle() + .enter(|| { + validator + .start_service() + .map_err(|e| format!("Failed to start validator client service: {}", e)) + })?; Some(validator) } else { @@ -234,9 +238,9 @@ fn run( // Block this thread until Crtl+C is pressed. environment.block_until_ctrl_c()?; - info!(log, "Shutting down.."); + environment.fire_signal(); drop(beacon_node); drop(validator_client); diff --git a/testing/eth1_test_rig/Cargo.toml b/testing/eth1_test_rig/Cargo.toml index e93c982881..12d13b4516 100644 --- a/testing/eth1_test_rig/Cargo.toml +++ b/testing/eth1_test_rig/Cargo.toml @@ -5,8 +5,8 @@ authors = ["Paul Hauner "] edition = "2018" [dependencies] +tokio = { version = "0.2.21", features = ["time"] } web3 = "0.11.0" -tokio = { version = "0.2.20", features = ["time"] } futures = { version = "0.3.5", features = ["compat"] } types = { path = "../../consensus/types"} serde_json = "1.0.52" diff --git a/testing/simulator/Cargo.toml b/testing/simulator/Cargo.toml index 6874f935b3..fd2323c010 100644 --- a/testing/simulator/Cargo.toml +++ b/testing/simulator/Cargo.toml @@ -12,7 +12,7 @@ types = { path = "../../consensus/types" } validator_client = { path = "../../validator_client" } parking_lot = "0.10.2" futures = "0.3.5" -tokio = "0.2.20" +tokio = "0.2.21" eth1_test_rig = { path = "../eth1_test_rig" } env_logger = "0.7.1" clap = "2.33.0" diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index d5ba403e48..ea0c99c627 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -9,7 +9,7 @@ name = "validator_client" path = "src/lib.rs" [dev-dependencies] -tokio = {version = "0.2.20", features = ["time", "rt-threaded", "macros"]} +tokio = { version = "0.2.21", features = ["time", "rt-threaded", "macros"] } [dependencies] eth2_ssz = "0.1.2" @@ -27,7 +27,7 @@ serde_json = "1.0.52" slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_trace"] } slog-async = "2.5.0" slog-term = "2.5.0" -tokio = {version = "0.2.20", features = ["time"]} +tokio = { version = "0.2.21", features = ["time"] } error-chain = "0.12.2" bincode = "1.2.1" futures = { version = "0.3.5", features = ["compat"] } diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 76ec305b78..b2426192bc 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -3,8 +3,7 @@ use crate::{ validator_store::ValidatorStore, }; use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, debug, info, trace}; use slot_clock::SlotClock; @@ -118,8 +117,8 @@ impl Deref for AttestationService { impl AttestationService { /// Starts the service which periodically produces attestations. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { - let log = self.context.log.clone(); + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); let duration_to_next_slot = self @@ -141,13 +140,11 @@ impl AttestationService { ) }; - let (exit_signal, exit_fut) = exit_future::signal(); - - let runtime_handle = self.context.runtime_handle.clone(); + let executor = self.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { - let log = &self.context.log; + let log = self.context.log(); if let Err(e) = self.spawn_attestation_tasks(slot_duration) { crit!( @@ -164,13 +161,8 @@ impl AttestationService { } }; - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); - - Ok(exit_signal) + executor.spawn(interval_fut, "attestation_service"); + Ok(()) } /// For each each required attestation, spawn a new task that downloads, signs and uploads the @@ -214,7 +206,7 @@ impl AttestationService { .into_iter() .for_each(|(committee_index, validator_duties)| { // Spawn a separate task for each attestation. - self.inner.context.runtime_handle.spawn( + self.inner.context.executor.runtime_handle().spawn( self.clone().publish_attestations_and_aggregates( slot, committee_index, @@ -243,7 +235,7 @@ impl AttestationService { validator_duties: Vec, aggregate_production_instant: Instant, ) -> Result<(), ()> { - let log = &self.context.log; + let log = self.context.log(); // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have // any validators for the given `slot` and `committee_index`. @@ -314,7 +306,7 @@ impl AttestationService { committee_index: CommitteeIndex, validator_duties: &[DutyAndProof], ) -> Result>, String> { - let log = &self.context.log; + let log = self.context.log(); if validator_duties.is_empty() { return Ok(None); @@ -448,7 +440,7 @@ impl AttestationService { attestation: Attestation, validator_duties: &[DutyAndProof], ) -> Result<(), String> { - let log = &self.context.log; + let log = self.context.log(); let aggregated_attestation = self .beacon_node @@ -548,6 +540,7 @@ impl AttestationService { #[cfg(test)] mod tests { use super::*; + use futures::future::FutureExt; use parking_lot::RwLock; /// This test is to ensure that a `tokio_timer::Delay` with an instant in the past will still diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index b0bc1860a3..faecb63ddc 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,7 +1,6 @@ use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{StreamExt, TryFutureExt}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, error, info, trace}; use slot_clock::SlotClock; @@ -113,8 +112,8 @@ impl Deref for BlockService { impl BlockService { /// Starts the service that periodically attempts to produce blocks. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { - let log = self.context.log.clone(); + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { + let log = self.context.log().clone(); let duration_to_next_slot = self .slot_clock @@ -136,7 +135,7 @@ impl BlockService { ) }; - let runtime_handle = self.inner.context.runtime_handle.clone(); + let executor = self.inner.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { @@ -144,20 +143,14 @@ impl BlockService { } }; - let (exit_signal, exit_fut) = exit_future::signal(); + executor.spawn(interval_fut, "block_service"); - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); - - Ok(exit_signal) + Ok(()) } /// Attempt to produce a block for any block producers in the `ValidatorStore`. async fn do_update(&self) -> Result<(), ()> { - let log = &self.context.log; + let log = self.context.log(); let slot = self.slot_clock.now().ok_or_else(move || { crit!(log, "Duties manager failed to read slot clock"); @@ -190,7 +183,7 @@ impl BlockService { iter.for_each(|validator_pubkey| { let service = self.clone(); let log = log.clone(); - self.inner.context.runtime_handle.spawn( + self.inner.context.executor.runtime_handle().spawn( service .publish_block(slot, validator_pubkey) .map_err(move |e| { @@ -208,7 +201,7 @@ impl BlockService { /// Produce a block at the given slot for validator_pubkey async fn publish_block(self, slot: Slot, validator_pubkey: PublicKey) -> Result<(), String> { - let log = &self.context.log; + let log = self.context.log(); let current_slot = self .slot_clock diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index a1e8c93c0a..b7c0c0876c 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -1,11 +1,10 @@ use crate::{is_synced::is_synced, validator_store::ValidatorStore}; use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use parking_lot::RwLock; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription}; -use slog::{debug, error, info, trace, warn}; +use slog::{debug, error, trace, warn}; use slot_clock::SlotClock; use std::collections::HashMap; use std::convert::TryInto; @@ -439,9 +438,7 @@ impl DutiesService { } /// Start the service that periodically polls the beacon node for validator duties. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { - let log = self.context.log.clone(); - + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { let duration_to_next_slot = self .slot_clock .duration_to_next_slot() @@ -456,15 +453,14 @@ impl DutiesService { ) }; - let (exit_signal, exit_fut) = exit_future::signal(); - // Run an immediate update before starting the updater service. self.inner .context - .runtime_handle + .executor + .runtime_handle() .spawn(self.clone().do_update()); - let runtime_handle = self.inner.context.runtime_handle.clone(); + let executor = self.inner.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { @@ -472,18 +468,14 @@ impl DutiesService { } }; - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); + executor.spawn(interval_fut, "duties_service"); - Ok(exit_signal) + Ok(()) } /// Attempt to download the duties of all managed validators for this epoch and the next. async fn do_update(self) -> Result<(), ()> { - let log = &self.context.log; + let log = self.context.log(); if !is_synced(&self.beacon_node, &self.slot_clock, None).await && !self.allow_unsynced_beacon_node @@ -550,7 +542,7 @@ impl DutiesService { .await .map_err(move |e| format!("Failed to get duties for epoch {}: {:?}", epoch, e))?; - let log = self.context.log.clone(); + let log = self.context.log().clone(); let mut new_validator = 0; let mut new_epoch = 0; @@ -652,7 +644,7 @@ impl DutiesService { ) } - let log = self.context.log.clone(); + let log = self.context.log().clone(); let count = validator_subscriptions.len(); if count == 0 { diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index ae979d4fd2..1407ed3c55 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -1,9 +1,8 @@ use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use parking_lot::RwLock; use remote_beacon_node::RemoteBeaconNode; -use slog::{debug, info, trace}; +use slog::{debug, trace}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; @@ -100,9 +99,7 @@ impl ForkService { } /// Starts the service that periodically polls for the `Fork`. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { - let log = self.context.log.clone(); - + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { let duration_to_next_epoch = self .slot_clock .duration_to_next_epoch(E::slots_per_epoch()) @@ -117,15 +114,14 @@ impl ForkService { ) }; - let (exit_signal, exit_fut) = exit_future::signal(); - // Run an immediate update before starting the updater service. self.inner .context - .runtime_handle + .executor + .runtime_handle() .spawn(self.clone().do_update()); - let runtime_handle = self.inner.context.runtime_handle.clone(); + let executor = self.inner.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { @@ -133,18 +129,14 @@ impl ForkService { } }; - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); + executor.spawn(interval_fut, "fork_service"); - Ok(exit_signal) + Ok(()) } /// Attempts to download the `Fork` from the server. async fn do_update(self) -> Result<(), ()> { - let log = &self.context.log; + let log = self.context.log(); let fork = self .inner diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 17d30df753..c7afbd7c46 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -17,7 +17,6 @@ use clap::ArgMatches; use config::SLASHING_PROTECTION_FILENAME; use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; -use exit_future::Signal; use fork_service::{ForkService, ForkServiceBuilder}; use notifier::spawn_notifier; use remote_beacon_node::RemoteBeaconNode; @@ -41,7 +40,6 @@ pub struct ProductionValidatorClient { fork_service: ForkService, block_service: BlockService, attestation_service: AttestationService, - exit_signals: Vec, config: Config, } @@ -60,10 +58,10 @@ impl ProductionValidatorClient { /// Instantiates the validator client, _without_ starting the timers to trigger block /// and attestation production. pub async fn new(mut context: RuntimeContext, config: Config) -> Result { - let log_1 = context.log.clone(); - let log_2 = context.log.clone(); - let log_3 = context.log.clone(); - let log_4 = context.log.clone(); + let log_1 = context.log().clone(); + let log_2 = context.log().clone(); + let log_3 = context.log().clone(); + let log_4 = context.log().clone(); info!( log_1, @@ -217,46 +215,32 @@ impl ProductionValidatorClient { fork_service, block_service, attestation_service, - exit_signals: vec![], config, }) } pub fn start_service(&mut self) -> Result<(), String> { - let duties_exit = self - .duties_service + self.duties_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start duties service: {}", e))?; - let fork_exit = self - .fork_service + self.fork_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start fork service: {}", e))?; - let block_exit = self - .block_service + self.block_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start block service: {}", e))?; - let attestation_exit = self - .attestation_service + self.attestation_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start attestation service: {}", e))?; - let notifier_exit = - spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; - - self.exit_signals = vec![ - duties_exit, - fork_exit, - block_exit, - attestation_exit, - notifier_exit, - ]; + spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; Ok(()) } diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 9d9aa97318..d9ee7faec3 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -1,16 +1,14 @@ use crate::{is_synced::is_synced, ProductionValidatorClient}; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use slog::{error, info}; use slot_clock::SlotClock; use tokio::time::{interval_at, Duration, Instant}; use types::EthSpec; /// Spawns a notifier service which periodically logs information about the node. -pub fn spawn_notifier(client: &ProductionValidatorClient) -> Result { +pub fn spawn_notifier(client: &ProductionValidatorClient) -> Result<(), String> { let context = client.context.service_context("notifier".into()); - let runtime_handle = context.runtime_handle.clone(); - let log = context.log.clone(); + let executor = context.executor.clone(); let duties_service = client.duties_service.clone(); let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node; @@ -25,7 +23,7 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu let mut interval = interval_at(start_instant, slot_duration); let interval_fut = async move { - let log = &context.log; + let log = context.log(); while interval.next().await.is_some() { if !is_synced( @@ -83,12 +81,6 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu } }; - let (exit_signal, exit) = exit_future::signal(); - let future = futures::future::select( - Box::pin(interval_fut), - exit.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); - - Ok(exit_signal) + executor.spawn(interval_fut, "validator_notifier"); + Ok(()) }