From 033cca602f93af1e8c69d4b812aadccea2a3f161 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Sun, 10 May 2020 22:34:00 +1000 Subject: [PATCH] Upgrade simulator to stable futures --- tests/node_test_rig/src/lib.rs | 27 +- tests/simulator/src/checks.rs | 170 ++++++------ tests/simulator/src/eth1_sim.rs | 209 ++++++-------- tests/simulator/src/local_network.rs | 112 ++++---- tests/simulator/src/no_eth1_sim.rs | 103 +++---- tests/simulator/src/sync_sim.rs | 395 +++++++++++++-------------- 6 files changed, 461 insertions(+), 555 deletions(-) diff --git a/tests/node_test_rig/src/lib.rs b/tests/node_test_rig/src/lib.rs index e0483ef0fd..3d747cc7f8 100644 --- a/tests/node_test_rig/src/lib.rs +++ b/tests/node_test_rig/src/lib.rs @@ -4,7 +4,6 @@ use beacon_node::ProductionBeaconNode; use environment::RuntimeContext; -use futures::Future; use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; use tempdir::TempDir; @@ -40,10 +39,12 @@ impl LocalBeaconNode { client_config.data_dir = datadir.path().into(); client_config.network.network_dir = PathBuf::from(datadir.path()).join("network"); - ProductionBeaconNode::new(context, client_config).await.map(move |client| Self { - client: client.into_inner(), - datadir, - }) + ProductionBeaconNode::new(context, client_config) + .await + .map(move |client| Self { + client: client.into_inner(), + datadir, + }) } } @@ -126,7 +127,7 @@ impl LocalValidatorClient { pub async fn production( context: RuntimeContext, config: ValidatorConfig, - ) -> Result { + ) -> Result { // Creates a temporary directory that will be deleted once this `TempDir` is dropped. let datadir = TempDir::new("lighthouse-validator") .expect("should create temp directory for client datadir"); @@ -141,11 +142,13 @@ impl LocalValidatorClient { ) -> Result { config.data_dir = datadir.path().into(); - ProductionValidatorClient::new(context, config).await.map(move |mut client| { - client - .start_service() - .expect("should start validator services"); - Self { client, datadir } - }) + ProductionValidatorClient::new(context, config) + .await + .map(move |mut client| { + client + .start_service() + .expect("should start validator services"); + Self { client, datadir } + }) } } diff --git a/tests/simulator/src/checks.rs b/tests/simulator/src/checks.rs index 8cfb0aaa23..a63f1ee43f 100644 --- a/tests/simulator/src/checks.rs +++ b/tests/simulator/src/checks.rs @@ -1,136 +1,128 @@ use crate::local_network::LocalNetwork; -use futures::{stream, Future, IntoFuture, Stream}; -use std::time::{Duration, Instant}; -use tokio::timer::Delay; +use std::time::Duration; use types::{Epoch, EthSpec, Slot, Unsigned}; /// Checks that all of the validators have on-boarded by the start of the second eth1 voting /// period. -pub fn verify_initial_validator_count( +pub async fn verify_initial_validator_count( network: LocalNetwork, slot_duration: Duration, initial_validator_count: usize, -) -> impl Future { - slot_delay(Slot::new(1), slot_duration) - .and_then(move |()| verify_validator_count(network, initial_validator_count)) +) -> Result<(), String> { + slot_delay(Slot::new(1), slot_duration).await; + verify_validator_count(network, initial_validator_count).await?; + Ok(()) } /// Checks that all of the validators have on-boarded by the start of the second eth1 voting /// period. -pub fn verify_validator_onboarding( +pub async fn verify_validator_onboarding( network: LocalNetwork, slot_duration: Duration, expected_validator_count: usize, -) -> impl Future { +) -> Result<(), String> { slot_delay( Slot::new(E::SlotsPerEth1VotingPeriod::to_u64()), slot_duration, ) - .and_then(move |()| verify_validator_count(network, expected_validator_count)) + .await; + verify_validator_count(network, expected_validator_count).await?; + Ok(()) } /// Checks that the chain has made the first possible finalization. /// /// Intended to be run as soon as chain starts. -pub fn verify_first_finalization( +pub async fn verify_first_finalization( network: LocalNetwork, slot_duration: Duration, -) -> impl Future { - epoch_delay(Epoch::new(4), slot_duration, E::slots_per_epoch()) - .and_then(|()| verify_all_finalized_at(network, Epoch::new(2))) +) -> Result<(), String> { + epoch_delay(Epoch::new(4), slot_duration, E::slots_per_epoch()).await; + verify_all_finalized_at(network, Epoch::new(2)).await?; + Ok(()) } /// Delays for `epochs`, plus half a slot extra. -pub fn epoch_delay( - epochs: Epoch, - slot_duration: Duration, - slots_per_epoch: u64, -) -> impl Future { +pub async fn epoch_delay(epochs: Epoch, slot_duration: Duration, slots_per_epoch: u64) { let duration = slot_duration * (epochs.as_u64() * slots_per_epoch) as u32 + slot_duration / 2; - - Delay::new(Instant::now() + duration).map_err(|e| format!("Epoch delay failed: {:?}", e)) + tokio::time::delay_for(duration).await } /// Delays for `slots`, plus half a slot extra. -fn slot_delay(slots: Slot, slot_duration: Duration) -> impl Future { +async fn slot_delay(slots: Slot, slot_duration: Duration) { let duration = slot_duration * slots.as_u64() as u32 + slot_duration / 2; - - Delay::new(Instant::now() + duration).map_err(|e| format!("Epoch delay failed: {:?}", e)) + tokio::time::delay_for(duration).await; } /// Verifies that all beacon nodes in the given network have a head state that has a finalized /// epoch of `epoch`. -pub fn verify_all_finalized_at( +pub async fn verify_all_finalized_at( network: LocalNetwork, epoch: Epoch, -) -> impl Future { - network - .remote_nodes() - .into_future() - .and_then(|remote_nodes| { - stream::unfold(remote_nodes.into_iter(), |mut iter| { - iter.next().map(|remote_node| { - remote_node - .http - .beacon() - .get_head() - .map(|head| head.finalized_slot.epoch(E::slots_per_epoch())) - .map(|epoch| (epoch, iter)) - .map_err(|e| format!("Get head via http failed: {:?}", e)) - }) - }) - .collect() - }) - .and_then(move |epochs| { - if epochs.iter().any(|node_epoch| *node_epoch != epoch) { - Err(format!( - "Nodes are not finalized at epoch {}. Finalized epochs: {:?}", - epoch, epochs - )) - } else { - Ok(()) - } - }) +) -> Result<(), String> { + let epochs = { + let mut epochs = Vec::new(); + for remote_node in network.remote_nodes()? { + epochs.push( + remote_node + .http + .beacon() + .get_head() + .await + .map(|head| head.finalized_slot.epoch(E::slots_per_epoch())) + .map_err(|e| format!("Get head via http failed: {:?}", e))?, + ); + } + epochs + }; + + if epochs.iter().any(|node_epoch| *node_epoch != epoch) { + Err(format!( + "Nodes are not finalized at epoch {}. Finalized epochs: {:?}", + epoch, epochs + )) + } else { + Ok(()) + } } /// Verifies that all beacon nodes in the given `network` have a head state that contains /// `expected_count` validators. -fn verify_validator_count( +async fn verify_validator_count( network: LocalNetwork, expected_count: usize, -) -> impl Future { - network - .remote_nodes() - .into_future() - .and_then(|remote_nodes| { - stream::unfold(remote_nodes.into_iter(), |mut iter| { - iter.next().map(|remote_node| { - let beacon = remote_node.http.beacon(); - beacon - .get_head() - .map_err(|e| format!("Get head via http failed: {:?}", e)) - .and_then(move |head| { - beacon - .get_state_by_root(head.state_root) - .map(|(state, _root)| state) - .map_err(|e| format!("Get state root via http failed: {:?}", e)) - }) - .map(|state| (state.validators.len(), iter)) - }) - }) - .collect() - }) - .and_then(move |validator_counts| { - if validator_counts - .iter() - .any(|count| *count != expected_count) - { - Err(format!( - "Nodes do not all have {} validators in their state. Validator counts: {:?}", - expected_count, validator_counts - )) - } else { - Ok(()) - } - }) +) -> Result<(), String> { + let validator_counts = { + let mut validator_counts = Vec::new(); + for remote_node in network.remote_nodes()? { + let beacon = remote_node.http.beacon(); + + let head = beacon + .get_head() + .await + .map_err(|e| format!("Get head via http failed: {:?}", e))?; + + let vc = beacon + .get_state_by_root(head.state_root) + .await + .map(|(state, _root)| state) + .map_err(|e| format!("Get state root via http failed: {:?}", e))? + .validators + .len(); + validator_counts.push(vc); + } + validator_counts + }; + + if validator_counts + .iter() + .any(|count| *count != expected_count) + { + Err(format!( + "Nodes do not all have {} validators in their state. Validator counts: {:?}", + expected_count, validator_counts + )) + } else { + Ok(()) + } } diff --git a/tests/simulator/src/eth1_sim.rs b/tests/simulator/src/eth1_sim.rs index 3413701c5d..ee4d083137 100644 --- a/tests/simulator/src/eth1_sim.rs +++ b/tests/simulator/src/eth1_sim.rs @@ -1,13 +1,12 @@ use crate::{checks, LocalNetwork, E}; use clap::ArgMatches; use eth1_test_rig::GanacheEth1Instance; -use futures::{future, stream, Future, Stream}; +use futures::prelude::*; use node_test_rig::{ environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig, }; use std::net::{IpAddr, Ipv4Addr}; -use std::time::{Duration, Instant}; -use tokio::timer::Interval; +use std::time::Duration; pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let node_count = value_t!(matches, "nodes", usize).expect("missing nodes default"); @@ -50,159 +49,123 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { let deposit_amount = env.eth2_config.spec.max_effective_balance; let context = env.core_context(); - let executor = context.executor.clone(); - let future = GanacheEth1Instance::new() + let main_future = async { /* * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit * validators. */ - .map(move |ganache_eth1_instance| { - let deposit_contract = ganache_eth1_instance.deposit_contract; - let ganache = ganache_eth1_instance.ganache; - let eth1_endpoint = ganache.endpoint(); - let deposit_contract_address = deposit_contract.address(); + let ganache_eth1_instance = GanacheEth1Instance::new().await?; + let deposit_contract = ganache_eth1_instance.deposit_contract; + let ganache = ganache_eth1_instance.ganache; + let eth1_endpoint = ganache.endpoint(); + let deposit_contract_address = deposit_contract.address(); - // Start a timer that produces eth1 blocks on an interval. - executor.spawn( - Interval::new(Instant::now(), eth1_block_time) - .map_err(|_| eprintln!("Eth1 block timer failed")) - .for_each(move |_| ganache.evm_mine().map_err(|_| ())) - .map_err(|_| eprintln!("Eth1 evm_mine failed")) - .map(|_| ()), - ); + // Start a timer that produces eth1 blocks on an interval. + tokio::spawn(async move { + let mut interval = tokio::time::interval(eth1_block_time); + while let Some(_) = interval.next().await { + let _ = ganache.evm_mine().await; + } + }); - // Submit deposits to the deposit contract. - executor.spawn( - stream::unfold(0..total_validator_count, move |mut iter| { - iter.next().map(|i| { - println!("Submitting deposit for validator {}...", i); - deposit_contract - .deposit_deterministic_async::(i, deposit_amount) - .map(|_| ((), iter)) - }) - }) - .collect() - .map(|_| ()) - .map_err(|e| eprintln!("Error submitting deposit: {}", e)), - ); + // Submit deposits to the deposit contract. + tokio::spawn(async move { + for i in 0..total_validator_count { + println!("Submitting deposit for validator {}...", i); + let _ = deposit_contract + .deposit_deterministic_async::(i, deposit_amount) + .await; + } + }); - let mut beacon_config = testing_client_config(); + let mut beacon_config = testing_client_config(); - beacon_config.genesis = ClientGenesis::DepositContract; - beacon_config.eth1.endpoint = eth1_endpoint; - beacon_config.eth1.deposit_contract_address = deposit_contract_address; - beacon_config.eth1.deposit_contract_deploy_block = 0; - beacon_config.eth1.lowest_cached_block_number = 0; - beacon_config.eth1.follow_distance = 1; - beacon_config.dummy_eth1_backend = false; - beacon_config.sync_eth1_chain = true; + beacon_config.genesis = ClientGenesis::DepositContract; + beacon_config.eth1.endpoint = eth1_endpoint; + beacon_config.eth1.deposit_contract_address = deposit_contract_address; + beacon_config.eth1.deposit_contract_deploy_block = 0; + beacon_config.eth1.lowest_cached_block_number = 0; + beacon_config.eth1.follow_distance = 1; + beacon_config.dummy_eth1_backend = false; + beacon_config.sync_eth1_chain = true; - beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); + beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); - beacon_config - }) /* * Create a new `LocalNetwork` with one beacon node. */ - .and_then(move |beacon_config| { - LocalNetwork::new(context, beacon_config.clone()) - .map(|network| (network, beacon_config)) - }) + let network = LocalNetwork::new(context, beacon_config.clone()).await?; /* * One by one, add beacon nodes to the network. */ - .and_then(move |(network, beacon_config)| { - let network_1 = network.clone(); - stream::unfold(0..node_count - 1, move |mut iter| { - iter.next().map(|_| { - network_1 - .add_beacon_node(beacon_config.clone()) - .map(|()| ((), iter)) - }) - }) - .collect() - .map(|_| network) - }) + for _ in 0..node_count - 1 { + network.add_beacon_node(beacon_config.clone()).await?; + } /* * One by one, add validator clients to the network. Each validator client is attached to * a single corresponding beacon node. */ - .and_then(move |network| { - let network_1 = network.clone(); - // Note: presently the validator client future will only resolve once genesis time - // occurs. This is great for this scenario, but likely to change in the future. - // - // If the validator client future behaviour changes, we would need to add a new future - // that delays until genesis. Otherwise, all of the checks that start in the next - // future will start too early. + // Note: presently the validator client future will only resolve once genesis time + // occurs. This is great for this scenario, but likely to change in the future. + // + // If the validator client future behaviour changes, we would need to add a new future + // that delays until genesis. Otherwise, all of the checks that start in the next + // future will start too early. - stream::unfold(0..node_count, move |mut iter| { - iter.next().map(|i| { - let indices = (i * validators_per_node..(i + 1) * validators_per_node) - .collect::>(); + for i in 0..node_count { + let indices = + (i * validators_per_node..(i + 1) * validators_per_node).collect::>(); + network + .add_validator_client(ValidatorConfig::default(), i, indices) + .await?; + } - network_1 - .add_validator_client(ValidatorConfig::default(), i, indices) - .map(|()| ((), iter)) - }) - }) - .collect() - .map(|_| network) - }) /* * Start the processes that will run checks on the network as it runs. */ - .and_then(move |network| { - // The `final_future` either completes immediately or never completes, depending on the value - // of `end_after_checks`. - let final_future: Box + Send> = - if end_after_checks { - Box::new(future::ok(()).map_err(|()| "".to_string())) - } else { - Box::new(future::empty().map_err(|()| "".to_string())) - }; - future::ok(()) - // Check that the chain finalizes at the first given opportunity. - .join(checks::verify_first_finalization( - network.clone(), - slot_duration, - )) - // Check that the chain starts with the expected validator count. - .join(checks::verify_initial_validator_count( - network.clone(), - slot_duration, - initial_validator_count, - )) - // Check that validators greater than `spec.min_genesis_active_validator_count` are - // onboarded at the first possible opportunity. - .join(checks::verify_validator_onboarding( - network.clone(), - slot_duration, - total_validator_count, - )) - // End now or run forever, depending on the `end_after_checks` flag. - .join(final_future) - .map(|_| network) - }) + let _err = futures::join!( + // Check that the chain finalizes at the first given opportunity. + checks::verify_first_finalization(network.clone(), slot_duration), + // Check that the chain starts with the expected validator count. + checks::verify_initial_validator_count( + network.clone(), + slot_duration, + initial_validator_count, + ), + // Check that validators greater than `spec.min_genesis_active_validator_count` are + // onboarded at the first possible opportunity. + checks::verify_validator_onboarding( + network.clone(), + slot_duration, + total_validator_count, + ) + ); + + // The `final_future` either completes immediately or never completes, depending on the value + // of `end_after_checks`. + + if !end_after_checks { + future::pending::<()>().await; + } /* * End the simulation by dropping the network. This will kill all running beacon nodes and * validator clients. */ - .map(|network| { - println!( - "Simulation complete. Finished with {} beacon nodes and {} validator clients", - network.beacon_node_count(), - network.validator_client_count() - ); + println!( + "Simulation complete. Finished with {} beacon nodes and {} validator clients", + network.beacon_node_count(), + network.validator_client_count() + ); - // Be explicit about dropping the network, as this kills all the nodes. This ensures - // all the checks have adequate time to pass. - drop(network) - }); + // Be explicit about dropping the network, as this kills all the nodes. This ensures + // all the checks have adequate time to pass. + drop(network); + Ok::<(), String>(()) + }; - env.runtime().block_on(future) + Ok(env.runtime().block_on(main_future).unwrap()) } diff --git a/tests/simulator/src/local_network.rs b/tests/simulator/src/local_network.rs index 22ffc4d369..61699a66e2 100644 --- a/tests/simulator/src/local_network.rs +++ b/tests/simulator/src/local_network.rs @@ -1,4 +1,3 @@ -use futures::{Future, IntoFuture}; use node_test_rig::{ environment::RuntimeContext, ClientConfig, LocalBeaconNode, LocalValidatorClient, RemoteBeaconNode, ValidatorConfig, @@ -42,23 +41,24 @@ impl Deref for LocalNetwork { impl LocalNetwork { /// Creates a new network with a single `BeaconNode`. - pub fn new( + pub async fn new( context: RuntimeContext, mut beacon_config: ClientConfig, - ) -> impl Future { + ) -> Result { beacon_config.network.discovery_port = BOOTNODE_PORT; beacon_config.network.libp2p_port = BOOTNODE_PORT; beacon_config.network.enr_udp_port = Some(BOOTNODE_PORT); beacon_config.network.enr_tcp_port = Some(BOOTNODE_PORT); - LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config).map( - |beacon_node| Self { - inner: Arc::new(Inner { - context, - beacon_nodes: RwLock::new(vec![beacon_node]), - validator_clients: RwLock::new(vec![]), - }), - }, - ) + let beacon_node = + LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config) + .await?; + Ok(Self { + inner: Arc::new(Inner { + context, + beacon_nodes: RwLock::new(vec![beacon_node]), + validator_clients: RwLock::new(vec![]), + }), + }) } /// Returns the number of beacon nodes in the network. @@ -78,72 +78,65 @@ impl LocalNetwork { } /// Adds a beacon node to the network, connecting to the 0'th beacon node via ENR. - pub fn add_beacon_node( - &self, - mut beacon_config: ClientConfig, - ) -> impl Future { + pub async fn add_beacon_node(&self, mut beacon_config: ClientConfig) -> Result<(), String> { let self_1 = self.clone(); println!("Adding beacon node.."); - self.beacon_nodes - .read() - .first() - .map(|boot_node| { - beacon_config.network.boot_nodes.push( - boot_node - .client - .enr() - .expect("bootnode must have a network"), - ); - }) - .expect("should have at least one node"); + { + let read_lock = self.beacon_nodes.read(); + + let boot_node = read_lock.first().expect("should have at least one node"); + + beacon_config.network.boot_nodes.push( + boot_node + .client + .enr() + .expect("bootnode must have a network"), + ); + } let index = self.beacon_nodes.read().len(); - LocalBeaconNode::production( + let beacon_node = LocalBeaconNode::production( self.context.service_context(format!("node_{}", index)), beacon_config, ) - .map(move |beacon_node| { - self_1.beacon_nodes.write().push(beacon_node); - }) + .await?; + self_1.beacon_nodes.write().push(beacon_node); + Ok(()) } /// Adds a validator client to the network, connecting it to the beacon node with index /// `beacon_node`. - pub fn add_validator_client( + pub async fn add_validator_client( &self, mut validator_config: ValidatorConfig, beacon_node: usize, keypair_indices: Vec, - ) -> impl Future { + ) -> Result<(), String> { let index = self.validator_clients.read().len(); let context = self.context.service_context(format!("validator_{}", index)); let self_1 = self.clone(); + let socket_addr = { + let read_lock = self.beacon_nodes.read(); + let beacon_node = read_lock + .get(beacon_node) + .ok_or_else(|| format!("No beacon node for index {}", beacon_node))?; + beacon_node + .client + .http_listen_addr() + .expect("Must have http started") + }; - self.beacon_nodes - .read() - .get(beacon_node) - .map(move |beacon_node| { - let socket_addr = beacon_node - .client - .http_listen_addr() - .expect("Must have http started"); - - validator_config.http_server = - format!("http://{}:{}", socket_addr.ip(), socket_addr.port()); - - validator_config - }) - .ok_or_else(|| format!("No beacon node for index {}", beacon_node)) - .into_future() - .and_then(move |validator_config| { - LocalValidatorClient::production_with_insecure_keypairs( - context, - validator_config, - &keypair_indices, - ) - }) - .map(move |validator_client| self_1.validator_clients.write().push(validator_client)) + validator_config.http_server = + format!("http://{}:{}", socket_addr.ip(), socket_addr.port()); + let validator_client = LocalValidatorClient::production_with_insecure_keypairs( + context, + validator_config, + &keypair_indices, + ) + .await?; + self_1.validator_clients.write().push(validator_client); + Ok(()) } /// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API. @@ -157,13 +150,14 @@ impl LocalNetwork { } /// Return current epoch of bootnode. - pub fn bootnode_epoch(&self) -> impl Future { + pub async fn bootnode_epoch(&self) -> Result { let nodes = self.remote_nodes().expect("Failed to get remote nodes"); let bootnode = nodes.first().expect("Should contain bootnode"); bootnode .http .beacon() .get_head() + .await .map_err(|e| format!("Cannot get head: {:?}", e)) .map(|head| head.finalized_slot.epoch(E::slots_per_epoch())) } diff --git a/tests/simulator/src/no_eth1_sim.rs b/tests/simulator/src/no_eth1_sim.rs index b4d233909f..55b19223f8 100644 --- a/tests/simulator/src/no_eth1_sim.rs +++ b/tests/simulator/src/no_eth1_sim.rs @@ -1,6 +1,6 @@ use crate::{checks, LocalNetwork}; use clap::ArgMatches; -use futures::{future, stream, Future, Stream}; +use futures::prelude::*; use node_test_rig::{ environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig, }; @@ -63,88 +63,61 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> { beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); - let future = LocalNetwork::new(context, beacon_config.clone()) + let main_future = async { + let network = LocalNetwork::new(context, beacon_config.clone()).await?; /* * One by one, add beacon nodes to the network. */ - .and_then(move |network| { - let network_1 = network.clone(); - stream::unfold(0..node_count - 1, move |mut iter| { - iter.next().map(|_| { - network_1 - .add_beacon_node(beacon_config.clone()) - .map(|()| ((), iter)) - }) - }) - .collect() - .map(|_| network) - }) + for _ in 0..node_count - 1 { + network.add_beacon_node(beacon_config.clone()).await?; + } /* * One by one, add validator clients to the network. Each validator client is attached to * a single corresponding beacon node. */ - .and_then(move |network| { - let network_1 = network.clone(); - // Note: presently the validator client future will only resolve once genesis time - // occurs. This is great for this scenario, but likely to change in the future. - // - // If the validator client future behaviour changes, we would need to add a new future - // that delays until genesis. Otherwise, all of the checks that start in the next - // future will start too early. + // Note: presently the validator client future will only resolve once genesis time + // occurs. This is great for this scenario, but likely to change in the future. + // + // If the validator client future behaviour changes, we would need to add a new future + // that delays until genesis. Otherwise, all of the checks that start in the next + // future will start too early. - stream::unfold(0..node_count, move |mut iter| { - iter.next().map(|i| { - let indices = (i * validators_per_node..(i + 1) * validators_per_node) - .collect::>(); - - network_1 - .add_validator_client(ValidatorConfig::default(), i, indices) - .map(|()| ((), iter)) - }) - }) - .collect() - .map(|_| network) - }) + for i in 0..node_count { + let indices = + (i * validators_per_node..(i + 1) * validators_per_node).collect::>(); + network + .add_validator_client(ValidatorConfig::default(), i, indices) + .await?; + } /* * Start the processes that will run checks on the network as it runs. */ - .and_then(move |network| { - // The `final_future` either completes immediately or never completes, depending on the value - // of `end_after_checks`. - let final_future: Box + Send> = - if end_after_checks { - Box::new(future::ok(()).map_err(|()| "".to_string())) - } else { - Box::new(future::empty().map_err(|()| "".to_string())) - }; + // Check that the chain finalizes at the first given opportunity. + checks::verify_first_finalization(network.clone(), slot_duration).await?; - future::ok(()) - // Check that the chain finalizes at the first given opportunity. - .join(checks::verify_first_finalization( - network.clone(), - slot_duration, - )) - // End now or run forever, depending on the `end_after_checks` flag. - .join(final_future) - .map(|_| network) - }) + // The `final_future` either completes immediately or never completes, depending on the value + // of `end_after_checks`. + + if !end_after_checks { + future::pending::<()>().await; + } /* * End the simulation by dropping the network. This will kill all running beacon nodes and * validator clients. */ - .map(|network| { - println!( - "Simulation complete. Finished with {} beacon nodes and {} validator clients", - network.beacon_node_count(), - network.validator_client_count() - ); + println!( + "Simulation complete. Finished with {} beacon nodes and {} validator clients", + network.beacon_node_count(), + network.validator_client_count() + ); - // Be explicit about dropping the network, as this kills all the nodes. This ensures - // all the checks have adequate time to pass. - drop(network) - }); + // Be explicit about dropping the network, as this kills all the nodes. This ensures + // all the checks have adequate time to pass. + drop(network); + Ok::<(), String>(()) + }; - env.runtime().block_on(future) + Ok(env.runtime().block_on(main_future).unwrap()) } diff --git a/tests/simulator/src/sync_sim.rs b/tests/simulator/src/sync_sim.rs index 16a62fc327..ccf11e3ac1 100644 --- a/tests/simulator/src/sync_sim.rs +++ b/tests/simulator/src/sync_sim.rs @@ -1,14 +1,13 @@ use crate::checks::{epoch_delay, verify_all_finalized_at}; use crate::local_network::LocalNetwork; use clap::ArgMatches; -use futures::{future, stream, Future, IntoFuture, Stream}; +use futures::prelude::*; use node_test_rig::ClientConfig; use node_test_rig::{ environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig, }; use std::net::{IpAddr, Ipv4Addr}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use tokio::timer::Interval; use types::{Epoch, EthSpec}; pub fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { @@ -78,110 +77,118 @@ fn syncing_sim( beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); - let future = LocalNetwork::new(context, beacon_config.clone()) + let main_future = async { + /* + * Create a new `LocalNetwork` with one beacon node. + */ + let network = LocalNetwork::new(context, beacon_config.clone()).await?; + /* * Add a validator client which handles all validators from the genesis state. */ - .and_then(move |network| { - network - .add_validator_client(ValidatorConfig::default(), 0, (0..num_validators).collect()) - .map(|_| network) - }) - /* - * Start the processes that will run checks on the network as it runs. - */ - .and_then(move |network| { - // The `final_future` either completes immediately or never completes, depending on the value - // of `end_after_checks`. - let final_future: Box + Send> = - if end_after_checks { - Box::new(future::ok(()).map_err(|()| "".to_string())) - } else { - Box::new(future::empty().map_err(|()| "".to_string())) - }; + network + .add_validator_client(ValidatorConfig::default(), 0, (0..num_validators).collect()) + .await?; + + // Check all syncing strategies one after other. + pick_strategy( + &strategy, + network.clone(), + beacon_config.clone(), + slot_duration, + initial_delay, + sync_timeout, + ) + .await?; + + // The `final_future` either completes immediately or never completes, depending on the value + // of `end_after_checks`. + + if !end_after_checks { + future::pending::<()>().await; + } - future::ok(()) - // Check all syncing strategies one after other. - .join(pick_strategy( - &strategy, - network.clone(), - beacon_config.clone(), - slot_duration, - initial_delay, - sync_timeout, - )) - .join(final_future) - .map(|_| network) - }) /* * End the simulation by dropping the network. This will kill all running beacon nodes and * validator clients. */ - .map(|network| { - println!( - "Simulation complete. Finished with {} beacon nodes and {} validator clients", - network.beacon_node_count(), - network.validator_client_count() - ); + println!( + "Simulation complete. Finished with {} beacon nodes and {} validator clients", + network.beacon_node_count(), + network.validator_client_count() + ); - // Be explicit about dropping the network, as this kills all the nodes. This ensures - // all the checks have adequate time to pass. - drop(network) - }); + // Be explicit about dropping the network, as this kills all the nodes. This ensures + // all the checks have adequate time to pass. + drop(network); + Ok::<(), String>(()) + }; - env.runtime().block_on(future) + env.runtime().block_on(main_future) } -pub fn pick_strategy( +pub async fn pick_strategy( strategy: &str, network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, sync_timeout: u64, -) -> Box + Send + 'static> { +) -> Result<(), String> { match strategy { - "one-node" => Box::new(verify_one_node_sync( - network, - beacon_config, - slot_duration, - initial_delay, - sync_timeout, - )), - "two-nodes" => Box::new(verify_two_nodes_sync( - network, - beacon_config, - slot_duration, - initial_delay, - sync_timeout, - )), - "mixed" => Box::new(verify_in_between_sync( - network, - beacon_config, - slot_duration, - initial_delay, - sync_timeout, - )), - "all" => Box::new(verify_syncing( - network, - beacon_config, - slot_duration, - initial_delay, - sync_timeout, - )), - _ => Box::new(Err("Invalid strategy".into()).into_future()), + "one-node" => { + verify_one_node_sync( + network, + beacon_config, + slot_duration, + initial_delay, + sync_timeout, + ) + .await + } + "two-nodes" => { + verify_two_nodes_sync( + network, + beacon_config, + slot_duration, + initial_delay, + sync_timeout, + ) + .await + } + "mixed" => { + verify_in_between_sync( + network, + beacon_config, + slot_duration, + initial_delay, + sync_timeout, + ) + .await + } + "all" => { + verify_syncing( + network, + beacon_config, + slot_duration, + initial_delay, + sync_timeout, + ) + .await + } + _ => Err("Invalid strategy".into()), } } /// Verify one node added after `initial_delay` epochs is in sync /// after `sync_timeout` epochs. -pub fn verify_one_node_sync( +pub async fn verify_one_node_sync( network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, sync_timeout: u64, -) -> impl Future { +) -> Result<(), String> { let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); let network_c = network.clone(); // Delay for `initial_delay` epochs before adding another node to start syncing @@ -190,35 +197,34 @@ pub fn verify_one_node_sync( slot_duration, E::slots_per_epoch(), ) - .and_then(move |_| { - // Add a beacon node - network.add_beacon_node(beacon_config).map(|_| network) - }) - .and_then(move |network| { - // Check every `epoch_duration` if nodes are synced - // limited to at most `sync_timeout` epochs - Interval::new_interval(epoch_duration) - .take(sync_timeout) - .map_err(|_| "Failed to create interval".to_string()) - .take_while(move |_| check_still_syncing(&network_c)) - .for_each(|_| Ok(())) // consume the stream - .map(|_| network) - }) - .and_then(move |network| network.bootnode_epoch().map(|e| (e, network))) - .and_then(move |(epoch, network)| { - verify_all_finalized_at(network, epoch).map_err(|e| format!("One node sync error: {}", e)) - }) + .await; + // Add a beacon node + network.add_beacon_node(beacon_config).await?; + // Check every `epoch_duration` if nodes are synced + // limited to at most `sync_timeout` epochs + let mut interval = tokio::time::interval(epoch_duration); + let mut count = 0; + while let Some(_) = interval.next().await { + if count >= sync_timeout || !check_still_syncing(&network_c).await? { + break; + } + count += 1; + } + let epoch = network.bootnode_epoch().await?; + verify_all_finalized_at(network, epoch) + .map_err(|e| format!("One node sync error: {}", e)) + .await } /// Verify two nodes added after `initial_delay` epochs are in sync /// after `sync_timeout` epochs. -pub fn verify_two_nodes_sync( +pub async fn verify_two_nodes_sync( network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, sync_timeout: u64, -) -> impl Future { +) -> Result<(), String> { let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); let network_c = network.clone(); // Delay for `initial_delay` epochs before adding another node to start syncing @@ -227,41 +233,36 @@ pub fn verify_two_nodes_sync( slot_duration, E::slots_per_epoch(), ) - .and_then(move |_| { - // Add beacon nodes - network - .add_beacon_node(beacon_config.clone()) - .map(|_| (network, beacon_config)) - .and_then(|(network, beacon_config)| { - network.add_beacon_node(beacon_config).map(|_| network) - }) - }) - .and_then(move |network| { - // Check every `epoch_duration` if nodes are synced - // limited to at most `sync_timeout` epochs - Interval::new_interval(epoch_duration) - .take(sync_timeout) - .map_err(|_| "Failed to create interval".to_string()) - .take_while(move |_| check_still_syncing(&network_c)) - .for_each(|_| Ok(())) // consume the stream - .map(|_| network) - }) - .and_then(move |network| network.bootnode_epoch().map(|e| (e, network))) - .and_then(move |(epoch, network)| { - verify_all_finalized_at(network, epoch).map_err(|e| format!("Two node sync error: {}", e)) - }) + .await; + // Add beacon nodes + network.add_beacon_node(beacon_config.clone()).await?; + network.add_beacon_node(beacon_config).await?; + // Check every `epoch_duration` if nodes are synced + // limited to at most `sync_timeout` epochs + let mut interval = tokio::time::interval(epoch_duration); + let mut count = 0; + while let Some(_) = interval.next().await { + if count >= sync_timeout || !check_still_syncing(&network_c).await? { + break; + } + count += 1; + } + let epoch = network.bootnode_epoch().await?; + verify_all_finalized_at(network, epoch) + .map_err(|e| format!("One node sync error: {}", e)) + .await } /// Add 2 syncing nodes after `initial_delay` epochs, /// Add another node after `sync_timeout - 5` epochs and verify all are /// in sync after `sync_timeout + 5` epochs. -pub fn verify_in_between_sync( +pub async fn verify_in_between_sync( network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, sync_timeout: u64, -) -> impl Future { +) -> Result<(), String> { let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); let network_c = network.clone(); // Delay for `initial_delay` epochs before adding another node to start syncing @@ -271,52 +272,43 @@ pub fn verify_in_between_sync( slot_duration, E::slots_per_epoch(), ) - .and_then(move |_| { - // Add a beacon node - network - .add_beacon_node(beacon_config.clone()) - .map(|_| (network, beacon_config)) - .and_then(|(network, beacon_config)| { - network.add_beacon_node(beacon_config).map(|_| network) - }) - }) - .and_then(move |network| { - // Delay before adding additional syncing nodes. - epoch_delay( - Epoch::new(sync_timeout - 5), - slot_duration, - E::slots_per_epoch(), - ) - .map(|_| network) - }) - .and_then(move |network| { - // Add a beacon node - network.add_beacon_node(config1.clone()).map(|_| network) - }) - .and_then(move |network| { - // Check every `epoch_duration` if nodes are synced - // limited to at most `sync_timeout` epochs - Interval::new_interval(epoch_duration) - .take(sync_timeout + 5) - .map_err(|_| "Failed to create interval".to_string()) - .take_while(move |_| check_still_syncing(&network_c)) - .for_each(|_| Ok(())) // consume the stream - .map(|_| network) - }) - .and_then(move |network| network.bootnode_epoch().map(|e| (e, network))) - .and_then(move |(epoch, network)| { - verify_all_finalized_at(network, epoch).map_err(|e| format!("In between sync error: {}", e)) - }) + .await; + // Add two beacon nodes + network.add_beacon_node(beacon_config.clone()).await?; + network.add_beacon_node(beacon_config).await?; + // Delay before adding additional syncing nodes. + epoch_delay( + Epoch::new(sync_timeout - 5), + slot_duration, + E::slots_per_epoch(), + ) + .await; + // Add a beacon node + network.add_beacon_node(config1.clone()).await?; + // Check every `epoch_duration` if nodes are synced + // limited to at most `sync_timeout` epochs + let mut interval = tokio::time::interval(epoch_duration); + let mut count = 0; + while let Some(_) = interval.next().await { + if count >= sync_timeout || !check_still_syncing(&network_c).await? { + break; + } + count += 1; + } + let epoch = network.bootnode_epoch().await?; + verify_all_finalized_at(network, epoch) + .map_err(|e| format!("One node sync error: {}", e)) + .await } /// Run syncing strategies one after other. -pub fn verify_syncing( +pub async fn verify_syncing( network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, sync_timeout: u64, -) -> impl Future { +) -> Result<(), String> { verify_one_node_sync( network.clone(), beacon_config.clone(), @@ -324,53 +316,42 @@ pub fn verify_syncing( initial_delay, sync_timeout, ) - .map(|_| println!("Completed one node sync")) - .and_then(move |_| { - verify_two_nodes_sync( - network.clone(), - beacon_config.clone(), - slot_duration, - initial_delay, - sync_timeout, - ) - .map(|_| { - println!("Completed two node sync"); - (network, beacon_config) - }) - }) - .and_then(move |(network, beacon_config)| { - verify_in_between_sync( - network, - beacon_config, - slot_duration, - initial_delay, - sync_timeout, - ) - .map(|_| println!("Completed in between sync")) - }) + .await?; + println!("Completed one node sync"); + verify_two_nodes_sync( + network.clone(), + beacon_config.clone(), + slot_duration, + initial_delay, + sync_timeout, + ) + .await?; + println!("Completed two node sync"); + verify_in_between_sync( + network, + beacon_config, + slot_duration, + initial_delay, + sync_timeout, + ) + .await?; + println!("Completed in between sync"); + Ok(()) } -pub fn check_still_syncing( - network: &LocalNetwork, -) -> impl Future { - network - .remote_nodes() - .into_future() - // get syncing status of nodes - .and_then(|remote_nodes| { - stream::unfold(remote_nodes.into_iter(), |mut iter| { - iter.next().map(|remote_node| { - remote_node - .http - .node() - .syncing_status() - .map(|status| status.is_syncing) - .map(|status| (status, iter)) - .map_err(|e| format!("Get syncing status via http failed: {:?}", e)) - }) - }) - .collect() - }) - .and_then(move |status| Ok(status.iter().any(|is_syncing| *is_syncing))) - .map_err(|e| format!("Failed syncing check: {:?}", e)) +pub async fn check_still_syncing(network: &LocalNetwork) -> Result { + // get syncing status of nodes + let mut status = Vec::new(); + for remote_node in network.remote_nodes()? { + status.push( + remote_node + .http + .node() + .syncing_status() + .await + .map(|status| status.is_syncing) + .map_err(|e| format!("Get syncing status via http failed: {:?}", e))?, + ) + } + Ok(status.iter().any(|is_syncing| *is_syncing)) }