Port eth1_test_rig to stable futures

This commit is contained in:
pawan
2020-02-29 23:31:19 +05:30
committed by Age Manning
parent a984772475
commit 546713f5e1
3 changed files with 91 additions and 107 deletions

View File

@@ -6,8 +6,8 @@ edition = "2018"
[dependencies] [dependencies]
web3 = "0.8.0" web3 = "0.8.0"
tokio = "0.1.22" tokio = { version = "0.2", features = ["time"] }
futures = "0.1.25" futures = { version = "0.3", features = ["compat"] }
types = { path = "../../eth2/types"} types = { path = "../../eth2/types"}
serde_json = "1.0" serde_json = "1.0"
deposit_contract = { path = "../../eth2/utils/deposit_contract"} deposit_contract = { path = "../../eth2/utils/deposit_contract"}

View File

@@ -1,4 +1,4 @@
use futures::Future; use futures::compat::Future01CompatExt;
use serde_json::json; use serde_json::json;
use std::io::prelude::*; use std::io::prelude::*;
use std::io::BufReader; use std::io::BufReader;
@@ -98,28 +98,34 @@ impl GanacheInstance {
} }
/// Increase the timestamp on future blocks by `increase_by` seconds. /// Increase the timestamp on future blocks by `increase_by` seconds.
pub fn increase_time(&self, increase_by: u64) -> impl Future<Item = (), Error = String> { pub async fn increase_time(&self, increase_by: u64) -> Result<(), String> {
self.web3 self.web3
.transport() .transport()
.execute("evm_increaseTime", vec![json!(increase_by)]) .execute("evm_increaseTime", vec![json!(increase_by)])
.compat()
.await
.map(|_json_value| ()) .map(|_json_value| ())
.map_err(|e| format!("Failed to increase time on EVM (is this ganache?): {:?}", e)) .map_err(|e| format!("Failed to increase time on EVM (is this ganache?): {:?}", e))
} }
/// Returns the current block number, as u64 /// Returns the current block number, as u64
pub fn block_number(&self) -> impl Future<Item = u64, Error = String> { pub async fn block_number(&self) -> Result<u64, String> {
self.web3 self.web3
.eth() .eth()
.block_number() .block_number()
.compat()
.await
.map(|v| v.as_u64()) .map(|v| v.as_u64())
.map_err(|e| format!("Failed to get block number: {:?}", e)) .map_err(|e| format!("Failed to get block number: {:?}", e))
} }
/// Mines a single block. /// Mines a single block.
pub fn evm_mine(&self) -> impl Future<Item = (), Error = String> { pub async fn evm_mine(&self) -> Result<(), String> {
self.web3 self.web3
.transport() .transport()
.execute("evm_mine", vec![]) .execute("evm_mine", vec![])
.compat()
.await
.map(|_| ()) .map(|_| ())
.map_err(|_| { .map_err(|_| {
"utils should mine new block with evm_mine (only works with ganache-cli!)" "utils should mine new block with evm_mine (only works with ganache-cli!)"

View File

@@ -10,10 +10,10 @@ mod ganache;
use deposit_contract::{ use deposit_contract::{
encode_eth1_tx_data, testnet, ABI, BYTECODE, CONTRACT_DEPLOY_GAS, DEPOSIT_GAS, encode_eth1_tx_data, testnet, ABI, BYTECODE, CONTRACT_DEPLOY_GAS, DEPOSIT_GAS,
}; };
use futures::{future, stream, Future, IntoFuture, Stream}; use futures::compat::Future01CompatExt;
use ganache::GanacheInstance; use ganache::GanacheInstance;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::{runtime::Runtime, timer::Delay}; use tokio::time::delay_for;
use types::DepositData; use types::DepositData;
use types::{test_utils::generate_deterministic_keypair, EthSpec, Hash256, Keypair, Signature}; use types::{test_utils::generate_deterministic_keypair, EthSpec, Hash256, Keypair, Signature};
use web3::contract::{Contract, Options}; use web3::contract::{Contract, Options};
@@ -31,13 +31,14 @@ pub struct GanacheEth1Instance {
} }
impl GanacheEth1Instance { impl GanacheEth1Instance {
pub fn new() -> impl Future<Item = Self, Error = String> { pub async fn new() -> Result<Self, String> {
GanacheInstance::new().into_future().and_then(|ganache| { let ganache = GanacheInstance::new()?;
DepositContract::deploy(ganache.web3.clone(), 0, None).map(|deposit_contract| Self { DepositContract::deploy(ganache.web3.clone(), 0, None)
.await
.map(|deposit_contract| Self {
ganache, ganache,
deposit_contract, deposit_contract,
}) })
})
} }
pub fn endpoint(&self) -> String { pub fn endpoint(&self) -> String {
@@ -57,19 +58,19 @@ pub struct DepositContract {
} }
impl DepositContract { impl DepositContract {
pub fn deploy( pub async fn deploy(
web3: Web3<Http>, web3: Web3<Http>,
confirmations: usize, confirmations: usize,
password: Option<String>, password: Option<String>,
) -> impl Future<Item = Self, Error = String> { ) -> Result<Self, String> {
Self::deploy_bytecode(web3, confirmations, BYTECODE, ABI, password) Self::deploy_bytecode(web3, confirmations, BYTECODE, ABI, password).await
} }
pub fn deploy_testnet( pub async fn deploy_testnet(
web3: Web3<Http>, web3: Web3<Http>,
confirmations: usize, confirmations: usize,
password: Option<String>, password: Option<String>,
) -> impl Future<Item = Self, Error = String> { ) -> Result<Self, String> {
Self::deploy_bytecode( Self::deploy_bytecode(
web3, web3,
confirmations, confirmations,
@@ -77,35 +78,33 @@ impl DepositContract {
testnet::ABI, testnet::ABI,
password, password,
) )
.await
} }
fn deploy_bytecode( async fn deploy_bytecode(
web3: Web3<Http>, web3: Web3<Http>,
confirmations: usize, confirmations: usize,
bytecode: &[u8], bytecode: &[u8],
abi: &[u8], abi: &[u8],
password: Option<String>, password: Option<String>,
) -> impl Future<Item = Self, Error = String> { ) -> Result<Self, String> {
let web3_1 = web3.clone(); let address = deploy_deposit_contract(
deploy_deposit_contract(
web3.clone(), web3.clone(),
confirmations, confirmations,
bytecode.to_vec(), bytecode.to_vec(),
abi.to_vec(), abi.to_vec(),
password, password,
) )
.await
.map_err(|e| { .map_err(|e| {
format!( format!(
"Failed to deploy contract: {}. Is scripts/ganache_tests_node.sh running?.", "Failed to deploy contract: {}. Is scripts/ganache_tests_node.sh running?.",
e e
) )
}) })?;
.and_then(move |address| { Contract::from_json(web3.clone().eth(), address, ABI)
Contract::from_json(web3_1.eth(), address, ABI) .map_err(|e| format!("Failed to init contract: {:?}", e))
.map_err(|e| format!("Failed to init contract: {:?}", e)) .map(move |contract| Self { contract, web3 })
})
.map(|contract| Self { contract, web3 })
} }
/// The deposit contract's address in `0x00ab...` format. /// The deposit contract's address in `0x00ab...` format.
@@ -136,7 +135,7 @@ impl DepositContract {
/// Creates a random, valid deposit and submits it to the deposit contract. /// Creates a random, valid deposit and submits it to the deposit contract.
/// ///
/// The keypairs are created randomly and destroyed. /// The keypairs are created randomly and destroyed.
pub fn deposit_random<E: EthSpec>(&self, runtime: &mut Runtime) -> Result<(), String> { pub async fn deposit_random<E: EthSpec>(&self) -> Result<(), String> {
let keypair = Keypair::random(); let keypair = Keypair::random();
let mut deposit = DepositData { let mut deposit = DepositData {
@@ -148,21 +147,21 @@ impl DepositContract {
deposit.signature = deposit.create_signature(&keypair.sk, &E::default_spec()); deposit.signature = deposit.create_signature(&keypair.sk, &E::default_spec());
self.deposit(runtime, deposit) self.deposit(deposit).await
} }
/// Perfoms a blocking deposit. /// Perfoms a blocking deposit.
pub fn deposit(&self, runtime: &mut Runtime, deposit_data: DepositData) -> Result<(), String> { pub async fn deposit(&self, deposit_data: DepositData) -> Result<(), String> {
runtime self.deposit_async(deposit_data)
.block_on(self.deposit_async(deposit_data)) .await
.map_err(|e| format!("Deposit failed: {:?}", e)) .map_err(|e| format!("Deposit failed: {:?}", e))
} }
pub fn deposit_deterministic_async<E: EthSpec>( pub async fn deposit_deterministic_async<E: EthSpec>(
&self, &self,
keypair_index: usize, keypair_index: usize,
amount: u64, amount: u64,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
let keypair = generate_deterministic_keypair(keypair_index); let keypair = generate_deterministic_keypair(keypair_index);
let mut deposit = DepositData { let mut deposit = DepositData {
@@ -174,20 +173,17 @@ impl DepositContract {
deposit.signature = deposit.create_signature(&keypair.sk, &E::default_spec()); deposit.signature = deposit.create_signature(&keypair.sk, &E::default_spec());
self.deposit_async(deposit) self.deposit_async(deposit).await
} }
/// Performs a non-blocking deposit. /// Performs a non-blocking deposit.
pub fn deposit_async( pub async fn deposit_async(&self, deposit_data: DepositData) -> Result<(), String> {
&self, let from = self
deposit_data: DepositData, .web3
) -> impl Future<Item = (), Error = String> {
let contract = self.contract.clone();
let web3_1 = self.web3.clone();
self.web3
.eth() .eth()
.accounts() .accounts()
.compat()
.await
.map_err(|e| format!("Failed to get accounts: {:?}", e)) .map_err(|e| format!("Failed to get accounts: {:?}", e))
.and_then(|accounts| { .and_then(|accounts| {
accounts accounts
@@ -216,31 +212,18 @@ impl DepositContract {
web3_1 web3_1
.eth() .eth()
.send_transaction(tx_request) .send_transaction(tx_request)
.map_err(|e| format!("Failed to call deposit fn: {:?}", e)) .compa().await()
}) .map_err(|e| format!("Failed to call deposit fn: {:?}", e))?;
.map(|_| ()) Ok())
} }
/// Peforms many deposits, each preceded by a delay. /// Peforms many deposits, each preceded by a delay.
pub fn deposit_multiple( pub async fn deposit_multiple(&self, deposits: Vec<DelayThenDeposit>) -> Result<(), String> {
&self, for deposit in deposits.into_iter() {
deposits: Vec<DelayThenDeposit>, delay_for(deposit.delay).await;
) -> impl Future<Item = (), Error = String> { self.deposit_async(deposit.deposit).await?;
let s = self.clone(); }
stream::unfold(deposits.into_iter(), move |mut deposit_iter| { Ok(())
let s = s.clone();
match deposit_iter.next() {
Some(deposit) => Some(
Delay::new(Instant::now() + deposit.delay)
.map_err(|e| format!("Failed to execute delay: {:?}", e))
.and_then(move |_| s.deposit_async(deposit.deposit))
.map(move |yielded| (yielded, deposit_iter)),
),
None => None,
}
})
.collect()
.map(|_| ())
} }
} }
@@ -260,61 +243,56 @@ fn from_gwei(gwei: u64) -> U256 {
/// Deploys the deposit contract to the given web3 instance using the account with index /// Deploys the deposit contract to the given web3 instance using the account with index
/// `DEPLOYER_ACCOUNTS_INDEX`. /// `DEPLOYER_ACCOUNTS_INDEX`.
fn deploy_deposit_contract( async fn deploy_deposit_contract(
web3: Web3<Http>, web3: Web3<Http>,
confirmations: usize, confirmations: usize,
bytecode: Vec<u8>, bytecode: Vec<u8>,
abi: Vec<u8>, abi: Vec<u8>,
password_opt: Option<String>, password_opt: Option<String>,
) -> impl Future<Item = Address, Error = String> { ) -> Result<Address, String> {
let bytecode = String::from_utf8(bytecode).expect("bytecode must be valid utf8"); let bytecode = String::from_utf8(bytecode).expect("bytecode must be valid utf8");
let web3_1 = web3.clone();
web3.eth() let from_address = web3
.eth()
.accounts() .accounts()
.compat()
.await
.map_err(|e| format!("Failed to get accounts: {:?}", e)) .map_err(|e| format!("Failed to get accounts: {:?}", e))
.and_then(|accounts| { .and_then(|accounts| {
accounts accounts
.get(DEPLOYER_ACCOUNTS_INDEX) .get(DEPLOYER_ACCOUNTS_INDEX)
.cloned() .cloned()
.ok_or_else(|| "Insufficient accounts for deployer".to_string()) .ok_or_else(|| "Insufficient accounts for deployer".to_string())
}) })?;
.and_then(move |from_address| {
let future: Box<dyn Future<Item = Address, Error = String> + Send> =
if let Some(password) = password_opt {
// Unlock for only a single transaction.
let duration = None;
let future = web3_1 let deploy_address = if let Some(password) = password_opt {
.personal() let result = web3
.unlock_account(from_address, &password, duration) .personal()
.then(move |result| match result { .unlock_account(from_address, &password, None)
Ok(true) => Ok(from_address), .compat()
Ok(false) => Err("Eth1 node refused to unlock account".to_string()), .await;
Err(e) => Err(format!("Eth1 unlock request failed: {:?}", e)), match result {
}); Ok(true) => return Ok(from_address),
Ok(false) => return Err("Eth1 node refused to unlock account".to_string()),
Err(e) => return Err(format!("Eth1 unlock request failed: {:?}", e)),
};
} else {
from_address
};
Box::new(future) let pending_contract = Contract::deploy(web3.eth(), &abi)
} else { .map_err(|e| format!("Unable to build contract deployer: {:?}", e))?
Box::new(future::ok(from_address)) .confirmations(confirmations)
}; .options(Options {
gas: Some(U256::from(CONTRACT_DEPLOY_GAS)),
..Options::default()
})
.execute(bytecode, (), deploy_address)
.map_err(|e| format!("Failed to execute deployment: {:?}", e))?;
future pending_contract
}) .compat()
.and_then(move |deploy_address| { .await
Contract::deploy(web3.eth(), &abi) .map(|contract| contract.address())
.map_err(|e| format!("Unable to build contract deployer: {:?}", e))? .map_err(|e| format!("Unable to resolve pending contract: {:?}", e))
.confirmations(confirmations)
.options(Options {
gas: Some(U256::from(CONTRACT_DEPLOY_GAS)),
..Options::default()
})
.execute(bytecode, (), deploy_address)
.map_err(|e| format!("Failed to execute deployment: {:?}", e))
})
.and_then(|pending_contract| {
pending_contract
.map(|contract| contract.address())
.map_err(|e| format!("Unable to resolve pending contract: {:?}", e))
})
} }