diff --git a/account_manager/Cargo.toml b/account_manager/Cargo.toml index cd231552a9..d30f67e261 100644 --- a/account_manager/Cargo.toml +++ b/account_manager/Cargo.toml @@ -25,6 +25,7 @@ validator_client = { path = "../validator_client" } rayon = "1.3.0" eth2_testnet_config = { path = "../eth2/utils/eth2_testnet_config" } web3 = "0.10.0" -futures = "0.3.4" +futures = {version = "0.3.4", features = ["compat"]} clap_utils = { path = "../eth2/utils/clap_utils" } -tokio = "0.2.20" +# reduce feature set +tokio = {version = "0.2.20", features = ["full"]} diff --git a/account_manager/src/deposits.rs b/account_manager/src/deposits.rs index f99840eeee..68db8e9d02 100644 --- a/account_manager/src/deposits.rs +++ b/account_manager/src/deposits.rs @@ -1,15 +1,11 @@ use clap::{App, Arg, ArgMatches}; use clap_utils; use environment::Environment; -use futures::{ - future::{self, Loop}, - Future, -}; +use futures::compat::Future01CompatExt; use slog::{info, Logger}; use std::fs; use std::path::PathBuf; -use std::time::{Duration, Instant}; -use tokio::time::Delay; +use tokio::time::{delay_until, Duration, Instant}; use types::EthSpec; use validator_client::validator_directory::ValidatorDirectoryBuilder; use web3::{ @@ -80,7 +76,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { ) } -pub fn cli_run(matches: &ArgMatches, mut env: Environment) -> Result<(), String> { +pub fn cli_run( + matches: &ArgMatches<'_>, + mut env: Environment, +) -> Result<(), String> { let spec = env.core_context().eth2_config.spec; let log = env.core_context().log; @@ -138,12 +137,13 @@ pub fn cli_run(matches: &ArgMatches, mut env: Environment) -> Res let tx_hash_log = log.clone(); env.runtime() - .block_on( + .block_on(async { ValidatorDirectoryBuilder::default() .spec(spec.clone()) .custom_deposit_amount(deposit_gwei) .thread_random_keypairs() .submit_eth1_deposit(web3.clone(), from_address, deposit_contract) + .await .map(move |(builder, tx_hash)| { info!( tx_hash_log, @@ -152,8 +152,8 @@ pub fn cli_run(matches: &ArgMatches, mut env: Environment) -> Res "index" => format!("{}/{}", i + 1, n), ); builder - }), - )? + }) + })? .create_directory(validator_dir.clone())? .write_keypair_files()? .write_eth1_data_file()? @@ -183,73 +183,59 @@ fn existing_validator_count(validator_dir: &PathBuf) -> Result { } /// Run a poll on the `eth_syncing` endpoint, blocking until the node is synced. -fn poll_until_synced(web3: Web3, log: Logger) -> impl Future + Send +async fn poll_until_synced(web3: Web3, log: Logger) -> Result<(), String> where T: Transport + Send + 'static, ::Out: Send, { - loop_fn((web3.clone(), log.clone()), move |(web3, log)| { - web3.clone() + loop { + let sync_state = web3 + .clone() .eth() .syncing() - .map_err(|e| format!("Unable to read syncing state from eth1 node: {:?}", e)) - .and_then::<_, Box + Send>>(move |sync_state| { - match sync_state { - SyncState::Syncing(SyncInfo { - current_block, - highest_block, - .. - }) => { - info!( - log, - "Waiting for eth1 node to sync"; - "est_highest_block" => format!("{}", highest_block), - "current_block" => format!("{}", current_block), - ); + .compat() + .await + .map_err(|e| format!("Unable to read syncing state from eth1 node: {:?}", e))?; + match sync_state { + SyncState::Syncing(SyncInfo { + current_block, + highest_block, + .. + }) => { + info!( + log, + "Waiting for eth1 node to sync"; + "est_highest_block" => format!("{}", highest_block), + "current_block" => format!("{}", current_block), + ); - Box::new( - Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY) - .map_err(|e| format!("Failed to trigger delay: {:?}", e)) - .and_then(|_| future::ok(Loop::Continue((web3, log)))), - ) - } - SyncState::NotSyncing => Box::new( - web3.clone() - .eth() - .block_number() - .map_err(|e| { - format!("Unable to read block number from eth1 node: {:?}", e) - }) - .and_then::<_, Box + Send>>( - |block_number| { - if block_number > 0.into() { - info!( - log, - "Eth1 node is synced"; - "head_block" => format!("{}", block_number), - ); - Box::new(future::ok(Loop::Break((web3, log)))) - } else { - Box::new( - Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY) - .map_err(|e| { - format!("Failed to trigger delay: {:?}", e) - }) - .and_then(|_| { - info!( - log, - "Waiting for eth1 node to sync"; - "current_block" => 0, - ); - future::ok(Loop::Continue((web3, log))) - }), - ) - } - }, - ), - ), + delay_until(Instant::now() + SYNCING_STATE_RETRY_DELAY).await; + } + SyncState::NotSyncing => { + let block_number = web3 + .clone() + .eth() + .block_number() + .compat() + .await + .map_err(|e| format!("Unable to read block number from eth1 node: {:?}", e))?; + if block_number > 0.into() { + info!( + log, + "Eth1 node is synced"; + "head_block" => format!("{}", block_number), + ); + break; + } else { + delay_until(Instant::now() + SYNCING_STATE_RETRY_DELAY).await; + info!( + log, + "Waiting for eth1 node to sync"; + "current_block" => 0, + ); } - }) - }) - .map(|_| ()) + } + } + } + Ok(()) } diff --git a/account_manager/src/lib.rs b/account_manager/src/lib.rs index 4f3c80ec76..d1bad5f2c6 100644 --- a/account_manager/src/lib.rs +++ b/account_manager/src/lib.rs @@ -5,7 +5,8 @@ use clap::ArgMatches; use deposit_contract::DEPOSIT_GAS; use environment::{Environment, RuntimeContext}; use eth2_testnet_config::Eth2TestnetConfig; -use futures::{future, Future, IntoFuture, Stream}; +use futures::compat::Future01CompatExt; +use futures::{FutureExt, StreamExt}; use rayon::prelude::*; use slog::{error, info, Logger}; use std::fs; @@ -23,7 +24,10 @@ use web3::{ pub use cli::cli_app; /// Run the account manager, returning an error if the operation did not succeed. -pub fn run(matches: &ArgMatches, mut env: Environment) -> Result<(), String> { +pub fn run( + matches: &ArgMatches<'_>, + mut env: Environment, +) -> Result<(), String> { let context = env.core_context(); let log = context.log.clone(); @@ -292,7 +296,7 @@ fn make_validators( /// /// Returns success as soon as the eth1 endpoint accepts the transaction (i.e., does not wait for /// transaction success/revert). -fn deposit_validators( +async fn deposit_validators( context: RuntimeContext, eth1_endpoint: String, deposit_contract: Address, @@ -300,156 +304,152 @@ fn deposit_validators( account_index: usize, deposit_value: u64, password: Option, -) -> impl Future { +) -> Result<(), ()> { let log_1 = context.log.clone(); let log_2 = context.log.clone(); - Http::new(ð1_endpoint) - .map_err(move |e| { - error!( - log_1, - "Failed to start web3 HTTP transport"; - "error" => format!("{:?}", e) + let (event_loop, transport) = Http::new(ð1_endpoint).map_err(move |e| { + error!( + log_1, + "Failed to start web3 HTTP transport"; + "error" => format!("{:?}", e) + ) + })?; + /* + * Loop through the validator directories and submit the deposits. + */ + let web3 = Web3::new(transport); + + let _ = futures::stream::iter(validators) + .for_each(|validator| async { + let web3 = web3.clone(); + let log = log_2.clone(); + let password = password.clone(); + + let _ = deposit_validator( + web3, + deposit_contract, + validator, + deposit_value, + account_index, + password, + log, ) + .await; }) - .into_future() - /* - * Loop through the validator directories and submit the deposits. - */ - .and_then(move |(event_loop, transport)| { - let web3 = Web3::new(transport); - - futures::stream::iter_ok(validators) - .for_each(move |validator| { - let web3 = web3.clone(); - let log = log_2.clone(); - let password = password.clone(); - - deposit_validator( - web3, - deposit_contract, - &validator, - deposit_value, - account_index, - password, - log, - ) - }) - .map(|_| event_loop) - }) - // Web3 gives errors if the event loop is dropped whilst performing requests. - .map(drop) + .map(|_| event_loop) + // // Web3 gives errors if the event loop is dropped whilst performing requests. + .map(drop); + Ok(()) } /// For the given `ValidatorDirectory`, submit a deposit transaction to the `web3` node. /// /// Returns success as soon as the eth1 endpoint accepts the transaction (i.e., does not wait for /// transaction success/revert). -fn deposit_validator( +async fn deposit_validator( web3: Web3, deposit_contract: Address, - validator: &ValidatorDirectory, + validator: ValidatorDirectory, deposit_amount: u64, account_index: usize, password_opt: Option, log: Logger, -) -> impl Future { - validator +) -> Result<(), ()> { + let voting_keypair = validator .voting_keypair .clone() - .ok_or_else(|| error!(log, "Validator does not have voting keypair")) - .and_then(|voting_keypair| { - validator - .deposit_data - .clone() - .ok_or_else(|| error!(log, "Validator does not have deposit data")) - .map(|deposit_data| (voting_keypair, deposit_data)) - }) - .into_future() - .and_then(move |(voting_keypair, deposit_data)| { - let pubkey_1 = voting_keypair.pk.clone(); - let pubkey_2 = voting_keypair.pk; + .ok_or_else(|| error!(log, "Validator does not have voting keypair"))?; - let web3_1 = web3.clone(); - let web3_2 = web3.clone(); + let deposit_data = validator + .deposit_data + .clone() + .ok_or_else(|| error!(log, "Validator does not have deposit data"))?; - let log_1 = log.clone(); - let log_2 = log.clone(); + let pubkey_1 = voting_keypair.pk.clone(); + let pubkey_2 = voting_keypair.pk; - web3.eth() - .accounts() - .map_err(|e| format!("Failed to get accounts: {:?}", e)) - .and_then(move |accounts| { - accounts - .get(account_index) - .cloned() - .ok_or_else(|| "Insufficient accounts for deposit".to_string()) - }) - /* - * If a password was supplied, unlock the account. - */ - .and_then(move |from_address| { - let future: Box + Send> = - if let Some(password) = password_opt { - // Unlock for only a single transaction. - let duration = None; + let log_1 = log.clone(); + let log_2 = log.clone(); - let future = web3_1 - .personal() - .unlock_account(from_address, &password, duration) - .then(move |result| match result { - Ok(true) => Ok(from_address), - Ok(false) => { - Err("Eth1 node refused to unlock account. Check password." - .to_string()) - } - Err(e) => Err(format!("Eth1 unlock request failed: {:?}", e)), - }); + // TODO: creating a future to extract the Error type + // check if there's a better way + let future = async move { + let accounts = web3 + .eth() + .accounts() + .compat() + .await + .map_err(|e| format!("Failed to get accounts: {:?}", e))?; - Box::new(future) - } else { - Box::new(future::ok(from_address)) - }; + let from_address = accounts + .get(account_index) + .cloned() + .ok_or_else(|| "Insufficient accounts for deposit".to_string())?; - future - }) - /* - * Submit the deposit transaction. - */ - .and_then(move |from| { - let tx_request = TransactionRequest { - from, - to: Some(deposit_contract), - gas: Some(U256::from(DEPOSIT_GAS)), - gas_price: None, - value: Some(from_gwei(deposit_amount)), - data: Some(deposit_data.into()), - nonce: None, - condition: None, - }; + /* + * If a password was supplied, unlock the account. + */ + let from = if let Some(password) = password_opt { + // Unlock for only a single transaction. + let duration = None; - web3_2 - .eth() - .send_transaction(tx_request) - .map_err(|e| format!("Failed to call deposit fn: {:?}", e)) - }) - .map(move |tx| { - info!( - log_1, - "Validator deposit successful"; - "eth1_tx_hash" => format!("{:?}", tx), - "validator_voting_pubkey" => format!("{:?}", pubkey_1) + let result = web3 + .personal() + .unlock_account(from_address, &password, duration) + .compat() + .await; + match result { + Ok(true) => from_address, + Ok(false) => { + return Err::<(), String>( + "Eth1 node refused to unlock account. Check password.".to_string(), ) - }) - .map_err(move |e| { - error!( - log_2, - "Validator deposit_failed"; - "error" => e, - "validator_voting_pubkey" => format!("{:?}", pubkey_2) - ) - }) - }) + } + Err(e) => return Err::<(), String>(format!("Eth1 unlock request failed: {:?}", e)), + } + } else { + from_address + }; + + /* + * Submit the deposit transaction. + */ + let tx_request = TransactionRequest { + from, + to: Some(deposit_contract), + gas: Some(U256::from(DEPOSIT_GAS)), + gas_price: None, + value: Some(from_gwei(deposit_amount)), + data: Some(deposit_data.into()), + nonce: None, + condition: None, + }; + + let tx = web3 + .eth() + .send_transaction(tx_request) + .compat() + .await + .map_err(|e| format!("Failed to call deposit fn: {:?}", e))?; + info!( + log_1, + "Validator deposit successful"; + "eth1_tx_hash" => format!("{:?}", tx), + "validator_voting_pubkey" => format!("{:?}", pubkey_1) + ); + Ok(()) + }; + + future.await.map_err(move |e| { + error!( + log_2, + "Validator deposit_failed"; + "error" => e, + "validator_voting_pubkey" => format!("{:?}", pubkey_2) + ); + })?; + Ok(()) } /// Converts gwei to wei. diff --git a/validator_client/src/validator_directory.rs b/validator_client/src/validator_directory.rs index 35bf580d89..23643b6fe1 100644 --- a/validator_client/src/validator_directory.rs +++ b/validator_client/src/validator_directory.rs @@ -304,11 +304,11 @@ impl ValidatorDirectoryBuilder { } pub async fn submit_eth1_deposit( - &self, + self, web3: Web3, from: Address, deposit_contract: Address, - ) -> Result { + ) -> Result<(Self, Hash256), String> { let (deposit_data, deposit_amount) = self.get_deposit_data()?; web3.eth() .send_transaction(TransactionRequest { @@ -324,6 +324,7 @@ impl ValidatorDirectoryBuilder { .compat() .await .map_err(|e| format!("Failed to send transaction: {:?}", e)) + .map(|tx| (self, tx)) } pub fn build(self) -> Result {