diff --git a/beacon_node/beacon_chain/src/eth1_chain.rs b/beacon_node/beacon_chain/src/eth1_chain.rs index 1020fc0826..5f4434fc59 100644 --- a/beacon_node/beacon_chain/src/eth1_chain.rs +++ b/beacon_node/beacon_chain/src/eth1_chain.rs @@ -285,8 +285,10 @@ impl> CachingEth1Backend { } /// Starts the routine which connects to the external eth1 node and updates the caches. - fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { - tokio::spawn(HttpService::auto_update(self.core.clone(), exit)); + pub fn start(&self, exit: tokio::sync::oneshot::Receiver<()>) { + // don't need to spawn as a task is being spawned in auto_update + // TODO: check if this is correct + HttpService::auto_update(self.core.clone(), exit); } /// Instantiates `self` from an existing service. diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 2483e70a98..f3dacc18ff 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -113,27 +113,31 @@ where client_genesis: ClientGenesis, config: ClientConfig, ) -> Result { - let store = self - .store + let store = self.store.clone(); + let store_migrator = self.store_migrator.take(); + let chain_spec = self.chain_spec.clone(); + let runtime_context = self.runtime_context.clone(); + let eth_spec_instance = self.eth_spec_instance.clone(); + let data_dir = config.data_dir.clone(); + let disabled_forks = config.disabled_forks.clone(); + + let store = store .ok_or_else(|| "beacon_chain_start_method requires a store".to_string())?; - let store_migrator = self - .store_migrator + let store_migrator = store_migrator .ok_or_else(|| "beacon_chain_start_method requires a store migrator".to_string())?; - let context = self - .runtime_context + let context = runtime_context .ok_or_else(|| "beacon_chain_start_method requires a runtime context".to_string())? .service_context("beacon".into()); - let spec = self - .chain_spec + let spec = chain_spec .ok_or_else(|| "beacon_chain_start_method requires a chain spec".to_string())?; - let builder = BeaconChainBuilder::new(self.eth_spec_instance) + let builder = BeaconChainBuilder::new(eth_spec_instance) .logger(context.log.clone()) .store(store) .store_migrator(store_migrator) - .data_dir(config.data_dir) + .data_dir(data_dir) .custom_spec(spec.clone()) - .disabled_forks(config.disabled_forks); + .disabled_forks(disabled_forks); let chain_exists = builder .store_contains_beacon_chain() @@ -286,6 +290,7 @@ where network_chan: network_send, }; + let log = context.log.clone(); let (exit_channel, listening_addr) = context.runtime_handle.enter(|| { rest_api::start_server( &client_config.rest_api, @@ -298,7 +303,7 @@ where .create_freezer_db_path() .map_err(|_| "unable to read freezer DB dir")?, eth2_config.clone(), - context.log, + log, ) .map_err(|e| format!("Failed to start HTTP API: {:?}", e)) })?; diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index add49b96d9..aa13536537 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -43,7 +43,7 @@ pub fn spawn_notifier( let interval_duration = slot_duration; let speedo = Mutex::new(Speedo::default()); - let interval = tokio::time::interval_at(start_instant, interval_duration); + let mut interval = tokio::time::interval_at(start_instant, interval_duration); let interval_future = async move { while let Some(_) = interval.next().await { diff --git a/beacon_node/eth1/src/service.rs b/beacon_node/eth1/src/service.rs index bb83e66f10..a31a79d77c 100644 --- a/beacon_node/eth1/src/service.rs +++ b/beacon_node/eth1/src/service.rs @@ -2,18 +2,18 @@ use crate::metrics; use crate::{ block_cache::{BlockCache, Error as BlockCacheError, Eth1Block}, deposit_cache::Error as DepositCacheError, - http::{get_block, get_block_number, get_deposit_logs_in_range}, + http::{get_block, get_block_number, get_deposit_logs_in_range, Log}, inner::{DepositUpdater, Inner}, DepositLog, }; -use futures::{future::TryFutureExt, stream, stream::TryStreamExt}; +use futures::{future::TryFutureExt, stream, stream::TryStreamExt, StreamExt}; use parking_lot::{RwLock, RwLockReadGuard}; use serde::{Deserialize, Serialize}; use slog::{debug, error, info, trace, Logger}; use std::ops::{Range, RangeInclusive}; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio::time::delay_for; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::time::{interval_at, Duration, Instant}; const STANDARD_TIMEOUT_MILLIS: u64 = 15_000; @@ -283,38 +283,38 @@ impl Service { /// - Err(_) if there is an error. /// /// Emits logs for debugging and errors. - pub async fn auto_update(service: Self, mut exit: tokio::sync::oneshot::Receiver<()>) { + pub fn auto_update(service: Self, exit: tokio::sync::oneshot::Receiver<()>) { let update_interval = Duration::from_millis(service.config().auto_update_interval_millis); - loop { - let update_future = async { - let update_result = Service::update(service.clone()).await; + let interval = interval_at(Instant::now(), update_interval); - match update_result { - Err(e) => error!( - service.log, - "Failed to update eth1 cache"; - "retry_millis" => update_interval.as_millis(), - "error" => e, - ), - Ok((deposit, block)) => debug!( - service.log, - "Updated eth1 cache"; - "retry_millis" => update_interval.as_millis(), - "blocks" => format!("{:?}", block), - "deposits" => format!("{:?}", deposit), - ), - }; - // WARNING: delay_for doesn't return an error and panics on error. - delay_for(update_interval).await; - }; - if let futures::future::Either::Right(_) = - futures::future::select(Box::pin(update_future), &mut exit).await - { - // the exit future returned end - break; - } - } + let update_future = interval.for_each(move |_| { + let _ = Service::do_update(service.clone(), update_interval); + futures::future::ready(()) + }); + let future = futures::future::select(update_future, exit); + tokio::task::spawn(future); + } + + async fn do_update(service: Self, update_interval: Duration) -> Result<(), ()> { + let update_result = Service::update(service.clone()).await; + println!("Going on"); + match update_result { + Err(e) => error!( + service.log, + "Failed to update eth1 cache"; + "retry_millis" => update_interval.as_millis(), + "error" => e, + ), + Ok((deposit, block)) => debug!( + service.log, + "Updated eth1 cache"; + "retry_millis" => update_interval.as_millis(), + "blocks" => format!("{:?}", block), + "deposits" => format!("{:?}", deposit), + ), + }; + Ok(()) } /// Contacts the remote eth1 node and attempts to import deposit logs up to the configured @@ -365,7 +365,7 @@ impl Service { Vec::new() }; - let logs_imported = + let logs: Vec<(Range, Vec)> = stream::try_unfold(block_number_chunks.into_iter(), |mut chunks| async { match chunks.next() { Some(chunk) => { @@ -385,55 +385,53 @@ impl Service { None => Ok(None), } }) - .try_fold(0, |mut sum: usize, (block_range, log_chunk)| async move { - let mut cache = service.deposits().write(); - - log_chunk - .into_iter() - .map(|raw_log| { - DepositLog::from_log(&raw_log).map_err(|error| { - Error::FailedToParseDepositLog { - block_range: block_range.clone(), - error, - } - }) - }) - // Return early if any of the logs cannot be parsed. - // - // This costs an additional `collect`, however it enforces that no logs are - // imported if any one of them cannot be parsed. - .collect::, _>>()? - .into_iter() - .map(|deposit_log| { - cache - .cache - .insert_log(deposit_log) - .map_err(Error::FailedToInsertDeposit)?; - - sum += 1; - - Ok(()) - }) - // Returns if a deposit is unable to be added to the cache. - // - // If this error occurs, the cache will no longer be guaranteed to hold either - // none or all of the logs for each block (i.e., they may exist _some_ logs for - // a block, but not _all_ logs for that block). This scenario can cause the - // node to choose an invalid genesis state or propose an invalid block. - .collect::>()?; - - cache.last_processed_block = Some(block_range.end.saturating_sub(1)); - - metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64); - metrics::set_gauge( - &metrics::HIGHEST_PROCESSED_DEPOSIT_BLOCK, - cache.last_processed_block.unwrap_or_else(|| 0) as i64, - ); - - Ok(sum) - }) + .try_collect() .await?; + let mut logs_imported = 0; + for (block_range, log_chunk) in logs.iter() { + let mut cache = service.deposits().write(); + log_chunk + .into_iter() + .map(|raw_log| { + DepositLog::from_log(&raw_log).map_err(|error| Error::FailedToParseDepositLog { + block_range: block_range.clone(), + error, + }) + }) + // Return early if any of the logs cannot be parsed. + // + // This costs an additional `collect`, however it enforces that no logs are + // imported if any one of them cannot be parsed. + .collect::, _>>()? + .into_iter() + .map(|deposit_log| { + cache + .cache + .insert_log(deposit_log) + .map_err(Error::FailedToInsertDeposit)?; + + logs_imported += 1; + + Ok(()) + }) + // Returns if a deposit is unable to be added to the cache. + // + // If this error occurs, the cache will no longer be guaranteed to hold either + // none or all of the logs for each block (i.e., they may exist _some_ logs for + // a block, but not _all_ logs for that block). This scenario can cause the + // node to choose an invalid genesis state or propose an invalid block. + .collect::>()?; + + cache.last_processed_block = Some(block_range.end.saturating_sub(1)); + + metrics::set_gauge(&metrics::DEPOSIT_CACHE_LEN, cache.cache.len() as i64); + metrics::set_gauge( + &metrics::HIGHEST_PROCESSED_DEPOSIT_BLOCK, + cache.last_processed_block.unwrap_or_else(|| 0) as i64, + ); + } + if logs_imported > 0 { info!( service.log, @@ -531,9 +529,9 @@ impl Service { // Produce a stream from the list of required block numbers and return a future that // consumes the it. - let eth1_blocks = stream::try_unfold( + let eth1_blocks: Vec = stream::try_unfold( required_block_numbers.into_iter(), - |mut block_numbers| async move { + |mut block_numbers| async { match block_numbers.next() { Some(block_number) => { match download_eth1_block(service.inner.clone(), block_number).await { @@ -544,34 +542,35 @@ impl Service { None => Ok(None), } }, - ); + ) + .try_collect() + .await?; - let blocks_imported = eth1_blocks - .try_fold(0, |sum: usize, eth1_block| async move { + let mut blocks_imported = 0; + for eth1_block in eth1_blocks { + service + .inner + .block_cache + .write() + .insert_root_or_child(eth1_block) + .map_err(Error::FailedToInsertEth1Block)?; + + metrics::set_gauge( + &metrics::BLOCK_CACHE_LEN, + service.inner.block_cache.read().len() as i64, + ); + metrics::set_gauge( + &metrics::LATEST_CACHED_BLOCK_TIMESTAMP, service .inner .block_cache - .write() - .insert_root_or_child(eth1_block) - .map_err(Error::FailedToInsertEth1Block)?; + .read() + .latest_block_timestamp() + .unwrap_or_else(|| 0) as i64, + ); - metrics::set_gauge( - &metrics::BLOCK_CACHE_LEN, - service.inner.block_cache.read().len() as i64, - ); - metrics::set_gauge( - &metrics::LATEST_CACHED_BLOCK_TIMESTAMP, - service - .inner - .block_cache - .read() - .latest_block_timestamp() - .unwrap_or_else(|| 0) as i64, - ); - - Ok(sum + 1) - }) - .await?; + blocks_imported += 1; + } // Prune the block cache, preventing it from growing too large. service.inner.prune_blocks(); diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index 44291bfb17..76da63aa4a 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -143,17 +143,14 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block"); } - service - .update_deposit_cache() + Service::update_deposit_cache(service.clone()) .await .expect("should update deposit cache"); - service - .update_block_cache() + Service::update_block_cache(service.clone()) .await .expect("should update block cache"); - service - .update_block_cache() + Service::update_block_cache(service.clone()) .await .expect("should update cache when nothing has changed"); @@ -205,12 +202,10 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } - service - .update_deposit_cache() + Service::update_deposit_cache(service.clone()) .await .expect("should update deposit cache"); - service - .update_block_cache() + Service::update_block_cache(service.clone()) .await .expect("should update block cache"); @@ -251,12 +246,10 @@ mod eth1_cache { for _ in 0..cache_len / 2 { eth1.ganache.evm_mine().await.expect("should mine block") } - service - .update_deposit_cache() + Service::update_deposit_cache(service.clone()) .await .expect("should update deposit cache"); - service - .update_block_cache() + Service::update_block_cache(service.clone()) .await .expect("should update block cache"); } @@ -295,12 +288,15 @@ mod eth1_cache { eth1.ganache.evm_mine().await.expect("should mine block") } futures::try_join!( - service.update_deposit_cache(), - service.update_deposit_cache() + Service::update_deposit_cache(service.clone()), + Service::update_deposit_cache(service.clone()) ) .expect("should perform two simultaneous updates of deposit cache"); - futures::try_join!(service.update_block_cache(), service.update_block_cache()) - .expect("should perform two simultaneous updates of block cache"); + futures::try_join!( + Service::update_block_cache(service.clone()), + Service::update_block_cache(service.clone()) + ) + .expect("should perform two simultaneous updates of block cache"); assert!(service.block_cache_len() >= n, "should grow the cache"); } @@ -344,13 +340,11 @@ mod deposit_tree { .expect("should perform a deposit"); } - service - .update_deposit_cache() + Service::update_deposit_cache(service.clone()) .await .expect("should perform update"); - service - .update_deposit_cache() + Service::update_deposit_cache(service.clone()) .await .expect("should perform update when nothing has changed"); @@ -419,8 +413,8 @@ mod deposit_tree { } futures::try_join!( - service.update_deposit_cache(), - service.update_deposit_cache() + Service::update_deposit_cache(service.clone()), + Service::update_deposit_cache(service.clone()) ) .expect("should perform two updates concurrently"); @@ -657,8 +651,7 @@ mod fast { eth1.ganache.evm_mine().await.expect("should mine block"); } - service - .update_deposit_cache() + Service::update_deposit_cache(service.clone()) .await .expect("should perform update"); @@ -725,8 +718,7 @@ mod persist { .expect("should perform a deposit"); } - service - .update_deposit_cache() + Service::update_deposit_cache(service.clone()) .await .expect("should perform update"); @@ -737,8 +729,7 @@ mod persist { let deposit_count = service.deposit_cache_len(); - service - .update_block_cache() + Service::update_block_cache(service.clone()) .await .expect("should perform update"); diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs index 40aa3aec58..8d4f82fafd 100644 --- a/beacon_node/genesis/src/eth1_genesis_service.rs +++ b/beacon_node/genesis/src/eth1_genesis_service.rs @@ -94,9 +94,7 @@ impl Eth1GenesisService { loop { // **WARNING** `delay_for` panics on error delay_for(update_interval).await; - let update_result = self - .core - .update_deposit_cache() + let update_result = Service::update_deposit_cache(self.core.clone()) .await .map_err(|e| format!("{:?}", e)); @@ -134,7 +132,7 @@ impl Eth1GenesisService { let should_update_block_cache = *sync_blocks; if should_update_block_cache { - let update_result = self.core.update_block_cache().await; + let update_result = Service::update_block_cache(self.core.clone()).await; if let Err(e) = update_result { error!( log, diff --git a/beacon_node/src/lib.rs b/beacon_node/src/lib.rs index 20870a99a3..54a62fcd55 100644 --- a/beacon_node/src/lib.rs +++ b/beacon_node/src/lib.rs @@ -18,7 +18,6 @@ use beacon_chain::{ use clap::ArgMatches; use config::get_config; use environment::RuntimeContext; -use futures::{Future, IntoFuture}; use slog::{info, warn}; use std::ops::{Deref, DerefMut}; use types::EthSpec; @@ -51,27 +50,26 @@ impl ProductionBeaconNode { /// Identical to `start_from_client_config`, however the `client_config` is generated from the /// given `matches` and potentially configuration files on the local filesystem or other /// configurations hosted remotely. - pub fn new_from_cli<'a, 'b>( + pub async fn new_from_cli<'a, 'b>( context: RuntimeContext, matches: &ArgMatches<'b>, - ) -> impl Future + 'a { - get_config::( + ) -> Result { + let client_config = get_config::( &matches, &context.eth2_config.spec_constants, &context.eth2_config().spec, context.log.clone(), - ) - .into_future() - .and_then(move |client_config| Self::new(context, client_config)) + )?; + Self::new(context, client_config).await } /// Starts a new beacon node `Client` in the given `environment`. /// /// Client behaviour is defined by the given `client_config`. - pub fn new( + pub async fn new( context: RuntimeContext, mut client_config: ClientConfig, - ) -> impl Future { + ) -> Result { let http_eth2_config = context.eth2_config().clone(); let spec = context.eth2_config().spec.clone(); let client_config_1 = client_config.clone(); @@ -79,60 +77,56 @@ impl ProductionBeaconNode { let store_config = client_config.store.clone(); let log = context.log.clone(); - let db_path_res = client_config.create_db_path(); + let db_path = client_config.create_db_path()?; let freezer_db_path_res = client_config.create_freezer_db_path(); - db_path_res - .into_future() - .and_then(move |db_path| { - Ok(ClientBuilder::new(context.eth_spec_instance.clone()) - .runtime_context(context) - .chain_spec(spec) - .disk_store(&db_path, &freezer_db_path_res?, store_config)? - .background_migrator()?) - }) - .and_then(move |builder| builder.beacon_chain_builder(client_genesis, client_config_1)) - .and_then(move |builder| { - let builder = if client_config.sync_eth1_chain && !client_config.dummy_eth1_backend - { - info!( - log, - "Block production enabled"; - "endpoint" => &client_config.eth1.endpoint, - "method" => "json rpc via http" - ); - builder.caching_eth1_backend(client_config.eth1.clone())? - } else if client_config.dummy_eth1_backend { - warn!( - log, - "Block production impaired"; - "reason" => "dummy eth1 backend is enabled" - ); - builder.dummy_eth1_backend()? - } else { - info!( - log, - "Block production disabled"; - "reason" => "no eth1 backend configured" - ); - builder.no_eth1_backend()? - }; + let builder = ClientBuilder::new(context.eth_spec_instance.clone()) + .runtime_context(context) + .chain_spec(spec) + .disk_store(&db_path, &freezer_db_path_res?, store_config)? + .background_migrator()?; - let builder = builder - .system_time_slot_clock()? - .websocket_event_handler(client_config.websocket_server.clone())? - .build_beacon_chain()? - .network(&mut client_config.network)? - .notifier()?; + let builder = builder + .beacon_chain_builder(client_genesis, client_config_1) + .await?; + let builder = if client_config.sync_eth1_chain && !client_config.dummy_eth1_backend { + info!( + log, + "Block production enabled"; + "endpoint" => &client_config.eth1.endpoint, + "method" => "json rpc via http" + ); + builder.caching_eth1_backend(client_config.eth1.clone())? + } else if client_config.dummy_eth1_backend { + warn!( + log, + "Block production impaired"; + "reason" => "dummy eth1 backend is enabled" + ); + builder.dummy_eth1_backend()? + } else { + info!( + log, + "Block production disabled"; + "reason" => "no eth1 backend configured" + ); + builder.no_eth1_backend()? + }; - let builder = if client_config.rest_api.enabled { - builder.http_server(&client_config, &http_eth2_config)? - } else { - builder - }; + let builder = builder + .system_time_slot_clock()? + .websocket_event_handler(client_config.websocket_server.clone())? + .build_beacon_chain()? + .network(&mut client_config.network)? + .notifier()?; - Ok(Self(builder.build())) - }) + let builder = if client_config.rest_api.enabled { + builder.http_server(&client_config, &http_eth2_config)? + } else { + builder + }; + + Ok(Self(builder.build())) } pub fn into_inner(self) -> ProductionClient { diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 1ed249cba9..e6076d49f1 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -75,8 +75,13 @@ impl EnvironmentBuilder { /// /// The `Runtime` used is just the standard tokio runtime. pub fn multi_threaded_tokio_runtime(mut self) -> Result { - self.runtime = - Some(Runtime::new().map_err(|e| format!("Failed to start runtime: {:?}", e))?); + self.runtime = Some( + RuntimeBuilder::new() + .threaded_scheduler() + .enable_all() + .build() + .map_err(|e| format!("Failed to start runtime: {:?}", e))?, + ); Ok(self) } @@ -87,7 +92,8 @@ impl EnvironmentBuilder { pub fn single_thread_tokio_runtime(mut self) -> Result { self.runtime = Some( RuntimeBuilder::new() - .core_threads(1) + .basic_scheduler() + .enable_all() .build() .map_err(|e| format!("Failed to start runtime: {:?}", e))?, ); diff --git a/tests/node_test_rig/src/lib.rs b/tests/node_test_rig/src/lib.rs index 9cfcc7eeaa..e0483ef0fd 100644 --- a/tests/node_test_rig/src/lib.rs +++ b/tests/node_test_rig/src/lib.rs @@ -29,10 +29,10 @@ impl LocalBeaconNode { /// Starts a new, production beacon node on the tokio runtime in the given `context`. /// /// The node created is using the same types as the node we use in production. - pub fn production( + pub async fn production( context: RuntimeContext, mut client_config: ClientConfig, - ) -> impl Future { + ) -> Result { // Creates a temporary directory that will be deleted once this `TempDir` is dropped. let datadir = TempDir::new("lighthouse_node_test_rig") .expect("should create temp directory for client datadir"); @@ -40,7 +40,7 @@ impl LocalBeaconNode { client_config.data_dir = datadir.path().into(); client_config.network.network_dir = PathBuf::from(datadir.path()).join("network"); - ProductionBeaconNode::new(context, client_config).map(move |client| Self { + ProductionBeaconNode::new(context, client_config).await.map(move |client| Self { client: client.into_inner(), datadir, }) @@ -105,43 +105,43 @@ impl LocalValidatorClient { /// are created in a temp dir then removed when the process exits. /// /// The validator created is using the same types as the node we use in production. - pub fn production_with_insecure_keypairs( + pub async fn production_with_insecure_keypairs( context: RuntimeContext, mut config: ValidatorConfig, keypair_indices: &[usize], - ) -> impl Future { + ) -> Result { // Creates a temporary directory that will be deleted once this `TempDir` is dropped. let datadir = TempDir::new("lighthouse-beacon-node") .expect("should create temp directory for client datadir"); config.key_source = KeySource::InsecureKeypairs(keypair_indices.to_vec()); - Self::new(context, config, datadir) + Self::new(context, config, datadir).await } /// Creates a validator client that attempts to read keys from the default data dir. /// /// - The validator created is using the same types as the node we use in production. /// - It is recommended to use `production_with_insecure_keypairs` for testing. - pub fn production( + pub async fn production( context: RuntimeContext, config: ValidatorConfig, - ) -> impl Future { + ) -> Result { // Creates a temporary directory that will be deleted once this `TempDir` is dropped. let datadir = TempDir::new("lighthouse-validator") .expect("should create temp directory for client datadir"); - Self::new(context, config, datadir) + Self::new(context, config, datadir).await } - fn new( + async fn new( context: RuntimeContext, mut config: ValidatorConfig, datadir: TempDir, - ) -> impl Future { + ) -> Result { config.data_dir = datadir.path().into(); - ProductionValidatorClient::new(context, config).map(move |mut client| { + ProductionValidatorClient::new(context, config).await.map(move |mut client| { client .start_service() .expect("should start validator services");