Port account_manager to stable futures (#1121)

* Port account_manager to stable futures

* Run async fns in tokio environment
This commit is contained in:
Pawan Dhananjay
2020-05-08 07:19:36 +05:30
committed by GitHub
parent 1f2acad0df
commit 8e81ea911d
4 changed files with 189 additions and 201 deletions

View File

@@ -25,6 +25,7 @@ validator_client = { path = "../validator_client" }
rayon = "1.3.0"
eth2_testnet_config = { path = "../eth2/utils/eth2_testnet_config" }
web3 = "0.10.0"
futures = "0.3.4"
futures = {version = "0.3.4", features = ["compat"]}
clap_utils = { path = "../eth2/utils/clap_utils" }
tokio = "0.2.20"
# reduce feature set
tokio = {version = "0.2.20", features = ["full"]}

View File

@@ -1,15 +1,11 @@
use clap::{App, Arg, ArgMatches};
use clap_utils;
use environment::Environment;
use futures::{
future::{self, Loop},
Future,
};
use futures::compat::Future01CompatExt;
use slog::{info, Logger};
use std::fs;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::time::Delay;
use tokio::time::{delay_until, Duration, Instant};
use types::EthSpec;
use validator_client::validator_directory::ValidatorDirectoryBuilder;
use web3::{
@@ -80,7 +76,10 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
)
}
pub fn cli_run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Result<(), String> {
pub fn cli_run<T: EthSpec>(
matches: &ArgMatches<'_>,
mut env: Environment<T>,
) -> Result<(), String> {
let spec = env.core_context().eth2_config.spec;
let log = env.core_context().log;
@@ -138,12 +137,13 @@ pub fn cli_run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Res
let tx_hash_log = log.clone();
env.runtime()
.block_on(
.block_on(async {
ValidatorDirectoryBuilder::default()
.spec(spec.clone())
.custom_deposit_amount(deposit_gwei)
.thread_random_keypairs()
.submit_eth1_deposit(web3.clone(), from_address, deposit_contract)
.await
.map(move |(builder, tx_hash)| {
info!(
tx_hash_log,
@@ -152,8 +152,8 @@ pub fn cli_run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Res
"index" => format!("{}/{}", i + 1, n),
);
builder
}),
)?
})
})?
.create_directory(validator_dir.clone())?
.write_keypair_files()?
.write_eth1_data_file()?
@@ -183,73 +183,59 @@ fn existing_validator_count(validator_dir: &PathBuf) -> Result<usize, String> {
}
/// Run a poll on the `eth_syncing` endpoint, blocking until the node is synced.
fn poll_until_synced<T>(web3: Web3<T>, log: Logger) -> impl Future<Item = (), Error = String> + Send
async fn poll_until_synced<T>(web3: Web3<T>, log: Logger) -> Result<(), String>
where
T: Transport + Send + 'static,
<T as Transport>::Out: Send,
{
loop_fn((web3.clone(), log.clone()), move |(web3, log)| {
web3.clone()
loop {
let sync_state = web3
.clone()
.eth()
.syncing()
.map_err(|e| format!("Unable to read syncing state from eth1 node: {:?}", e))
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(move |sync_state| {
match sync_state {
SyncState::Syncing(SyncInfo {
current_block,
highest_block,
..
}) => {
info!(
log,
"Waiting for eth1 node to sync";
"est_highest_block" => format!("{}", highest_block),
"current_block" => format!("{}", current_block),
);
.compat()
.await
.map_err(|e| format!("Unable to read syncing state from eth1 node: {:?}", e))?;
match sync_state {
SyncState::Syncing(SyncInfo {
current_block,
highest_block,
..
}) => {
info!(
log,
"Waiting for eth1 node to sync";
"est_highest_block" => format!("{}", highest_block),
"current_block" => format!("{}", current_block),
);
Box::new(
Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY)
.map_err(|e| format!("Failed to trigger delay: {:?}", e))
.and_then(|_| future::ok(Loop::Continue((web3, log)))),
)
}
SyncState::NotSyncing => Box::new(
web3.clone()
.eth()
.block_number()
.map_err(|e| {
format!("Unable to read block number from eth1 node: {:?}", e)
})
.and_then::<_, Box<dyn Future<Item = _, Error = _> + Send>>(
|block_number| {
if block_number > 0.into() {
info!(
log,
"Eth1 node is synced";
"head_block" => format!("{}", block_number),
);
Box::new(future::ok(Loop::Break((web3, log))))
} else {
Box::new(
Delay::new(Instant::now() + SYNCING_STATE_RETRY_DELAY)
.map_err(|e| {
format!("Failed to trigger delay: {:?}", e)
})
.and_then(|_| {
info!(
log,
"Waiting for eth1 node to sync";
"current_block" => 0,
);
future::ok(Loop::Continue((web3, log)))
}),
)
}
},
),
),
delay_until(Instant::now() + SYNCING_STATE_RETRY_DELAY).await;
}
SyncState::NotSyncing => {
let block_number = web3
.clone()
.eth()
.block_number()
.compat()
.await
.map_err(|e| format!("Unable to read block number from eth1 node: {:?}", e))?;
if block_number > 0.into() {
info!(
log,
"Eth1 node is synced";
"head_block" => format!("{}", block_number),
);
break;
} else {
delay_until(Instant::now() + SYNCING_STATE_RETRY_DELAY).await;
info!(
log,
"Waiting for eth1 node to sync";
"current_block" => 0,
);
}
})
})
.map(|_| ())
}
}
}
Ok(())
}

