diff --git a/beacon_node/eth1/Cargo.toml b/beacon_node/eth1/Cargo.toml index dfdb82c37f..7400f604f1 100644 --- a/beacon_node/eth1/Cargo.toml +++ b/beacon_node/eth1/Cargo.toml @@ -9,10 +9,11 @@ eth1_test_rig = { path = "../../tests/eth1_test_rig" } environment = { path = "../../lighthouse/environment" } toml = "^0.5" web3 = "0.8.0" +sloggers = "0.3.4" [dependencies] reqwest = "0.10" -futures = "0.3" +futures = {version = "0.3", features = ["compat"]} serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } hex = "0.3" diff --git a/beacon_node/eth1/tests/test.rs b/beacon_node/eth1/tests/test.rs index 7945766680..44291bfb17 100644 --- a/beacon_node/eth1/tests/test.rs +++ b/beacon_node/eth1/tests/test.rs @@ -4,17 +4,23 @@ use eth1::http::{get_deposit_count, get_deposit_logs_in_range, get_deposit_root, use eth1::{Config, Service}; use eth1::{DepositCache, DepositLog}; use eth1_test_rig::GanacheEth1Instance; -use futures::Future; +use futures::compat::Future01CompatExt; use merkle_proof::verify_merkle_proof; +use slog::Logger; +use sloggers::{null::NullLoggerBuilder, Build}; use std::ops::Range; use std::time::Duration; -use tokio::runtime::Runtime; use tree_hash::TreeHash; use types::{DepositData, EthSpec, Hash256, Keypair, MainnetEthSpec, MinimalEthSpec, Signature}; use web3::{transports::Http, Web3}; const DEPOSIT_CONTRACT_TREE_DEPTH: usize = 32; +pub fn null_logger() -> Logger { + let log_builder = NullLoggerBuilder; + log_builder.build().expect("should build logger") +} + pub fn new_env() -> Environment { EnvironmentBuilder::minimal() // Use a single thread, so that when all tests are run in parallel they don't have so many @@ -47,76 +53,65 @@ fn random_deposit_data() -> DepositData { } /// Blocking operation to get the deposit logs from the `deposit_contract`. -fn blocking_deposit_logs( - runtime: &mut Runtime, - eth1: &GanacheEth1Instance, - range: Range, -) -> Vec { - runtime - .block_on(get_deposit_logs_in_range( - ð1.endpoint(), - ð1.deposit_contract.address(), - range, - timeout(), - )) - .expect("should get logs") +async fn blocking_deposit_logs(eth1: &GanacheEth1Instance, range: Range) -> Vec { + get_deposit_logs_in_range( + ð1.endpoint(), + ð1.deposit_contract.address(), + range, + timeout(), + ) + .await + .expect("should get logs") } /// Blocking operation to get the deposit root from the `deposit_contract`. -fn blocking_deposit_root( - runtime: &mut Runtime, - eth1: &GanacheEth1Instance, - block_number: u64, -) -> Option { - runtime - .block_on(get_deposit_root( - ð1.endpoint(), - ð1.deposit_contract.address(), - block_number, - timeout(), - )) - .expect("should get deposit root") +async fn blocking_deposit_root(eth1: &GanacheEth1Instance, block_number: u64) -> Option { + get_deposit_root( + ð1.endpoint(), + ð1.deposit_contract.address(), + block_number, + timeout(), + ) + .await + .expect("should get deposit root") } /// Blocking operation to get the deposit count from the `deposit_contract`. -fn blocking_deposit_count( - runtime: &mut Runtime, - eth1: &GanacheEth1Instance, - block_number: u64, -) -> Option { - runtime - .block_on(get_deposit_count( - ð1.endpoint(), - ð1.deposit_contract.address(), - block_number, - timeout(), - )) - .expect("should get deposit count") +async fn blocking_deposit_count(eth1: &GanacheEth1Instance, block_number: u64) -> Option { + get_deposit_count( + ð1.endpoint(), + ð1.deposit_contract.address(), + block_number, + timeout(), + ) + .await + .expect("should get deposit count") } -fn get_block_number(runtime: &mut Runtime, web3: &Web3) -> u64 { - runtime - .block_on(web3.eth().block_number().map(|v| v.as_u64())) +async fn get_block_number(web3: &Web3) -> u64 { + web3.eth() + .block_number() + .compat() + .await + .map(|v| v.as_u64()) .expect("should get block number") } mod eth1_cache { use super::*; - #[test] - fn simple_scenario() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); + #[tokio::test] + async fn simple_scenario() { + let log = null_logger(); for follow_distance in 0..2 { - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let initial_block_number = get_block_number(runtime, &web3); + let initial_block_number = get_block_number(&web3).await; let service = Service::new( Config { @@ -145,20 +140,21 @@ mod eth1_cache { }; for _ in 0..blocks { - runtime - .block_on(eth1.ganache.evm_mine()) - .expect("should mine block"); + eth1.ganache.evm_mine().await.expect("should mine block"); } - runtime - .block_on(service.update_deposit_cache()) + service + .update_deposit_cache() + .await .expect("should update deposit cache"); - runtime - .block_on(service.update_block_cache()) + service + .update_block_cache() + .await .expect("should update block cache"); - runtime - .block_on(service.update_block_cache()) + service + .update_block_cache() + .await .expect("should update cache when nothing has changed"); assert_eq!( @@ -178,14 +174,13 @@ mod eth1_cache { } /// Tests the case where we attempt to download more blocks than will fit in the cache. - #[test] - fn big_skip() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + #[tokio::test] + async fn big_skip() { + let log = null_logger(); + + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); @@ -196,7 +191,7 @@ mod eth1_cache { Config { endpoint: eth1.endpoint(), deposit_contract_address: deposit_contract.address(), - lowest_cached_block_number: get_block_number(runtime, &web3), + lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, block_cache_truncation: Some(cache_len), ..Config::default() @@ -207,16 +202,16 @@ mod eth1_cache { let blocks = cache_len * 2; for _ in 0..blocks { - runtime - .block_on(eth1.ganache.evm_mine()) - .expect("should mine block") + eth1.ganache.evm_mine().await.expect("should mine block") } - runtime - .block_on(service.update_deposit_cache()) + service + .update_deposit_cache() + .await .expect("should update deposit cache"); - runtime - .block_on(service.update_block_cache()) + service + .update_block_cache() + .await .expect("should update block cache"); assert_eq!( @@ -228,14 +223,12 @@ mod eth1_cache { /// Tests to ensure that the cache gets pruned when doing multiple downloads smaller than the /// cache size. - #[test] - fn pruning() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); + #[tokio::test] + async fn pruning() { + let log = null_logger(); - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); @@ -246,7 +239,7 @@ mod eth1_cache { Config { endpoint: eth1.endpoint(), deposit_contract_address: deposit_contract.address(), - lowest_cached_block_number: get_block_number(runtime, &web3), + lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, block_cache_truncation: Some(cache_len), ..Config::default() @@ -254,17 +247,17 @@ mod eth1_cache { log, ); - for _ in 0..4 { + for _ in 0..4u8 { for _ in 0..cache_len / 2 { - runtime - .block_on(eth1.ganache.evm_mine()) - .expect("should mine block") + eth1.ganache.evm_mine().await.expect("should mine block") } - runtime - .block_on(service.update_deposit_cache()) + service + .update_deposit_cache() + .await .expect("should update deposit cache"); - runtime - .block_on(service.update_block_cache()) + service + .update_block_cache() + .await .expect("should update block cache"); } @@ -275,16 +268,14 @@ mod eth1_cache { ); } - #[test] - fn double_update() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); + #[tokio::test] + async fn double_update() { + let log = null_logger(); let n = 16; - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); @@ -293,7 +284,7 @@ mod eth1_cache { Config { endpoint: eth1.endpoint(), deposit_contract_address: deposit_contract.address(), - lowest_cached_block_number: get_block_number(runtime, &web3), + lowest_cached_block_number: get_block_number(&web3).await, follow_distance: 0, ..Config::default() }, @@ -301,23 +292,14 @@ mod eth1_cache { ); for _ in 0..n { - runtime - .block_on(eth1.ganache.evm_mine()) - .expect("should mine block") + eth1.ganache.evm_mine().await.expect("should mine block") } - runtime - .block_on( - service - .update_deposit_cache() - .join(service.update_deposit_cache()), - ) - .expect("should perform two simultaneous updates of deposit cache"); - runtime - .block_on( - service - .update_block_cache() - .join(service.update_block_cache()), - ) + futures::try_join!( + service.update_deposit_cache(), + service.update_deposit_cache() + ) + .expect("should perform two simultaneous updates of deposit cache"); + futures::try_join!(service.update_block_cache(), service.update_block_cache()) .expect("should perform two simultaneous updates of block cache"); assert!(service.block_cache_len() >= n, "should grow the cache"); @@ -327,21 +309,19 @@ mod eth1_cache { mod deposit_tree { use super::*; - #[test] - fn updating() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); + #[tokio::test] + async fn updating() { + let log = null_logger(); let n = 4; - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let start_block = get_block_number(runtime, &web3); + let start_block = get_block_number(&web3).await; let service = Service::new( Config { @@ -359,16 +339,19 @@ mod deposit_tree { for deposit in &deposits { deposit_contract - .deposit(runtime, deposit.clone()) + .deposit(deposit.clone()) + .await .expect("should perform a deposit"); } - runtime - .block_on(service.update_deposit_cache()) + service + .update_deposit_cache() + .await .expect("should perform update"); - runtime - .block_on(service.update_deposit_cache()) + service + .update_deposit_cache() + .await .expect("should perform update when nothing has changed"); let first = n * round; @@ -400,21 +383,19 @@ mod deposit_tree { } } - #[test] - fn double_update() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); + #[tokio::test] + async fn double_update() { + let log = null_logger(); let n = 8; - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let start_block = get_block_number(runtime, &web3); + let start_block = get_block_number(&web3).await; let service = Service::new( Config { @@ -432,32 +413,28 @@ mod deposit_tree { for deposit in &deposits { deposit_contract - .deposit(runtime, deposit.clone()) + .deposit(deposit.clone()) + .await .expect("should perform a deposit"); } - runtime - .block_on( - service - .update_deposit_cache() - .join(service.update_deposit_cache()), - ) - .expect("should perform two updates concurrently"); + futures::try_join!( + service.update_deposit_cache(), + service.update_deposit_cache() + ) + .expect("should perform two updates concurrently"); assert_eq!(service.deposit_cache_len(), n); } - #[test] - fn cache_consistency() { - let mut env = new_env(); - let runtime = env.runtime(); - + #[tokio::test] + async fn cache_consistency() { let n = 8; let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); @@ -468,15 +445,18 @@ mod deposit_tree { // Perform deposits to the smart contract, recording it's state along the way. for deposit in &deposits { deposit_contract - .deposit(runtime, deposit.clone()) + .deposit(deposit.clone()) + .await .expect("should perform a deposit"); - let block_number = get_block_number(runtime, &web3); + let block_number = get_block_number(&web3).await; deposit_roots.push( - blocking_deposit_root(runtime, ð1, block_number) + blocking_deposit_root(ð1, block_number) + .await .expect("should get root if contract exists"), ); deposit_counts.push( - blocking_deposit_count(runtime, ð1, block_number) + blocking_deposit_count(ð1, block_number) + .await .expect("should get count if contract exists"), ); } @@ -484,8 +464,9 @@ mod deposit_tree { let mut tree = DepositCache::default(); // Pull all the deposit logs from the contract. - let block_number = get_block_number(runtime, &web3); - let logs: Vec<_> = blocking_deposit_logs(runtime, ð1, 0..block_number) + let block_number = get_block_number(&web3).await; + let logs: Vec<_> = blocking_deposit_logs(ð1, 0..block_number) + .await .iter() .map(|raw| DepositLog::from_log(raw).expect("should parse deposit log")) .inspect(|log| { @@ -546,64 +527,59 @@ mod deposit_tree { mod http { use super::*; - fn get_block(runtime: &mut Runtime, eth1: &GanacheEth1Instance, block_number: u64) -> Block { - runtime - .block_on(eth1::http::get_block( - ð1.endpoint(), - block_number, - timeout(), - )) + async fn get_block(eth1: &GanacheEth1Instance, block_number: u64) -> Block { + eth1::http::get_block(ð1.endpoint(), block_number, timeout()) + .await .expect("should get block number") } - #[test] - fn incrementing_deposits() { - let mut env = new_env(); - let runtime = env.runtime(); - - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + #[tokio::test] + async fn incrementing_deposits() { + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let block_number = get_block_number(runtime, &web3); - let logs = blocking_deposit_logs(runtime, ð1, 0..block_number); + let block_number = get_block_number(&web3).await; + let logs = blocking_deposit_logs(ð1, 0..block_number).await; assert_eq!(logs.len(), 0); - let mut old_root = blocking_deposit_root(runtime, ð1, block_number); - let mut old_block = get_block(runtime, ð1, block_number); + let mut old_root = blocking_deposit_root(ð1, block_number).await; + let mut old_block = get_block(ð1, block_number).await; let mut old_block_number = block_number; assert_eq!( - blocking_deposit_count(runtime, ð1, block_number), + blocking_deposit_count(ð1, block_number).await, Some(0), "should have deposit count zero" ); for i in 1..=8 { - runtime - .block_on(eth1.ganache.increase_time(1)) + eth1.ganache + .increase_time(1) + .await .expect("should be able to increase time on ganache"); deposit_contract - .deposit(runtime, random_deposit_data()) + .deposit(random_deposit_data()) + .await .expect("should perform a deposit"); // Check the logs. - let block_number = get_block_number(runtime, &web3); - let logs = blocking_deposit_logs(runtime, ð1, 0..block_number); + let block_number = get_block_number(&web3).await; + let logs = blocking_deposit_logs(ð1, 0..block_number).await; assert_eq!(logs.len(), i, "the number of logs should be as expected"); // Check the deposit count. assert_eq!( - blocking_deposit_count(runtime, ð1, block_number), + blocking_deposit_count(ð1, block_number).await, Some(i as u64), "should have a correct deposit count" ); // Check the deposit root. - let new_root = blocking_deposit_root(runtime, ð1, block_number); + let new_root = blocking_deposit_root(ð1, block_number).await; assert_ne!( new_root, old_root, "deposit root should change with each deposit" @@ -611,7 +587,7 @@ mod http { old_root = new_root; // Check the block hash. - let new_block = get_block(runtime, ð1, block_number); + let new_block = get_block(ð1, block_number).await; assert_ne!( new_block.hash, old_block.hash, "block hash should change with each deposit" @@ -647,19 +623,17 @@ mod fast { // Adds deposits into deposit cache and matches deposit_count and deposit_root // with the deposit count and root computed from the deposit cache. - #[test] - fn deposit_cache_query() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); + #[tokio::test] + async fn deposit_cache_query() { + let log = null_logger(); - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let now = get_block_number(runtime, &web3); + let now = get_block_number(&web3).await; let service = Service::new( Config { endpoint: eth1.endpoint(), @@ -676,16 +650,16 @@ mod fast { let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); for deposit in &deposits { deposit_contract - .deposit(runtime, deposit.clone()) + .deposit(deposit.clone()) + .await .expect("should perform a deposit"); // Mine an extra block between deposits to test for corner cases - runtime - .block_on(eth1.ganache.evm_mine()) - .expect("should mine block"); + eth1.ganache.evm_mine().await.expect("should mine block"); } - runtime - .block_on(service.update_deposit_cache()) + service + .update_deposit_cache() + .await .expect("should perform update"); assert!( @@ -693,9 +667,9 @@ mod fast { "should have imported n deposits" ); - for block_num in 0..=get_block_number(runtime, &web3) { - let expected_deposit_count = blocking_deposit_count(runtime, ð1, block_num); - let expected_deposit_root = blocking_deposit_root(runtime, ð1, block_num); + for block_num in 0..=get_block_number(&web3).await { + let expected_deposit_count = blocking_deposit_count(ð1, block_num).await; + let expected_deposit_root = blocking_deposit_root(ð1, block_num).await; let deposit_count = service .deposits() @@ -721,19 +695,17 @@ mod fast { mod persist { use super::*; - #[test] - fn test_persist_caches() { - let mut env = new_env(); - let log = env.core_context().log; - let runtime = env.runtime(); + #[tokio::test] + async fn test_persist_caches() { + let log = null_logger(); - let eth1 = runtime - .block_on(GanacheEth1Instance::new()) + let eth1 = GanacheEth1Instance::new() + .await .expect("should start eth1 environment"); let deposit_contract = ð1.deposit_contract; let web3 = eth1.web3(); - let now = get_block_number(runtime, &web3); + let now = get_block_number(&web3).await; let config = Config { endpoint: eth1.endpoint(), deposit_contract_address: deposit_contract.address(), @@ -748,12 +720,14 @@ mod persist { let deposits: Vec<_> = (0..n).map(|_| random_deposit_data()).collect(); for deposit in &deposits { deposit_contract - .deposit(runtime, deposit.clone()) + .deposit(deposit.clone()) + .await .expect("should perform a deposit"); } - runtime - .block_on(service.update_deposit_cache()) + service + .update_deposit_cache() + .await .expect("should perform update"); assert!( @@ -763,8 +737,9 @@ mod persist { let deposit_count = service.deposit_cache_len(); - runtime - .block_on(service.update_block_cache()) + service + .update_block_cache() + .await .expect("should perform update"); assert!(