From 2f4acb91125bd0f5b070d6b9122b23e1f52244f5 Mon Sep 17 00:00:00 2001 From: pawan Date: Tue, 19 May 2020 13:48:11 +0530 Subject: [PATCH] Replace tokio::spawn with handle.spawn --- beacon_node/beacon_chain/src/eth1_chain.rs | 6 +-- beacon_node/client/src/builder.rs | 61 ++++++++++------------ beacon_node/client/src/notifier.rs | 3 +- beacon_node/eth1/src/service.rs | 8 ++- beacon_node/eth2-libp2p/src/service.rs | 3 +- beacon_node/network/src/service.rs | 7 +-- beacon_node/rest_api/src/lib.rs | 4 +- beacon_node/timer/src/lib.rs | 7 ++- beacon_node/websocket_server/src/lib.rs | 3 +- 9 files changed, 51 insertions(+), 51 deletions(-) diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 47929fdeea..7f8d1a6715 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -285,10 +285,8 @@ impl> CachingEth1Backend { } /// Starts the routine which connects to the external eth1 node and updates the caches. - pub fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { - // don't need to spawn as a task is being spawned in auto_update - // TODO: check if this is correct - HttpService::auto_update(self.core.clone(), exit); + pub fn start(&self, handle: &tokio::runtime::Handle, exit: tokio::sync::oneshot::Receiver<()>) { + HttpService::auto_update(self.core.clone(), handle, exit); } /// Instantiates `self` from an existing service. diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 8e0699bfe5..a7d2e8a83c 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -251,9 +251,7 @@ where .ok_or_else(|| "node timer requires a chain spec".to_string())? .milliseconds_per_slot; - let timer_exit = context - .runtime_handle - .enter(|| timer::spawn(beacon_chain, milliseconds_per_slot)) + let timer_exit = timer::spawn(&context.runtime_handle, beacon_chain, milliseconds_per_slot) .map_err(|e| format!("Unable to start node timer: {}", e))?; self.exit_channels.push(timer_exit); @@ -291,22 +289,21 @@ where }; let log = context.log.clone(); - let (exit_channel, listening_addr) = context.runtime_handle.enter(|| { - rest_api::start_server( - &client_config.rest_api, - beacon_chain, - network_info, - client_config - .create_db_path() - .map_err(|_| "unable to read data dir")?, - client_config - .create_freezer_db_path() - .map_err(|_| "unable to read freezer DB dir")?, - eth2_config.clone(), - log, - ) - .map_err(|e| format!("Failed to start HTTP API: {:?}", e)) - })?; + let (exit_channel, listening_addr) = rest_api::start_server( + &context.runtime_handle, + &client_config.rest_api, + beacon_chain, + network_info, + client_config + .create_db_path() + .map_err(|_| "unable to read data dir")?, + client_config + .create_freezer_db_path() + .map_err(|_| "unable to read freezer DB dir")?, + eth2_config.clone(), + log, + ) + .map_err(|e| format!("Failed to start HTTP API: {:?}", e))?; self.exit_channels.push(exit_channel); self.http_listen_addr = Some(listening_addr); @@ -335,17 +332,14 @@ where .ok_or_else(|| "slot_notifier requires a chain spec".to_string())? .milliseconds_per_slot; - let exit_channel = context - .runtime_handle - .enter(|| { - spawn_notifier( - beacon_chain, - network_globals, - milliseconds_per_slot, - context.log.clone(), - ) - }) - .map_err(|e| format!("Unable to start slot notifier: {}", e))?; + let exit_channel = spawn_notifier( + &context.runtime_handle, + beacon_chain, + network_globals, + milliseconds_per_slot, + context.log.clone(), + ) + .map_err(|e| format!("Unable to start slot notifier: {}", e))?; self.exit_channels.push(exit_channel); @@ -441,9 +435,8 @@ where Option<_>, Option<_>, ) = if config.enabled { - let (sender, exit, listening_addr) = context - .runtime_handle - .enter(|| websocket_server::start_server(&config, &context.log))?; + let (sender, exit, listening_addr) = + websocket_server::start_server(&context.runtime_handle, &config, &context.log)?; (sender, Some(exit), Some(listening_addr)) } else { (WebSocketSender::dummy(), None, None) @@ -662,7 +655,7 @@ where }; // Starts the service that connects to an eth1 node and periodically updates caches. - context.runtime_handle.enter(|| backend.start(exit)); + backend.start(&context.runtime_handle, exit); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index aa13536537..b5fd3a7351 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -25,6 +25,7 @@ const SPEEDO_OBSERVATIONS: usize = 4; /// Spawns a notifier service which periodically logs information about the node. pub fn spawn_notifier( + handle: &tokio::runtime::Handle, beacon_chain: Arc>, network: Arc>, milliseconds_per_slot: u64, @@ -149,7 +150,7 @@ pub fn spawn_notifier( let (exit_signal, exit) = tokio::sync::oneshot::channel(); // run the notifier on the current executor - tokio::spawn(futures::future::select(Box::pin(interval_future), exit)); + handle.spawn(futures::future::select(Box::pin(interval_future), exit)); Ok(exit_signal) } diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index b3b1165bf4..ccb163e41f 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -283,7 +283,11 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub fn auto_update(service: Self, exit: tokio::sync::oneshot::Receiver<()>) { + pub fn auto_update( + service: Self, + handle: &tokio::runtime::Handle, + exit: tokio::sync::oneshot::Receiver<()>, + ) { let update_interval = Duration::from_millis(service.config().auto_update_interval_millis); let mut interval = interval_at(Instant::now(), update_interval); @@ -298,7 +302,7 @@ impl Service { let future = futures::future::select(Box::pin(update_future), exit); - tokio::task::spawn(future); + handle.spawn(future); } async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { diff --git a/beacon_node/eth2-libp2p/src/service.rs b/beacon_node/eth2-libp2p/src/service.rs index 0d98d9df15..5bc3d02c53 100644 --- a/beacon_node/eth2-libp2p/src/service.rs +++ b/beacon_node/eth2-libp2p/src/service.rs @@ -84,6 +84,7 @@ pub struct Service { impl Service { pub fn new( + handle: &tokio::runtime::Handle, config: &NetworkConfig, enr_fork_id: EnrForkId, log: &slog::Logger, @@ -130,7 +131,7 @@ impl Service { } SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) .peer_connection_limit(MAX_CONNECTIONS_PER_PEER) - .executor(Box::new(Executor(tokio::runtime::Handle::current()))) + .executor(Box::new(Executor(handle.clone()))) .build() }; diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ff9063ea50..9afc0541eb 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -75,7 +75,7 @@ impl NetworkService { // launch libp2p service let (network_globals, mut libp2p) = - runtime_handle.enter(|| LibP2PService::new(config, enr_fork_id, &network_log))?; + LibP2PService::new(runtime_handle, config, enr_fork_id, &network_log)?; for enr in load_dht::(store.clone()) { libp2p.swarm.add_enr(enr); @@ -111,19 +111,20 @@ impl NetworkService { propagation_percentage, }; - let network_exit = runtime_handle.enter(|| spawn_service(network_service))?; + let network_exit = spawn_service(runtime_handle, network_service)?; Ok((network_globals, network_send, network_exit)) } } fn spawn_service( + handle: &tokio::runtime::Handle, mut service: NetworkService, ) -> error::Result> { let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel(); // spawn on the current executor - tokio::spawn(async move { + handle.spawn(async move { loop { // build the futures to check simultaneously tokio::select! { diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index c05cb66ab4..669ea05e4d 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -35,6 +35,7 @@ use std::net::SocketAddr; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; +use tokio::runtime::Handle; use tokio::sync::{mpsc, oneshot}; use url_query::UrlQuery; @@ -51,6 +52,7 @@ pub struct NetworkInfo { // Allowing more than 7 arguments. #[allow(clippy::too_many_arguments)] pub fn start_server( + handle: &Handle, config: &Config, beacon_chain: Arc>, network_info: NetworkInfo, @@ -125,7 +127,7 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - tokio::spawn(server_future); + handle.spawn(server_future); Ok((exit_signal, actual_listen_addr)) } diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index 26f8bb60ea..8cafe52900 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -8,13 +8,12 @@ use futures::stream::StreamExt; use slot_clock::SlotClock; use std::sync::Arc; use std::time::Duration; +use tokio::runtime::Handle; use tokio::time::{interval_at, Instant}; /// Spawns a timer service which periodically executes tasks for the beacon chain -/// TODO: We might not need a `Handle` to the runtime since this function should be -/// called from the context of a runtime and we can simply spawn using task::spawn. -/// Check for issues without the Handle. pub fn spawn( + handle: &Handle, beacon_chain: Arc>, milliseconds_per_slot: u64, ) -> Result, &'static str> { @@ -34,7 +33,7 @@ pub fn spawn( }); let future = futures::future::select(timer_future, exit); - tokio::spawn(future); + handle.spawn(future); Ok(exit_signal) } diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index 7ffff1b89b..f3f378bc82 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -34,6 +34,7 @@ impl WebSocketSender { } pub fn start_server( + handle: &tokio::runtime::Handle, config: &Config, log: &Logger, ) -> Result< @@ -87,7 +88,7 @@ pub fn start_server( // Place a future on the handle that will shutdown the websocket server when the // application exits. - tokio::spawn(exit_future); + handle.spawn(exit_future); exit_channel };