Replace tokio::spawn with handle.spawn

This commit is contained in:
pawan
2020-05-19 13:48:11 +05:30
parent ea56dcb179
commit 2f4acb9112
9 changed files with 51 additions and 51 deletions

View File

@@ -285,10 +285,8 @@ impl<T: EthSpec, S: Store<T>> CachingEth1Backend<T, S> {
} }
/// Starts the routine which connects to the external eth1 node and updates the caches. /// Starts the routine which connects to the external eth1 node and updates the caches.
pub fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { pub fn start(&self, handle: &tokio::runtime::Handle, exit: tokio::sync::oneshot::Receiver<()>) {
// don't need to spawn as a task is being spawned in auto_update HttpService::auto_update(self.core.clone(), handle, exit);
// TODO: check if this is correct
HttpService::auto_update(self.core.clone(), exit);
} }
/// Instantiates `self` from an existing service. /// Instantiates `self` from an existing service.

View File

@@ -251,9 +251,7 @@ where
.ok_or_else(|| "node timer requires a chain spec".to_string())? .ok_or_else(|| "node timer requires a chain spec".to_string())?
.milliseconds_per_slot; .milliseconds_per_slot;
let timer_exit = context let timer_exit = timer::spawn(&context.runtime_handle, beacon_chain, milliseconds_per_slot)
.runtime_handle
.enter(|| timer::spawn(beacon_chain, milliseconds_per_slot))
.map_err(|e| format!("Unable to start node timer: {}", e))?; .map_err(|e| format!("Unable to start node timer: {}", e))?;
self.exit_channels.push(timer_exit); self.exit_channels.push(timer_exit);
@@ -291,22 +289,21 @@ where
}; };
let log = context.log.clone(); let log = context.log.clone();
let (exit_channel, listening_addr) = context.runtime_handle.enter(|| { let (exit_channel, listening_addr) = rest_api::start_server(
rest_api::start_server( &context.runtime_handle,
&client_config.rest_api, &client_config.rest_api,
beacon_chain, beacon_chain,
network_info, network_info,
client_config client_config
.create_db_path() .create_db_path()
.map_err(|_| "unable to read data dir")?, .map_err(|_| "unable to read data dir")?,
client_config client_config
.create_freezer_db_path() .create_freezer_db_path()
.map_err(|_| "unable to read freezer DB dir")?, .map_err(|_| "unable to read freezer DB dir")?,
eth2_config.clone(), eth2_config.clone(),
log, log,
) )
.map_err(|e| format!("Failed to start HTTP API: {:?}", e)) .map_err(|e| format!("Failed to start HTTP API: {:?}", e))?;
})?;
self.exit_channels.push(exit_channel); self.exit_channels.push(exit_channel);
self.http_listen_addr = Some(listening_addr); self.http_listen_addr = Some(listening_addr);
@@ -335,17 +332,14 @@ where
.ok_or_else(|| "slot_notifier requires a chain spec".to_string())? .ok_or_else(|| "slot_notifier requires a chain spec".to_string())?
.milliseconds_per_slot; .milliseconds_per_slot;
let exit_channel = context let exit_channel = spawn_notifier(
.runtime_handle &context.runtime_handle,
.enter(|| { beacon_chain,
spawn_notifier( network_globals,
beacon_chain, milliseconds_per_slot,
network_globals, context.log.clone(),
milliseconds_per_slot, )
context.log.clone(), .map_err(|e| format!("Unable to start slot notifier: {}", e))?;
)
})
.map_err(|e| format!("Unable to start slot notifier: {}", e))?;
self.exit_channels.push(exit_channel); self.exit_channels.push(exit_channel);
@@ -441,9 +435,8 @@ where
Option<_>, Option<_>,
Option<_>, Option<_>,
) = if config.enabled { ) = if config.enabled {
let (sender, exit, listening_addr) = context let (sender, exit, listening_addr) =
.runtime_handle websocket_server::start_server(&context.runtime_handle, &config, &context.log)?;
.enter(|| websocket_server::start_server(&config, &context.log))?;
(sender, Some(exit), Some(listening_addr)) (sender, Some(exit), Some(listening_addr))
} else { } else {
(WebSocketSender::dummy(), None, None) (WebSocketSender::dummy(), None, None)
@@ -662,7 +655,7 @@ where
}; };
// Starts the service that connects to an eth1 node and periodically updates caches. // Starts the service that connects to an eth1 node and periodically updates caches.
context.runtime_handle.enter(|| backend.start(exit)); backend.start(&context.runtime_handle, exit);
self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend)));

