Upgrade simulator to stable futures

This commit is contained in:
Age Manning
2020-05-10 22:34:00 +10:00
parent addde163c4
commit 033cca602f
6 changed files with 461 additions and 555 deletions

View File

@@ -4,7 +4,6 @@
use beacon_node::ProductionBeaconNode; use beacon_node::ProductionBeaconNode;
use environment::RuntimeContext; use environment::RuntimeContext;
use futures::Future;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tempdir::TempDir; use tempdir::TempDir;
@@ -40,10 +39,12 @@ impl<E: EthSpec> LocalBeaconNode<E> {
client_config.data_dir = datadir.path().into(); client_config.data_dir = datadir.path().into();
client_config.network.network_dir = PathBuf::from(datadir.path()).join("network"); client_config.network.network_dir = PathBuf::from(datadir.path()).join("network");
ProductionBeaconNode::new(context, client_config).await.map(move |client| Self { ProductionBeaconNode::new(context, client_config)
client: client.into_inner(), .await
datadir, .map(move |client| Self {
}) client: client.into_inner(),
datadir,
})
} }
} }
@@ -126,7 +127,7 @@ impl<E: EthSpec> LocalValidatorClient<E> {
pub async fn production( pub async fn production(
context: RuntimeContext<E>, context: RuntimeContext<E>,
config: ValidatorConfig, config: ValidatorConfig,
) -> Result<Self,String> { ) -> Result<Self, String> {
// Creates a temporary directory that will be deleted once this `TempDir` is dropped. // Creates a temporary directory that will be deleted once this `TempDir` is dropped.
let datadir = TempDir::new("lighthouse-validator") let datadir = TempDir::new("lighthouse-validator")
.expect("should create temp directory for client datadir"); .expect("should create temp directory for client datadir");
@@ -141,11 +142,13 @@ impl<E: EthSpec> LocalValidatorClient<E> {
) -> Result<Self, String> { ) -> Result<Self, String> {
config.data_dir = datadir.path().into(); config.data_dir = datadir.path().into();
ProductionValidatorClient::new(context, config).await.map(move |mut client| { ProductionValidatorClient::new(context, config)
client .await
.start_service() .map(move |mut client| {
.expect("should start validator services"); client
Self { client, datadir } .start_service()
}) .expect("should start validator services");
Self { client, datadir }
})
} }
} }

View File

