From a4dadadcc2323dd1b2bd4e738bb2a25f05ed5df4 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Fri, 22 Nov 2019 17:22:43 +1100 Subject: [PATCH] Ensure a prompt exit --- validator_client/src/attestation_service.rs | 62 ++++++++++----------- validator_client/src/block_service.rs | 35 ++++++------ validator_client/src/duties_service.rs | 35 ++++++------ validator_client/src/fork_service.rs | 35 ++++++------ 4 files changed, 81 insertions(+), 86 deletions(-) diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index a303e757aa..41a67d1db5 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -129,45 +129,43 @@ impl AttestationService { ) }; - info!( - log, - "Waiting for next slot"; - "seconds_to_wait" => duration_to_next_slot.as_secs() - ); - let (exit_signal, exit_fut) = exit_future::signal(); let service = self.clone(); let log_1 = log.clone(); let log_2 = log.clone(); + let log_3 = log.clone(); context.executor.spawn( - interval - .map_err(move |e| { - error! { - log_1, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| { - if let Err(e) = service.clone().spawn_attestation_tasks() { - error!( - log_2, - "Failed to spawn attestation tasks"; - "error" => e - ) - } else { - trace!( - log_2, - "Spawned attestation tasks"; - ) - } + exit_fut + .until( + interval + .map_err(move |e| { + error! { + log_1, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .for_each(move |_| { + if let Err(e) = service.clone().spawn_attestation_tasks() { + error!( + log_2, + "Failed to spawn attestation tasks"; + "error" => e + ) + } else { + trace!( + log_2, + "Spawned attestation tasks"; + ) + } - Ok(()) - }) - // Prevent any errors from escaping and stopping the interval. - .then(|_| Ok(())), + Ok(()) + }) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), + ) + .map(move |_| info!(log_3, "Shutdown complete")), ); Ok(exit_signal) diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 7c74017837..7a202969f8 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -120,28 +120,27 @@ impl BlockService { ) }; - info!( - log, - "Waiting for next slot"; - "seconds_to_wait" => duration_to_next_slot.as_secs() - ); - let (exit_signal, exit_fut) = exit_future::signal(); let service = self.clone(); + let log_1 = log.clone(); + let log_2 = log.clone(); self.context.executor.spawn( - interval - .map_err(move |e| { - error! { - log, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| service.clone().do_update()) - // Prevent any errors from escaping and stopping the interval. - .then(|_| Ok(())), + exit_fut + .until( + interval + .map_err(move |e| { + error! { + log_1, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .for_each(move |_| service.clone().do_update()) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), + ) + .map(move |_| info!(log_2, "Shutdown complete")), ); Ok(exit_signal) diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 22acf32b94..3f9912cfd1 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -220,31 +220,30 @@ impl DutiesService { ) }; - info!( - log, - "Waiting for next slot"; - "seconds_to_wait" => duration_to_next_slot.as_secs() - ); - let (exit_signal, exit_fut) = exit_future::signal(); let service = self.clone(); + let log_1 = log.clone(); + let log_2 = log.clone(); // Run an immediate update before starting the updater service. self.context.executor.spawn(service.clone().do_update()); self.context.executor.spawn( - interval - .map_err(move |e| { - error! { - log, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| service.clone().do_update()) - // Prevent any errors from escaping and stopping the interval. - .then(|_| Ok(())), + exit_fut + .until( + interval + .map_err(move |e| { + error! { + log_1, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .for_each(move |_| service.clone().do_update()) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), + ) + .map(move |_| info!(log_2, "Shutdown complete")), ); Ok(exit_signal) diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 42a3a31784..04b922b687 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -100,14 +100,10 @@ impl ForkService { ) }; - info!( - log, - "Waiting for next slot"; - "seconds_to_wait" => duration_to_next_epoch.as_secs() - ); - let (exit_signal, exit_fut) = exit_future::signal(); let service = self.clone(); + let log_1 = log.clone(); + let log_2 = log.clone(); // Run an immediate update before starting the updater service. self.inner @@ -116,18 +112,21 @@ impl ForkService { .spawn(service.clone().do_update()); self.inner.context.executor.spawn( - interval - .map_err(move |e| { - error! { - log, - "Timer thread failed"; - "error" => format!("{}", e) - } - }) - .and_then(move |_| if exit_fut.is_live() { Ok(()) } else { Err(()) }) - .for_each(move |_| service.clone().do_update()) - // Prevent any errors from escaping and stopping the interval. - .then(|_| Ok(())), + exit_fut + .until( + interval + .map_err(move |e| { + error! { + log_1, + "Timer thread failed"; + "error" => format!("{}", e) + } + }) + .for_each(move |_| service.clone().do_update()) + // Prevent any errors from escaping and stopping the interval. + .then(|_| Ok(())), + ) + .map(move |_| info!(log_2, "Shutdown complete")), ); Ok(exit_signal)