From 85d099f66451e7c4ed4c6c9ca8bc2ef44d1e6952 Mon Sep 17 00:00:00 2001 From: pawan Date: Fri, 22 May 2020 13:30:40 +0530 Subject: [PATCH] Cleanup --- beacon_node/client/src/builder.rs | 4 ++-- beacon_node/client/src/notifier.rs | 3 +-- beacon_node/eth1/src/service.rs | 2 +- beacon_node/network/src/router/mod.rs | 10 +++++----- beacon_node/network/src/router/processor.rs | 11 ++++------- beacon_node/network/src/service.rs | 7 +++---- beacon_node/network/src/sync/manager.rs | 18 +++++------------- beacon_node/rest_api/src/lib.rs | 5 ++--- beacon_node/timer/src/lib.rs | 3 +-- lighthouse/environment/src/lib.rs | 12 ++++++------ 10 files changed, 30 insertions(+), 45 deletions(-) diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index f5c708ee55..3a753bb4ac 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -232,7 +232,7 @@ where } /// Immediately starts the timer service. - fn timer(mut self) -> Result { + fn timer(self) -> Result { let context = self .runtime_context .as_ref() @@ -311,7 +311,7 @@ where } /// Immediately starts the service that periodically logs information each slot. - pub fn notifier(mut self) -> Result { + pub fn notifier(self) -> Result { let context = self .runtime_context .as_ref() diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 3b249f4e6f..d8e7ad75d4 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -31,7 +31,6 @@ pub fn spawn_notifier( milliseconds_per_slot: u64, log: slog::Logger, ) -> Result<(), String> { - let log_1 = log.clone(); let slot_duration = Duration::from_millis(milliseconds_per_slot); let duration_to_next_slot = beacon_chain .slot_clock @@ -149,7 +148,7 @@ pub fn spawn_notifier( }; // run the notifier on the current executor - executor.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier"); + executor.spawn(interval_future.unwrap_or_else(|_| ()), "notifier"); Ok(()) } diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 46dd9619a6..f42c1427e4 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -296,7 +296,7 @@ impl Service { } }; - handle.spawn(update_future, "eth1_service"); + handle.spawn(update_future, "eth1"); } async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { diff --git a/beacon_node/network/src/router/mod.rs b/beacon_node/network/src/router/mod.rs index eb3f6d4bd1..01ccaecc28 100644 --- a/beacon_node/network/src/router/mod.rs +++ b/beacon_node/network/src/router/mod.rs @@ -58,7 +58,7 @@ impl Router { beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, - runtime_handle: environment::TaskExecutor, + executor: environment::TaskExecutor, log: slog::Logger, ) -> error::Result>> { let message_handler_log = log.new(o!("service"=> "router")); @@ -69,7 +69,7 @@ impl Router { // Initialise a message instance, which itself spawns the syncing thread. let processor = Processor::new( // TODO: spawn_blocking here - &runtime_handle.runtime_handle(), + executor.clone(), beacon_chain, network_globals.clone(), network_send.clone(), @@ -85,14 +85,14 @@ impl Router { }; // spawn handler task and move the message handler instance into the spawned thread - runtime_handle.spawn( + executor.spawn( async move { handler_recv .for_each(move |msg| future::ready(handler.handle_message(msg))) .await; - debug!(log, "Network message handler terminated."); + debug!(log, "Network message router terminated."); }, - "router_service", + "router", ); Ok(handler_send) diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 76f0cdf449..d35b22d362 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -14,7 +14,7 @@ use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; use store::Store; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use types::{ Attestation, ChainSpec, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, Slot, @@ -33,8 +33,6 @@ pub struct Processor { chain: Arc>, /// A channel to the syncing thread. sync_send: mpsc::UnboundedSender>, - /// A oneshot channel for destroying the sync thread. - _sync_exit: oneshot::Sender<()>, /// A network context to return and handle RPC requests. network: HandlerNetworkContext, /// The `RPCHandler` logger. @@ -44,7 +42,7 @@ pub struct Processor { impl Processor { /// Instantiate a `Processor` instance pub fn new( - runtime_handle: &tokio::runtime::Handle, + executor: environment::TaskExecutor, beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, @@ -53,8 +51,8 @@ impl Processor { let sync_logger = log.new(o!("service"=> "sync")); // spawn the sync thread - let (sync_send, _sync_exit) = crate::sync::manager::spawn( - runtime_handle, + let sync_send = crate::sync::manager::spawn( + executor, beacon_chain.clone(), network_globals, network_send.clone(), @@ -64,7 +62,6 @@ impl Processor { Processor { chain: beacon_chain, sync_send, - _sync_exit, network: HandlerNetworkContext::new(network_send, log.clone()), log: log.clone(), } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 63b0db6390..423c4a25ce 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -14,8 +14,7 @@ use rest_types::ValidatorSubscription; use slog::{debug, error, info, o, trace}; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Handle; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio::time::Delay; use types::EthSpec; @@ -123,7 +122,7 @@ fn spawn_service( let mut exit_rx = executor.exit(); // spawn on the current executor - executor.runtime_handle().spawn(async move { + executor.spawn(async move { loop { // build the futures to check simultaneously tokio::select! { @@ -361,7 +360,7 @@ fn spawn_service( } } } - }); + }, "network"); Ok(()) } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index ce8e26b3ee..99abc9495d 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -48,7 +48,7 @@ use smallvec::SmallVec; use std::boxed::Box; use std::ops::Sub; use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -181,17 +181,12 @@ impl SingleBlockRequest { /// chain. This allows the chain to be /// dropped during the syncing process which will gracefully end the `SyncManager`. pub fn spawn( - runtime_handle: &tokio::runtime::Handle, + executor: environment::TaskExecutor, beacon_chain: Arc>, network_globals: Arc>, network_send: mpsc::UnboundedSender>, log: slog::Logger, -) -> ( - mpsc::UnboundedSender>, - oneshot::Sender<()>, -) { - // generate the exit channel - let (sync_exit, exit_rx) = tokio::sync::oneshot::channel(); +) -> mpsc::UnboundedSender> { // generate the message channel let (sync_send, sync_recv) = mpsc::unbounded_channel::>(); @@ -215,11 +210,8 @@ pub fn spawn( // spawn the sync manager thread debug!(log, "Sync Manager started"); - runtime_handle.spawn(async move { - futures::future::select(Box::pin(sync_manager.main()), exit_rx).await; - info!(log.clone(), "Sync Manager shutdown"); - }); - (sync_send, sync_exit) + executor.spawn(async move { Box::pin(sync_manager.main()).await }, "sync"); + sync_send } impl SyncManager { diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index ed88199e69..3906071c22 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -35,8 +35,7 @@ use std::net::SocketAddr; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; -use tokio::runtime::Handle; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use url_query::UrlQuery; pub use crate::helpers::parse_pubkey_bytes; @@ -128,7 +127,7 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - executor.spawn(server_future, "http_service"); + executor.spawn(server_future, "http"); Ok(actual_listen_addr) } diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index bad02f7dfb..235be0808b 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -8,7 +8,6 @@ use slog::info; use slot_clock::SlotClock; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Handle; use tokio::time::{interval_at, Instant}; /// Spawns a timer service which periodically executes tasks for the beacon chain @@ -32,7 +31,7 @@ pub fn spawn( } }; - executor.spawn(timer_future, "timer_service"); + executor.spawn(timer_future, "timer"); info!(log, "Timer service started"); Ok(()) diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 6a9c59ec34..a81b1cc6df 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -240,16 +240,16 @@ impl TaskExecutor { /// TODO: make docs better /// Spawn a future on the async runtime wrapped in an exit future /// This function also generates some metrics on number of tasks and task duration. - pub fn spawn(&self, task: impl Future + Send + 'static, _name: &'static str) { + pub fn spawn(&self, task: impl Future + Send + 'static, nam: &'static str) { let exit = self.exit.clone(); let log = self.log.clone(); // Start the timer for how long this task runs - if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[_name]) { + if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[nam]) { let timer = metric.start_timer(); let future = async move { let _ = future::select(Box::pin(task), exit).await; - info!(log, "Service shutdown"; "name" => _name); + info!(log, "Async task shutdown"; "name" => nam); timer.observe_duration(); }; @@ -260,7 +260,7 @@ impl TaskExecutor { /// TODO: make docs better /// Spawn a blocking task on a dedicated tokio thread pool wrapped in an exit future. /// This function also generates some metrics on number of tasks and task duration. - pub fn spawn_blocking(&self, task: F, _name: &'static str) + pub fn spawn_blocking(&self, task: F, name: &'static str) where F: FnOnce() -> () + Send + 'static, { @@ -268,14 +268,14 @@ impl TaskExecutor { let log = self.log.clone(); // Start the timer for how long this task runs - if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[_name]) { + if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) { let timer = metric.start_timer(); let join_handle = self.handle.spawn_blocking(task); let future = async move { // TODO: construct a wrapped prometheus future let _ = future::select(Box::pin(join_handle), exit).await; - info!(log, "Service shutdown"; "name" => _name); + info!(log, "Blocking task shutdown"; "name" => name); timer.observe_duration(); };