@@ -1,136 +1,128 @@
use crate::local_network::LocalNetwork; use crate::local_network::LocalNetwork;
use futures::{stream, Future, IntoFuture, Stream}; use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
use types::{Epoch, EthSpec, Slot, Unsigned}; use types::{Epoch, EthSpec, Slot, Unsigned};
/// Checks that all of the validators have on-boarded by the start of the second eth1 voting /// Checks that all of the validators have on-boarded by the start of the second eth1 voting
/// period. /// period.
pub fn verify_initial_validator_count<E: EthSpec>( pub async fn verify_initial_validator_count<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
slot_duration: Duration, slot_duration: Duration,
initial_validator_count: usize, initial_validator_count: usize,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
slot_delay(Slot::new(1), slot_duration) slot_delay(Slot::new(1), slot_duration).await;
.and_then(move |()| verify_validator_count(network, initial_validator_count)) verify_validator_count(network, initial_validator_count).await?;
Ok(())
} }
/// Checks that all of the validators have on-boarded by the start of the second eth1 voting /// Checks that all of the validators have on-boarded by the start of the second eth1 voting
/// period. /// period.
pub fn verify_validator_onboarding<E: EthSpec>( pub async fn verify_validator_onboarding<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
slot_duration: Duration, slot_duration: Duration,
expected_validator_count: usize, expected_validator_count: usize,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
slot_delay( slot_delay(
Slot::new(E::SlotsPerEth1VotingPeriod::to_u64()), Slot::new(E::SlotsPerEth1VotingPeriod::to_u64()),
slot_duration, slot_duration,
) )
.and_then(move |()| verify_validator_count(network, expected_validator_count)) .await;
verify_validator_count(network, expected_validator_count).await?;
Ok(())
} }
/// Checks that the chain has made the first possible finalization. /// Checks that the chain has made the first possible finalization.
/// ///
/// Intended to be run as soon as chain starts. /// Intended to be run as soon as chain starts.
pub fn verify_first_finalization<E: EthSpec>( pub async fn verify_first_finalization<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
slot_duration: Duration, slot_duration: Duration,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
epoch_delay(Epoch::new(4), slot_duration, E::slots_per_epoch()) epoch_delay(Epoch::new(4), slot_duration, E::slots_per_epoch()).await;
.and_then(|()| verify_all_finalized_at(network, Epoch::new(2))) verify_all_finalized_at(network, Epoch::new(2)).await?;
Ok(())
} }
/// Delays for `epochs`, plus half a slot extra. /// Delays for `epochs`, plus half a slot extra.
pub fn epoch_delay( pub async fn epoch_delay(epochs: Epoch, slot_duration: Duration, slots_per_epoch: u64) {
epochs: Epoch,
slot_duration: Duration,
slots_per_epoch: u64,
) -> impl Future<Item = (), Error = String> {
let duration = slot_duration * (epochs.as_u64() * slots_per_epoch) as u32 + slot_duration / 2; let duration = slot_duration * (epochs.as_u64() * slots_per_epoch) as u32 + slot_duration / 2;
tokio::time::delay_for(duration).await
Delay::new(Instant::now() + duration).map_err(|e| format!("Epoch delay failed: {:?}", e))
} }
/// Delays for `slots`, plus half a slot extra. /// Delays for `slots`, plus half a slot extra.
fn slot_delay(slots: Slot, slot_duration: Duration) -> impl Future<Item = (), Error = String> { async fn slot_delay(slots: Slot, slot_duration: Duration) {
let duration = slot_duration * slots.as_u64() as u32 + slot_duration / 2; let duration = slot_duration * slots.as_u64() as u32 + slot_duration / 2;
tokio::time::delay_for(duration).await;
Delay::new(Instant::now() + duration).map_err(|e| format!("Epoch delay failed: {:?}", e))
} }
/// Verifies that all beacon nodes in the given network have a head state that has a finalized /// Verifies that all beacon nodes in the given network have a head state that has a finalized
/// epoch of `epoch`. /// epoch of `epoch`.
pub fn verify_all_finalized_at<E: EthSpec>( pub async fn verify_all_finalized_at<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
epoch: Epoch, epoch: Epoch,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
network let epochs = {
.remote_nodes() let mut epochs = Vec::new();
.into_future() for remote_node in network.remote_nodes()? {
.and_then(|remote_nodes| { epochs.push(
stream::unfold(remote_nodes.into_iter(), |mut iter| { remote_node
iter.next().map(|remote_node| { .http
remote_node .beacon()
.http .get_head()
.beacon() .await
.get_head() .map(|head| head.finalized_slot.epoch(E::slots_per_epoch()))
.map(|head| head.finalized_slot.epoch(E::slots_per_epoch())) .map_err(|e| format!("Get head via http failed: {:?}", e))?,
.map(|epoch| (epoch, iter)) );
.map_err(|e| format!("Get head via http failed: {:?}", e)) }
}) epochs
}) };
.collect()
}) if epochs.iter().any(|node_epoch| *node_epoch != epoch) {
.and_then(move |epochs| { Err(format!(
if epochs.iter().any(|node_epoch| *node_epoch != epoch) { "Nodes are not finalized at epoch {}. Finalized epochs: {:?}",
Err(format!( epoch, epochs
"Nodes are not finalized at epoch {}. Finalized epochs: {:?}", ))
epoch, epochs } else {
)) Ok(())
} else { }
Ok(())
}
})
} }
/// Verifies that all beacon nodes in the given `network` have a head state that contains /// Verifies that all beacon nodes in the given `network` have a head state that contains
/// `expected_count` validators. /// `expected_count` validators.
fn verify_validator_count<E: EthSpec>( async fn verify_validator_count<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
expected_count: usize, expected_count: usize,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
network let validator_counts = {
.remote_nodes() let mut validator_counts = Vec::new();
.into_future() for remote_node in network.remote_nodes()? {
.and_then(|remote_nodes| { let beacon = remote_node.http.beacon();
stream::unfold(remote_nodes.into_iter(), |mut iter| {
iter.next().map(|remote_node| { let head = beacon
let beacon = remote_node.http.beacon(); .get_head()
beacon .await
.get_head() .map_err(|e| format!("Get head via http failed: {:?}", e))?;
.map_err(|e| format!("Get head via http failed: {:?}", e))
.and_then(move |head| { let vc = beacon
beacon .get_state_by_root(head.state_root)
.get_state_by_root(head.state_root) .await
.map(|(state, _root)| state) .map(|(state, _root)| state)
.map_err(|e| format!("Get state root via http failed: {:?}", e)) .map_err(|e| format!("Get state root via http failed: {:?}", e))?
}) .validators
.map(|state| (state.validators.len(), iter)) .len();
}) validator_counts.push(vc);
}) }
.collect() validator_counts
}) };
.and_then(move |validator_counts| {
if validator_counts if validator_counts
.iter() .iter()
.any(|count| *count != expected_count) .any(|count| *count != expected_count)
{ {
Err(format!( Err(format!(
"Nodes do not all have {} validators in their state. Validator counts: {:?}", "Nodes do not all have {} validators in their state. Validator counts: {:?}",
expected_count, validator_counts expected_count, validator_counts
)) ))
} else { } else {
Ok(()) Ok(())
} }
})
} }

View File

