From 4cba745df6d29f83ed559acdcdaaa5c305a0ab07 Mon Sep 17 00:00:00 2001 From: divma Date: Mon, 6 Apr 2020 02:04:06 -0500 Subject: [PATCH] make sync sim end faster by checking sync every epoch (#986) --- tests/simulator/src/cli.rs | 8 +-- tests/simulator/src/main.rs | 12 ++-- tests/simulator/src/sync_sim.rs | 107 +++++++++++++++++++++++--------- 3 files changed, 88 insertions(+), 39 deletions(-) diff --git a/tests/simulator/src/cli.rs b/tests/simulator/src/cli.rs index 93c8cb051b..dbc6bca074 100644 --- a/tests/simulator/src/cli.rs +++ b/tests/simulator/src/cli.rs @@ -57,17 +57,17 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .help("Epoch delay for new beacon node to start syncing (default 50)"), ) .arg( - Arg::with_name("sync_delay") - .long("sync_delay") + Arg::with_name("sync_timeout") + .long("sync_timeout") .takes_value(true) - .help("Epoch delay for newly added beacon nodes get synced (default 10)"), + .help("Number of epochs after which newly added beacon nodes must be synced (default 10)"), ) .arg( Arg::with_name("strategy") .long("strategy") .takes_value(true) .possible_values(&["one-node", "two-nodes", "mixed", "all"]) - .help("Sync strategy to run. (default all)"), + .help("Sync verification strategy to run. (default all)"), ), ) } diff --git a/tests/simulator/src/main.rs b/tests/simulator/src/main.rs index 85be8ccae0..21946641e6 100644 --- a/tests/simulator/src/main.rs +++ b/tests/simulator/src/main.rs @@ -93,13 +93,13 @@ fn run_beacon_chain_sim(matches: &ArgMatches) -> Result<(), String> { fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { let initial_delay = value_t!(matches, "initial_delay", u64).unwrap_or(50); - let sync_delay = value_t!(matches, "sync_delay", u64).unwrap_or(10); + let sync_timeout = value_t!(matches, "sync_timeout", u64).unwrap_or(10); let speed_up_factor = value_t!(matches, "speedup", u64).unwrap_or(15); let strategy = value_t!(matches, "strategy", String).unwrap_or("all".into()); println!("Syncing Simulator:"); - println!(" initial_delay:{}", initial_delay); - println!(" sync delay:{}", sync_delay); + println!(" initial delay:{}", initial_delay); + println!(" sync timeout:{}", sync_timeout); println!(" speed up factor:{}", speed_up_factor); println!(" strategy:{}", strategy); @@ -109,7 +109,7 @@ fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { syncing_sim( speed_up_factor, initial_delay, - sync_delay, + sync_timeout, strategy, log_level, log_format, @@ -119,7 +119,7 @@ fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { fn syncing_sim( speed_up_factor: u64, initial_delay: u64, - sync_delay: u64, + sync_timeout: u64, strategy: String, log_level: &str, log_format: Option<&str>, @@ -171,7 +171,7 @@ fn syncing_sim( beacon_config.clone(), slot_duration, initial_delay, - sync_delay, + sync_timeout, )) .join(final_future) .map(|_| network) diff --git a/tests/simulator/src/sync_sim.rs b/tests/simulator/src/sync_sim.rs index 13fa61d042..bee0b4ba12 100644 --- a/tests/simulator/src/sync_sim.rs +++ b/tests/simulator/src/sync_sim.rs @@ -1,8 +1,10 @@ use crate::checks::{epoch_delay, verify_all_finalized_at}; use crate::local_network::LocalNetwork; -use futures::{Future, IntoFuture}; +use futures::stream; +use futures::{Future, IntoFuture, Stream}; use node_test_rig::ClientConfig; use std::time::Duration; +use tokio::timer::Interval; use types::{Epoch, EthSpec}; pub fn pick_strategy( @@ -11,7 +13,7 @@ pub fn pick_strategy( beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, - sync_delay: u64, + sync_timeout: u64, ) -> Box + Send + 'static> { match strategy { "one-node" => Box::new(verify_one_node_sync( @@ -19,42 +21,44 @@ pub fn pick_strategy( beacon_config, slot_duration, initial_delay, - sync_delay, + sync_timeout, )), "two-nodes" => Box::new(verify_two_nodes_sync( network, beacon_config, slot_duration, initial_delay, - sync_delay, + sync_timeout, )), "mixed" => Box::new(verify_in_between_sync( network, beacon_config, slot_duration, initial_delay, - sync_delay, + sync_timeout, )), "all" => Box::new(verify_syncing( network, beacon_config, slot_duration, initial_delay, - sync_delay, + sync_timeout, )), _ => Box::new(Err("Invalid strategy".into()).into_future()), } } /// Verify one node added after `initial_delay` epochs is in sync -/// after `sync_delay` epochs. +/// after `sync_timeout` epochs. pub fn verify_one_node_sync( network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, - sync_delay: u64, + sync_timeout: u64, ) -> impl Future { + let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); + let network_c = network.clone(); // Delay for `initial_delay` epochs before adding another node to start syncing epoch_delay( Epoch::new(initial_delay), @@ -66,8 +70,14 @@ pub fn verify_one_node_sync( network.add_beacon_node(beacon_config).map(|_| network) }) .and_then(move |network| { - // Delay for `sync_delay` epochs before verifying synced state. - epoch_delay(Epoch::new(sync_delay), slot_duration, E::slots_per_epoch()).map(|_| network) + // Check every `epoch_duration` if nodes are synced + // limited to at most `sync_timeout` epochs + Interval::new_interval(epoch_duration) + .take(sync_timeout) + .map_err(|_| "Failed to create interval".to_string()) + .take_while(move |_| check_still_syncing(&network_c)) + .for_each(|_| Ok(())) // consume the stream + .map(|_| network) }) .and_then(move |network| network.bootnode_epoch().map(|e| (e, network))) .and_then(move |(epoch, network)| { @@ -76,14 +86,16 @@ pub fn verify_one_node_sync( } /// Verify two nodes added after `initial_delay` epochs are in sync -/// after `sync_delay` epochs. +/// after `sync_timeout` epochs. pub fn verify_two_nodes_sync( network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, - sync_delay: u64, + sync_timeout: u64, ) -> impl Future { + let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); + let network_c = network.clone(); // Delay for `initial_delay` epochs before adding another node to start syncing epoch_delay( Epoch::new(initial_delay), @@ -100,8 +112,14 @@ pub fn verify_two_nodes_sync( }) }) .and_then(move |network| { - // Delay for `sync_delay` epochs before verifying synced state. - epoch_delay(Epoch::new(sync_delay), slot_duration, E::slots_per_epoch()).map(|_| network) + // Check every `epoch_duration` if nodes are synced + // limited to at most `sync_timeout` epochs + Interval::new_interval(epoch_duration) + .take(sync_timeout) + .map_err(|_| "Failed to create interval".to_string()) + .take_while(move |_| check_still_syncing(&network_c)) + .for_each(|_| Ok(())) // consume the stream + .map(|_| network) }) .and_then(move |network| network.bootnode_epoch().map(|e| (e, network))) .and_then(move |(epoch, network)| { @@ -110,15 +128,17 @@ pub fn verify_two_nodes_sync( } /// Add 2 syncing nodes after `initial_delay` epochs, -/// Add another node after `sync_delay - 5` epochs and verify all are -/// in sync after `sync_delay + 5` epochs. +/// Add another node after `sync_timeout - 5` epochs and verify all are +/// in sync after `sync_timeout + 5` epochs. pub fn verify_in_between_sync( network: LocalNetwork, beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, - sync_delay: u64, + sync_timeout: u64, ) -> impl Future { + let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); + let network_c = network.clone(); // Delay for `initial_delay` epochs before adding another node to start syncing let config1 = beacon_config.clone(); epoch_delay( @@ -138,7 +158,7 @@ pub fn verify_in_between_sync( .and_then(move |network| { // Delay before adding additional syncing nodes. epoch_delay( - Epoch::new(sync_delay - 5), + Epoch::new(sync_timeout - 5), slot_duration, E::slots_per_epoch(), ) @@ -149,13 +169,14 @@ pub fn verify_in_between_sync( network.add_beacon_node(config1.clone()).map(|_| network) }) .and_then(move |network| { - // Delay for `sync_delay` epochs before verifying synced state. - epoch_delay( - Epoch::new(sync_delay + 5), - slot_duration, - E::slots_per_epoch(), - ) - .map(|_| network) + // Check every `epoch_duration` if nodes are synced + // limited to at most `sync_timeout` epochs + Interval::new_interval(epoch_duration) + .take(sync_timeout + 5) + .map_err(|_| "Failed to create interval".to_string()) + .take_while(move |_| check_still_syncing(&network_c)) + .for_each(|_| Ok(())) // consume the stream + .map(|_| network) }) .and_then(move |network| network.bootnode_epoch().map(|e| (e, network))) .and_then(move |(epoch, network)| { @@ -169,14 +190,14 @@ pub fn verify_syncing( beacon_config: ClientConfig, slot_duration: Duration, initial_delay: u64, - sync_delay: u64, + sync_timeout: u64, ) -> impl Future { verify_one_node_sync( network.clone(), beacon_config.clone(), slot_duration, initial_delay, - sync_delay, + sync_timeout, ) .map(|_| println!("Completed one node sync")) .and_then(move |_| { @@ -185,7 +206,7 @@ pub fn verify_syncing( beacon_config.clone(), slot_duration, initial_delay, - sync_delay, + sync_timeout, ) .map(|_| { println!("Completed two node sync"); @@ -198,8 +219,36 @@ pub fn verify_syncing( beacon_config, slot_duration, initial_delay, - sync_delay, + sync_timeout, ) .map(|_| println!("Completed in between sync")) }) } + +pub fn check_still_syncing( + network: &LocalNetwork, +) -> impl Future { + let net = network.clone(); + network + .remote_nodes() + .into_future() + // get all head epochs + .and_then(|remote_nodes| { + stream::unfold(remote_nodes.into_iter(), |mut iter| { + iter.next().map(|remote_node| { + remote_node + .http + .beacon() + .get_head() + .map(|head| head.finalized_slot.epoch(E::slots_per_epoch())) + .map(|epoch| (epoch, iter)) + .map_err(|e| format!("Get head via http failed: {:?}", e)) + }) + }) + .collect() + }) + // find current epoch + .and_then(move |epochs| net.bootnode_epoch().map(|epoch| (epochs, epoch))) + .and_then(move |(epochs, epoch)| Ok(epochs.iter().any(|head_epoch| *head_epoch != epoch))) + .map_err(|e| format!("Failed syncing check: {:?}", e)) +}