From 38050fa460c28812f5c2026765fe12f497f03ef5 Mon Sep 17 00:00:00 2001 From: Paul Hauner Date: Mon, 16 May 2022 08:35:59 +0000 Subject: [PATCH] Allow `TaskExecutor` to be used in `async` tests (#3178) # Description Since the `TaskExecutor` currently requires a `Weak`, it's impossible to use it in an async test where the `Runtime` is created outside our scope. Whilst we *could* create a new `Runtime` instance inside the async test, dropping that `Runtime` would cause a panic (you can't drop a `Runtime` in an async context). To address this issue, this PR creates the `enum Handle`, which supports either: - A `Weak` (for use in our production code) - A `Handle` to a runtime (for use in testing) In theory, there should be no change to the behaviour of our production code (beyond some slightly different descriptions in HTTP 500 errors), or even our tests. If there is no change, you might ask *"why bother?"*. There are two PRs (#3070 and #3175) that are waiting on these fixes to introduce some new tests. Since we've added the EL to the `BeaconChain` (for the merge), we are now doing more async stuff in tests. I've also added a `RuntimeExecutor` to the `BeaconChainTestHarness`. Whilst that's not immediately useful, it will become useful in the near future with all the new async testing. --- Cargo.lock | 3 + beacon_node/beacon_chain/Cargo.toml | 1 + beacon_node/beacon_chain/src/builder.rs | 14 +++- beacon_node/beacon_chain/src/test_utils.rs | 31 ++++--- beacon_node/client/src/builder.rs | 1 + beacon_node/execution_layer/src/lib.rs | 28 +++---- .../src/test_utils/mock_execution_layer.rs | 61 ++------------ .../execution_layer/src/test_utils/mod.rs | 2 +- beacon_node/http_api/Cargo.toml | 1 + beacon_node/http_api/tests/tests.rs | 8 +- .../network/src/beacon_processor/tests.rs | 13 ++- common/task_executor/Cargo.toml | 3 +- common/task_executor/src/lib.rs | 81 ++++++++++++++---- common/task_executor/src/test_utils.rs | 68 +++++++++++++++ lighthouse/environment/src/lib.rs | 13 +-- validator_client/src/http_api/keystores.rs | 33 ++++---- validator_client/src/http_api/mod.rs | 84 +++++++++---------- validator_client/src/http_api/remotekeys.rs | 36 ++++---- validator_client/src/http_api/tests.rs | 4 +- validator_client/src/lib.rs | 2 +- 20 files changed, 284 insertions(+), 203 deletions(-) create mode 100644 common/task_executor/src/test_utils.rs diff --git a/Cargo.lock b/Cargo.lock index f8016d9940..be6e844dc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,7 @@ dependencies = [ "eth2_ssz_derive", "eth2_ssz_types", "execution_layer", + "exit-future", "fork_choice", "futures", "genesis", @@ -2531,6 +2532,7 @@ dependencies = [ "slot_clock", "state_processing", "store", + "task_executor", "tokio", "tokio-stream", "tree_hash", @@ -6028,6 +6030,7 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "slog", + "sloggers", "tokio", ] diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 022b85fa7e..c8b82e3d28 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -61,6 +61,7 @@ execution_layer = { path = "../execution_layer" } sensitive_url = { path = "../../common/sensitive_url" } superstruct = "0.5.0" hex = "0.4.2" +exit-future = "0.2.0" [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 98dcce9d2a..2efc972ed5 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -27,7 +27,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; -use task_executor::ShutdownReason; +use task_executor::{ShutdownReason, TaskExecutor}; use types::{ BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, @@ -91,6 +91,7 @@ pub struct BeaconChainBuilder { // Pending I/O batch that is constructed during building and should be executed atomically // alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called. pending_io_batch: Vec, + task_executor: Option, } impl @@ -129,6 +130,7 @@ where slasher: None, validator_monitor: None, pending_io_batch: vec![], + task_executor: None, } } @@ -182,6 +184,13 @@ where self.log = Some(log); self } + + /// Sets the task executor. + pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self { + self.task_executor = Some(task_executor); + self + } + /// Attempt to load an existing eth1 cache from the builder's `Store`. pub fn get_persisted_eth1_backend(&self) -> Result, String> { let store = self @@ -919,6 +928,7 @@ mod test { use std::time::Duration; use store::config::StoreConfig; use store::{HotColdDB, MemoryStore}; + use task_executor::test_utils::TestRuntime; use types::{EthSpec, MinimalEthSpec, Slot}; type TestEthSpec = MinimalEthSpec; @@ -952,10 +962,12 @@ mod test { .expect("should create interop genesis state"); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let runtime = TestRuntime::default(); let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .store(Arc::new(store)) + .task_executor(runtime.task_executor.clone()) .genesis_state(genesis_state) .expect("should build state using recent genesis") .dummy_eth1_backend() diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 3c8299f165..2dc1d0301d 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -12,15 +12,12 @@ use crate::{ }; use bls::get_withdrawal_credentials; use execution_layer::{ - test_utils::{ - ExecutionBlockGenerator, ExecutionLayerRuntime, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK, - }, + test_utils::{ExecutionBlockGenerator, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK}, ExecutionLayer, }; use futures::channel::mpsc::Receiver; pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use int_to_bytes::int_to_bytes32; -use logging::test_logger; use merkle_proof::MerkleTree; use parking_lot::Mutex; use parking_lot::RwLockWriteGuard; @@ -41,7 +38,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; -use task_executor::ShutdownReason; +use task_executor::{test_utils::TestRuntime, ShutdownReason}; use tree_hash::TreeHash; use types::sync_selection_proof::SyncSelectionProof; pub use types::test_utils::generate_deterministic_keypairs; @@ -151,8 +148,8 @@ pub struct Builder { initial_mutator: Option>, store_mutator: Option>, execution_layer: Option, - execution_layer_runtime: Option, mock_execution_layer: Option>, + runtime: TestRuntime, log: Logger, } @@ -255,6 +252,9 @@ where Cold: ItemStore, { pub fn new(eth_spec_instance: E) -> Self { + let runtime = TestRuntime::default(); + let log = runtime.log.clone(); + Self { eth_spec_instance, spec: None, @@ -266,8 +266,8 @@ where store_mutator: None, execution_layer: None, mock_execution_layer: None, - execution_layer_runtime: None, - log: test_logger(), + runtime, + log, } } @@ -330,8 +330,6 @@ where "execution layer already defined" ); - let el_runtime = ExecutionLayerRuntime::default(); - let urls: Vec = urls .iter() .map(|s| SensitiveUrl::parse(*s)) @@ -346,19 +344,19 @@ where }; let execution_layer = ExecutionLayer::from_config( config, - el_runtime.task_executor.clone(), - el_runtime.log.clone(), + self.runtime.task_executor.clone(), + self.log.clone(), ) .unwrap(); self.execution_layer = Some(execution_layer); - self.execution_layer_runtime = Some(el_runtime); self } pub fn mock_execution_layer(mut self) -> Self { let spec = self.spec.clone().expect("cannot build without spec"); let mock = MockExecutionLayer::new( + self.runtime.task_executor.clone(), spec.terminal_total_difficulty, DEFAULT_TERMINAL_BLOCK, spec.terminal_block_hash, @@ -383,7 +381,7 @@ where pub fn build(self) -> BeaconChainHarness> { let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); - let log = test_logger(); + let log = self.log; let spec = self.spec.expect("cannot build without spec"); let seconds_per_slot = spec.seconds_per_slot; let validator_keypairs = self @@ -395,6 +393,7 @@ where .custom_spec(spec) .store(self.store.expect("cannot build without store")) .store_migrator_config(MigratorConfig::default().blocking()) + .task_executor(self.runtime.task_executor.clone()) .execution_layer(self.execution_layer) .dummy_eth1_backend() .expect("should build dummy backend") @@ -434,8 +433,8 @@ where chain: Arc::new(chain), validator_keypairs, shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)), + runtime: self.runtime, mock_execution_layer: self.mock_execution_layer, - execution_layer_runtime: self.execution_layer_runtime, rng: make_rng(), } } @@ -451,9 +450,9 @@ pub struct BeaconChainHarness { pub chain: Arc>, pub spec: ChainSpec, pub shutdown_receiver: Arc>>, + pub runtime: TestRuntime, pub mock_execution_layer: Option>, - pub execution_layer_runtime: Option, pub rng: Mutex, } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 353b174a02..59f1bebdb4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -166,6 +166,7 @@ where let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log().clone()) .store(store) + .task_executor(context.executor.clone()) .custom_spec(spec.clone()) .chain_config(chain_config) .graffiti(graffiti) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 023cfa6e32..3b9e94aabf 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -304,11 +304,7 @@ impl ExecutionLayer { T: Fn(&'a Self) -> U, U: Future>, { - let runtime = self - .executor() - .runtime() - .upgrade() - .ok_or(Error::ShuttingDown)?; + let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; // TODO(merge): respect the shutdown signal. runtime.block_on(generate_future(self)) } @@ -322,11 +318,7 @@ impl ExecutionLayer { T: Fn(&'a Self) -> U, U: Future, { - let runtime = self - .executor() - .runtime() - .upgrade() - .ok_or(Error::ShuttingDown)?; + let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; // TODO(merge): respect the shutdown signal. Ok(runtime.block_on(generate_future(self))) } @@ -1263,13 +1255,15 @@ impl ExecutionLayer { mod test { use super::*; use crate::test_utils::MockExecutionLayer as GenericMockExecutionLayer; + use task_executor::test_utils::TestRuntime; use types::MainnetEthSpec; type MockExecutionLayer = GenericMockExecutionLayer; #[tokio::test] async fn produce_three_valid_pos_execution_blocks() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .produce_valid_execution_payload_on_head() .await @@ -1281,7 +1275,8 @@ mod test { #[tokio::test] async fn finds_valid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_block_prior_to_terminal_block() .with_terminal_block(|spec, el, _| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1300,7 +1295,8 @@ mod test { #[tokio::test] async fn verifies_valid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1316,7 +1312,8 @@ mod test { #[tokio::test] async fn rejects_invalid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1334,7 +1331,8 @@ mod test { #[tokio::test] async fn rejects_unknown_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, _| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index f5a7313395..5770a8a382 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -2,61 +2,22 @@ use crate::{ test_utils::{MockServer, DEFAULT_TERMINAL_BLOCK, DEFAULT_TERMINAL_DIFFICULTY, JWT_SECRET}, Config, *, }; -use environment::null_logger; use sensitive_url::SensitiveUrl; -use std::sync::Arc; use task_executor::TaskExecutor; use tempfile::NamedTempFile; use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256}; -pub struct ExecutionLayerRuntime { - pub runtime: Option>, - pub _runtime_shutdown: exit_future::Signal, - pub task_executor: TaskExecutor, - pub log: Logger, -} - -impl Default for ExecutionLayerRuntime { - fn default() -> Self { - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(), - ); - let (runtime_shutdown, exit) = exit_future::signal(); - let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let log = null_logger().unwrap(); - let task_executor = - TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); - - Self { - runtime: Some(runtime), - _runtime_shutdown: runtime_shutdown, - task_executor, - log, - } - } -} - -impl Drop for ExecutionLayerRuntime { - fn drop(&mut self) { - if let Some(runtime) = self.runtime.take() { - Arc::try_unwrap(runtime).unwrap().shutdown_background() - } - } -} - pub struct MockExecutionLayer { pub server: MockServer, pub el: ExecutionLayer, - pub el_runtime: ExecutionLayerRuntime, + pub executor: TaskExecutor, pub spec: ChainSpec, } impl MockExecutionLayer { - pub fn default_params() -> Self { + pub fn default_params(executor: TaskExecutor) -> Self { Self::new( + executor, DEFAULT_TERMINAL_DIFFICULTY.into(), DEFAULT_TERMINAL_BLOCK, ExecutionBlockHash::zero(), @@ -65,13 +26,13 @@ impl MockExecutionLayer { } pub fn new( + executor: TaskExecutor, terminal_total_difficulty: Uint256, terminal_block: u64, terminal_block_hash: ExecutionBlockHash, terminal_block_hash_activation_epoch: Epoch, ) -> Self { - let el_runtime = ExecutionLayerRuntime::default(); - let handle = el_runtime.runtime.as_ref().unwrap().handle(); + let handle = executor.handle().unwrap(); let mut spec = T::default_spec(); spec.terminal_total_difficulty = terminal_total_difficulty; @@ -79,7 +40,7 @@ impl MockExecutionLayer { spec.terminal_block_hash_activation_epoch = terminal_block_hash_activation_epoch; let server = MockServer::new( - handle, + &handle, terminal_total_difficulty, terminal_block, terminal_block_hash, @@ -97,17 +58,13 @@ impl MockExecutionLayer { suggested_fee_recipient: Some(Address::repeat_byte(42)), ..Default::default() }; - let el = ExecutionLayer::from_config( - config, - el_runtime.task_executor.clone(), - el_runtime.log.clone(), - ) - .unwrap(); + let el = + ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap(); Self { server, el, - el_runtime, + executor, spec, } } diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index b3dd80d6c5..805f6716fb 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -22,7 +22,7 @@ use types::{EthSpec, ExecutionBlockHash, Uint256}; use warp::{http::StatusCode, Filter, Rejection}; pub use execution_block_generator::{generate_pow_block, ExecutionBlockGenerator}; -pub use mock_execution_layer::{ExecutionLayerRuntime, MockExecutionLayer}; +pub use mock_execution_layer::MockExecutionLayer; pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 0e20f5c8b8..f982f0d022 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -30,6 +30,7 @@ futures = "0.3.8" execution_layer = {path = "../execution_layer"} parking_lot = "0.12.0" safe_arith = {path = "../../consensus/safe_arith"} +task_executor = { path = "../../common/task_executor" } [dev-dependencies] diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 26fb77b1bd..5f53a96156 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -20,6 +20,7 @@ use slot_clock::SlotClock; use state_processing::per_slot_processing; use std::convert::TryInto; use std::sync::Arc; +use task_executor::test_utils::TestRuntime; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tree_hash::TreeHash; @@ -63,6 +64,7 @@ struct ApiTester { network_rx: mpsc::UnboundedReceiver>, local_enr: Enr, external_peer_id: PeerId, + _runtime: TestRuntime, } impl ApiTester { @@ -185,7 +187,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - tokio::spawn(server); + harness.runtime.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -212,6 +214,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + _runtime: harness.runtime, } } @@ -263,7 +266,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - tokio::spawn(server); + harness.runtime.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -290,6 +293,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + _runtime: harness.runtime, } } diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index 0f97bc7944..1c9d323576 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -20,7 +20,7 @@ use std::cmp; use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Runtime; +use tokio::runtime::Handle; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock, @@ -324,20 +324,19 @@ impl TestRig { .unwrap(); } - fn runtime(&mut self) -> Arc { + fn handle(&mut self) -> Handle { self.environment .as_mut() .unwrap() .core_context() .executor - .runtime() - .upgrade() + .handle() .unwrap() } /// Assert that the `BeaconProcessor` doesn't produce any events in the given `duration`. pub fn assert_no_events_for(&mut self, duration: Duration) { - self.runtime().block_on(async { + self.handle().block_on(async { tokio::select! { _ = tokio::time::sleep(duration) => (), event = self.work_journal_rx.recv() => panic!( @@ -360,7 +359,7 @@ impl TestRig { .iter() .all(|ev| ev != &WORKER_FREED && ev != &NOTHING_TO_DO)); - let (events, worker_freed_remaining) = self.runtime().block_on(async { + let (events, worker_freed_remaining) = self.handle().block_on(async { let mut events = Vec::with_capacity(expected.len()); let mut worker_freed_remaining = expected.len(); @@ -415,7 +414,7 @@ impl TestRig { /// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense /// to use the `NOTHING_TO_DO` event to ensure that execution has completed. pub fn assert_event_journal_with_timeout(&mut self, expected: &[&str], timeout: Duration) { - let events = self.runtime().block_on(async { + let events = self.handle().block_on(async { let mut events = Vec::with_capacity(expected.len()); let drain_future = async { diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index 660cc1ca01..f344dc4735 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -5,9 +5,10 @@ authors = ["Sigma Prime "] edition = "2021" [dependencies] -tokio = { version = "1.14.0", features = ["rt"] } +tokio = { version = "1.14.0", features = ["rt-multi-thread"] } slog = "2.5.2" futures = "0.3.7" exit-future = "0.2.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../lighthouse_metrics" } +sloggers = { version = "2.1.1", features = ["json"] } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 2d3e941a3e..dd525bea50 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -1,10 +1,11 @@ mod metrics; +pub mod test_utils; use futures::channel::mpsc::Sender; use futures::prelude::*; use slog::{crit, debug, o, trace}; use std::sync::Weak; -use tokio::runtime::Runtime; +use tokio::runtime::{Handle, Runtime}; /// Provides a reason when Lighthouse is shut down. #[derive(Copy, Clone, Debug, PartialEq)] @@ -24,11 +25,51 @@ impl ShutdownReason { } } +/// Provides a `Handle` by either: +/// +/// 1. Holding a `Weak` and calling `Runtime::handle`. +/// 2. Directly holding a `Handle` and cloning it. +/// +/// This enum allows the `TaskExecutor` to work in production where a `Weak` is directly +/// accessible and in testing where the `Runtime` is hidden outside our scope. +#[derive(Clone)] +pub enum HandleProvider { + Runtime(Weak), + Handle(Handle), +} + +impl From for HandleProvider { + fn from(handle: Handle) -> Self { + HandleProvider::Handle(handle) + } +} + +impl From> for HandleProvider { + fn from(weak_runtime: Weak) -> Self { + HandleProvider::Runtime(weak_runtime) + } +} + +impl HandleProvider { + /// Returns a `Handle` to a `Runtime`. + /// + /// May return `None` if the weak reference to the `Runtime` has been dropped (this generally + /// means Lighthouse is shutting down). + pub fn handle(&self) -> Option { + match self { + HandleProvider::Runtime(weak_runtime) => weak_runtime + .upgrade() + .map(|runtime| runtime.handle().clone()), + HandleProvider::Handle(handle) => Some(handle.clone()), + } + } +} + /// A wrapper over a runtime handle which can spawn async and blocking tasks. #[derive(Clone)] pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned - runtime: Weak, + handle_provider: HandleProvider, /// The receiver exit future which on receiving shuts down the task exit: exit_future::Exit, /// Sender given to tasks, so that if they encounter a state in which execution cannot @@ -43,16 +84,19 @@ pub struct TaskExecutor { impl TaskExecutor { /// Create a new task executor. /// - /// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from - /// a [`RuntimeContext`](struct.RuntimeContext.html) - pub fn new( - runtime: Weak, + /// ## Note + /// + /// This function should only be used during testing. In production, prefer to obtain an + /// instance of `Self` via a `environment::RuntimeContext` (see the `lighthouse/environment` + /// crate). + pub fn new>( + handle: T, exit: exit_future::Exit, log: slog::Logger, signal_tx: Sender, ) -> Self { Self { - runtime, + handle_provider: handle.into(), exit, signal_tx, log, @@ -62,7 +106,7 @@ impl TaskExecutor { /// Clones the task executor adding a service name. pub fn clone_with_name(&self, service_name: String) -> Self { TaskExecutor { - runtime: self.runtime.clone(), + handle_provider: self.handle_provider.clone(), exit: self.exit.clone(), signal_tx: self.signal_tx.clone(), log: self.log.new(o!("service" => service_name)), @@ -94,8 +138,8 @@ impl TaskExecutor { let mut shutdown_sender = self.shutdown_sender(); let log = self.log.clone(); - if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn(async move { + if let Some(handle) = self.handle() { + handle.spawn(async move { let timer = metrics::start_timer_vec(&metrics::TASKS_HISTOGRAM, &[name]); if let Err(join_error) = task_handle.await { if let Ok(panic) = join_error.try_into_panic() { @@ -160,8 +204,8 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn(future); + if let Some(handle) = self.handle() { + handle.spawn(future); } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); } @@ -211,8 +255,8 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(runtime) = self.runtime.upgrade() { - Some(runtime.spawn(future)) + if let Some(handle) = self.handle() { + Some(handle.spawn(future)) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); None @@ -242,8 +286,8 @@ impl TaskExecutor { let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]); metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]); - let join_handle = if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn_blocking(task) + let join_handle = if let Some(handle) = self.handle() { + handle.spawn_blocking(task) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); return None; @@ -268,8 +312,9 @@ impl TaskExecutor { Some(future) } - pub fn runtime(&self) -> Weak { - self.runtime.clone() + /// Returns a `Handle` to the current runtime. + pub fn handle(&self) -> Option { + self.handle_provider.handle() } /// Returns a copy of the `exit_future::Exit`. diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs new file mode 100644 index 0000000000..7d59cdf022 --- /dev/null +++ b/common/task_executor/src/test_utils.rs @@ -0,0 +1,68 @@ +use crate::TaskExecutor; +use slog::Logger; +use sloggers::{null::NullLoggerBuilder, Build}; +use std::sync::Arc; +use tokio::runtime; + +/// Whilst the `TestRuntime` is not necessarily useful in itself, it provides the necessary +/// components for creating a `TaskExecutor` during tests. +/// +/// May create its own runtime or use an existing one. +/// +/// ## Warning +/// +/// This struct should never be used in production, only testing. +pub struct TestRuntime { + runtime: Option>, + _runtime_shutdown: exit_future::Signal, + pub task_executor: TaskExecutor, + pub log: Logger, +} + +impl Default for TestRuntime { + /// If called *inside* an existing runtime, instantiates `Self` using a handle to that runtime. If + /// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the + /// `Self` is dropped. + fn default() -> Self { + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let log = null_logger().unwrap(); + + let (runtime, handle) = if let Ok(handle) = runtime::Handle::try_current() { + (None, handle) + } else { + let runtime = Arc::new( + runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let handle = runtime.handle().clone(); + (Some(runtime), handle) + }; + + let task_executor = TaskExecutor::new(handle, exit, log.clone(), shutdown_tx); + + Self { + runtime, + _runtime_shutdown: runtime_shutdown, + task_executor, + log, + } + } +} + +impl Drop for TestRuntime { + fn drop(&mut self) { + if let Some(runtime) = self.runtime.take() { + Arc::try_unwrap(runtime).unwrap().shutdown_background() + } + } +} + +pub fn null_logger() -> Result { + let log_builder = NullLoggerBuilder; + log_builder + .build() + .map_err(|e| format!("Failed to start null logger: {:?}", e)) +} diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 91feef5b05..160f696542 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -13,9 +13,7 @@ use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::{future, StreamExt}; use slog::{error, info, o, warn, Drain, Duplicate, Level, Logger}; -use sloggers::{ - file::FileLoggerBuilder, null::NullLoggerBuilder, types::Format, types::Severity, Build, -}; +use sloggers::{file::FileLoggerBuilder, types::Format, types::Severity, Build}; use std::fs::create_dir_all; use std::path::PathBuf; use std::sync::Arc; @@ -33,6 +31,8 @@ use { #[cfg(not(target_family = "unix"))] use {futures::channel::oneshot, std::cell::RefCell}; +pub use task_executor::test_utils::null_logger; + const LOG_CHANNEL_SIZE: usize = 2048; /// The maximum time in seconds the client will wait for all internal tasks to shutdown. const MAXIMUM_SHUTDOWN_TIME: u64 = 15; @@ -506,13 +506,6 @@ impl Environment { } } -pub fn null_logger() -> Result { - let log_builder = NullLoggerBuilder; - log_builder - .build() - .map_err(|e| format!("Failed to start null logger: {:?}", e)) -} - #[cfg(target_family = "unix")] struct SignalFuture { signal: Signal, diff --git a/validator_client/src/http_api/keystores.rs b/validator_client/src/http_api/keystores.rs index 63cd946063..f88aacfca8 100644 --- a/validator_client/src/http_api/keystores.rs +++ b/validator_client/src/http_api/keystores.rs @@ -14,8 +14,8 @@ use slog::{info, warn, Logger}; use slot_clock::SlotClock; use std::path::PathBuf; use std::sync::Arc; -use std::sync::Weak; -use tokio::runtime::Runtime; +use task_executor::TaskExecutor; +use tokio::runtime::Handle; use types::{EthSpec, PublicKeyBytes}; use validator_dir::Builder as ValidatorDirBuilder; use warp::Rejection; @@ -59,7 +59,7 @@ pub fn import( request: ImportKeystoresRequest, validator_dir: PathBuf, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { // Check request validity. This is the only cases in which we should return a 4xx code. @@ -122,14 +122,14 @@ pub fn import( ImportKeystoreStatus::Error, format!("slashing protection import failed: {:?}", e), ) - } else if let Some(runtime) = runtime.upgrade() { + } else if let Some(handle) = task_executor.handle() { // Import the keystore. match import_single_keystore( keystore, password, validator_dir.clone(), &validator_store, - runtime, + handle, ) { Ok(status) => Status::ok(status), Err(e) => { @@ -159,7 +159,7 @@ fn import_single_keystore( password: ZeroizeString, validator_dir_path: PathBuf, validator_store: &ValidatorStore, - runtime: Arc, + handle: Handle, ) -> Result { // Check if the validator key already exists, erroring if it is a remote signer validator. let pubkey = keystore @@ -198,7 +198,7 @@ fn import_single_keystore( let voting_keystore_path = validator_dir.voting_keystore_path(); drop(validator_dir); - runtime + handle .block_on(validator_store.add_validator_keystore( voting_keystore_path, password, @@ -214,7 +214,7 @@ fn import_single_keystore( pub fn delete( request: DeleteKeystoresRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { // Remove from initialized validators. @@ -225,8 +225,11 @@ pub fn delete( .pubkeys .iter() .map(|pubkey_bytes| { - match delete_single_keystore(pubkey_bytes, &mut initialized_validators, runtime.clone()) - { + match delete_single_keystore( + pubkey_bytes, + &mut initialized_validators, + task_executor.clone(), + ) { Ok(status) => Status::ok(status), Err(error) => { warn!( @@ -244,8 +247,8 @@ pub fn delete( // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(initialized_validators.update_validators()) .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; } @@ -278,14 +281,14 @@ pub fn delete( fn delete_single_keystore( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - runtime: Weak, + task_executor: TaskExecutor, ) -> Result { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let pubkey = pubkey_bytes .decompress() .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match runtime.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true)) + match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true)) { Ok(_) => Ok(DeleteKeystoreStatus::Deleted), Err(e) => match e { diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index 1207ed3b08..bf7261a271 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -22,8 +22,8 @@ use std::future::Future; use std::marker::PhantomData; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; -use std::sync::{Arc, Weak}; -use tokio::runtime::Runtime; +use std::sync::Arc; +use task_executor::TaskExecutor; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; use warp::{ @@ -59,7 +59,7 @@ impl From for Error { /// /// The server will gracefully handle the case where any fields are `None`. pub struct Context { - pub runtime: Weak, + pub task_executor: TaskExecutor, pub api_secret: ApiSecret, pub validator_store: Option>>, pub validator_dir: Option, @@ -161,8 +161,8 @@ pub fn serve( }) }); - let inner_runtime = ctx.runtime.clone(); - let runtime_filter = warp::any().map(move || inner_runtime.clone()); + let inner_task_executor = ctx.task_executor.clone(); + let task_executor_filter = warp::any().map(move || inner_task_executor.clone()); let inner_validator_dir = ctx.validator_dir.clone(); let validator_dir_filter = warp::any() @@ -290,18 +290,18 @@ pub fn serve( .and(validator_store_filter.clone()) .and(spec_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: Vec, validator_dir: PathBuf, validator_store: Arc>, spec: Arc, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let (validators, mnemonic) = - runtime.block_on(create_validators_mnemonic( + handle.block_on(create_validators_mnemonic( None, None, &body, @@ -316,7 +316,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(response)) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -333,16 +333,16 @@ pub fn serve( .and(validator_store_filter.clone()) .and(spec_filter) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: api_types::CreateValidatorsMnemonicRequest, validator_dir: PathBuf, validator_store: Arc>, spec: Arc, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let mnemonic = mnemonic_from_phrase(body.mnemonic.as_str()).map_err(|e| { warp_utils::reject::custom_bad_request(format!( @@ -351,7 +351,7 @@ pub fn serve( )) })?; let (validators, _mnemonic) = - runtime.block_on(create_validators_mnemonic( + handle.block_on(create_validators_mnemonic( Some(mnemonic), Some(body.key_derivation_path_offset), &body.validators, @@ -362,7 +362,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(validators)) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -378,13 +378,13 @@ pub fn serve( .and(validator_dir_filter.clone()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: api_types::KeystoreValidatorsPostRequest, validator_dir: PathBuf, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { // Check to ensure the password is correct. let keypair = body @@ -416,8 +416,8 @@ pub fn serve( let suggested_fee_recipient = body.suggested_fee_recipient; let validator_def = { - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(validator_store.add_validator_keystore( voting_keystore_path, voting_password, @@ -433,7 +433,7 @@ pub fn serve( })? } else { return Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )); } }; @@ -455,14 +455,14 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: Vec, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let web3signers: Vec = body .into_iter() .map(|web3signer| ValidatorDefinition { @@ -478,14 +478,14 @@ pub fn serve( }, }) .collect(); - runtime.block_on(create_validators_web3signer( + handle.block_on(create_validators_web3signer( web3signers, &validator_store, ))?; Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -500,13 +500,13 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { let initialized_validators_rw_lock = validator_store.initialized_validators(); let mut initialized_validators = initialized_validators_rw_lock.write(); @@ -518,8 +518,8 @@ pub fn serve( ))), Some(enabled) if enabled == body.enabled => Ok(()), Some(_) => { - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on( initialized_validators .set_validator_status(&validator_pubkey, body.enabled), @@ -533,7 +533,7 @@ pub fn serve( Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } } @@ -574,12 +574,12 @@ pub fn serve( .and(signer.clone()) .and(validator_dir_filter) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) .and_then( - |request, signer, validator_dir, validator_store, runtime, log| { + |request, signer, validator_dir, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - keystores::import(request, validator_dir, validator_store, runtime, log) + keystores::import(request, validator_dir, validator_store, task_executor, log) }) }, ); @@ -589,11 +589,11 @@ pub fn serve( .and(warp::body::json()) .and(signer.clone()) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - keystores::delete(request, validator_store, runtime, log) + keystores::delete(request, validator_store, task_executor, log) }) }); @@ -610,11 +610,11 @@ pub fn serve( .and(warp::body::json()) .and(signer.clone()) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - remotekeys::import(request, validator_store, runtime, log) + remotekeys::import(request, validator_store, task_executor, log) }) }); @@ -623,11 +623,11 @@ pub fn serve( .and(warp::body::json()) .and(signer) .and(validator_store_filter) - .and(runtime_filter) + .and(task_executor_filter) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - remotekeys::delete(request, validator_store, runtime, log) + remotekeys::delete(request, validator_store, task_executor, log) }) }); diff --git a/validator_client/src/http_api/remotekeys.rs b/validator_client/src/http_api/remotekeys.rs index b3702a028a..5c3ec73de3 100644 --- a/validator_client/src/http_api/remotekeys.rs +++ b/validator_client/src/http_api/remotekeys.rs @@ -8,8 +8,9 @@ use eth2::lighthouse_vc::std_types::{ }; use slog::{info, warn, Logger}; use slot_clock::SlotClock; -use std::sync::{Arc, Weak}; -use tokio::runtime::Runtime; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::runtime::Handle; use types::{EthSpec, PublicKeyBytes}; use url::Url; use warp::Rejection; @@ -45,7 +46,7 @@ pub fn list( pub fn import( request: ImportRemotekeysRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { info!( @@ -57,14 +58,10 @@ pub fn import( let mut statuses = Vec::with_capacity(request.remote_keys.len()); for remotekey in request.remote_keys { - let status = if let Some(runtime) = runtime.upgrade() { + let status = if let Some(handle) = task_executor.handle() { // Import the keystore. - match import_single_remotekey( - remotekey.pubkey, - remotekey.url, - &validator_store, - runtime, - ) { + match import_single_remotekey(remotekey.pubkey, remotekey.url, &validator_store, handle) + { Ok(status) => Status::ok(status), Err(e) => { warn!( @@ -91,7 +88,7 @@ fn import_single_remotekey( pubkey: PublicKeyBytes, url: String, validator_store: &ValidatorStore, - runtime: Arc, + handle: Handle, ) -> Result { if let Err(url_err) = Url::parse(&url) { return Err(format!("failed to parse remotekey URL: {}", url_err)); @@ -129,7 +126,7 @@ fn import_single_remotekey( request_timeout_ms: None, }, }; - runtime + handle .block_on(validator_store.add_validator(web3signer_validator)) .map_err(|e| format!("failed to initialize validator: {:?}", e))?; @@ -139,7 +136,7 @@ fn import_single_remotekey( pub fn delete( request: DeleteRemotekeysRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { info!( @@ -158,7 +155,7 @@ pub fn delete( match delete_single_remotekey( pubkey_bytes, &mut initialized_validators, - runtime.clone(), + task_executor.clone(), ) { Ok(status) => Status::ok(status), Err(error) => { @@ -177,8 +174,8 @@ pub fn delete( // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(initialized_validators.update_validators()) .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; } @@ -189,15 +186,14 @@ pub fn delete( fn delete_single_remotekey( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - runtime: Weak, + task_executor: TaskExecutor, ) -> Result { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let pubkey = pubkey_bytes .decompress() .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match runtime - .block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false)) + match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false)) { Ok(_) => Ok(DeleteRemotekeyStatus::Deleted), Err(e) => match e { diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index eef76eb363..da9c8dc534 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -102,7 +102,7 @@ impl ApiTester { spec, Some(Arc::new(DoppelgangerService::new(log.clone()))), slot_clock, - executor, + executor.clone(), log.clone(), )); @@ -113,7 +113,7 @@ impl ApiTester { let initialized_validators = validator_store.initialized_validators(); let context = Arc::new(Context { - runtime, + task_executor: executor, api_secret, validator_dir: Some(validator_dir.path().into()), validator_store: Some(validator_store.clone()), diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 039b54496c..43f88b54f0 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -498,7 +498,7 @@ impl ProductionValidatorClient { self.http_api_listen_addr = if self.config.http_api.enabled { let ctx = Arc::new(http_api::Context { - runtime: self.context.executor.runtime(), + task_executor: self.context.executor.clone(), api_secret, validator_store: Some(self.validator_store.clone()), validator_dir: Some(self.config.validator_dir.clone()),