View File

@@ -5,7 +5,8 @@ use clap::ArgMatches;
use deposit_contract::DEPOSIT_GAS;
use environment::{Environment, RuntimeContext};
use eth2_testnet_config::Eth2TestnetConfig;
use futures::{future, Future, IntoFuture, Stream};
use futures::compat::Future01CompatExt;
use futures::{FutureExt, StreamExt};
use rayon::prelude::*;
use slog::{error, info, Logger};
use std::fs;
@@ -23,7 +24,10 @@ use web3::{
pub use cli::cli_app;
/// Run the account manager, returning an error if the operation did not succeed.
pub fn run<T: EthSpec>(matches: &ArgMatches, mut env: Environment<T>) -> Result<(), String> {
pub fn run<T: EthSpec>(
matches: &ArgMatches<'_>,
mut env: Environment<T>,
) -> Result<(), String> {
let context = env.core_context();
let log = context.log.clone();
@@ -292,7 +296,7 @@ fn make_validators(
///
/// Returns success as soon as the eth1 endpoint accepts the transaction (i.e., does not wait for
/// transaction success/revert).
fn deposit_validators<E: EthSpec>(
async fn deposit_validators<E: EthSpec>(
context: RuntimeContext<E>,
eth1_endpoint: String,
deposit_contract: Address,
@@ -300,156 +304,152 @@ fn deposit_validators<E: EthSpec>(
account_index: usize,
deposit_value: u64,
password: Option<String>,
) -> impl Future<Item = (), Error = ()> {
) -> Result<(), ()> {
let log_1 = context.log.clone();
let log_2 = context.log.clone();
Http::new(&eth1_endpoint)
.map_err(move |e| {
error!(
log_1,
"Failed to start web3 HTTP transport";
"error" => format!("{:?}", e)
let (event_loop, transport) = Http::new(&eth1_endpoint).map_err(move |e| {
error!(
log_1,
"Failed to start web3 HTTP transport";
"error" => format!("{:?}", e)
)
})?;
/*
* Loop through the validator directories and submit the deposits.
*/
let web3 = Web3::new(transport);
let _ = futures::stream::iter(validators)
.for_each(|validator| async {
let web3 = web3.clone();
let log = log_2.clone();
let password = password.clone();
let _ = deposit_validator(
web3,
deposit_contract,
validator,
deposit_value,
account_index,
password,
log,
)
.await;
})
.into_future()
/*
* Loop through the validator directories and submit the deposits.
*/
.and_then(move |(event_loop, transport)| {
let web3 = Web3::new(transport);
futures::stream::iter_ok(validators)
.for_each(move |validator| {
let web3 = web3.clone();
let log = log_2.clone();
let password = password.clone();
deposit_validator(
web3,
deposit_contract,
&validator,
deposit_value,
account_index,
password,
log,
)
})
.map(|_| event_loop)
})
// Web3 gives errors if the event loop is dropped whilst performing requests.
.map(drop)
.map(|_| event_loop)
// // Web3 gives errors if the event loop is dropped whilst performing requests.
.map(drop);
Ok(())
}
/// For the given `ValidatorDirectory`, submit a deposit transaction to the `web3` node.
///
/// Returns success as soon as the eth1 endpoint accepts the transaction (i.e., does not wait for
/// transaction success/revert).
fn deposit_validator(
async fn deposit_validator(
web3: Web3<Http>,
deposit_contract: Address,
validator: &ValidatorDirectory,
validator: ValidatorDirectory,
deposit_amount: u64,
account_index: usize,
password_opt: Option<String>,
log: Logger,
) -> impl Future<Item = (), Error = ()> {
validator
) -> Result<(), ()> {
let voting_keypair = validator
.voting_keypair
.clone()
.ok_or_else(|| error!(log, "Validator does not have voting keypair"))
.and_then(|voting_keypair| {
validator
.deposit_data
.clone()
.ok_or_else(|| error!(log, "Validator does not have deposit data"))
.map(|deposit_data| (voting_keypair, deposit_data))
})
.into_future()
.and_then(move |(voting_keypair, deposit_data)| {
let pubkey_1 = voting_keypair.pk.clone();
let pubkey_2 = voting_keypair.pk;
.ok_or_else(|| error!(log, "Validator does not have voting keypair"))?;
let web3_1 = web3.clone();
let web3_2 = web3.clone();
let deposit_data = validator
.deposit_data
.clone()
.ok_or_else(|| error!(log, "Validator does not have deposit data"))?;
let log_1 = log.clone();
let log_2 = log.clone();
let pubkey_1 = voting_keypair.pk.clone();
let pubkey_2 = voting_keypair.pk;
web3.eth()
.accounts()
.map_err(|e| format!("Failed to get accounts: {:?}", e))
.and_then(move |accounts| {
accounts
.get(account_index)
.cloned()
.ok_or_else(|| "Insufficient accounts for deposit".to_string())
})
/*
* If a password was supplied, unlock the account.
*/
.and_then(move |from_address| {
let future: Box<dyn Future<Item = Address, Error = String> + Send> =
if let Some(password) = password_opt {
// Unlock for only a single transaction.
let duration = None;
let log_1 = log.clone();
let log_2 = log.clone();
let future = web3_1
.personal()
.unlock_account(from_address, &password, duration)
.then(move |result| match result {
Ok(true) => Ok(from_address),
Ok(false) => {
Err("Eth1 node refused to unlock account. Check password."
.to_string())
}
Err(e) => Err(format!("Eth1 unlock request failed: {:?}", e)),
});
// TODO: creating a future to extract the Error type
// check if there's a better way
let future = async move {
let accounts = web3
.eth()
.accounts()
.compat()
.await
.map_err(|e| format!("Failed to get accounts: {:?}", e))?;
Box::new(future)
} else {
Box::new(future::ok(from_address))
};
let from_address = accounts
.get(account_index)
.cloned()
.ok_or_else(|| "Insufficient accounts for deposit".to_string())?;
future
})
/*
* Submit the deposit transaction.
*/
.and_then(move |from| {
let tx_request = TransactionRequest {
from,
to: Some(deposit_contract),
gas: Some(U256::from(DEPOSIT_GAS)),
gas_price: None,
value: Some(from_gwei(deposit_amount)),
data: Some(deposit_data.into()),
nonce: None,
condition: None,
};
/*
* If a password was supplied, unlock the account.
*/
let from = if let Some(password) = password_opt {
// Unlock for only a single transaction.
let duration = None;
web3_2
.eth()
.send_transaction(tx_request)
.map_err(|e| format!("Failed to call deposit fn: {:?}", e))
})
.map(move |tx| {
info!(
log_1,
"Validator deposit successful";
"eth1_tx_hash" => format!("{:?}", tx),
"validator_voting_pubkey" => format!("{:?}", pubkey_1)
let result = web3
.personal()
.unlock_account(from_address, &password, duration)
.compat()
.await;
match result {
Ok(true) => from_address,
Ok(false) => {
return Err::<(), String>(
"Eth1 node refused to unlock account. Check password.".to_string(),
)
})
.map_err(move |e| {
error!(
log_2,
"Validator deposit_failed";
"error" => e,
"validator_voting_pubkey" => format!("{:?}", pubkey_2)
)
})
})
}
Err(e) => return Err::<(), String>(format!("Eth1 unlock request failed: {:?}", e)),
}
} else {
from_address
};
/*
* Submit the deposit transaction.
*/
let tx_request = TransactionRequest {
from,
to: Some(deposit_contract),
gas: Some(U256::from(DEPOSIT_GAS)),
gas_price: None,
value: Some(from_gwei(deposit_amount)),
data: Some(deposit_data.into()),
nonce: None,
condition: None,
};
let tx = web3
.eth()
.send_transaction(tx_request)
.compat()
.await
.map_err(|e| format!("Failed to call deposit fn: {:?}", e))?;
info!(
log_1,
"Validator deposit successful";
"eth1_tx_hash" => format!("{:?}", tx),
"validator_voting_pubkey" => format!("{:?}", pubkey_1)
);
Ok(())
};
future.await.map_err(move |e| {
error!(
log_2,
"Validator deposit_failed";
"error" => e,
"validator_voting_pubkey" => format!("{:?}", pubkey_2)
);
})?;
Ok(())
}
/// Converts gwei to wei.

View File

@@ -304,11 +304,11 @@ impl ValidatorDirectoryBuilder {
}
pub async fn submit_eth1_deposit<T: Transport>(
&self,
self,
web3: Web3<T>,
from: Address,
deposit_contract: Address,
) -> Result<Hash256, String> {
) -> Result<(Self, Hash256), String> {
let (deposit_data, deposit_amount) = self.get_deposit_data()?;
web3.eth()
.send_transaction(TransactionRequest {
@@ -324,6 +324,7 @@ impl ValidatorDirectoryBuilder {
.compat()
.await
.map_err(|e| format!("Failed to send transaction: {:?}", e))
.map(|tx| (self, tx))
}
pub fn build(self) -> Result<ValidatorDirectory, String> {