This commit is contained in:
pawan
2020-05-22 15:49:30 +05:30
parent 85d099f664
commit 1be19db3bf
2 changed files with 18 additions and 14 deletions

View File

@@ -87,10 +87,10 @@ impl<T: BeaconChainTypes> Router<T> {
// spawn handler task and move the message handler instance into the spawned thread
executor.spawn(
async move {
debug!(log, "Network message router started");
handler_recv
.for_each(move |msg| future::ready(handler.handle_message(msg)))
.await;
debug!(log, "Network message router terminated.");
},
"router",
);

View File

@@ -225,8 +225,8 @@ impl<E: EthSpec> RuntimeContext<E> {
}
/// A wrapper over a runtime handle which can spawn async and blocking tasks.
/// Spawned futures are wrapped in an exit_future which shuts down the task
/// when the corresponding exit future Signal fires or is dropped.
/// Spawned futures are wrapped in an exit_future which cancels the task
/// when the corresponding exit_future `Signal` is fired/dropped.
#[derive(Clone)]
pub struct TaskExecutor {
/// The handle to the runtime on which tasks are spawned
@@ -237,19 +237,22 @@ pub struct TaskExecutor {
}
impl TaskExecutor {
/// TODO: make docs better
/// Spawn a future on the async runtime wrapped in an exit future
/// Spawn a future on the tokio runtime wrapped in an exit_future `Exit`. The task is canceled
/// when the corresponding exit_future `Signal` is fired/dropped.
///
/// This function also generates some metrics on number of tasks and task duration.
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, nam: &'static str) {
pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, name: &'static str) {
let exit = self.exit.clone();
let log = self.log.clone();
// Start the timer for how long this task runs
if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[nam]) {
if let Some(metric) = metrics::get_histogram(&metrics::ASYNC_TASKS_HISTOGRAM, &[name]) {
let timer = metric.start_timer();
let future = async move {
let _ = future::select(Box::pin(task), exit).await;
info!(log, "Async task shutdown"; "name" => nam);
// Task is shutdown before it completes if `exit` receives
if let future::Either::Right(_) = future::select(Box::pin(task), exit).await {
info!(log, "Async task shutdown"; "task" => name);
}
timer.observe_duration();
};
@@ -273,16 +276,17 @@ impl TaskExecutor {
let join_handle = self.handle.spawn_blocking(task);
let future = async move {
// TODO: construct a wrapped prometheus future
let _ = future::select(Box::pin(join_handle), exit).await;
info!(log, "Blocking task shutdown"; "name" => name);
// Task is shutdown before it completes if `exit` receives
if let future::Either::Right(_) = future::select(Box::pin(join_handle), exit).await
{
info!(log, "Blocking task shutdown"; "task" => name);
}
timer.observe_duration();
};
metrics::inc_counter(&metrics::BLOCKING_TASKS_COUNT);
self.handle.spawn(future);
}
// TODO: handle else case
}
/// Returns the underlying runtime handle.
@@ -290,7 +294,7 @@ impl TaskExecutor {
self.handle.clone()
}
/// Returns the underlying exit future.
/// Returns a copy of the `exit_future::Exit`.
pub fn exit(&self) -> exit_future::Exit {
self.exit.clone()
}