diff --git a/Cargo.lock b/Cargo.lock index 8ac385f518..80594cbd90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,7 +25,7 @@ dependencies = [ "slog-async", "slog-term", "tempdir", - "tokio 0.2.20", + "tokio 0.2.21", "types", "validator_client", "validator_dir", @@ -285,7 +285,7 @@ dependencies = [ "state_processing", "store", "tempfile", - "tokio 0.2.20", + "tokio 0.2.21", "tree_hash", "types", "websocket_server", @@ -318,7 +318,7 @@ dependencies = [ "slog-async", "slog-term", "store", - "tokio 0.2.20", + "tokio 0.2.21", "toml", "types", "version", @@ -623,7 +623,7 @@ dependencies = [ "slot_clock", "store", "timer", - "tokio 0.2.20", + "tokio 0.2.21", "toml", "tree_hash", "types", @@ -1012,7 +1012,7 @@ dependencies = [ "rlp", "sha2", "smallvec 1.4.0", - "tokio 0.2.20", + "tokio 0.2.21", "uint", "zeroize", ] @@ -1127,7 +1127,7 @@ dependencies = [ "slog-json", "slog-term", "sloggers", - "tokio 0.2.20", + "tokio 0.2.21", "types", ] @@ -1163,7 +1163,7 @@ dependencies = [ "slog", "sloggers", "state_processing", - "tokio 0.2.20", + "tokio 0.2.21", "toml", "tree_hash", "types", @@ -1177,7 +1177,7 @@ dependencies = [ "deposit_contract", "futures 0.3.5", "serde_json", - "tokio 0.2.20", + "tokio 0.2.21", "types", "web3", ] @@ -1214,7 +1214,7 @@ dependencies = [ "snap", "tempdir", "tiny-keccak 2.0.2", - "tokio 0.2.20", + "tokio 0.2.21", "tokio-io-timeout", "tokio-util", "types", @@ -1687,7 +1687,7 @@ dependencies = [ "serde_derive", "slog", "state_processing", - "tokio 0.2.20", + "tokio 0.2.21", "tree_hash", "types", ] @@ -1758,7 +1758,7 @@ dependencies = [ "indexmap", "log 0.4.8", "slab 0.4.2", - "tokio 0.2.20", + "tokio 0.2.21", "tokio-util", ] @@ -1777,7 +1777,7 @@ name = "hashset_delay" version = "0.2.0" dependencies = [ "futures 0.3.5", - "tokio 0.2.20", + "tokio 0.2.21", ] [[package]] @@ -1968,7 +1968,7 @@ dependencies = [ "net2", "pin-project", "time", - "tokio 0.2.20", + "tokio 0.2.21", "tower-service", "want 0.3.0", ] @@ -1995,7 +1995,7 @@ dependencies = [ "bytes 0.5.4", "hyper 0.13.5", "native-tls", - "tokio 0.2.20", + "tokio 0.2.21", "tokio-tls 0.3.1", ] @@ -2194,7 +2194,7 @@ dependencies = [ "serde_yaml", "simple_logger", "state_processing", - "tokio 0.2.20", + "tokio 0.2.21", "tree_hash", "types", "validator_dir", @@ -2469,7 +2469,7 @@ dependencies = [ "ipnet", "libp2p-core", "log 0.4.8", - "tokio 0.2.20", + "tokio 0.2.21", ] [[package]] @@ -2563,7 +2563,7 @@ dependencies = [ "slog-term", "sloggers", "tempfile", - "tokio 0.2.20", + "tokio 0.2.21", "types", "validator_client", "validator_dir", @@ -2868,6 +2868,7 @@ name = "network" version = "0.1.2" dependencies = [ "beacon_chain", + "environment", "error-chain", "eth2-libp2p", "eth2_ssz", @@ -2889,7 +2890,7 @@ dependencies = [ "smallvec 1.4.0", "store", "tempfile", - "tokio 0.2.20", + "tokio 0.2.21", "tree_hash", "types", ] @@ -3679,7 +3680,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "time", - "tokio 0.2.20", + "tokio 0.2.21", "tokio-tls 0.3.1", "url 2.1.1", "wasm-bindgen", @@ -3694,6 +3695,7 @@ version = "0.1.2" dependencies = [ "beacon_chain", "bls", + "environment", "eth2-libp2p", "eth2_config", "eth2_ssz", @@ -3720,7 +3722,7 @@ dependencies = [ "slot_clock", "state_processing", "store", - "tokio 0.2.20", + "tokio 0.2.21", "tree_hash", "types", "url 2.1.1", @@ -4154,7 +4156,7 @@ dependencies = [ "node_test_rig", "parking_lot 0.10.2", "rayon", - "tokio 0.2.20", + "tokio 0.2.21", "types", "validator_client", ] @@ -4622,11 +4624,12 @@ name = "timer" version = "0.1.2" dependencies = [ "beacon_chain", + "environment", "futures 0.3.5", "parking_lot 0.10.2", "slog", "slot_clock", - "tokio 0.2.20", + "tokio 0.2.21", "types", ] @@ -4700,9 +4703,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.20" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05c1d570eb1a36f0345a5ce9c6c6e665b70b73d11236912c0b477616aeec47b1" +checksum = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58" dependencies = [ "bytes 0.5.4", "fnv", @@ -4812,7 +4815,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9390a43272c8a6ac912ed1d1e2b6abeafd5047e05530a2fa304deee041a06215" dependencies = [ "bytes 0.5.4", - "tokio 0.2.20", + "tokio 0.2.21", ] [[package]] @@ -4926,7 +4929,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" dependencies = [ "native-tls", - "tokio 0.2.20", + "tokio 0.2.21", ] [[package]] @@ -4991,7 +4994,7 @@ dependencies = [ "futures-sink", "log 0.4.8", "pin-project-lite", - "tokio 0.2.20", + "tokio 0.2.21", ] [[package]] @@ -5283,7 +5286,7 @@ dependencies = [ "slog-term", "slot_clock", "tempdir", - "tokio 0.2.20", + "tokio 0.2.21", "tree_hash", "types", "validator_dir", @@ -5584,12 +5587,13 @@ dependencies = [ name = "websocket_server" version = "0.1.2" dependencies = [ + "environment", "futures 0.3.5", "serde", "serde_derive", "serde_json", "slog", - "tokio 0.2.20", + "tokio 0.2.21", "types", "ws", ] diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index b13b2f95a3..9b6d80194f 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -46,7 +46,8 @@ tempfile = "3.1.0" bitvec = "0.17.4" bls = { path = "../../crypto/bls" } safe_arith = { path = "../../consensus/safe_arith" } +environment = { path = "../../lighthouse/environment" } [dev-dependencies] lazy_static = "1.4.0" -environment = { path = "../../lighthouse/environment" } + diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 7f8d1a6715..b1d31aafc7 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -1,4 +1,5 @@ use crate::metrics; +use environment::TaskExecutor; use eth1::{Config as Eth1Config, Eth1Block, Service as HttpService}; use eth2_hashing::hash; use slog::{debug, error, trace, Logger}; @@ -285,8 +286,8 @@ impl> CachingEth1Backend { } /// Starts the routine which connects to the external eth1 node and updates the caches. - pub fn start(&self, handle: &tokio::runtime::Handle, exit: tokio::sync::oneshot::Receiver<()>) { - HttpService::auto_update(self.core.clone(), handle, exit); + pub fn start(&self, handle: TaskExecutor) { + HttpService::auto_update(self.core.clone(), handle); } /// Instantiates `self` from an existing service. diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 9bb8bf12ce..81b667994f 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -50,7 +50,6 @@ pub struct ClientBuilder { beacon_chain_builder: Option>, beacon_chain: Option>>, eth1_service: Option, - exit_channels: Vec>, event_handler: Option, network_globals: Option>>, network_send: Option>>, @@ -84,7 +83,6 @@ where beacon_chain_builder: None, beacon_chain: None, eth1_service: None, - exit_channels: vec![], event_handler: None, network_globals: None, network_send: None, @@ -223,13 +221,12 @@ where .ok_or_else(|| "network requires a runtime_context")? .clone(); - let (network_globals, network_send, network_exit) = - NetworkService::start(beacon_chain, config, &context.runtime_handle, context.log) + let (network_globals, network_send) = + NetworkService::start(beacon_chain, config, context.runtime_handle, context.log) .map_err(|e| format!("Failed to start network: {:?}", e))?; self.network_globals = Some(network_globals); self.network_send = Some(network_send); - self.exit_channels.push(network_exit); Ok(self) } @@ -251,16 +248,14 @@ where .ok_or_else(|| "node timer requires a chain spec".to_string())? .milliseconds_per_slot; - let timer_exit = timer::spawn( - &context.runtime_handle, + let _ = timer::spawn( + context.runtime_handle, beacon_chain, milliseconds_per_slot, context.log.clone(), ) .map_err(|e| format!("Unable to start node timer: {}", e))?; - self.exit_channels.push(timer_exit); - Ok(self) } @@ -294,8 +289,8 @@ where }; let log = context.log.clone(); - let (exit_channel, listening_addr) = rest_api::start_server( - &context.runtime_handle, + let listening_addr = rest_api::start_server( + context.runtime_handle, &client_config.rest_api, beacon_chain, network_info, @@ -310,7 +305,6 @@ where ) .map_err(|e| format!("Failed to start HTTP API: {:?}", e))?; - self.exit_channels.push(exit_channel); self.http_listen_addr = Some(listening_addr); Ok(self) @@ -337,8 +331,8 @@ where .ok_or_else(|| "slot_notifier requires a chain spec".to_string())? .milliseconds_per_slot; - let exit_channel = spawn_notifier( - &context.runtime_handle, + let _ = spawn_notifier( + context.runtime_handle, beacon_chain, network_globals, milliseconds_per_slot, @@ -346,8 +340,6 @@ where ) .map_err(|e| format!("Unable to start slot notifier: {}", e))?; - self.exit_channels.push(exit_channel); - Ok(self) } @@ -364,7 +356,6 @@ where network_globals: self.network_globals, http_listen_addr: self.http_listen_addr, websocket_listen_addr: self.websocket_listen_addr, - _exit_channels: self.exit_channels, } } } @@ -435,21 +426,14 @@ where .ok_or_else(|| "websocket_event_handler requires a runtime_context")? .service_context("ws".into()); - let (sender, exit_channel, listening_addr): ( - WebSocketSender, - Option<_>, - Option<_>, - ) = if config.enabled { - let (sender, exit, listening_addr) = - websocket_server::start_server(&context.runtime_handle, &config, &context.log)?; - (sender, Some(exit), Some(listening_addr)) + let (sender, listening_addr): (WebSocketSender, Option<_>) = if config.enabled { + let (sender, listening_addr) = + websocket_server::start_server(context.runtime_handle, &config, &context.log)?; + (sender, Some(listening_addr)) } else { - (WebSocketSender::dummy(), None, None) + (WebSocketSender::dummy(), None) }; - if let Some(channel) = exit_channel { - self.exit_channels.push(channel); - } self.event_handler = Some(sender); self.websocket_listen_addr = listening_addr; @@ -653,14 +637,8 @@ where self.eth1_service = None; - let exit = { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.exit_channels.push(tx); - rx - }; - // Starts the service that connects to an eth1 node and periodically updates caches. - backend.start(&context.runtime_handle, exit); + backend.start(context.runtime_handle); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); diff --git a/beacon_node/client/src/lib.rs b/beacon_node/client/src/lib.rs index 7f665b9cb1..da670ff134 100644 --- a/beacon_node/client/src/lib.rs +++ b/beacon_node/client/src/lib.rs @@ -25,8 +25,6 @@ pub struct Client { network_globals: Option>>, http_listen_addr: Option, websocket_listen_addr: Option, - /// Exit channels will complete/error when dropped, causing each service to exit gracefully. - _exit_channels: Vec>, } impl Client { diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 271feabacf..adab849d59 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -25,12 +25,12 @@ const SPEEDO_OBSERVATIONS: usize = 4; /// Spawns a notifier service which periodically logs information about the node. pub fn spawn_notifier( - handle: &tokio::runtime::Handle, + handle: environment::TaskExecutor, beacon_chain: Arc>, network: Arc>, milliseconds_per_slot: u64, log: slog::Logger, -) -> Result, String> { +) -> Result<(), String> { let log_1 = log.clone(); let slot_duration = Duration::from_millis(milliseconds_per_slot); let duration_to_next_slot = beacon_chain @@ -148,20 +148,10 @@ pub fn spawn_notifier( Ok::<(), ()>(()) }; - let (exit_signal, exit) = tokio::sync::oneshot::channel(); - - let exit_future = async move { - let _ = exit.await.ok(); - info!(log_1, "Notifier service shutdown"); - }; - // run the notifier on the current executor - handle.spawn(futures::future::select( - Box::pin(interval_future), - Box::pin(exit_future), - )); + handle.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier"); - Ok(exit_signal) + Ok(()) } /// Returns the peer count, returning something helpful if it's `usize::max_value` (effectively a diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index 1777e7ed92..f7938242cf 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [dev-dependencies] eth1_test_rig = { path = "../../testing/eth1_test_rig" } -environment = { path = "../../lighthouse/environment" } toml = "0.5.6" web3 = "0.10.0" sloggers = "1.0.0" @@ -30,3 +29,4 @@ state_processing = { path = "../../consensus/state_processing" } libflate = "1.0.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics"} lazy_static = "1.4.0" +environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 68fe357258..46dd9619a6 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -283,13 +283,8 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub fn auto_update( - service: Self, - handle: &tokio::runtime::Handle, - exit: tokio::sync::oneshot::Receiver<()>, - ) { + pub fn auto_update(service: Self, handle: environment::TaskExecutor) { let update_interval = Duration::from_millis(service.config().auto_update_interval_millis); - let log = service.log.clone(); let mut interval = interval_at(Instant::now(), update_interval); @@ -301,14 +296,7 @@ impl Service { } }; - let exit_future = async move { - let _ = exit.await.ok(); - info!(log, "Eth1 service shutdown"); - }; - - let future = futures::future::select(Box::pin(update_future), Box::pin(exit_future)); - - handle.spawn(future); + handle.spawn(update_future, "eth1_service"); } async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 5bc3d02c53..97556dd55d 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -84,7 +84,7 @@ pub struct Service { impl Service { pub fn new( - handle: &tokio::runtime::Handle, + handle: tokio::runtime::Handle, config: &NetworkConfig, enr_fork_id: EnrForkId, log: &slog::Logger, @@ -131,7 +131,7 @@ impl Service { } SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) .peer_connection_limit(MAX_CONNECTIONS_PER_PEER) - .executor(Box::new(Executor(handle.clone()))) + .executor(Box::new(Executor(handle))) .build() }; diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index 8ac89de6b3..2636a507b9 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -34,3 +34,4 @@ fnv = "1.0.6" rlp = "0.4.5" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } +environment = { path = "../../lighthouse/environment" } \ No newline at end of file diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index 78e1b8b2a1..eb3f6d4bd1 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -58,7 +58,7 @@ impl Router { beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, - runtime_handle: &tokio::runtime::Handle, + runtime_handle: environment::TaskExecutor, log: slog::Logger, ) -> error::Result>> { let message_handler_log = log.new(o!("service"=> "router")); @@ -68,7 +68,8 @@ impl Router { // Initialise a message instance, which itself spawns the syncing thread. let processor = Processor::new( - runtime_handle, + // TODO: spawn_blocking here + &runtime_handle.runtime_handle(), beacon_chain, network_globals.clone(), network_send.clone(), @@ -84,12 +85,15 @@ impl Router { }; // spawn handler task and move the message handler instance into the spawned thread - runtime_handle.spawn(async move { - handler_recv - .for_each(move |msg| future::ready(handler.handle_message(msg))) - .await; - debug!(log, "Network message handler terminated."); - }); + runtime_handle.spawn( + async move { + handler_recv + .for_each(move |msg| future::ready(handler.handle_message(msg))) + .await; + debug!(log, "Network message handler terminated."); + }, + "router_service", + ); Ok(handler_send) } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 9afc0541eb..8a88c84d32 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -53,12 +53,11 @@ impl NetworkService { pub fn start( beacon_chain: Arc>, config: &NetworkConfig, - runtime_handle: &Handle, + handle: environment::TaskExecutor, network_log: slog::Logger, ) -> error::Result<( Arc>, mpsc::UnboundedSender>, - oneshot::Sender<()>, )> { // build the network channel let (network_send, network_recv) = mpsc::unbounded_channel::>(); @@ -75,7 +74,7 @@ impl NetworkService { // launch libp2p service let (network_globals, mut libp2p) = - LibP2PService::new(runtime_handle, config, enr_fork_id, &network_log)?; + LibP2PService::new(handle.runtime_handle(), config, enr_fork_id, &network_log)?; for enr in load_dht::(store.clone()) { libp2p.swarm.add_enr(enr); @@ -88,7 +87,7 @@ impl NetworkService { beacon_chain.clone(), network_globals.clone(), network_send.clone(), - runtime_handle, + handle.clone(), network_log.clone(), )?; @@ -111,17 +110,18 @@ impl NetworkService { propagation_percentage, }; - let network_exit = spawn_service(runtime_handle, network_service)?; + let _ = spawn_service(handle, network_service)?; - Ok((network_globals, network_send, network_exit)) + Ok((network_globals, network_send)) } } fn spawn_service( - handle: &tokio::runtime::Handle, + handle: environment::TaskExecutor, mut service: NetworkService, -) -> error::Result> { - let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel(); +) -> error::Result<()> { + let mut exit_rx = handle.exit(); + let handle = handle.runtime_handle(); // spawn on the current executor handle.spawn(async move { @@ -364,7 +364,7 @@ fn spawn_service( } }); - Ok(network_exit) + Ok(()) } /// Returns a `Delay` that triggers shortly after the next change in the beacon chain fork version. diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index d2c8a8660f..d458eaa1bf 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -36,6 +36,7 @@ parking_lot = "0.10.2" futures = "0.3.5" operation_pool = { path = "../operation_pool" } rayon = "1.3.0" +environment = { path = "../../lighthouse/environment" } [dev-dependencies] remote_beacon_node = { path = "../../common/remote_beacon_node" } diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 669ea05e4d..e405997559 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -52,7 +52,7 @@ pub struct NetworkInfo { // Allowing more than 7 arguments. #[allow(clippy::too_many_arguments)] pub fn start_server( - handle: &Handle, + handle: environment::TaskExecutor, config: &Config, beacon_chain: Arc>, network_info: NetworkInfo, @@ -60,7 +60,7 @@ pub fn start_server( freezer_db_path: PathBuf, eth2_config: Eth2Config, log: slog::Logger, -) -> Result<(oneshot::Sender<()>, SocketAddr), hyper::Error> { +) -> Result { let inner_log = log.clone(); let eth2_config = Arc::new(eth2_config); @@ -100,7 +100,7 @@ pub fn start_server( let actual_listen_addr = server.local_addr(); // Build a channel to kill the HTTP server. - let (exit_signal, exit) = oneshot::channel::<()>(); + let exit = handle.exit(); let inner_log = log.clone(); let server_exit = async move { let _ = exit.await; @@ -127,9 +127,9 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - handle.spawn(server_future); + handle.runtime_handle().spawn(server_future); - Ok((exit_signal, actual_listen_addr)) + Ok(actual_listen_addr) } #[derive(Clone)] diff --git a/beacon_node/timer/Cargo.toml b/beacon_node/timer/Cargo.toml index d1d37e181a..4ac22ad2e1 100644 --- a/beacon_node/timer/Cargo.toml +++ b/beacon_node/timer/Cargo.toml @@ -12,3 +12,4 @@ tokio = { version = "0.2.21", features = ["full"] } slog = "2.5.2" parking_lot = "0.10.2" futures = "0.3.5" +environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index fa79736761..513d70b6cf 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -13,7 +13,7 @@ use tokio::time::{interval_at, Instant}; /// Spawns a timer service which periodically executes tasks for the beacon chain pub fn spawn( - handle: &Handle, + handle: environment::TaskExecutor, beacon_chain: Arc>, milliseconds_per_slot: u64, log: slog::Logger, @@ -34,14 +34,7 @@ pub fn spawn( } }; - let log_1 = log.clone(); - let exit_future = async move { - let _ = exit.await.ok(); - info!(log_1, "Timer service shutdown"); - }; - - let future = futures::future::select(Box::pin(timer_future), Box::pin(exit_future)); - handle.spawn(future); + handle.spawn(timer_future, "timer_service"); info!(log, "Timer service started"); Ok(exit_signal) diff --git a/beacon_node/websocket_server/Cargo.toml b/beacon_node/websocket_server/Cargo.toml index 808a2f7421..8e22fa7d44 100644 --- a/beacon_node/websocket_server/Cargo.toml +++ b/beacon_node/websocket_server/Cargo.toml @@ -15,3 +15,4 @@ slog = "2.5.2" tokio = { version = "0.2.21", features = ["full"] } types = { path = "../../consensus/types" } ws = "0.9.1" +environment = { path = "../../lighthouse/environment" } diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index f3f378bc82..325c5d4c28 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -34,17 +34,10 @@ impl WebSocketSender { } pub fn start_server( - handle: &tokio::runtime::Handle, + handle: environment::TaskExecutor, config: &Config, log: &Logger, -) -> Result< - ( - WebSocketSender, - tokio::sync::oneshot::Sender<()>, - SocketAddr, - ), - String, -> { +) -> Result<(WebSocketSender, SocketAddr), String> { let server_string = format!("{}:{}", config.listen_address, config.port); // Create a server that simply ignores any incoming messages. @@ -68,31 +61,26 @@ pub fn start_server( let broadcaster = server.broadcaster(); // Produce a signal/channel that can gracefully shutdown the websocket server. - let exit_channel = { - let (exit_channel, exit) = tokio::sync::oneshot::channel(); - - let log_inner = log.clone(); - let broadcaster_inner = server.broadcaster(); - let exit_future = async move { - let _ = exit.await; - if let Err(e) = broadcaster_inner.shutdown() { - warn!( - log_inner, - "Websocket server errored on shutdown"; - "error" => format!("{:?}", e) - ); - } else { - info!(log_inner, "Websocket server shutdown"); - } - }; - - // Place a future on the handle that will shutdown the websocket server when the - // application exits. - handle.spawn(exit_future); - - exit_channel + let exit = handle.exit(); + let log_inner = log.clone(); + let broadcaster_inner = server.broadcaster(); + let exit_future = async move { + let _ = exit.await; + if let Err(e) = broadcaster_inner.shutdown() { + warn!( + log_inner, + "Websocket server errored on shutdown"; + "error" => format!("{:?}", e) + ); + } else { + info!(log_inner, "Websocket server shutdown"); + } }; + // Place a future on the handle that will shutdown the websocket server when the + // application exits. + handle.runtime_handle().spawn(exit_future); + let log_inner = log.clone(); let _ = std::thread::spawn(move || match server.run() { @@ -123,7 +111,6 @@ pub fn start_server( sender: Some(broadcaster), _phantom: PhantomData, }, - exit_channel, actual_listen_addr, )) } diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index b037f6d786..d2a7c94a7f 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -216,11 +216,15 @@ fn run( )) .map_err(|e| format!("Failed to init validator client: {}", e))?; - environment.core_context().runtime_handle.enter(|| { - validator - .start_service() - .map_err(|e| format!("Failed to start validator client service: {}", e)) - })?; + environment + .core_context() + .runtime_handle + .runtime_handle() + .enter(|| { + validator + .start_service() + .map_err(|e| format!("Failed to start validator client service: {}", e)) + })?; Some(validator) } else { @@ -234,9 +238,9 @@ fn run( // Block this thread until Crtl+C is pressed. environment.block_until_ctrl_c()?; - info!(log, "Shutting down.."); + environment.fire_signal(); drop(beacon_node); drop(validator_client);