@@ -1,13 +1,12 @@
use crate::{checks, LocalNetwork, E}; use crate::{checks, LocalNetwork, E};
use clap::ArgMatches; use clap::ArgMatches;
use eth1_test_rig::GanacheEth1Instance; use eth1_test_rig::GanacheEth1Instance;
use futures::{future, stream, Future, Stream}; use futures::prelude::*;
use node_test_rig::{ use node_test_rig::{
environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig, environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig,
}; };
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::timer::Interval;
pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> { pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
let node_count = value_t!(matches, "nodes", usize).expect("missing nodes default"); let node_count = value_t!(matches, "nodes", usize).expect("missing nodes default");
@@ -50,159 +49,123 @@ pub fn run_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
let deposit_amount = env.eth2_config.spec.max_effective_balance; let deposit_amount = env.eth2_config.spec.max_effective_balance;
let context = env.core_context(); let context = env.core_context();
let executor = context.executor.clone();
let future = GanacheEth1Instance::new() let main_future = async {
/* /*
* Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit * Deploy the deposit contract, spawn tasks to keep creating new blocks and deposit
* validators. * validators.
*/ */
.map(move |ganache_eth1_instance| { let ganache_eth1_instance = GanacheEth1Instance::new().await?;
let deposit_contract = ganache_eth1_instance.deposit_contract; let deposit_contract = ganache_eth1_instance.deposit_contract;
let ganache = ganache_eth1_instance.ganache; let ganache = ganache_eth1_instance.ganache;
let eth1_endpoint = ganache.endpoint(); let eth1_endpoint = ganache.endpoint();
let deposit_contract_address = deposit_contract.address(); let deposit_contract_address = deposit_contract.address();
// Start a timer that produces eth1 blocks on an interval. // Start a timer that produces eth1 blocks on an interval.
executor.spawn( tokio::spawn(async move {
Interval::new(Instant::now(), eth1_block_time) let mut interval = tokio::time::interval(eth1_block_time);
.map_err(|_| eprintln!("Eth1 block timer failed")) while let Some(_) = interval.next().await {
.for_each(move |_| ganache.evm_mine().map_err(|_| ())) let _ = ganache.evm_mine().await;
.map_err(|_| eprintln!("Eth1 evm_mine failed")) }
.map(|_| ()), });
);
// Submit deposits to the deposit contract. // Submit deposits to the deposit contract.
executor.spawn( tokio::spawn(async move {
stream::unfold(0..total_validator_count, move |mut iter| { for i in 0..total_validator_count {
iter.next().map(|i| { println!("Submitting deposit for validator {}...", i);
println!("Submitting deposit for validator {}...", i); let _ = deposit_contract
deposit_contract .deposit_deterministic_async::<E>(i, deposit_amount)
.deposit_deterministic_async::<E>(i, deposit_amount) .await;
.map(|_| ((), iter)) }
}) });
})
.collect()
.map(|_| ())
.map_err(|e| eprintln!("Error submitting deposit: {}", e)),
);
let mut beacon_config = testing_client_config(); let mut beacon_config = testing_client_config();
beacon_config.genesis = ClientGenesis::DepositContract; beacon_config.genesis = ClientGenesis::DepositContract;
beacon_config.eth1.endpoint = eth1_endpoint; beacon_config.eth1.endpoint = eth1_endpoint;
beacon_config.eth1.deposit_contract_address = deposit_contract_address; beacon_config.eth1.deposit_contract_address = deposit_contract_address;
beacon_config.eth1.deposit_contract_deploy_block = 0; beacon_config.eth1.deposit_contract_deploy_block = 0;
beacon_config.eth1.lowest_cached_block_number = 0; beacon_config.eth1.lowest_cached_block_number = 0;
beacon_config.eth1.follow_distance = 1; beacon_config.eth1.follow_distance = 1;
beacon_config.dummy_eth1_backend = false; beacon_config.dummy_eth1_backend = false;
beacon_config.sync_eth1_chain = true; beacon_config.sync_eth1_chain = true;
beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
beacon_config
})
/* /*
* Create a new `LocalNetwork` with one beacon node. * Create a new `LocalNetwork` with one beacon node.
*/ */
.and_then(move |beacon_config| { let network = LocalNetwork::new(context, beacon_config.clone()).await?;
LocalNetwork::new(context, beacon_config.clone())
.map(|network| (network, beacon_config))
})
/* /*
* One by one, add beacon nodes to the network. * One by one, add beacon nodes to the network.
*/ */
.and_then(move |(network, beacon_config)| {
let network_1 = network.clone();
stream::unfold(0..node_count - 1, move |mut iter| { for _ in 0..node_count - 1 {
iter.next().map(|_| { network.add_beacon_node(beacon_config.clone()).await?;
network_1 }
.add_beacon_node(beacon_config.clone())
.map(|()| ((), iter))
})
})
.collect()
.map(|_| network)
})
/* /*
* One by one, add validator clients to the network. Each validator client is attached to * One by one, add validator clients to the network. Each validator client is attached to
* a single corresponding beacon node. * a single corresponding beacon node.
*/ */
.and_then(move |network| {
let network_1 = network.clone();
// Note: presently the validator client future will only resolve once genesis time // Note: presently the validator client future will only resolve once genesis time
// occurs. This is great for this scenario, but likely to change in the future. // occurs. This is great for this scenario, but likely to change in the future.
// //
// If the validator client future behaviour changes, we would need to add a new future // If the validator client future behaviour changes, we would need to add a new future
// that delays until genesis. Otherwise, all of the checks that start in the next // that delays until genesis. Otherwise, all of the checks that start in the next
// future will start too early. // future will start too early.
stream::unfold(0..node_count, move |mut iter| { for i in 0..node_count {
iter.next().map(|i| { let indices =
let indices = (i * validators_per_node..(i + 1) * validators_per_node) (i * validators_per_node..(i + 1) * validators_per_node).collect::<Vec<_>>();
.collect::<Vec<_>>(); network
.add_validator_client(ValidatorConfig::default(), i, indices)
.await?;
}
network_1
.add_validator_client(ValidatorConfig::default(), i, indices)
.map(|()| ((), iter))
})
})
.collect()
.map(|_| network)
})
/* /*
* Start the processes that will run checks on the network as it runs. * Start the processes that will run checks on the network as it runs.
*/ */
.and_then(move |network| {
// The `final_future` either completes immediately or never completes, depending on the value
// of `end_after_checks`.
let final_future: Box<dyn Future<Item = (), Error = String> + Send> =
if end_after_checks {
Box::new(future::ok(()).map_err(|()| "".to_string()))
} else {
Box::new(future::empty().map_err(|()| "".to_string()))
};
future::ok(()) let _err = futures::join!(
// Check that the chain finalizes at the first given opportunity. // Check that the chain finalizes at the first given opportunity.
.join(checks::verify_first_finalization( checks::verify_first_finalization(network.clone(), slot_duration),
network.clone(), // Check that the chain starts with the expected validator count.
slot_duration, checks::verify_initial_validator_count(
)) network.clone(),
// Check that the chain starts with the expected validator count. slot_duration,
.join(checks::verify_initial_validator_count( initial_validator_count,
network.clone(), ),
slot_duration, // Check that validators greater than `spec.min_genesis_active_validator_count` are
initial_validator_count, // onboarded at the first possible opportunity.
)) checks::verify_validator_onboarding(
// Check that validators greater than `spec.min_genesis_active_validator_count` are network.clone(),
// onboarded at the first possible opportunity. slot_duration,
.join(checks::verify_validator_onboarding( total_validator_count,
network.clone(), )
slot_duration, );
total_validator_count,
)) // The `final_future` either completes immediately or never completes, depending on the value
// End now or run forever, depending on the `end_after_checks` flag. // of `end_after_checks`.
.join(final_future)
.map(|_| network) if !end_after_checks {
}) future::pending::<()>().await;
}
/* /*
* End the simulation by dropping the network. This will kill all running beacon nodes and * End the simulation by dropping the network. This will kill all running beacon nodes and
* validator clients. * validator clients.
*/ */
.map(|network| { println!(
println!( "Simulation complete. Finished with {} beacon nodes and {} validator clients",
"Simulation complete. Finished with {} beacon nodes and {} validator clients", network.beacon_node_count(),
network.beacon_node_count(), network.validator_client_count()
network.validator_client_count() );
);
// Be explicit about dropping the network, as this kills all the nodes. This ensures // Be explicit about dropping the network, as this kills all the nodes. This ensures
// all the checks have adequate time to pass. // all the checks have adequate time to pass.
drop(network) drop(network);
}); Ok::<(), String>(())
};
env.runtime().block_on(future) Ok(env.runtime().block_on(main_future).unwrap())
} }

