diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index ccc86227de..a18ae5f2e2 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -285,8 +285,8 @@ impl> CachingEth1Backend { } /// Starts the routine which connects to the external eth1 node and updates the caches. - pub async fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) -> Result<(), ()> { - self.core.auto_update(exit).await + pub async fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { + tokio::spawn(async move { self.core.auto_update(exit).await }); } /// Instantiates `self` from an existing service. diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index b6354cf870..c25de43c13 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -13,7 +13,6 @@ use environment::RuntimeContext; use eth1::{Config as Eth1Config, Service as Eth1Service}; use eth2_config::Eth2Config; use eth2_libp2p::NetworkGlobals; -use futures::{future, Future, IntoFuture}; use genesis::{interop_genesis_state, Eth1GenesisService}; use network::{NetworkConfig, NetworkMessage, NetworkService}; use slog::info; @@ -109,136 +108,103 @@ where /// Initializes the `BeaconChainBuilder`. The `build_beacon_chain` method will need to be /// called later in order to actually instantiate the `BeaconChain`. - pub fn beacon_chain_builder( + pub async fn beacon_chain_builder( mut self, client_genesis: ClientGenesis, config: ClientConfig, - ) -> impl Future { - let store = self.store.clone(); - let store_migrator = self.store_migrator.take(); - let chain_spec = self.chain_spec.clone(); - let runtime_context = self.runtime_context.clone(); - let eth_spec_instance = self.eth_spec_instance.clone(); - let data_dir = config.data_dir.clone(); - let disabled_forks = config.disabled_forks.clone(); + ) -> Result { + let store = self + .store + .ok_or_else(|| "beacon_chain_start_method requires a store".to_string())?; + let store_migrator = self + .store_migrator + .ok_or_else(|| "beacon_chain_start_method requires a store migrator".to_string())?; + let context = self + .runtime_context + .ok_or_else(|| "beacon_chain_start_method requires a runtime context".to_string())? + .service_context("beacon".into()); + let spec = self + .chain_spec + .ok_or_else(|| "beacon_chain_start_method requires a chain spec".to_string())?; - future::ok(()) - .and_then(move |()| { - let store = store - .ok_or_else(|| "beacon_chain_start_method requires a store".to_string())?; - let store_migrator = store_migrator.ok_or_else(|| { - "beacon_chain_start_method requires a store migrator".to_string() - })?; - let context = runtime_context - .ok_or_else(|| { - "beacon_chain_start_method requires a runtime context".to_string() - })? - .service_context("beacon".into()); - let spec = chain_spec - .ok_or_else(|| "beacon_chain_start_method requires a chain spec".to_string())?; + let builder = BeaconChainBuilder::new(self.eth_spec_instance) + .logger(context.log.clone()) + .store(store) + .store_migrator(store_migrator) + .data_dir(config.data_dir) + .custom_spec(spec.clone()) + .disabled_forks(config.disabled_forks); - let builder = BeaconChainBuilder::new(eth_spec_instance) - .logger(context.log.clone()) - .store(store) - .store_migrator(store_migrator) - .data_dir(data_dir) - .custom_spec(spec.clone()) - .disabled_forks(disabled_forks); + let chain_exists = builder + .store_contains_beacon_chain() + .unwrap_or_else(|_| false); - Ok((builder, spec, context)) - }) - .and_then(move |(builder, spec, context)| { - let chain_exists = builder - .store_contains_beacon_chain() - .unwrap_or_else(|_| false); + // If the client is expect to resume but there's no beacon chain in the database, + // use the `DepositContract` method. This scenario is quite common when the client + // is shutdown before finding genesis via eth1. + // + // Alternatively, if there's a beacon chain in the database then always resume + // using it. + let client_genesis = if client_genesis == ClientGenesis::FromStore && !chain_exists { + info!(context.log, "Defaulting to deposit contract genesis"); - // If the client is expect to resume but there's no beacon chain in the database, - // use the `DepositContract` method. This scenario is quite common when the client - // is shutdown before finding genesis via eth1. - // - // Alternatively, if there's a beacon chain in the database then always resume - // using it. - let client_genesis = if client_genesis == ClientGenesis::FromStore && !chain_exists - { - info!(context.log, "Defaulting to deposit contract genesis"); + ClientGenesis::DepositContract + } else if chain_exists { + ClientGenesis::FromStore + } else { + client_genesis + }; - ClientGenesis::DepositContract - } else if chain_exists { - ClientGenesis::FromStore - } else { - client_genesis - }; + let (beacon_chain_builder, eth1_service_option) = match client_genesis { + ClientGenesis::Interop { + validator_count, + genesis_time, + } => { + let keypairs = generate_deterministic_keypairs(validator_count); + let genesis_state = interop_genesis_state(&keypairs, genesis_time, &spec)?; + builder.genesis_state(genesis_state).map(|v| (v, None))? + } + ClientGenesis::SszBytes { + genesis_state_bytes, + } => { + info!( + context.log, + "Starting from known genesis state"; + ); - let genesis_state_future: Box + Send> = - match client_genesis { - ClientGenesis::Interop { - validator_count, - genesis_time, - } => { - let keypairs = generate_deterministic_keypairs(validator_count); - let result = interop_genesis_state(&keypairs, genesis_time, &spec); + let genesis_state = BeaconState::from_ssz_bytes(&genesis_state_bytes) + .map_err(|e| format!("Unable to parse genesis state SSZ: {:?}", e))?; - let future = result - .and_then(move |genesis_state| builder.genesis_state(genesis_state)) - .into_future() - .map(|v| (v, None)); + builder.genesis_state(genesis_state).map(|v| (v, None))? + } + ClientGenesis::DepositContract => { + info!( + context.log, + "Waiting for eth2 genesis from eth1"; + "eth1_endpoint" => &config.eth1.endpoint, + "contract_deploy_block" => config.eth1.deposit_contract_deploy_block, + "deposit_contract" => &config.eth1.deposit_contract_address + ); - Box::new(future) - } - ClientGenesis::SszBytes { - genesis_state_bytes, - } => { - info!( - context.log, - "Starting from known genesis state"; - ); + let genesis_service = Eth1GenesisService::new(config.eth1, context.log.clone()); - let result = BeaconState::from_ssz_bytes(&genesis_state_bytes) - .map_err(|e| format!("Unable to parse genesis state SSZ: {:?}", e)); + let genesis_state = genesis_service + .wait_for_genesis_state( + Duration::from_millis(ETH1_GENESIS_UPDATE_INTERVAL_MILLIS), + context.eth2_config().spec.clone(), + ) + .await?; - let future = result - .and_then(move |genesis_state| builder.genesis_state(genesis_state)) - .into_future() - .map(|v| (v, None)); + builder + .genesis_state(genesis_state) + .map(|v| (v, Some(genesis_service.into_core_service())))? + } + ClientGenesis::FromStore => builder.resume_from_db().map(|v| (v, None))?, + }; - Box::new(future) - } - ClientGenesis::DepositContract => { - info!( - context.log, - "Waiting for eth2 genesis from eth1"; - "eth1_endpoint" => &config.eth1.endpoint, - "contract_deploy_block" => config.eth1.deposit_contract_deploy_block, - "deposit_contract" => &config.eth1.deposit_contract_address - ); - - let genesis_service = - Eth1GenesisService::new(config.eth1, context.log.clone()); - - let future = genesis_service - .wait_for_genesis_state( - Duration::from_millis(ETH1_GENESIS_UPDATE_INTERVAL_MILLIS), - context.eth2_config().spec.clone(), - ) - .and_then(move |genesis_state| builder.genesis_state(genesis_state)) - .map(|v| (v, Some(genesis_service.into_core_service()))); - - Box::new(future) - } - ClientGenesis::FromStore => { - let future = builder.resume_from_db().into_future().map(|v| (v, None)); - - Box::new(future) - } - }; - - genesis_state_future - }) - .map(move |(beacon_chain_builder, eth1_service_option)| { - self.eth1_service = eth1_service_option; - self.beacon_chain_builder = Some(beacon_chain_builder); - self - }) + self.eth1_service = eth1_service_option; + self.beacon_chain_builder = Some(beacon_chain_builder); + Ok(self) } /// Immediately starts the networking stack. @@ -254,7 +220,7 @@ where .service_context("network".into()); let (network_globals, network_send, network_exit) = - NetworkService::start(beacon_chain, config, &context.executor, context.log) + NetworkService::start(beacon_chain, config, &context.runtime_handle, context.log) .map_err(|e| format!("Failed to start network: {:?}", e))?; self.network_globals = Some(network_globals); @@ -281,13 +247,10 @@ where .ok_or_else(|| "node timer requires a chain spec".to_string())? .milliseconds_per_slot; - let timer_exit = timer::spawn( - &context.executor, - beacon_chain, - milliseconds_per_slot, - context.log, - ) - .map_err(|e| format!("Unable to start node timer: {}", e))?; + let timer_exit = context + .runtime_handle + .enter(|| timer::spawn(beacon_chain, milliseconds_per_slot)) + .map_err(|e| format!("Unable to start node timer: {}", e))?; self.exit_channels.push(timer_exit); @@ -323,21 +286,22 @@ where network_chan: network_send, }; - let (exit_channel, listening_addr) = rest_api::start_server( - &client_config.rest_api, - &context.executor, - beacon_chain, - network_info, - client_config - .create_db_path() - .map_err(|_| "unable to read data dir")?, - client_config - .create_freezer_db_path() - .map_err(|_| "unable to read freezer DB dir")?, - eth2_config.clone(), - context.log, - ) - .map_err(|e| format!("Failed to start HTTP API: {:?}", e))?; + let (exit_channel, listening_addr) = context.runtime_handle.enter(|| { + rest_api::start_server( + &client_config.rest_api, + beacon_chain, + network_info, + client_config + .create_db_path() + .map_err(|_| "unable to read data dir")?, + client_config + .create_freezer_db_path() + .map_err(|_| "unable to read freezer DB dir")?, + eth2_config.clone(), + context.log, + ) + .map_err(|e| format!("Failed to start HTTP API: {:?}", e)) + })?; self.exit_channels.push(exit_channel); self.http_listen_addr = Some(listening_addr); @@ -472,8 +436,9 @@ where Option<_>, Option<_>, ) = if config.enabled { - let (sender, exit, listening_addr) = - websocket_server::start_server(&config, &context.executor, &context.log)?; + let (sender, exit, listening_addr) = context + .runtime_handle + .enter(|| websocket_server::start_server(&config, &context.log))?; (sender, Some(exit), Some(listening_addr)) } else { (WebSocketSender::dummy(), None, None) @@ -692,7 +657,7 @@ where }; // Starts the service that connects to an eth1 node and periodically updates caches. - context.executor.spawn(backend.start(exit)); + context.runtime_handle.spawn(backend.start(exit)); self.beacon_chain_builder = Some(beacon_chain_builder.eth1_backend(Some(backend))); diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index 55ef6b9336..3b21ac2e2a 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -246,54 +246,39 @@ impl Service { pub async fn update( &self, ) -> Result<(DepositCacheUpdateOutcome, BlockCacheUpdateOutcome), String> { - let deposit_future = self - .update_deposit_cache() - .map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) - .then(|result| async move{ - match &result { - Ok(DepositCacheUpdateOutcome { logs_imported }) => trace!( - self.log, - "Updated eth1 deposit cache"; - "cached_deposits" => self.inner.deposit_cache.read().cache.len(), - "logs_imported" => logs_imported, - "last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block, - ), - Err(e) => error!( - self.log, - "Failed to update eth1 deposit cache"; - "error" => e - ), - }; + let update_deposit_cache = async { + let outcome = self + .update_deposit_cache() + .await + .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; - result - }); + trace!( + self.log, + "Updated eth1 deposit cache"; + "cached_deposits" => self.inner.deposit_cache.read().cache.len(), + "logs_imported" => outcome.logs_imported, + "last_processed_eth1_block" => self.inner.deposit_cache.read().last_processed_block, + ); + Ok(outcome) + }; - let block_future = self - .update_block_cache() - .map_err(|e| format!("Failed to update eth1 cache: {:?}", e)) - .then(|result| async move { - match &result { - Ok(BlockCacheUpdateOutcome { - blocks_imported, - head_block_number, - }) => trace!( - self.log, - "Updated eth1 block cache"; - "cached_blocks" => self.inner.block_cache.read().len(), - "blocks_imported" => blocks_imported, - "head_block" => head_block_number, - ), - Err(e) => error!( - self.log, - "Failed to update eth1 block cache"; - "error" => e - ), - }; + let update_block_cache = async { + let outcome = self + .update_block_cache() + .await + .map_err(|e| format!("Failed to update eth1 cache: {:?}", e))?; - result - }); + trace!( + self.log, + "Updated eth1 block cache"; + "cached_blocks" => self.inner.block_cache.read().len(), + "blocks_imported" => outcome.blocks_imported, + "head_block" => outcome.head_block_number, + ); + Ok(outcome) + }; - futures::try_join!(deposit_future, block_future) + futures::try_join!(update_deposit_cache, update_block_cache) } /// A looping future that updates the cache, then waits `config.auto_update_interval` before @@ -305,40 +290,40 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn auto_update( - &self, - mut exit: tokio::sync::oneshot::Receiver<()>, - ) -> Result<(), ()> { + pub async fn auto_update(&self, exit: tokio::sync::oneshot::Receiver<()>) { let update_interval = Duration::from_millis(self.config().auto_update_interval_millis); loop { - let update_result = self.update().await; + let update_future = async move { + /* + let update_result = self.update().await; - match update_result { - Err(e) => error!( - self.log, - "Failed to update eth1 cache"; - "retry_millis" => update_interval.as_millis(), - "error" => e, - ), - Ok((deposit, block)) => debug!( - self.log, - "Updated eth1 cache"; - "retry_millis" => update_interval.as_millis(), - "blocks" => format!("{:?}", block), - "deposits" => format!("{:?}", deposit), - ), + match update_result { + Err(e) => error!( + self.log, + "Failed to update eth1 cache"; + "retry_millis" => update_interval.as_millis(), + "error" => e, + ), + Ok((deposit, block)) => debug!( + self.log, + "Updated eth1 cache"; + "retry_millis" => update_interval.as_millis(), + "blocks" => format!("{:?}", block), + "deposits" => format!("{:?}", deposit), + ), + }; + */ + // WARNING: delay_for doesn't return an error and panics on error. + delay_for(update_interval).await; }; - - // WARNING: delay_for doesn't return an error and panics on error. - delay_for(update_interval).await; - match exit.try_recv() { - Ok(_) | Err(TryRecvError::Closed) => break, - Err(TryRecvError::Empty) => {} + if let futures::future::Either::Right(_) = + futures::future::select(Box::pin(update_future), futures::future::ready(())).await + { + // the exit future returned end + break; } } - - Ok(()) } /// Contacts the remote eth1 node and attempts to import deposit logs up to the configured diff --git a/beacon_node/rest_api/src/lib.rs b/beacon_node/rest_api/src/lib.rs index 4159150acc..8a63df0f0e 100644 --- a/beacon_node/rest_api/src/lib.rs +++ b/beacon_node/rest_api/src/lib.rs @@ -35,7 +35,6 @@ use std::net::SocketAddr; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; -use tokio::runtime::Handle; use tokio::sync::{mpsc, oneshot}; use url_query::UrlQuery; @@ -53,7 +52,6 @@ pub struct NetworkInfo { #[allow(clippy::too_many_arguments)] pub fn start_server( config: &Config, - handle: &Handle, beacon_chain: Arc>, network_info: NetworkInfo, db_path: PathBuf, @@ -130,7 +128,7 @@ pub fn start_server( "port" => actual_listen_addr.port(), ); - handle.spawn(server_future); + tokio::spawn(server_future); Ok((exit_signal, actual_listen_addr)) } diff --git a/beacon_node/timer/src/lib.rs b/beacon_node/timer/src/lib.rs index eccf06c4dd..26f8bb60ea 100644 --- a/beacon_node/timer/src/lib.rs +++ b/beacon_node/timer/src/lib.rs @@ -3,12 +3,11 @@ //! This service allows task execution on the beacon node for various functionality. use beacon_chain::{BeaconChain, BeaconChainTypes}; +use futures::future; use futures::stream::StreamExt; -use futures::{future, prelude::*}; use slot_clock::SlotClock; use std::sync::Arc; use std::time::Duration; -use tokio::runtime::Handle; use tokio::time::{interval_at, Instant}; /// Spawns a timer service which periodically executes tasks for the beacon chain @@ -16,7 +15,6 @@ use tokio::time::{interval_at, Instant}; /// called from the context of a runtime and we can simply spawn using task::spawn. /// Check for issues without the Handle. pub fn spawn( - handle: &Handle, beacon_chain: Arc>, milliseconds_per_slot: u64, ) -> Result, &'static str> { @@ -35,8 +33,8 @@ pub fn spawn( future::ready(()) }); - let future = futures::future::select(timer_future, exit.map_err(|_| ()).map(|_| ())); - handle.spawn(future); + let future = futures::future::select(timer_future, exit); + tokio::spawn(future); Ok(exit_signal) } diff --git a/beacon_node/websocket_server/src/lib.rs b/beacon_node/websocket_server/src/lib.rs index d1f1d27ebc..759bfab5f3 100644 --- a/beacon_node/websocket_server/src/lib.rs +++ b/beacon_node/websocket_server/src/lib.rs @@ -2,7 +2,6 @@ use futures::future::TryFutureExt; use slog::{debug, error, info, warn, Logger}; use std::marker::PhantomData; use std::net::SocketAddr; -use tokio::runtime::Handle; use types::EthSpec; use ws::{Sender, WebSocket}; @@ -37,7 +36,6 @@ impl WebSocketSender { pub fn start_server( config: &Config, - handle: &Handle, log: &Logger, ) -> Result< ( @@ -92,15 +90,12 @@ pub fn start_server( // Place a future on the handle that will shutdown the websocket server when the // application exits. - // TODO: check if we should spawn using a `Handle` or using `task::spawn` - handle.spawn(exit_future); + tokio::spawn(exit_future); exit_channel }; let log_inner = log.clone(); - // TODO: using tokio `spawn_blocking` instead of `thread::spawn` - // Check which is more apt. let _handle = tokio::task::spawn_blocking(move || match server.run() { Ok(_) => { debug!(