diff --git a/beacon_node/genesis/Cargo.toml b/beacon_node/genesis/Cargo.toml index a5723ee6e1..106846342c 100644 --- a/beacon_node/genesis/Cargo.toml +++ b/beacon_node/genesis/Cargo.toml @@ -6,10 +6,9 @@ edition = "2018" [dev-dependencies] eth1_test_rig = { path = "../../tests/eth1_test_rig" } -futures = "0.1.25" [dependencies] -futures = "0.1.25" +futures = "0.3" types = { path = "../../eth2/types"} environment = { path = "../../lighthouse/environment"} eth1 = { path = "../eth1"} @@ -19,10 +18,10 @@ merkle_proof = { path = "../../eth2/utils/merkle_proof" } eth2_ssz = "0.1" eth2_hashing = { path = "../../eth2/utils/eth2_hashing" } tree_hash = "0.1" -tokio = "0.1.22" +tokio = {version = "0.2", features = ["full"]} parking_lot = "0.7" slog = "^2.2.3" -exit-future = "0.1.4" +exit-future = "0.2" serde = "1.0" serde_derive = "1.0" int_to_bytes = { path = "../../eth2/utils/int_to_bytes" } diff --git a/beacon_node/genesis/src/eth1_genesis_service.rs b/beacon_node/genesis/src/eth1_genesis_service.rs index 85f7de8d4e..b6a94029bf 100644 --- a/beacon_node/genesis/src/eth1_genesis_service.rs +++ b/beacon_node/genesis/src/eth1_genesis_service.rs @@ -2,11 +2,6 @@ pub use crate::{common::genesis_deposits, interop::interop_genesis_state}; pub use eth1::Config as Eth1Config; use eth1::{DepositLog, Eth1Block, Service}; -use futures::{ - future, - future::{loop_fn, Loop}, - Future, -}; use parking_lot::Mutex; use slog::{debug, error, info, trace, Logger}; use state_processing::{ @@ -14,8 +9,8 @@ use state_processing::{ per_block_processing::process_deposit, process_activations, }; use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::timer::Delay; +use std::time::Duration; +use tokio::time::delay_for; use types::{BeaconState, ChainSpec, Deposit, Eth1Data, EthSpec, Hash256}; /// Provides a service that connects to some Eth1 HTTP JSON-RPC endpoint and maintains a cache of eth1 @@ -87,117 +82,85 @@ impl Eth1GenesisService { /// /// - `Ok(state)` once the canonical eth2 genesis state has been discovered. /// - `Err(e)` if there is some internal error during updates. - pub fn wait_for_genesis_state( + pub async fn wait_for_genesis_state( &self, update_interval: Duration, spec: ChainSpec, - ) -> impl Future, Error = String> { + ) -> Result, String> { let service = self.clone(); + let log = service.core.log.clone(); + let min_genesis_active_validator_count = spec.min_genesis_active_validator_count; + let min_genesis_time = spec.min_genesis_time; + loop { + // **WARNING** `delay_for` panics on error + delay_for(update_interval).await; + let update_result = self + .core + .update_deposit_cache() + .await + .map_err(|e| format!("{:?}", e)); - loop_fn::<(ChainSpec, Option>), _, _, _>( - (spec, None), - move |(spec, state)| { - let service_1 = service.clone(); - let service_2 = service.clone(); - let service_3 = service.clone(); - let service_4 = service.clone(); - let log = service.core.log.clone(); - let min_genesis_active_validator_count = spec.min_genesis_active_validator_count; - let min_genesis_time = spec.min_genesis_time; + if let Err(e) = update_result { + error!( + log, + "Failed to update eth1 deposit cache"; + "error" => e + ) + } - Delay::new(Instant::now() + update_interval) - .map_err(|e| format!("Delay between genesis deposit checks failed: {:?}", e)) - .and_then(move |()| { - service_1 - .core - .update_deposit_cache() - .map_err(|e| format!("{:?}", e)) - }) - .then(move |update_result| { - if let Err(e) = update_result { - error!( - log, - "Failed to update eth1 deposit cache"; - "error" => e - ) - } + // Do not exit the loop if there is an error whilst updating. + // Only enable the `sync_blocks` flag if there are enough deposits to feasibly + // trigger genesis. + // + // Note: genesis is triggered by the _active_ validator count, not just the + // deposit count, so it's possible that block downloads are started too early. + // This is just wasteful, not erroneous. + let mut sync_blocks = self.sync_blocks.lock(); - // Do not exit the loop if there is an error whilst updating. - Ok(()) - }) - // Only enable the `sync_blocks` flag if there are enough deposits to feasibly - // trigger genesis. - // - // Note: genesis is triggered by the _active_ validator count, not just the - // deposit count, so it's possible that block downloads are started too early. - // This is just wasteful, not erroneous. - .and_then(move |()| { - let mut sync_blocks = service_2.sync_blocks.lock(); + if !(*sync_blocks) { + if let Some(viable_eth1_block) = + self.first_viable_eth1_block(min_genesis_active_validator_count as usize) + { + info!( + log, + "Minimum genesis deposit count met"; + "deposit_count" => min_genesis_active_validator_count, + "block_number" => viable_eth1_block, + ); + self.core.set_lowest_cached_block(viable_eth1_block); + *sync_blocks = true + } + } - if !(*sync_blocks) { - if let Some(viable_eth1_block) = service_2.first_viable_eth1_block( - min_genesis_active_validator_count as usize, - ) { - info!( - service_2.core.log, - "Minimum genesis deposit count met"; - "deposit_count" => min_genesis_active_validator_count, - "block_number" => viable_eth1_block, - ); - service_2.core.set_lowest_cached_block(viable_eth1_block); - *sync_blocks = true - } - } - - Ok(*sync_blocks) - }) - .and_then(move |should_update_block_cache| { - let maybe_update_future: Box + Send> = - if should_update_block_cache { - Box::new(service_3.core.update_block_cache().then( - move |update_result| { - if let Err(e) = update_result { - error!( - service_3.core.log, - "Failed to update eth1 block cache"; - "error" => format!("{:?}", e) - ); - } - - // Do not exit the loop if there is an error whilst updating. - Ok(()) - }, - )) - } else { - Box::new(future::ok(())) - }; - - maybe_update_future - }) - .and_then(move |()| { - if let Some(genesis_state) = service_4 - .scan_new_blocks::(&spec) - .map_err(|e| format!("Failed to scan for new blocks: {}", e))? - { - Ok(Loop::Break((spec, genesis_state))) - } else { - debug!( - service_4.core.log, - "No eth1 genesis block found"; - "latest_block_timestamp" => service_4.core.latest_block_timestamp(), - "min_genesis_time" => min_genesis_time, - "min_validator_count" => min_genesis_active_validator_count, - "cached_blocks" => service_4.core.block_cache_len(), - "cached_deposits" => service_4.core.deposit_cache_len(), - "cache_head" => service_4.highest_known_block(), - ); - - Ok(Loop::Continue((spec, state))) - } - }) - }, - ) - .map(|(_spec, state)| state) + let should_update_block_cache = *sync_blocks; + if should_update_block_cache { + let update_result = self.core.update_block_cache().await; + if let Err(e) = update_result { + error!( + log, + "Failed to update eth1 block cache"; + "error" => format!("{:?}", e) + ); + } + }; + if let Some(genesis_state) = self + .scan_new_blocks::(&spec) + .map_err(|e| format!("Failed to scan for new blocks: {}", e))? + { + break Ok(genesis_state); + } else { + debug!( + log, + "No eth1 genesis block found"; + "latest_block_timestamp" => self.core.latest_block_timestamp(), + "min_genesis_time" => min_genesis_time, + "min_validator_count" => min_genesis_active_validator_count, + "cached_blocks" => self.core.block_cache_len(), + "cached_deposits" => self.core.deposit_cache_len(), + "cache_head" => self.highest_known_block(), + ); + } + } } /// Processes any new blocks that have appeared since this function was last run.