diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 81b667994f..f5c708ee55 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -222,7 +222,7 @@ where .clone(); let (network_globals, network_send) = - NetworkService::start(beacon_chain, config, context.runtime_handle, context.log) + NetworkService::start(beacon_chain, config, context.executor, context.log) .map_err(|e| format!("Failed to start network: {:?}", e))?; self.network_globals = Some(network_globals); @@ -249,7 +249,7 @@ where .milliseconds_per_slot; let _ = timer::spawn( - context.runtime_handle, + context.executor, beacon_chain, milliseconds_per_slot, context.log.clone(), @@ -290,7 +290,7 @@ where let log = context.log.clone(); let listening_addr = rest_api::start_server( - context.runtime_handle, + context.executor, &client_config.rest_api, beacon_chain, network_info, @@ -332,7 +332,7 @@ where .milliseconds_per_slot; let _ = spawn_notifier( - context.runtime_handle, + context.executor, beacon_chain, network_globals, milliseconds_per_slot, @@ -428,7 +428,7 @@ where let (sender, listening_addr): (WebSocketSender, Option<_>) = if config.enabled { let (sender, listening_addr) = - websocket_server::start_server(context.runtime_handle, &config, &context.log)?; + websocket_server::start_server(context.executor, &config, &context.log)?; (sender, Some(listening_addr)) } else { (WebSocketSender::dummy(), None) @@ -638,7 +638,7 @@ where self.eth1_service = None; // Starts the service that connects to an eth1 node and periodically updates caches. - backend.start(context.runtime_handle); + backend.start(context.executor); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index adab849d59..3b249f4e6f 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -25,7 +25,7 @@ const SPEEDO_OBSERVATIONS: usize = 4; /// Spawns a notifier service which periodically logs information about the node. pub fn spawn_notifier( - handle: environment::TaskExecutor, + executor: environment::TaskExecutor, beacon_chain: Arc>, network: Arc>, milliseconds_per_slot: u64, @@ -149,7 +149,7 @@ pub fn spawn_notifier( }; // run the notifier on the current executor - handle.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier"); + executor.spawn(interval_future.unwrap_or_else(|_| ()), "beacon_notifier"); Ok(()) } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 8a88c84d32..63b0db6390 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -53,7 +53,7 @@ impl NetworkService { pub fn start( beacon_chain: Arc>, config: &NetworkConfig, - handle: environment::TaskExecutor, + executor: environment::TaskExecutor, network_log: slog::Logger, ) -> error::Result<( Arc>, @@ -74,7 +74,7 @@ impl NetworkService { // launch libp2p service let (network_globals, mut libp2p) = - LibP2PService::new(handle.runtime_handle(), config, enr_fork_id, &network_log)?; + LibP2PService::new(executor.runtime_handle(), config, enr_fork_id, &network_log)?; for enr in load_dht::(store.clone()) { libp2p.swarm.add_enr(enr); @@ -87,7 +87,7 @@ impl NetworkService { beacon_chain.clone(), network_globals.clone(), network_send.clone(), - handle.clone(), + executor.clone(), network_log.clone(), )?; @@ -110,21 +110,20 @@ impl NetworkService { propagation_percentage, }; - let _ = spawn_service(handle, network_service)?; + let _ = spawn_service(executor, network_service)?; Ok((network_globals, network_send)) } } fn spawn_service( - handle: environment::TaskExecutor, + executor: environment::TaskExecutor, mut service: NetworkService, ) -> error::Result<()> { - let mut exit_rx = handle.exit(); - let handle = handle.runtime_handle(); + let mut exit_rx = executor.exit(); // spawn on the current executor - handle.spawn(async move { + executor.runtime_handle().spawn(async move { loop { // build the futures to check simultaneously tokio::select! { diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index e405997559..eef1d70294 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -52,7 +52,7 @@ pub struct NetworkInfo { // Allowing more than 7 arguments. #[allow(clippy::too_many_arguments)] pub fn start_server( - handle: environment::TaskExecutor, + executor: environment::TaskExecutor, config: &Config, beacon_chain: Arc>, network_info: NetworkInfo, @@ -100,7 +100,7 @@ pub fn start_server( let actual_listen_addr = server.local_addr(); // Build a channel to kill the HTTP server. - let exit = handle.exit(); + let exit = executor.exit(); let inner_log = log.clone(); let server_exit = async move { let _ = exit.await; @@ -127,7 +127,7 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - handle.runtime_handle().spawn(server_future); + executor.runtime_handle().spawn(server_future); Ok(actual_listen_addr) } diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index 513d70b6cf..bad02f7dfb 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -13,13 +13,11 @@ use tokio::time::{interval_at, Instant}; /// Spawns a timer service which periodically executes tasks for the beacon chain pub fn spawn( - handle: environment::TaskExecutor, + executor: environment::TaskExecutor, beacon_chain: Arc>, milliseconds_per_slot: u64, log: slog::Logger, -) -> Result, &'static str> { - let (exit_signal, exit) = tokio::sync::oneshot::channel(); - +) -> Result<(), &'static str> { let start_instant = Instant::now() + beacon_chain .slot_clock @@ -34,8 +32,8 @@ pub fn spawn( } }; - handle.spawn(timer_future, "timer_service"); + executor.spawn(timer_future, "timer_service"); info!(log, "Timer service started"); - Ok(exit_signal) + Ok(()) } diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index 325c5d4c28..8c31c22a62 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -34,7 +34,7 @@ impl WebSocketSender { } pub fn start_server( - handle: environment::TaskExecutor, + executor: environment::TaskExecutor, config: &Config, log: &Logger, ) -> Result<(WebSocketSender, SocketAddr), String> { @@ -61,7 +61,7 @@ pub fn start_server( let broadcaster = server.broadcaster(); // Produce a signal/channel that can gracefully shutdown the websocket server. - let exit = handle.exit(); + let exit = executor.exit(); let log_inner = log.clone(); let broadcaster_inner = server.broadcaster(); let exit_future = async move { @@ -79,7 +79,7 @@ pub fn start_server( // Place a future on the handle that will shutdown the websocket server when the // application exits. - handle.runtime_handle().spawn(exit_future); + executor.runtime_handle().spawn(exit_future); let log_inner = log.clone(); diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index edc6fae006..dd15f6358e 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -198,7 +198,7 @@ impl EnvironmentBuilder { /// `Runtime`, instead it only has access to a `Runtime`. #[derive(Clone)] pub struct RuntimeContext { - pub runtime_handle: TaskExecutor, + pub executor: TaskExecutor, // TODO: remove if TaskExecutor contains log pub log: Logger, pub eth_spec_instance: E, @@ -211,7 +211,7 @@ impl RuntimeContext { /// The generated service will have the `service_name` in all it's logs. pub fn service_context(&self, service_name: String) -> Self { Self { - runtime_handle: self.runtime_handle.clone(), + executor: self.executor.clone(), log: self.log.new(o!("service" => service_name)), eth_spec_instance: self.eth_spec_instance.clone(), eth2_config: self.eth2_config.clone(), @@ -302,7 +302,7 @@ impl Environment { /// Returns a `Context` where no "service" has been added to the logger output. pub fn core_context(&mut self) -> RuntimeContext { RuntimeContext { - runtime_handle: TaskExecutor { + executor: TaskExecutor { exit: self.exit.clone(), handle: self.runtime().handle().clone(), log: self.log.clone(), @@ -316,7 +316,7 @@ impl Environment { /// Returns a `Context` where the `service_name` is added to the logger output. pub fn service_context(&mut self, service_name: String) -> RuntimeContext { RuntimeContext { - runtime_handle: TaskExecutor { + executor: TaskExecutor { exit: self.exit.clone(), handle: self.runtime().handle().clone(), log: self.log.new(o!("service" => service_name.clone())), diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index d2a7c94a7f..775f90c5f7 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -218,7 +218,7 @@ fn run( environment .core_context() - .runtime_handle + .executor .runtime_handle() .enter(|| { validator diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs index 8e7140f67e..2b8a06631d 100644 --- a/validator_client/src/attestation_service.rs +++ b/validator_client/src/attestation_service.rs @@ -3,7 +3,7 @@ use crate::{ validator_store::ValidatorStore, }; use environment::RuntimeContext; -use futures::{StreamExt, TryFutureExt}; +use futures::StreamExt; use remote_beacon_node::{PublishStatus, RemoteBeaconNode}; use slog::{crit, debug, info, trace}; use slot_clock::SlotClock; @@ -140,7 +140,7 @@ impl AttestationService { ) }; - let runtime_handle = self.context.runtime_handle.clone(); + let executor = self.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { @@ -161,7 +161,7 @@ impl AttestationService { } }; - runtime_handle.spawn(interval_fut, "attestation_service"); + executor.spawn(interval_fut, "attestation_service"); Ok(()) } @@ -206,16 +206,13 @@ impl AttestationService { .into_iter() .for_each(|(committee_index, validator_duties)| { // Spawn a separate task for each attestation. - self.inner.context.runtime_handle.spawn( - self.clone() - .publish_attestations_and_aggregates( - slot, - committee_index, - validator_duties, - aggregate_production_instant, - ) - .unwrap_or_else(|_| ()), - "duties_by_committee_index", + self.inner.context.executor.runtime_handle().spawn( + self.clone().publish_attestations_and_aggregates( + slot, + committee_index, + validator_duties, + aggregate_production_instant, + ), ); }); diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 0ca29b8a34..0c696aac67 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -135,7 +135,7 @@ impl BlockService { ) }; - let runtime_handle = self.inner.context.runtime_handle.clone(); + let executor = self.inner.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { @@ -143,7 +143,7 @@ impl BlockService { } }; - runtime_handle.spawn(interval_fut, "block_service"); + executor.spawn(interval_fut, "block_service"); Ok(()) } @@ -184,7 +184,7 @@ impl BlockService { let service = self.clone(); let log = log.clone(); // TODO: run this task with a `spawn_without_name` - self.inner.context.runtime_handle.spawn( + self.inner.context.executor.spawn( service .publish_block(slot, validator_pubkey) .map_err(move |e| { diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index c764f4c9f3..bb87f1227a 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -456,11 +456,11 @@ impl DutiesService { // Run an immediate update before starting the updater service. self.inner .context - .runtime_handle + .executor .runtime_handle() .spawn(self.clone().do_update()); - let runtime_handle = self.inner.context.runtime_handle.clone(); + let executor = self.inner.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { @@ -468,7 +468,7 @@ impl DutiesService { } }; - runtime_handle.spawn(interval_fut, "duties_service"); + executor.spawn(interval_fut, "duties_service"); Ok(()) } diff --git a/validator_client/src/fork_service.rs b/validator_client/src/fork_service.rs index 1a59d3941d..739125c2eb 100644 --- a/validator_client/src/fork_service.rs +++ b/validator_client/src/fork_service.rs @@ -117,11 +117,11 @@ impl ForkService { // Run an immediate update before starting the updater service. self.inner .context - .runtime_handle + .executor .runtime_handle() .spawn(self.clone().do_update()); - let runtime_handle = self.inner.context.runtime_handle.clone(); + let executor = self.inner.context.executor.clone(); let interval_fut = async move { while interval.next().await.is_some() { @@ -129,7 +129,7 @@ impl ForkService { } }; - runtime_handle.spawn(interval_fut, "fork_service"); + executor.spawn(interval_fut, "fork_service"); Ok(()) } diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 679d5fe565..6c1339eea1 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -8,7 +8,7 @@ use types::EthSpec; /// Spawns a notifier service which periodically logs information about the node. pub fn spawn_notifier(client: &ProductionValidatorClient) -> Result<(), String> { let context = client.context.service_context("notifier".into()); - let runtime_handle = context.runtime_handle.clone(); + let executor = context.executor.clone(); let duties_service = client.duties_service.clone(); let allow_unsynced_beacon_node = client.config.allow_unsynced_beacon_node; @@ -81,6 +81,6 @@ pub fn spawn_notifier(client: &ProductionValidatorClient) -> Resu } }; - runtime_handle.spawn(interval_fut, "validator_notifier"); + executor.spawn(interval_fut, "validator_notifier"); Ok(()) }