From 6d3503c05f81b675f5dd6baa745a10e6a45a6de1 Mon Sep 17 00:00:00 2001 From: pawan Date: Wed, 20 May 2020 12:15:55 +0530 Subject: [PATCH] Validator client tasks use task executor --- validator_client/src/attestation_service.rs | 31 +++++++++------------ validator_client/src/block_service.rs | 20 ++++++------- validator_client/src/duties_service.rs | 20 ++++--------- validator_client/src/fork_service.rs | 20 ++++--------- validator_client/src/lib.rs | 22 ++++----------- validator_client/src/notifier.rs | 16 +++-------- 6 files changed, 42 insertions(+), 87 deletions(-) diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 76ec305b78..8e7140f67e 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -3,8 +3,7 @@ use crate::{ validator_store::ValidatorStore, }; use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::{StreamExt, TryFutureExt}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, debug, info, trace}; use slot_clock::SlotClock; @@ -118,7 +117,7 @@ impl Deref for AttestationService { impl AttestationService { /// Starts the service which periodically produces attestations. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { let log = self.context.log.clone(); let slot_duration = Duration::from_millis(spec.milliseconds_per_slot); @@ -141,8 +140,6 @@ impl AttestationService { ) }; - let (exit_signal, exit_fut) = exit_future::signal(); - let runtime_handle = self.context.runtime_handle.clone(); let interval_fut = async move { @@ -164,13 +161,8 @@ impl AttestationService { } }; - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); - - Ok(exit_signal) + runtime_handle.spawn(interval_fut, "attestation_service"); + Ok(()) } /// For each each required attestation, spawn a new task that downloads, signs and uploads the @@ -215,12 +207,15 @@ impl AttestationService { .for_each(|(committee_index, validator_duties)| { // Spawn a separate task for each attestation. self.inner.context.runtime_handle.spawn( - self.clone().publish_attestations_and_aggregates( - slot, - committee_index, - validator_duties, - aggregate_production_instant, - ), + self.clone() + .publish_attestations_and_aggregates( + slot, + committee_index, + validator_duties, + aggregate_production_instant, + ) + .unwrap_or_else(|_| ()), + "duties_by_committee_index", ); }); diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index b0bc1860a3..0ca29b8a34 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -1,7 +1,6 @@ use crate::{duties_service::DutiesService, validator_store::ValidatorStore}; use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures::{StreamExt, TryFutureExt}; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, error, info, trace}; use slot_clock::SlotClock; @@ -113,7 +112,7 @@ impl Deref for BlockService { impl BlockService { /// Starts the service that periodically attempts to produce blocks. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { let log = self.context.log.clone(); let duration_to_next_slot = self @@ -144,15 +143,9 @@ impl BlockService { } }; - let (exit_signal, exit_fut) = exit_future::signal(); + runtime_handle.spawn(interval_fut, "block_service"); - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); - - Ok(exit_signal) + Ok(()) } /// Attempt to produce a block for any block producers in the `ValidatorStore`. @@ -190,6 +183,7 @@ impl BlockService { iter.for_each(|validator_pubkey| { let service = self.clone(); let log = log.clone(); + // TODO: run this task with a `spawn_without_name` self.inner.context.runtime_handle.spawn( service .publish_block(slot, validator_pubkey) @@ -199,7 +193,9 @@ impl BlockService { "Error whilst producing block"; "message" => e ) - }), + }) + .unwrap_or_else(|_| ()), + "publish_block", ); }); diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index a1e8c93c0a..c764f4c9f3 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -1,11 +1,10 @@ use crate::{is_synced::is_synced, validator_store::ValidatorStore}; use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use parking_lot::RwLock; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use rest_types::{ValidatorDuty, ValidatorDutyBytes, ValidatorSubscription}; -use slog::{debug, error, info, trace, warn}; +use slog::{debug, error, trace, warn}; use slot_clock::SlotClock; use std::collections::HashMap; use std::convert::TryInto; @@ -439,9 +438,7 @@ impl DutiesService { } /// Start the service that periodically polls the beacon node for validator duties. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { - let log = self.context.log.clone(); - + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { let duration_to_next_slot = self .slot_clock .duration_to_next_slot() @@ -456,12 +453,11 @@ impl DutiesService { ) }; - let (exit_signal, exit_fut) = exit_future::signal(); - // Run an immediate update before starting the updater service. self.inner .context .runtime_handle + .runtime_handle() .spawn(self.clone().do_update()); let runtime_handle = self.inner.context.runtime_handle.clone(); @@ -472,13 +468,9 @@ impl DutiesService { } }; - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); + runtime_handle.spawn(interval_fut, "duties_service"); - Ok(exit_signal) + Ok(()) } /// Attempt to download the duties of all managed validators for this epoch and the next. diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index ae979d4fd2..1a59d3941d 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -1,9 +1,8 @@ use environment::RuntimeContext; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use parking_lot::RwLock; use remote_beacon_node::RemoteBeaconNode; -use slog::{debug, info, trace}; +use slog::{debug, trace}; use slot_clock::SlotClock; use std::ops::Deref; use std::sync::Arc; @@ -100,9 +99,7 @@ impl ForkService { } /// Starts the service that periodically polls for the `Fork`. - pub fn start_update_service(self, spec: &ChainSpec) -> Result { - let log = self.context.log.clone(); - + pub fn start_update_service(self, spec: &ChainSpec) -> Result<(), String> { let duration_to_next_epoch = self .slot_clock .duration_to_next_epoch(E::slots_per_epoch()) @@ -117,12 +114,11 @@ impl ForkService { ) }; - let (exit_signal, exit_fut) = exit_future::signal(); - // Run an immediate update before starting the updater service. self.inner .context .runtime_handle + .runtime_handle() .spawn(self.clone().do_update()); let runtime_handle = self.inner.context.runtime_handle.clone(); @@ -133,13 +129,9 @@ impl ForkService { } }; - let future = futures::future::select( - Box::pin(interval_fut), - exit_fut.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); + runtime_handle.spawn(interval_fut, "fork_service"); - Ok(exit_signal) + Ok(()) } /// Attempts to download the `Fork` from the server. diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index d396d40f9b..dc246fa664 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -16,7 +16,6 @@ use block_service::{BlockService, BlockServiceBuilder}; use clap::ArgMatches; use duties_service::{DutiesService, DutiesServiceBuilder}; use environment::RuntimeContext; -use exit_future::Signal; use fork_service::{ForkService, ForkServiceBuilder}; use notifier::spawn_notifier; use remote_beacon_node::RemoteBeaconNode; @@ -40,7 +39,6 @@ pub struct ProductionValidatorClient { fork_service: ForkService, block_service: BlockService, attestation_service: AttestationService, - exit_signals: Vec, config: Config, } @@ -208,46 +206,36 @@ impl ProductionValidatorClient { fork_service, block_service, attestation_service, - exit_signals: vec![], config, }) } pub fn start_service(&mut self) -> Result<(), String> { - let duties_exit = self + let _ = self .duties_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start duties service: {}", e))?; - let fork_exit = self + let _ = self .fork_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start fork service: {}", e))?; - let block_exit = self + let _ = self .block_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start block service: {}", e))?; - let attestation_exit = self + let _ = self .attestation_service .clone() .start_update_service(&self.context.eth2_config.spec) .map_err(|e| format!("Unable to start attestation service: {}", e))?; - let notifier_exit = - spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; - - self.exit_signals = vec![ - duties_exit, - fork_exit, - block_exit, - attestation_exit, - notifier_exit, - ]; + let _ = spawn_notifier(self).map_err(|e| format!("Failed to start notifier: {}", e))?; Ok(()) } diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 9d9aa97318..679d5fe565 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -1,16 +1,14 @@ use crate::{is_synced::is_synced, ProductionValidatorClient}; -use exit_future::Signal; -use futures::{FutureExt, StreamExt}; +use futures::StreamExt; use slog::{error, info}; use slot_clock::SlotClock; use tokio::time::{interval_at, Duration, Instant}; use types::EthSpec; /// Spawns a notifier service which periodically logs information about the node. -pub fn spawn_notifier(client: &ProductionValidatorClient) -> Result { +pub fn spawn_notifier(client: &ProductionValidatorClient) -> Result<(), String> { let context = client.context.service_context("notifier".into()); let runtime_handle = context.runtime_handle.clone(); - let log = context.log.clone(); let duties_service = client.duties_service.clone(); let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node; @@ -83,12 +81,6 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu } }; - let (exit_signal, exit) = exit_future::signal(); - let future = futures::future::select( - Box::pin(interval_fut), - exit.map(move |_| info!(log, "Shutdown complete")), - ); - runtime_handle.spawn(future); - - Ok(exit_signal) + runtime_handle.spawn(interval_fut, "validator_notifier"); + Ok(()) }