This commit is contained in:
pawan
2020-05-22 13:30:40 +05:30
parent f5d49287b9
commit 85d099f664
10 changed files with 30 additions and 45 deletions

View File

@@ -232,7 +232,7 @@ where
} }
/// Immediately starts the timer service. /// Immediately starts the timer service.
fn timer(mut self) -> Result<Self, String> { fn timer(self) -> Result<Self, String> {
let context = self let context = self
.runtime_context .runtime_context
.as_ref() .as_ref()
@@ -311,7 +311,7 @@ where
} }
/// Immediately starts the service that periodically logs information each slot. /// Immediately starts the service that periodically logs information each slot.
pub fn notifier(mut self) -> Result<Self, String> { pub fn notifier(self) -> Result<Self, String> {
let context = self let context = self
.runtime_context .runtime_context
.as_ref() .as_ref()

View File

@@ -31,7 +31,6 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
milliseconds_per_slot: u64, milliseconds_per_slot: u64,
log: slog::Logger, log: slog::Logger,
) -> Result<(), String> { ) -> Result<(), String> {
let log_1 = log.clone();
let slot_duration = Duration::from_millis(milliseconds_per_slot); let slot_duration = Duration::from_millis(milliseconds_per_slot);
let duration_to_next_slot = beacon_chain let duration_to_next_slot = beacon_chain
.slot_clock .slot_clock
@@ -149,7 +148,7 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
}; };
// run the notifier on the current executor // run the notifier on the current executor
executor.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier"); executor.spawn(interval_future.unwrap_or_else(|_| ()), "notifier");
Ok(()) Ok(())
} }

View File

