diff --git a/Cargo.lock b/Cargo.lock index f8016d9940..be6e844dc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,6 +296,7 @@ dependencies = [ "eth2_ssz_derive", "eth2_ssz_types", "execution_layer", + "exit-future", "fork_choice", "futures", "genesis", @@ -2531,6 +2532,7 @@ dependencies = [ "slot_clock", "state_processing", "store", + "task_executor", "tokio", "tokio-stream", "tree_hash", @@ -6028,6 +6030,7 @@ dependencies = [ "lazy_static", "lighthouse_metrics", "slog", + "sloggers", "tokio", ] diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 022b85fa7e..c8b82e3d28 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -61,6 +61,7 @@ execution_layer = { path = "../execution_layer" } sensitive_url = { path = "../../common/sensitive_url" } superstruct = "0.5.0" hex = "0.4.2" +exit-future = "0.2.0" [[test]] name = "beacon_chain_tests" diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 98dcce9d2a..2efc972ed5 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -27,7 +27,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp}; -use task_executor::ShutdownReason; +use task_executor::{ShutdownReason, TaskExecutor}; use types::{ BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes, Signature, SignedBeaconBlock, Slot, @@ -91,6 +91,7 @@ pub struct BeaconChainBuilder { // Pending I/O batch that is constructed during building and should be executed atomically // alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called. pending_io_batch: Vec, + task_executor: Option, } impl @@ -129,6 +130,7 @@ where slasher: None, validator_monitor: None, pending_io_batch: vec![], + task_executor: None, } } @@ -182,6 +184,13 @@ where self.log = Some(log); self } + + /// Sets the task executor. + pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self { + self.task_executor = Some(task_executor); + self + } + /// Attempt to load an existing eth1 cache from the builder's `Store`. pub fn get_persisted_eth1_backend(&self) -> Result, String> { let store = self @@ -919,6 +928,7 @@ mod test { use std::time::Duration; use store::config::StoreConfig; use store::{HotColdDB, MemoryStore}; + use task_executor::test_utils::TestRuntime; use types::{EthSpec, MinimalEthSpec, Slot}; type TestEthSpec = MinimalEthSpec; @@ -952,10 +962,12 @@ mod test { .expect("should create interop genesis state"); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let runtime = TestRuntime::default(); let chain = BeaconChainBuilder::new(MinimalEthSpec) .logger(log.clone()) .store(Arc::new(store)) + .task_executor(runtime.task_executor.clone()) .genesis_state(genesis_state) .expect("should build state using recent genesis") .dummy_eth1_backend() diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 3c8299f165..2dc1d0301d 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -12,15 +12,12 @@ use crate::{ }; use bls::get_withdrawal_credentials; use execution_layer::{ - test_utils::{ - ExecutionBlockGenerator, ExecutionLayerRuntime, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK, - }, + test_utils::{ExecutionBlockGenerator, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK}, ExecutionLayer, }; use futures::channel::mpsc::Receiver; pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use int_to_bytes::int_to_bytes32; -use logging::test_logger; use merkle_proof::MerkleTree; use parking_lot::Mutex; use parking_lot::RwLockWriteGuard; @@ -41,7 +38,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore}; -use task_executor::ShutdownReason; +use task_executor::{test_utils::TestRuntime, ShutdownReason}; use tree_hash::TreeHash; use types::sync_selection_proof::SyncSelectionProof; pub use types::test_utils::generate_deterministic_keypairs; @@ -151,8 +148,8 @@ pub struct Builder { initial_mutator: Option>, store_mutator: Option>, execution_layer: Option, - execution_layer_runtime: Option, mock_execution_layer: Option>, + runtime: TestRuntime, log: Logger, } @@ -255,6 +252,9 @@ where Cold: ItemStore, { pub fn new(eth_spec_instance: E) -> Self { + let runtime = TestRuntime::default(); + let log = runtime.log.clone(); + Self { eth_spec_instance, spec: None, @@ -266,8 +266,8 @@ where store_mutator: None, execution_layer: None, mock_execution_layer: None, - execution_layer_runtime: None, - log: test_logger(), + runtime, + log, } } @@ -330,8 +330,6 @@ where "execution layer already defined" ); - let el_runtime = ExecutionLayerRuntime::default(); - let urls: Vec = urls .iter() .map(|s| SensitiveUrl::parse(*s)) @@ -346,19 +344,19 @@ where }; let execution_layer = ExecutionLayer::from_config( config, - el_runtime.task_executor.clone(), - el_runtime.log.clone(), + self.runtime.task_executor.clone(), + self.log.clone(), ) .unwrap(); self.execution_layer = Some(execution_layer); - self.execution_layer_runtime = Some(el_runtime); self } pub fn mock_execution_layer(mut self) -> Self { let spec = self.spec.clone().expect("cannot build without spec"); let mock = MockExecutionLayer::new( + self.runtime.task_executor.clone(), spec.terminal_total_difficulty, DEFAULT_TERMINAL_BLOCK, spec.terminal_block_hash, @@ -383,7 +381,7 @@ where pub fn build(self) -> BeaconChainHarness> { let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1); - let log = test_logger(); + let log = self.log; let spec = self.spec.expect("cannot build without spec"); let seconds_per_slot = spec.seconds_per_slot; let validator_keypairs = self @@ -395,6 +393,7 @@ where .custom_spec(spec) .store(self.store.expect("cannot build without store")) .store_migrator_config(MigratorConfig::default().blocking()) + .task_executor(self.runtime.task_executor.clone()) .execution_layer(self.execution_layer) .dummy_eth1_backend() .expect("should build dummy backend") @@ -434,8 +433,8 @@ where chain: Arc::new(chain), validator_keypairs, shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)), + runtime: self.runtime, mock_execution_layer: self.mock_execution_layer, - execution_layer_runtime: self.execution_layer_runtime, rng: make_rng(), } } @@ -451,9 +450,9 @@ pub struct BeaconChainHarness { pub chain: Arc>, pub spec: ChainSpec, pub shutdown_receiver: Arc>>, + pub runtime: TestRuntime, pub mock_execution_layer: Option>, - pub execution_layer_runtime: Option, pub rng: Mutex, } diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 353b174a02..59f1bebdb4 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -166,6 +166,7 @@ where let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log().clone()) .store(store) + .task_executor(context.executor.clone()) .custom_spec(spec.clone()) .chain_config(chain_config) .graffiti(graffiti) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 023cfa6e32..3b9e94aabf 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -304,11 +304,7 @@ impl ExecutionLayer { T: Fn(&'a Self) -> U, U: Future>, { - let runtime = self - .executor() - .runtime() - .upgrade() - .ok_or(Error::ShuttingDown)?; + let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; // TODO(merge): respect the shutdown signal. runtime.block_on(generate_future(self)) } @@ -322,11 +318,7 @@ impl ExecutionLayer { T: Fn(&'a Self) -> U, U: Future, { - let runtime = self - .executor() - .runtime() - .upgrade() - .ok_or(Error::ShuttingDown)?; + let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; // TODO(merge): respect the shutdown signal. Ok(runtime.block_on(generate_future(self))) } @@ -1263,13 +1255,15 @@ impl ExecutionLayer { mod test { use super::*; use crate::test_utils::MockExecutionLayer as GenericMockExecutionLayer; + use task_executor::test_utils::TestRuntime; use types::MainnetEthSpec; type MockExecutionLayer = GenericMockExecutionLayer; #[tokio::test] async fn produce_three_valid_pos_execution_blocks() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .produce_valid_execution_payload_on_head() .await @@ -1281,7 +1275,8 @@ mod test { #[tokio::test] async fn finds_valid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_block_prior_to_terminal_block() .with_terminal_block(|spec, el, _| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1300,7 +1295,8 @@ mod test { #[tokio::test] async fn verifies_valid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1316,7 +1312,8 @@ mod test { #[tokio::test] async fn rejects_invalid_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, terminal_block| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; @@ -1334,7 +1331,8 @@ mod test { #[tokio::test] async fn rejects_unknown_terminal_block_hash() { - MockExecutionLayer::default_params() + let runtime = TestRuntime::default(); + MockExecutionLayer::default_params(runtime.task_executor.clone()) .move_to_terminal_block() .with_terminal_block(|spec, el, _| async move { el.engines().upcheck_not_synced(Logging::Disabled).await; diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index f5a7313395..5770a8a382 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -2,61 +2,22 @@ use crate::{ test_utils::{MockServer, DEFAULT_TERMINAL_BLOCK, DEFAULT_TERMINAL_DIFFICULTY, JWT_SECRET}, Config, *, }; -use environment::null_logger; use sensitive_url::SensitiveUrl; -use std::sync::Arc; use task_executor::TaskExecutor; use tempfile::NamedTempFile; use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256}; -pub struct ExecutionLayerRuntime { - pub runtime: Option>, - pub _runtime_shutdown: exit_future::Signal, - pub task_executor: TaskExecutor, - pub log: Logger, -} - -impl Default for ExecutionLayerRuntime { - fn default() -> Self { - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(), - ); - let (runtime_shutdown, exit) = exit_future::signal(); - let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let log = null_logger().unwrap(); - let task_executor = - TaskExecutor::new(Arc::downgrade(&runtime), exit, log.clone(), shutdown_tx); - - Self { - runtime: Some(runtime), - _runtime_shutdown: runtime_shutdown, - task_executor, - log, - } - } -} - -impl Drop for ExecutionLayerRuntime { - fn drop(&mut self) { - if let Some(runtime) = self.runtime.take() { - Arc::try_unwrap(runtime).unwrap().shutdown_background() - } - } -} - pub struct MockExecutionLayer { pub server: MockServer, pub el: ExecutionLayer, - pub el_runtime: ExecutionLayerRuntime, + pub executor: TaskExecutor, pub spec: ChainSpec, } impl MockExecutionLayer { - pub fn default_params() -> Self { + pub fn default_params(executor: TaskExecutor) -> Self { Self::new( + executor, DEFAULT_TERMINAL_DIFFICULTY.into(), DEFAULT_TERMINAL_BLOCK, ExecutionBlockHash::zero(), @@ -65,13 +26,13 @@ impl MockExecutionLayer { } pub fn new( + executor: TaskExecutor, terminal_total_difficulty: Uint256, terminal_block: u64, terminal_block_hash: ExecutionBlockHash, terminal_block_hash_activation_epoch: Epoch, ) -> Self { - let el_runtime = ExecutionLayerRuntime::default(); - let handle = el_runtime.runtime.as_ref().unwrap().handle(); + let handle = executor.handle().unwrap(); let mut spec = T::default_spec(); spec.terminal_total_difficulty = terminal_total_difficulty; @@ -79,7 +40,7 @@ impl MockExecutionLayer { spec.terminal_block_hash_activation_epoch = terminal_block_hash_activation_epoch; let server = MockServer::new( - handle, + &handle, terminal_total_difficulty, terminal_block, terminal_block_hash, @@ -97,17 +58,13 @@ impl MockExecutionLayer { suggested_fee_recipient: Some(Address::repeat_byte(42)), ..Default::default() }; - let el = ExecutionLayer::from_config( - config, - el_runtime.task_executor.clone(), - el_runtime.log.clone(), - ) - .unwrap(); + let el = + ExecutionLayer::from_config(config, executor.clone(), executor.log().clone()).unwrap(); Self { server, el, - el_runtime, + executor, spec, } } diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index b3dd80d6c5..805f6716fb 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -22,7 +22,7 @@ use types::{EthSpec, ExecutionBlockHash, Uint256}; use warp::{http::StatusCode, Filter, Rejection}; pub use execution_block_generator::{generate_pow_block, ExecutionBlockGenerator}; -pub use mock_execution_layer::{ExecutionLayerRuntime, MockExecutionLayer}; +pub use mock_execution_layer::MockExecutionLayer; pub const DEFAULT_TERMINAL_DIFFICULTY: u64 = 6400; pub const DEFAULT_TERMINAL_BLOCK: u64 = 64; diff --git a/beacon_node/http_api/Cargo.toml b/beacon_node/http_api/Cargo.toml index 0e20f5c8b8..f982f0d022 100644 --- a/beacon_node/http_api/Cargo.toml +++ b/beacon_node/http_api/Cargo.toml @@ -30,6 +30,7 @@ futures = "0.3.8" execution_layer = {path = "../execution_layer"} parking_lot = "0.12.0" safe_arith = {path = "../../consensus/safe_arith"} +task_executor = { path = "../../common/task_executor" } [dev-dependencies] diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 26fb77b1bd..5f53a96156 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -20,6 +20,7 @@ use slot_clock::SlotClock; use state_processing::per_slot_processing; use std::convert::TryInto; use std::sync::Arc; +use task_executor::test_utils::TestRuntime; use tokio::sync::{mpsc, oneshot}; use tokio::time::Duration; use tree_hash::TreeHash; @@ -63,6 +64,7 @@ struct ApiTester { network_rx: mpsc::UnboundedReceiver>, local_enr: Enr, external_peer_id: PeerId, + _runtime: TestRuntime, } impl ApiTester { @@ -185,7 +187,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - tokio::spawn(server); + harness.runtime.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -212,6 +214,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + _runtime: harness.runtime, } } @@ -263,7 +266,7 @@ impl ApiTester { external_peer_id, } = create_api_server(chain.clone(), log).await; - tokio::spawn(server); + harness.runtime.task_executor.spawn(server, "api_server"); let client = BeaconNodeHttpClient::new( SensitiveUrl::parse(&format!( @@ -290,6 +293,7 @@ impl ApiTester { network_rx, local_enr, external_peer_id, + _runtime: harness.runtime, } } diff --git a/beacon_node/network/src/beacon_processor/tests.rs b/beacon_node/network/src/beacon_processor/tests.rs index 0f97bc7944..1c9d323576 100644 --- a/beacon_node/network/src/beacon_processor/tests.rs +++ b/beacon_node/network/src/beacon_processor/tests.rs @@ -20,7 +20,7 @@ use std::cmp; use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Runtime; +use tokio::runtime::Handle; use tokio::sync::mpsc; use types::{ Attestation, AttesterSlashing, EthSpec, MainnetEthSpec, ProposerSlashing, SignedBeaconBlock, @@ -324,20 +324,19 @@ impl TestRig { .unwrap(); } - fn runtime(&mut self) -> Arc { + fn handle(&mut self) -> Handle { self.environment .as_mut() .unwrap() .core_context() .executor - .runtime() - .upgrade() + .handle() .unwrap() } /// Assert that the `BeaconProcessor` doesn't produce any events in the given `duration`. pub fn assert_no_events_for(&mut self, duration: Duration) { - self.runtime().block_on(async { + self.handle().block_on(async { tokio::select! { _ = tokio::time::sleep(duration) => (), event = self.work_journal_rx.recv() => panic!( @@ -360,7 +359,7 @@ impl TestRig { .iter() .all(|ev| ev != &WORKER_FREED && ev != &NOTHING_TO_DO)); - let (events, worker_freed_remaining) = self.runtime().block_on(async { + let (events, worker_freed_remaining) = self.handle().block_on(async { let mut events = Vec::with_capacity(expected.len()); let mut worker_freed_remaining = expected.len(); @@ -415,7 +414,7 @@ impl TestRig { /// We won't attempt to listen for any more than `expected.len()` events. As such, it makes sense /// to use the `NOTHING_TO_DO` event to ensure that execution has completed. pub fn assert_event_journal_with_timeout(&mut self, expected: &[&str], timeout: Duration) { - let events = self.runtime().block_on(async { + let events = self.handle().block_on(async { let mut events = Vec::with_capacity(expected.len()); let drain_future = async { diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index 660cc1ca01..f344dc4735 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -5,9 +5,10 @@ authors = ["Sigma Prime "] edition = "2021" [dependencies] -tokio = { version = "1.14.0", features = ["rt"] } +tokio = { version = "1.14.0", features = ["rt-multi-thread"] } slog = "2.5.2" futures = "0.3.7" exit-future = "0.2.0" lazy_static = "1.4.0" lighthouse_metrics = { path = "../lighthouse_metrics" } +sloggers = { version = "2.1.1", features = ["json"] } diff --git a/common/task_executor/src/lib.rs b/common/task_executor/src/lib.rs index 2d3e941a3e..dd525bea50 100644 --- a/common/task_executor/src/lib.rs +++ b/common/task_executor/src/lib.rs @@ -1,10 +1,11 @@ mod metrics; +pub mod test_utils; use futures::channel::mpsc::Sender; use futures::prelude::*; use slog::{crit, debug, o, trace}; use std::sync::Weak; -use tokio::runtime::Runtime; +use tokio::runtime::{Handle, Runtime}; /// Provides a reason when Lighthouse is shut down. #[derive(Copy, Clone, Debug, PartialEq)] @@ -24,11 +25,51 @@ impl ShutdownReason { } } +/// Provides a `Handle` by either: +/// +/// 1. Holding a `Weak` and calling `Runtime::handle`. +/// 2. Directly holding a `Handle` and cloning it. +/// +/// This enum allows the `TaskExecutor` to work in production where a `Weak` is directly +/// accessible and in testing where the `Runtime` is hidden outside our scope. +#[derive(Clone)] +pub enum HandleProvider { + Runtime(Weak), + Handle(Handle), +} + +impl From for HandleProvider { + fn from(handle: Handle) -> Self { + HandleProvider::Handle(handle) + } +} + +impl From> for HandleProvider { + fn from(weak_runtime: Weak) -> Self { + HandleProvider::Runtime(weak_runtime) + } +} + +impl HandleProvider { + /// Returns a `Handle` to a `Runtime`. + /// + /// May return `None` if the weak reference to the `Runtime` has been dropped (this generally + /// means Lighthouse is shutting down). + pub fn handle(&self) -> Option { + match self { + HandleProvider::Runtime(weak_runtime) => weak_runtime + .upgrade() + .map(|runtime| runtime.handle().clone()), + HandleProvider::Handle(handle) => Some(handle.clone()), + } + } +} + /// A wrapper over a runtime handle which can spawn async and blocking tasks. #[derive(Clone)] pub struct TaskExecutor { /// The handle to the runtime on which tasks are spawned - runtime: Weak, + handle_provider: HandleProvider, /// The receiver exit future which on receiving shuts down the task exit: exit_future::Exit, /// Sender given to tasks, so that if they encounter a state in which execution cannot @@ -43,16 +84,19 @@ pub struct TaskExecutor { impl TaskExecutor { /// Create a new task executor. /// - /// Note: this function is mainly useful in tests. A `TaskExecutor` should be normally obtained from - /// a [`RuntimeContext`](struct.RuntimeContext.html) - pub fn new( - runtime: Weak, + /// ## Note + /// + /// This function should only be used during testing. In production, prefer to obtain an + /// instance of `Self` via a `environment::RuntimeContext` (see the `lighthouse/environment` + /// crate). + pub fn new>( + handle: T, exit: exit_future::Exit, log: slog::Logger, signal_tx: Sender, ) -> Self { Self { - runtime, + handle_provider: handle.into(), exit, signal_tx, log, @@ -62,7 +106,7 @@ impl TaskExecutor { /// Clones the task executor adding a service name. pub fn clone_with_name(&self, service_name: String) -> Self { TaskExecutor { - runtime: self.runtime.clone(), + handle_provider: self.handle_provider.clone(), exit: self.exit.clone(), signal_tx: self.signal_tx.clone(), log: self.log.new(o!("service" => service_name)), @@ -94,8 +138,8 @@ impl TaskExecutor { let mut shutdown_sender = self.shutdown_sender(); let log = self.log.clone(); - if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn(async move { + if let Some(handle) = self.handle() { + handle.spawn(async move { let timer = metrics::start_timer_vec(&metrics::TASKS_HISTOGRAM, &[name]); if let Err(join_error) = task_handle.await { if let Ok(panic) = join_error.try_into_panic() { @@ -160,8 +204,8 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn(future); + if let Some(handle) = self.handle() { + handle.spawn(future); } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); } @@ -211,8 +255,8 @@ impl TaskExecutor { }); int_gauge.inc(); - if let Some(runtime) = self.runtime.upgrade() { - Some(runtime.spawn(future)) + if let Some(handle) = self.handle() { + Some(handle.spawn(future)) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); None @@ -242,8 +286,8 @@ impl TaskExecutor { let timer = metrics::start_timer_vec(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]); metrics::inc_gauge_vec(&metrics::BLOCKING_TASKS_COUNT, &[name]); - let join_handle = if let Some(runtime) = self.runtime.upgrade() { - runtime.spawn_blocking(task) + let join_handle = if let Some(handle) = self.handle() { + handle.spawn_blocking(task) } else { debug!(self.log, "Couldn't spawn task. Runtime shutting down"); return None; @@ -268,8 +312,9 @@ impl TaskExecutor { Some(future) } - pub fn runtime(&self) -> Weak { - self.runtime.clone() + /// Returns a `Handle` to the current runtime. + pub fn handle(&self) -> Option { + self.handle_provider.handle() } /// Returns a copy of the `exit_future::Exit`. diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs new file mode 100644 index 0000000000..7d59cdf022 --- /dev/null +++ b/common/task_executor/src/test_utils.rs @@ -0,0 +1,68 @@ +use crate::TaskExecutor; +use slog::Logger; +use sloggers::{null::NullLoggerBuilder, Build}; +use std::sync::Arc; +use tokio::runtime; + +/// Whilst the `TestRuntime` is not necessarily useful in itself, it provides the necessary +/// components for creating a `TaskExecutor` during tests. +/// +/// May create its own runtime or use an existing one. +/// +/// ## Warning +/// +/// This struct should never be used in production, only testing. +pub struct TestRuntime { + runtime: Option>, + _runtime_shutdown: exit_future::Signal, + pub task_executor: TaskExecutor, + pub log: Logger, +} + +impl Default for TestRuntime { + /// If called *inside* an existing runtime, instantiates `Self` using a handle to that runtime. If + /// called *outside* any existing runtime, create a new `Runtime` and keep it alive until the + /// `Self` is dropped. + fn default() -> Self { + let (runtime_shutdown, exit) = exit_future::signal(); + let (shutdown_tx, _) = futures::channel::mpsc::channel(1); + let log = null_logger().unwrap(); + + let (runtime, handle) = if let Ok(handle) = runtime::Handle::try_current() { + (None, handle) + } else { + let runtime = Arc::new( + runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + ); + let handle = runtime.handle().clone(); + (Some(runtime), handle) + }; + + let task_executor = TaskExecutor::new(handle, exit, log.clone(), shutdown_tx); + + Self { + runtime, + _runtime_shutdown: runtime_shutdown, + task_executor, + log, + } + } +} + +impl Drop for TestRuntime { + fn drop(&mut self) { + if let Some(runtime) = self.runtime.take() { + Arc::try_unwrap(runtime).unwrap().shutdown_background() + } + } +} + +pub fn null_logger() -> Result { + let log_builder = NullLoggerBuilder; + log_builder + .build() + .map_err(|e| format!("Failed to start null logger: {:?}", e)) +} diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 91feef5b05..160f696542 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -13,9 +13,7 @@ use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::{future, StreamExt}; use slog::{error, info, o, warn, Drain, Duplicate, Level, Logger}; -use sloggers::{ - file::FileLoggerBuilder, null::NullLoggerBuilder, types::Format, types::Severity, Build, -}; +use sloggers::{file::FileLoggerBuilder, types::Format, types::Severity, Build}; use std::fs::create_dir_all; use std::path::PathBuf; use std::sync::Arc; @@ -33,6 +31,8 @@ use { #[cfg(not(target_family = "unix"))] use {futures::channel::oneshot, std::cell::RefCell}; +pub use task_executor::test_utils::null_logger; + const LOG_CHANNEL_SIZE: usize = 2048; /// The maximum time in seconds the client will wait for all internal tasks to shutdown. const MAXIMUM_SHUTDOWN_TIME: u64 = 15; @@ -506,13 +506,6 @@ impl Environment { } } -pub fn null_logger() -> Result { - let log_builder = NullLoggerBuilder; - log_builder - .build() - .map_err(|e| format!("Failed to start null logger: {:?}", e)) -} - #[cfg(target_family = "unix")] struct SignalFuture { signal: Signal, diff --git a/validator_client/src/http_api/keystores.rs b/validator_client/src/http_api/keystores.rs index 63cd946063..f88aacfca8 100644 --- a/validator_client/src/http_api/keystores.rs +++ b/validator_client/src/http_api/keystores.rs @@ -14,8 +14,8 @@ use slog::{info, warn, Logger}; use slot_clock::SlotClock; use std::path::PathBuf; use std::sync::Arc; -use std::sync::Weak; -use tokio::runtime::Runtime; +use task_executor::TaskExecutor; +use tokio::runtime::Handle; use types::{EthSpec, PublicKeyBytes}; use validator_dir::Builder as ValidatorDirBuilder; use warp::Rejection; @@ -59,7 +59,7 @@ pub fn import( request: ImportKeystoresRequest, validator_dir: PathBuf, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { // Check request validity. This is the only cases in which we should return a 4xx code. @@ -122,14 +122,14 @@ pub fn import( ImportKeystoreStatus::Error, format!("slashing protection import failed: {:?}", e), ) - } else if let Some(runtime) = runtime.upgrade() { + } else if let Some(handle) = task_executor.handle() { // Import the keystore. match import_single_keystore( keystore, password, validator_dir.clone(), &validator_store, - runtime, + handle, ) { Ok(status) => Status::ok(status), Err(e) => { @@ -159,7 +159,7 @@ fn import_single_keystore( password: ZeroizeString, validator_dir_path: PathBuf, validator_store: &ValidatorStore, - runtime: Arc, + handle: Handle, ) -> Result { // Check if the validator key already exists, erroring if it is a remote signer validator. let pubkey = keystore @@ -198,7 +198,7 @@ fn import_single_keystore( let voting_keystore_path = validator_dir.voting_keystore_path(); drop(validator_dir); - runtime + handle .block_on(validator_store.add_validator_keystore( voting_keystore_path, password, @@ -214,7 +214,7 @@ fn import_single_keystore( pub fn delete( request: DeleteKeystoresRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { // Remove from initialized validators. @@ -225,8 +225,11 @@ pub fn delete( .pubkeys .iter() .map(|pubkey_bytes| { - match delete_single_keystore(pubkey_bytes, &mut initialized_validators, runtime.clone()) - { + match delete_single_keystore( + pubkey_bytes, + &mut initialized_validators, + task_executor.clone(), + ) { Ok(status) => Status::ok(status), Err(error) => { warn!( @@ -244,8 +247,8 @@ pub fn delete( // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(initialized_validators.update_validators()) .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; } @@ -278,14 +281,14 @@ pub fn delete( fn delete_single_keystore( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - runtime: Weak, + task_executor: TaskExecutor, ) -> Result { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let pubkey = pubkey_bytes .decompress() .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match runtime.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true)) + match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, true)) { Ok(_) => Ok(DeleteKeystoreStatus::Deleted), Err(e) => match e { diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index 1207ed3b08..bf7261a271 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -22,8 +22,8 @@ use std::future::Future; use std::marker::PhantomData; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::PathBuf; -use std::sync::{Arc, Weak}; -use tokio::runtime::Runtime; +use std::sync::Arc; +use task_executor::TaskExecutor; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; use warp::{ @@ -59,7 +59,7 @@ impl From for Error { /// /// The server will gracefully handle the case where any fields are `None`. pub struct Context { - pub runtime: Weak, + pub task_executor: TaskExecutor, pub api_secret: ApiSecret, pub validator_store: Option>>, pub validator_dir: Option, @@ -161,8 +161,8 @@ pub fn serve( }) }); - let inner_runtime = ctx.runtime.clone(); - let runtime_filter = warp::any().map(move || inner_runtime.clone()); + let inner_task_executor = ctx.task_executor.clone(); + let task_executor_filter = warp::any().map(move || inner_task_executor.clone()); let inner_validator_dir = ctx.validator_dir.clone(); let validator_dir_filter = warp::any() @@ -290,18 +290,18 @@ pub fn serve( .and(validator_store_filter.clone()) .and(spec_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: Vec, validator_dir: PathBuf, validator_store: Arc>, spec: Arc, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let (validators, mnemonic) = - runtime.block_on(create_validators_mnemonic( + handle.block_on(create_validators_mnemonic( None, None, &body, @@ -316,7 +316,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(response)) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -333,16 +333,16 @@ pub fn serve( .and(validator_store_filter.clone()) .and(spec_filter) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: api_types::CreateValidatorsMnemonicRequest, validator_dir: PathBuf, validator_store: Arc>, spec: Arc, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let mnemonic = mnemonic_from_phrase(body.mnemonic.as_str()).map_err(|e| { warp_utils::reject::custom_bad_request(format!( @@ -351,7 +351,7 @@ pub fn serve( )) })?; let (validators, _mnemonic) = - runtime.block_on(create_validators_mnemonic( + handle.block_on(create_validators_mnemonic( Some(mnemonic), Some(body.key_derivation_path_offset), &body.validators, @@ -362,7 +362,7 @@ pub fn serve( Ok(api_types::GenericResponse::from(validators)) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -378,13 +378,13 @@ pub fn serve( .and(validator_dir_filter.clone()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: api_types::KeystoreValidatorsPostRequest, validator_dir: PathBuf, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { // Check to ensure the password is correct. let keypair = body @@ -416,8 +416,8 @@ pub fn serve( let suggested_fee_recipient = body.suggested_fee_recipient; let validator_def = { - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(validator_store.add_validator_keystore( voting_keystore_path, voting_password, @@ -433,7 +433,7 @@ pub fn serve( })? } else { return Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )); } }; @@ -455,14 +455,14 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |body: Vec, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let web3signers: Vec = body .into_iter() .map(|web3signer| ValidatorDefinition { @@ -478,14 +478,14 @@ pub fn serve( }, }) .collect(); - runtime.block_on(create_validators_web3signer( + handle.block_on(create_validators_web3signer( web3signers, &validator_store, ))?; Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } }) @@ -500,13 +500,13 @@ pub fn serve( .and(warp::body::json()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and_then( |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, validator_store: Arc>, signer, - runtime: Weak| { + task_executor: TaskExecutor| { blocking_signed_json_task(signer, move || { let initialized_validators_rw_lock = validator_store.initialized_validators(); let mut initialized_validators = initialized_validators_rw_lock.write(); @@ -518,8 +518,8 @@ pub fn serve( ))), Some(enabled) if enabled == body.enabled => Ok(()), Some(_) => { - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on( initialized_validators .set_validator_status(&validator_pubkey, body.enabled), @@ -533,7 +533,7 @@ pub fn serve( Ok(()) } else { Err(warp_utils::reject::custom_server_error( - "Runtime shutdown".into(), + "Lighthouse shutting down".into(), )) } } @@ -574,12 +574,12 @@ pub fn serve( .and(signer.clone()) .and(validator_dir_filter) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) .and_then( - |request, signer, validator_dir, validator_store, runtime, log| { + |request, signer, validator_dir, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - keystores::import(request, validator_dir, validator_store, runtime, log) + keystores::import(request, validator_dir, validator_store, task_executor, log) }) }, ); @@ -589,11 +589,11 @@ pub fn serve( .and(warp::body::json()) .and(signer.clone()) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - keystores::delete(request, validator_store, runtime, log) + keystores::delete(request, validator_store, task_executor, log) }) }); @@ -610,11 +610,11 @@ pub fn serve( .and(warp::body::json()) .and(signer.clone()) .and(validator_store_filter.clone()) - .and(runtime_filter.clone()) + .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - remotekeys::import(request, validator_store, runtime, log) + remotekeys::import(request, validator_store, task_executor, log) }) }); @@ -623,11 +623,11 @@ pub fn serve( .and(warp::body::json()) .and(signer) .and(validator_store_filter) - .and(runtime_filter) + .and(task_executor_filter) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, runtime, log| { + .and_then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { - remotekeys::delete(request, validator_store, runtime, log) + remotekeys::delete(request, validator_store, task_executor, log) }) }); diff --git a/validator_client/src/http_api/remotekeys.rs b/validator_client/src/http_api/remotekeys.rs index b3702a028a..5c3ec73de3 100644 --- a/validator_client/src/http_api/remotekeys.rs +++ b/validator_client/src/http_api/remotekeys.rs @@ -8,8 +8,9 @@ use eth2::lighthouse_vc::std_types::{ }; use slog::{info, warn, Logger}; use slot_clock::SlotClock; -use std::sync::{Arc, Weak}; -use tokio::runtime::Runtime; +use std::sync::Arc; +use task_executor::TaskExecutor; +use tokio::runtime::Handle; use types::{EthSpec, PublicKeyBytes}; use url::Url; use warp::Rejection; @@ -45,7 +46,7 @@ pub fn list( pub fn import( request: ImportRemotekeysRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { info!( @@ -57,14 +58,10 @@ pub fn import( let mut statuses = Vec::with_capacity(request.remote_keys.len()); for remotekey in request.remote_keys { - let status = if let Some(runtime) = runtime.upgrade() { + let status = if let Some(handle) = task_executor.handle() { // Import the keystore. - match import_single_remotekey( - remotekey.pubkey, - remotekey.url, - &validator_store, - runtime, - ) { + match import_single_remotekey(remotekey.pubkey, remotekey.url, &validator_store, handle) + { Ok(status) => Status::ok(status), Err(e) => { warn!( @@ -91,7 +88,7 @@ fn import_single_remotekey( pubkey: PublicKeyBytes, url: String, validator_store: &ValidatorStore, - runtime: Arc, + handle: Handle, ) -> Result { if let Err(url_err) = Url::parse(&url) { return Err(format!("failed to parse remotekey URL: {}", url_err)); @@ -129,7 +126,7 @@ fn import_single_remotekey( request_timeout_ms: None, }, }; - runtime + handle .block_on(validator_store.add_validator(web3signer_validator)) .map_err(|e| format!("failed to initialize validator: {:?}", e))?; @@ -139,7 +136,7 @@ fn import_single_remotekey( pub fn delete( request: DeleteRemotekeysRequest, validator_store: Arc>, - runtime: Weak, + task_executor: TaskExecutor, log: Logger, ) -> Result { info!( @@ -158,7 +155,7 @@ pub fn delete( match delete_single_remotekey( pubkey_bytes, &mut initialized_validators, - runtime.clone(), + task_executor.clone(), ) { Ok(status) => Status::ok(status), Err(error) => { @@ -177,8 +174,8 @@ pub fn delete( // Use `update_validators` to update the key cache. It is safe to let the key cache get a bit out // of date as it resets when it can't be decrypted. We update it just a single time to avoid // continually resetting it after each key deletion. - if let Some(runtime) = runtime.upgrade() { - runtime + if let Some(handle) = task_executor.handle() { + handle .block_on(initialized_validators.update_validators()) .map_err(|e| custom_server_error(format!("unable to update key cache: {:?}", e)))?; } @@ -189,15 +186,14 @@ pub fn delete( fn delete_single_remotekey( pubkey_bytes: &PublicKeyBytes, initialized_validators: &mut InitializedValidators, - runtime: Weak, + task_executor: TaskExecutor, ) -> Result { - if let Some(runtime) = runtime.upgrade() { + if let Some(handle) = task_executor.handle() { let pubkey = pubkey_bytes .decompress() .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; - match runtime - .block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false)) + match handle.block_on(initialized_validators.delete_definition_and_keystore(&pubkey, false)) { Ok(_) => Ok(DeleteRemotekeyStatus::Deleted), Err(e) => match e { diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index eef76eb363..da9c8dc534 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -102,7 +102,7 @@ impl ApiTester { spec, Some(Arc::new(DoppelgangerService::new(log.clone()))), slot_clock, - executor, + executor.clone(), log.clone(), )); @@ -113,7 +113,7 @@ impl ApiTester { let initialized_validators = validator_store.initialized_validators(); let context = Arc::new(Context { - runtime, + task_executor: executor, api_secret, validator_dir: Some(validator_dir.path().into()), validator_store: Some(validator_store.clone()), diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 039b54496c..43f88b54f0 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -498,7 +498,7 @@ impl ProductionValidatorClient { self.http_api_listen_addr = if self.config.http_api.enabled { let ctx = Arc::new(http_api::Context { - runtime: self.context.executor.runtime(), + task_executor: self.context.executor.clone(), api_secret, validator_store: Some(self.validator_store.clone()), validator_dir: Some(self.config.validator_dir.clone()),