View File

@@ -25,6 +25,7 @@ const SPEEDO_OBSERVATIONS: usize = 4;
/// Spawns a notifier service which periodically logs information about the node. /// Spawns a notifier service which periodically logs information about the node.
pub fn spawn_notifier<T: BeaconChainTypes>( pub fn spawn_notifier<T: BeaconChainTypes>(
handle: &tokio::runtime::Handle,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network: Arc<NetworkGlobals<T::EthSpec>>, network: Arc<NetworkGlobals<T::EthSpec>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
@@ -149,7 +150,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
let (exit_signal, exit) = tokio::sync::oneshot::channel(); let (exit_signal, exit) = tokio::sync::oneshot::channel();
// run the notifier on the current executor // run the notifier on the current executor
tokio::spawn(futures::future::select(Box::pin(interval_future), exit)); handle.spawn(futures::future::select(Box::pin(interval_future), exit));
Ok(exit_signal) Ok(exit_signal)
} }

View File

@@ -283,7 +283,11 @@ impl Service {
/// - Err(_) if there is an error. /// - Err(_) if there is an error.
/// ///
/// Emits logs for debugging and errors. /// Emits logs for debugging and errors.
pub fn auto_update(service: Self, exit: tokio::sync::oneshot::Receiver<()>) { pub fn auto_update(
service: Self,
handle: &tokio::runtime::Handle,
exit: tokio::sync::oneshot::Receiver<()>,
) {
let update_interval = Duration::from_millis(service.config().auto_update_interval_millis); let update_interval = Duration::from_millis(service.config().auto_update_interval_millis);
let mut interval = interval_at(Instant::now(), update_interval); let mut interval = interval_at(Instant::now(), update_interval);
@@ -298,7 +302,7 @@ impl Service {
let future = futures::future::select(Box::pin(update_future), exit); let future = futures::future::select(Box::pin(update_future), exit);
tokio::task::spawn(future); handle.spawn(future);
} }
async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> {

View File

@@ -84,6 +84,7 @@ pub struct Service<TSpec: EthSpec> {
impl<TSpec: EthSpec> Service<TSpec> { impl<TSpec: EthSpec> Service<TSpec> {
pub fn new( pub fn new(
handle: &tokio::runtime::Handle,
config: &NetworkConfig, config: &NetworkConfig,
enr_fork_id: EnrForkId, enr_fork_id: EnrForkId,
log: &slog::Logger, log: &slog::Logger,
@@ -130,7 +131,7 @@ impl<TSpec: EthSpec> Service<TSpec> {
} }
SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.peer_connection_limit(MAX_CONNECTIONS_PER_PEER) .peer_connection_limit(MAX_CONNECTIONS_PER_PEER)
.executor(Box::new(Executor(tokio::runtime::Handle::current()))) .executor(Box::new(Executor(handle.clone())))
.build() .build()
}; };

View File

@@ -75,7 +75,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// launch libp2p service // launch libp2p service
let (network_globals, mut libp2p) = let (network_globals, mut libp2p) =
runtime_handle.enter(|| LibP2PService::new(config, enr_fork_id, &network_log))?; LibP2PService::new(runtime_handle, config, enr_fork_id, &network_log)?;
for enr in load_dht::<T::Store, T::EthSpec>(store.clone()) { for enr in load_dht::<T::Store, T::EthSpec>(store.clone()) {
libp2p.swarm.add_enr(enr); libp2p.swarm.add_enr(enr);
@@ -111,19 +111,20 @@ impl<T: BeaconChainTypes> NetworkService<T> {
propagation_percentage, propagation_percentage,
}; };
let network_exit = runtime_handle.enter(|| spawn_service(network_service))?; let network_exit = spawn_service(runtime_handle, network_service)?;
Ok((network_globals, network_send, network_exit)) Ok((network_globals, network_send, network_exit))
} }
} }
fn spawn_service<T: BeaconChainTypes>( fn spawn_service<T: BeaconChainTypes>(
handle: &tokio::runtime::Handle,
mut service: NetworkService<T>, mut service: NetworkService<T>,
) -> error::Result<tokio::sync::oneshot::Sender<()>> { ) -> error::Result<tokio::sync::oneshot::Sender<()>> {
let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel(); let (network_exit, mut exit_rx) = tokio::sync::oneshot::channel();
// spawn on the current executor // spawn on the current executor
tokio::spawn(async move { handle.spawn(async move {
loop { loop {
// build the futures to check simultaneously // build the futures to check simultaneously
tokio::select! { tokio::select! {

View File

@@ -35,6 +35,7 @@ use std::net::SocketAddr;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use url_query::UrlQuery; use url_query::UrlQuery;
@@ -51,6 +52,7 @@ pub struct NetworkInfo<T: BeaconChainTypes> {
// Allowing more than 7 arguments. // Allowing more than 7 arguments.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn start_server<T: BeaconChainTypes>( pub fn start_server<T: BeaconChainTypes>(
handle: &Handle,
config: &Config, config: &Config,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_info: NetworkInfo<T>, network_info: NetworkInfo<T>,
@@ -125,7 +127,7 @@ pub fn start_server<T: BeaconChainTypes>(
"port" => actual_listen_addr.port(), "port" => actual_listen_addr.port(),
); );
tokio::spawn(server_future); handle.spawn(server_future);
Ok((exit_signal, actual_listen_addr)) Ok((exit_signal, actual_listen_addr))
} }

View File

@@ -8,13 +8,12 @@ use futures::stream::StreamExt;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time::{interval_at, Instant}; use tokio::time::{interval_at, Instant};
/// Spawns a timer service which periodically executes tasks for the beacon chain /// Spawns a timer service which periodically executes tasks for the beacon chain
/// TODO: We might not need a `Handle` to the runtime since this function should be
/// called from the context of a runtime and we can simply spawn using task::spawn.
/// Check for issues without the Handle.
pub fn spawn<T: BeaconChainTypes>( pub fn spawn<T: BeaconChainTypes>(
handle: &Handle,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> { ) -> Result<tokio::sync::oneshot::Sender<()>, &'static str> {
@@ -34,7 +33,7 @@ pub fn spawn<T: BeaconChainTypes>(
}); });
let future = futures::future::select(timer_future, exit); let future = futures::future::select(timer_future, exit);
tokio::spawn(future); handle.spawn(future);
Ok(exit_signal) Ok(exit_signal)
} }

View File

@@ -34,6 +34,7 @@ impl<T: EthSpec> WebSocketSender<T> {
} }
pub fn start_server<T: EthSpec>( pub fn start_server<T: EthSpec>(
handle: &tokio::runtime::Handle,
config: &Config, config: &Config,
log: &Logger, log: &Logger,
) -> Result< ) -> Result<
@@ -87,7 +88,7 @@ pub fn start_server<T: EthSpec>(
// Place a future on the handle that will shutdown the websocket server when the // Place a future on the handle that will shutdown the websocket server when the
// application exits. // application exits.
tokio::spawn(exit_future); handle.spawn(exit_future);
exit_channel exit_channel
}; };