@@ -296,7 +296,7 @@ impl Service {
} }
}; };
handle.spawn(update_future, "eth1_service"); handle.spawn(update_future, "eth1");
} }
async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> {

View File

@@ -58,7 +58,7 @@ impl<T: BeaconChainTypes> Router<T> {
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
runtime_handle: environment::TaskExecutor, executor: environment::TaskExecutor,
log: slog::Logger, log: slog::Logger,
) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> { ) -> error::Result<mpsc::UnboundedSender<RouterMessage<T::EthSpec>>> {
let message_handler_log = log.new(o!("service"=> "router")); let message_handler_log = log.new(o!("service"=> "router"));
@@ -69,7 +69,7 @@ impl<T: BeaconChainTypes> Router<T> {
// Initialise a message instance, which itself spawns the syncing thread. // Initialise a message instance, which itself spawns the syncing thread.
let processor = Processor::new( let processor = Processor::new(
// TODO: spawn_blocking here // TODO: spawn_blocking here
&runtime_handle.runtime_handle(), executor.clone(),
beacon_chain, beacon_chain,
network_globals.clone(), network_globals.clone(),
network_send.clone(), network_send.clone(),
@@ -85,14 +85,14 @@ impl<T: BeaconChainTypes> Router<T> {
}; };
// spawn handler task and move the message handler instance into the spawned thread // spawn handler task and move the message handler instance into the spawned thread
runtime_handle.spawn( executor.spawn(
async move { async move {
handler_recv handler_recv
.for_each(move |msg| future::ready(handler.handle_message(msg))) .for_each(move |msg| future::ready(handler.handle_message(msg)))
.await; .await;
debug!(log, "Network message handler terminated."); debug!(log, "Network message router terminated.");
}, },
"router_service", "router",
); );
Ok(handler_send) Ok(handler_send)

View File

@@ -14,7 +14,7 @@ use slog::{debug, error, o, trace, warn};
use ssz::Encode; use ssz::Encode;
use std::sync::Arc; use std::sync::Arc;
use store::Store; use store::Store;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use types::{ use types::{
Attestation, ChainSpec, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock, Attestation, ChainSpec, Epoch, EthSpec, Hash256, SignedAggregateAndProof, SignedBeaconBlock,
Slot, Slot,
@@ -33,8 +33,6 @@ pub struct Processor<T: BeaconChainTypes> {
chain: Arc<BeaconChain<T>>, chain: Arc<BeaconChain<T>>,
/// A channel to the syncing thread. /// A channel to the syncing thread.
sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>, sync_send: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
/// A oneshot channel for destroying the sync thread.
_sync_exit: oneshot::Sender<()>,
/// A network context to return and handle RPC requests. /// A network context to return and handle RPC requests.
network: HandlerNetworkContext<T::EthSpec>, network: HandlerNetworkContext<T::EthSpec>,
/// The `RPCHandler` logger. /// The `RPCHandler` logger.
@@ -44,7 +42,7 @@ pub struct Processor<T: BeaconChainTypes> {
impl<T: BeaconChainTypes> Processor<T> { impl<T: BeaconChainTypes> Processor<T> {
/// Instantiate a `Processor` instance /// Instantiate a `Processor` instance
pub fn new( pub fn new(
runtime_handle: &tokio::runtime::Handle, executor: environment::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
@@ -53,8 +51,8 @@ impl<T: BeaconChainTypes> Processor<T> {
let sync_logger = log.new(o!("service"=> "sync")); let sync_logger = log.new(o!("service"=> "sync"));
// spawn the sync thread // spawn the sync thread
let (sync_send, _sync_exit) = crate::sync::manager::spawn( let sync_send = crate::sync::manager::spawn(
runtime_handle, executor,
beacon_chain.clone(), beacon_chain.clone(),
network_globals, network_globals,
network_send.clone(), network_send.clone(),
@@ -64,7 +62,6 @@ impl<T: BeaconChainTypes> Processor<T> {
Processor { Processor {
chain: beacon_chain, chain: beacon_chain,
sync_send, sync_send,
_sync_exit,
network: HandlerNetworkContext::new(network_send, log.clone()), network: HandlerNetworkContext::new(network_send, log.clone()),
log: log.clone(), log: log.clone(),
} }

View File

@@ -14,8 +14,7 @@ use rest_types::ValidatorSubscription;
use slog::{debug, error, info, o, trace}; use slog::{debug, error, info, o, trace};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::runtime::Handle; use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Delay; use tokio::time::Delay;
use types::EthSpec; use types::EthSpec;
@@ -123,7 +122,7 @@ fn spawn_service<T: BeaconChainTypes>(
let mut exit_rx = executor.exit(); let mut exit_rx = executor.exit();
// spawn on the current executor // spawn on the current executor
executor.runtime_handle().spawn(async move { executor.spawn(async move {
loop { loop {
// build the futures to check simultaneously // build the futures to check simultaneously
tokio::select! { tokio::select! {
@@ -361,7 +360,7 @@ fn spawn_service<T: BeaconChainTypes>(
} }
} }
} }
}); }, "network");
Ok(()) Ok(())
} }

View File

@@ -48,7 +48,7 @@ use smallvec::SmallVec;
use std::boxed::Box; use std::boxed::Box;
use std::ops::Sub; use std::ops::Sub;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use types::{EthSpec, Hash256, SignedBeaconBlock, Slot}; use types::{EthSpec, Hash256, SignedBeaconBlock, Slot};
/// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync
@@ -181,17 +181,12 @@ impl SingleBlockRequest {
/// chain. This allows the chain to be /// chain. This allows the chain to be
/// dropped during the syncing process which will gracefully end the `SyncManager`. /// dropped during the syncing process which will gracefully end the `SyncManager`.
pub fn spawn<T: BeaconChainTypes>( pub fn spawn<T: BeaconChainTypes>(
runtime_handle: &tokio::runtime::Handle, executor: environment::TaskExecutor,
beacon_chain: Arc<BeaconChain<T>>, beacon_chain: Arc<BeaconChain<T>>,
network_globals: Arc<NetworkGlobals<T::EthSpec>>, network_globals: Arc<NetworkGlobals<T::EthSpec>>,
network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>, network_send: mpsc::UnboundedSender<NetworkMessage<T::EthSpec>>,
log: slog::Logger, log: slog::Logger,
) -> ( ) -> mpsc::UnboundedSender<SyncMessage<T::EthSpec>> {
mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
oneshot::Sender<()>,
) {
// generate the exit channel
let (sync_exit, exit_rx) = tokio::sync::oneshot::channel();
// generate the message channel // generate the message channel
let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>(); let (sync_send, sync_recv) = mpsc::unbounded_channel::<SyncMessage<T::EthSpec>>();
@@ -215,11 +210,8 @@ pub fn spawn<T: BeaconChainTypes>(
// spawn the sync manager thread // spawn the sync manager thread
debug!(log, "Sync Manager started"); debug!(log, "Sync Manager started");
runtime_handle.spawn(async move { executor.spawn(async move { Box::pin(sync_manager.main()).await }, "sync");
futures::future::select(Box::pin(sync_manager.main()), exit_rx).await; sync_send
info!(log.clone(), "Sync Manager shutdown");
});
(sync_send, sync_exit)
} }
impl<T: BeaconChainTypes> SyncManager<T> { impl<T: BeaconChainTypes> SyncManager<T> {

View File

@@ -35,8 +35,7 @@ use std::net::SocketAddr;
use std::ops::Deref; use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use tokio::runtime::Handle; use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use url_query::UrlQuery; use url_query::UrlQuery;
pub use crate::helpers::parse_pubkey_bytes; pub use crate::helpers::parse_pubkey_bytes;
@@ -128,7 +127,7 @@ pub fn start_server<T: BeaconChainTypes>(
"port" => actual_listen_addr.port(), "port" => actual_listen_addr.port(),
); );
executor.spawn(server_future, "http_service"); executor.spawn(server_future, "http");
Ok(actual_listen_addr) Ok(actual_listen_addr)
} }

View File

@@ -8,7 +8,6 @@ use slog::info;
use slot_clock::SlotClock; use slot_clock::SlotClock;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time::{interval_at, Instant}; use tokio::time::{interval_at, Instant};
/// Spawns a timer service which periodically executes tasks for the beacon chain /// Spawns a timer service which periodically executes tasks for the beacon chain
@@ -32,7 +31,7 @@ pub fn spawn<T: BeaconChainTypes>(
} }
}; };
executor.spawn(timer_future, "timer_service"); executor.spawn(timer_future, "timer");
info!(log, "Timer service started"); info!(log, "Timer service started");
Ok(()) Ok(())

View File

@@ -240,16 +240,16 @@ impl TaskExecutor {
/// TODO: make docs better /// TODO: make docs better
/// Spawn a future on the async runtime wrapped in an exit future /// Spawn a future on the async runtime wrapped in an exit future
/// This function also generates some metrics on number of tasks and task duration. /// This function also generates some metrics on number of tasks and task duration.
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, _name: &'static str) { pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, nam: &'static str) {
let exit = self.exit.clone(); let exit = self.exit.clone();
let log = self.log.clone(); let log = self.log.clone();
// Start the timer for how long this task runs // Start the timer for how long this task runs
if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[_name]) { if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[nam]) {
let timer = metric.start_timer(); let timer = metric.start_timer();
let future = async move { let future = async move {
let _ = future::select(Box::pin(task), exit).await; let _ = future::select(Box::pin(task), exit).await;
info!(log, "Service shutdown"; "name" => _name); info!(log, "Async task shutdown"; "name" => nam);
timer.observe_duration(); timer.observe_duration();
}; };
@@ -260,7 +260,7 @@ impl TaskExecutor {
/// TODO: make docs better /// TODO: make docs better
/// Spawn a blocking task on a dedicated tokio thread pool wrapped in an exit future. /// Spawn a blocking task on a dedicated tokio thread pool wrapped in an exit future.
/// This function also generates some metrics on number of tasks and task duration. /// This function also generates some metrics on number of tasks and task duration.
pub fn spawn_blocking<F>(&self, task: F, _name: &'static str) pub fn spawn_blocking<F>(&self, task: F, name: &'static str)
where where
F: FnOnce() -> () + Send + 'static, F: FnOnce() -> () + Send + 'static,
{ {
@@ -268,14 +268,14 @@ impl TaskExecutor {
let log = self.log.clone(); let log = self.log.clone();
// Start the timer for how long this task runs // Start the timer for how long this task runs
if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[_name]) { if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) {
let timer = metric.start_timer(); let timer = metric.start_timer();
let join_handle = self.handle.spawn_blocking(task); let join_handle = self.handle.spawn_blocking(task);
let future = async move { let future = async move {
// TODO: construct a wrapped prometheus future // TODO: construct a wrapped prometheus future
let _ = future::select(Box::pin(join_handle), exit).await; let _ = future::select(Box::pin(join_handle), exit).await;
info!(log, "Service shutdown"; "name" => _name); info!(log, "Blocking task shutdown"; "name" => name);
timer.observe_duration(); timer.observe_duration();
}; };