View File

@@ -1,4 +1,3 @@
use futures::{Future, IntoFuture};
use node_test_rig::{ use node_test_rig::{
environment::RuntimeContext, ClientConfig, LocalBeaconNode, LocalValidatorClient, environment::RuntimeContext, ClientConfig, LocalBeaconNode, LocalValidatorClient,
RemoteBeaconNode, ValidatorConfig, RemoteBeaconNode, ValidatorConfig,
@@ -42,23 +41,24 @@ impl<E: EthSpec> Deref for LocalNetwork<E> {
impl<E: EthSpec> LocalNetwork<E> { impl<E: EthSpec> LocalNetwork<E> {
/// Creates a new network with a single `BeaconNode`. /// Creates a new network with a single `BeaconNode`.
pub fn new( pub async fn new(
context: RuntimeContext<E>, context: RuntimeContext<E>,
mut beacon_config: ClientConfig, mut beacon_config: ClientConfig,
) -> impl Future<Item = Self, Error = String> { ) -> Result<Self, String> {
beacon_config.network.discovery_port = BOOTNODE_PORT; beacon_config.network.discovery_port = BOOTNODE_PORT;
beacon_config.network.libp2p_port = BOOTNODE_PORT; beacon_config.network.libp2p_port = BOOTNODE_PORT;
beacon_config.network.enr_udp_port = Some(BOOTNODE_PORT); beacon_config.network.enr_udp_port = Some(BOOTNODE_PORT);
beacon_config.network.enr_tcp_port = Some(BOOTNODE_PORT); beacon_config.network.enr_tcp_port = Some(BOOTNODE_PORT);
LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config).map( let beacon_node =
|beacon_node| Self { LocalBeaconNode::production(context.service_context("boot_node".into()), beacon_config)
inner: Arc::new(Inner { .await?;
context, Ok(Self {
beacon_nodes: RwLock::new(vec![beacon_node]), inner: Arc::new(Inner {
validator_clients: RwLock::new(vec![]), context,
}), beacon_nodes: RwLock::new(vec![beacon_node]),
}, validator_clients: RwLock::new(vec![]),
) }),
})
} }
/// Returns the number of beacon nodes in the network. /// Returns the number of beacon nodes in the network.
@@ -78,72 +78,65 @@ impl<E: EthSpec> LocalNetwork<E> {
} }
/// Adds a beacon node to the network, connecting to the 0'th beacon node via ENR. /// Adds a beacon node to the network, connecting to the 0'th beacon node via ENR.
pub fn add_beacon_node( pub async fn add_beacon_node(&self, mut beacon_config: ClientConfig) -> Result<(), String> {
&self,
mut beacon_config: ClientConfig,
) -> impl Future<Item = (), Error = String> {
let self_1 = self.clone(); let self_1 = self.clone();
println!("Adding beacon node.."); println!("Adding beacon node..");
self.beacon_nodes {
.read() let read_lock = self.beacon_nodes.read();
.first()
.map(|boot_node| { let boot_node = read_lock.first().expect("should have at least one node");
beacon_config.network.boot_nodes.push(
boot_node beacon_config.network.boot_nodes.push(
.client boot_node
.enr() .client
.expect("bootnode must have a network"), .enr()
); .expect("bootnode must have a network"),
}) );
.expect("should have at least one node"); }
let index = self.beacon_nodes.read().len(); let index = self.beacon_nodes.read().len();
LocalBeaconNode::production( let beacon_node = LocalBeaconNode::production(
self.context.service_context(format!("node_{}", index)), self.context.service_context(format!("node_{}", index)),
beacon_config, beacon_config,
) )
.map(move |beacon_node| { .await?;
self_1.beacon_nodes.write().push(beacon_node); self_1.beacon_nodes.write().push(beacon_node);
}) Ok(())
} }
/// Adds a validator client to the network, connecting it to the beacon node with index /// Adds a validator client to the network, connecting it to the beacon node with index
/// `beacon_node`. /// `beacon_node`.
pub fn add_validator_client( pub async fn add_validator_client(
&self, &self,
mut validator_config: ValidatorConfig, mut validator_config: ValidatorConfig,
beacon_node: usize, beacon_node: usize,
keypair_indices: Vec<usize>, keypair_indices: Vec<usize>,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
let index = self.validator_clients.read().len(); let index = self.validator_clients.read().len();
let context = self.context.service_context(format!("validator_{}", index)); let context = self.context.service_context(format!("validator_{}", index));
let self_1 = self.clone(); let self_1 = self.clone();
let socket_addr = {
let read_lock = self.beacon_nodes.read();
let beacon_node = read_lock
.get(beacon_node)
.ok_or_else(|| format!("No beacon node for index {}", beacon_node))?;
beacon_node
.client
.http_listen_addr()
.expect("Must have http started")
};
self.beacon_nodes validator_config.http_server =
.read() format!("http://{}:{}", socket_addr.ip(), socket_addr.port());
.get(beacon_node) let validator_client = LocalValidatorClient::production_with_insecure_keypairs(
.map(move |beacon_node| { context,
let socket_addr = beacon_node validator_config,
.client &keypair_indices,
.http_listen_addr() )
.expect("Must have http started"); .await?;
self_1.validator_clients.write().push(validator_client);
validator_config.http_server = Ok(())
format!("http://{}:{}", socket_addr.ip(), socket_addr.port());
validator_config
})
.ok_or_else(|| format!("No beacon node for index {}", beacon_node))
.into_future()
.and_then(move |validator_config| {
LocalValidatorClient::production_with_insecure_keypairs(
context,
validator_config,
&keypair_indices,
)
})
.map(move |validator_client| self_1.validator_clients.write().push(validator_client))
} }
/// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API. /// For all beacon nodes in `Self`, return a HTTP client to access each nodes HTTP API.
@@ -157,13 +150,14 @@ impl<E: EthSpec> LocalNetwork<E> {
} }
/// Return current epoch of bootnode. /// Return current epoch of bootnode.
pub fn bootnode_epoch(&self) -> impl Future<Item = Epoch, Error = String> { pub async fn bootnode_epoch(&self) -> Result<Epoch, String> {
let nodes = self.remote_nodes().expect("Failed to get remote nodes"); let nodes = self.remote_nodes().expect("Failed to get remote nodes");
let bootnode = nodes.first().expect("Should contain bootnode"); let bootnode = nodes.first().expect("Should contain bootnode");
bootnode bootnode
.http .http
.beacon() .beacon()
.get_head() .get_head()
.await
.map_err(|e| format!("Cannot get head: {:?}", e)) .map_err(|e| format!("Cannot get head: {:?}", e))
.map(|head| head.finalized_slot.epoch(E::slots_per_epoch())) .map(|head| head.finalized_slot.epoch(E::slots_per_epoch()))
} }

View File

@@ -1,6 +1,6 @@
use crate::{checks, LocalNetwork}; use crate::{checks, LocalNetwork};
use clap::ArgMatches; use clap::ArgMatches;
use futures::{future, stream, Future, Stream}; use futures::prelude::*;
use node_test_rig::{ use node_test_rig::{
environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig, environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig,
}; };
@@ -63,88 +63,61 @@ pub fn run_no_eth1_sim(matches: &ArgMatches) -> Result<(), String> {
beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
let future = LocalNetwork::new(context, beacon_config.clone()) let main_future = async {
let network = LocalNetwork::new(context, beacon_config.clone()).await?;
/* /*
* One by one, add beacon nodes to the network. * One by one, add beacon nodes to the network.
*/ */
.and_then(move |network| {
let network_1 = network.clone();
stream::unfold(0..node_count - 1, move |mut iter| { for _ in 0..node_count - 1 {
iter.next().map(|_| { network.add_beacon_node(beacon_config.clone()).await?;
network_1 }
.add_beacon_node(beacon_config.clone())
.map(|()| ((), iter))
})
})
.collect()
.map(|_| network)
})
/* /*
* One by one, add validator clients to the network. Each validator client is attached to * One by one, add validator clients to the network. Each validator client is attached to
* a single corresponding beacon node. * a single corresponding beacon node.
*/ */
.and_then(move |network| {
let network_1 = network.clone();
// Note: presently the validator client future will only resolve once genesis time // Note: presently the validator client future will only resolve once genesis time
// occurs. This is great for this scenario, but likely to change in the future. // occurs. This is great for this scenario, but likely to change in the future.
// //
// If the validator client future behaviour changes, we would need to add a new future // If the validator client future behaviour changes, we would need to add a new future
// that delays until genesis. Otherwise, all of the checks that start in the next // that delays until genesis. Otherwise, all of the checks that start in the next
// future will start too early. // future will start too early.
stream::unfold(0..node_count, move |mut iter| { for i in 0..node_count {
iter.next().map(|i| { let indices =
let indices = (i * validators_per_node..(i + 1) * validators_per_node) (i * validators_per_node..(i + 1) * validators_per_node).collect::<Vec<_>>();
.collect::<Vec<_>>(); network
.add_validator_client(ValidatorConfig::default(), i, indices)
network_1 .await?;
.add_validator_client(ValidatorConfig::default(), i, indices) }
.map(|()| ((), iter))
})
})
.collect()
.map(|_| network)
})
/* /*
* Start the processes that will run checks on the network as it runs. * Start the processes that will run checks on the network as it runs.
*/ */
.and_then(move |network| { // Check that the chain finalizes at the first given opportunity.
// The `final_future` either completes immediately or never completes, depending on the value checks::verify_first_finalization(network.clone(), slot_duration).await?;
// of `end_after_checks`.
let final_future: Box<dyn Future<Item = (), Error = String> + Send> =
if end_after_checks {
Box::new(future::ok(()).map_err(|()| "".to_string()))
} else {
Box::new(future::empty().map_err(|()| "".to_string()))
};
future::ok(()) // The `final_future` either completes immediately or never completes, depending on the value
// Check that the chain finalizes at the first given opportunity. // of `end_after_checks`.
.join(checks::verify_first_finalization(
network.clone(), if !end_after_checks {
slot_duration, future::pending::<()>().await;
)) }
// End now or run forever, depending on the `end_after_checks` flag.
.join(final_future)
.map(|_| network)
})
/* /*
* End the simulation by dropping the network. This will kill all running beacon nodes and * End the simulation by dropping the network. This will kill all running beacon nodes and
* validator clients. * validator clients.
*/ */
.map(|network| { println!(
println!( "Simulation complete. Finished with {} beacon nodes and {} validator clients",
"Simulation complete. Finished with {} beacon nodes and {} validator clients", network.beacon_node_count(),
network.beacon_node_count(), network.validator_client_count()
network.validator_client_count() );
);
// Be explicit about dropping the network, as this kills all the nodes. This ensures // Be explicit about dropping the network, as this kills all the nodes. This ensures
// all the checks have adequate time to pass. // all the checks have adequate time to pass.
drop(network) drop(network);
}); Ok::<(), String>(())
};
env.runtime().block_on(future) Ok(env.runtime().block_on(main_future).unwrap())
} }

View File

@@ -1,14 +1,13 @@
use crate::checks::{epoch_delay, verify_all_finalized_at}; use crate::checks::{epoch_delay, verify_all_finalized_at};
use crate::local_network::LocalNetwork; use crate::local_network::LocalNetwork;
use clap::ArgMatches; use clap::ArgMatches;
use futures::{future, stream, Future, IntoFuture, Stream}; use futures::prelude::*;
use node_test_rig::ClientConfig; use node_test_rig::ClientConfig;
use node_test_rig::{ use node_test_rig::{
environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig, environment::EnvironmentBuilder, testing_client_config, ClientGenesis, ValidatorConfig,
}; };
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::timer::Interval;
use types::{Epoch, EthSpec}; use types::{Epoch, EthSpec};
pub fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> { pub fn run_syncing_sim(matches: &ArgMatches) -> Result<(), String> {
@@ -78,110 +77,118 @@ fn syncing_sim(
beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); beacon_config.network.enr_address = Some(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
let future = LocalNetwork::new(context, beacon_config.clone()) let main_future = async {
/*
* Create a new `LocalNetwork` with one beacon node.
*/
let network = LocalNetwork::new(context, beacon_config.clone()).await?;
/* /*
* Add a validator client which handles all validators from the genesis state. * Add a validator client which handles all validators from the genesis state.
*/ */
.and_then(move |network| { network
network .add_validator_client(ValidatorConfig::default(), 0, (0..num_validators).collect())
.add_validator_client(ValidatorConfig::default(), 0, (0..num_validators).collect()) .await?;
.map(|_| network)
}) // Check all syncing strategies one after other.
/* pick_strategy(
* Start the processes that will run checks on the network as it runs. &strategy,
*/ network.clone(),
.and_then(move |network| { beacon_config.clone(),
// The `final_future` either completes immediately or never completes, depending on the value slot_duration,
// of `end_after_checks`. initial_delay,
let final_future: Box<dyn Future<Item = (), Error = String> + Send> = sync_timeout,
if end_after_checks { )
Box::new(future::ok(()).map_err(|()| "".to_string())) .await?;
} else {
Box::new(future::empty().map_err(|()| "".to_string())) // The `final_future` either completes immediately or never completes, depending on the value
}; // of `end_after_checks`.
if !end_after_checks {
future::pending::<()>().await;
}
future::ok(())
// Check all syncing strategies one after other.
.join(pick_strategy(
&strategy,
network.clone(),
beacon_config.clone(),
slot_duration,
initial_delay,
sync_timeout,
))
.join(final_future)
.map(|_| network)
})
/* /*
* End the simulation by dropping the network. This will kill all running beacon nodes and * End the simulation by dropping the network. This will kill all running beacon nodes and
* validator clients. * validator clients.
*/ */
.map(|network| { println!(
println!( "Simulation complete. Finished with {} beacon nodes and {} validator clients",
"Simulation complete. Finished with {} beacon nodes and {} validator clients", network.beacon_node_count(),
network.beacon_node_count(), network.validator_client_count()
network.validator_client_count() );
);
// Be explicit about dropping the network, as this kills all the nodes. This ensures // Be explicit about dropping the network, as this kills all the nodes. This ensures
// all the checks have adequate time to pass. // all the checks have adequate time to pass.
drop(network) drop(network);
}); Ok::<(), String>(())
};
env.runtime().block_on(future) env.runtime().block_on(main_future)
} }
pub fn pick_strategy<E: EthSpec>( pub async fn pick_strategy<E: EthSpec>(
strategy: &str, strategy: &str,
network: LocalNetwork<E>, network: LocalNetwork<E>,
beacon_config: ClientConfig, beacon_config: ClientConfig,
slot_duration: Duration, slot_duration: Duration,
initial_delay: u64, initial_delay: u64,
sync_timeout: u64, sync_timeout: u64,
) -> Box<dyn Future<Item = (), Error = String> + Send + 'static> { ) -> Result<(), String> {
match strategy { match strategy {
"one-node" => Box::new(verify_one_node_sync( "one-node" => {
network, verify_one_node_sync(
beacon_config, network,
slot_duration, beacon_config,
initial_delay, slot_duration,
sync_timeout, initial_delay,
)), sync_timeout,
"two-nodes" => Box::new(verify_two_nodes_sync( )
network, .await
beacon_config, }
slot_duration, "two-nodes" => {
initial_delay, verify_two_nodes_sync(
sync_timeout, network,
)), beacon_config,
"mixed" => Box::new(verify_in_between_sync( slot_duration,
network, initial_delay,
beacon_config, sync_timeout,
slot_duration, )
initial_delay, .await
sync_timeout, }
)), "mixed" => {
"all" => Box::new(verify_syncing( verify_in_between_sync(
network, network,
beacon_config, beacon_config,
slot_duration, slot_duration,
initial_delay, initial_delay,
sync_timeout, sync_timeout,
)), )
_ => Box::new(Err("Invalid strategy".into()).into_future()), .await
}
"all" => {
verify_syncing(
network,
beacon_config,
slot_duration,
initial_delay,
sync_timeout,
)
.await
}
_ => Err("Invalid strategy".into()),
} }
} }
/// Verify one node added after `initial_delay` epochs is in sync /// Verify one node added after `initial_delay` epochs is in sync
/// after `sync_timeout` epochs. /// after `sync_timeout` epochs.
pub fn verify_one_node_sync<E: EthSpec>( pub async fn verify_one_node_sync<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
beacon_config: ClientConfig, beacon_config: ClientConfig,
slot_duration: Duration, slot_duration: Duration,
initial_delay: u64, initial_delay: u64,
sync_timeout: u64, sync_timeout: u64,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); let epoch_duration = slot_duration * (E::slots_per_epoch() as u32);
let network_c = network.clone(); let network_c = network.clone();
// Delay for `initial_delay` epochs before adding another node to start syncing // Delay for `initial_delay` epochs before adding another node to start syncing
@@ -190,35 +197,34 @@ pub fn verify_one_node_sync<E: EthSpec>(
slot_duration, slot_duration,
E::slots_per_epoch(), E::slots_per_epoch(),
) )
.and_then(move |_| { .await;
// Add a beacon node // Add a beacon node
network.add_beacon_node(beacon_config).map(|_| network) network.add_beacon_node(beacon_config).await?;
}) // Check every `epoch_duration` if nodes are synced
.and_then(move |network| { // limited to at most `sync_timeout` epochs
// Check every `epoch_duration` if nodes are synced let mut interval = tokio::time::interval(epoch_duration);
// limited to at most `sync_timeout` epochs let mut count = 0;
Interval::new_interval(epoch_duration) while let Some(_) = interval.next().await {
.take(sync_timeout) if count >= sync_timeout || !check_still_syncing(&network_c).await? {
.map_err(|_| "Failed to create interval".to_string()) break;
.take_while(move |_| check_still_syncing(&network_c)) }
.for_each(|_| Ok(())) // consume the stream count += 1;
.map(|_| network) }
}) let epoch = network.bootnode_epoch().await?;
.and_then(move |network| network.bootnode_epoch().map(|e| (e, network))) verify_all_finalized_at(network, epoch)
.and_then(move |(epoch, network)| { .map_err(|e| format!("One node sync error: {}", e))
verify_all_finalized_at(network, epoch).map_err(|e| format!("One node sync error: {}", e)) .await
})
} }
/// Verify two nodes added after `initial_delay` epochs are in sync /// Verify two nodes added after `initial_delay` epochs are in sync
/// after `sync_timeout` epochs. /// after `sync_timeout` epochs.
pub fn verify_two_nodes_sync<E: EthSpec>( pub async fn verify_two_nodes_sync<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
beacon_config: ClientConfig, beacon_config: ClientConfig,
slot_duration: Duration, slot_duration: Duration,
initial_delay: u64, initial_delay: u64,
sync_timeout: u64, sync_timeout: u64,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); let epoch_duration = slot_duration * (E::slots_per_epoch() as u32);
let network_c = network.clone(); let network_c = network.clone();
// Delay for `initial_delay` epochs before adding another node to start syncing // Delay for `initial_delay` epochs before adding another node to start syncing
@@ -227,41 +233,36 @@ pub fn verify_two_nodes_sync<E: EthSpec>(
slot_duration, slot_duration,
E::slots_per_epoch(), E::slots_per_epoch(),
) )
.and_then(move |_| { .await;
// Add beacon nodes // Add beacon nodes
network network.add_beacon_node(beacon_config.clone()).await?;
.add_beacon_node(beacon_config.clone()) network.add_beacon_node(beacon_config).await?;
.map(|_| (network, beacon_config)) // Check every `epoch_duration` if nodes are synced
.and_then(|(network, beacon_config)| { // limited to at most `sync_timeout` epochs
network.add_beacon_node(beacon_config).map(|_| network) let mut interval = tokio::time::interval(epoch_duration);
}) let mut count = 0;
}) while let Some(_) = interval.next().await {
.and_then(move |network| { if count >= sync_timeout || !check_still_syncing(&network_c).await? {
// Check every `epoch_duration` if nodes are synced break;
// limited to at most `sync_timeout` epochs }
Interval::new_interval(epoch_duration) count += 1;
.take(sync_timeout) }
.map_err(|_| "Failed to create interval".to_string()) let epoch = network.bootnode_epoch().await?;
.take_while(move |_| check_still_syncing(&network_c)) verify_all_finalized_at(network, epoch)
.for_each(|_| Ok(())) // consume the stream .map_err(|e| format!("One node sync error: {}", e))
.map(|_| network) .await
})
.and_then(move |network| network.bootnode_epoch().map(|e| (e, network)))
.and_then(move |(epoch, network)| {
verify_all_finalized_at(network, epoch).map_err(|e| format!("Two node sync error: {}", e))
})
} }
/// Add 2 syncing nodes after `initial_delay` epochs, /// Add 2 syncing nodes after `initial_delay` epochs,
/// Add another node after `sync_timeout - 5` epochs and verify all are /// Add another node after `sync_timeout - 5` epochs and verify all are
/// in sync after `sync_timeout + 5` epochs. /// in sync after `sync_timeout + 5` epochs.
pub fn verify_in_between_sync<E: EthSpec>( pub async fn verify_in_between_sync<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
beacon_config: ClientConfig, beacon_config: ClientConfig,
slot_duration: Duration, slot_duration: Duration,
initial_delay: u64, initial_delay: u64,
sync_timeout: u64, sync_timeout: u64,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
let epoch_duration = slot_duration * (E::slots_per_epoch() as u32); let epoch_duration = slot_duration * (E::slots_per_epoch() as u32);
let network_c = network.clone(); let network_c = network.clone();
// Delay for `initial_delay` epochs before adding another node to start syncing // Delay for `initial_delay` epochs before adding another node to start syncing
@@ -271,52 +272,43 @@ pub fn verify_in_between_sync<E: EthSpec>(
slot_duration, slot_duration,
E::slots_per_epoch(), E::slots_per_epoch(),
) )
.and_then(move |_| { .await;
// Add a beacon node // Add two beacon nodes
network network.add_beacon_node(beacon_config.clone()).await?;
.add_beacon_node(beacon_config.clone()) network.add_beacon_node(beacon_config).await?;
.map(|_| (network, beacon_config)) // Delay before adding additional syncing nodes.
.and_then(|(network, beacon_config)| { epoch_delay(
network.add_beacon_node(beacon_config).map(|_| network) Epoch::new(sync_timeout - 5),
}) slot_duration,
}) E::slots_per_epoch(),
.and_then(move |network| { )
// Delay before adding additional syncing nodes. .await;
epoch_delay( // Add a beacon node
Epoch::new(sync_timeout - 5), network.add_beacon_node(config1.clone()).await?;
slot_duration, // Check every `epoch_duration` if nodes are synced
E::slots_per_epoch(), // limited to at most `sync_timeout` epochs
) let mut interval = tokio::time::interval(epoch_duration);
.map(|_| network) let mut count = 0;
}) while let Some(_) = interval.next().await {
.and_then(move |network| { if count >= sync_timeout || !check_still_syncing(&network_c).await? {
// Add a beacon node break;
network.add_beacon_node(config1.clone()).map(|_| network) }
}) count += 1;
.and_then(move |network| { }
// Check every `epoch_duration` if nodes are synced let epoch = network.bootnode_epoch().await?;
// limited to at most `sync_timeout` epochs verify_all_finalized_at(network, epoch)
Interval::new_interval(epoch_duration) .map_err(|e| format!("One node sync error: {}", e))
.take(sync_timeout + 5) .await
.map_err(|_| "Failed to create interval".to_string())
.take_while(move |_| check_still_syncing(&network_c))
.for_each(|_| Ok(())) // consume the stream
.map(|_| network)
})
.and_then(move |network| network.bootnode_epoch().map(|e| (e, network)))
.and_then(move |(epoch, network)| {
verify_all_finalized_at(network, epoch).map_err(|e| format!("In between sync error: {}", e))
})
} }
/// Run syncing strategies one after other. /// Run syncing strategies one after other.
pub fn verify_syncing<E: EthSpec>( pub async fn verify_syncing<E: EthSpec>(
network: LocalNetwork<E>, network: LocalNetwork<E>,
beacon_config: ClientConfig, beacon_config: ClientConfig,
slot_duration: Duration, slot_duration: Duration,
initial_delay: u64, initial_delay: u64,
sync_timeout: u64, sync_timeout: u64,
) -> impl Future<Item = (), Error = String> { ) -> Result<(), String> {
verify_one_node_sync( verify_one_node_sync(
network.clone(), network.clone(),
beacon_config.clone(), beacon_config.clone(),
@@ -324,53 +316,42 @@ pub fn verify_syncing<E: EthSpec>(
initial_delay, initial_delay,
sync_timeout, sync_timeout,
) )
.map(|_| println!("Completed one node sync")) .await?;
.and_then(move |_| { println!("Completed one node sync");
verify_two_nodes_sync( verify_two_nodes_sync(
network.clone(), network.clone(),
beacon_config.clone(), beacon_config.clone(),
slot_duration, slot_duration,
initial_delay, initial_delay,
sync_timeout, sync_timeout,
) )
.map(|_| { .await?;
println!("Completed two node sync"); println!("Completed two node sync");
(network, beacon_config) verify_in_between_sync(
}) network,
}) beacon_config,
.and_then(move |(network, beacon_config)| { slot_duration,
verify_in_between_sync( initial_delay,
network, sync_timeout,
beacon_config, )
slot_duration, .await?;
initial_delay, println!("Completed in between sync");
sync_timeout, Ok(())
)
.map(|_| println!("Completed in between sync"))
})
} }
pub fn check_still_syncing<E: EthSpec>( pub async fn check_still_syncing<E: EthSpec>(network: &LocalNetwork<E>) -> Result<bool, String> {
network: &LocalNetwork<E>, // get syncing status of nodes
) -> impl Future<Item = bool, Error = String> { let mut status = Vec::new();
network for remote_node in network.remote_nodes()? {
.remote_nodes() status.push(
.into_future() remote_node
// get syncing status of nodes .http
.and_then(|remote_nodes| { .node()
stream::unfold(remote_nodes.into_iter(), |mut iter| { .syncing_status()
iter.next().map(|remote_node| { .await
remote_node .map(|status| status.is_syncing)
.http .map_err(|e| format!("Get syncing status via http failed: {:?}", e))?,
.node() )
.syncing_status() }
.map(|status| status.is_syncing) Ok(status.iter().any(|is_syncing| *is_syncing))
.map(|status| (status, iter))
.map_err(|e| format!("Get syncing status via http failed: {:?}", e))
})
})
.collect()
})
.and_then(move |status| Ok(status.iter().any(|is_syncing| *is_syncing)))
.map_err(|e| format!("Failed syncing check: {:?}", e))
} }